I want to use ZeroMQ in a Python project for IPC. One process should receive control commands from other processes. Therefore I decided to follow the PUB/SUB example, but with switched roles.

One process uses zmq.SUB on the listener side,
other processes use zmq.PUB on the connector side.

My problem is that not everything sent on the PUB side is received on the SUB side.

Here is the code:

import zmq
import asyncio
IPC_SOCK = "/tmp/tmp.sock"
class DataObj:
    def __init__(self) -> None:
        self.value = 0
    def __str__(self) -> str:
        return f'DataObj.value: {self.value}'
async def server():
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    socket.bind(f'ipc://{IPC_SOCK}')
    socket.subscribe("")
    while True:
        try:
            obj = socket.recv_pyobj(flags=zmq.NOBLOCK)
            print(f'<-- {obj}')
            await asyncio.sleep(0.1)
        except zmq.Again:
            await asyncio.sleep(0.1)
async def client():
    print("Waiting for server to be come up")
    await asyncio.sleep(2)
    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    socket.connect(f'ipc://{IPC_SOCK}')
    data_obj = DataObj()
    data_obj.value = 42
    print("Sending object once")
    socket.send_pyobj(data_obj)
    print(f"--> {data_obj}")
    print("Send object --> Not Received!")
    print("Sending object twice")
    for i in range(2):
        data_obj.value += 1
        socket.send_pyobj(data_obj)
        print(f"--> {data_obj}")
        await asyncio.sleep(0.1)
    print("Send both objects --> Received only once")
async def main():
    t_server = asyncio.create_task(server())
    t_client = asyncio.create_task(client())
    await t_client
    await t_server
if __name__ == "__main__":
    asyncio.run(main())

This is the output it generates (on my desktop Linux (Arch), and the same on my Raspberry Pi 4 (Raspbian)):

Waiting for server to be come up
Sending object once
--> DataObj.value: 42
Send object --> Not Received!
Sending object twice
--> DataObj.value: 43
--> DataObj.value: 44
<-- DataObj.value: 44
Send both objects --> Received only once

Does anyone have an idea how to solve the problem? Switching the socket from the ipc:// transport to the tcp:// transport doesn't make any difference, so I assume the problem lies elsewhere.

Is it generally allowed to do pub on a connector, and sub on the listener side?

Q: "Is it generally allowed to do pub on a connector, and sub on the listener side?" A: Sure, it is. The devil is (as always) in the details (see comments below). Many-PUB-one-SUB makes sense only in cases where the SUB benefits from subscription management features; otherwise its add-on costs are never paid back. Using other Scalable Archetypes may serve better, even using more than one link between any "Commander" and a poor "Private" (who has to execute all incoming orders). Just imagine Many-PUSH-one-PULL (with another back-channel to each of the PUSH-ers, w Identity
– user3666197
May 18 at 14:34

There are several problems here...

  • You're creating multiple ZMQ contexts. You should only be creating a single context, and then all your tasks should allocate sockets from that context.

  • In your main task, you await the client first and then the server. Because both were already scheduled with asyncio.create_task, they do run concurrently, but asyncio.gather expresses that intent more directly and does not make the order of the awaits look significant.

  • ZeroMQ has support for asyncio, so if you're writing asyncio client/server tasks, you should probably use asyncio sockets as well.

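  • You probably shouldn't be using PUB/SUB sockets. The pub/sub model is not reliable; unlike with other socket types, ZeroMQ will discard published messages if the receiver is not connected yet, if it can't keep up, etc. That is most likely what happens in your output: the first two objects are sent before the connection and subscription have been established, so they are dropped. For what you're doing, you might be better off with REQ/REP sockets.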
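The point about running the server and client tasks together can be illustrated without ZeroMQ at all. The following is a minimal sketch, using only the standard library, of asyncio.gather interleaving a listener-style coroutine with a sender. The names listener, sender and run_demo, and the asyncio.Queue standing in for a socket, are assumptions made purely for illustration.

```python
import asyncio


async def listener(queue, log):
    # Stand-in for the server loop: handle items until a None sentinel arrives.
    while True:
        item = await queue.get()
        if item is None:
            break
        log.append(f"<-- {item}")


async def sender(queue, log):
    # Stand-in for the client: hand over three values, then the sentinel.
    for value in (42, 43, 44):
        log.append(f"--> {value}")
        await queue.put(value)
        await asyncio.sleep(0)  # yield to the event loop so the listener can run
    await queue.put(None)


async def run_demo():
    queue = asyncio.Queue()
    log = []
    # gather() schedules both coroutines and waits until both have finished.
    await asyncio.gather(listener(queue, log), sender(queue, log))
    return log


if __name__ == "__main__":
    print("\n".join(asyncio.run(run_demo())))
```

Both coroutines make progress because each one reaches an await that hands control back to the event loop; a coroutine that never awaits would starve the other.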

    I've modified your code, taking into account the above comments, to produce the following example (note that there are multiple clients to demonstrate that things work as expected in that situation):

    import zmq
    import zmq.asyncio
    import asyncio
    IPC_SOCK = "/tmp/tmp.sock"
    context = zmq.asyncio.Context()
    class DataObj:
        def __init__(self, value=0) -> None:
            self.value = value
        def __str__(self) -> str:
            return f"DataObj.value: {self.value}"
        def __iadd__(self, val):
            self.value += val
            return self
    async def server():
        socket = context.socket(zmq.REP)
        socket.bind(f"ipc://{IPC_SOCK}")
        print("Server is running")
        while True:
            obj = await socket.recv_pyobj()
            await socket.send(b"")
            print(f"<-- {obj}")
    async def client(id=0):
        socket = context.socket(zmq.REQ)
        socket.connect(f"ipc://{IPC_SOCK}")
        data_obj = DataObj((id * 100) + 42)
        print(f"client {id} sending object once")
        await socket.send_pyobj(data_obj)
        await socket.recv()
        print(f"{id} --> {data_obj}")
        print(f"client {id} sending object twice")
        for _ in range(2):
            data_obj += 1
            await socket.send_pyobj(data_obj)
            await socket.recv()
            print(f"{id} --> {data_obj}")
    async def main():
        await asyncio.gather(server(), client(1), client(2))
    if __name__ == "__main__":
        asyncio.run(main())
    

    Running this code produces:

    Server is running
    client 1 sending object once
    client 2 sending object once
    <-- DataObj.value: 142
    <-- DataObj.value: 242
    1 --> DataObj.value: 142
    client 1 sending object twice
    2 --> DataObj.value: 242
    client 2 sending object twice
    <-- DataObj.value: 143
    <-- DataObj.value: 243
    1 --> DataObj.value: 143
    2 --> DataObj.value: 243
    <-- DataObj.value: 144
    <-- DataObj.value: 244
    1 --> DataObj.value: 144
    2 --> DataObj.value: 244
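If the commands only need to flow one way, the many-PUSH-one-PULL pairing mentioned in the comment on the question may be worth a look as well. The following is a minimal sketch of that idea, not the approach used in the answer above: it assumes pyzmq with zmq.asyncio, one shared context, and a hypothetical inproc:// endpoint so that it runs inside a single process. The function names receive_commands, send_commands and run_push_pull are likewise made up for illustration.

```python
import asyncio

import zmq
import zmq.asyncio

# Hypothetical endpoint for this sketch; an ipc:// or tcp:// address
# should work the same way between separate processes.
ENDPOINT = "inproc://commands"


async def receive_commands(socket, expected):
    """Collect `expected` objects from the bound PULL socket."""
    received = []
    for _ in range(expected):
        received.append(await socket.recv_pyobj())
    return received


async def send_commands(socket, client_id, count):
    """Send `count` (client_id, sequence) tuples over a connected PUSH socket."""
    for seq in range(count):
        await socket.send_pyobj((client_id, seq))


async def run_push_pull(clients=2, per_client=3):
    # One shared asyncio-aware context for every socket in the process.
    context = zmq.asyncio.Context()
    try:
        pull = context.socket(zmq.PULL)
        pull.bind(ENDPOINT)  # the single listener binds
        pushers = []
        for _ in range(clients):
            push = context.socket(zmq.PUSH)
            push.connect(ENDPOINT)  # each command sender connects
            pushers.append(push)
        received, *_ = await asyncio.gather(
            receive_commands(pull, clients * per_client),
            *(send_commands(p, i + 1, per_client) for i, p in enumerate(pushers)),
        )
        return received
    finally:
        # Close every socket without lingering, then terminate the context.
        context.destroy(linger=0)


if __name__ == "__main__":
    for command in asyncio.run(run_push_pull()):
        print(f"<-- {command}")
```

Unlike PUB, a PUSH socket queues or blocks rather than silently discarding messages when its peer is not ready, and it needs no reply for each message the way REQ/REP does. The order in which commands from different clients arrive is not guaranteed.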
    Thanks for your explanation and for correcting my source. I should have read more of the tutorial on the website before asking my question here. After reading more of it, things are much clearer now. I just didn't know about the sockets' "superpower". Using two concepts was just in my reduced example. In my real-world application there are two completely different processes.
    – seho85
    May 12 at 14:57
    @seho85 After 12+ years of using ZeroMQ (since v2.1.11+) I would dare call PUB/SUB "unsafe" (just due to the known fact that it may not deliver messages to subscribers who do not listen), the less to recommend the most dangerous REQ/REP as a feasible substitute thereof. You should be warned that the REQ/REP archetype has for ages been known to be sure to fall into an unsalvageable mutual deadlock; the only unknown is when that will happen (be it due to LoS or another reason for not keeping the REQ-asked, REP-answered, REQ-asked, REP-answered, ... distributed-FSA two-step dance, so that both sides wait for the other ...)
    – user3666197
    May 18 at 14:22
    @seho ... for more details on REQ/REP deadlocks and other distributed-computing use-cases of the scalable, Zero-Copy + (almost) Zero-Latency Signalling / Messaging metaplane that ZeroMQ can deliver, feel free to read stackoverflow.com/…
    – user3666197
    May 18 at 14:27