我刚刚开始使用ZMQ。我正在设计一个应用程序,它的工作流程是:
的服务器,该服务器永远在等待客户端PUSHes。当一个请求到来时,将为该特定请求派生一个工作进程。是的,工作进程可以存在,当进程完成它的任务时,它会将结果PUSHes到客户端。
我假设PUSH/PULL架构适用于此。请在这一点上纠正我。
但是我该如何处理这些场景呢?
那么如何在PUSH/PULL模型中设置类似 timeout 的东西呢?
:谢谢用户938949的建议,我得到了一个 working answer ,我正在将它分享给后人。
【玩转 GPU】有奖征文
精美礼品等你拿!
如果您使用的是>= 3.0,那么您可以设置RCVTIMEO套接字选项:
client_receiver.RCVTIMEO = 1000 # in milliseconds
但一般来说,您可以使用轮询器:
poller = zmq.Poller() poller.register(client_receiver, zmq.POLLIN) # POLLIN for recv, POLLOUT for send
poller.poll() 需要一个超时:
poller.poll()
evts = poller.poll(1000) # wait *up to* one second for a message to arrive.
如果没有要接收的内容,则 evts 将是一个空列表。
evts
您可以使用 zmq.POLLOUT 进行轮询,以检查发送是否成功。
zmq.POLLOUT
或者,要处理可能已失败的对等体的情况,请执行以下操作:
worker.send(msg, zmq.NOBLOCK)
可能足够了,它将始终立即返回-如果发送不能完成,则引发一个ZMQError(zmq.EAGAIN)。
这是在我参考了user938949的答案和 http://taotetek.wordpress.com/2011/02/02/python-multiprocessing-with-zeromq/ 之后,我做的一个 快速破解 。如果你做得更好,请发布你的答案, 我会推荐你的答案 。
对于那些想要 持久解决方案 on可靠性的人,请参阅 http://zguide.zeromq.org/page:all#toc64
zeromq (beta ATM) 3.0版支持ZMQ_RCVTIMEO和ZMQ_SNDTIMEO中的 timeout 。 http://api.zeromq.org/3-0:zmq-setsockopt
服务器
zmq.NOBLOCK确保当客户端不存在时,send()不会阻塞。
import time import zmq context = zmq.Context() ventilator_send = context.socket(zmq.PUSH) ventilator_send.bind("tcp://127.0.0.1:5557") while True: i=i+1 time.sleep(0.5) print ">>sending message ",i ventilator_send.send(repr(i),zmq.NOBLOCK) print " succeed" except: print " failed"
客户端
轮询器对象可以监听许多接收套接字(参见上面链接的“使用ZeroMQ的Python多处理”)。我只在 work_receiver .上链接了它在无限循环中,客户端以1000ms为间隔进行轮询。如果在该时间内未收到任何消息,则 socks 对象将返回空。
import time import zmq context = zmq.Context() work_receiver = context.socket(zmq.PULL) work_receiver.connect("tcp://127.0.0.1:5557") poller = zmq.Poller() poller.register(work_receiver, zmq.POLLIN) # Loop and accept messages from both channels, acting accordingly while True: socks = dict(poller.poll(1000)) if socks: if socks.get(work_receiver) == zmq.POLLIN: