An open-source universal messaging library
复制代码

ZeroMQ(也称为ØMQ,0MQ或zmq)看起来像一个可嵌入的网络库,但是却像一个并发框架。 它为您提供套接字,这些套接字可在各种传输方式(例如进程内,进程间,TCP和多播)中承载原子消息。 您可以使用扇出,发布-订阅,任务分发和请求-回复等模式将套接字 N-to-N 连接起来。 它足够快,可以成为集群产品的基础。 它的异步 I/O 模型为您提供了可扩展的多核应用程序,这些应用程序是作为异步消息处理任务而构建的。 它具有多种语言API,并且可以在大多数操作系统上运行。

  • 通用的: 在任何语言、任何平台上连接您的代码。
  • 智能的:智能模式,例如pub-sub,push-pull和client-server。
  • 高效的:小型库中的异步 I/O 引擎。
  • 多传输支持:跨inproc,IPC,TCP,UDP,TIPC,多播和WebSocket传送消息
  • 社区:由一个大型的、活跃的开源社区支持
  • ZeroMQ,又称为0MQ或者ZMQ.是一个高性能的异步的消息传递库,旨在用于分布式的或者并发的应用程序。它提供了一个消息队列,但是与面向消息的中间件不同,ZeroMQ系统可以在没有专用消息代理的情况下运行。

    Zeromq支持通用的消息模式(发布订阅,请求响应,客户端服务端等其他)通过一系列传输(TCP,进程内,进程间,多播,WebSocket等)让进程间消息通信和线程间消息通信一样简单。这样可以使您的代码清晰,模块化并且易于扩展。

    ZeroMq由一大帮贡献者开发的

    2.ZeroMQ中的Zero

    ZeroMQ的哲学是从0开始的。零是指零代理(ZeroMQ是无代理的)、零延迟、零成本(它是免费的)和零管理。 更一般地说,zero指的是贯穿整个项目的极简主义文化。我们通过消除复杂性而不是公开新功能来增加功能

    3.核心概念

    ZeroMQ消息是在应用程序或同一应用程序的组件之间传递的离散数据单元。 从ZeroMQ本身的角度来看,消息被认为是不透明的二进制数据。

    在线上,ZeroMQ消息是可容纳在内存中的大小从零到开始的任何大小的Blob。 您可以使用协议缓冲区,msgpack,JSON或其他应用程序需要的来进行序列化。 选择可移植的序列化数据表示形式是明智的,但是您可以自行权衡取舍。

    最简单的ZeroMQ消息由一帧(也称为消息部分)组成。帧是ZeroMQ消息的基本有线格式。帧是指定长度的数据块。长度可以是0。ZeroMQ保证交付消息的所有部分(一个或多个),或者一个也不交付。这允许您以单个在线消息的形式发送或接收帧列表。

    一条消息(单部分或多部分)必须适合内存。 如果要发送任意大小的文件,则应将它们分成多个部分,并将每个部分作为单独的单部分消息发送。 使用多部分数据不会减少内存消耗。

  • 使用字符串
  • 将数据作为字符串传递通常是进行通信的最简单方法,因为序列化非常简单。对于ZeroMQ,我们建立了这样的规则:字符串是指定长度的,并且在传输时不带结尾null。

    因为我们使用帧的长度来反映字符串的长度,所以我们可以通过将多个字符串放在一个单独的帧中来发送多个字符串。

    language

    使用sendMore方法可以在一条消息中发送多个字符串帧。 此方法将延迟消息的实际发送,直到发送完最后一帧为止。

    socket.sendMore (socket, "HELLO");
    socket.sendMore (socket, "beautiful");
    socket.send (socket, "WORLD!");
    复制代码

    除了使用套接字的send API,您还可以使用ZMsg类构造多帧消息。

    ZMsg *strings = new ZMsg();
    strings.add("HELLO");
    strings.add("beautiful");
    strings.add("WORLD");
    strings.send(socket);
    复制代码

    要接收一系列字符串帧,请多次调用recvStr函数。

    String hello     = socket.recvStr();
    String beautiful = socket.recvStr();
    String world 	 = socket.recvStr();
    复制代码

    为了在一次调用中检索整个消息,请使用ZMsg类的静态recvMsg方法。

    ZMsg strings = ZMsg.recvMsg(socket);
    String hello     = strings.popString();
    String beautiful = strings.popString();
    String world     = strings.popString();
    复制代码

    4.Socket API

    套接字是用于网络编程的事实上的标准API。 这就是ZeroMQ提供熟悉的基于套接字的API的原因。使ZeroMQ特别受开发人员欢迎的一件事是,它使用不同的套接字类型来实现任何任意的消息传递模式。此外,ZeroMQ套接字提供了对基础网络协议的干净抽象,从而隐藏了那些协议的复杂性,并使它们之间的切换变得非常容易。

    与传统的套接字的主要区别

    一般而言,常规套接字提供了到面向连接的可靠字节流( SOCK_STREAM )或无连接不可靠数据报( SOCK_DGRAM )的同步接口。 相比之下,ZeroMQ套接字提供了异步消息队列的抽象,其确切的排队语义取决于所使用的套接字类型。传统的套接字传输字节流或离散数据报,而ZeroMQ套接字传输离散消息。

    ZeroMQ套接字是异步的,这意味着物理连接建立和拆除,重新连接以及有效交付的时间对用户是透明的,并且由ZeroMQ本身进行组织。 此外,在对等端不可接收消息的情况下,消息可以排队。

    传统套接字仅允许严格的一对一(两个对等方),多对一(许多客户端,一台服务器),或者在某些情况下允许一对多(多播)关系。 除PAIR套接字外,ZeroMQ套接字可以连接到多个端点,同时接受来自绑定到该套接字的多个端点的传入连接,从而允许多对多关系。

    Socket生命周期

    ZeroMQ套接字有四个部分,就像BSD套接字一样:

  • 创建和销毁套接字,它们一起形成了套接字寿命的循环
  • 通过在套接字上设置选项并在必要时进行检查来配置套接字
  • 通过创建与套接字之间的ZeroMQ连接,将套接字插入网络拓扑。
  • 使用套接字通过在套接字上写入和接收消息来承载数据
  • 绑定与连接

    使用ZeroMQ套接字,绑定和连接谁都没有关系。 在上面,您可能已经注意到服务器使用Bind而客户端使用Connect。 为什么会这样,有什么区别?

    ZeroMQ为每个基础连接创建队列。 如果您的套接字连接到三个对等套接字,则在后台有三个消息队列。

    使用Bind,您允许对等点连接到您,因此您不知道将来会有多少对等点,并且您不能预先创建队列。相反,队列是作为连接到绑定套接字的各个对等点创建的。

    借助Connect,ZeroMQ知道将至少有一个对等节点,因此它可以立即创建一个队列。 这适用于ROUTER以外的所有套接字类型,其中ROUTER仅在我们连接的对等方确认我们的连接之后才创建队列。

    因此,将消息发送到没有对等端的绑定套接字或没有实时连接的ROUTER时,没有队列可以存储消息。

    什么时候使用bind,什么时候使用connect?

    常规的,在你的架构中稳定的点上使用bind,在易失性端点的动态组件上使用connect.在request/reply模式中,服务提供者是你使用bind的节点,客户端是使用connect的节点。就像旧的普通TCP.

    如果您不确定哪个部分更稳定(即点对点),请考虑在中间安装一个稳定的设备,各方都可以连接到该设备。

    高水位标记是对指定套接字正在与之通信的任何单个对等方在内存中排队的ZeroMQ正在排队的未完成消息的最大数量的硬限制。

    如果达到此限制,套接字将进入异常状态,并且根据套接字类型,ZeroMQ将采取适当的措施,例如阻止或丢弃已发送的消息。 有关每种套接字类型所采取的确切操作的详细信息,请参阅下面的各个套接字说明。

    ZeroMQ套接字API的包装下面是消息传递模式的世界。 ZeroMQ模式由具有匹配类型的套接字对实现。

    内置的核心ZeroMQ模式是:

  • Request-reply:将一组客户端连接到一组服务。 这是一个远程过程调用和任务分配模式。
  • Pub-sub:将一组发布者连接到一组订阅者。 这是一种数据分发模式。
  • Pipeline:以扇出/扇入模式连接节点,该模式可以具有多个步骤和循环。 这是并行的任务分配和收集模式。
  • Exclusive pair:专门连接两个socket。这是一个进程中连接两个线程的模式,不要与普通的套接字对混淆。
  • 还有更多的ZeroMQ模式仍处于草案状态

  • Client-server:它允许单个ZeroMQ服务器与一个或多个ZeroMQ客户端对话。 客户端始终启动对话,此后,任何一个对等方都可以异步向另一方发送消息。
  • Radio-dish:它用于以扇出方式将数据从单个发布者分发到多个订阅者。
  • Request-reply pattern

    请求-应答模式旨在用于各种面向服务的体系结构。 它有两种基本类型:同步( REQ REP 套接字类型)和异步套接字类型( DEALER ROUTER 套接字类型),它们可以以多种方式混合使用。

    REQ socket

    客户端使用REQ套接字向服务发送请求并从服务接收答复。 此套接字类型仅允许sends和后续receive调用的交替序列。 一个 REQ 套接字可以连接到任意数量的 REP ROUTER 套接字。 发送的每个请求都在所有已连接的服务之间进行轮询,并且收到的每个答复都与上一个发出的请求匹配。 它设计用于简单的请求-答复模型,其中对失败的对等节点的可靠性不成问题。如果没有服务可用,则套接字上的任何发送操作都将阻塞,直到至少一项服务可用为止。 REQ 套接字不会丢弃任何消息。

    特征摘要:

    兼容的对等套接字 REP, ROUTER

    REP socket

    服务使用 REP 套接字从客户端接收请求并向其发送回复。 此套接字类型仅允许receive和后续send调用的交替序列。 从所有客户端接收到的每个请求都是公平排队的,并且每个发送的答复都将路由到发出最后一个请求的客户端。 如果原始请求者不再存在,则答复将被静默丢弃。

    特征摘要:

    兼容的对等套接字 REQ, DEALER

    DEALER socket

    DEALER 套接字类型与一组匿名对等方进行对话,并使用循环算法发送和接收消息。 只要不丢弃消息,它就是可靠的。 对于与 REP ROUTER 服务器通信的客户端, DEALER 可以作为 REQ 的异步替代。 DEALER 收到的消息将从所有连接的对等方公平排队。

    DEALER 套接字由于已达到所有对等方的最高水位而进入静音状态时,或者如果根本没有任何对等体,则套接字上的任何发送操作都将阻塞,直到静音状态结束或至少一个对等方可用发送为止; 消息不会被丢弃

    DEALER 套接字连接到 REP 套接字时,发送的消息必须包含一个空框架作为消息的第一部分(定界符),跟在一个或多个主体部分之后。

    特征摘要:

    兼容的对等套接字 ROUTER, REP, DEALER

    ROUTER socket

    ROUTER 套接字类型使用显式寻址与一组对等方对话,以便将每个传出消息发送到特定的对等方连接。 ROUTER REP 的异步替代品,通常用作与 DEALER 客户端进行通信的服务器的基础。

    当收到消息时,ROUTER套接字将在消息部分之前包含消息的始发对等方的路由ID,然后再将其传递给应用程序。 接收到的消息在所有连接的同级之间公平排队。 发送消息时,ROUTER套接字将删除消息的第一部分,并使用它来确定消息应路由到的对等方的路由ID。 如果对等点已不存在或不再存在,则该消息将被静默丢弃。

    当ROUTER套接字由于已达到所有对等方的高水位线而进入静音状态时,发送到该套接字的任何消息都将被丢弃,直到静音状态结束为止。 同样,任何路由到已达到单个高水位标记的对等方的消息也将被丢弃。

    当REQ套接字连接到ROUTER套接字时,除了始发对等方的路由ID外,每个收到的消息还应包含一个空的定界符消息部分。 因此,由应用程序看到的每个接收到的消息的整个结构变为:一个或多个路由ID部分,定界符部分,一个或多个主体部分。 向REQ套接字发送答复时,应用程序必须包括定界符部分。

    特征摘要:

    Compatible peer sockets DEALER, REQ, ROUTER

    主题(Topic)

    ZeroMQ使用多部分消息来传达主题信息。 主题使用字节数组表示,尽管您可以使用字符串并使用适当的文本编码。

    发布者必须在消息的第一帧中包含主题,在消息有效负载之前。 例如,要向状态主题的订户发布状态消息:

    //  Send a message on the 'status' topic
    pub.sendMore("Status");
    pub.send("All is well");
    复制代码
    //  Subscribe to the 'status'
    sub.subscribe("status");
    复制代码

    一个订户套接字可以具有多个订阅筛选器。

    使用前缀检查将消息的主题与订户的订阅主题进行比较。 也就是说,订阅 topic 的订户将收到带有topic的消息:

  • topic
  • topic/subtopic
  • topical
  • 但是,它不会接收带有主题的消息:

  • TOPIC(记住,这是一个字节比较)
  • 这种前缀匹配行为的一个结果是,您可以通过订阅一个空的主题字符串来接收所有发布的消息

    PUB socket

    发布者使用PUB套接字分发数据。 发送的消息以扇出方式分发给所有连接的对等方。 此套接字类型无法接收任何消息。

    当PUB套接字由于已达到订户的高水位线而进入静音状态时,将发送给有问题的订户的任何消息都将被丢弃,直到静音状态结束为止。 send函数永远不会阻塞此套接字类型。

    特征摘要:

    Compatible peer sockets SUB, XSUB

    XPUB socket

    PUB 相同,只不过你可以以传入消息的形式接收订阅。订阅消息是一个字节值为1(表示订阅)值为0(表示取消订阅)跟着=在订阅消息主体后面。也接收没有sub/unsub前缀的消息,但对订阅状态没有影响。

    特征摘要:

    Compatible peer sockets ZMQ_SUB, ZMQ_XSUB

    XSUB socket

    SUB 相同,只不过您是通过向套接字发送订阅消息来进行订阅的。订阅消息是字节1(订阅)或者0(取消订阅)在订阅主体之后。也可以发送不带sub/unsub前缀的消息,但对订阅状态没有影响。

    特征摘要:

    Compatible peer sockets ZMQ_PUB, ZMQ_XPUB

    Pipeline pattern

    管道模式用于任务分配,通常在一个多阶管道中,一个或几个节点将工作推给许多工作节点,工作节点再将结果推给一个或几个收集器。该模式是最可靠的,因为它不会丢弃消息,除非节点意外断开连接。它是可伸缩的,节点可以在任何时候连接。

    ZeroMQ通过以下两种套接字类型支持流水线:

  • PUSH Socket Type
  • PULL Socket Type
  • PUSH socket

    PUSH套接字类型与一组匿名PULL对等方对话,使用循环算法发送消息。 此套接字类型未实现接收操作。

    当PUSH套接字由于已达到所有下游节点的高水位线而进入静音状态时,或者如果根本没有下游节点,则套接字上的任何发送操作将阻塞,直到静音状态结束或至少一个下游 节点可用于发送; 消息不会被丢弃。

    特征摘要:

    Compatible peer sockets

    Exclusive pair pattern

    PAIR不是通用套接字,而是用于两个对等体系结构稳定的特定用例。 这通常将PAIR限制为在单个进程内用于线程间通信。

    PAIR socket

    PAIR类型的套接字只能一次连接到单个对等方。 对通过PAIR套接字发送的消息不执行消息路由或筛选。

    当PAIR套接字由于已达到所连接对等方的高水位线而进入静音状态时,或者如果未连接任何对等方,则套接字上的任何发送操作都将阻塞,直到对等方可用于发送; 消息不会被丢弃。

    尽管PAIR套接字可以在除inproc之外的其他传输上使用,但是它们无法自动重新连接,同时新的传入连接将被终止,而任何先前的连接(包括处于关闭状态的连接)都会使它们在大多数情况下不适合TCP

    5.客户端服务器模式实例:

    服务器端:

    package main
    import (
    	zmq "github.com/pebbe/zmq4"
    	"fmt"
    	"time"
    func main() {
    	//  Socket to talk to clients
    	responder, _ := zmq.NewSocket(zmq.REP)
    	defer responder.Close()
    	responder.Bind("tcp://*:5555")
    	for {
    		//  Wait for next request from client
    		msg, _ := responder.Recv(0)
    		fmt.Println("Received ", msg)
    		//  Do some 'work'
    		time.Sleep(time.Second)
    		//  Send reply back to client
    		reply := "World"
    		responder.Send(reply, 0)
    		fmt.Println("Sent ", reply)
    复制代码

    服务器创建一个响应类型的套接字,绑定5555端口,然后等待消息到来,可以看到,我们并没有进行任何配置,我们仅仅发送字符串。

    package main
    import (
    	zmq "github.com/pebbe/zmq4"
    	"fmt"
    func main() {
    	//  Socket to talk to server
    	fmt.Println("Connecting to hello world server...")
    	requester, _ := zmq.NewSocket(zmq.REQ)
    	defer requester.Close()
    	requester.Connect("tcp://localhost:5555")
    	for request_nbr := 0; request_nbr != 10; request_nbr++ {
    		// send hello
    		msg := fmt.Sprintf("Hello %d", request_nbr)
    		fmt.Println("Sending ", msg)
    		requester.Send(msg, 0)
    		// Wait for reply:
    		reply, _ := requester.Recv(0)
    		fmt.Println("Received ", reply)
    复制代码

    客户端创建一个请求类型的套接字,连接并开始发送消息。 发送和接收方法都被阻塞(默认情况下)。对于receive,它很简单:如果没有消息,方法将阻塞。 发送它比较复杂,取决于套接字类型。对于请求套接字,如果达到高水位或没有连接对等点,该方法将阻塞。

    分类:
    后端
    标签: