相关文章推荐
英勇无比的石榴  ·  ROW_NUMBER() OVER ...·  5 月前    · 
孤独的橙子  ·  c# - Convert ...·  1 年前    · 

boost::asio提供了一个跨平台的异步编程IO模型库,io_service类在多线程编程模型中提供了任务队列和任务分发功能。

io_service最常用的接口是:run, post, stop。

本文简要介绍io_service的使用,详细内容可以参阅相关reference。

启动一个线程

使用run()启动。

run()会阻塞,直到:

  • 所有的任务已经完成并且没有任务需要分发处理
  • 或者调用了stop()

简单测试例程如下:

#include <boost/asio/io_service.hpp>
#include <iostream>
int main(int argc, char *argv[])
		// 创建io_service
    boost::asio::io_service io_service;
    // 附加任务
    boost::asio::io_service::work work(io_service);
		// 启动并阻塞
    io_service.run();
    std::cout << "Yes, run() is returned!" << std::endl;
    return 0;

注意,程序打印文本,也就是说io_service阻塞在了run()。我们使用了work类,它会使得run()一直在任务待执行而不会退出,如果注释掉这一行,可以看到run会立即执行完毕并打印文本。

使用work时会阻塞run,可以使用work类型的指针来结束所有任务。

boost::asio::io_service io_service;
boost::shared_ptr< boost::asio::io_service::work > work(new boost::asio::io_service::work( io_service ));
// 结束任务
work.reset();
// 立即返回
io_service.run();
启动线程池

在多个线程中调用run()即可开启线程池,io_service负责执行任务处理。所有在线程池中等待的线程是平等的,io_service会随机选择一个线程去执行任务。

使用thread库

这里使用了boost::thread库,例子如下:

#include <boost/asio/io_service.hpp>
#include <boost/thread/thread.hpp>
#include <iostream>
boost::asio::io_service io_service;
void WorkerThread()
	std::cout << "Thread Start\n";
	io_service.run();
	std::cout << "Thread Finish\n";
int main( int argc, char * argv[] )
	boost::shared_ptr< boost::asio::io_service::work > work(new boost::asio::io_service::work( io_service ));
	std::cout << "Press [return] to exit." << std::endl;
	boost::thread_group worker_threads;
	for( int x = 0; x < 4; ++x )
		worker_threads.create_thread( WorkerThread );
	std::cin.get();
	io_service.stop();
	worker_threads.join_all();
	return 0;

stop()会告知io_service,所有的任务需要终止。它的调用可能会使已经进入队列的任务得不到执行。

如果想让它们都执行,最好先销毁work对象。如果想让系统马上停止,则调用stop()。

使用bind

使用boost::bind可以把函数调用包装成为对象。

调用常规函数的示例:

#include <boost/bind.hpp>
#include <iostream>
void F2( int i, float f )
	std::cout << "i: " << i << std::endl;
	std::cout << "f: " << f << std::endl;
int main( int argc, char * argv[] )
	boost::bind( &F2, 42, 3.14f )();// 最后的括号表明调用该函数,如果去掉括号表示仅封装对象
	return 0;

调用类成员函数示例:

class MyClass
public:
	void F3( int i, float f )
		std::cout << "i: " << i << std::endl;
		std::cout << "f: " << f << std::endl;
int main( int argc, char * argv[] )
	MyClass c;
	boost::bind( &MyClass::F3, &c, 42, 3.14f )();
	return 0;

回到io_service,使用bind可以改写程序:

#include <boost/asio/io_service.hpp>
#include <boost/thread/thread.hpp>
#include <boost/bind.hpp>
#include <iostream>
boost::mutex global_stream_lock;
void WorkerThread( boost::shared_ptr< boost::asio::io_service > io_service )
	global_stream_lock.lock();
	std::cout << "[" << boost::this_thread::get_id() << "] Thread Start" << std::endl;
	global_stream_lock.unlock();
	io_service->run();
	global_stream_lock.lock();
	std::cout << "[" << boost::this_thread::get_id() <<	"] Thread Finish" << std::endl;
	global_stream_lock.unlock();
int main( int argc, char * argv[] )
	boost::shared_ptr< boost::asio::io_service > io_service(new boost::asio::io_service);
	boost::shared_ptr< boost::asio::io_service::work > work(new boost::asio::io_service::work( *io_service ));
	global_stream_lock.lock();
	std::cout << "[" << boost::this_thread::get_id() << "] Press [return] to exit." << std::endl;
	global_stream_lock.unlock();
	boost::thread_group worker_threads;
	for( int x = 0; x < 4; ++x )
		worker_threads.create_thread( boost::bind( &WorkerThread, io_service ) );
	std::cin.get();
	io_service->stop();
	worker_threads.join_all();
	return 0;
post任务

至此,启动了线程,其实并没有给线程做任务。也就是说,启动的线程处于空闲状态。

可以使用post/dispatch分发任务,它们的区别是:

  • 如果可以,dispatch会立即执行任务,否则把任务加入到queue
  • post只会把任务加入到队列

具体使用何种方式需要根据具体情况确定。

如下的程序打印数字:

