工作中常有这样的场景,并发执行一些任务,并等待所有的任务执行完成,进行后续处理,这里总结了几种方法用于满足这种业务场景。 ### 二、闭锁方式 闭锁是Java早期提供的一种并发锁,其特点是每个任务颁发一个令牌,任务执行完成释放令牌,主进程可以一直阻塞等待所有的令牌被释放,当所有令牌都被释放后,主进程可以继续执行。依据闭锁的这种特效可以满足上面的任务场景。 ```jav

工作中常有这样的场景,并发执行一些任务,并等待所有的任务执行完成,进行后续处理,这里总结了几种方法用于满足这种业务场景。

二、闭锁方式

闭锁是Java早期提供的一种并发锁,其特点是每个任务颁发一个令牌,任务执行完成释放令牌,主进程可以一直阻塞等待所有的令牌被释放,当所有令牌都被释放后,主进程可以继续执行。依据闭锁的这种特效可以满足上面的任务场景。

public class CountDownLatchService {
    private final CountDownLatch lock;
    private final ExecutorService executorService;
    private List<Long> aList;
    public CountDownLatchService(ExecutorService executorService, List<Long> aList) {
        this.executorService = executorService;
        this.lock = new CountDownLatch(aList.size());
        this.aList = aList;
    public void process() throws InterruptedException {
        aList.forEach(s -> {
            executorService.submit(new Task(s));
        lock.await(10, TimeUnit.SECONDS);
    class Task implements Runnable {
        private Long job;
        public Task(Long job) {
            this.job = job;
        @Override
        public void run() {
            try {
                System.out.println(job);
            } finally {
                lock.countDown();

三、Future方式

通过for循环提交异步任务执行,返回的Future列表,再通过for循环获取每个Future中的结果。

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
public class FutureService {
    private final ExecutorService executorService;
    public FutureService(ExecutorService executorService) {
        this.executorService = executorService;
    public void process() {
        List<Long> aList = LongStream.rangeClosed(0, 1000).boxed().collect(Collectors.toList());
        List<Future> futures = new ArrayList<>(aList.size());
        aList.forEach(s -> {
            Future future = executorService.submit(() -> System.out.println(s));
            futures.add(future);
        futures.forEach(s -> {
            try {
                s.get(10, TimeUnit.SECONDS);
            } catch (InterruptedException | ExecutionException | TimeoutException e) {
                e.printStackTrace();

四、CompletableFuture方式

通过Java8新提供的CompletableFuture类,可以通过allOf方法构建一批异步任务对象,然后通过get方法阻塞等待所有任务的完成。

import com.google.common.collect.Lists;
import com.taobao.eagleeye.EagleEye;
import org.apache.commons.collections.CollectionUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.function.Consumer;
import java.util.function.Function;
public class ParallelService {
    public <T> void parallelExecute(ExecutorService executorService, List<T> list, Consumer<T> func) {
        if (CollectionUtils.isEmpty(list)) {
            return;
        final Object rpcContext = EagleEye.currentRpcContext();
        CompletableFuture[] futureList = list.stream().map(s -> CompletableFuture.runAsync(() -> {
            try {
                EagleEye.setRpcContext(rpcContext);
                func.accept(s);
            } finally {
                EagleEye.clearRpcContext();
        }, executorService)).toArray(CompletableFuture[]::new);
        wait(futureList);
    public  <T, R> List<R> parallelGet(ExecutorService executorService, List<T> list, Function<T, R> func) {
        if (CollectionUtils.isEmpty(list)) {
            return Lists.newArrayList();
        final Object rpcContext = EagleEye.currentRpcContext();
        CompletableFuture[] futureList = list.stream().map(s -> CompletableFuture.supplyAsync(() -> {
            try {
                EagleEye.setRpcContext(rpcContext);
                r = func.apply(s);
            } finally {
                EagleEye.clearRpcContext();
            return r;
            }, executorService)).toArray(CompletableFuture[]::new);
        wait(futureList);
        List<R> result = new ArrayList<>(list.size());
        for (CompletableFuture future : futureList) {
            result.add((R) future.getNow(null));
        return result;
    private void wait(CompletableFuture[] futureList) {
        CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(futureList);
        try {
            combinedFuture.get(10, TimeUnit.SECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            e.printStackTrace();

五、Stream方式

通过向线程池提交一个parallelStream的foreach任务,然后通过get阻塞等待所有任务的完成。需要注意的是线程池必须是ForkJoinPool,因为parallelStream内部实现就是使用的ForkJoinPool。

import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
public class ParallelStreamService {
    public void process() throws ExecutionException, InterruptedException {
        List<Long> aList = LongStream.rangeClosed(0, 1000).boxed().collect(Collectors.toList());
        ForkJoinPool customThreadPool = ForkJoinPool.commonPool();
        try {
            ForkJoinTask task = customThreadPool.submit(() -> aList.parallelStream().forEach(s -> {
                System.out.println(Thread.currentThread().getName() + ":" + s);
            task.get();
        } finally {
            customThreadPool.shutdown();
                让线程按顺序执行8种方法
            
  本文使用了7中方法实现在多线程中让线程按顺序运行的方法,涉及到多线程中许多常用的方法,不止为了知道如何让线程按顺序运行,更是让读者对多线程的使用有更深刻的了解。 使用的方法如下:
面经 - 【多线程】现在有T1、T2、T3三个线程,你怎样保证T2在T1执行完后执行,T3在T2执行完后执行?
面经 - 【多线程】现在有T1、T2、T3三个线程,你怎样保证T2在T1执行完后执行,T3在T2执行完后执行?