private boolean addWorker(Runnable firstTask, boolean core) {
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive())
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
} finally {
mainLock.unlock();
if (workerAdded) {
t.start();
workerStarted = true;
} finally {
if (! workerStarted)
addWorkerFailed(w);
return workerStarted;
Worker
类是什么?
Worker
类为ThreadPoolExecutor
类的一个内部类,对线程池中的线程进行了包装,并提供了调度等待队列中任务的能力。
Worker
类的成员变量
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
private static final long serialVersionUID = 6138294804551838833L;
final Thread thread;
Runnable firstTask;
volatile long completedTasks;
Worker
的构造方法
Worker(Runnable firstTask) {
setState(-1);
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
可以看到 Worker
集成了AQS, 同时实现了Runnable接口,在构造方法中已经完成了Thread线程的创建,那么其run()
方法干了什么呢?
Worker的run()
方法
public void run() {
runWorker(this);
可以看到其run方法中调用了runWorker()
方法
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock();
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} finally {
task = null;
w.completedTasks++;
w.unlock();
getTask()
方法
private Runnable getTask() {
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
至此,线程池在何时通过何种方式调起任务队列中等待的任务就已经大致清晰了,其流程大致如下:
- 当调用
ThreadPoolExecutor
的execute()
方法时,会调用addWorker()
方法; addWorker()
中会创建Worker类的实例,该实例对线程池中的线程进行了包装,其实现了Runnable接口,Worker实例创建成功后会调用线程的start()
方法;- 在
Worker
的run方法中通过runTask()
方法执行任务,并循环通过getTask()
方法获取队列中等待的任务去执行。
任务调度
任务调度是线程池的主要入口(execute(Runnable r)),接下来任务如何执行都是有这个阶段决定的;
所有的任务调度都是有execute方法完成,检查现在线程池的运行状态,运行的线程数,运行策略,决定接下来执行的流程,是直接申请线程执行,还是缓冲到队列中执行,亦或是直接拒绝该任务;
执行过程如下:
1.首先检查线程池的运行状态,如果不是RUNNING,则直接拒绝,线程池要保证在RUNNING状态下执行任务
2.如果workCount < corePoolSiz
要执行的任务数 >核心线程数,并且 < 最大连接数 并且 等待队列已满,则创建新线程;
要执行的任务数 >核心线程数,并且>最大连接数 并且 等待队列已满,则调用拒绝策略来处理该任务(抛异常)
线程池线程池线程池可以看做是。它的工作主要是控制运行的线程的数量,处理过程中将任务放入队列,然后在线程创建后 启动这些任务,如果线程数量超过了最大数量超出数量的线程排队等候,等其它线程执行完毕, 再从队列中取出任务来执行。他的主要特点为:线程复用;控制最大并发数;管理线程。每一个 Thread 的类都有一个 start 方法。当调用 start 启动线程时 Java 虚拟机会调用该类的 run 方法。那么该类的 run() 方法中就是调用了 Runnable 对象的 run() 方法。
先说使用:
ThreadPoolExecutor类参数,核心线程数,最大线程数(连核心数算在里面),非核心线程销毁之前等待任务的最长时间,前面时间的单位,等待队列大小。
提交优先级:核心线程>等待队列>非核心线程
执行优先级:核心线程>非核心线程>等待队列
如果核心线程10个,最大线程数20,等待队列10,在执行30个线程的时候。
会先执行任务1-10,然后执行任务21-30,最后执行任务11-20.
首先任务1-10会直接给核心线程执行,核心线程满了之后,任务会被放到等待队列;等待
首先检测线程池运行状态,如果不是running,则直接拒绝。
如果workCount < corePoolSize,则创建并启动一个线程线程来执行提交任务。
如果workCoun
要了解线程池的执行流程,我们首先就要知道什么是线程池?线程池就是里面含有若干线程的容器,没有任务时,线程池里面的这些线程都处于等待空闲状态。如果有新的线程任务,就分配一个空闲线程执行任务。如果所有的线程都处于忙碌状态,线程池就会创建一个新的线程,亦或者是将任务放到工作队列中等待。
大部分并发应用程序都是围绕“任务执行”来构造的:任务通常是一些抽象的且离散的工作单元。
一、在线程中执行任务
当围绕“任务执行”来设计应用程序结构时,第一步就是要找出清晰的任务边界。在理想情况下,各个任务之间是相互独立的,独立性有助于实现并发。
在正常的负载下,服务器应用程序应该同时表现出良好的吞吐量和快速的响应性。应用程序提供商希望程序尽可能支持多的用户,从而降低每个用户的服务成本,而...