boost::mutex global_stream_lock;
void WorkerThread( boost::shared_ptr< boost::asio::io_service > io_service )
	global_stream_lock.lock();
	std::cout << "[" << boost::this_thread::get_id() << "] Thread Start" << std::endl;
	global_stream_lock.unlock();
	io_service->run();
	global_stream_lock.lock();
	std::cout << "[" << boost::this_thread::get_id() << "] Thread Finish" << std::endl;
	global_stream_lock.unlock();
void Dispatch( int x )
	global_stream_lock.lock();
	std::cout << "[" <<  boost::this_thread::get_id()  << "] " << __FUNCTION__  << " x = " << x <<  std::endl;
	global_stream_lock.unlock();
void Post( int x )
	global_stream_lock.lock();
	std::cout << "[" <<  boost::this_thread::get_id()  << "] " << __FUNCTION__  << " x = " << x <<  std::endl;
	global_stream_lock.unlock();
void Run3( boost::shared_ptr< boost::asio::io_service > io_service )
	for( int x = 0; x < 3; ++x )
		io_service->dispatch( boost::bind( &Dispatch, x * 2 ) );
		io_service->post( boost::bind( &Post, x * 2 + 1 ) );
		boost::this_thread::sleep( boost::posix_time::milliseconds( 1000 ) );
int main( int argc, char * argv[] )
	boost::shared_ptr< boost::asio::io_service > io_service( new boost::asio::io_service);
	boost::shared_ptr< boost::asio::io_service::work > work(new boost::asio::io_service::work( *io_service ));
	global_stream_lock.lock();
	std::cout << "[" <<  boost::this_thread::get_id() << "] The program will exit when all  work has finished." <<  std::endl;
	global_stream_lock.unlock();
	boost::thread_group worker_threads;
	for( int x = 0; x < 1; ++x )
		worker_threads.create_thread( boost::bind( &WorkerThread, io_service ) );
	io_service->post( boost::bind( &Run3, io_service ) );
	work.reset();
	worker_threads.join_all();
	return 0;

可以看到,所有的任务得到了执行,但执行顺序可能不同。这个示例也可以看出混合使用post和dispatch可能存在的问题。

strand顺序处理

strand提供顺序化的事件执行器。意思是,如果以“work1->work2->work3”的顺序post,不管有多少个工作线程,它们依然会以这样的顺序执行任务。

#include <boost/asio/io_service.hpp>
#include <boost/thread/thread.hpp>
#include <boost/bind.hpp>
#include <boost/asio/strand.hpp>
#include <iostream>
boost::mutex global_stream_lock;
void WorkerThread( boost::shared_ptr< boost::asio::io_service > io_service )
	global_stream_lock.lock();
	std::cout << "[" << boost::this_thread::get_id() << "] Thread Start" << std::endl;
	global_stream_lock.unlock();
	io_service->run();
	global_stream_lock.lock();
	std::cout << "[" << boost::this_thread::get_id() << "] Thread Finish" << std::endl;
	global_stream_lock.unlock();
void PrintNum( int x )
	std::cout << "[" << boost::this_thread::get_id() << "] x: " << x << std::endl;
int main( int argc, char * argv[] )
	boost::shared_ptr< boost::asio::io_service > io_service( new boost::asio::io_service);
	boost::shared_ptr< boost::asio::io_service::work > work(new boost::asio::io_service::work( *io_service ));
	boost::asio::io_service::strand strand( *io_service );
	global_stream_lock.lock();
	std::cout << "[" <<  boost::this_thread::get_id()  << "] The program will exit when all  work has finished." <<  std::endl;
	global_stream_lock.unlock();
	boost::thread_group worker_threads;
	for( int x = 0; x < 2; ++x )
		worker_threads.create_thread( boost::bind( &WorkerThread, io_service ) );
	boost::this_thread::sleep( boost::posix_time::milliseconds( 1000 ) );
	//strand.post( boost::bind( &PrintNum, 1 ) );
	//strand.post( boost::bind( &PrintNum, 2 ) );
	//strand.post( boost::bind( &PrintNum, 3 ) );
	//strand.post( boost::bind( &PrintNum, 4 ) );
	//strand.post( boost::bind( &PrintNum, 5 ) );
	io_service->post( boost::bind( &PrintNum, 1 ) );
	io_service->post( boost::bind( &PrintNum, 2 ) );
	io_service->post( boost::bind( &PrintNum, 3 ) );
	io_service->post( boost::bind( &PrintNum, 4 ) );
	io_service->post( boost::bind( &PrintNum, 5 ) );
	work.reset();
	worker_threads.join_all();
	return 0;

去掉了打印数据时的锁。如果使用post,会得到比较混乱的输出,而如果使用strand,则能得到清晰的输出,这就是strand!

总之,使用io_service能够方便地构建多线程处理程序。它的总体使用流程总结如下:

#include <boost/asio/io_service.hpp>
#include <boost/bind.hpp>
#include <boost/thread/thread.hpp>
 * Create an asio::io_service and a thread_group (through pool in essence)
boost::asio::io_service ioService;
boost::thread_group threadpool;
 * This will start the ioService processing loop. All tasks 
 * assigned with ioService.post() will start executing. 
