boost::asio提供了一个跨平台的异步编程IO模型库,io_service类在多线程编程模型中提供了任务队列和任务分发功能。
io_service最常用的接口是:run, post, stop。
本文简要介绍io_service的使用,详细内容可以参阅相关reference。
使用run()启动。
run()会阻塞,直到:
简单测试例程如下:
#include <boost/asio/io_service.hpp> #include <iostream> int main(int argc, char *argv[]) // 创建io_service boost::asio::io_service io_service; // 附加任务 boost::asio::io_service::work work(io_service); // 启动并阻塞 io_service.run(); std::cout << "Yes, run() is returned!" << std::endl; return 0; 注意,程序打印文本,也就是说io_service阻塞在了run()。我们使用了work类,它会使得run()一直在任务待执行而不会退出,如果注释掉这一行,可以看到run会立即执行完毕并打印文本。 使用work时会阻塞run,可以使用work类型的指针来结束所有任务。 boost::asio::io_service io_service; boost::shared_ptr< boost::asio::io_service::work > work(new boost::asio::io_service::work( io_service )); // 结束任务 work.reset(); // 立即返回 io_service.run(); 启动线程池 在多个线程中调用run()即可开启线程池,io_service负责执行任务处理。所有在线程池中等待的线程是平等的,io_service会随机选择一个线程去执行任务。 使用thread库 这里使用了boost::thread库,例子如下: #include <boost/asio/io_service.hpp> #include <boost/thread/thread.hpp> #include <iostream> boost::asio::io_service io_service; void WorkerThread() std::cout << "Thread Start\n"; io_service.run(); std::cout << "Thread Finish\n"; int main( int argc, char * argv[] ) boost::shared_ptr< boost::asio::io_service::work > work(new boost::asio::io_service::work( io_service )); std::cout << "Press [return] to exit." << std::endl; boost::thread_group worker_threads; for( int x = 0; x < 4; ++x ) worker_threads.create_thread( WorkerThread ); std::cin.get(); io_service.stop(); worker_threads.join_all(); return 0; stop()会告知io_service,所有的任务需要终止。它的调用可能会使已经进入队列的任务得不到执行。 如果想让它们都执行,最好先销毁work对象。如果想让系统马上停止,则调用stop()。 使用bind 使用boost::bind可以把函数调用包装成为对象。 调用常规函数的示例: #include <boost/bind.hpp> #include <iostream> void F2( int i, float f ) std::cout << "i: " << i << std::endl; std::cout << "f: " << f << std::endl; int main( int argc, char * argv[] ) boost::bind( &F2, 42, 3.14f )();// 最后的括号表明调用该函数,如果去掉括号表示仅封装对象 return 0; 调用类成员函数示例: class MyClass public: void F3( int i, float f ) std::cout << "i: " << i << std::endl; std::cout << "f: " << f << std::endl; int main( int argc, char * argv[] ) MyClass c; boost::bind( &MyClass::F3, &c, 42, 3.14f )(); return 0; 回到io_service,使用bind可以改写程序: #include <boost/asio/io_service.hpp> #include <boost/thread/thread.hpp> #include <boost/bind.hpp> #include <iostream> boost::mutex global_stream_lock; void WorkerThread( boost::shared_ptr< boost::asio::io_service > io_service ) global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] Thread Start" << std::endl; global_stream_lock.unlock(); io_service->run(); global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] Thread Finish" << std::endl; global_stream_lock.unlock(); int main( int argc, char * argv[] ) boost::shared_ptr< boost::asio::io_service > io_service(new boost::asio::io_service); boost::shared_ptr< boost::asio::io_service::work > work(new boost::asio::io_service::work( *io_service )); global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] Press [return] to exit." << std::endl; global_stream_lock.unlock(); boost::thread_group worker_threads; for( int x = 0; x < 4; ++x ) worker_threads.create_thread( boost::bind( &WorkerThread, io_service ) ); std::cin.get(); io_service->stop(); worker_threads.join_all(); return 0; post任务 至此,启动了线程,其实并没有给线程做任务。也就是说,启动的线程处于空闲状态。 可以使用post/dispatch分发任务,它们的区别是: 如果可以,dispatch会立即执行任务,否则把任务加入到queuepost只会把任务加入到队列 具体使用何种方式需要根据具体情况确定。 如下的程序打印数字: boost::mutex global_stream_lock; void WorkerThread( boost::shared_ptr< boost::asio::io_service > io_service ) global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] Thread Start" << std::endl; global_stream_lock.unlock(); io_service->run(); global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] Thread Finish" << std::endl; global_stream_lock.unlock(); void Dispatch( int x ) global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] " << __FUNCTION__ << " x = " << x << std::endl; global_stream_lock.unlock(); void Post( int x ) global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] " << __FUNCTION__ << " x = " << x << std::endl; global_stream_lock.unlock(); void Run3( boost::shared_ptr< boost::asio::io_service > io_service ) for( int x = 0; x < 3; ++x ) io_service->dispatch( boost::bind( &Dispatch, x * 2 ) ); io_service->post( boost::bind( &Post, x * 2 + 1 ) ); boost::this_thread::sleep( boost::posix_time::milliseconds( 1000 ) ); int main( int argc, char * argv[] ) boost::shared_ptr< boost::asio::io_service > io_service( new boost::asio::io_service); boost::shared_ptr< boost::asio::io_service::work > work(new boost::asio::io_service::work( *io_service )); global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] The program will exit when all work has finished." << std::endl; global_stream_lock.unlock(); boost::thread_group worker_threads; for( int x = 0; x < 1; ++x ) worker_threads.create_thread( boost::bind( &WorkerThread, io_service ) ); io_service->post( boost::bind( &Run3, io_service ) ); work.reset(); worker_threads.join_all(); return 0; 可以看到,所有的任务得到了执行,但执行顺序可能不同。这个示例也可以看出混合使用post和dispatch可能存在的问题。 strand顺序处理 strand提供顺序化的事件执行器。意思是,如果以“work1->work2->work3”的顺序post,不管有多少个工作线程,它们依然会以这样的顺序执行任务。 #include <boost/asio/io_service.hpp> #include <boost/thread/thread.hpp> #include <boost/bind.hpp> #include <boost/asio/strand.hpp> #include <iostream> boost::mutex global_stream_lock; void WorkerThread( boost::shared_ptr< boost::asio::io_service > io_service ) global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] Thread Start" << std::endl; global_stream_lock.unlock(); io_service->run(); global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] Thread Finish" << std::endl; global_stream_lock.unlock(); void PrintNum( int x ) std::cout << "[" << boost::this_thread::get_id() << "] x: " << x << std::endl; int main( int argc, char * argv[] ) boost::shared_ptr< boost::asio::io_service > io_service( new boost::asio::io_service); boost::shared_ptr< boost::asio::io_service::work > work(new boost::asio::io_service::work( *io_service )); boost::asio::io_service::strand strand( *io_service ); global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] The program will exit when all work has finished." << std::endl; global_stream_lock.unlock(); boost::thread_group worker_threads; for( int x = 0; x < 2; ++x ) worker_threads.create_thread( boost::bind( &WorkerThread, io_service ) ); boost::this_thread::sleep( boost::posix_time::milliseconds( 1000 ) ); //strand.post( boost::bind( &PrintNum, 1 ) ); //strand.post( boost::bind( &PrintNum, 2 ) ); //strand.post( boost::bind( &PrintNum, 3 ) ); //strand.post( boost::bind( &PrintNum, 4 ) ); //strand.post( boost::bind( &PrintNum, 5 ) ); io_service->post( boost::bind( &PrintNum, 1 ) ); io_service->post( boost::bind( &PrintNum, 2 ) ); io_service->post( boost::bind( &PrintNum, 3 ) ); io_service->post( boost::bind( &PrintNum, 4 ) ); io_service->post( boost::bind( &PrintNum, 5 ) ); work.reset(); worker_threads.join_all(); return 0; 去掉了打印数据时的锁。如果使用post,会得到比较混乱的输出,而如果使用strand,则能得到清晰的输出,这就是strand! 总之,使用io_service能够方便地构建多线程处理程序。它的总体使用流程总结如下: #include <boost/asio/io_service.hpp> #include <boost/bind.hpp> #include <boost/thread/thread.hpp> * Create an asio::io_service and a thread_group (through pool in essence) boost::asio::io_service ioService; boost::thread_group threadpool; * This will start the ioService processing loop. All tasks * assigned with ioService.post() will start executing. boost::asio::io_service::work work(ioService); * This will add 2 threads to the thread pool. (You could just put it in a for loop) threadpool.create_thread( boost::bind(&boost::asio::io_service::run, &ioService) threadpool.create_thread( boost::bind(&boost::asio::io_service::run, &ioService) * This will assign tasks to the thread pool. * More about boost::bind: "http://www.boost.org/doc/libs/1_54_0/libs/bind/bind.html#with_functions" * You can use strand when necessary, if so, remember add "strand.h" ioService.post(boost::bind(myTask, "Hello World!")); ioService.post(boost::bind(clearCache, "./cache")); ioService.post(boost::bind(getSocialUpdates, "twitter,gmail,facebook,tumblr,reddit")); * This will stop the ioService processing loop. Any tasks * you add behind this point will not execute. ioService.stop(); * Will wait till all the threads in the thread pool are finished with * their assigned tasks and 'join' them. Just assume the threads inside * the threadpool will be destroyed by this method. threadpool.join_all(); 参考资料: A guide to getting started with boost::asio 浅谈 Boost.Asio 的多线程模型 How to create a thread pool using boost in C++? 到目前为止,我们已经审查过的服务器始终在执行 `async_accept`,因此它们始终至少有一个已计划的任务,所以我们实际上不需要以这种方式保持它们运行。但是客户端不执行 `async_accept`,这对于它在某个时候没有计划的任务是正常的。要防止 `io_context::run` 返回,您应该使用 `boost::asio::executor_work_guard`(以前的 `io_context::work`,目前已弃用)类实例。`io_context::run` 运行直到所有计划的任务都完成。 从第一个boost.asio的教程开始,boost文档就一直在告诉我们:使用boost.asio第一步就是要创建一个io_service对象。那么io_service是个什么东西呢? boost.asio文档说,io_service为下面的这些异步IO对象提供最核心的IO功能: boost::asio::ip::tcp::socketboost::asio::ip::tcp::acc 之前在完善消息节点的章节学习过asio服务器底层通信的流程,它是基于单线程运行的,可参考知乎用户www.zhihu.com/people/zhi-chi-tian-ya-10-23/posts单线程模式而今天将设计IOServicePool类型的多线程模型,如下图所示多线程模式IOServicePool 服务池中,IOServicePool 类会根据系统的 CPU 核数创建相应数量的 io_context 实例,并将每个 io_context 运行在一个独立的线程中。 最近在学习licode发现源码有使用到boost::asio::io_service,因此在网上找文章了解一下,顺便写篇博客记录 io_service 负责和操作系统打交道,等待所有异步操作的结束,然后为每一个异步操作调用其完成处 例子(我们有3个异步操作,2个socket连接操作和一个计时器等待操作): 有一个io_service实例和一个处理线程的单线程例子: io_service ... #include <boost/asio.hpp> #include <boost/shared_ptr.hpp> #include <boost/thread.hpp> #include <iostream> <br />构造函数<br /><br />构造函数的主要动作就是调用CreateIoCompletionPort创建了一个初始iocp。 <br /><br />Dispatch和post的区别<br /><br />Post一定是PostQueuedCompletionStatus并且在GetQueuedCompletionStatus 之后执行。 <br /><br />Dispatch会首先检查当前thread是不是io_service.run/runonce/poll/poll_once线程,如果 你应该已经发现大部分使用Boost.Asio编写的代码都会使用几个io_service的实例。io_service是这个库里面最重要的类;它负责和操作系统打交道,等待所有异步操作的结束,然后为每一个异步操作调用其完成处理程序。 如果你选择用同步的方式来创建你的应用,你则不需要考虑我将在这一节向你展示的东西。 你有多种不同的方式来使用io_service。在下面的例子中,我们有3个异 无论如何使用,都能感觉到使用boost.asio实现服务器,不仅是一件非常轻松的事,而且代码很漂亮,逻辑也相当清晰,这点上很不同于ACE。使用io_service作为处理工作的work pool,可以看到,就是通过io_service.post投递一个Handler到io_service的队列,Handler在这个io_service.run内部得到执行,有可能你会发现,io_services.d... IO模型 io_service对象是asio框架中的调度器,所有异步io事件都是通过它来分发处理的(io对象的构造函数中都需要传入一个io_service对象)。 asio::io_serviceio_service;asio::ip::tcp::socketsocket(io_service); 在asio框架中,同步的io主要流程如下: 应用程序... Bost asio库与线程池的使用 Boost.Asio 有两种支持多线程的方式 第一种方式比较简单:在多线程的场景下,每个线程都持有一个io_context,并且每个线程都调用各自的io_context的run()方法。 另一种支持多线程的方式:全局只分配一个io_context,并且让这个io_context在多个线程之间共享,每个线程都调用全局的io_service的run()方法。 每个线程一个 I/O Service 让我们先分析第一种方案:在多线程的场景下,每个线程都持有一个io_context
#include <boost/asio/io_service.hpp> #include <iostream> int main(int argc, char *argv[]) // 创建io_service boost::asio::io_service io_service; // 附加任务 boost::asio::io_service::work work(io_service); // 启动并阻塞 io_service.run(); std::cout << "Yes, run() is returned!" << std::endl; return 0; 注意,程序打印文本,也就是说io_service阻塞在了run()。我们使用了work类,它会使得run()一直在任务待执行而不会退出,如果注释掉这一行,可以看到run会立即执行完毕并打印文本。 使用work时会阻塞run,可以使用work类型的指针来结束所有任务。
注意,程序打印文本,也就是说io_service阻塞在了run()。我们使用了work类,它会使得run()一直在任务待执行而不会退出,如果注释掉这一行,可以看到run会立即执行完毕并打印文本。
使用work时会阻塞run,可以使用work类型的指针来结束所有任务。
boost::asio::io_service io_service; boost::shared_ptr< boost::asio::io_service::work > work(new boost::asio::io_service::work( io_service )); // 结束任务 work.reset(); // 立即返回 io_service.run(); 启动线程池 在多个线程中调用run()即可开启线程池,io_service负责执行任务处理。所有在线程池中等待的线程是平等的,io_service会随机选择一个线程去执行任务。 使用thread库 这里使用了boost::thread库,例子如下: #include <boost/asio/io_service.hpp> #include <boost/thread/thread.hpp> #include <iostream> boost::asio::io_service io_service; void WorkerThread() std::cout << "Thread Start\n"; io_service.run(); std::cout << "Thread Finish\n"; int main( int argc, char * argv[] ) boost::shared_ptr< boost::asio::io_service::work > work(new boost::asio::io_service::work( io_service )); std::cout << "Press [return] to exit." << std::endl; boost::thread_group worker_threads; for( int x = 0; x < 4; ++x ) worker_threads.create_thread( WorkerThread ); std::cin.get(); io_service.stop(); worker_threads.join_all(); return 0; stop()会告知io_service,所有的任务需要终止。它的调用可能会使已经进入队列的任务得不到执行。 如果想让它们都执行,最好先销毁work对象。如果想让系统马上停止,则调用stop()。 使用bind 使用boost::bind可以把函数调用包装成为对象。 调用常规函数的示例: #include <boost/bind.hpp> #include <iostream> void F2( int i, float f ) std::cout << "i: " << i << std::endl; std::cout << "f: " << f << std::endl; int main( int argc, char * argv[] ) boost::bind( &F2, 42, 3.14f )();// 最后的括号表明调用该函数,如果去掉括号表示仅封装对象 return 0; 调用类成员函数示例: class MyClass public: void F3( int i, float f ) std::cout << "i: " << i << std::endl; std::cout << "f: " << f << std::endl; int main( int argc, char * argv[] ) MyClass c; boost::bind( &MyClass::F3, &c, 42, 3.14f )(); return 0; 回到io_service,使用bind可以改写程序: #include <boost/asio/io_service.hpp> #include <boost/thread/thread.hpp> #include <boost/bind.hpp> #include <iostream> boost::mutex global_stream_lock; void WorkerThread( boost::shared_ptr< boost::asio::io_service > io_service ) global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] Thread Start" << std::endl; global_stream_lock.unlock(); io_service->run(); global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] Thread Finish" << std::endl; global_stream_lock.unlock(); int main( int argc, char * argv[] ) boost::shared_ptr< boost::asio::io_service > io_service(new boost::asio::io_service); boost::shared_ptr< boost::asio::io_service::work > work(new boost::asio::io_service::work( *io_service )); global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] Press [return] to exit." << std::endl; global_stream_lock.unlock(); boost::thread_group worker_threads; for( int x = 0; x < 4; ++x ) worker_threads.create_thread( boost::bind( &WorkerThread, io_service ) ); std::cin.get(); io_service->stop(); worker_threads.join_all(); return 0; post任务 至此,启动了线程,其实并没有给线程做任务。也就是说,启动的线程处于空闲状态。 可以使用post/dispatch分发任务,它们的区别是: 如果可以,dispatch会立即执行任务,否则把任务加入到queuepost只会把任务加入到队列 具体使用何种方式需要根据具体情况确定。 如下的程序打印数字: boost::mutex global_stream_lock; void WorkerThread( boost::shared_ptr< boost::asio::io_service > io_service ) global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] Thread Start" << std::endl; global_stream_lock.unlock(); io_service->run(); global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] Thread Finish" << std::endl; global_stream_lock.unlock(); void Dispatch( int x ) global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] " << __FUNCTION__ << " x = " << x << std::endl; global_stream_lock.unlock(); void Post( int x ) global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] " << __FUNCTION__ << " x = " << x << std::endl; global_stream_lock.unlock(); void Run3( boost::shared_ptr< boost::asio::io_service > io_service ) for( int x = 0; x < 3; ++x ) io_service->dispatch( boost::bind( &Dispatch, x * 2 ) ); io_service->post( boost::bind( &Post, x * 2 + 1 ) ); boost::this_thread::sleep( boost::posix_time::milliseconds( 1000 ) ); int main( int argc, char * argv[] ) boost::shared_ptr< boost::asio::io_service > io_service( new boost::asio::io_service); boost::shared_ptr< boost::asio::io_service::work > work(new boost::asio::io_service::work( *io_service )); global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] The program will exit when all work has finished." << std::endl; global_stream_lock.unlock(); boost::thread_group worker_threads; for( int x = 0; x < 1; ++x ) worker_threads.create_thread( boost::bind( &WorkerThread, io_service ) ); io_service->post( boost::bind( &Run3, io_service ) ); work.reset(); worker_threads.join_all(); return 0; 可以看到,所有的任务得到了执行,但执行顺序可能不同。这个示例也可以看出混合使用post和dispatch可能存在的问题。 strand顺序处理 strand提供顺序化的事件执行器。意思是,如果以“work1->work2->work3”的顺序post,不管有多少个工作线程,它们依然会以这样的顺序执行任务。 #include <boost/asio/io_service.hpp> #include <boost/thread/thread.hpp> #include <boost/bind.hpp> #include <boost/asio/strand.hpp> #include <iostream> boost::mutex global_stream_lock; void WorkerThread( boost::shared_ptr< boost::asio::io_service > io_service ) global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] Thread Start" << std::endl; global_stream_lock.unlock(); io_service->run(); global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] Thread Finish" << std::endl; global_stream_lock.unlock(); void PrintNum( int x ) std::cout << "[" << boost::this_thread::get_id() << "] x: " << x << std::endl; int main( int argc, char * argv[] ) boost::shared_ptr< boost::asio::io_service > io_service( new boost::asio::io_service); boost::shared_ptr< boost::asio::io_service::work > work(new boost::asio::io_service::work( *io_service )); boost::asio::io_service::strand strand( *io_service ); global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] The program will exit when all work has finished." << std::endl; global_stream_lock.unlock(); boost::thread_group worker_threads; for( int x = 0; x < 2; ++x ) worker_threads.create_thread( boost::bind( &WorkerThread, io_service ) ); boost::this_thread::sleep( boost::posix_time::milliseconds( 1000 ) ); //strand.post( boost::bind( &PrintNum, 1 ) ); //strand.post( boost::bind( &PrintNum, 2 ) ); //strand.post( boost::bind( &PrintNum, 3 ) ); //strand.post( boost::bind( &PrintNum, 4 ) ); //strand.post( boost::bind( &PrintNum, 5 ) ); io_service->post( boost::bind( &PrintNum, 1 ) ); io_service->post( boost::bind( &PrintNum, 2 ) ); io_service->post( boost::bind( &PrintNum, 3 ) ); io_service->post( boost::bind( &PrintNum, 4 ) ); io_service->post( boost::bind( &PrintNum, 5 ) ); work.reset(); worker_threads.join_all(); return 0; 去掉了打印数据时的锁。如果使用post,会得到比较混乱的输出,而如果使用strand,则能得到清晰的输出,这就是strand! 总之,使用io_service能够方便地构建多线程处理程序。它的总体使用流程总结如下: #include <boost/asio/io_service.hpp> #include <boost/bind.hpp> #include <boost/thread/thread.hpp> * Create an asio::io_service and a thread_group (through pool in essence) boost::asio::io_service ioService; boost::thread_group threadpool; * This will start the ioService processing loop. All tasks * assigned with ioService.post() will start executing. boost::asio::io_service::work work(ioService); * This will add 2 threads to the thread pool. (You could just put it in a for loop) threadpool.create_thread( boost::bind(&boost::asio::io_service::run, &ioService) threadpool.create_thread( boost::bind(&boost::asio::io_service::run, &ioService) * This will assign tasks to the thread pool. * More about boost::bind: "http://www.boost.org/doc/libs/1_54_0/libs/bind/bind.html#with_functions" * You can use strand when necessary, if so, remember add "strand.h" ioService.post(boost::bind(myTask, "Hello World!")); ioService.post(boost::bind(clearCache, "./cache")); ioService.post(boost::bind(getSocialUpdates, "twitter,gmail,facebook,tumblr,reddit")); * This will stop the ioService processing loop. Any tasks * you add behind this point will not execute. ioService.stop(); * Will wait till all the threads in the thread pool are finished with * their assigned tasks and 'join' them. Just assume the threads inside * the threadpool will be destroyed by this method. threadpool.join_all(); 参考资料: A guide to getting started with boost::asio 浅谈 Boost.Asio 的多线程模型 How to create a thread pool using boost in C++? 到目前为止,我们已经审查过的服务器始终在执行 `async_accept`,因此它们始终至少有一个已计划的任务,所以我们实际上不需要以这种方式保持它们运行。但是客户端不执行 `async_accept`,这对于它在某个时候没有计划的任务是正常的。要防止 `io_context::run` 返回,您应该使用 `boost::asio::executor_work_guard`(以前的 `io_context::work`,目前已弃用)类实例。`io_context::run` 运行直到所有计划的任务都完成。 从第一个boost.asio的教程开始,boost文档就一直在告诉我们:使用boost.asio第一步就是要创建一个io_service对象。那么io_service是个什么东西呢? boost.asio文档说,io_service为下面的这些异步IO对象提供最核心的IO功能: boost::asio::ip::tcp::socketboost::asio::ip::tcp::acc 之前在完善消息节点的章节学习过asio服务器底层通信的流程,它是基于单线程运行的,可参考知乎用户www.zhihu.com/people/zhi-chi-tian-ya-10-23/posts单线程模式而今天将设计IOServicePool类型的多线程模型,如下图所示多线程模式IOServicePool 服务池中,IOServicePool 类会根据系统的 CPU 核数创建相应数量的 io_context 实例,并将每个 io_context 运行在一个独立的线程中。 最近在学习licode发现源码有使用到boost::asio::io_service,因此在网上找文章了解一下,顺便写篇博客记录 io_service 负责和操作系统打交道,等待所有异步操作的结束,然后为每一个异步操作调用其完成处 例子(我们有3个异步操作,2个socket连接操作和一个计时器等待操作): 有一个io_service实例和一个处理线程的单线程例子: io_service ... #include <boost/asio.hpp> #include <boost/shared_ptr.hpp> #include <boost/thread.hpp> #include <iostream> <br />构造函数<br /><br />构造函数的主要动作就是调用CreateIoCompletionPort创建了一个初始iocp。 <br /><br />Dispatch和post的区别<br /><br />Post一定是PostQueuedCompletionStatus并且在GetQueuedCompletionStatus 之后执行。 <br /><br />Dispatch会首先检查当前thread是不是io_service.run/runonce/poll/poll_once线程,如果 你应该已经发现大部分使用Boost.Asio编写的代码都会使用几个io_service的实例。io_service是这个库里面最重要的类;它负责和操作系统打交道,等待所有异步操作的结束,然后为每一个异步操作调用其完成处理程序。 如果你选择用同步的方式来创建你的应用,你则不需要考虑我将在这一节向你展示的东西。 你有多种不同的方式来使用io_service。在下面的例子中,我们有3个异 无论如何使用,都能感觉到使用boost.asio实现服务器,不仅是一件非常轻松的事,而且代码很漂亮,逻辑也相当清晰,这点上很不同于ACE。使用io_service作为处理工作的work pool,可以看到,就是通过io_service.post投递一个Handler到io_service的队列,Handler在这个io_service.run内部得到执行,有可能你会发现,io_services.d... IO模型 io_service对象是asio框架中的调度器,所有异步io事件都是通过它来分发处理的(io对象的构造函数中都需要传入一个io_service对象)。 asio::io_serviceio_service;asio::ip::tcp::socketsocket(io_service); 在asio框架中,同步的io主要流程如下: 应用程序... Bost asio库与线程池的使用 Boost.Asio 有两种支持多线程的方式 第一种方式比较简单:在多线程的场景下,每个线程都持有一个io_context,并且每个线程都调用各自的io_context的run()方法。 另一种支持多线程的方式:全局只分配一个io_context,并且让这个io_context在多个线程之间共享,每个线程都调用全局的io_service的run()方法。 每个线程一个 I/O Service 让我们先分析第一种方案:在多线程的场景下,每个线程都持有一个io_context
boost::asio::io_service io_service; boost::shared_ptr< boost::asio::io_service::work > work(new boost::asio::io_service::work( io_service )); // 结束任务 work.reset(); // 立即返回 io_service.run();
启动线程池 在多个线程中调用run()即可开启线程池,io_service负责执行任务处理。所有在线程池中等待的线程是平等的,io_service会随机选择一个线程去执行任务。 使用thread库 这里使用了boost::thread库,例子如下: #include <boost/asio/io_service.hpp> #include <boost/thread/thread.hpp> #include <iostream> boost::asio::io_service io_service; void WorkerThread() std::cout << "Thread Start\n"; io_service.run(); std::cout << "Thread Finish\n"; int main( int argc, char * argv[] ) boost::shared_ptr< boost::asio::io_service::work > work(new boost::asio::io_service::work( io_service )); std::cout << "Press [return] to exit." << std::endl; boost::thread_group worker_threads; for( int x = 0; x < 4; ++x ) worker_threads.create_thread( WorkerThread ); std::cin.get(); io_service.stop(); worker_threads.join_all(); return 0; stop()会告知io_service,所有的任务需要终止。它的调用可能会使已经进入队列的任务得不到执行。 如果想让它们都执行,最好先销毁work对象。如果想让系统马上停止,则调用stop()。 使用bind 使用boost::bind可以把函数调用包装成为对象。 调用常规函数的示例: #include <boost/bind.hpp> #include <iostream> void F2( int i, float f ) std::cout << "i: " << i << std::endl; std::cout << "f: " << f << std::endl; int main( int argc, char * argv[] ) boost::bind( &F2, 42, 3.14f )();// 最后的括号表明调用该函数,如果去掉括号表示仅封装对象 return 0; 调用类成员函数示例: class MyClass public: void F3( int i, float f ) std::cout << "i: " << i << std::endl; std::cout << "f: " << f << std::endl; int main( int argc, char * argv[] ) MyClass c; boost::bind( &MyClass::F3, &c, 42, 3.14f )(); return 0; 回到io_service,使用bind可以改写程序: #include <boost/asio/io_service.hpp> #include <boost/thread/thread.hpp> #include <boost/bind.hpp> #include <iostream> boost::mutex global_stream_lock; void WorkerThread( boost::shared_ptr< boost::asio::io_service > io_service ) global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] Thread Start" << std::endl; global_stream_lock.unlock(); io_service->run(); global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] Thread Finish" << std::endl; global_stream_lock.unlock(); int main( int argc, char * argv[] ) boost::shared_ptr< boost::asio::io_service > io_service(new boost::asio::io_service); boost::shared_ptr< boost::asio::io_service::work > work(new boost::asio::io_service::work( *io_service )); global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] Press [return] to exit." << std::endl; global_stream_lock.unlock(); boost::thread_group worker_threads; for( int x = 0; x < 4; ++x ) worker_threads.create_thread( boost::bind( &WorkerThread, io_service ) ); std::cin.get(); io_service->stop(); worker_threads.join_all(); return 0; post任务 至此,启动了线程,其实并没有给线程做任务。也就是说,启动的线程处于空闲状态。 可以使用post/dispatch分发任务,它们的区别是: 如果可以,dispatch会立即执行任务,否则把任务加入到queuepost只会把任务加入到队列 具体使用何种方式需要根据具体情况确定。 如下的程序打印数字: boost::mutex global_stream_lock; void WorkerThread( boost::shared_ptr< boost::asio::io_service > io_service ) global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] Thread Start" << std::endl; global_stream_lock.unlock(); io_service->run(); global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] Thread Finish" << std::endl; global_stream_lock.unlock(); void Dispatch( int x ) global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] " << __FUNCTION__ << " x = " << x << std::endl; global_stream_lock.unlock(); void Post( int x ) global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] " << __FUNCTION__ << " x = " << x << std::endl; global_stream_lock.unlock(); void Run3( boost::shared_ptr< boost::asio::io_service > io_service ) for( int x = 0; x < 3; ++x ) io_service->dispatch( boost::bind( &Dispatch, x * 2 ) ); io_service->post( boost::bind( &Post, x * 2 + 1 ) ); boost::this_thread::sleep( boost::posix_time::milliseconds( 1000 ) ); int main( int argc, char * argv[] ) boost::shared_ptr< boost::asio::io_service > io_service( new boost::asio::io_service); boost::shared_ptr< boost::asio::io_service::work > work(new boost::asio::io_service::work( *io_service )); global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] The program will exit when all work has finished." << std::endl; global_stream_lock.unlock(); boost::thread_group worker_threads; for( int x = 0; x < 1; ++x ) worker_threads.create_thread( boost::bind( &WorkerThread, io_service ) ); io_service->post( boost::bind( &Run3, io_service ) ); work.reset(); worker_threads.join_all(); return 0; 可以看到,所有的任务得到了执行,但执行顺序可能不同。这个示例也可以看出混合使用post和dispatch可能存在的问题。 strand顺序处理 strand提供顺序化的事件执行器。意思是,如果以“work1->work2->work3”的顺序post,不管有多少个工作线程,它们依然会以这样的顺序执行任务。 #include <boost/asio/io_service.hpp> #include <boost/thread/thread.hpp> #include <boost/bind.hpp> #include <boost/asio/strand.hpp> #include <iostream> boost::mutex global_stream_lock; void WorkerThread( boost::shared_ptr< boost::asio::io_service > io_service ) global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] Thread Start" << std::endl; global_stream_lock.unlock(); io_service->run(); global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] Thread Finish" << std::endl; global_stream_lock.unlock(); void PrintNum( int x ) std::cout << "[" << boost::this_thread::get_id() << "] x: " << x << std::endl; int main( int argc, char * argv[] ) boost::shared_ptr< boost::asio::io_service > io_service( new boost::asio::io_service); boost::shared_ptr< boost::asio::io_service::work > work(new boost::asio::io_service::work( *io_service )); boost::asio::io_service::strand strand( *io_service ); global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] The program will exit when all work has finished." << std::endl; global_stream_lock.unlock(); boost::thread_group worker_threads; for( int x = 0; x < 2; ++x ) worker_threads.create_thread( boost::bind( &WorkerThread, io_service ) ); boost::this_thread::sleep( boost::posix_time::milliseconds( 1000 ) ); //strand.post( boost::bind( &PrintNum, 1 ) ); //strand.post( boost::bind( &PrintNum, 2 ) ); //strand.post( boost::bind( &PrintNum, 3 ) ); //strand.post( boost::bind( &PrintNum, 4 ) ); //strand.post( boost::bind( &PrintNum, 5 ) ); io_service->post( boost::bind( &PrintNum, 1 ) ); io_service->post( boost::bind( &PrintNum, 2 ) ); io_service->post( boost::bind( &PrintNum, 3 ) ); io_service->post( boost::bind( &PrintNum, 4 ) ); io_service->post( boost::bind( &PrintNum, 5 ) ); work.reset(); worker_threads.join_all(); return 0; 去掉了打印数据时的锁。如果使用post,会得到比较混乱的输出,而如果使用strand,则能得到清晰的输出,这就是strand! 总之,使用io_service能够方便地构建多线程处理程序。它的总体使用流程总结如下: #include <boost/asio/io_service.hpp> #include <boost/bind.hpp> #include <boost/thread/thread.hpp> * Create an asio::io_service and a thread_group (through pool in essence) boost::asio::io_service ioService; boost::thread_group threadpool; * This will start the ioService processing loop. All tasks * assigned with ioService.post() will start executing. boost::asio::io_service::work work(ioService); * This will add 2 threads to the thread pool. (You could just put it in a for loop) threadpool.create_thread( boost::bind(&boost::asio::io_service::run, &ioService) threadpool.create_thread( boost::bind(&boost::asio::io_service::run, &ioService) * This will assign tasks to the thread pool. * More about boost::bind: "http://www.boost.org/doc/libs/1_54_0/libs/bind/bind.html#with_functions" * You can use strand when necessary, if so, remember add "strand.h" ioService.post(boost::bind(myTask, "Hello World!")); ioService.post(boost::bind(clearCache, "./cache")); ioService.post(boost::bind(getSocialUpdates, "twitter,gmail,facebook,tumblr,reddit")); * This will stop the ioService processing loop. Any tasks * you add behind this point will not execute. ioService.stop(); * Will wait till all the threads in the thread pool are finished with * their assigned tasks and 'join' them. Just assume the threads inside * the threadpool will be destroyed by this method. threadpool.join_all(); 参考资料: A guide to getting started with boost::asio 浅谈 Boost.Asio 的多线程模型 How to create a thread pool using boost in C++? 到目前为止,我们已经审查过的服务器始终在执行 `async_accept`,因此它们始终至少有一个已计划的任务,所以我们实际上不需要以这种方式保持它们运行。但是客户端不执行 `async_accept`,这对于它在某个时候没有计划的任务是正常的。要防止 `io_context::run` 返回,您应该使用 `boost::asio::executor_work_guard`(以前的 `io_context::work`,目前已弃用)类实例。`io_context::run` 运行直到所有计划的任务都完成。 从第一个boost.asio的教程开始,boost文档就一直在告诉我们:使用boost.asio第一步就是要创建一个io_service对象。那么io_service是个什么东西呢? boost.asio文档说,io_service为下面的这些异步IO对象提供最核心的IO功能: boost::asio::ip::tcp::socketboost::asio::ip::tcp::acc 之前在完善消息节点的章节学习过asio服务器底层通信的流程,它是基于单线程运行的,可参考知乎用户www.zhihu.com/people/zhi-chi-tian-ya-10-23/posts单线程模式而今天将设计IOServicePool类型的多线程模型,如下图所示多线程模式IOServicePool 服务池中,IOServicePool 类会根据系统的 CPU 核数创建相应数量的 io_context 实例,并将每个 io_context 运行在一个独立的线程中。 最近在学习licode发现源码有使用到boost::asio::io_service,因此在网上找文章了解一下,顺便写篇博客记录 io_service 负责和操作系统打交道,等待所有异步操作的结束,然后为每一个异步操作调用其完成处 例子(我们有3个异步操作,2个socket连接操作和一个计时器等待操作): 有一个io_service实例和一个处理线程的单线程例子: io_service ... #include <boost/asio.hpp> #include <boost/shared_ptr.hpp> #include <boost/thread.hpp> #include <iostream> <br />构造函数<br /><br />构造函数的主要动作就是调用CreateIoCompletionPort创建了一个初始iocp。 <br /><br />Dispatch和post的区别<br /><br />Post一定是PostQueuedCompletionStatus并且在GetQueuedCompletionStatus 之后执行。 <br /><br />Dispatch会首先检查当前thread是不是io_service.run/runonce/poll/poll_once线程,如果 你应该已经发现大部分使用Boost.Asio编写的代码都会使用几个io_service的实例。io_service是这个库里面最重要的类;它负责和操作系统打交道,等待所有异步操作的结束,然后为每一个异步操作调用其完成处理程序。 如果你选择用同步的方式来创建你的应用,你则不需要考虑我将在这一节向你展示的东西。 你有多种不同的方式来使用io_service。在下面的例子中,我们有3个异 无论如何使用,都能感觉到使用boost.asio实现服务器,不仅是一件非常轻松的事,而且代码很漂亮,逻辑也相当清晰,这点上很不同于ACE。使用io_service作为处理工作的work pool,可以看到,就是通过io_service.post投递一个Handler到io_service的队列,Handler在这个io_service.run内部得到执行,有可能你会发现,io_services.d... IO模型 io_service对象是asio框架中的调度器,所有异步io事件都是通过它来分发处理的(io对象的构造函数中都需要传入一个io_service对象)。 asio::io_serviceio_service;asio::ip::tcp::socketsocket(io_service); 在asio框架中,同步的io主要流程如下: 应用程序... Bost asio库与线程池的使用 Boost.Asio 有两种支持多线程的方式 第一种方式比较简单:在多线程的场景下,每个线程都持有一个io_context,并且每个线程都调用各自的io_context的run()方法。 另一种支持多线程的方式:全局只分配一个io_context,并且让这个io_context在多个线程之间共享,每个线程都调用全局的io_service的run()方法。 每个线程一个 I/O Service 让我们先分析第一种方案:在多线程的场景下,每个线程都持有一个io_context
在多个线程中调用run()即可开启线程池,io_service负责执行任务处理。所有在线程池中等待的线程是平等的,io_service会随机选择一个线程去执行任务。
这里使用了boost::thread库,例子如下:
#include <boost/asio/io_service.hpp> #include <boost/thread/thread.hpp> #include <iostream> boost::asio::io_service io_service; void WorkerThread() std::cout << "Thread Start\n"; io_service.run(); std::cout << "Thread Finish\n"; int main( int argc, char * argv[] ) boost::shared_ptr< boost::asio::io_service::work > work(new boost::asio::io_service::work( io_service )); std::cout << "Press [return] to exit." << std::endl; boost::thread_group worker_threads; for( int x = 0; x < 4; ++x ) worker_threads.create_thread( WorkerThread ); std::cin.get(); io_service.stop(); worker_threads.join_all(); return 0; stop()会告知io_service,所有的任务需要终止。它的调用可能会使已经进入队列的任务得不到执行。 如果想让它们都执行,最好先销毁work对象。如果想让系统马上停止,则调用stop()。 使用bind 使用boost::bind可以把函数调用包装成为对象。 调用常规函数的示例: #include <boost/bind.hpp> #include <iostream> void F2( int i, float f ) std::cout << "i: " << i << std::endl; std::cout << "f: " << f << std::endl; int main( int argc, char * argv[] ) boost::bind( &F2, 42, 3.14f )();// 最后的括号表明调用该函数,如果去掉括号表示仅封装对象 return 0; 调用类成员函数示例: class MyClass public: void F3( int i, float f ) std::cout << "i: " << i << std::endl; std::cout << "f: " << f << std::endl; int main( int argc, char * argv[] ) MyClass c; boost::bind( &MyClass::F3, &c, 42, 3.14f )(); return 0; 回到io_service,使用bind可以改写程序: #include <boost/asio/io_service.hpp> #include <boost/thread/thread.hpp> #include <boost/bind.hpp> #include <iostream> boost::mutex global_stream_lock; void WorkerThread( boost::shared_ptr< boost::asio::io_service > io_service ) global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] Thread Start" << std::endl; global_stream_lock.unlock(); io_service->run(); global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] Thread Finish" << std::endl; global_stream_lock.unlock(); int main( int argc, char * argv[] ) boost::shared_ptr< boost::asio::io_service > io_service(new boost::asio::io_service); boost::shared_ptr< boost::asio::io_service::work > work(new boost::asio::io_service::work( *io_service )); global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] Press [return] to exit." << std::endl; global_stream_lock.unlock(); boost::thread_group worker_threads; for( int x = 0; x < 4; ++x ) worker_threads.create_thread( boost::bind( &WorkerThread, io_service ) ); std::cin.get(); io_service->stop(); worker_threads.join_all(); return 0; post任务 至此,启动了线程,其实并没有给线程做任务。也就是说,启动的线程处于空闲状态。 可以使用post/dispatch分发任务,它们的区别是: 如果可以,dispatch会立即执行任务,否则把任务加入到queuepost只会把任务加入到队列 具体使用何种方式需要根据具体情况确定。 如下的程序打印数字: boost::mutex global_stream_lock; void WorkerThread( boost::shared_ptr< boost::asio::io_service > io_service ) global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] Thread Start" << std::endl; global_stream_lock.unlock(); io_service->run(); global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] Thread Finish" << std::endl; global_stream_lock.unlock(); void Dispatch( int x ) global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] " << __FUNCTION__ << " x = " << x << std::endl; global_stream_lock.unlock(); void Post( int x ) global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] " << __FUNCTION__ << " x = " << x << std::endl; global_stream_lock.unlock(); void Run3( boost::shared_ptr< boost::asio::io_service > io_service ) for( int x = 0; x < 3; ++x ) io_service->dispatch( boost::bind( &Dispatch, x * 2 ) ); io_service->post( boost::bind( &Post, x * 2 + 1 ) ); boost::this_thread::sleep( boost::posix_time::milliseconds( 1000 ) ); int main( int argc, char * argv[] ) boost::shared_ptr< boost::asio::io_service > io_service( new boost::asio::io_service); boost::shared_ptr< boost::asio::io_service::work > work(new boost::asio::io_service::work( *io_service )); global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] The program will exit when all work has finished." << std::endl; global_stream_lock.unlock(); boost::thread_group worker_threads; for( int x = 0; x < 1; ++x ) worker_threads.create_thread( boost::bind( &WorkerThread, io_service ) ); io_service->post( boost::bind( &Run3, io_service ) ); work.reset(); worker_threads.join_all(); return 0; 可以看到,所有的任务得到了执行,但执行顺序可能不同。这个示例也可以看出混合使用post和dispatch可能存在的问题。 strand顺序处理 strand提供顺序化的事件执行器。意思是,如果以“work1->work2->work3”的顺序post,不管有多少个工作线程,它们依然会以这样的顺序执行任务。 #include <boost/asio/io_service.hpp> #include <boost/thread/thread.hpp> #include <boost/bind.hpp> #include <boost/asio/strand.hpp> #include <iostream> boost::mutex global_stream_lock; void WorkerThread( boost::shared_ptr< boost::asio::io_service > io_service ) global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] Thread Start" << std::endl; global_stream_lock.unlock(); io_service->run(); global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] Thread Finish" << std::endl; global_stream_lock.unlock(); void PrintNum( int x ) std::cout << "[" << boost::this_thread::get_id() << "] x: " << x << std::endl; int main( int argc, char * argv[] ) boost::shared_ptr< boost::asio::io_service > io_service( new boost::asio::io_service); boost::shared_ptr< boost::asio::io_service::work > work(new boost::asio::io_service::work( *io_service )); boost::asio::io_service::strand strand( *io_service ); global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] The program will exit when all work has finished." << std::endl; global_stream_lock.unlock(); boost::thread_group worker_threads; for( int x = 0; x < 2; ++x ) worker_threads.create_thread( boost::bind( &WorkerThread, io_service ) ); boost::this_thread::sleep( boost::posix_time::milliseconds( 1000 ) ); //strand.post( boost::bind( &PrintNum, 1 ) ); //strand.post( boost::bind( &PrintNum, 2 ) ); //strand.post( boost::bind( &PrintNum, 3 ) ); //strand.post( boost::bind( &PrintNum, 4 ) ); //strand.post( boost::bind( &PrintNum, 5 ) ); io_service->post( boost::bind( &PrintNum, 1 ) ); io_service->post( boost::bind( &PrintNum, 2 ) ); io_service->post( boost::bind( &PrintNum, 3 ) ); io_service->post( boost::bind( &PrintNum, 4 ) ); io_service->post( boost::bind( &PrintNum, 5 ) ); work.reset(); worker_threads.join_all(); return 0; 去掉了打印数据时的锁。如果使用post,会得到比较混乱的输出,而如果使用strand,则能得到清晰的输出,这就是strand! 总之,使用io_service能够方便地构建多线程处理程序。它的总体使用流程总结如下: #include <boost/asio/io_service.hpp> #include <boost/bind.hpp> #include <boost/thread/thread.hpp> * Create an asio::io_service and a thread_group (through pool in essence) boost::asio::io_service ioService; boost::thread_group threadpool; * This will start the ioService processing loop. All tasks * assigned with ioService.post() will start executing. boost::asio::io_service::work work(ioService); * This will add 2 threads to the thread pool. (You could just put it in a for loop) threadpool.create_thread( boost::bind(&boost::asio::io_service::run, &ioService) threadpool.create_thread( boost::bind(&boost::asio::io_service::run, &ioService) * This will assign tasks to the thread pool. * More about boost::bind: "http://www.boost.org/doc/libs/1_54_0/libs/bind/bind.html#with_functions" * You can use strand when necessary, if so, remember add "strand.h" ioService.post(boost::bind(myTask, "Hello World!")); ioService.post(boost::bind(clearCache, "./cache")); ioService.post(boost::bind(getSocialUpdates, "twitter,gmail,facebook,tumblr,reddit")); * This will stop the ioService processing loop. Any tasks * you add behind this point will not execute. ioService.stop(); * Will wait till all the threads in the thread pool are finished with * their assigned tasks and 'join' them. Just assume the threads inside * the threadpool will be destroyed by this method. threadpool.join_all(); 参考资料: A guide to getting started with boost::asio 浅谈 Boost.Asio 的多线程模型 How to create a thread pool using boost in C++? 到目前为止,我们已经审查过的服务器始终在执行 `async_accept`,因此它们始终至少有一个已计划的任务,所以我们实际上不需要以这种方式保持它们运行。但是客户端不执行 `async_accept`,这对于它在某个时候没有计划的任务是正常的。要防止 `io_context::run` 返回,您应该使用 `boost::asio::executor_work_guard`(以前的 `io_context::work`,目前已弃用)类实例。`io_context::run` 运行直到所有计划的任务都完成。 从第一个boost.asio的教程开始,boost文档就一直在告诉我们:使用boost.asio第一步就是要创建一个io_service对象。那么io_service是个什么东西呢? boost.asio文档说,io_service为下面的这些异步IO对象提供最核心的IO功能: boost::asio::ip::tcp::socketboost::asio::ip::tcp::acc 之前在完善消息节点的章节学习过asio服务器底层通信的流程,它是基于单线程运行的,可参考知乎用户www.zhihu.com/people/zhi-chi-tian-ya-10-23/posts单线程模式而今天将设计IOServicePool类型的多线程模型,如下图所示多线程模式IOServicePool 服务池中,IOServicePool 类会根据系统的 CPU 核数创建相应数量的 io_context 实例,并将每个 io_context 运行在一个独立的线程中。 最近在学习licode发现源码有使用到boost::asio::io_service,因此在网上找文章了解一下,顺便写篇博客记录 io_service 负责和操作系统打交道,等待所有异步操作的结束,然后为每一个异步操作调用其完成处 例子(我们有3个异步操作,2个socket连接操作和一个计时器等待操作): 有一个io_service实例和一个处理线程的单线程例子: io_service ... #include <boost/asio.hpp> #include <boost/shared_ptr.hpp> #include <boost/thread.hpp> #include <iostream> <br />构造函数<br /><br />构造函数的主要动作就是调用CreateIoCompletionPort创建了一个初始iocp。 <br /><br />Dispatch和post的区别<br /><br />Post一定是PostQueuedCompletionStatus并且在GetQueuedCompletionStatus 之后执行。 <br /><br />Dispatch会首先检查当前thread是不是io_service.run/runonce/poll/poll_once线程,如果 你应该已经发现大部分使用Boost.Asio编写的代码都会使用几个io_service的实例。io_service是这个库里面最重要的类;它负责和操作系统打交道,等待所有异步操作的结束,然后为每一个异步操作调用其完成处理程序。 如果你选择用同步的方式来创建你的应用,你则不需要考虑我将在这一节向你展示的东西。 你有多种不同的方式来使用io_service。在下面的例子中,我们有3个异 无论如何使用,都能感觉到使用boost.asio实现服务器,不仅是一件非常轻松的事,而且代码很漂亮,逻辑也相当清晰,这点上很不同于ACE。使用io_service作为处理工作的work pool,可以看到,就是通过io_service.post投递一个Handler到io_service的队列,Handler在这个io_service.run内部得到执行,有可能你会发现,io_services.d... IO模型 io_service对象是asio框架中的调度器,所有异步io事件都是通过它来分发处理的(io对象的构造函数中都需要传入一个io_service对象)。 asio::io_serviceio_service;asio::ip::tcp::socketsocket(io_service); 在asio框架中,同步的io主要流程如下: 应用程序... Bost asio库与线程池的使用 Boost.Asio 有两种支持多线程的方式 第一种方式比较简单:在多线程的场景下,每个线程都持有一个io_context,并且每个线程都调用各自的io_context的run()方法。 另一种支持多线程的方式:全局只分配一个io_context,并且让这个io_context在多个线程之间共享,每个线程都调用全局的io_service的run()方法。 每个线程一个 I/O Service 让我们先分析第一种方案:在多线程的场景下,每个线程都持有一个io_context
#include <boost/asio/io_service.hpp> #include <boost/thread/thread.hpp> #include <iostream> boost::asio::io_service io_service; void WorkerThread() std::cout << "Thread Start\n"; io_service.run(); std::cout << "Thread Finish\n"; int main( int argc, char * argv[] ) boost::shared_ptr< boost::asio::io_service::work > work(new boost::asio::io_service::work( io_service )); std::cout << "Press [return] to exit." << std::endl; boost::thread_group worker_threads; for( int x = 0; x < 4; ++x ) worker_threads.create_thread( WorkerThread ); std::cin.get(); io_service.stop(); worker_threads.join_all(); return 0; stop()会告知io_service,所有的任务需要终止。它的调用可能会使已经进入队列的任务得不到执行。 如果想让它们都执行,最好先销毁work对象。如果想让系统马上停止,则调用stop()。
stop()会告知io_service,所有的任务需要终止。它的调用可能会使已经进入队列的任务得不到执行。
如果想让它们都执行,最好先销毁work对象。如果想让系统马上停止,则调用stop()。
使用bind 使用boost::bind可以把函数调用包装成为对象。 调用常规函数的示例: #include <boost/bind.hpp> #include <iostream> void F2( int i, float f ) std::cout << "i: " << i << std::endl; std::cout << "f: " << f << std::endl; int main( int argc, char * argv[] ) boost::bind( &F2, 42, 3.14f )();// 最后的括号表明调用该函数,如果去掉括号表示仅封装对象 return 0; 调用类成员函数示例: class MyClass public: void F3( int i, float f ) std::cout << "i: " << i << std::endl; std::cout << "f: " << f << std::endl; int main( int argc, char * argv[] ) MyClass c; boost::bind( &MyClass::F3, &c, 42, 3.14f )(); return 0; 回到io_service,使用bind可以改写程序: #include <boost/asio/io_service.hpp> #include <boost/thread/thread.hpp> #include <boost/bind.hpp> #include <iostream> boost::mutex global_stream_lock; void WorkerThread( boost::shared_ptr< boost::asio::io_service > io_service ) global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] Thread Start" << std::endl; global_stream_lock.unlock(); io_service->run(); global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] Thread Finish" << std::endl; global_stream_lock.unlock(); int main( int argc, char * argv[] ) boost::shared_ptr< boost::asio::io_service > io_service(new boost::asio::io_service); boost::shared_ptr< boost::asio::io_service::work > work(new boost::asio::io_service::work( *io_service )); global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] Press [return] to exit." << std::endl; global_stream_lock.unlock(); boost::thread_group worker_threads; for( int x = 0; x < 4; ++x ) worker_threads.create_thread( boost::bind( &WorkerThread, io_service ) ); std::cin.get(); io_service->stop(); worker_threads.join_all(); return 0; post任务 至此,启动了线程,其实并没有给线程做任务。也就是说,启动的线程处于空闲状态。 可以使用post/dispatch分发任务,它们的区别是: 如果可以,dispatch会立即执行任务,否则把任务加入到queuepost只会把任务加入到队列 具体使用何种方式需要根据具体情况确定。 如下的程序打印数字: boost::mutex global_stream_lock; void WorkerThread( boost::shared_ptr< boost::asio::io_service > io_service ) global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] Thread Start" << std::endl; global_stream_lock.unlock(); io_service->run(); global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] Thread Finish" << std::endl; global_stream_lock.unlock(); void Dispatch( int x ) global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] " << __FUNCTION__ << " x = " << x << std::endl; global_stream_lock.unlock(); void Post( int x ) global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] " << __FUNCTION__ << " x = " << x << std::endl; global_stream_lock.unlock(); void Run3( boost::shared_ptr< boost::asio::io_service > io_service ) for( int x = 0; x < 3; ++x ) io_service->dispatch( boost::bind( &Dispatch, x * 2 ) ); io_service->post( boost::bind( &Post, x * 2 + 1 ) ); boost::this_thread::sleep( boost::posix_time::milliseconds( 1000 ) ); int main( int argc, char * argv[] ) boost::shared_ptr< boost::asio::io_service > io_service( new boost::asio::io_service); boost::shared_ptr< boost::asio::io_service::work > work(new boost::asio::io_service::work( *io_service )); global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] The program will exit when all work has finished." << std::endl; global_stream_lock.unlock(); boost::thread_group worker_threads; for( int x = 0; x < 1; ++x ) worker_threads.create_thread( boost::bind( &WorkerThread, io_service ) ); io_service->post( boost::bind( &Run3, io_service ) ); work.reset(); worker_threads.join_all(); return 0; 可以看到,所有的任务得到了执行,但执行顺序可能不同。这个示例也可以看出混合使用post和dispatch可能存在的问题。 strand顺序处理 strand提供顺序化的事件执行器。意思是,如果以“work1->work2->work3”的顺序post,不管有多少个工作线程,它们依然会以这样的顺序执行任务。 #include <boost/asio/io_service.hpp> #include <boost/thread/thread.hpp> #include <boost/bind.hpp> #include <boost/asio/strand.hpp> #include <iostream> boost::mutex global_stream_lock; void WorkerThread( boost::shared_ptr< boost::asio::io_service > io_service ) global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] Thread Start" << std::endl; global_stream_lock.unlock(); io_service->run(); global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] Thread Finish" << std::endl; global_stream_lock.unlock(); void PrintNum( int x ) std::cout << "[" << boost::this_thread::get_id() << "] x: " << x << std::endl; int main( int argc, char * argv[] ) boost::shared_ptr< boost::asio::io_service > io_service( new boost::asio::io_service); boost::shared_ptr< boost::asio::io_service::work > work(new boost::asio::io_service::work( *io_service )); boost::asio::io_service::strand strand( *io_service ); global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] The program will exit when all work has finished." << std::endl; global_stream_lock.unlock(); boost::thread_group worker_threads; for( int x = 0; x < 2; ++x ) worker_threads.create_thread( boost::bind( &WorkerThread, io_service ) ); boost::this_thread::sleep( boost::posix_time::milliseconds( 1000 ) ); //strand.post( boost::bind( &PrintNum, 1 ) ); //strand.post( boost::bind( &PrintNum, 2 ) ); //strand.post( boost::bind( &PrintNum, 3 ) ); //strand.post( boost::bind( &PrintNum, 4 ) ); //strand.post( boost::bind( &PrintNum, 5 ) ); io_service->post( boost::bind( &PrintNum, 1 ) ); io_service->post( boost::bind( &PrintNum, 2 ) ); io_service->post( boost::bind( &PrintNum, 3 ) ); io_service->post( boost::bind( &PrintNum, 4 ) ); io_service->post( boost::bind( &PrintNum, 5 ) ); work.reset(); worker_threads.join_all(); return 0; 去掉了打印数据时的锁。如果使用post,会得到比较混乱的输出,而如果使用strand,则能得到清晰的输出,这就是strand! 总之,使用io_service能够方便地构建多线程处理程序。它的总体使用流程总结如下: #include <boost/asio/io_service.hpp> #include <boost/bind.hpp> #include <boost/thread/thread.hpp> * Create an asio::io_service and a thread_group (through pool in essence) boost::asio::io_service ioService; boost::thread_group threadpool; * This will start the ioService processing loop. All tasks * assigned with ioService.post() will start executing. boost::asio::io_service::work work(ioService); * This will add 2 threads to the thread pool. (You could just put it in a for loop) threadpool.create_thread( boost::bind(&boost::asio::io_service::run, &ioService) threadpool.create_thread( boost::bind(&boost::asio::io_service::run, &ioService) * This will assign tasks to the thread pool. * More about boost::bind: "http://www.boost.org/doc/libs/1_54_0/libs/bind/bind.html#with_functions" * You can use strand when necessary, if so, remember add "strand.h" ioService.post(boost::bind(myTask, "Hello World!")); ioService.post(boost::bind(clearCache, "./cache")); ioService.post(boost::bind(getSocialUpdates, "twitter,gmail,facebook,tumblr,reddit")); * This will stop the ioService processing loop. Any tasks * you add behind this point will not execute. ioService.stop(); * Will wait till all the threads in the thread pool are finished with * their assigned tasks and 'join' them. Just assume the threads inside * the threadpool will be destroyed by this method. threadpool.join_all(); 参考资料: A guide to getting started with boost::asio 浅谈 Boost.Asio 的多线程模型 How to create a thread pool using boost in C++? 到目前为止,我们已经审查过的服务器始终在执行 `async_accept`,因此它们始终至少有一个已计划的任务,所以我们实际上不需要以这种方式保持它们运行。但是客户端不执行 `async_accept`,这对于它在某个时候没有计划的任务是正常的。要防止 `io_context::run` 返回,您应该使用 `boost::asio::executor_work_guard`(以前的 `io_context::work`,目前已弃用)类实例。`io_context::run` 运行直到所有计划的任务都完成。 从第一个boost.asio的教程开始,boost文档就一直在告诉我们:使用boost.asio第一步就是要创建一个io_service对象。那么io_service是个什么东西呢? boost.asio文档说,io_service为下面的这些异步IO对象提供最核心的IO功能: boost::asio::ip::tcp::socketboost::asio::ip::tcp::acc 之前在完善消息节点的章节学习过asio服务器底层通信的流程,它是基于单线程运行的,可参考知乎用户www.zhihu.com/people/zhi-chi-tian-ya-10-23/posts单线程模式而今天将设计IOServicePool类型的多线程模型,如下图所示多线程模式IOServicePool 服务池中,IOServicePool 类会根据系统的 CPU 核数创建相应数量的 io_context 实例,并将每个 io_context 运行在一个独立的线程中。 最近在学习licode发现源码有使用到boost::asio::io_service,因此在网上找文章了解一下,顺便写篇博客记录 io_service 负责和操作系统打交道,等待所有异步操作的结束,然后为每一个异步操作调用其完成处 例子(我们有3个异步操作,2个socket连接操作和一个计时器等待操作): 有一个io_service实例和一个处理线程的单线程例子: io_service ... #include <boost/asio.hpp> #include <boost/shared_ptr.hpp> #include <boost/thread.hpp> #include <iostream> <br />构造函数<br /><br />构造函数的主要动作就是调用CreateIoCompletionPort创建了一个初始iocp。 <br /><br />Dispatch和post的区别<br /><br />Post一定是PostQueuedCompletionStatus并且在GetQueuedCompletionStatus 之后执行。 <br /><br />Dispatch会首先检查当前thread是不是io_service.run/runonce/poll/poll_once线程,如果 你应该已经发现大部分使用Boost.Asio编写的代码都会使用几个io_service的实例。io_service是这个库里面最重要的类;它负责和操作系统打交道,等待所有异步操作的结束,然后为每一个异步操作调用其完成处理程序。 如果你选择用同步的方式来创建你的应用,你则不需要考虑我将在这一节向你展示的东西。 你有多种不同的方式来使用io_service。在下面的例子中,我们有3个异 无论如何使用,都能感觉到使用boost.asio实现服务器,不仅是一件非常轻松的事,而且代码很漂亮,逻辑也相当清晰,这点上很不同于ACE。使用io_service作为处理工作的work pool,可以看到,就是通过io_service.post投递一个Handler到io_service的队列,Handler在这个io_service.run内部得到执行,有可能你会发现,io_services.d... IO模型 io_service对象是asio框架中的调度器,所有异步io事件都是通过它来分发处理的(io对象的构造函数中都需要传入一个io_service对象)。 asio::io_serviceio_service;asio::ip::tcp::socketsocket(io_service); 在asio框架中,同步的io主要流程如下: 应用程序... Bost asio库与线程池的使用 Boost.Asio 有两种支持多线程的方式 第一种方式比较简单:在多线程的场景下,每个线程都持有一个io_context,并且每个线程都调用各自的io_context的run()方法。 另一种支持多线程的方式:全局只分配一个io_context,并且让这个io_context在多个线程之间共享,每个线程都调用全局的io_service的run()方法。 每个线程一个 I/O Service 让我们先分析第一种方案:在多线程的场景下,每个线程都持有一个io_context
使用boost::bind可以把函数调用包装成为对象。
调用常规函数的示例:
#include <boost/bind.hpp> #include <iostream> void F2( int i, float f ) std::cout << "i: " << i << std::endl; std::cout << "f: " << f << std::endl; int main( int argc, char * argv[] ) boost::bind( &F2, 42, 3.14f )();// 最后的括号表明调用该函数,如果去掉括号表示仅封装对象 return 0; 调用类成员函数示例: class MyClass public: void F3( int i, float f ) std::cout << "i: " << i << std::endl; std::cout << "f: " << f << std::endl; int main( int argc, char * argv[] ) MyClass c; boost::bind( &MyClass::F3, &c, 42, 3.14f )(); return 0; 回到io_service,使用bind可以改写程序: #include <boost/asio/io_service.hpp> #include <boost/thread/thread.hpp> #include <boost/bind.hpp> #include <iostream> boost::mutex global_stream_lock; void WorkerThread( boost::shared_ptr< boost::asio::io_service > io_service ) global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] Thread Start" << std::endl; global_stream_lock.unlock(); io_service->run(); global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] Thread Finish" << std::endl; global_stream_lock.unlock(); int main( int argc, char * argv[] ) boost::shared_ptr< boost::asio::io_service > io_service(new boost::asio::io_service); boost::shared_ptr< boost::asio::io_service::work > work(new boost::asio::io_service::work( *io_service )); global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] Press [return] to exit." << std::endl; global_stream_lock.unlock(); boost::thread_group worker_threads; for( int x = 0; x < 4; ++x ) worker_threads.create_thread( boost::bind( &WorkerThread, io_service ) ); std::cin.get(); io_service->stop(); worker_threads.join_all(); return 0; post任务 至此,启动了线程,其实并没有给线程做任务。也就是说,启动的线程处于空闲状态。 可以使用post/dispatch分发任务,它们的区别是: 如果可以,dispatch会立即执行任务,否则把任务加入到queuepost只会把任务加入到队列 具体使用何种方式需要根据具体情况确定。 如下的程序打印数字: boost::mutex global_stream_lock; void WorkerThread( boost::shared_ptr< boost::asio::io_service > io_service ) global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] Thread Start" << std::endl; global_stream_lock.unlock(); io_service->run(); global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] Thread Finish" << std::endl; global_stream_lock.unlock(); void Dispatch( int x ) global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] " << __FUNCTION__ << " x = " << x << std::endl; global_stream_lock.unlock(); void Post( int x ) global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] " << __FUNCTION__ << " x = " << x << std::endl; global_stream_lock.unlock(); void Run3( boost::shared_ptr< boost::asio::io_service > io_service ) for( int x = 0; x < 3; ++x ) io_service->dispatch( boost::bind( &Dispatch, x * 2 ) ); io_service->post( boost::bind( &Post, x * 2 + 1 ) ); boost::this_thread::sleep( boost::posix_time::milliseconds( 1000 ) ); int main( int argc, char * argv[] ) boost::shared_ptr< boost::asio::io_service > io_service( new boost::asio::io_service); boost::shared_ptr< boost::asio::io_service::work > work(new boost::asio::io_service::work( *io_service )); global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] The program will exit when all work has finished." << std::endl; global_stream_lock.unlock(); boost::thread_group worker_threads; for( int x = 0; x < 1; ++x ) worker_threads.create_thread( boost::bind( &WorkerThread, io_service ) ); io_service->post( boost::bind( &Run3, io_service ) ); work.reset(); worker_threads.join_all(); return 0; 可以看到,所有的任务得到了执行,但执行顺序可能不同。这个示例也可以看出混合使用post和dispatch可能存在的问题。 strand顺序处理 strand提供顺序化的事件执行器。意思是,如果以“work1->work2->work3”的顺序post,不管有多少个工作线程,它们依然会以这样的顺序执行任务。 #include <boost/asio/io_service.hpp> #include <boost/thread/thread.hpp> #include <boost/bind.hpp> #include <boost/asio/strand.hpp> #include <iostream> boost::mutex global_stream_lock; void WorkerThread( boost::shared_ptr< boost::asio::io_service > io_service ) global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] Thread Start" << std::endl; global_stream_lock.unlock(); io_service->run(); global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] Thread Finish" << std::endl; global_stream_lock.unlock(); void PrintNum( int x ) std::cout << "[" << boost::this_thread::get_id() << "] x: " << x << std::endl; int main( int argc, char * argv[] ) boost::shared_ptr< boost::asio::io_service > io_service( new boost::asio::io_service); boost::shared_ptr< boost::asio::io_service::work > work(new boost::asio::io_service::work( *io_service )); boost::asio::io_service::strand strand( *io_service ); global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] The program will exit when all work has finished." << std::endl; global_stream_lock.unlock(); boost::thread_group worker_threads; for( int x = 0; x < 2; ++x ) worker_threads.create_thread( boost::bind( &WorkerThread, io_service ) ); boost::this_thread::sleep( boost::posix_time::milliseconds( 1000 ) ); //strand.post( boost::bind( &PrintNum, 1 ) ); //strand.post( boost::bind( &PrintNum, 2 ) ); //strand.post( boost::bind( &PrintNum, 3 ) ); //strand.post( boost::bind( &PrintNum, 4 ) ); //strand.post( boost::bind( &PrintNum, 5 ) ); io_service->post( boost::bind( &PrintNum, 1 ) ); io_service->post( boost::bind( &PrintNum, 2 ) ); io_service->post( boost::bind( &PrintNum, 3 ) ); io_service->post( boost::bind( &PrintNum, 4 ) ); io_service->post( boost::bind( &PrintNum, 5 ) ); work.reset(); worker_threads.join_all(); return 0; 去掉了打印数据时的锁。如果使用post,会得到比较混乱的输出,而如果使用strand,则能得到清晰的输出,这就是strand! 总之,使用io_service能够方便地构建多线程处理程序。它的总体使用流程总结如下: #include <boost/asio/io_service.hpp> #include <boost/bind.hpp> #include <boost/thread/thread.hpp> * Create an asio::io_service and a thread_group (through pool in essence) boost::asio::io_service ioService; boost::thread_group threadpool; * This will start the ioService processing loop. All tasks * assigned with ioService.post() will start executing. boost::asio::io_service::work work(ioService); * This will add 2 threads to the thread pool. (You could just put it in a for loop) threadpool.create_thread( boost::bind(&boost::asio::io_service::run, &ioService) threadpool.create_thread( boost::bind(&boost::asio::io_service::run, &ioService) * This will assign tasks to the thread pool. * More about boost::bind: "http://www.boost.org/doc/libs/1_54_0/libs/bind/bind.html#with_functions" * You can use strand when necessary, if so, remember add "strand.h" ioService.post(boost::bind(myTask, "Hello World!")); ioService.post(boost::bind(clearCache, "./cache")); ioService.post(boost::bind(getSocialUpdates, "twitter,gmail,facebook,tumblr,reddit")); * This will stop the ioService processing loop. Any tasks * you add behind this point will not execute. ioService.stop(); * Will wait till all the threads in the thread pool are finished with * their assigned tasks and 'join' them. Just assume the threads inside * the threadpool will be destroyed by this method. threadpool.join_all(); 参考资料: A guide to getting started with boost::asio 浅谈 Boost.Asio 的多线程模型 How to create a thread pool using boost in C++? 到目前为止,我们已经审查过的服务器始终在执行 `async_accept`,因此它们始终至少有一个已计划的任务,所以我们实际上不需要以这种方式保持它们运行。但是客户端不执行 `async_accept`,这对于它在某个时候没有计划的任务是正常的。要防止 `io_context::run` 返回,您应该使用 `boost::asio::executor_work_guard`(以前的 `io_context::work`,目前已弃用)类实例。`io_context::run` 运行直到所有计划的任务都完成。 从第一个boost.asio的教程开始,boost文档就一直在告诉我们:使用boost.asio第一步就是要创建一个io_service对象。那么io_service是个什么东西呢? boost.asio文档说,io_service为下面的这些异步IO对象提供最核心的IO功能: boost::asio::ip::tcp::socketboost::asio::ip::tcp::acc 之前在完善消息节点的章节学习过asio服务器底层通信的流程,它是基于单线程运行的,可参考知乎用户www.zhihu.com/people/zhi-chi-tian-ya-10-23/posts单线程模式而今天将设计IOServicePool类型的多线程模型,如下图所示多线程模式IOServicePool 服务池中,IOServicePool 类会根据系统的 CPU 核数创建相应数量的 io_context 实例,并将每个 io_context 运行在一个独立的线程中。 最近在学习licode发现源码有使用到boost::asio::io_service,因此在网上找文章了解一下,顺便写篇博客记录 io_service 负责和操作系统打交道,等待所有异步操作的结束,然后为每一个异步操作调用其完成处 例子(我们有3个异步操作,2个socket连接操作和一个计时器等待操作): 有一个io_service实例和一个处理线程的单线程例子: io_service ... #include <boost/asio.hpp> #include <boost/shared_ptr.hpp> #include <boost/thread.hpp> #include <iostream> <br />构造函数<br /><br />构造函数的主要动作就是调用CreateIoCompletionPort创建了一个初始iocp。 <br /><br />Dispatch和post的区别<br /><br />Post一定是PostQueuedCompletionStatus并且在GetQueuedCompletionStatus 之后执行。 <br /><br />Dispatch会首先检查当前thread是不是io_service.run/runonce/poll/poll_once线程,如果 你应该已经发现大部分使用Boost.Asio编写的代码都会使用几个io_service的实例。io_service是这个库里面最重要的类;它负责和操作系统打交道,等待所有异步操作的结束,然后为每一个异步操作调用其完成处理程序。 如果你选择用同步的方式来创建你的应用,你则不需要考虑我将在这一节向你展示的东西。 你有多种不同的方式来使用io_service。在下面的例子中,我们有3个异 无论如何使用,都能感觉到使用boost.asio实现服务器,不仅是一件非常轻松的事,而且代码很漂亮,逻辑也相当清晰,这点上很不同于ACE。使用io_service作为处理工作的work pool,可以看到,就是通过io_service.post投递一个Handler到io_service的队列,Handler在这个io_service.run内部得到执行,有可能你会发现,io_services.d... IO模型 io_service对象是asio框架中的调度器,所有异步io事件都是通过它来分发处理的(io对象的构造函数中都需要传入一个io_service对象)。 asio::io_serviceio_service;asio::ip::tcp::socketsocket(io_service); 在asio框架中,同步的io主要流程如下: 应用程序... Bost asio库与线程池的使用 Boost.Asio 有两种支持多线程的方式 第一种方式比较简单:在多线程的场景下,每个线程都持有一个io_context,并且每个线程都调用各自的io_context的run()方法。 另一种支持多线程的方式:全局只分配一个io_context,并且让这个io_context在多个线程之间共享,每个线程都调用全局的io_service的run()方法。 每个线程一个 I/O Service 让我们先分析第一种方案:在多线程的场景下,每个线程都持有一个io_context
#include <boost/bind.hpp> #include <iostream> void F2( int i, float f ) std::cout << "i: " << i << std::endl; std::cout << "f: " << f << std::endl; int main( int argc, char * argv[] ) boost::bind( &F2, 42, 3.14f )();// 最后的括号表明调用该函数,如果去掉括号表示仅封装对象 return 0; 调用类成员函数示例:
调用类成员函数示例:
class MyClass public: void F3( int i, float f ) std::cout << "i: " << i << std::endl; std::cout << "f: " << f << std::endl; int main( int argc, char * argv[] ) MyClass c; boost::bind( &MyClass::F3, &c, 42, 3.14f )(); return 0; 回到io_service,使用bind可以改写程序: #include <boost/asio/io_service.hpp> #include <boost/thread/thread.hpp> #include <boost/bind.hpp> #include <iostream> boost::mutex global_stream_lock; void WorkerThread( boost::shared_ptr< boost::asio::io_service > io_service ) global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] Thread Start" << std::endl; global_stream_lock.unlock(); io_service->run(); global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] Thread Finish" << std::endl; global_stream_lock.unlock(); int main( int argc, char * argv[] ) boost::shared_ptr< boost::asio::io_service > io_service(new boost::asio::io_service); boost::shared_ptr< boost::asio::io_service::work > work(new boost::asio::io_service::work( *io_service )); global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] Press [return] to exit." << std::endl; global_stream_lock.unlock(); boost::thread_group worker_threads; for( int x = 0; x < 4; ++x ) worker_threads.create_thread( boost::bind( &WorkerThread, io_service ) ); std::cin.get(); io_service->stop(); worker_threads.join_all(); return 0; post任务 至此,启动了线程,其实并没有给线程做任务。也就是说,启动的线程处于空闲状态。 可以使用post/dispatch分发任务,它们的区别是: 如果可以,dispatch会立即执行任务,否则把任务加入到queuepost只会把任务加入到队列 具体使用何种方式需要根据具体情况确定。 如下的程序打印数字: boost::mutex global_stream_lock; void WorkerThread( boost::shared_ptr< boost::asio::io_service > io_service ) global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] Thread Start" << std::endl; global_stream_lock.unlock(); io_service->run(); global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] Thread Finish" << std::endl; global_stream_lock.unlock(); void Dispatch( int x ) global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] " << __FUNCTION__ << " x = " << x << std::endl; global_stream_lock.unlock(); void Post( int x ) global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] " << __FUNCTION__ << " x = " << x << std::endl; global_stream_lock.unlock(); void Run3( boost::shared_ptr< boost::asio::io_service > io_service ) for( int x = 0; x < 3; ++x ) io_service->dispatch( boost::bind( &Dispatch, x * 2 ) ); io_service->post( boost::bind( &Post, x * 2 + 1 ) ); boost::this_thread::sleep( boost::posix_time::milliseconds( 1000 ) ); int main( int argc, char * argv[] ) boost::shared_ptr< boost::asio::io_service > io_service( new boost::asio::io_service); boost::shared_ptr< boost::asio::io_service::work > work(new boost::asio::io_service::work( *io_service )); global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] The program will exit when all work has finished." << std::endl; global_stream_lock.unlock(); boost::thread_group worker_threads; for( int x = 0; x < 1; ++x ) worker_threads.create_thread( boost::bind( &WorkerThread, io_service ) ); io_service->post( boost::bind( &Run3, io_service ) ); work.reset(); worker_threads.join_all(); return 0; 可以看到,所有的任务得到了执行,但执行顺序可能不同。这个示例也可以看出混合使用post和dispatch可能存在的问题。 strand顺序处理 strand提供顺序化的事件执行器。意思是,如果以“work1->work2->work3”的顺序post,不管有多少个工作线程,它们依然会以这样的顺序执行任务。 #include <boost/asio/io_service.hpp> #include <boost/thread/thread.hpp> #include <boost/bind.hpp> #include <boost/asio/strand.hpp> #include <iostream> boost::mutex global_stream_lock; void WorkerThread( boost::shared_ptr< boost::asio::io_service > io_service ) global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] Thread Start" << std::endl; global_stream_lock.unlock(); io_service->run(); global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] Thread Finish" << std::endl; global_stream_lock.unlock(); void PrintNum( int x ) std::cout << "[" << boost::this_thread::get_id() << "] x: " << x << std::endl; int main( int argc, char * argv[] ) boost::shared_ptr< boost::asio::io_service > io_service( new boost::asio::io_service); boost::shared_ptr< boost::asio::io_service::work > work(new boost::asio::io_service::work( *io_service )); boost::asio::io_service::strand strand( *io_service ); global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] The program will exit when all work has finished." << std::endl; global_stream_lock.unlock(); boost::thread_group worker_threads; for( int x = 0; x < 2; ++x ) worker_threads.create_thread( boost::bind( &WorkerThread, io_service ) ); boost::this_thread::sleep( boost::posix_time::milliseconds( 1000 ) ); //strand.post( boost::bind( &PrintNum, 1 ) ); //strand.post( boost::bind( &PrintNum, 2 ) ); //strand.post( boost::bind( &PrintNum, 3 ) ); //strand.post( boost::bind( &PrintNum, 4 ) ); //strand.post( boost::bind( &PrintNum, 5 ) ); io_service->post( boost::bind( &PrintNum, 1 ) ); io_service->post( boost::bind( &PrintNum, 2 ) ); io_service->post( boost::bind( &PrintNum, 3 ) ); io_service->post( boost::bind( &PrintNum, 4 ) ); io_service->post( boost::bind( &PrintNum, 5 ) ); work.reset(); worker_threads.join_all(); return 0; 去掉了打印数据时的锁。如果使用post,会得到比较混乱的输出,而如果使用strand,则能得到清晰的输出,这就是strand! 总之,使用io_service能够方便地构建多线程处理程序。它的总体使用流程总结如下: #include <boost/asio/io_service.hpp> #include <boost/bind.hpp> #include <boost/thread/thread.hpp> * Create an asio::io_service and a thread_group (through pool in essence) boost::asio::io_service ioService; boost::thread_group threadpool; * This will start the ioService processing loop. All tasks * assigned with ioService.post() will start executing. boost::asio::io_service::work work(ioService); * This will add 2 threads to the thread pool. (You could just put it in a for loop) threadpool.create_thread( boost::bind(&boost::asio::io_service::run, &ioService) threadpool.create_thread( boost::bind(&boost::asio::io_service::run, &ioService) * This will assign tasks to the thread pool. * More about boost::bind: "http://www.boost.org/doc/libs/1_54_0/libs/bind/bind.html#with_functions" * You can use strand when necessary, if so, remember add "strand.h" ioService.post(boost::bind(myTask, "Hello World!")); ioService.post(boost::bind(clearCache, "./cache")); ioService.post(boost::bind(getSocialUpdates, "twitter,gmail,facebook,tumblr,reddit")); * This will stop the ioService processing loop. Any tasks * you add behind this point will not execute. ioService.stop(); * Will wait till all the threads in the thread pool are finished with * their assigned tasks and 'join' them. Just assume the threads inside * the threadpool will be destroyed by this method. threadpool.join_all(); 参考资料: A guide to getting started with boost::asio 浅谈 Boost.Asio 的多线程模型 How to create a thread pool using boost in C++? 到目前为止,我们已经审查过的服务器始终在执行 `async_accept`,因此它们始终至少有一个已计划的任务,所以我们实际上不需要以这种方式保持它们运行。但是客户端不执行 `async_accept`,这对于它在某个时候没有计划的任务是正常的。要防止 `io_context::run` 返回,您应该使用 `boost::asio::executor_work_guard`(以前的 `io_context::work`,目前已弃用)类实例。`io_context::run` 运行直到所有计划的任务都完成。 从第一个boost.asio的教程开始,boost文档就一直在告诉我们:使用boost.asio第一步就是要创建一个io_service对象。那么io_service是个什么东西呢? boost.asio文档说,io_service为下面的这些异步IO对象提供最核心的IO功能: boost::asio::ip::tcp::socketboost::asio::ip::tcp::acc 之前在完善消息节点的章节学习过asio服务器底层通信的流程,它是基于单线程运行的,可参考知乎用户www.zhihu.com/people/zhi-chi-tian-ya-10-23/posts单线程模式而今天将设计IOServicePool类型的多线程模型,如下图所示多线程模式IOServicePool 服务池中,IOServicePool 类会根据系统的 CPU 核数创建相应数量的 io_context 实例,并将每个 io_context 运行在一个独立的线程中。 最近在学习licode发现源码有使用到boost::asio::io_service,因此在网上找文章了解一下,顺便写篇博客记录 io_service 负责和操作系统打交道,等待所有异步操作的结束,然后为每一个异步操作调用其完成处 例子(我们有3个异步操作,2个socket连接操作和一个计时器等待操作): 有一个io_service实例和一个处理线程的单线程例子: io_service ... #include <boost/asio.hpp> #include <boost/shared_ptr.hpp> #include <boost/thread.hpp> #include <iostream> <br />构造函数<br /><br />构造函数的主要动作就是调用CreateIoCompletionPort创建了一个初始iocp。 <br /><br />Dispatch和post的区别<br /><br />Post一定是PostQueuedCompletionStatus并且在GetQueuedCompletionStatus 之后执行。 <br /><br />Dispatch会首先检查当前thread是不是io_service.run/runonce/poll/poll_once线程,如果 你应该已经发现大部分使用Boost.Asio编写的代码都会使用几个io_service的实例。io_service是这个库里面最重要的类;它负责和操作系统打交道,等待所有异步操作的结束,然后为每一个异步操作调用其完成处理程序。 如果你选择用同步的方式来创建你的应用,你则不需要考虑我将在这一节向你展示的东西。 你有多种不同的方式来使用io_service。在下面的例子中,我们有3个异 无论如何使用,都能感觉到使用boost.asio实现服务器,不仅是一件非常轻松的事,而且代码很漂亮,逻辑也相当清晰,这点上很不同于ACE。使用io_service作为处理工作的work pool,可以看到,就是通过io_service.post投递一个Handler到io_service的队列,Handler在这个io_service.run内部得到执行,有可能你会发现,io_services.d... IO模型 io_service对象是asio框架中的调度器,所有异步io事件都是通过它来分发处理的(io对象的构造函数中都需要传入一个io_service对象)。 asio::io_serviceio_service;asio::ip::tcp::socketsocket(io_service); 在asio框架中,同步的io主要流程如下: 应用程序... Bost asio库与线程池的使用 Boost.Asio 有两种支持多线程的方式 第一种方式比较简单:在多线程的场景下,每个线程都持有一个io_context,并且每个线程都调用各自的io_context的run()方法。 另一种支持多线程的方式:全局只分配一个io_context,并且让这个io_context在多个线程之间共享,每个线程都调用全局的io_service的run()方法。 每个线程一个 I/O Service 让我们先分析第一种方案:在多线程的场景下,每个线程都持有一个io_context
class MyClass public: void F3( int i, float f ) std::cout << "i: " << i << std::endl; std::cout << "f: " << f << std::endl; int main( int argc, char * argv[] ) MyClass c; boost::bind( &MyClass::F3, &c, 42, 3.14f )(); return 0; 回到io_service,使用bind可以改写程序:
回到io_service,使用bind可以改写程序:
#include <boost/asio/io_service.hpp> #include <boost/thread/thread.hpp> #include <boost/bind.hpp> #include <iostream> boost::mutex global_stream_lock; void WorkerThread( boost::shared_ptr< boost::asio::io_service > io_service ) global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] Thread Start" << std::endl; global_stream_lock.unlock(); io_service->run(); global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] Thread Finish" << std::endl; global_stream_lock.unlock(); int main( int argc, char * argv[] ) boost::shared_ptr< boost::asio::io_service > io_service(new boost::asio::io_service); boost::shared_ptr< boost::asio::io_service::work > work(new boost::asio::io_service::work( *io_service )); global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] Press [return] to exit." << std::endl; global_stream_lock.unlock(); boost::thread_group worker_threads; for( int x = 0; x < 4; ++x ) worker_threads.create_thread( boost::bind( &WorkerThread, io_service ) ); std::cin.get(); io_service->stop(); worker_threads.join_all(); return 0; post任务 至此,启动了线程,其实并没有给线程做任务。也就是说,启动的线程处于空闲状态。 可以使用post/dispatch分发任务,它们的区别是: 如果可以,dispatch会立即执行任务,否则把任务加入到queuepost只会把任务加入到队列 具体使用何种方式需要根据具体情况确定。 如下的程序打印数字: boost::mutex global_stream_lock; void WorkerThread( boost::shared_ptr< boost::asio::io_service > io_service ) global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] Thread Start" << std::endl; global_stream_lock.unlock(); io_service->run(); global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] Thread Finish" << std::endl; global_stream_lock.unlock(); void Dispatch( int x ) global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] " << __FUNCTION__ << " x = " << x << std::endl; global_stream_lock.unlock(); void Post( int x ) global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] " << __FUNCTION__ << " x = " << x << std::endl; global_stream_lock.unlock(); void Run3( boost::shared_ptr< boost::asio::io_service > io_service ) for( int x = 0; x < 3; ++x ) io_service->dispatch( boost::bind( &Dispatch, x * 2 ) ); io_service->post( boost::bind( &Post, x * 2 + 1 ) ); boost::this_thread::sleep( boost::posix_time::milliseconds( 1000 ) ); int main( int argc, char * argv[] ) boost::shared_ptr< boost::asio::io_service > io_service( new boost::asio::io_service); boost::shared_ptr< boost::asio::io_service::work > work(new boost::asio::io_service::work( *io_service )); global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] The program will exit when all work has finished." << std::endl; global_stream_lock.unlock(); boost::thread_group worker_threads; for( int x = 0; x < 1; ++x ) worker_threads.create_thread( boost::bind( &WorkerThread, io_service ) ); io_service->post( boost::bind( &Run3, io_service ) ); work.reset(); worker_threads.join_all(); return 0; 可以看到,所有的任务得到了执行,但执行顺序可能不同。这个示例也可以看出混合使用post和dispatch可能存在的问题。 strand顺序处理 strand提供顺序化的事件执行器。意思是,如果以“work1->work2->work3”的顺序post,不管有多少个工作线程,它们依然会以这样的顺序执行任务。 #include <boost/asio/io_service.hpp> #include <boost/thread/thread.hpp> #include <boost/bind.hpp> #include <boost/asio/strand.hpp> #include <iostream> boost::mutex global_stream_lock; void WorkerThread( boost::shared_ptr< boost::asio::io_service > io_service ) global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] Thread Start" << std::endl; global_stream_lock.unlock(); io_service->run(); global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] Thread Finish" << std::endl; global_stream_lock.unlock(); void PrintNum( int x ) std::cout << "[" << boost::this_thread::get_id() << "] x: " << x << std::endl; int main( int argc, char * argv[] ) boost::shared_ptr< boost::asio::io_service > io_service( new boost::asio::io_service); boost::shared_ptr< boost::asio::io_service::work > work(new boost::asio::io_service::work( *io_service )); boost::asio::io_service::strand strand( *io_service ); global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] The program will exit when all work has finished." << std::endl; global_stream_lock.unlock(); boost::thread_group worker_threads; for( int x = 0; x < 2; ++x ) worker_threads.create_thread( boost::bind( &WorkerThread, io_service ) ); boost::this_thread::sleep( boost::posix_time::milliseconds( 1000 ) ); //strand.post( boost::bind( &PrintNum, 1 ) ); //strand.post( boost::bind( &PrintNum, 2 ) ); //strand.post( boost::bind( &PrintNum, 3 ) ); //strand.post( boost::bind( &PrintNum, 4 ) ); //strand.post( boost::bind( &PrintNum, 5 ) ); io_service->post( boost::bind( &PrintNum, 1 ) ); io_service->post( boost::bind( &PrintNum, 2 ) ); io_service->post( boost::bind( &PrintNum, 3 ) ); io_service->post( boost::bind( &PrintNum, 4 ) ); io_service->post( boost::bind( &PrintNum, 5 ) ); work.reset(); worker_threads.join_all(); return 0; 去掉了打印数据时的锁。如果使用post,会得到比较混乱的输出,而如果使用strand,则能得到清晰的输出,这就是strand! 总之,使用io_service能够方便地构建多线程处理程序。它的总体使用流程总结如下: #include <boost/asio/io_service.hpp> #include <boost/bind.hpp> #include <boost/thread/thread.hpp> * Create an asio::io_service and a thread_group (through pool in essence) boost::asio::io_service ioService; boost::thread_group threadpool; * This will start the ioService processing loop. All tasks * assigned with ioService.post() will start executing. boost::asio::io_service::work work(ioService); * This will add 2 threads to the thread pool. (You could just put it in a for loop) threadpool.create_thread( boost::bind(&boost::asio::io_service::run, &ioService) threadpool.create_thread( boost::bind(&boost::asio::io_service::run, &ioService) * This will assign tasks to the thread pool. * More about boost::bind: "http://www.boost.org/doc/libs/1_54_0/libs/bind/bind.html#with_functions" * You can use strand when necessary, if so, remember add "strand.h" ioService.post(boost::bind(myTask, "Hello World!")); ioService.post(boost::bind(clearCache, "./cache")); ioService.post(boost::bind(getSocialUpdates, "twitter,gmail,facebook,tumblr,reddit")); * This will stop the ioService processing loop. Any tasks * you add behind this point will not execute. ioService.stop(); * Will wait till all the threads in the thread pool are finished with * their assigned tasks and 'join' them. Just assume the threads inside * the threadpool will be destroyed by this method. threadpool.join_all(); 参考资料: A guide to getting started with boost::asio 浅谈 Boost.Asio 的多线程模型 How to create a thread pool using boost in C++? 到目前为止,我们已经审查过的服务器始终在执行 `async_accept`,因此它们始终至少有一个已计划的任务,所以我们实际上不需要以这种方式保持它们运行。但是客户端不执行 `async_accept`,这对于它在某个时候没有计划的任务是正常的。要防止 `io_context::run` 返回,您应该使用 `boost::asio::executor_work_guard`(以前的 `io_context::work`,目前已弃用)类实例。`io_context::run` 运行直到所有计划的任务都完成。 从第一个boost.asio的教程开始,boost文档就一直在告诉我们:使用boost.asio第一步就是要创建一个io_service对象。那么io_service是个什么东西呢? boost.asio文档说,io_service为下面的这些异步IO对象提供最核心的IO功能: boost::asio::ip::tcp::socketboost::asio::ip::tcp::acc 之前在完善消息节点的章节学习过asio服务器底层通信的流程,它是基于单线程运行的,可参考知乎用户www.zhihu.com/people/zhi-chi-tian-ya-10-23/posts单线程模式而今天将设计IOServicePool类型的多线程模型,如下图所示多线程模式IOServicePool 服务池中,IOServicePool 类会根据系统的 CPU 核数创建相应数量的 io_context 实例,并将每个 io_context 运行在一个独立的线程中。 最近在学习licode发现源码有使用到boost::asio::io_service,因此在网上找文章了解一下,顺便写篇博客记录 io_service 负责和操作系统打交道,等待所有异步操作的结束,然后为每一个异步操作调用其完成处 例子(我们有3个异步操作,2个socket连接操作和一个计时器等待操作): 有一个io_service实例和一个处理线程的单线程例子: io_service ... #include <boost/asio.hpp> #include <boost/shared_ptr.hpp> #include <boost/thread.hpp> #include <iostream> <br />构造函数<br /><br />构造函数的主要动作就是调用CreateIoCompletionPort创建了一个初始iocp。 <br /><br />Dispatch和post的区别<br /><br />Post一定是PostQueuedCompletionStatus并且在GetQueuedCompletionStatus 之后执行。 <br /><br />Dispatch会首先检查当前thread是不是io_service.run/runonce/poll/poll_once线程,如果 你应该已经发现大部分使用Boost.Asio编写的代码都会使用几个io_service的实例。io_service是这个库里面最重要的类;它负责和操作系统打交道,等待所有异步操作的结束,然后为每一个异步操作调用其完成处理程序。 如果你选择用同步的方式来创建你的应用,你则不需要考虑我将在这一节向你展示的东西。 你有多种不同的方式来使用io_service。在下面的例子中,我们有3个异 无论如何使用,都能感觉到使用boost.asio实现服务器,不仅是一件非常轻松的事,而且代码很漂亮,逻辑也相当清晰,这点上很不同于ACE。使用io_service作为处理工作的work pool,可以看到,就是通过io_service.post投递一个Handler到io_service的队列,Handler在这个io_service.run内部得到执行,有可能你会发现,io_services.d... IO模型 io_service对象是asio框架中的调度器,所有异步io事件都是通过它来分发处理的(io对象的构造函数中都需要传入一个io_service对象)。 asio::io_serviceio_service;asio::ip::tcp::socketsocket(io_service); 在asio框架中,同步的io主要流程如下: 应用程序... Bost asio库与线程池的使用 Boost.Asio 有两种支持多线程的方式 第一种方式比较简单:在多线程的场景下,每个线程都持有一个io_context,并且每个线程都调用各自的io_context的run()方法。 另一种支持多线程的方式:全局只分配一个io_context,并且让这个io_context在多个线程之间共享,每个线程都调用全局的io_service的run()方法。 每个线程一个 I/O Service 让我们先分析第一种方案:在多线程的场景下,每个线程都持有一个io_context
#include <boost/asio/io_service.hpp> #include <boost/thread/thread.hpp> #include <boost/bind.hpp> #include <iostream> boost::mutex global_stream_lock; void WorkerThread( boost::shared_ptr< boost::asio::io_service > io_service ) global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] Thread Start" << std::endl; global_stream_lock.unlock(); io_service->run(); global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] Thread Finish" << std::endl; global_stream_lock.unlock(); int main( int argc, char * argv[] ) boost::shared_ptr< boost::asio::io_service > io_service(new boost::asio::io_service); boost::shared_ptr< boost::asio::io_service::work > work(new boost::asio::io_service::work( *io_service )); global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] Press [return] to exit." << std::endl; global_stream_lock.unlock(); boost::thread_group worker_threads; for( int x = 0; x < 4; ++x ) worker_threads.create_thread( boost::bind( &WorkerThread, io_service ) ); std::cin.get(); io_service->stop(); worker_threads.join_all(); return 0;
post任务 至此,启动了线程,其实并没有给线程做任务。也就是说,启动的线程处于空闲状态。 可以使用post/dispatch分发任务,它们的区别是: 如果可以,dispatch会立即执行任务,否则把任务加入到queuepost只会把任务加入到队列 具体使用何种方式需要根据具体情况确定。 如下的程序打印数字: boost::mutex global_stream_lock; void WorkerThread( boost::shared_ptr< boost::asio::io_service > io_service ) global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] Thread Start" << std::endl; global_stream_lock.unlock(); io_service->run(); global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] Thread Finish" << std::endl; global_stream_lock.unlock(); void Dispatch( int x ) global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] " << __FUNCTION__ << " x = " << x << std::endl; global_stream_lock.unlock(); void Post( int x ) global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] " << __FUNCTION__ << " x = " << x << std::endl; global_stream_lock.unlock(); void Run3( boost::shared_ptr< boost::asio::io_service > io_service ) for( int x = 0; x < 3; ++x ) io_service->dispatch( boost::bind( &Dispatch, x * 2 ) ); io_service->post( boost::bind( &Post, x * 2 + 1 ) ); boost::this_thread::sleep( boost::posix_time::milliseconds( 1000 ) ); int main( int argc, char * argv[] ) boost::shared_ptr< boost::asio::io_service > io_service( new boost::asio::io_service); boost::shared_ptr< boost::asio::io_service::work > work(new boost::asio::io_service::work( *io_service )); global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] The program will exit when all work has finished." << std::endl; global_stream_lock.unlock(); boost::thread_group worker_threads; for( int x = 0; x < 1; ++x ) worker_threads.create_thread( boost::bind( &WorkerThread, io_service ) ); io_service->post( boost::bind( &Run3, io_service ) ); work.reset(); worker_threads.join_all(); return 0; 可以看到,所有的任务得到了执行,但执行顺序可能不同。这个示例也可以看出混合使用post和dispatch可能存在的问题。 strand顺序处理 strand提供顺序化的事件执行器。意思是,如果以“work1->work2->work3”的顺序post,不管有多少个工作线程,它们依然会以这样的顺序执行任务。 #include <boost/asio/io_service.hpp> #include <boost/thread/thread.hpp> #include <boost/bind.hpp> #include <boost/asio/strand.hpp> #include <iostream> boost::mutex global_stream_lock; void WorkerThread( boost::shared_ptr< boost::asio::io_service > io_service ) global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] Thread Start" << std::endl; global_stream_lock.unlock(); io_service->run(); global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] Thread Finish" << std::endl; global_stream_lock.unlock(); void PrintNum( int x ) std::cout << "[" << boost::this_thread::get_id() << "] x: " << x << std::endl; int main( int argc, char * argv[] ) boost::shared_ptr< boost::asio::io_service > io_service( new boost::asio::io_service); boost::shared_ptr< boost::asio::io_service::work > work(new boost::asio::io_service::work( *io_service )); boost::asio::io_service::strand strand( *io_service ); global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] The program will exit when all work has finished." << std::endl; global_stream_lock.unlock(); boost::thread_group worker_threads; for( int x = 0; x < 2; ++x ) worker_threads.create_thread( boost::bind( &WorkerThread, io_service ) ); boost::this_thread::sleep( boost::posix_time::milliseconds( 1000 ) ); //strand.post( boost::bind( &PrintNum, 1 ) ); //strand.post( boost::bind( &PrintNum, 2 ) ); //strand.post( boost::bind( &PrintNum, 3 ) ); //strand.post( boost::bind( &PrintNum, 4 ) ); //strand.post( boost::bind( &PrintNum, 5 ) ); io_service->post( boost::bind( &PrintNum, 1 ) ); io_service->post( boost::bind( &PrintNum, 2 ) ); io_service->post( boost::bind( &PrintNum, 3 ) ); io_service->post( boost::bind( &PrintNum, 4 ) ); io_service->post( boost::bind( &PrintNum, 5 ) ); work.reset(); worker_threads.join_all(); return 0; 去掉了打印数据时的锁。如果使用post,会得到比较混乱的输出,而如果使用strand,则能得到清晰的输出,这就是strand! 总之,使用io_service能够方便地构建多线程处理程序。它的总体使用流程总结如下: #include <boost/asio/io_service.hpp> #include <boost/bind.hpp> #include <boost/thread/thread.hpp> * Create an asio::io_service and a thread_group (through pool in essence) boost::asio::io_service ioService; boost::thread_group threadpool; * This will start the ioService processing loop. All tasks * assigned with ioService.post() will start executing. boost::asio::io_service::work work(ioService); * This will add 2 threads to the thread pool. (You could just put it in a for loop) threadpool.create_thread( boost::bind(&boost::asio::io_service::run, &ioService) threadpool.create_thread( boost::bind(&boost::asio::io_service::run, &ioService) * This will assign tasks to the thread pool. * More about boost::bind: "http://www.boost.org/doc/libs/1_54_0/libs/bind/bind.html#with_functions" * You can use strand when necessary, if so, remember add "strand.h" ioService.post(boost::bind(myTask, "Hello World!")); ioService.post(boost::bind(clearCache, "./cache")); ioService.post(boost::bind(getSocialUpdates, "twitter,gmail,facebook,tumblr,reddit")); * This will stop the ioService processing loop. Any tasks * you add behind this point will not execute. ioService.stop(); * Will wait till all the threads in the thread pool are finished with * their assigned tasks and 'join' them. Just assume the threads inside * the threadpool will be destroyed by this method. threadpool.join_all(); 参考资料: A guide to getting started with boost::asio 浅谈 Boost.Asio 的多线程模型 How to create a thread pool using boost in C++? 到目前为止,我们已经审查过的服务器始终在执行 `async_accept`,因此它们始终至少有一个已计划的任务,所以我们实际上不需要以这种方式保持它们运行。但是客户端不执行 `async_accept`,这对于它在某个时候没有计划的任务是正常的。要防止 `io_context::run` 返回,您应该使用 `boost::asio::executor_work_guard`(以前的 `io_context::work`,目前已弃用)类实例。`io_context::run` 运行直到所有计划的任务都完成。 从第一个boost.asio的教程开始,boost文档就一直在告诉我们:使用boost.asio第一步就是要创建一个io_service对象。那么io_service是个什么东西呢? boost.asio文档说,io_service为下面的这些异步IO对象提供最核心的IO功能: boost::asio::ip::tcp::socketboost::asio::ip::tcp::acc 之前在完善消息节点的章节学习过asio服务器底层通信的流程,它是基于单线程运行的,可参考知乎用户www.zhihu.com/people/zhi-chi-tian-ya-10-23/posts单线程模式而今天将设计IOServicePool类型的多线程模型,如下图所示多线程模式IOServicePool 服务池中,IOServicePool 类会根据系统的 CPU 核数创建相应数量的 io_context 实例,并将每个 io_context 运行在一个独立的线程中。 最近在学习licode发现源码有使用到boost::asio::io_service,因此在网上找文章了解一下,顺便写篇博客记录 io_service 负责和操作系统打交道,等待所有异步操作的结束,然后为每一个异步操作调用其完成处 例子(我们有3个异步操作,2个socket连接操作和一个计时器等待操作): 有一个io_service实例和一个处理线程的单线程例子: io_service ... #include <boost/asio.hpp> #include <boost/shared_ptr.hpp> #include <boost/thread.hpp> #include <iostream> <br />构造函数<br /><br />构造函数的主要动作就是调用CreateIoCompletionPort创建了一个初始iocp。 <br /><br />Dispatch和post的区别<br /><br />Post一定是PostQueuedCompletionStatus并且在GetQueuedCompletionStatus 之后执行。 <br /><br />Dispatch会首先检查当前thread是不是io_service.run/runonce/poll/poll_once线程,如果 你应该已经发现大部分使用Boost.Asio编写的代码都会使用几个io_service的实例。io_service是这个库里面最重要的类;它负责和操作系统打交道,等待所有异步操作的结束,然后为每一个异步操作调用其完成处理程序。 如果你选择用同步的方式来创建你的应用,你则不需要考虑我将在这一节向你展示的东西。 你有多种不同的方式来使用io_service。在下面的例子中,我们有3个异 无论如何使用,都能感觉到使用boost.asio实现服务器,不仅是一件非常轻松的事,而且代码很漂亮,逻辑也相当清晰,这点上很不同于ACE。使用io_service作为处理工作的work pool,可以看到,就是通过io_service.post投递一个Handler到io_service的队列,Handler在这个io_service.run内部得到执行,有可能你会发现,io_services.d... IO模型 io_service对象是asio框架中的调度器,所有异步io事件都是通过它来分发处理的(io对象的构造函数中都需要传入一个io_service对象)。 asio::io_serviceio_service;asio::ip::tcp::socketsocket(io_service); 在asio框架中,同步的io主要流程如下: 应用程序... Bost asio库与线程池的使用 Boost.Asio 有两种支持多线程的方式 第一种方式比较简单:在多线程的场景下,每个线程都持有一个io_context,并且每个线程都调用各自的io_context的run()方法。 另一种支持多线程的方式:全局只分配一个io_context,并且让这个io_context在多个线程之间共享,每个线程都调用全局的io_service的run()方法。 每个线程一个 I/O Service 让我们先分析第一种方案:在多线程的场景下,每个线程都持有一个io_context
至此,启动了线程,其实并没有给线程做任务。也就是说,启动的线程处于空闲状态。
可以使用post/dispatch分发任务,它们的区别是:
具体使用何种方式需要根据具体情况确定。
如下的程序打印数字:
boost::mutex global_stream_lock; void WorkerThread( boost::shared_ptr< boost::asio::io_service > io_service ) global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] Thread Start" << std::endl; global_stream_lock.unlock(); io_service->run(); global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] Thread Finish" << std::endl; global_stream_lock.unlock(); void Dispatch( int x ) global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] " << __FUNCTION__ << " x = " << x << std::endl; global_stream_lock.unlock(); void Post( int x ) global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] " << __FUNCTION__ << " x = " << x << std::endl; global_stream_lock.unlock(); void Run3( boost::shared_ptr< boost::asio::io_service > io_service ) for( int x = 0; x < 3; ++x ) io_service->dispatch( boost::bind( &Dispatch, x * 2 ) ); io_service->post( boost::bind( &Post, x * 2 + 1 ) ); boost::this_thread::sleep( boost::posix_time::milliseconds( 1000 ) ); int main( int argc, char * argv[] ) boost::shared_ptr< boost::asio::io_service > io_service( new boost::asio::io_service); boost::shared_ptr< boost::asio::io_service::work > work(new boost::asio::io_service::work( *io_service )); global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] The program will exit when all work has finished." << std::endl; global_stream_lock.unlock(); boost::thread_group worker_threads; for( int x = 0; x < 1; ++x ) worker_threads.create_thread( boost::bind( &WorkerThread, io_service ) ); io_service->post( boost::bind( &Run3, io_service ) ); work.reset(); worker_threads.join_all(); return 0; 可以看到,所有的任务得到了执行,但执行顺序可能不同。这个示例也可以看出混合使用post和dispatch可能存在的问题。 strand顺序处理 strand提供顺序化的事件执行器。意思是,如果以“work1->work2->work3”的顺序post,不管有多少个工作线程,它们依然会以这样的顺序执行任务。 #include <boost/asio/io_service.hpp> #include <boost/thread/thread.hpp> #include <boost/bind.hpp> #include <boost/asio/strand.hpp> #include <iostream> boost::mutex global_stream_lock; void WorkerThread( boost::shared_ptr< boost::asio::io_service > io_service ) global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] Thread Start" << std::endl; global_stream_lock.unlock(); io_service->run(); global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] Thread Finish" << std::endl; global_stream_lock.unlock(); void PrintNum( int x ) std::cout << "[" << boost::this_thread::get_id() << "] x: " << x << std::endl; int main( int argc, char * argv[] ) boost::shared_ptr< boost::asio::io_service > io_service( new boost::asio::io_service); boost::shared_ptr< boost::asio::io_service::work > work(new boost::asio::io_service::work( *io_service )); boost::asio::io_service::strand strand( *io_service ); global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] The program will exit when all work has finished." << std::endl; global_stream_lock.unlock(); boost::thread_group worker_threads; for( int x = 0; x < 2; ++x ) worker_threads.create_thread( boost::bind( &WorkerThread, io_service ) ); boost::this_thread::sleep( boost::posix_time::milliseconds( 1000 ) ); //strand.post( boost::bind( &PrintNum, 1 ) ); //strand.post( boost::bind( &PrintNum, 2 ) ); //strand.post( boost::bind( &PrintNum, 3 ) ); //strand.post( boost::bind( &PrintNum, 4 ) ); //strand.post( boost::bind( &PrintNum, 5 ) ); io_service->post( boost::bind( &PrintNum, 1 ) ); io_service->post( boost::bind( &PrintNum, 2 ) ); io_service->post( boost::bind( &PrintNum, 3 ) ); io_service->post( boost::bind( &PrintNum, 4 ) ); io_service->post( boost::bind( &PrintNum, 5 ) ); work.reset(); worker_threads.join_all(); return 0; 去掉了打印数据时的锁。如果使用post,会得到比较混乱的输出,而如果使用strand,则能得到清晰的输出,这就是strand! 总之,使用io_service能够方便地构建多线程处理程序。它的总体使用流程总结如下: #include <boost/asio/io_service.hpp> #include <boost/bind.hpp> #include <boost/thread/thread.hpp> * Create an asio::io_service and a thread_group (through pool in essence) boost::asio::io_service ioService; boost::thread_group threadpool; * This will start the ioService processing loop. All tasks * assigned with ioService.post() will start executing. boost::asio::io_service::work work(ioService); * This will add 2 threads to the thread pool. (You could just put it in a for loop) threadpool.create_thread( boost::bind(&boost::asio::io_service::run, &ioService) threadpool.create_thread( boost::bind(&boost::asio::io_service::run, &ioService) * This will assign tasks to the thread pool. * More about boost::bind: "http://www.boost.org/doc/libs/1_54_0/libs/bind/bind.html#with_functions" * You can use strand when necessary, if so, remember add "strand.h" ioService.post(boost::bind(myTask, "Hello World!")); ioService.post(boost::bind(clearCache, "./cache")); ioService.post(boost::bind(getSocialUpdates, "twitter,gmail,facebook,tumblr,reddit")); * This will stop the ioService processing loop. Any tasks * you add behind this point will not execute. ioService.stop(); * Will wait till all the threads in the thread pool are finished with * their assigned tasks and 'join' them. Just assume the threads inside * the threadpool will be destroyed by this method. threadpool.join_all(); 参考资料: A guide to getting started with boost::asio 浅谈 Boost.Asio 的多线程模型 How to create a thread pool using boost in C++? 到目前为止,我们已经审查过的服务器始终在执行 `async_accept`,因此它们始终至少有一个已计划的任务,所以我们实际上不需要以这种方式保持它们运行。但是客户端不执行 `async_accept`,这对于它在某个时候没有计划的任务是正常的。要防止 `io_context::run` 返回,您应该使用 `boost::asio::executor_work_guard`(以前的 `io_context::work`,目前已弃用)类实例。`io_context::run` 运行直到所有计划的任务都完成。 从第一个boost.asio的教程开始,boost文档就一直在告诉我们:使用boost.asio第一步就是要创建一个io_service对象。那么io_service是个什么东西呢? boost.asio文档说,io_service为下面的这些异步IO对象提供最核心的IO功能: boost::asio::ip::tcp::socketboost::asio::ip::tcp::acc 之前在完善消息节点的章节学习过asio服务器底层通信的流程,它是基于单线程运行的,可参考知乎用户www.zhihu.com/people/zhi-chi-tian-ya-10-23/posts单线程模式而今天将设计IOServicePool类型的多线程模型,如下图所示多线程模式IOServicePool 服务池中,IOServicePool 类会根据系统的 CPU 核数创建相应数量的 io_context 实例,并将每个 io_context 运行在一个独立的线程中。 最近在学习licode发现源码有使用到boost::asio::io_service,因此在网上找文章了解一下,顺便写篇博客记录 io_service 负责和操作系统打交道,等待所有异步操作的结束,然后为每一个异步操作调用其完成处 例子(我们有3个异步操作,2个socket连接操作和一个计时器等待操作): 有一个io_service实例和一个处理线程的单线程例子: io_service ... #include <boost/asio.hpp> #include <boost/shared_ptr.hpp> #include <boost/thread.hpp> #include <iostream> <br />构造函数<br /><br />构造函数的主要动作就是调用CreateIoCompletionPort创建了一个初始iocp。 <br /><br />Dispatch和post的区别<br /><br />Post一定是PostQueuedCompletionStatus并且在GetQueuedCompletionStatus 之后执行。 <br /><br />Dispatch会首先检查当前thread是不是io_service.run/runonce/poll/poll_once线程,如果 你应该已经发现大部分使用Boost.Asio编写的代码都会使用几个io_service的实例。io_service是这个库里面最重要的类;它负责和操作系统打交道,等待所有异步操作的结束,然后为每一个异步操作调用其完成处理程序。 如果你选择用同步的方式来创建你的应用,你则不需要考虑我将在这一节向你展示的东西。 你有多种不同的方式来使用io_service。在下面的例子中,我们有3个异 无论如何使用,都能感觉到使用boost.asio实现服务器,不仅是一件非常轻松的事,而且代码很漂亮,逻辑也相当清晰,这点上很不同于ACE。使用io_service作为处理工作的work pool,可以看到,就是通过io_service.post投递一个Handler到io_service的队列,Handler在这个io_service.run内部得到执行,有可能你会发现,io_services.d... IO模型 io_service对象是asio框架中的调度器,所有异步io事件都是通过它来分发处理的(io对象的构造函数中都需要传入一个io_service对象)。 asio::io_serviceio_service;asio::ip::tcp::socketsocket(io_service); 在asio框架中,同步的io主要流程如下: 应用程序... Bost asio库与线程池的使用 Boost.Asio 有两种支持多线程的方式 第一种方式比较简单:在多线程的场景下,每个线程都持有一个io_context,并且每个线程都调用各自的io_context的run()方法。 另一种支持多线程的方式:全局只分配一个io_context,并且让这个io_context在多个线程之间共享,每个线程都调用全局的io_service的run()方法。 每个线程一个 I/O Service 让我们先分析第一种方案:在多线程的场景下,每个线程都持有一个io_context
boost::mutex global_stream_lock; void WorkerThread( boost::shared_ptr< boost::asio::io_service > io_service ) global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] Thread Start" << std::endl; global_stream_lock.unlock(); io_service->run(); global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] Thread Finish" << std::endl; global_stream_lock.unlock(); void Dispatch( int x ) global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] " << __FUNCTION__ << " x = " << x << std::endl; global_stream_lock.unlock(); void Post( int x ) global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] " << __FUNCTION__ << " x = " << x << std::endl; global_stream_lock.unlock(); void Run3( boost::shared_ptr< boost::asio::io_service > io_service ) for( int x = 0; x < 3; ++x ) io_service->dispatch( boost::bind( &Dispatch, x * 2 ) ); io_service->post( boost::bind( &Post, x * 2 + 1 ) ); boost::this_thread::sleep( boost::posix_time::milliseconds( 1000 ) ); int main( int argc, char * argv[] ) boost::shared_ptr< boost::asio::io_service > io_service( new boost::asio::io_service); boost::shared_ptr< boost::asio::io_service::work > work(new boost::asio::io_service::work( *io_service )); global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] The program will exit when all work has finished." << std::endl; global_stream_lock.unlock(); boost::thread_group worker_threads; for( int x = 0; x < 1; ++x ) worker_threads.create_thread( boost::bind( &WorkerThread, io_service ) ); io_service->post( boost::bind( &Run3, io_service ) ); work.reset(); worker_threads.join_all(); return 0; 可以看到,所有的任务得到了执行,但执行顺序可能不同。这个示例也可以看出混合使用post和dispatch可能存在的问题。
可以看到,所有的任务得到了执行,但执行顺序可能不同。这个示例也可以看出混合使用post和dispatch可能存在的问题。
strand顺序处理 strand提供顺序化的事件执行器。意思是,如果以“work1->work2->work3”的顺序post,不管有多少个工作线程,它们依然会以这样的顺序执行任务。 #include <boost/asio/io_service.hpp> #include <boost/thread/thread.hpp> #include <boost/bind.hpp> #include <boost/asio/strand.hpp> #include <iostream> boost::mutex global_stream_lock; void WorkerThread( boost::shared_ptr< boost::asio::io_service > io_service ) global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] Thread Start" << std::endl; global_stream_lock.unlock(); io_service->run(); global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] Thread Finish" << std::endl; global_stream_lock.unlock(); void PrintNum( int x ) std::cout << "[" << boost::this_thread::get_id() << "] x: " << x << std::endl; int main( int argc, char * argv[] ) boost::shared_ptr< boost::asio::io_service > io_service( new boost::asio::io_service); boost::shared_ptr< boost::asio::io_service::work > work(new boost::asio::io_service::work( *io_service )); boost::asio::io_service::strand strand( *io_service ); global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] The program will exit when all work has finished." << std::endl; global_stream_lock.unlock(); boost::thread_group worker_threads; for( int x = 0; x < 2; ++x ) worker_threads.create_thread( boost::bind( &WorkerThread, io_service ) ); boost::this_thread::sleep( boost::posix_time::milliseconds( 1000 ) ); //strand.post( boost::bind( &PrintNum, 1 ) ); //strand.post( boost::bind( &PrintNum, 2 ) ); //strand.post( boost::bind( &PrintNum, 3 ) ); //strand.post( boost::bind( &PrintNum, 4 ) ); //strand.post( boost::bind( &PrintNum, 5 ) ); io_service->post( boost::bind( &PrintNum, 1 ) ); io_service->post( boost::bind( &PrintNum, 2 ) ); io_service->post( boost::bind( &PrintNum, 3 ) ); io_service->post( boost::bind( &PrintNum, 4 ) ); io_service->post( boost::bind( &PrintNum, 5 ) ); work.reset(); worker_threads.join_all(); return 0; 去掉了打印数据时的锁。如果使用post,会得到比较混乱的输出,而如果使用strand,则能得到清晰的输出,这就是strand! 总之,使用io_service能够方便地构建多线程处理程序。它的总体使用流程总结如下: #include <boost/asio/io_service.hpp> #include <boost/bind.hpp> #include <boost/thread/thread.hpp> * Create an asio::io_service and a thread_group (through pool in essence) boost::asio::io_service ioService; boost::thread_group threadpool; * This will start the ioService processing loop. All tasks * assigned with ioService.post() will start executing. boost::asio::io_service::work work(ioService); * This will add 2 threads to the thread pool. (You could just put it in a for loop) threadpool.create_thread( boost::bind(&boost::asio::io_service::run, &ioService) threadpool.create_thread( boost::bind(&boost::asio::io_service::run, &ioService) * This will assign tasks to the thread pool. * More about boost::bind: "http://www.boost.org/doc/libs/1_54_0/libs/bind/bind.html#with_functions" * You can use strand when necessary, if so, remember add "strand.h" ioService.post(boost::bind(myTask, "Hello World!")); ioService.post(boost::bind(clearCache, "./cache")); ioService.post(boost::bind(getSocialUpdates, "twitter,gmail,facebook,tumblr,reddit")); * This will stop the ioService processing loop. Any tasks * you add behind this point will not execute. ioService.stop(); * Will wait till all the threads in the thread pool are finished with * their assigned tasks and 'join' them. Just assume the threads inside * the threadpool will be destroyed by this method. threadpool.join_all(); 参考资料: A guide to getting started with boost::asio 浅谈 Boost.Asio 的多线程模型 How to create a thread pool using boost in C++? 到目前为止,我们已经审查过的服务器始终在执行 `async_accept`,因此它们始终至少有一个已计划的任务,所以我们实际上不需要以这种方式保持它们运行。但是客户端不执行 `async_accept`,这对于它在某个时候没有计划的任务是正常的。要防止 `io_context::run` 返回,您应该使用 `boost::asio::executor_work_guard`(以前的 `io_context::work`,目前已弃用)类实例。`io_context::run` 运行直到所有计划的任务都完成。 从第一个boost.asio的教程开始,boost文档就一直在告诉我们:使用boost.asio第一步就是要创建一个io_service对象。那么io_service是个什么东西呢? boost.asio文档说,io_service为下面的这些异步IO对象提供最核心的IO功能: boost::asio::ip::tcp::socketboost::asio::ip::tcp::acc 之前在完善消息节点的章节学习过asio服务器底层通信的流程,它是基于单线程运行的,可参考知乎用户www.zhihu.com/people/zhi-chi-tian-ya-10-23/posts单线程模式而今天将设计IOServicePool类型的多线程模型,如下图所示多线程模式IOServicePool 服务池中,IOServicePool 类会根据系统的 CPU 核数创建相应数量的 io_context 实例,并将每个 io_context 运行在一个独立的线程中。 最近在学习licode发现源码有使用到boost::asio::io_service,因此在网上找文章了解一下,顺便写篇博客记录 io_service 负责和操作系统打交道,等待所有异步操作的结束,然后为每一个异步操作调用其完成处 例子(我们有3个异步操作,2个socket连接操作和一个计时器等待操作): 有一个io_service实例和一个处理线程的单线程例子: io_service ... #include <boost/asio.hpp> #include <boost/shared_ptr.hpp> #include <boost/thread.hpp> #include <iostream> <br />构造函数<br /><br />构造函数的主要动作就是调用CreateIoCompletionPort创建了一个初始iocp。 <br /><br />Dispatch和post的区别<br /><br />Post一定是PostQueuedCompletionStatus并且在GetQueuedCompletionStatus 之后执行。 <br /><br />Dispatch会首先检查当前thread是不是io_service.run/runonce/poll/poll_once线程,如果 你应该已经发现大部分使用Boost.Asio编写的代码都会使用几个io_service的实例。io_service是这个库里面最重要的类;它负责和操作系统打交道,等待所有异步操作的结束,然后为每一个异步操作调用其完成处理程序。 如果你选择用同步的方式来创建你的应用,你则不需要考虑我将在这一节向你展示的东西。 你有多种不同的方式来使用io_service。在下面的例子中,我们有3个异 无论如何使用,都能感觉到使用boost.asio实现服务器,不仅是一件非常轻松的事,而且代码很漂亮,逻辑也相当清晰,这点上很不同于ACE。使用io_service作为处理工作的work pool,可以看到,就是通过io_service.post投递一个Handler到io_service的队列,Handler在这个io_service.run内部得到执行,有可能你会发现,io_services.d... IO模型 io_service对象是asio框架中的调度器,所有异步io事件都是通过它来分发处理的(io对象的构造函数中都需要传入一个io_service对象)。 asio::io_serviceio_service;asio::ip::tcp::socketsocket(io_service); 在asio框架中,同步的io主要流程如下: 应用程序... Bost asio库与线程池的使用 Boost.Asio 有两种支持多线程的方式 第一种方式比较简单:在多线程的场景下,每个线程都持有一个io_context,并且每个线程都调用各自的io_context的run()方法。 另一种支持多线程的方式:全局只分配一个io_context,并且让这个io_context在多个线程之间共享,每个线程都调用全局的io_service的run()方法。 每个线程一个 I/O Service 让我们先分析第一种方案:在多线程的场景下,每个线程都持有一个io_context
strand提供顺序化的事件执行器。意思是,如果以“work1->work2->work3”的顺序post,不管有多少个工作线程,它们依然会以这样的顺序执行任务。
#include <boost/asio/io_service.hpp> #include <boost/thread/thread.hpp> #include <boost/bind.hpp> #include <boost/asio/strand.hpp> #include <iostream> boost::mutex global_stream_lock; void WorkerThread( boost::shared_ptr< boost::asio::io_service > io_service ) global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] Thread Start" << std::endl; global_stream_lock.unlock(); io_service->run(); global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] Thread Finish" << std::endl; global_stream_lock.unlock(); void PrintNum( int x ) std::cout << "[" << boost::this_thread::get_id() << "] x: " << x << std::endl; int main( int argc, char * argv[] ) boost::shared_ptr< boost::asio::io_service > io_service( new boost::asio::io_service); boost::shared_ptr< boost::asio::io_service::work > work(new boost::asio::io_service::work( *io_service )); boost::asio::io_service::strand strand( *io_service ); global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] The program will exit when all work has finished." << std::endl; global_stream_lock.unlock(); boost::thread_group worker_threads; for( int x = 0; x < 2; ++x ) worker_threads.create_thread( boost::bind( &WorkerThread, io_service ) ); boost::this_thread::sleep( boost::posix_time::milliseconds( 1000 ) ); //strand.post( boost::bind( &PrintNum, 1 ) ); //strand.post( boost::bind( &PrintNum, 2 ) ); //strand.post( boost::bind( &PrintNum, 3 ) ); //strand.post( boost::bind( &PrintNum, 4 ) ); //strand.post( boost::bind( &PrintNum, 5 ) ); io_service->post( boost::bind( &PrintNum, 1 ) ); io_service->post( boost::bind( &PrintNum, 2 ) ); io_service->post( boost::bind( &PrintNum, 3 ) ); io_service->post( boost::bind( &PrintNum, 4 ) ); io_service->post( boost::bind( &PrintNum, 5 ) ); work.reset(); worker_threads.join_all(); return 0; 去掉了打印数据时的锁。如果使用post,会得到比较混乱的输出,而如果使用strand,则能得到清晰的输出,这就是strand! 总之,使用io_service能够方便地构建多线程处理程序。它的总体使用流程总结如下: #include <boost/asio/io_service.hpp> #include <boost/bind.hpp> #include <boost/thread/thread.hpp> * Create an asio::io_service and a thread_group (through pool in essence) boost::asio::io_service ioService; boost::thread_group threadpool; * This will start the ioService processing loop. All tasks * assigned with ioService.post() will start executing. boost::asio::io_service::work work(ioService); * This will add 2 threads to the thread pool. (You could just put it in a for loop) threadpool.create_thread( boost::bind(&boost::asio::io_service::run, &ioService) threadpool.create_thread( boost::bind(&boost::asio::io_service::run, &ioService) * This will assign tasks to the thread pool. * More about boost::bind: "http://www.boost.org/doc/libs/1_54_0/libs/bind/bind.html#with_functions" * You can use strand when necessary, if so, remember add "strand.h" ioService.post(boost::bind(myTask, "Hello World!")); ioService.post(boost::bind(clearCache, "./cache")); ioService.post(boost::bind(getSocialUpdates, "twitter,gmail,facebook,tumblr,reddit")); * This will stop the ioService processing loop. Any tasks * you add behind this point will not execute. ioService.stop(); * Will wait till all the threads in the thread pool are finished with * their assigned tasks and 'join' them. Just assume the threads inside * the threadpool will be destroyed by this method. threadpool.join_all(); 参考资料: A guide to getting started with boost::asio 浅谈 Boost.Asio 的多线程模型 How to create a thread pool using boost in C++? 到目前为止,我们已经审查过的服务器始终在执行 `async_accept`,因此它们始终至少有一个已计划的任务,所以我们实际上不需要以这种方式保持它们运行。但是客户端不执行 `async_accept`,这对于它在某个时候没有计划的任务是正常的。要防止 `io_context::run` 返回,您应该使用 `boost::asio::executor_work_guard`(以前的 `io_context::work`,目前已弃用)类实例。`io_context::run` 运行直到所有计划的任务都完成。 从第一个boost.asio的教程开始,boost文档就一直在告诉我们:使用boost.asio第一步就是要创建一个io_service对象。那么io_service是个什么东西呢? boost.asio文档说,io_service为下面的这些异步IO对象提供最核心的IO功能: boost::asio::ip::tcp::socketboost::asio::ip::tcp::acc 之前在完善消息节点的章节学习过asio服务器底层通信的流程,它是基于单线程运行的,可参考知乎用户www.zhihu.com/people/zhi-chi-tian-ya-10-23/posts单线程模式而今天将设计IOServicePool类型的多线程模型,如下图所示多线程模式IOServicePool 服务池中,IOServicePool 类会根据系统的 CPU 核数创建相应数量的 io_context 实例,并将每个 io_context 运行在一个独立的线程中。 最近在学习licode发现源码有使用到boost::asio::io_service,因此在网上找文章了解一下,顺便写篇博客记录 io_service 负责和操作系统打交道,等待所有异步操作的结束,然后为每一个异步操作调用其完成处 例子(我们有3个异步操作,2个socket连接操作和一个计时器等待操作): 有一个io_service实例和一个处理线程的单线程例子: io_service ... #include <boost/asio.hpp> #include <boost/shared_ptr.hpp> #include <boost/thread.hpp> #include <iostream> <br />构造函数<br /><br />构造函数的主要动作就是调用CreateIoCompletionPort创建了一个初始iocp。 <br /><br />Dispatch和post的区别<br /><br />Post一定是PostQueuedCompletionStatus并且在GetQueuedCompletionStatus 之后执行。 <br /><br />Dispatch会首先检查当前thread是不是io_service.run/runonce/poll/poll_once线程,如果 你应该已经发现大部分使用Boost.Asio编写的代码都会使用几个io_service的实例。io_service是这个库里面最重要的类;它负责和操作系统打交道,等待所有异步操作的结束,然后为每一个异步操作调用其完成处理程序。 如果你选择用同步的方式来创建你的应用,你则不需要考虑我将在这一节向你展示的东西。 你有多种不同的方式来使用io_service。在下面的例子中,我们有3个异 无论如何使用,都能感觉到使用boost.asio实现服务器,不仅是一件非常轻松的事,而且代码很漂亮,逻辑也相当清晰,这点上很不同于ACE。使用io_service作为处理工作的work pool,可以看到,就是通过io_service.post投递一个Handler到io_service的队列,Handler在这个io_service.run内部得到执行,有可能你会发现,io_services.d... IO模型 io_service对象是asio框架中的调度器,所有异步io事件都是通过它来分发处理的(io对象的构造函数中都需要传入一个io_service对象)。 asio::io_serviceio_service;asio::ip::tcp::socketsocket(io_service); 在asio框架中,同步的io主要流程如下: 应用程序... Bost asio库与线程池的使用 Boost.Asio 有两种支持多线程的方式 第一种方式比较简单:在多线程的场景下,每个线程都持有一个io_context,并且每个线程都调用各自的io_context的run()方法。 另一种支持多线程的方式:全局只分配一个io_context,并且让这个io_context在多个线程之间共享,每个线程都调用全局的io_service的run()方法。 每个线程一个 I/O Service 让我们先分析第一种方案:在多线程的场景下,每个线程都持有一个io_context
#include <boost/asio/io_service.hpp> #include <boost/thread/thread.hpp> #include <boost/bind.hpp> #include <boost/asio/strand.hpp> #include <iostream> boost::mutex global_stream_lock; void WorkerThread( boost::shared_ptr< boost::asio::io_service > io_service ) global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] Thread Start" << std::endl; global_stream_lock.unlock(); io_service->run(); global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] Thread Finish" << std::endl; global_stream_lock.unlock(); void PrintNum( int x ) std::cout << "[" << boost::this_thread::get_id() << "] x: " << x << std::endl; int main( int argc, char * argv[] ) boost::shared_ptr< boost::asio::io_service > io_service( new boost::asio::io_service); boost::shared_ptr< boost::asio::io_service::work > work(new boost::asio::io_service::work( *io_service )); boost::asio::io_service::strand strand( *io_service ); global_stream_lock.lock(); std::cout << "[" << boost::this_thread::get_id() << "] The program will exit when all work has finished." << std::endl; global_stream_lock.unlock(); boost::thread_group worker_threads; for( int x = 0; x < 2; ++x ) worker_threads.create_thread( boost::bind( &WorkerThread, io_service ) ); boost::this_thread::sleep( boost::posix_time::milliseconds( 1000 ) ); //strand.post( boost::bind( &PrintNum, 1 ) ); //strand.post( boost::bind( &PrintNum, 2 ) ); //strand.post( boost::bind( &PrintNum, 3 ) ); //strand.post( boost::bind( &PrintNum, 4 ) ); //strand.post( boost::bind( &PrintNum, 5 ) ); io_service->post( boost::bind( &PrintNum, 1 ) ); io_service->post( boost::bind( &PrintNum, 2 ) ); io_service->post( boost::bind( &PrintNum, 3 ) ); io_service->post( boost::bind( &PrintNum, 4 ) ); io_service->post( boost::bind( &PrintNum, 5 ) ); work.reset(); worker_threads.join_all(); return 0; 去掉了打印数据时的锁。如果使用post,会得到比较混乱的输出,而如果使用strand,则能得到清晰的输出,这就是strand! 总之,使用io_service能够方便地构建多线程处理程序。它的总体使用流程总结如下:
去掉了打印数据时的锁。如果使用post,会得到比较混乱的输出,而如果使用strand,则能得到清晰的输出,这就是strand!
总之,使用io_service能够方便地构建多线程处理程序。它的总体使用流程总结如下:
#include <boost/asio/io_service.hpp> #include <boost/bind.hpp> #include <boost/thread/thread.hpp> * Create an asio::io_service and a thread_group (through pool in essence) boost::asio::io_service ioService; boost::thread_group threadpool; * This will start the ioService processing loop. All tasks * assigned with ioService.post() will start executing. boost::asio::io_service::work work(ioService); * This will add 2 threads to the thread pool. (You could just put it in a for loop) threadpool.create_thread( boost::bind(&boost::asio::io_service::run, &ioService) threadpool.create_thread( boost::bind(&boost::asio::io_service::run, &ioService) * This will assign tasks to the thread pool. * More about boost::bind: "http://www.boost.org/doc/libs/1_54_0/libs/bind/bind.html#with_functions" * You can use strand when necessary, if so, remember add "strand.h" ioService.post(boost::bind(myTask, "Hello World!")); ioService.post(boost::bind(clearCache, "./cache")); ioService.post(boost::bind(getSocialUpdates, "twitter,gmail,facebook,tumblr,reddit")); * This will stop the ioService processing loop. Any tasks * you add behind this point will not execute. ioService.stop(); * Will wait till all the threads in the thread pool are finished with * their assigned tasks and 'join' them. Just assume the threads inside * the threadpool will be destroyed by this method. threadpool.join_all(); 参考资料: A guide to getting started with boost::asio 浅谈 Boost.Asio 的多线程模型 How to create a thread pool using boost in C++? 到目前为止,我们已经审查过的服务器始终在执行 `async_accept`,因此它们始终至少有一个已计划的任务,所以我们实际上不需要以这种方式保持它们运行。但是客户端不执行 `async_accept`,这对于它在某个时候没有计划的任务是正常的。要防止 `io_context::run` 返回,您应该使用 `boost::asio::executor_work_guard`(以前的 `io_context::work`,目前已弃用)类实例。`io_context::run` 运行直到所有计划的任务都完成。 从第一个boost.asio的教程开始,boost文档就一直在告诉我们:使用boost.asio第一步就是要创建一个io_service对象。那么io_service是个什么东西呢? boost.asio文档说,io_service为下面的这些异步IO对象提供最核心的IO功能: boost::asio::ip::tcp::socketboost::asio::ip::tcp::acc 之前在完善消息节点的章节学习过asio服务器底层通信的流程,它是基于单线程运行的,可参考知乎用户www.zhihu.com/people/zhi-chi-tian-ya-10-23/posts单线程模式而今天将设计IOServicePool类型的多线程模型,如下图所示多线程模式IOServicePool 服务池中,IOServicePool 类会根据系统的 CPU 核数创建相应数量的 io_context 实例,并将每个 io_context 运行在一个独立的线程中。 最近在学习licode发现源码有使用到boost::asio::io_service,因此在网上找文章了解一下,顺便写篇博客记录 io_service 负责和操作系统打交道,等待所有异步操作的结束,然后为每一个异步操作调用其完成处 例子(我们有3个异步操作,2个socket连接操作和一个计时器等待操作): 有一个io_service实例和一个处理线程的单线程例子: io_service ... #include <boost/asio.hpp> #include <boost/shared_ptr.hpp> #include <boost/thread.hpp> #include <iostream> <br />构造函数<br /><br />构造函数的主要动作就是调用CreateIoCompletionPort创建了一个初始iocp。 <br /><br />Dispatch和post的区别<br /><br />Post一定是PostQueuedCompletionStatus并且在GetQueuedCompletionStatus 之后执行。 <br /><br />Dispatch会首先检查当前thread是不是io_service.run/runonce/poll/poll_once线程,如果 你应该已经发现大部分使用Boost.Asio编写的代码都会使用几个io_service的实例。io_service是这个库里面最重要的类;它负责和操作系统打交道,等待所有异步操作的结束,然后为每一个异步操作调用其完成处理程序。 如果你选择用同步的方式来创建你的应用,你则不需要考虑我将在这一节向你展示的东西。 你有多种不同的方式来使用io_service。在下面的例子中,我们有3个异 无论如何使用,都能感觉到使用boost.asio实现服务器,不仅是一件非常轻松的事,而且代码很漂亮,逻辑也相当清晰,这点上很不同于ACE。使用io_service作为处理工作的work pool,可以看到,就是通过io_service.post投递一个Handler到io_service的队列,Handler在这个io_service.run内部得到执行,有可能你会发现,io_services.d... IO模型 io_service对象是asio框架中的调度器,所有异步io事件都是通过它来分发处理的(io对象的构造函数中都需要传入一个io_service对象)。 asio::io_serviceio_service;asio::ip::tcp::socketsocket(io_service); 在asio框架中,同步的io主要流程如下: 应用程序... Bost asio库与线程池的使用 Boost.Asio 有两种支持多线程的方式 第一种方式比较简单:在多线程的场景下,每个线程都持有一个io_context,并且每个线程都调用各自的io_context的run()方法。 另一种支持多线程的方式:全局只分配一个io_context,并且让这个io_context在多个线程之间共享,每个线程都调用全局的io_service的run()方法。 每个线程一个 I/O Service 让我们先分析第一种方案:在多线程的场景下,每个线程都持有一个io_context
#include <boost/asio/io_service.hpp> #include <boost/bind.hpp> #include <boost/thread/thread.hpp> * Create an asio::io_service and a thread_group (through pool in essence) boost::asio::io_service ioService; boost::thread_group threadpool; * This will start the ioService processing loop. All tasks * assigned with ioService.post() will start executing. boost::asio::io_service::work work(ioService); * This will add 2 threads to the thread pool. (You could just put it in a for loop) threadpool.create_thread( boost::bind(&boost::asio::io_service::run, &ioService) threadpool.create_thread( boost::bind(&boost::asio::io_service::run, &ioService) * This will assign tasks to the thread pool. * More about boost::bind: "http://www.boost.org/doc/libs/1_54_0/libs/bind/bind.html#with_functions" * You can use strand when necessary, if so, remember add "strand.h" ioService.post(boost::bind(myTask, "Hello World!")); ioService.post(boost::bind(clearCache, "./cache")); ioService.post(boost::bind(getSocialUpdates, "twitter,gmail,facebook,tumblr,reddit")); * This will stop the ioService processing loop. Any tasks * you add behind this point will not execute. ioService.stop(); * Will wait till all the threads in the thread pool are finished with * their assigned tasks and 'join' them. Just assume the threads inside * the threadpool will be destroyed by this method. threadpool.join_all(); 参考资料:
参考资料:
A guide to getting started with boost::asio 浅谈 Boost.Asio 的多线程模型 How to create a thread pool using boost in C++? 到目前为止,我们已经审查过的服务器始终在执行 `async_accept`,因此它们始终至少有一个已计划的任务,所以我们实际上不需要以这种方式保持它们运行。但是客户端不执行 `async_accept`,这对于它在某个时候没有计划的任务是正常的。要防止 `io_context::run` 返回,您应该使用 `boost::asio::executor_work_guard`(以前的 `io_context::work`,目前已弃用)类实例。`io_context::run` 运行直到所有计划的任务都完成。 从第一个boost.asio的教程开始,boost文档就一直在告诉我们:使用boost.asio第一步就是要创建一个io_service对象。那么io_service是个什么东西呢? boost.asio文档说,io_service为下面的这些异步IO对象提供最核心的IO功能: boost::asio::ip::tcp::socketboost::asio::ip::tcp::acc 之前在完善消息节点的章节学习过asio服务器底层通信的流程,它是基于单线程运行的,可参考知乎用户www.zhihu.com/people/zhi-chi-tian-ya-10-23/posts单线程模式而今天将设计IOServicePool类型的多线程模型,如下图所示多线程模式IOServicePool 服务池中,IOServicePool 类会根据系统的 CPU 核数创建相应数量的 io_context 实例,并将每个 io_context 运行在一个独立的线程中。 最近在学习licode发现源码有使用到boost::asio::io_service,因此在网上找文章了解一下,顺便写篇博客记录 io_service 负责和操作系统打交道,等待所有异步操作的结束,然后为每一个异步操作调用其完成处 例子(我们有3个异步操作,2个socket连接操作和一个计时器等待操作): 有一个io_service实例和一个处理线程的单线程例子: io_service ... #include <boost/asio.hpp> #include <boost/shared_ptr.hpp> #include <boost/thread.hpp> #include <iostream> <br />构造函数<br /><br />构造函数的主要动作就是调用CreateIoCompletionPort创建了一个初始iocp。 <br /><br />Dispatch和post的区别<br /><br />Post一定是PostQueuedCompletionStatus并且在GetQueuedCompletionStatus 之后执行。 <br /><br />Dispatch会首先检查当前thread是不是io_service.run/runonce/poll/poll_once线程,如果 你应该已经发现大部分使用Boost.Asio编写的代码都会使用几个io_service的实例。io_service是这个库里面最重要的类;它负责和操作系统打交道,等待所有异步操作的结束,然后为每一个异步操作调用其完成处理程序。 如果你选择用同步的方式来创建你的应用,你则不需要考虑我将在这一节向你展示的东西。 你有多种不同的方式来使用io_service。在下面的例子中,我们有3个异 无论如何使用,都能感觉到使用boost.asio实现服务器,不仅是一件非常轻松的事,而且代码很漂亮,逻辑也相当清晰,这点上很不同于ACE。使用io_service作为处理工作的work pool,可以看到,就是通过io_service.post投递一个Handler到io_service的队列,Handler在这个io_service.run内部得到执行,有可能你会发现,io_services.d... IO模型 io_service对象是asio框架中的调度器,所有异步io事件都是通过它来分发处理的(io对象的构造函数中都需要传入一个io_service对象)。 asio::io_serviceio_service;asio::ip::tcp::socketsocket(io_service); 在asio框架中,同步的io主要流程如下: 应用程序... Bost asio库与线程池的使用 Boost.Asio 有两种支持多线程的方式 第一种方式比较简单:在多线程的场景下,每个线程都持有一个io_context,并且每个线程都调用各自的io_context的run()方法。 另一种支持多线程的方式:全局只分配一个io_context,并且让这个io_context在多个线程之间共享,每个线程都调用全局的io_service的run()方法。 每个线程一个 I/O Service 让我们先分析第一种方案:在多线程的场景下,每个线程都持有一个io_context
A guide to getting started with boost::asio 浅谈 Boost.Asio 的多线程模型 How to create a thread pool using boost in C++?