zmq pub-sub, push-pull模式没有客服端服务端启动先后顺序的限制,与普通的socket通信不一样,必须先启动服务端。

以下是测试程序,pub.py为服务端,sub.py客户端。

pub.py

# coding: utf-8
import zmq
import time
import threading
import os
import stat
# 分类后的日志的zmq的pub地址
LOG_TYPE_PUB_PATH = "ipc:///tmp/log_types.ipc"
# 日志的zmq的sub地址
LOG_SUB_PATH = "ipc:///tmp/log_lator.ipc"
TOPIC_LIST = ["lator", "att"]
def unlink_ipc(path):
    index = path.rfind('ipc://')
    if index < 0:
        return
    fpath = path[len('ipc://'):]
    #if os.path.exists(fpath):
    os.unlink(fpath)
def pub(pubaddr, topic):
    context = zmq.Context()
    sock = context.socket(zmq.PUB)
    sock.set_hwm(100)
    #unlink_ipc(pubaddr)
    sock.bind(pubaddr)
    counter = 1
    os.chmod(pubaddr[len('ipc://'):], stat.S_IRWXO + stat.S_IRWXG + stat.S_IRWXU)
    zpath = sock.getsockopt(zmq.LAST_ENDPOINT)
    print zpath
    while True:
        messagedata = "this is msg fro topic one %s" % counter
        print "%s %s" % (topic, messagedata)
        sock.send("%s %s" % (topic, messagedata))
        counter = counter + 1
        time.sleep(1)
if __name__ == "__main__":
    t1 = threading.Thread(target=pub, args=(LOG_TYPE_PUB_PATH, "lator"))
    t2 = threading.Thread(target=pub, args=(LOG_SUB_PATH, "att"))
    t1.start()
    t2.start()
    t1.join()
    t2.join()

sub.py

# coding: utf-8
import os
import zmq
from zmq.eventloop.ioloop import IOLoop
from zmq.eventloop.zmqstream import ZMQStream
# 分类后的日志的zmq的pub地址
LOG_TYPE_PUB_PATH = "ipc:///tmp/log_types.ipc"
# 日志的zmq的sub地址
LOG_SUB_PATH = "ipc:///tmp/log_lator.ipc"
TOPIC_LIST = ["lator", "att"]
def unlink_ipc(path):
    index = path.rfind('ipc://')
    if index < 0:
        return
    fpath = path[len('ipc://'):]
    if os.path.exists(fpath):
        os.unlink(fpath)
def recv_func(msg):
    print msg
def main2():
    loop_instance = IOLoop.instance()
    ctx = zmq.Context.instance()
    sock = ctx.socket(zmq.SUB)
    sock.set_hwm(100)
    sock.connect(LOG_TYPE_PUB_PATH)
    sock.connect(LOG_SUB_PATH)
    for key in TOPIC_LIST:
        if isinstance(key, str):
            sock.setsockopt(zmq.SUBSCRIBE, key)
        elif isinstance(key, unicode):
            sock.setsockopt_string(zmq.SUBSCRIBE, key)
        else:
            print("log_broker to set subscribe error:%s" % key)
    sock = ZMQStream(sock, loop_instance)
    sock.on_recv(recv_func)
    loop_instance.start()
if __name__ == "__main__":
    main2()