boost::asio::io_service::work work(ioService);
 * This will add 2 threads to the thread pool. (You could just put it in a for loop)
threadpool.create_thread(
    boost::bind(&boost::asio::io_service::run, &ioService)
threadpool.create_thread(
    boost::bind(&boost::asio::io_service::run, &ioService)
 * This will assign tasks to the thread pool. 
 * More about boost::bind: "http://www.boost.org/doc/libs/1_54_0/libs/bind/bind.html#with_functions"
 * You can use strand when necessary, if so, remember add "strand.h"
ioService.post(boost::bind(myTask, "Hello World!"));
ioService.post(boost::bind(clearCache, "./cache"));
ioService.post(boost::bind(getSocialUpdates, "twitter,gmail,facebook,tumblr,reddit"));
 * This will stop the ioService processing loop. Any tasks
 * you add behind this point will not execute.
ioService.stop();
 * Will wait till all the threads in the thread pool are finished with 
 * their assigned tasks and 'join' them. Just assume the threads inside
 * the threadpool will be destroyed by this method.
threadpool.join_all();

参考资料:

A guide to getting started with boost::asio
浅谈 Boost.Asio 的多线程模型
How to create a thread pool using boost in C++?

到目前为止,我们已经审查过的服务器始终在执行 `async_accept`,因此它们始终至少有一个已计划的任务,所以我们实际上不需要以这种方式保持它们运行。但是客户端不执行 `async_accept`,这对于它在某个时候没有计划的任务是正常的。要防止 `io_context::run` 返回,您应该使用 `boost::asio::executor_work_guard`(以前的 `io_context::work`,目前已弃用)类实例。`io_context::run` 运行直到所有计划的任务都完成。 从第一个boost.asio的教程开始,boost文档就一直在告诉我们:使用boost.asio第一步就是要创建一个io_service对象。那么io_service是个什么东西呢? boost.asio文档说,io_service为下面的这些异步IO对象提供最核心的IO功能: boost::asio::ip::tcp::socketboost::asio::ip::tcp::acc 之前在完善消息节点的章节学习过asio服务器底层通信的流程,它是基于单线程运行的,可参考知乎用户​www.zhihu.com/people/zhi-chi-tian-ya-10-23/posts单线程模式而今天将设计IOServicePool类型的多线程模型,如下图所示多线程模式IOServicePool 服务池中,IOServicePool 类会根据系统的 CPU 核数创建相应数量的 io_context 实例,并将每个 io_context 运行在一个独立的线程中。 最近在学习licode发现源码有使用到boost::asio::io_service,因此在网上找文章了解一下,顺便写篇博客记录 io_service 负责和操作系统打交道,等待所有异步操作的结束,然后为每一个异步操作调用其完成处 例子(我们有3个异步操作,2个socket连接操作和一个计时器等待操作): 有一个io_service实例和一个处理线程的单线程例子: io_service ... #include &lt;boost/asio.hpp&gt; #include &lt;boost/shared_ptr.hpp&gt; #include &lt;boost/thread.hpp&gt; #include &lt;iostream&gt; <br />构造函数<br /><br />构造函数的主要动作就是调用CreateIoCompletionPort创建了一个初始iocp。 <br /><br />Dispatch和post的区别<br /><br />Post一定是PostQueuedCompletionStatus并且在GetQueuedCompletionStatus 之后执行。 <br /><br />Dispatch会首先检查当前thread是不是io_service.run/runonce/poll/poll_once线程,如果 你应该已经发现大部分使用Boost.Asio编写的代码都会使用几个io_service实例io_service是这个库里面最重要的类;它负责和操作系统打交道,等待所有异步操作的结束,然后为每一个异步操作调用其完成处理程序。        如果你选择用同步的方式来创建你的应用,你则不需要考虑我将在这一节向你展示的东西。 你有多种不同的方式来使用io_service。在下面的例子中,我们有3个异 无论如何使用,都能感觉到使用boost.asio实现服务器,不仅是一件非常轻松的事,而且代码很漂亮,逻辑也相当清晰,这点上很不同于ACE。使用io_service作为处理工作的work pool,可以看到,就是通过io_service.post投递一个Handler到io_service的队列,Handler在这个io_service.run内部得到执行,有可能你会发现,io_services.d... IO模型 io_service对象是asio框架中的调度器,所有异步io事件都是通过它来分发处理的(io对象的构造函数中都需要传入一个io_service对象)。 asio::io_serviceio_service;asio::ip::tcp::socketsocket(io_service); 在asio框架中,同步的io主要流程如下: 应用程序... Bost asio库与线程池的使用 Boost.Asio 有两种支持多线程的方式 第一种方式比较简单:在多线程的场景下,每个线程都持有一个io_context,并且每个线程都调用各自的io_context的run()方法。 另一种支持多线程的方式:全局只分配一个io_context,并且让这个io_context在多个线程之间共享,每个线程都调用全局的io_service的run()方法。 每个线程一个 I/O Service 让我们先分析第一种方案:在多线程的场景下,每个线程都持有一个io_context