相关文章推荐
强健的豌豆  ·  C# 类(Class) | 菜鸟教程·  2 年前    · 
帅气的菠菜  ·  java swing ...·  2 年前    · 
不爱学习的野马  ·  SQL ...·  2 年前    · 

1. Work-Stealing算法

Work-Stealing算法的理念在于让空闲的线程从忙碌的线程的双端队列中偷取任务 .

默认情况下, 一个工作线程从它自己内部的双端队列的头部获取任务. 当线程的的队列中没有任务,它从另外的繁忙的线程的双端队列(或者全局的双端队列)的尾部获取任务,因为队列的尾部是最有可能存在还未执行的任务.

这种方式减小了线程之间对任务的竞争的可能性,它也使得线程以最大可能性去获取可执行的线程,因为它们总是在最有可能存在还未执行的任务的地方寻找任务.

2. 线程池的组成

一般来说实现一个线程池主要包括以下几个组成部分:

  • 线程管理器 :用于创建并管理线程池 。

  • 工作线程 :线程池中实际执行任务的线程 。 在初始化线程时会预先创建好固定数目的线程在池中 ,这些初始化的线程一般是处于空闲状态 ,不消耗CPU,占用较小的内存空间 。

  • 任务接口 :每个任务必须实现的接口 ,当线程池中的可执行的任务时 ,被工作线程调试执行。 把任务抽象出来形成任务接口 ,可以做到线程池与具体的任务无关 。

  • 任务队列 :用来存放没有处理的任务,提 供一种缓冲机制。 实现这种结构有好几种方法 ,常用的是队列 ,主要是利用它先进先出的工作原理;另外一种是链表之类的数据结构 ,可以动态为它分配内存空间 ,应用中比较灵活

3. 步骤说明

创建和启动一个Task类,但是不同的是线程池中的每一个线程都有一个本地队列。线程池通过一个任务调度器来分配任务,当主程序创建了一个Task后,由于创建这个Task的线程不是线程池中的线程,则任务调度器会把该Task放入全局队列中。
step1
下面的演示图,Task1和Task2都是主程序创建的,因此都是放在全局队列中,当工作者线程处理Task2时,创建了一个Task3,此时Task3被放入本地队列

为什么要设计本地队列?这样做的优势是充分利用并行。随着越来越多线程竞争工作项,所有的线程访问单一的队列并不是最优的,并且也不安全。所以,将任务放入本地队列,并且由同一个线程处理,这就避免了竞争。

本地队列中的Task,线程会按照LIFO的方式去处理。这是因为在大多数场景下,最后创建的Task可能仍然在cache中,处理它能够提供缓存命中率。显然这意味放弃部分公平性而保证性能。如下面的演示图,

工作者线程1创建了Task2,Task2创建了Task3,Task4,Task5,但最先处理的还是Task5。

线程窃取work stealing

当 A线程开始执行的时候,优先总是处理本地队列中的任务,当它发现本地队列已经空了,那么它会去全局队列中获取Task,当全局队列中也是空的,那么就会发 生工作窃取(work stealing)。任务调度器会把该线程池中额外的任务分配给A线程处理,其效果就好比该线程会才从其他线程的队列中“窃取”一个Task来执行。这样的目的是提高了cpu的使用效率

4. 源码

