一般描述为,即使是先启动订阅者,再启动发布者,订阅者也有可能会丢失前一部分数据。你无法得知SUB是何时开始接收消息的。
这是因为订阅者向发布者建立连接也是耗费时间的,虽然时间极短,但不为0。这个时间内发布者发布的内容将没有订阅者能够接收。
几种处理方式:
对于可以容忍数据丢失的应用来说,不必理会丢失的数据。比如接收天气信息的应用,你可能只关注最新的天气信息。
因为数据丢失的原因是发布者在没有稳定接收者的情况下仍然发送了数据,所以可以让发布者等待一段时间再发送数据。
最简单的方式是sleep一段时间。
-
不知道sleep多久合适
-
即使sleep也不能保证sub者一定就准备好了
-
在程序中加sleep不是一种优雅的设计
发布者可以在确保订阅者已经启动成功的情况下再发送数据,只需要订阅者在准备好后通知发布者。
这个不难实现,订阅者在准备好后,首先使用req/rep模式向发布者发送一个特定的请求,发布者接收请求并应答,然后再发布真正的数据。
这种方式增加了简单一步操作,但保证了数据的完整。
注意:context是线程安全的,但socket非线程安全。在多个线程中使用同一个socket会导致程序崩溃(不提倡使用锁,它会导致竞争并减慢性能,不符合zmq的设计理念)。
-
使用proxy
一种比较常见的场景是,发布者使用多个线程来发布不同的数据,所有的数据通过一个endpoint发送。
这与动态发现问题类似,对于一个应用场景,可能会随时增加发布者或者订阅者,构建一个合适的系统可以减小编码和出错的机会。
如图所示,使用一个proxy可以轻松解决这个问题。
-
增加了中间层。中间层是静态的,它不会经常变动
-
pub可以随意增减,它们都connect到xsub
-
sub也可以随意增减,它们都connect到xpub
-
xsub收到的所有数据会通过proxy转发到xpub,从而发布到各sub
对于这种情况,可以在多个发布线程中分别创建socket,connect到xsub,从而避免多线程共用socket。
这种情况下,当发布线程较多时,会导致socket堆积,最终导致系统文件描述符过多而失败。可以使用线程池,每个线程使用自己的socket。
还有一种使用代理的情况是:
这里proxy类似于一个桥,连接了两个不同的网络。也可以作为协议网关,用于连接两个使用不同协议的网络。
结合天气服务,实现一个Proxy的例程如下:
#
include
"zhelpers.hpp"
int
main
(
int
argc
,
char
*
argv
[
]
)
zmq
::
context_t
context
(
1
)
;
zmq
::
socket_t
frontend
(
context
,
ZMQ_SUB
)
;
frontend
.
connect
(
"tcp://192.168.55.210:5556"
)
;
zmq
::
socket_t backend
(
context
,
ZMQ_PUB
)
;
backend
.
bind
(
"tcp://10.1.1.0:8100"
)
;
frontend
.
setsockopt
(
ZMQ_SUBSCRIBE
,
""
,
0
)
;
while
(
1
)
{
while
(
1
)
{
zmq
::
message_t message
;
int
more
;
size_t more_size
=
sizeof
(
more
)
;
frontend
.
recv
(
&
message
)
;
frontend
.
getsockopt
(
ZMQ_RCVMORE
,
&
more
,
&
more_size
)
;
backend
.
send
(
message
,
more
?
ZMQ_SNDMORE
:
0
)
;
if
(
!
more
)
break
;
return
0
;
-
使用push/pull
使用push/pull建立一个发布端模型,pull接收到所有数据通过proxy转到pub,再发送出去。
这种方式是使用xpub/xsub的变体。因为实际使用中,在单进程内使用sub绑定,pub连接的方式无法接收到数据,而push/pull是一个可行的替代。
启动代理: