项目中大家基本都使用过线程池,节省线程创建和销毁的成本。Java JDK提供了几种线程池,那么如何选择合适的线程池?这就需要对每一种线程池有较详细的了解,然后根据实际业务类型,选择对应的线程池。 本文主要介绍FixedThreadPool。
* Creates a thread pool that reuses a fixed number of threads
* operating off a shared unbounded queue. At any point, at most
* {@code nThreads} threads will be active processing tasks.
* If additional tasks are submitted when all threads are active,
* they will wait in the queue until a thread is available.
* If any thread terminates due to a failure during execution
* prior to shutdown, a new one will take its place if needed to
* execute subsequent tasks. The threads in the pool will exist
* until it is explicitly {@link ExecutorService#shutdown shutdown}.
* @param nThreads the number of threads in the pool
* @return the newly created thread pool
* @throws IllegalArgumentException if {@code nThreads <= 0}
public
static
ExecutorService
newFixedThreadPool
(
int
nThreads
)
{
return
new
ThreadPoolExecutor
(
nThreads
,
nThreads
,
0
L
,
TimeUnit
.
MILLISECONDS
,
new
LinkedBlockingQueue
<
Runnable
>
(
)
)
;
public
ThreadPoolExecutor
(
int
corePoolSize
,
int
maximumPoolSize
,
long
keepAliveTime
,
TimeUnit unit
,
BlockingQueue
<
Runnable
>
workQueue
)
{
this
(
corePoolSize
,
maximumPoolSize
,
keepAliveTime
,
unit
,
workQueue
,
Executors
.
defaultThreadFactory
(
)
,
defaultHandler
)
;
简要说明,FixedThreadPool,也就是可重用固定线程数的线程池。 它corePoolSize和 maximumPoolSize是一样的。并且他的keepAliveTime=0, 也就是当线程池中的线程数大于corePoolSize, 多余的空闲线程会被***立即***终止。
它的基本执行过程如下
1, 如果当前运行的线程数少于corePoolSize, 会立刻创建新线程执行任务。
2,当线程数到达corePoolSize后,将任务加入到LinkedBlockingQueue中。
3,当线程执行完任务后,会循环从LinkedBlockingQueue中获取任务来执行。
FixedThreadPool使用了LinkedBlockingQueue, 也就是无界队列(队列最大可容纳Integer.MAX_VALUE), 因此会造成以下影响:
a, 线程池线程数到达corePoolSize后,任务会被存放在LinkedBlockingQueue中
b, 因为无界队列,运行中(未调用shutdown()或者shutdownNow()方法)的不会拒绝任务(队列无界,可以放入"无限"任务)
提交到线程池的任务
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit;
@Slf4j
@Getter
class MyTask implements Runnable {
private String name;
public MyTask(String name) {
this.name = name;
@Override
public void run() {
try {
long threadId = Thread.currentThread().getId();
Long duration = (long) (Math.random() * 10);
log.info("Doing a task during : {}, threadId={}" , name , threadId);
TimeUnit.SECONDS.sleep(duration);
log.info("done a task during : {}, threadId={}" , name , threadId);
} catch (InterruptedException e) {
e.printStackTrace();
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@Slf4j
public class FixedThreadPoolExecutorDemo2 {
public static void main(String[] args) {
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(4);
for (int i = 0; i < 10; i++) {
MyTask task = new MyTask("Task " + i);
if (i == 2) {
try {
TimeUnit.SECONDS.sleep(70);
} catch (InterruptedException e) {
e.printStackTrace();
int activeCount = executor.getActiveCount();
long taskCount = executor.getTaskCount();
BlockingQueue queue = executor.getQueue();
log.info("A new task has been added {}, activeCount={}, taskCount={}, queue={}",
task.getName(), activeCount, taskCount, queue.size());
executor.execute(task);
System.out.println("Maximum threads inside pool " + executor.getMaximumPoolSize());
executor.shutdown();
重点看日志,从日志我们可以看出它的运行过程与前面的分析一致。
11:47:28.963 [main] INFO com.yq.threadpool.FixedThreadPoolExecutorDemo2 - A new task has been added Task 0, activeCount=0, taskCount=0, queue=0
11:47:28.972 [main] INFO com.yq.threadpool.FixedThreadPoolExecutorDemo2 - A new task has been added Task 1, activeCount=1, taskCount=1, queue=0
11:47:28.973 [pool-1-thread-1] INFO com.yq.threadpool.MyTask - Doing a task during : Task 0, threadId=9
11:47:28.976 [pool-1-thread-2] INFO com.yq.threadpool.MyTask - Doing a task during : Task 1, threadId=10
11:47:31.976 [pool-1-thread-1] INFO com.yq.threadpool.MyTask - done a task during : Task 0, threadId=9
11:47:36.976 [pool-1-thread-2] INFO com.yq.threadpool.MyTask - done a task during : Task 1, threadId=10
11:48:38.972 [main] INFO com.yq.threadpool.FixedThreadPoolExecutorDemo2 - A new task has been added Task 2, activeCount=0, taskCount=2, queue=0
11:48:38.973 [main] INFO com.yq.threadpool.FixedThreadPoolExecutorDemo2 - A new task has been added Task 3, activeCount=1, taskCount=3, queue=0
11:48:38.974 [pool-1-thread-3] INFO com.yq.threadpool.MyTask - Doing a task during : Task 2, threadId=11 (task2 和task3提交时,虽然thread 9和10已经完成了task0和task1,但是线程池为task2生成了thread 11,这是因为线程池设定的线程数是4,现在还没有到达4个线程,所以新提交的任务,就为该任务创建一个线程)
11:48:38.974 [main] INFO com.yq.threadpool.FixedThreadPoolExecutorDemo2 - A new task has been added Task 4, activeCount=2, taskCount=4, queue=0
11:48:38.974 [pool-1-thread-4] INFO com.yq.threadpool.MyTask - Doing a task during : Task 3, threadId=12 (task2 和task3提交时,虽然thread 9和10已经完成了task0和task1,但是线程池位task3生成了thread 12,这是因为线程池设定的线程数是4,现在还没有到达4个线程(现在有3个),所以新提交的任务,就为该任务创建一个线程)
11:48:38.974 [main] INFO com.yq.threadpool.FixedThreadPoolExecutorDemo2 - A new task has been added Task 5, activeCount=2, taskCount=5, queue=1
11:48:38.974 [pool-1-thread-1] INFO com.yq.threadpool.MyTask - Doing a task during : Task 4, threadId=9 (线程总数应达到4个了,新的任务需要等待,然后循环使用以前的线程,这里使用thread 9)
11:48:38.975 [pool-1-thread-1] INFO com.yq.threadpool.MyTask - done a task during : Task 4, threadId=9
11:48:38.975 [main] INFO com.yq.threadpool.FixedThreadPoolExecutorDemo2 - A new task has been added Task 6, activeCount=3, taskCount=6, queue=1
11:48:38.975 [main] INFO com.yq.threadpool.FixedThreadPoolExecutorDemo2 - A new task has been added Task 7, activeCount=3, taskCount=7, queue=1
11:48:38.975 [main] INFO com.yq.threadpool.FixedThreadPoolExecutorDemo2 - A new task has been added Task 8, activeCount=3, taskCount=8, queue=2
11:48:38.975 [main] INFO com.yq.threadpool.FixedThreadPoolExecutorDemo2 - A new task has been added Task 9, activeCount=3, taskCount=9, queue=3
Maximum threads inside pool 4
11:48:38.975 [pool-1-thread-2] INFO com.yq.threadpool.MyTask - Doing a task during : Task 5, threadId=10
11:48:38.976 [pool-1-thread-1] INFO com.yq.threadpool.MyTask - Doing a task during : Task 6, threadId=9
11:48:41.974 [pool-1-thread-3] INFO com.yq.threadpool.MyTask - done a task during : Task 2, threadId=11
11:48:41.974 [pool-1-thread-3] INFO com.yq.threadpool.MyTask - Doing a task during : Task 7, threadId=11
11:48:41.974 [pool-1-thread-3] INFO com.yq.threadpool.MyTask - done a task during : Task 7, threadId=11
11:48:41.974 [pool-1-thread-3] INFO com.yq.threadpool.MyTask - Doing a task during : Task 8, threadId=11
11:48:43.974 [pool-1-thread-3] INFO com.yq.threadpool.MyTask - done a task during : Task 8, threadId=11
11:48:43.974 [pool-1-thread-3] INFO com.yq.threadpool.MyTask - Doing a task during : Task 9, threadId=11
11:48:43.976 [pool-1-thread-2] INFO com.yq.threadpool.MyTask - done a task during : Task 5, threadId=10
11:48:45.974 [pool-1-thread-3] INFO com.yq.threadpool.MyTask - done a task during : Task 9, threadId=11
11:48:45.976 [pool-1-thread-1] INFO com.yq.threadpool.MyTask - done a task during : Task 6, threadId=9
11:48:46.974 [pool-1-thread-4] INFO com.yq.threadpool.MyTask - done a task during : Task 3, threadId=12
Process finished with exit code 0
项目中大家基本都使用过线程池,节省线程创建和销毁的成本。Java JDK提供了几种线程池,那么如何选择合适的线程池?这就需要对每一种线程池有较详细的了解,然后根据实际业务类型,选择对应的线程池。 本文主要介绍FixedThreadPool。定义 /** * Creates a thread pool that reuses a fixed number of threads ...
1、 ThreadPoolExecutor 数据成员
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
ctl 主要用于存储线程池的工作状态以及池中正在运行的线程数。显然要在一个整型变量存储两个数据,只能将其一分为二。其中高3bit用于存储线程池的状态,低位的29bit用于存储正在运行的线程数。
线程...
· 使用自定义ThreadPoolExecutor
· 使用Executors.newCachedThreadPool()
· 使用Executors.newFixedThreadPool(int)
· 使用Executors.newSingleThreadExecutor()
其中使用2,3,4来创建线程池时,其内部也是通过ThreadPoolExecutor来生成线程池的。我们来分析下ThreadPoolExecutor的构造参数以及内部实现。
ThreadPoolExecutor完整的构造方法如下(其
线程池的工作主要是控制运行的线程的数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果线程数量超过了最大数量,那么超出数量的线程排队等候,等其他线程执行完毕再从队列中取出任务来执行。
在开发过程中,合理地使用线程池能够带来3个好处:
降低资源消耗。 通过重复利用已创建的线程降低线程创建和销毁造成的消耗;
提高响应速度。 当任务到达时,任务可以不需要等到线程创建就能立即执行;
提高线程的可管理性。 线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。
在《阿里巴巴java开发手册》中指出了线程资源必须通过线程池提供,不允许在应用中自行显示的创建线程,这样一方面是线程的创建更加规范,可以合理控制开辟线程的数量;另一方面线程的细节管理交给线程池处理,优化了资源的开销。而线程池不允许使用Executors去创建,而要通过ThreadPoolExecutor方式,这一方面是由于jdk中Executor框架虽然提供了如newFixedThreadPool()、newSingleThreadExecutor()、newCachedThreadPool()等创建线程池的方法,但都有其局限性,不够灵活;另外由于前面几种方法内部也是通过ThreadPoolExecutor方式实现,使用ThreadPoolExecutor有助于大家明确线程池的运行规则,创建符合自己的业务场景需要的线程池,避免资源耗尽的风险。
创建线程池。
对要循环的处理的数据进行 foreach 操作。
在上一步的循环处理操作中为 Future 的 List 添加元素,返回值从线程池对象的 submit 方法中通过 Callable 的方式获取或使用 lambda 表达式操作。
在 Callable 或 lambda 表达式 中编写要处理的业务逻辑并将返回值返回。
在 foreach 循环外停止线程池。
while 循环,如果线程没有处理结束
(1)每次new Thread新建对象,性能差
(2)线程缺乏统一管理,可能无限制的新建线程,相互竞争,有可能占用过多系统资源导致死机或OOM
(3)缺少更多的功能,如更多执行、定期执行、线程中断
2.线程池的好处
(1)重用存在的线程,减少对象创建、消亡的开销,性能佳
(2)可以有效控制最大并发线程数,提高系统资源利用率,同时可以避免过多资源竞争,避免阻塞
各个参数的含义
corePoolSize:线程池初始化时,核心线程数大小。
maximumPoolSize:线程池最大线程数。当核心线程用完了,队列里也装满了,会创建新的线程来执行任务,但不能超过最大线程数。
keepAliveTime:线...
在HotSpot VM的线程模型中,Java线程(java.lang.Thread)被一对一映射为本地操作系统线
程。Java线程启动时会创建一个本地操作系统线程;当该Java线程终止时,这个操作系统线程
也会被回收。操作系统会调度所有线程并将它们分配给可用的CPU。
10.1.2 Execut...
ThreadPoolExecutor(线程池执行器)是Java中一个多线程执行器,它允许程序员在程序中实现线程池,从而减少了线程的启动和销毁所花费的时间和资源,从而使程序更具有可扩展性和可靠性。
通过ThreadPoolExecutor,我们可以定义一个线程池的核心线程数、最大线程数、线程生命周期、任务队列、拒绝策略等参数,从而使线程池的行为更加符合我们的需求。
在ThreadPoolExecutor中,一个线程池包含三个重要的元素:任务队列、线程和线程池管理器。其中,任务队列用于存放等待执行的任务;线程是用于执行任务的执行线程,它们会从任务队列中取出任务并执行;线程池管理器用于管理线程,包括线程池的创建、销毁、线程的数量控制等。
线程池执行器的设计思想是:避免频繁地创建和销毁线程,因为这样会占用较多的系统资源。而使用线程池可以重复利用已经创建好的线程,从而降低线程创建和销毁的开销,提高效率。
总之,ThreadPoolExecutor是Java中一个非常强大的多线程执行器,它为我们提供了一种优化程序性能的有效方式,使得我们的程序可以更加高效、稳定地运行。