在阿里巴巴手册中有一条建议:
【强制】线程池不允许使用 Executors 去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
如果经常基于Executors提供的工厂方法创建线程池,很容易忽略线程池内部的实现。特别是拒绝策略,因使用Executors创建线程池时不会传入这个参数,直接采用默认值,所以常常被忽略。
下面我们就来了解一下线程池相关的实现原理、API以及实例。
线程池的作用
在实践应用中创建线程池主要是为了:
ThreadPoolExecutor
ThreadPoolExecutor可以实现线程池的创建。ThreadPoolExecutor相关类图如下:
从类图可以看出,ThreadPoolExecutor最终实现了Executor接口,是线程池创建的真正实现者。
Executor两级调度模型
Executor模型
在HotSpot虚拟机中,Java中的线程将会被一一映射为操作系统的线程。在Java虚拟机层面,用户将多个任务提交给Executor框架,Executor负责分配线程执行它们;在操作系统层面,操作系统再将这些线程分配给处理器执行。
ThreadPoolExecutor的三个角色
ThreadPoolExecutor接受两种类型的任务:Callable和Runnable。
任务执行器
Executor框架最核心的接口是Executor,它表示任务的执行器。
通过上面类图可以看出,Executor的子接口为ExecutorService。再往底层有两大实现类:ThreadPoolExecutor和ScheduledThreadPoolExecutor(集成自ThreadPoolExecutor)。
Future接口表示异步的执行结果,它的实现类为FutureTask。
三个角色之间的处理逻辑图如下:
FutureTask逻辑
线程池处理流程
线程池处理流程
一个线程从被提交(submit)到执行共经历以下流程:
线程池在执行execute方法时,主要有以下四种情况:
线程池执行excute方法
线程池采取上述的流程进行设计是为了减少获取全局锁的次数。在线程池完成预热(当前运行的线程数大于或等于corePoolSize)之后,几乎所有的excute方法调用都执行步骤二。
线程的状态流转
顺便再回顾一下线程的状态的转换,在JDK中Thread类中提供了一个枚举类,例举了线程的各个状态:
- public enum State {
- NEW,
- RUNNABLE,
- BLOCKED,
- WAITING,
- TIMED_WAITING,
- TERMINATED;
- }
一共定义了6个枚举值,其实代表的是5种类型的线程状态:
线程关系转换图:
线程状态转换
当new Thread()说明这个线程处于NEW(新建状态);调用Thread.start()方法表示这个线程处于RUNNABLE(运行状态);但是RUNNABLE状态中又包含了两种状态:READY(就绪状态)和RUNNING(运行中)。调用start()方法,线程不一定获得了CPU时间片,这时就处于READY,等待CPU时间片,当获得了CPU时间片,就处于RUNNING状态。
在运行中调用synchronized同步的代码块,没有获取到锁,这时会处于BLOCKED(阻塞状态),当重新获取到锁时,又会变为RUNNING状态。在代码执行的过程中可能会碰到Object.wait()等一些等待方法,线程的状态又会转变为WAITING(等待状态),等待被唤醒,当调用了Object.notifyAll()唤醒了之后线程执行完就会变为TERMINATED(终止状态)。
线程池的状态
线程池中状态通过2个二进制位(bit)来表示线程池的5个状态:RUNNING、SHUTDOWN、STOP、TIDYING和TERMINATED:
ThreadPoolExecutor API
ThreadPoolExecutor创建线程池API:
- public ThreadPoolExecutor(int corePoolSize,
- int maximumPoolSize,
- long keepAliveTime,
- TimeUnit unit,
- BlockingQueue<Runnable> workQueue,
- ThreadFactory threadFactory,
- RejectedExecutionHandler handler)
参数解释:
适当的阻塞队列
当创建的线程数等于corePoolSize,会将任务加入阻塞队列(BlockingQueue),维护着等待执行的Runnable对象。
阻塞队列通常有如下类型:
明确的拒绝策略
当任务处理不过来时,线程池开始执行拒绝策略。
支持的拒绝策略:
线程池关闭
Executors
Executors是一个帮助类,提供了创建几种预配置线程池实例的方法:newSingleThreadExecutor、newFixedThreadPool、newCachedThreadPool等。
如果查看源码就会发现,Executors本质上就是实现了几类默认的ThreadPoolExecutor。而阿里巴巴开发手册,不建议采用Executors默认的,让使用者直接通过ThreadPoolExecutor来创建。
Executors.newSingleThreadExecutor()
创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。
- new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())
该类型线程池的结构图:
newSingleThreadExecutor
该线程池的特点:
Executors.newFixedThreadPool()
创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。
- new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
该类型线程池的结构图:
newFixedThreadPool
该线程池的特点:
Executors.newCachedThreadPool()
创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。
- new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
该类型线程池的结构图:
newCachedThreadPool
Executors.newScheduledThreadPool()
创建一个定长线程池,支持定时及周期性任务执行。
- new ThreadPoolExecutor(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
- new DelayedWorkQueue());
该线程池类图:
newScheduledThreadPool
该线程池的特点:
Executors.newWorkStealingPool()
JDK8引入,创建持有足够线程的线程池支持给定的并行度,并通过使用多个队列减少竞争。
- public static ExecutorService newWorkStealingPool() {
- return new ForkJoinPool(Runtime.getRuntime().availableProcessors(),
- ForkJoinPool.defaultForkJoinWorkerThreadFactory,
- null, true);
- }
Executors方法的弊端
1)newFixedThreadPool 和 newSingleThreadExecutor:允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。2)newCachedThreadPool 和 newScheduledThreadPool:允许的创建线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。
合理配置线程池大小
合理配置线程池,需要先分析任务特性,可以从以下角度来进行分析:
另外,还需要查看系统的内核数:
- Runtime.getRuntime().availableProcessors());
根据任务所需要的CPU和IO资源可以分为:
任务实现类:
- /**
- * 任务实现线程
- * @author sec
- * @version 1.0
- * @date 2021/10/30
- **/
- public class MyThread implements Runnable{
- private final Integer number;
- public MyThread(int number){
- this.number = number;
- }
- public Integer getNumber() {
- return number;
- }
- @Override
- public void run() {
- try {
- // 业务处理
- TimeUnit.SECONDS.sleep(1);
- System.out.println("Hello! ThreadPoolExecutor - " + getNumber());
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
自定义阻塞提交的ThreadLocalExcutor:
- /**
- * 自定义阻塞提交的ThreadPoolExecutor
- * @author sec
- * @version 1.0
- * @date 2021/10/30
- **/
- public class CustomBlockThreadPoolExecutor {
- private ThreadPoolExecutor pool = null;
- /**
- * 线程池初始化方法
- */
- public void init() {
- // 核心线程池大小
- int poolSize = 2;
- // 最大线程池大小
- int maxPoolSize = 4;
- // 线程池中超过corePoolSize数目的空闲线程最大存活时间:30+单位TimeUnit
- long keepAliveTime = 30L;
- // ArrayBlockingQueue<Runnable> 阻塞队列容量30
- int arrayBlockingQueueSize = 30;
- pool = new ThreadPoolExecutor(poolSize, maxPoolSize, keepAliveTime,
- TimeUnit.SECONDS, new ArrayBlockingQueue<>(arrayBlockingQueueSize), new CustomThreadFactory(),
- new CustomRejectedExecutionHandler());
- }
- /**
- * 关闭线程池方法
- */
- public void destroy() {
- if (pool != null) {
- pool.shutdownNow();
- }
- }
- public ExecutorService getCustomThreadPoolExecutor() {
- return this.pool;
- }
- /**
- * 自定义线程工厂类,
- * 生成的线程名词前缀、是否为守护线程以及优先级等
- */
- private static class CustomThreadFactory implements ThreadFactory {
- private final AtomicInteger count = new AtomicInteger(0);
- @Override
- public Thread newThread(Runnable r) {
- Thread t = new Thread(r);
- String threadName = CustomBlockThreadPoolExecutor.class.getSimpleName() + count.addAndGet(1);
- t.setName(threadName);
- return t;
- }
- }
- /**
- * 自定义拒绝策略对象
- */
- private static class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
- @Override
- public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
- // 核心改造点,将blockingqueue的offer改成put阻塞提交
- try {
- executor.getQueue().put(r);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- /**
- * 当提交任务被拒绝时,进入拒绝机制,实现拒绝方法,把任务重新用阻塞提交方法put提交,实现阻塞提交任务功能,防止队列过大,OOM
- */
- public static void main(String[] args) {
- CustomBlockThreadPoolExecutor executor = new CustomBlockThreadPoolExecutor();
- // 初始化
- executor.init();
- ExecutorService pool = executor.getCustomThreadPoolExecutor();
- for (int i = 1; i < 51; i++) {
- MyThread myThread = new MyThread(i);
- System.out.println("提交第" + i + "个任务");
- pool.execute(myThread);
- }
- pool.shutdown();
- try {
- // 阻塞,超时时间到或者线程被中断
- if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
- // 立即关闭
- executor.destroy();
- }
- } catch (InterruptedException e) {
- executor.destroy();
- }
- }
- }
看似简单的线程池创建,其中却蕴含着各类知识,融合贯通,根据具体场景采用具体的参数进行设置才能够达到最优的效果。
总结一下就是:
参考文章:
[1]https://www.jianshu.com/p/94852bd1a283
[2]https://blog.csdn.net/jek123456/article/details/90601351
[3]https://blog.csdn.net/z_s_z2016/article/details/81674893
[4]https://zhuanlan.zhihu.com/p/33264000
[5]https://www.cnblogs.com/semi-sub/p/13021786.html