工作中常有这样的场景,并发执行一些任务,并等待所有的任务执行完成,进行后续处理,这里总结了几种方法用于满足这种业务场景。
### 二、闭锁方式
闭锁是Java早期提供的一种并发锁,其特点是每个任务颁发一个令牌,任务执行完成释放令牌,主进程可以一直阻塞等待所有的令牌被释放,当所有令牌都被释放后,主进程可以继续执行。依据闭锁的这种特效可以满足上面的任务场景。
```jav
工作中常有这样的场景,并发执行一些任务,并等待所有的任务执行完成,进行后续处理,这里总结了几种方法用于满足这种业务场景。
二、闭锁方式
闭锁是Java早期提供的一种并发锁,其特点是每个任务颁发一个令牌,任务执行完成释放令牌,主进程可以一直阻塞等待所有的令牌被释放,当所有令牌都被释放后,主进程可以继续执行。依据闭锁的这种特效可以满足上面的任务场景。
public class CountDownLatchService {
private final CountDownLatch lock;
private final ExecutorService executorService;
private List<Long> aList;
public CountDownLatchService(ExecutorService executorService, List<Long> aList) {
this.executorService = executorService;
this.lock = new CountDownLatch(aList.size());
this.aList = aList;
public void process() throws InterruptedException {
aList.forEach(s -> {
executorService.submit(new Task(s));
lock.await(10, TimeUnit.SECONDS);
class Task implements Runnable {
private Long job;
public Task(Long job) {
this.job = job;
@Override
public void run() {
try {
System.out.println(job);
} finally {
lock.countDown();
三、Future方式
通过for循环提交异步任务执行,返回的Future列表,再通过for循环获取每个Future中的结果。
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
public class FutureService {
private final ExecutorService executorService;
public FutureService(ExecutorService executorService) {
this.executorService = executorService;
public void process() {
List<Long> aList = LongStream.rangeClosed(0, 1000).boxed().collect(Collectors.toList());
List<Future> futures = new ArrayList<>(aList.size());
aList.forEach(s -> {
Future future = executorService.submit(() -> System.out.println(s));
futures.add(future);
futures.forEach(s -> {
try {
s.get(10, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
e.printStackTrace();
四、CompletableFuture方式
通过Java8新提供的CompletableFuture类,可以通过allOf方法构建一批异步任务对象,然后通过get方法阻塞等待所有任务的完成。
import com.google.common.collect.Lists;
import com.taobao.eagleeye.EagleEye;
import org.apache.commons.collections.CollectionUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.function.Consumer;
import java.util.function.Function;
public class ParallelService {
public <T> void parallelExecute(ExecutorService executorService, List<T> list, Consumer<T> func) {
if (CollectionUtils.isEmpty(list)) {
return;
final Object rpcContext = EagleEye.currentRpcContext();
CompletableFuture[] futureList = list.stream().map(s -> CompletableFuture.runAsync(() -> {
try {
EagleEye.setRpcContext(rpcContext);
func.accept(s);
} finally {
EagleEye.clearRpcContext();
}, executorService)).toArray(CompletableFuture[]::new);
wait(futureList);
public <T, R> List<R> parallelGet(ExecutorService executorService, List<T> list, Function<T, R> func) {
if (CollectionUtils.isEmpty(list)) {
return Lists.newArrayList();
final Object rpcContext = EagleEye.currentRpcContext();
CompletableFuture[] futureList = list.stream().map(s -> CompletableFuture.supplyAsync(() -> {
try {
EagleEye.setRpcContext(rpcContext);
r = func.apply(s);
} finally {
EagleEye.clearRpcContext();
return r;
}, executorService)).toArray(CompletableFuture[]::new);
wait(futureList);
List<R> result = new ArrayList<>(list.size());
for (CompletableFuture future : futureList) {
result.add((R) future.getNow(null));
return result;
private void wait(CompletableFuture[] futureList) {
CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(futureList);
try {
combinedFuture.get(10, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
e.printStackTrace();
五、Stream方式
通过向线程池提交一个parallelStream的foreach任务,然后通过get阻塞等待所有任务的完成。需要注意的是线程池必须是ForkJoinPool,因为parallelStream内部实现就是使用的ForkJoinPool。
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
public class ParallelStreamService {
public void process() throws ExecutionException, InterruptedException {
List<Long> aList = LongStream.rangeClosed(0, 1000).boxed().collect(Collectors.toList());
ForkJoinPool customThreadPool = ForkJoinPool.commonPool();
try {
ForkJoinTask task = customThreadPool.submit(() -> aList.parallelStream().forEach(s -> {
System.out.println(Thread.currentThread().getName() + ":" + s);
task.get();
} finally {
customThreadPool.shutdown();
让线程按顺序执行8种方法
本文使用了7中方法实现在多线程中让线程按顺序运行的方法,涉及到多线程中许多常用的方法,不止为了知道如何让线程按顺序运行,更是让读者对多线程的使用有更深刻的了解。 使用的方法如下:
面经 - 【多线程】现在有T1、T2、T3三个线程,你怎样保证T2在T1执行完后执行,T3在T2执行完后执行?
面经 - 【多线程】现在有T1、T2、T3三个线程,你怎样保证T2在T1执行完后执行,T3在T2执行完后执行?