* StealThreadPool.h * Created on: 2019-9-16 * Author: fasiondog # pragma once # ifndef HIKYUU_UTILITIES_THREAD_STEALTHREADPOOL_H # define HIKYUU_UTILITIES_THREAD_STEALTHREADPOOL_H //#include <fmt/format.h> # include <future> # include <thread> # include <chrono> # include <vector> # include "ThreadSafeQueue.h" # include "WorkStealQueue.h" namespace hku { * @brief 分布偷取式线程池 * @note 主要用于存在递归情况,任务又创建任务加入线程池的情况,否则建议使用普通的线程池 * @details * @ingroup ThreadPool class StealThreadPool { public : * 默认构造函数,创建和当前系统CPU数一致的线程数 StealThreadPool ( ) : StealThreadPool ( std :: thread :: hardware_concurrency ( ) ) { } * 构造函数,创建指定数量的线程 * @param n 指定的线程数 explicit StealThreadPool ( size_t n ) : m_done ( false ) , m_init_finished ( false ) , m_worker_num ( n ) { try { for ( size_t i = 0 ; i < m_worker_num ; i ++ ) { // 创建工作线程及其任务队列 m_queues . push_back ( std :: unique_ptr < WorkStealQueue > ( new WorkStealQueue ) ) ; m_threads . push_back ( std :: thread ( & StealThreadPool :: worker_thread , this , i ) ) ; } catch ( . . . ) { m_done = true ; throw ; m_init_finished = true ; * 析构函数,等待并阻塞至线程池内所有任务完成 ~ StealThreadPool ( ) { if ( ! m_done ) { join ( ) ; /** 获取工作线程数 */ size_t worker_num ( ) const { return m_worker_num ; /** 先线程池提交任务后返回的对应 future 的类型 */ template < typename ResultType > using task_handle = std :: future < ResultType > ; /** 向线程池提交任务 */ template < typename FunctionType > task_handle < typename std :: result_of < FunctionType ( ) > :: type > submit ( FunctionType f ) { typedef typename std :: result_of < FunctionType ( ) > :: type result_type ; std :: packaged_task < result_type ( ) > task ( f ) ; task_handle < result_type > res ( task . get_future ( ) ) ; if ( m_local_work_queue ) { // 本地线程任务从前部入队列(递归成栈) // 因为在大多数场景下,最后创建的Task可能仍然在cache中,处理它能够提供缓存命中率 // 显然这意味放弃部分公平性而保证性能 m_local_work_queue - > push_front ( std :: move ( task ) ) ; } else { m_master_work_queue . push ( std :: move ( task ) ) ; m_cv . notify_one ( ) ; return res ; /** 返回线程池结束状态 */ bool done ( ) const { return m_done ; * 等待各线程完成当前执行的任务后立即结束退出 void stop ( ) { m_done = true ; // 同时加入结束任务指示,以便在dll退出时也能够终止 for ( size_t i = 0 ; i < m_worker_num ; i ++ ) { m_queues [ i ] - > push_front ( std :: move ( FuncWrapper ( ) ) ) ; m_cv . notify_all ( ) ; // 唤醒所有工作线程 for ( size_t i = 0 ; i < m_worker_num ; i ++ ) { if ( m_threads [ i ] . joinable ( ) ) { m_threads [ i ] . join ( ) ; * 等待并阻塞至线程池内所有任务完成 * @note 至此线程池能工作线程结束不可再使用 void join ( ) { // 指示各工作线程在未获取到工作任务时,停止运行 for ( size_t i = 0 ; i < m_worker_num ; i ++ ) { m_master_work_queue . push ( std :: move ( FuncWrapper ( ) ) ) ; // 唤醒所有工作线程 m_cv . notify_all ( ) ; // 等待线程结束 for ( size_t i = 0 ; i < m_worker_num ; i ++ ) { if ( m_threads [ i ] . joinable ( ) ) { m_threads [ i ] . join ( ) ; m_done = true ; private : typedef FuncWrapper task_type ; std :: atomic_bool m_done ; // 线程池全局需终止指示 bool m_init_finished ; // 线程池是否初始化完毕 size_t m_worker_num ; // 工作线程数量 std :: condition_variable m_cv ; // 信号量,无任务时阻塞线程并等待 std :: mutex m_cv_mutex ; // 配合信号量的互斥量 ThreadSafeQueue < task_type > m_master_work_queue ; // 主线程任务队列 std :: vector < std :: unique_ptr < WorkStealQueue > > m_queues ; // 任务队列(每个工作线程一个) std :: vector < std :: thread > m_threads ; // 工作线程 // 线程本地变量 inline static thread_local WorkStealQueue * m_local_work_queue = nullptr ; // 本地任务队列 inline static thread_local size_t m_index = 0 ; //在线程池中的序号 inline static thread_local bool m_thread_need_stop = false ; // 线程停止运行指示 void worker_thread ( size_t index ) { m_thread_need_stop = false ; m_index = index ; m_local_work_queue = m_queues [ m_index ] . get ( ) ; while ( ! m_thread_need_stop && ! m_done ) { run_pending_task ( ) ; void run_pending_task ( ) { // 从本地队列提前工作任务,如本地无任务则从主队列中提取任务 // 如果主队列中提取的任务是空任务,则认为需结束本线程,否则从其他工作队列中偷取任务 task_type task ; if ( pop_task_from_local_queue ( task ) ) { task ( ) ; std :: this_thread :: yield ( ) ; } else if ( pop_task_from_master_queue ( task ) ) { if ( ! task . isNullTask ( ) ) { task ( ) ; std :: this_thread :: yield ( ) ; } else { m_thread_need_stop = true ; } else if ( pop_task_from_other_thread_queue ( task ) ) { task ( ) ; std :: this_thread :: yield ( ) ; } else { // std::this_thread::yield(); std :: unique_lock < std :: mutex > lk ( m_cv_mutex ) ; m_cv . wait ( lk , [ = ] { return this - > m_done || ! this - > m_master_work_queue . empty ( ) ; } ) ; bool pop_task_from_master_queue ( task_type & task ) { return m_master_work_queue . try_pop ( task ) ; bool pop_task_from_local_queue ( task_type & task ) { return m_local_work_queue && m_local_work_queue - > try_pop ( task ) ; bool pop_task_from_other_thread_queue ( task_type & task ) { // 线程池尚未初始化化完成时,其他任务队列可能尚未创建 // 此时不能从其他队列偷取任务 if ( ! m_init_finished ) { return false ; for ( size_t i = 0 ; i < m_worker_num ; ++ i ) { size_t index = ( m_index + i + 1 ) % m_worker_num ; if ( m_queues [ index ] - > try_steal ( task ) ) { return true ; return false ; } ; // namespace hku } /* namespace hku */ # endif /* HIKYUU_UTILITIES_THREAD_STEALTHREADPOOL_H */

References:

[1]. http://www.danielmoth.com/Blog/New-And-Improved-CLR-4-Thread-Pool-Engine.aspx

[2]. https://www.cnblogs.com/ok-wolf/p/7761755.html

[3]. github中Work Stealing Pool源码实现

import java.io.IOException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; * 线程池 * 1.固定个数
WorkStealingPool 的作用 这是Java8新增的创建 线程池 的方法,如果不主动设置它的并发数,那么这个方法就会以当前机器的CPU处理器个数为线程个数,这个 线程池 会并行处理任务,不能够保证任务执行的顺序。 public class TestMain { //格式化 static SimpleDateFormat sim = new SimpleDateFo...
一、一般来说实现一个 线程池 主要包括以下几个组成部分: 1)线程管理器 :用于创建并管理 线程池 。 2)工作线程 : 线程池 中实际执行任务的线程 。 在初始化线程时会预先创建好固定数目的线程在池中 ,这些初始化的线程一般是处于空闲状态 ,不消耗CPU,占用较小的内存空间 。 3)任务接口 :每个任务必须实现的接口 ,当 线程池 中的可执行的任务时 ,被工作线程调试执行。 把任务抽象出来形成任务接口 ...
import java.io.IOException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.ut...