• 创建context
  • 创建socket,设置ZMQ_PUB模式
  • bind 端口
  • 循环发布消息 send

socket特性:

特性
兼容的对端套接字 ZMQ_SUB
方向性 单向
发送/接收模式 仅发送
进入路由策略
流出路由策略 扇出(呈扇形发出)
ZMQ_HWM 选项行为 丢弃

sub端:

  • 创建context
  • 创建socket,设置ZMQ_SUB模式
  • connect 到pub端
  • setsockopt 设置ZMQ_SUBSCRIBE订阅的消息
  • 循环接收recv
特性
兼容的对端套接字 ZMQ_PUB
方向性 单向
发送/接收模式 仅接收
进入路由策略 平等排队
流出路由策略
ZMQ_HWM 选项行为 丢弃
  • pub端socket不能使用recv函数,同样,sub端不能使用send函数
  • 当pub端由于到达了高水位而使ZMQ_PUB套接字进入静默模式的时候,所有发送到这个有问题的订阅者的消息都会被丢弃,直到静默模式终止
  • pub端socket的zmq_send()函数永远不会阻塞
  • sub端刚创建socket后是无法订阅到任何消息的,必须使用setsockopt设置订阅的消息后才能接收到
  • sub端是根据参数前缀进行过滤的 。订阅的内容以pub端发出的内容从头开始匹配为过滤条件,完全匹配订阅内容的消息会被sub接收,如订阅内容为 "a" 时,所有以a开头的消息都会接收
  • 当订阅内容为 "" ,长度为0时为订阅所有内容,因为所有消息都匹配成功
  • 如果存在某个pub没有被任何sub连接,则该pub会丢弃所有的消息
  • 如果采用tcp的连接方式,sub很慢时,消息将会堆积在pub,可以通过设置ZMQ_HWM选项来保护发布者,对于重要的消息,也可以写入硬盘等待发送
  • pub和sub谁bind谁connect并无严格要求(虽本质并无区别),但仍建议pub使用bind,sub使用connect,在实际测试中,使用sub绑定而pub connect时,sub端无法接收到消息
  • 一个显著的问题是, “slow joiner”可能导致发布者的第一笔消息总是丢失 ,下文会进一步说明该问题
  • 一个订阅者(subcriber)可以链接超过一个发布者(publisher)。数据到达后将交叉存取(公平队列),以保证发布者之间的数据不会淹没
  • 从ZeroMQ v3.x版本开始,使用tcp://或者ipc://协议连接时会在发布者进行消息过滤,使用epgm://协议仍在订阅者过滤;在ZeroMQ v2.x,所有消息过滤都发生在订阅者

关于slow joiner问题

一般描述为,即使是先启动订阅者,再启动发布者,订阅者也有可能会丢失前一部分数据。你无法得知SUB是何时开始接收消息的。

这是因为订阅者向发布者建立连接也是耗费时间的,虽然时间极短,但不为0。这个时间内发布者发布的内容将没有订阅者能够接收。

几种处理方式:

对于可以容忍数据丢失的应用来说,不必理会丢失的数据。比如接收天气信息的应用,你可能只关注最新的天气信息。

  • 使用延时发布策略

因为数据丢失的原因是发布者在没有稳定接收者的情况下仍然发送了数据,所以可以让发布者等待一段时间再发送数据。

最简单的方式是sleep一段时间。

  1. 不知道sleep多久合适
  2. 即使sleep也不能保证sub者一定就准备好了
  3. 在程序中加sleep不是一种优雅的设计
  • 通知发布者模式

发布者可以在确保订阅者已经启动成功的情况下再发送数据,只需要订阅者在准备好后通知发布者。

这个不难实现,订阅者在准备好后,首先使用req/rep模式向发布者发送一个特定的请求,发布者接收请求并应答,然后再发布真正的数据。

这种方式增加了简单一步操作,但保证了数据的完整。

多线程发送问题

注意:context是线程安全的,但socket非线程安全。在多个线程中使用同一个socket会导致程序崩溃(不提倡使用锁,它会导致竞争并减慢性能,不符合zmq的设计理念)。

  1. 使用proxy

一种比较常见的场景是,发布者使用多个线程来发布不同的数据,所有的数据通过一个endpoint发送。

这与动态发现问题类似,对于一个应用场景,可能会随时增加发布者或者订阅者,构建一个合适的系统可以减小编码和出错的机会。

