I want to use ZeroMQ in a Python project for IPC. One process should receive control commands from other processes. Therefore I decided to follow the PUB/SUB example, but with switched roles.
One process uses zmq.SUB on the listener side; the other processes use zmq.PUB on the connector side.
My problem is that not everything sent on the PUB side is received on the SUB side.
Here is the code:
import zmq
import asyncio

IPC_SOCK = "/tmp/tmp.sock"


class DataObj:
    def __init__(self) -> None:
        self.value = 0

    def __str__(self) -> str:
        return f'DataObj.value: {self.value}'


async def server():
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    socket.bind(f'ipc://{IPC_SOCK}')
    socket.subscribe("")

    while True:
        try:
            obj = socket.recv_pyobj(flags=zmq.NOBLOCK)
            print(f'<-- {obj}')
            await asyncio.sleep(0.1)
        except zmq.Again:
            await asyncio.sleep(0.1)


async def client():
    print("Waiting for server to be come up")
    await asyncio.sleep(2)

    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    socket.connect(f'ipc://{IPC_SOCK}')

    data_obj = DataObj()
    data_obj.value = 42

    print("Sending object once")
    socket.send_pyobj(data_obj)
    print(f"--> {data_obj}")
    print("Send object --> Not Received!")

    print("Sending object twice")
    for i in range(2):
        data_obj.value += 1
        socket.send_pyobj(data_obj)
        print(f"--> {data_obj}")
        await asyncio.sleep(0.1)
    print("Send both objects --> Received only once")


async def main():
    t_server = asyncio.create_task(server())
    t_client = asyncio.create_task(client())
    await t_client
    await t_server


if __name__ == "__main__":
    asyncio.run(main())
This is the output it generates, both on my desktop Linux (Arch) and on my Raspberry Pi 4 (Raspbian):
Waiting for server to be come up
Sending object once
--> DataObj.value: 42
Send object --> Not Received!
Sending object twice
--> DataObj.value: 43
--> DataObj.value: 44
<-- DataObj.value: 44
Send both objects --> Received only once
Does anyone have an idea how to solve this? Switching the socket from the ipc:// transport class to the tcp:// transport class doesn't make any difference, so I assume the problem lies elsewhere.
Is it generally allowed to use PUB on the connecting side and SUB on the listening side?
There are several problems here...
You're creating multiple ZMQ contexts. You should only be creating a single context, and then all your tasks should allocate sockets from that context.
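In your main task, you first await the client and only then the server. Because both were already started with asyncio.create_task, they still run concurrently, but main does not look at the server task until the client has finished, which obscures the intent. asyncio.gather expresses "run these together" more directly.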
ZeroMQ has support for asyncio, so if you're writing asyncio client/server tasks, you should probably use asyncio sockets as well.
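You probably shouldn't be using PUB/SUB sockets. The pub/sub model is not reliable: unlike other socket types, ZeroMQ discards messages if the receiver is not up, if it can't keep up, and so on. In particular, a PUB socket drops anything sent before the subscriber's subscription has reached it, which is consistent with the messages you send right after connect() never arriving. For what you're doing, you might be better off with REQ/REP sockets.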
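As a side check on the point about main, here is a small stdlib-only sketch of how the await order interacts with asyncio.create_task. The worker coroutine, its names, and the shared log list are hypothetical stand-ins, not taken from your code. Tasks created with create_task start running as soon as main yields, whichever one is awaited first, so both variants should produce an interleaved log.

```python
import asyncio


async def worker(name, steps, log):
    """Append one entry per step, yielding to the event loop in between."""
    for step in range(steps):
        log.append(f"{name}:{step}")
        await asyncio.sleep(0)  # hand control back to the event loop


async def sequential_awaits():
    """Mirror the question's main(): create both tasks, then await them one by one."""
    log = []
    t_server = asyncio.create_task(worker("server", 3, log))
    t_client = asyncio.create_task(worker("client", 3, log))
    await t_client
    await t_server
    return log


async def gathered():
    """Same workers, started and awaited together with asyncio.gather."""
    log = []
    await asyncio.gather(worker("server", 3, log), worker("client", 3, log))
    return log


if __name__ == "__main__":
    print(asyncio.run(sequential_awaits()))
    print(asyncio.run(gathered()))
```

The practical difference lies less in scheduling than in readability and error handling: with gather, main waits on both coroutines at once instead of in a fixed order.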
I've modified your code, taking into account the above comments, to produce the following example (note that there are multiple clients to demonstrate that things work as expected in that situation):
import zmq
import zmq.asyncio
import asyncio

IPC_SOCK = "/tmp/tmp.sock"

# One shared asyncio-aware context for every socket in this process
context = zmq.asyncio.Context()


class DataObj:
    def __init__(self, value=0) -> None:
        self.value = value

    def __str__(self) -> str:
        return f"DataObj.value: {self.value}"

    def __add__(self, val):
        self.value += val
        return self


async def server():
    socket = context.socket(zmq.REP)
    socket.bind(f"ipc://{IPC_SOCK}")

    print("Server is running")
    while True:
        obj = await socket.recv_pyobj()
        # REP must answer every request; an empty frame serves as the acknowledgement
        await socket.send(b"")
        print(f"<-- {obj}")


async def client(id=0):
    socket = context.socket(zmq.REQ)
    socket.connect(f"ipc://{IPC_SOCK}")

    data_obj = DataObj((id * 100) + 42)

    print(f"client {id} sending object once")
    await socket.send_pyobj(data_obj)
    await socket.recv()  # wait for the server's acknowledgement
    print(f"{id} --> {data_obj}")

    print(f"client {id} sending object twice")
    for _ in range(2):
        data_obj += 1
        await socket.send_pyobj(data_obj)
        await socket.recv()
        print(f"{id} --> {data_obj}")


async def main():
    await asyncio.gather(server(), client(1), client(2))


if __name__ == "__main__":
    asyncio.run(main())
Running this code produces:
Server is running
client 1 sending object once
client 2 sending object once
<-- DataObj.value: 142
<-- DataObj.value: 242
1 --> DataObj.value: 142
client 1 sending object twice
2 --> DataObj.value: 242
client 2 sending object twice
<-- DataObj.value: 143
<-- DataObj.value: 243
1 --> DataObj.value: 143
2 --> DataObj.value: 243
<-- DataObj.value: 144
<-- DataObj.value: 244
1 --> DataObj.value: 144
2 --> DataObj.value: 244
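If the control commands are strictly one-way and the senders don't need a reply, another option is a PUSH/PULL pair. It is not part of the modified example above and is named here only as an alternative. To my understanding, a PUSH socket queues messages for a peer it has connected to rather than discarding them, so nothing is lost while the connection is still being set up. The following is a minimal sketch under those assumptions. The temporary endpoint path, the function names, and the dict payload are hypothetical.

```python
import asyncio
import os
import tempfile

import zmq
import zmq.asyncio


async def receiver(context, endpoint, expected):
    """Bind a PULL socket and collect `expected` command objects."""
    socket = context.socket(zmq.PULL)
    socket.bind(endpoint)
    received = []
    while len(received) < expected:
        received.append(await socket.recv_pyobj())
    return received


async def sender(context, endpoint, sender_id, count):
    """Connect a PUSH socket and send `count` commands without waiting for replies."""
    socket = context.socket(zmq.PUSH)
    socket.connect(endpoint)
    for n in range(count):
        await socket.send_pyobj({"sender": sender_id, "command": n})


async def run_demo():
    context = zmq.asyncio.Context()
    # Hypothetical endpoint: a socket file inside a fresh temporary directory
    path = os.path.join(tempfile.mkdtemp(), "commands.sock")
    endpoint = f"ipc://{path}"
    results = await asyncio.gather(
        receiver(context, endpoint, expected=6),
        sender(context, endpoint, 1, 3),
        sender(context, endpoint, 2, 3),
    )
    # Everything has been received by now, so close all sockets without lingering
    context.destroy(linger=0)
    return results[0]


if __name__ == "__main__":
    for command in asyncio.run(run_demo()):
        print(command)
```

Messages from a single sender arrive in order, but the interleaving between senders is not fixed. If the sender needs confirmation that a command was handled, REQ/REP as shown above remains the better fit.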