线程池使用 FutureTask 时如果把拒绝策略设置为 DiscardPolicy DiscardOldestPolicy ,并且在被拒绝的任务的Future对象上调用了无参get方法,那么调用线程会一直被阻塞

import java . util . concurrent . * ; * @author 小工匠 * @version 1.0 * @description: TODO * @date 2021/11/21 0:11 * @mark: show me the code , change the world public class FutureTest { // 1 线程池单个线程,队列大小为1 - 初始化线程池 private final static ThreadPoolExecutor tpe = new ThreadPoolExecutor ( 1 , 1 , 1 , TimeUnit . MINUTES , new ArrayBlockingQueue < Runnable > ( 1 ) , new ThreadPoolExecutor . DiscardPolicy ( ) ) ; public static void main ( String [ ] args ) throws ExecutionException , InterruptedException { // 2 添加你任务1 Future futureOne = tpe . submit ( ( ) -> { System . out . println ( "开始处理业务1" ) ; try { Thread . sleep ( 2000 ) ; } catch ( InterruptedException e ) { e . printStackTrace ( ) ; System . out . println ( "业务1执行结束" ) ; return "Result1" ; } ) ; // 3 添加你任务2 Future futureTwo = tpe . submit ( ( ) -> { System . out . println ( "开始处理业务2" ) ; System . out . println ( "业务2执行结束" ) ; return "Result2" ; } ) ; // 4 添加任务3 Future futureThree = null ; try { futureThree = tpe . submit ( ( ) -> System . out . println ( "开始处理业务3" ) ) ; } catch ( Exception e ) { System . out . print ( e . getLocalizedMessage ( ) ) ; // 5 等待任务1执行完毕 System . out . println ( "任务1返回结果: " + futureOne . get ( ) ) ; // 6 等待任务2执行完毕 System . out . println ( "任务2返回结果: " + futureTwo . get ( ) ) ; // 7 等待任务3执行完毕 System . out . println ( "任务3返回结果: " + futureThree == null ? null : futureThree . get ( ) ) ; //关闭线程池,阻塞知道所有任务执行完毕 tpe . shutdown ( ) ;

让我们来分析下

  • 代码(1)创建了一个单线程和一个队列元素个数为1的线程池,并且把拒绝策略设置为 DiscardPolicy。

  • 代码(2)向线程池提交了一个异步任务one,并且这个任务会由唯一的线程来执行,任务在打印【开始处理业务1】 后会阻塞该线程2s。

  • 代码(3)向线程池提交了一个异步任务two,这时候会把任务two放入阻塞队列。

  • 代码(4)向线程池提交任务three,由于队列已满所以触发拒绝策略丢弃任务three。

  • 从执行结果看,在任务one阻塞的5s内,主线程执行到了代码(5)并等待任务one执行完毕,当任务one执行完毕后代码(5)返回,主线程打印出【任务1 null】。任务one执行完成后线程池的唯一线程会去队列里面取出任务two并执行,所以输出【开始处理业务2】,然后代码(6)返回,这时候主线程输出【任务2 null】。然后执行代码(7)等待任务three执行完毕。

  • 从执行结果看,代码(7)会一直阻塞而不会返回,至此问题产生。如果把拒绝策略修改为DiscardOldestPolicy,也会存在有一个任务的get方法一直阻塞,只是现在是任务two被阻塞

  • 但是如果把拒绝策略设置为默认的AbortPolicy则会正常返回,并且会输出如下结果

开始处理业务1
Task java.util.concurrent.FutureTask@27bc2616 rejected from java.util.concurrent.ThreadPoolExecutor@3941a79c[Running, pool size = 1, active threads = 1, queued tasks = 1, completed tasks = 0]业务1执行结束
任务1返回结果: Result1
开始处理业务2
业务2执行结束
任务2返回结果:  Result2
Exception in thread "main" java.lang.NullPointerException
	at com.artisan.bfzm.chapter11.FutureTest.main(FutureTest.java:58)

要分析这个问题,需要看线程池的submit方法都做了什么,submit方法的代码如下

* @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); // 1 装饰Runnable对象为Future对象 RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); // 6 返回Future对象 return ftask;

看下 execute方法

* Executes the given task sometime in the future. The task * may execute in a new thread or in an existing pooled thread. * If the task cannot be submitted for execution, either because this * executor has been shutdown or because its capacity has been reached, * the task is handled by the current {@code RejectedExecutionHandler}. * @param command the task to execute * @throws RejectedExecutionException at discretion of * {@code RejectedExecutionHandler}, if the task * cannot be accepted for execution * @throws NullPointerException if {@code command} is null public void execute(Runnable command) { if (command == null) throw new NullPointerException(); * Proceed in 3 steps: * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. // 2 如果线程个数小于核心线程数量,则新增线程处理 int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); // 3. 如果线程都在工作且当前线程个数已经达到核心线程数,就把任务放入队列 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); // 4 尝试新增处理线程 else if (!addWorker(command, false)) // 5 新增失败则触发拒绝策略 reject(command);
  • 代码(1)装饰Runnable为FutureTask对象,然后调用线程池的execute方法。

  • 代码(2)判断如果线程个数小于核心线程数则新增处理线程。

  • 代码(3)判断如果当前线程个数已经达到核心线程数则将任务放入队列 。

  • 代码(4)尝试新增处理线程。失败则执行代码(5),否则直接使用新线程处理。

  • 代码(5)执行具体拒绝策略,从这里也可以看出,使用业务线程执行拒绝策略。

  • 所以要找到上面例子中问题所在,只需要看代码(5)对被拒绝任务的影响,这里先看下拒绝策略DiscardPolicy的代码。

    * A handler for rejected tasks that silently discards the * rejected task. public static class DiscardPolicy implements RejectedExecutionHandler { * Creates a {@code DiscardPolicy}. public DiscardPolicy() { } * Does nothing, which has the effect of discarding task r. * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

    拒绝策略的rejectedExecution方法什么都没做,代码(4)调用submit后会返回一个Future对象。

    Future是有状态的,Future的状态枚举值如下

    Java Review - 线程池使用FutureTask的小坑_FutureTask_07
    在代码(1)中使用newTaskFor方法将Runnable任务转换为FutureTask,

       protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            return new FutureTask<T>(callable);
         * Creates a {@code FutureTask} that will, upon running, execute the
         * given {@code Callable}.
         * @param  callable the callable task
         * @throws NullPointerException if the callable is null
        public FutureTask(Callable<V> callable) {
            if (callable == null)
                throw new NullPointerException();
            this.callable = callable;
            this.state = NEW;       // ensure visibility of callable
    

    而在FutureTask的构造函数里面设置的状态就是NEW。

    所以使用DiscardPolicy策略提交后返回了一个状态为NEW的Future对象。

    那么我们下面就需要看下当调用Future的无参get方法时Future变为什么状态才会返回,那就要看下FutureTask的get()方法代码。

       public V get() throws InterruptedException, ExecutionException {
            int s = state;
            //当前状态值 <= COMPLETING需要等待,否则调用report返回 
            if (s <= COMPLETING)
                s = awaitDone(false, 0L);
            return report(s);
         * Returns result or throws exception for completed task.
         * @param s completed state value
        @SuppressWarnings("unchecked")
        private V report(int s) throws ExecutionException {
            Object x = outcome;
            // 状态值为NORMAL的时候正常返回
            if (s == NORMAL)
                return (V)x;
           // 状态值大于等于CANCELLED的时候抛出异常
            if (s >= CANCELLED)
                throw new CancellationException();
            throw new ExecutionException((Throwable)x);
    
    • 也就是说,当Future的状态>COMPLETING时调用get方法才会返回,而明显DiscardPolicy策略在拒绝元素时并没有设置该Future的状态,后面也没有其他机会可以设置该Future的状态,所以Future的状态一直是NEW,所以一直不会返回。

    • 同理,DiscardOldestPolicy策略也存在这样的问题,最老的任务被淘汰时没有设置被淘汰任务对应Future的状态。

    • 那么默认的AbortPolicy策略为啥没问题呢?其实在执行AbortPolicy策略时,代码(5)会直接抛出RejectedExecutionException异常,也就是submit方法并没有返回Future对象,这时候futureThree是null。

    • * Invokes the rejected execution handler for the given command. * Package-protected for use by ScheduledThreadPoolExecutor. final void reject(Runnable command) { handler.rejectedExecution(command, this);
      • 所以当使用Future时,尽量使用带超时时间的get方法,这样即使使用了DiscardPolicy拒绝策略也不至于一直等待,超时时间到了就会自动返回。

      • 如果非要使用不带参数的get方法则可以重写DiscardPolicy的拒绝策略,在执行策略时设置该Future的状态大于COMPLETING即可。但是我们查看FutureTask提供的方法,会发现只有cancel方法是public的,并且可以设置FutureTask的状态大于COMPLETING,则重写拒绝策略的具体代码如下。

      • import java.util.concurrent.FutureTask; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; * @author 小工匠 * @version 1.0 * @description: TODO * @date 2021/11/21 1:40 * @mark: show me the code , change the world public class MyRejectedExecutionHandler implements RejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { if (!executor.isShutdown()){ if (null != r && r instanceof FutureTask) { ((FutureTask) r).cancel(true);

        使用这个策略时,由于在cancel的任务上调用get()方法会抛出异常,所以代码(7)需要使用try-catch块捕获异常,因此将代码(7)修改为如下所示。

        Java Review - 线程池使用FutureTask的小坑_get方法_11
        执行结果为

        当然这相比正常情况多了一个异常捕获操作。最好的情况是,重写拒绝策略时设置FutureTask的状态为NORMAL,但是这需要重写FutureTask方法,因为FutureTask并没有提供接口让我们设置。

        通过案例介绍了在线程池中使用FutureTask时,当拒绝策略为DiscardPolicy和DiscardOldestPolicy时,在被拒绝的任务的FutureTask对象上调用get()方法会导致调用线程一直阻塞,所以在日常开发中尽量使用带超时参数的get方法以避免线程一直阻塞。