zmq pub-sub, push-pull模式没有客服端服务端启动先后顺序的限制,与普通的socket通信不一样,必须先启动服务端。以下是测试程序,pub.py为服务端,sub.py客户端。pub.py# coding: utf-8import zmqimport timeimport threadingimport osimport stat# 分类后的日志的zm...
zmqpubsub zmqpubsub是在ZeroMQ之上的一个简单的Go pubsub实现。 它抽象了底层的ZeroMQ机械,以为发布-订阅消息传递模式提供Go友好的API。 go get -tags zmq_3_x github.com/hpcloud/zmqpubsub 首先设置一个经纪人: var Broker zmqpubsub. Broker func init () { Broker . PubAddr = "tcp://127.0.0.1:4000" Broker . SubAddr = "tcp://127.0.0.1:4001" Broker . BufferSize = 100 func main () { Broker . MustRun () 代理指定发布者/订阅者将连接到的地址或从中连接的地址。 pub/sub模式介绍 发布/订阅模式,全称为Publish/Subscribe,支持多个发布者/多订阅者,使用在消息单向传输的应用场景,消息总是从发布者发送到订阅者。 一般的使用流程为: pub端: 创建context 创建socket,设置ZMQ_PUB模式 bind端口 循环发布消息send socket特性:
ZMQ 的三个基本模型 ZMQ 提供了三个基本的通信模型,分别是“Request-Reply “,”Publisher-Subscriber“,”Parallel Pipeline” 请求应答模式(Request-Reply)(rep 和 req) 消息双向的,有来有往,req端请求的消息,rep端必须答复给req端 订阅发布模式 (pubsub) 消息单向的,有去无回的。可按照发布端可发...
5. Advanced Pub-Sub Patterns | ØMQ - The Guidehttps://zguide.zeromq.org/docs/chapter5/ 我们将介绍: 何时使用发布订阅 如何处理太慢的订阅者(自杀蜗牛模式) 如何设计高速订阅者(黑盒模式) 如何监控发布-订阅网络(Espresso 模式) 如何构建共享键值存储(克隆模式) 如何使用反应器简化复杂的服务器 如何使用 Binary Star 模式向服务器添加故障转移 Pub-Sub 的优点和缺点 ZeroMQ
ØMQ支持多种模式,具体可以参阅:https://blog.csdn.net/qq_41453285/article/details/106865539 本文介绍ØMQ的“发布-订阅”模式 二、发布-订阅模式 发布-订阅模式由https://rfc.zeromq.org/spec/29/正式定义 在发布-订阅模式中,有一个发布者用来发送消息,该模式中有很多订阅者会接收发布者发布的消息 ØMQ的套接字类型有4种: ZMQ_PUB ZMQ_SUB ZMQ_XPUB const string host = string("tcp://127.0.0.1:5803"); const string topic("gnss_sensor"); void *context = zmq_ctx_new(); assert(nullptr != context); void *socket = zmq_socket(context, ZMQ_PUB); assert(nullptr != socket); int recv_tim
一个英俊的小伙子:A哥, 一位漂亮的小姑娘:B妹。 A哥:是一位聋哑人,但是会写字,常去奈何桥边钓鱼,每天都会趁钓鱼的空隙,在奈何桥上看美女,重点他每次看到一个他喜欢的,他都会在桥头的木牌上表达自己的欣赏。 B妹收到了诅咒,只有在奈何桥边有男人向她表白,她就能摆脱凡体,变成仙女,所以B妹每天都会去奈何桥边。 某一天清晨,A哥(发送者)看到了B妹(接收者),于是,A哥没办法直接表述自己
ZMQ专题学习之一:初识ZeroMQ ZeroMQ号称是“史上最快的消息队列”,基于c语言开发的。引用官方说明定义:“ZMQ(以下ZeroMQ简称ZMQ)是一个简单好用的传输层,像框架一样的一个socket library,他使得Socket编程更加简单、简洁和性能更高。是一个消息处理队列库,可在多个线程、内核和主机盒之间弹性伸缩。 ZMQ的明确目标是“成为标准网络协议...
Qt是一个跨平台的应用开发框架,提供了丰富的类库和工具,使得开发人员可以快速地开发高效、强交互性的应用程序。在Qt开发中,使用ZeroMQ(简称zmq)来进行通信,可以实现快速、可靠、异步的数据传输。 ZeroMQ是一个高性能、可重用的消息传递库,支持多种网络协议和消息传递模式。使用ZeroMQ可以快速地实现基于消息的通信模式,而无需关心具体的传输细节。当然,ZeroMQ也支持多种传输协议,如TCP、inproc、ipc等。 Qt使用zmq进行通信时,需要使用zmq的API来创建一个socket,用于接收或发送消息。在Qt中,可以使用QSocketNotifier类来监听zmq的socket,当有数据来时,会触发相应的信号,从而实现数据的接收与处理。 同时,在Qt中还需要使用QThread类来实现多线程处理,以避免在主线程中阻塞的情况。Qt提供了多种线程池和异步调用机制,开发者可以根据具体需求来选择合适的处理方式。 总之,Qt提供了一套完整的API和工具,使得开发者可以方便地使用zmq进行数据通信。结合Qt丰富的类库和工具,开发者可以快速地构建高效、可靠的应用程序,满足不同用户的需求。