如图所示,使用一个proxy可以轻松解决这个问题。

  • 增加了中间层。中间层是静态的,它不会经常变动
  • pub可以随意增减,它们都connect到xsub
  • sub也可以随意增减,它们都connect到xpub
  • xsub收到的所有数据会通过proxy转发到xpub,从而发布到各sub

对于这种情况,可以在多个发布线程中分别创建socket,connect到xsub,从而避免多线程共用socket。

这种情况下,当发布线程较多时,会导致socket堆积,最终导致系统文件描述符过多而失败。可以使用线程池,每个线程使用自己的socket。

还有一种使用代理的情况是:

这里proxy类似于一个桥,连接了两个不同的网络。也可以作为协议网关,用于连接两个使用不同协议的网络。

结合天气服务,实现一个Proxy的例程如下:

// Weather proxy device C++ // Olivier Chamoux <olivier.chamoux@fr.thalesgroup.com> # include "zhelpers.hpp" int main ( int argc , char * argv [ ] ) zmq :: context_t context ( 1 ) ; // This is where the weather server sits zmq :: socket_t frontend ( context , ZMQ_SUB ) ; frontend . connect ( "tcp://192.168.55.210:5556" ) ; // This is our public endpoint for subscribers zmq :: socket_t backend ( context , ZMQ_PUB ) ; backend . bind ( "tcp://10.1.1.0:8100" ) ; // Subscribe on everything frontend . setsockopt ( ZMQ_SUBSCRIBE , "" , 0 ) ; // Shunt messages out to our own subscribers while ( 1 ) { while ( 1 ) { zmq :: message_t message ; int more ; size_t more_size = sizeof ( more ) ; // Process all parts of the message frontend . recv ( & message ) ; frontend . getsockopt ( ZMQ_RCVMORE , & more , & more_size ) ; backend . send ( message , more ? ZMQ_SNDMORE : 0 ) ; if ( ! more ) break ; // Last message part return 0 ;
  1. 使用push/pull

使用push/pull建立一个发布端模型,pull接收到所有数据通过proxy转到pub,再发送出去。

这种方式是使用xpub/xsub的变体。因为实际使用中,在单进程内使用sub绑定,pub连接的方式无法接收到数据,而push/pull是一个可行的替代。

启动代理:

// 代码片断
void StartPubProxy(string& port)
        // 面向client的socket,多线程发来所有数据
        zmq::socket_t frontend(m_ctx, ZMQ_PULL);
        frontend.bind("inproc://#0");
        // 面向services的socket,提供对外端口,并实际发布数据
        zmq::socket_t backend(m_ctx, ZMQ_PUB);
        string strBind = "tcp://*:" + port;
        backend.bind(strBind);
        // 创建proxy
        zmq::proxy(static_cast<void*>(frontend), static_cast<void*>(backend), nullptr);
    catch(const std::exception& e)
        std::cerr <<"zmq启动转发代理失败:" << e.what() << '\n';

原发布线程向"inproc://#0"push数据即可。

  1. 使用boost asio单线程处理数据

在有些情况下,加锁的多线程未必比无锁单线程更快。

对于多线程发布模型来说,可以把它们要发送的数据通过strand.post到io_service的单线程队列里,由工作线程异步处理。

这种方式简单方便,关于使用asio创建线程的使用可以参考:boost::asio::io_service创建线程池简单实例

保护发布者

前方讲述,当sub端处理较慢时,pub端在到达高水位线后会丢弃数据。对于重要的应用,由于不能对sub端的性能作出任何假设,所以需要一定的策略来保证。

ZMQ_HWM

ZMQ_SNDHWM:对向外发送的消息设置高水位(最大缓存量),ZMQ_RCVHWM:对进入socket的消息设置高水位。

ZMQ_SNDHWM属性将会设置socket参数指定的socket对外发送的消息的高水位。高水位是一个硬限制,它会限制每一个与此socket相连的在内存中排队的未处理的消息数目的最大值。

0值代表着没有限制,默认值为1000,就在bind/connect之前设置该属性。如果设置为无限,可能会导致发布者崩溃。

如果已经到达了规定的限制,socket就需要进入一种异常的状态,表现形式因socket类型而异。socket会进行适当的调节,比如阻塞或者丢弃已发送的消息。

总是给套接字设置一个基于期望的订阅方数量的最大值,你打算用于队列的内存的数量,和一个消息平均大小的高水位线。例如,如果你希望有5000个订阅方,有1G的内存可有,消息平均200字节,那么一个安全的高水位线应该是(1000000000/200/5000)=1000.

