zmq pub-sub, push-pull模式没有客服端服务端启动先后顺序的限制,与普通的socket通信不一样,必须先启动服务端。
以下是测试程序,pub.py为服务端,sub.py客户端。
pub.py
# coding: utf-8
import zmq
import time
import threading
import os
import stat
# 分类后的日志的zmq的pub地址
LOG_TYPE_PUB_PATH = "ipc:///tmp/log_types.ipc"
# 日志的zmq的sub地址
LOG_SUB_PATH = "ipc:///tmp/log_lator.ipc"
TOPIC_LIST = ["lator", "att"]
def unlink_ipc(path):
index = path.rfind('ipc://')
if index < 0:
return
fpath = path[len('ipc://'):]
#if os.path.exists(fpath):
os.unlink(fpath)
def pub(pubaddr, topic):
context = zmq.Context()
sock = context.socket(zmq.PUB)
sock.set_hwm(100)
#unlink_ipc(pubaddr)
sock.bind(pubaddr)
counter = 1
os.chmod(pubaddr[len('ipc://'):], stat.S_IRWXO + stat.S_IRWXG + stat.S_IRWXU)
zpath = sock.getsockopt(zmq.LAST_ENDPOINT)
print zpath
while True:
messagedata = "this is msg fro topic one %s" % counter
print "%s %s" % (topic, messagedata)
sock.send("%s %s" % (topic, messagedata))
counter = counter + 1
time.sleep(1)
if __name__ == "__main__":
t1 = threading.Thread(target=pub, args=(LOG_TYPE_PUB_PATH, "lator"))
t2 = threading.Thread(target=pub, args=(LOG_SUB_PATH, "att"))
t1.start()
t2.start()
t1.join()
t2.join()
sub.py
# coding: utf-8
import os
import zmq
from zmq.eventloop.ioloop import IOLoop
from zmq.eventloop.zmqstream import ZMQStream
# 分类后的日志的zmq的pub地址
LOG_TYPE_PUB_PATH = "ipc:///tmp/log_types.ipc"
# 日志的zmq的sub地址
LOG_SUB_PATH = "ipc:///tmp/log_lator.ipc"
TOPIC_LIST = ["lator", "att"]
def unlink_ipc(path):
index = path.rfind('ipc://')
if index < 0:
return
fpath = path[len('ipc://'):]
if os.path.exists(fpath):
os.unlink(fpath)
def recv_func(msg):
print msg
def main2():
loop_instance = IOLoop.instance()
ctx = zmq.Context.instance()
sock = ctx.socket(zmq.SUB)
sock.set_hwm(100)
sock.connect(LOG_TYPE_PUB_PATH)
sock.connect(LOG_SUB_PATH)
for key in TOPIC_LIST:
if isinstance(key, str):
sock.setsockopt(zmq.SUBSCRIBE, key)
elif isinstance(key, unicode):
sock.setsockopt_string(zmq.SUBSCRIBE, key)
else:
print("log_broker to set subscribe error:%s" % key)
sock = ZMQStream(sock, loop_instance)
sock.on_recv(recv_func)
loop_instance.start()
if __name__ == "__main__":
main2()
zmq pub-sub, push-pull模式没有客服端服务端启动先后顺序的限制,与普通的socket通信不一样,必须先启动服务端。以下是测试程序,pub.py为服务端,sub.py客户端。pub.py# coding: utf-8import zmqimport timeimport threadingimport osimport stat# 分类后的日志的zm...
zmqpubsub
zmqpubsub是在ZeroMQ之上的一个简单的Go
pubsub实现。
它抽象了底层的ZeroMQ机械,以为发布-订阅消息传递模式提供Go友好的API。
go get -tags
zmq_3_x github.com/hpcloud/
zmqpubsub
首先设置一个经纪人:
var Broker
zmqpubsub. Broker
func init () {
Broker .
PubAddr = "tcp://127.0.0.1:4000"
Broker .
SubAddr = "tcp://127.0.0.1:4001"
Broker . BufferSize = 100
func main () {
Broker . MustRun ()
代理指定发布者/订阅者将连接到的地址或从中连接的地址。
pub/
sub模式介绍
发布/订阅模式,全称为
Publish/
Subscribe,支持多个发布者/多订阅者,使用在消息单向传输的应用场景,消息总是从发布者发送到订阅者。
一般的使用流程为:
pub端:
创建context
创建socket,设置
ZMQ_
PUB模式
bind端口
循环发布消息send
socket特性:
ZMQ 的三个基本模型
ZMQ 提供了三个基本的通信模型,分别是“Request-Reply “,”Publisher-Subscriber“,”Parallel Pipeline”
请求应答模式(Request-Reply)(rep 和 req)
消息双向的,有来有往,req端请求的消息,rep端必须答复给req端
订阅发布模式 (pub 和 sub)
消息单向的,有去无回的。可按照发布端可发...
5. Advanced Pub-Sub Patterns | ØMQ - The Guidehttps://zguide.zeromq.org/docs/chapter5/
我们将介绍:
何时使用发布订阅
如何处理太慢的订阅者(自杀蜗牛模式)
如何设计高速订阅者(黑盒模式)
如何监控发布-订阅网络(Espresso 模式)
如何构建共享键值存储(克隆模式)
如何使用反应器简化复杂的服务器
如何使用 Binary Star 模式向服务器添加故障转移
Pub-Sub 的优点和缺点
ZeroMQ
ØMQ支持多种模式,具体可以参阅:https://blog.csdn.net/qq_41453285/article/details/106865539
本文介绍ØMQ的“发布-订阅”模式
二、发布-订阅模式
发布-订阅模式由https://rfc.zeromq.org/spec/29/正式定义
在发布-订阅模式中,有一个发布者用来发送消息,该模式中有很多订阅者会接收发布者发布的消息
ØMQ的套接字类型有4种:
ZMQ_
PUB
ZMQ_
SUB
ZMQ_X
PUB
const string host = string("tcp://127.0.0.1:5803");
const string topic("gnss_sensor");
void *context =
zmq_ctx_new();
assert(nullptr != context);
void *socket =
zmq_socket(context,
ZMQ_
PUB);
assert(nullptr != socket);
int recv_tim
一个英俊的小伙子:A哥,
一位漂亮的小姑娘:B妹。
A哥:是一位聋哑人,但是会写字,常去奈何桥边钓鱼,每天都会趁钓鱼的空隙,在奈何桥上看美女,重点他每次看到一个他喜欢的,他都会在桥头的木牌上表达自己的欣赏。
B妹收到了诅咒,只有在奈何桥边有男人向她表白,她就能摆脱凡体,变成仙女,所以B妹每天都会去奈何桥边。
某一天清晨,A哥(发送者)看到了B妹(接收者),于是,A哥没办法直接表述自己
ZMQ专题学习之一:初识ZeroMQ
ZeroMQ号称是“史上最快的消息队列”,基于c语言开发的。引用官方说明定义:“ZMQ(以下ZeroMQ简称ZMQ)是一个简单好用的传输层,像框架一样的一个socket library,他使得Socket编程更加简单、简洁和性能更高。是一个消息处理队列库,可在多个线程、内核和主机盒之间弹性伸缩。
ZMQ的明确目标是“成为标准网络协议...
Qt是一个跨平台的应用开发框架,提供了丰富的类库和工具,使得开发人员可以快速地开发高效、强交互性的应用程序。在Qt开发中,使用ZeroMQ(简称zmq)来进行通信,可以实现快速、可靠、异步的数据传输。
ZeroMQ是一个高性能、可重用的消息传递库,支持多种网络协议和消息传递模式。使用ZeroMQ可以快速地实现基于消息的通信模式,而无需关心具体的传输细节。当然,ZeroMQ也支持多种传输协议,如TCP、inproc、ipc等。
Qt使用zmq进行通信时,需要使用zmq的API来创建一个socket,用于接收或发送消息。在Qt中,可以使用QSocketNotifier类来监听zmq的socket,当有数据来时,会触发相应的信号,从而实现数据的接收与处理。
同时,在Qt中还需要使用QThread类来实现多线程处理,以避免在主线程中阻塞的情况。Qt提供了多种线程池和异步调用机制,开发者可以根据具体需求来选择合适的处理方式。
总之,Qt提供了一套完整的API和工具,使得开发者可以方便地使用zmq进行数据通信。结合Qt丰富的类库和工具,开发者可以快速地构建高效、可靠的应用程序,满足不同用户的需求。