public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
......
Executors提供了一系列获取线程池的静态方法,相当于线程池工厂,是对ThreadPoolExecutor的一个封装,简化了用户切换Executor和ExecutorService的各种实现细节。
ThreadPoolExecutor是对Executor以及ExecutorService的实现,提供了具体的线程池实现。
4. ExecutorService的生命周期?
这个问题在上面已经做了解说,ExecutorService的生命周期通过接口定义可以分为运行中,关闭,终止三种状态。
ThreadPoolExecutor在具体实现上提供了更加详细的五种状态:RUNNING、SHUTDOWN、STOP、TIDYING以及TERMINATED。各种状态的说明以及转换可以看上一个问题的答案。
5. 线程池中的线程能设置超时吗?
线程池中的线程是可以进行超时控制的,通过ExecutorService的submit来提交任务,这样会返回一个Future类型的结果,来看看Future接口的代码:
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
复制代码
Future定义了get()以及get(long timeout, TimeUnit unit)方法,get()方法会阻塞当前调用,一直到获取到返回结果,get(long timeout, TimeUnit unit)会在指定时间内阻塞,当超时后会抛出TimeoutException错误。这样就可以达到线程超时控制的目的。简单使用示例如下:
Future<String> future = executor.submit(callable);
try {
future.get(2000, TimeUnit.SECONDS);
} catch (InterruptedException e1) {
} catch (ExecutionException e1) {
} catch (TimeoutException e1) {
复制代码
这里有一个问题就是因为get方法是阻塞的---通过LockSupport.park实现,那么线程池中线程比较多的情况下要怎么获取每个线程的超时呢?这里除了自定义线程池实现或者自定义线程工厂来实现之外,使用ThreadPoolExecutor本身的功能我也没想到更好的办法。有一个非常笨的解决方案是开启同线程池数量相等的线程进行监听。大家如果有更好的办法可以留言提出。
6. 怎么取消线程池中的线程?
这个问题和上面的问题解决方案一样,同样也是通过ExecutorService的submit来提交任务,获取Future,调用Future中的cancel方法来达到目的。
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
复制代码
cancel方法有一个mayInterruptIfRunning参数,当为true时,代表任务能接受并处理中断,调用了interrupt方法。如果为false,代表如果任务没启动就不要运行它,不会调用interrupt方法。
取消的本质实际上还是通过interrupt来实现的,这就是说,如果线程本身不能响应中断,就算调用了cancel方法也没用。一般情况下通过lockInterruptibly、park和await方法阻塞的线程都是能响应中断的,运行中的线程就需要开发者自己实现中断了。
7. 如何设置一个合适的线程池大小?
如何设置一个合适的线程池大小,这个问题我觉得是没有一个固定公式的。或者可以说,只有一些简单的设置规则,但放到具体业务中,又各有不同,只能根据现场环境测试过后再来分析。
设置合适的线程池大小分为两部分,一部分是最大线程池大小,一部分是最小线程池大小。在ThreadPoolExecutor中体现在最大线程数(maximumPoolSize)和核心线程数(corePoolSize)。
最大线程池大小的设置首先跟当前机器cpu核心数密切相关,一般情况来说要想最大化利用cpu,设置为cpu核心数就可以了,比如4核cpu服务器可以设置为4。但实际情况又大有不同,因为往往我们执行的任务都会涉及到IO,比如任务中执行了一个从数据库查询数据的操作,那么这段时间cpu实际上是没有最大化利用的,这样我们就可以适当扩大maximumPoolSize的大小。在有些情况下任务会是cpu密集型的,如果这样设置更多的线程不仅不会提高效率,反而因为线程的创建销毁以及切换开销而大大降低了效率,所以说最大线程池的大小需要根据业务情况具体测试后才能设置一个合适的大小。
最小线程池大小相比较最大线程池大小设置起来相对容易一些,因为最小线程一般来说是可以根据业务情况来预估进行设置,比如大多数情况下会有2个任务在运行,很小概率会有超过2个任务运行,那么直接设置最小线程池大小为2就可以。但有一点需要知道的是每间隔多长时间会有超过2个任务,如果每2分钟会有一次超过2个任务的情况,那么我们可以将线程过期时间设置的稍微久一点,比如4分钟,这样就算频繁的超过2个任务,也可以利用缓存的线程池。
总的来说设置最大和最小线程池都是一个没有固定公式的问题,都需要考虑实际业务情况和机器配置,根据实际业务情况多做测试才能做到最优化设置。在一切没有决定之前,可以使用软件架构的KISS原则,设置最大以及最小线程数都为cpu核心数即可,后续在做优化。
8. 当使用有界队列时,如何设置一个合适的队列大小?
要设置合适的队列大小,先要明白队列什么时候会被使用。在ThreadPoolExecutor的实现中,使用队列的情况有点特殊。它会先使用核心线程池大小的线程,之后会将任务加入队列中,再之后队列满了之后才会扩大到最大线程池大小的线程。也就是说队列的使用并不是等待线程不够用了才使用,而是等待核心线程不够用了就使用。我不是太能理解这样设计的意图,按《Java性能权威权威指南》一书中的说法是这样提供了两个节流阀,第一个是队列,第二个是最大线程池。但这样做并不能给使用者最优的体验,既然要使用最大线程池,那为什么不在第一次就使用呢?
知道了ThreadPoolExecutor使用线程池的时机,那么再来预估合适的队列大小就很方便了。如果单个任务执行时间在100ms,最小线程数是2,使用者能忍受的最大延时在2s,那么我们可以这样简单推算出队列大小:2/2s/100ms=10,这样满队列时最大延时就在2s之内。当然还有其他一些影响因素,比如部分任务超过或者小于100ms,最大线程池的利用等等,可以在这基础上做简单调整。
9. 当使用有界队列时,如果队列已满,如何选择合适的拒绝策略?
ThreadPoolExecutor中提供了四种RejectedExecutionHandler,每种分工都比较明确,选择起来并不困难。它们分别是:AbortPolicy、DiscardPolicy、DiscardOldestPolicy以及CallerRunsPolicy。下面贴出了他们的源码并做了简单说明,使用的时候可以根据需要自行选择。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
复制代码
10. 如何统计线程池中的线程执行时间?
要统计线程池中的线程执行时间,就需要了解线程池中的线程是在什么地方,什么时机执行的?知道了线程的执行状态,然后在线程执行前后添加自己的处理就可以了,所以先来找到ThreadPoolExecutor中线程具体执行的代码:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread()
Runnable task = w.firstTask
w.firstTask = null
w.unlock()
boolean completedAbruptly = true
try {
while (task != null || (task = getTask()) != null) {
w.lock()
// If pool is stopping, ensure thread is interrupted
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt()
try {
beforeExecute(wt, task)
Throwable thrown = null
try {
task.run()
} catch (RuntimeException x) {
thrown = x
} catch (Error x) {
thrown = x
} catch (Throwable x) {
thrown = x
} finally {
afterExecute(task, thrown)
} finally {
task = null
w.completedTasks++
w.unlock()
completedAbruptly = false
} finally {
processWorkerExit(w, completedAbruptly)
复制代码
可以看到runWorker方法中在task.run()也就是任务执行前后分别执行了beforeExecute以及afterExecute方法,着两个方法在ThreadPoolExecutor的继承类中都是可重写的,提供了极大的灵活性。我们可以在继承ThreadPoolExecutor之后在任务执行前后做任何自己需要做的事情,当然也就包括任务执行的时间统计了。
顺便说一句,熟悉spring源码的同学看到这里是不是发现和spring中的postprocesser前后置处理器有异曲同工之妙?区别在于一个是通过继承来覆盖,一个是通过接口来实现。
其实线程池框架涉及到的问题远不止这些,包括ThreadFactory、ForkJoinPool等等还有很多值得花时间研究的地方。本文也只是阅读jdk源码、《Java并发编程实战》以及《Java性能优化权威指南》之后的一点点总结,如有错误遗漏的地方,希望大家能多多指出。
参考资料:
《Java并发编程实战》
《Java性能优化权威指南》