其中,获取源文件这一步,如果使用for循环去排队获取(下载),10个10M的文件可能就需要30s(依照当前带宽情况而定)。
如果使用多线程去下载,最大可以将时间缩短到3s。因为各个文件的下载并不存在串行条件。
而拼接操作的执行必须得先保证源文件全部下载成功,所以在这个时候我们就必须感知到子线程的结果和是否发生异常:
当我们想获取子线程执行结果(或者是批量获取)时候该怎么办?
结果是如何被获取的?为什么可以获取到?是不是有什么操作可以获取到来自未来的东西?
异常是怎么捕获的?不是说子线程的异常主线程没法感知吗?那这里通过什么入口感知的呢?
今天我们从使用场景和原理层面来聊聊
Future 和 Callable
一、为什么需要Future 和 Callable
我们先从老朋友
Runnable
讲起。
我们知道,
Runnable
有如下两个缺陷:
不能返回值
不能抛出checked exception
那为什么它要这样设计呢?
如果说
Run
方法可以抛出异常,那谁来处理这个异常?执行
Thread.start()
的
Main
吗?这显然是不合适的,所以才建议在
Run
方法里面进行
try...catch
。
那如果我非要拿到返回值,非要抛出异常呢?这个时候就引出了
Callable
和
Future
。
二、Future 和 Callable有什么用?它们是什么关系?
Callable
可以理解为
Runnable
的一个扩展:
public interface Callable<V> {
V call() throws Exception;
它既可以返回一个值,也可以抛出异常。那它的异常抛出后怎么接收呢?谁在接收?返回值又怎么获取?
不要着急,且看Future
。
Callable
描述的是计算任务,Future
表示一个任务的生命周期,可以理解为,它存放了一个未来的值。
public interface Future<V> {
* 用来取消任务,如果取消任务成功则返回true,如果取消任务失败则返回false
boolean cancel(boolean mayInterruptIfRunning);
* 表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回 true
boolean isCancelled();
* 表示任务是否已经完成,若任务完成,则返回true
boolean isDone();
* 用来获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回
V get() throws InterruptedException, ExecutionException;
* 在一定时间内阻塞获取执行结果,如果在指定时间内,还没获取到结果,就直接返回null
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
现在有一个这样的场景:
如果有一个很耗时的计算任务,这个时候我想开一个子线程去执行,主线程继续干自己的事情,等某一个时候,主线程想知道计算结果了,再去拿,这个该如何实现?
我们可以使用Future.get
来获取Callable
接口返回的执行结果。
在callable.call()方法未执行完成之前,调用future.get()方法的线程会被阻塞,直到结果返回。
public static void main(String[] args) {
ExecutorService service = Executors.newFixedThreadPool(10);
System.out.println("我现在需要一个很复杂的计算结果,开一个子线程去算");
Future<Integer> future = service.submit(new CallableTask());
System.out.println("开启完毕,子线程已经开始算了,我继续处理我的工作");
try {
Thread.sleep(2000);
System.out.println("我想知道计算结果了");
System.out.println("获取到计算结果: " + future.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
service.shutdown();
static class CallableTask implements Callable<Integer> {
@Override
public Integer call() throws Exception {
System.out.println("子线程开始复杂的计算");
Thread.sleep(3000);
return new Random().nextInt();
三、结果获取和异常接收是如何实现的?
Future
接口定义了一系列的方法,用于控制任务和获取任务的结果。
我们找一个典型的实现来分析一下,FutureTask
他实现了RunnableFuture
接口,而RunnableFuture
继承了Runnable
和Future
public class FutureTask<V> implements RunnableFuture<V> {......}
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
那我们先来看看这货是怎么使用的:
public class FutureTaskDemo {
public static void main(String[] args) {
CallableTask callableTask = new CallableTask();
FutureTask<Integer> futureTask = new FutureTask<>(callableTask);
new Thread(futureTask).start();
try {
System.out.println("任务运行结果:" + futureTask.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
class CallableTask implements Callable<Integer> {
@Override
public Integer call() throws Exception {
System.out.println("子线程正在计算");
Thread.sleep(3000);
return 1;
1、Future和Callable是如何结合的?
FutureTask
在被线程执行的时候,其实线程的start
方法会去调用Runnable
的run
,FutureTask
是实现了Runnable
接口的,我们看看他的run方法干了什么:
public void run() {
......
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
......
if (ran)
set(result);
} finally {
......
还记得我们在构造FutureTask
的时候需要传入一个Callable
吗?在这儿就调用了Callable
的call
方法。
2、来自未来的结果又放在哪儿?
在上面那段代码中,调用call
方法后,是不是使用了一个result去接收返回的结果,那这个结果显然就是计算的结果了嘛。
然后就有一行:set(result);
我们来追踪一下它被设置到了哪儿:
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL);
finishCompletion();
那我们找到这个成员变量以及说明:
private Object outcome;
当我们使用get()
的时候,就是从这儿拿值咯。
3、调用get时候是如何被阻塞又如何唤醒的?
我们来看下面一张图
我们先看一下这段代码:
public static void main(String[] args) {
ExecutorService service = Executors.newFixedThreadPool(20);
Future<Integer> future = service.submit(
() -> {
System.out.println(Thread.currentThread().getName() + ": 子线程准备抛出异常");
throw new IllegalArgumentException("Callable抛出异常");
try {
for (int i = 0; i < 5; i++) {
System.out.println(Thread.currentThread().getName() + "数到数字: " + i);
Thread.sleep(500);
System.out.println(Thread.currentThread().getName() + "判断子线程任务是否执行完成: " + future.isDone());
future.get();
} catch (InterruptedException e) {
e.printStackTrace();
System.out.println(Thread.currentThread().getName() + "得到InterruptedException异常");
} catch (ExecutionException e) {
e.printStackTrace();
System.out.println(Thread.currentThread().getName() + "得到ExecutionException异常");
执行结果:
根据这个结果我们可以看出:
只有主线程执行get()
方法的时候,才能感知到子线程抛出的异常
虽然我们抛出的是IllegalArgumentException
,但是仍然捕获的是ExecutionException
** 那这个异常又是如何被设置的?**
我们再来到run()
方法:
public void run() {
......
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
if (ran)
set(result);
} finally {
......
子线程自己在捕获到异常后,调用了setException()
方法将任务状态置为 "EXCEPTIONAL":
protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL);
finishCompletion();
然后我们来到get()
方法中调用的report()
方法:
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
如果任务状态为 "EXCEPTIONAL",则抛出ExecutionException
四、任务可以取消吗?
使用cancel()
方法可以达到取消的效果,但不可能做到想取消就可以立即取消,它分为如下3种情况:
如果任务还没有开始执行,那这种情况最简单,任务会被正常取消,未来了也不会被执行,方法返回true
如果任务已完成,或者已取消,那cancel()
方法会执行失败,返回false
如果任务已经开始执行,那么执行cancel()
方法并不会直接取消该任务,而是根据我们传入的参数mayInterruptIfRunning
做判断:
mayInterruptIfRunning = true
: 发送中断信号给正在执行的线程,并将任务状态置为 "中断"(至于线程响不响应这个中断信号,那就看线程自己了)
mayInterruptIfRunning = false
: 不发送中断信号,将任务状态置为 "CANCELLED"
public static void main(String[] args) {
ExecutorService service = Executors.newFixedThreadPool(10);
System.out.println(Thread.currentThread().getName() + ": 我现在需要一个很复杂的计算结果,开一个子线程去算");
Future<Integer> future = service.submit(new CallableTask());
System.out.println(Thread.currentThread().getName() + ": 开启完毕,子线程已经开始算了,我继续处理我的工作");
try {
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + ": 执行取消子线程的任务");
System.out.println(Thread.currentThread().getName() + ": 取消结果: " + future.cancel(true));
Thread.sleep(4000);
System.out.println(Thread.currentThread().getName() + ": 准备获取计算结果");
System.out.println("获取到计算结果: " + future.get());
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + ": InterruptedException");
e.printStackTrace();
} catch (ExecutionException e) {
System.out.println(Thread.currentThread().getName() + ": ExecutionException");
e.printStackTrace();
} catch (CancellationException e){
System.out.println(Thread.currentThread().getName() + ": CancellationException");
e.printStackTrace();
service.shutdown();
static class CallableTask implements Callable<Integer> {
@Override
public Integer call() throws Exception {
System.out.println(Thread.currentThread().getName() + ": 子线程开始复杂的计算");
Thread.sleep(3000);
System.out.println(Thread.currentThread().getName() + ": 子线程结束复杂的计算");
return 0;
使用future.cancel(true)
(因为sleep期间可以响应中断,所以收到了中断信号):
五、如何批量获取任务结果?
我们最先想到的方法就是,使用一个Future
数组:
public static void main(String[] args) throws InterruptedException {
ExecutorService service = Executors.newFixedThreadPool(20);
ArrayList<Future> futures = new ArrayList<>();
for (int i = 0; i < 20; i++) {
Future<Integer> future = service.submit(new CallableTask());
futures.add(future);
for (int i = 0; i < 20; i++) {
Future<Integer> future = futures.get(i);
try {
System.out.println(future.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
static class CallableTask implements Callable<Integer> {
@Override
public Integer call() throws Exception {
Thread.sleep(3000);
return new Random().nextInt();
但是仔细思考一下,如果是这样的场景呢:
第一个任务执行需要1分钟,都是后面的执行都只需要10s不到。那这个时候,其实主线程是会阻塞在等待第一个任务执行结束再去拿到其他的结果。
这种场景可以使用两种方式解决:
使用带有timeout
的get
使用CompletableFuture
今天我们先从场景入手:当我们想获取子线程执行结果时候该怎么办?从而引出了Future
和Callable
。
然后我们开始思考:
结果是如何被获取的?为什么可以获取到?是不是有什么操作可以获取到来自未来的东西?
异常是怎么捕获的?不是说子线程的异常主线程没法感知吗?那这里通过什么入口感知的呢?
我执行cancel
方法真的取消了正在执行的任务吗?
这些问题我们从 “用法 + 源码” 的层面进行了一一的解读。