?MQ - The Guide
Unable to receive messages when binding subscriber socket
What is a simple example of a working XSUB / XPUB proxy in zeromq
How to implement Pub-Sub Network with a Proxy by using XPUB and XSUB in ZeroMQ(C++)?
Proxy with inproc frontend
ZMQ模式详解——发布/订阅模式
ZeroMQ基础入门

关于zmq的基本简介,请参考ZeroMQ基础入门。pub/sub模式介绍发布/订阅模式,全称为Publish/Subscribe,支持多个发布者/多订阅者,使用在消息单向传输的应用场景,消息总是从发布者发送到订阅者。一般的使用流程为:pub端:创建context创建socket,设置ZMQ_PUB模式bind端口循环发布消息sendsocket特性:特性值... 参考: http://zguide.zeromq.org/page:all PUB/SUB模式: 发布/订阅模式。该模式是单向的,发布者send msg, 订阅者receive msg.  一旦有消息发出,pub会发送给所有的sub。. 模式如下图: 可以看出发布者绑定绑定一个端口,订阅者通过连接发布者接受订阅的消息。 官网描述这种模式要注意以下几点: 1. pub/sub模式... const string host = string("tcp://127.0.0.1:5803"); const string topic("gnss_sensor"); void *context = zmq_ctx_new(); assert(nullptr != context); void *socket = zmq_socket(context, ZMQ_PUB); assert(nullptr != socket); int recv_tim
5. Advanced Pub-Sub Patterns | ØMQ - The Guidehttps://zguide.zeromq.org/docs/chapter5/ 我们将介绍: 何时使用发布订阅 如何处理太慢的订阅者(自杀蜗牛模式) 如何设计高速订阅者(黑盒模式) 如何监控发布-订阅网络(Espresso 模式) 如何构建共享键值存储(克隆模式) 如何使用反应器简化复杂的服务器 如何使用 Binary Star 模式向服务器添加故障转移 Pub-Sub 的优点和缺点 ZeroMQ
    zmq::context_t context(1); //若在类中进行封装时,应与socket_t的生命周期一致。     zmq::socket_t pubSocket(context,ZMQ_PUB);     pubSocket.bind("t
zmqpubsub zmqpubsub是在ZeroMQ之上的一个简单的Go pubsub实现。 它抽象了底层的ZeroMQ机械,以为发布-订阅消息传递模式提供Go友好的API。 go get -tags zmq_3_x github.com/hpcloud/zmqpubsub 首先设置一个经纪人: var Broker zmqpubsub. Broker func init () { Broker . PubAddr = "tcp://127.0.0.1:4000" Broker . SubAddr = "tcp://127.0.0.1:4001" Broker . BufferSize = 100 func main () { Broker . MustRun () 代理指定发布者/订阅者将连接到的地址或从中连接的地址。 当建立TCP连接的时候,会关联到一些定时器,其中一些定时器用来处理keepalive事务。 当该定时器到0时,发送到对端一个keepalive探测包(下称保活包),保活包没有真实的数据,并且需要对端响应ACK。 由于TCP/...
Qt是一个跨平台的应用开发框架,提供了丰富的类库和工具,使得开发人员可以快速地开发高效、强交互性的应用程序。在Qt开发中,使用ZeroMQ(简称zmq)来进行通信,可以实现快速、可靠、异步的数据传输。 ZeroMQ是一个高性能、可重用的消息传递库,支持多种网络协议和消息传递模式。使用ZeroMQ可以快速地实现基于消息的通信模式,而无需关心具体的传输细节。当然,ZeroMQ也支持多种传输协议,如TCP、inproc、ipc等。 Qt使用zmq进行通信时,需要使用zmq的API来创建一个socket,用于接收或发送消息。在Qt中,可以使用QSocketNotifier类来监听zmq的socket,当有数据来时,会触发相应的信号,从而实现数据的接收与处理。 同时,在Qt中还需要使用QThread类来实现多线程处理,以避免在主线程中阻塞的情况。Qt提供了多种线程池和异步调用机制,开发者可以根据具体需求来选择合适的处理方式。 总之,Qt提供了一套完整的API和工具,使得开发者可以方便地使用zmq进行数据通信。结合Qt丰富的类库和工具,开发者可以快速地构建高效、可靠的应用程序,满足不同用户的需求。