【RabbitMQ 服务器】
# 在 vhosttest 里面有 exchangetest 和 queuetest 通过 rkeytest 绑定
Broker: 192.168.0.xx
virtual host: vhosttest
Exchange: exchangetest
Queue: queuetest
Routing key: rkeytest
【Python 环境】
OS: Windows 10
Python: 3.6.3 x64
pika: 0.11.2
【查看队列状态】
# 通过浏览器查看队列状态
http://192.168.0.xx:15672/api/queues/vhosttest/queuetest
# 通过命令行查看队列状态
curl -u user:password http://192.168.0.xx:15672/api/queues/vhosttest/queuetest | jq
# 通过命令行查看队列长度(messages = messages_ready + messages_unacknowledged)
curl -s -u user:password http://192.168.0.xx:15672/api/queues/vhosttest/queuetest | \
jq '.messages'
【send.py】
#encoding: utf-8
#author: walker
#date: 2018-01-31
#summary: 发送方/生产者
import os, sys, time
import pika
def Main():
credentials = pika.PlainCredentials("test", "test")
parameters = pika.ConnectionParameters(host="192.168.0.xx",
virtual_host='vhosttest',
credentials=credentials)
connection = pika.BlockingConnection(parameters) # 连接 RabbitMQ
channel = connection.channel() # 创建频道
queue = channel.queue_declare(queue='queuetest') # 声明或创建队列
while True: # 循环向队列中发送信息
message = time.strftime('%H:%M:%S', time.localtime())
channel.basic_publish(exchange='exchangetest',
routing_key='rkeytest',
body=message)
print('send message: %s' % message)
while True:
# 检查队列,以重新得到消息计数
queue = channel.queue_declare(queue='queuetest', passive=True)
queue.method.message_count 获取的为 ready 的消息数
截至 2018-03-06(pika 0.11.2)
walker 没找到利用 pika 获取 unack 或者 total 消息数的方法
messageCount = queue.method.message_count
print('messageCount: %d' % messageCount)
if messageCount < 100:
break
connection.sleep(1)
# 关闭连接
connection.close()
if __name__ == '__main__':
Main()
【recv.py - 版本1】
一个消费者
#encoding: utf-8
#author: walker
#date: 2018-01-31
#summary: 接收方/消费者
import os, sys, time
import pika
# 接收处理消息的回调函数
def ConsumerCallback (channel, method, properties, body):
print("Received %s" % body)
def Main():
credentials = pika.PlainCredentials("test", "test")
parameters = pika.ConnectionParameters(host="192.168.0.xx",
virtual_host='vhosttest',
credentials=credentials)
connection = pika.BlockingConnection(parameters) # 连接 RabbitMQ
channel = connection.channel() # 创建频道
queue = channel.queue_declare(queue='queuetest') # 声明或创建队列
# no_ack=True 开启自动确认,不然消费后的消息会一直留在队列里面
# no_ack = no_manual_ack = auto_ack;不手动应答,开启自动应答模式
channel.basic_consume(ConsumerCallback, queue='queuetest', no_ack=True)
print('Wait Message ...')
channel.start_consuming()
if __name__ == '__main__':
Main()
【recv.py - 版本2】
利用多线程实现多个消费者同时消费
#encoding: utf-8
#author: walker
#date: 2018-03-9
#summary: 接收方/消费者
import os, sys, time
import pika
import threading
from queue import Queue
GlobalQueue = Queue(10)
class Consumer(threading.Thread):
def run(self):
while True:
task = GlobalQueue.get()
print('thread-%d,\ttask: %s' % (threading.get_ident(), task))
# 接收处理消息的回调函数
def ConsumerCallback (channel, method, properties, body):
# 将消息推入队列
GlobalQueue.put(body)
def Main():
credentials = pika.PlainCredentials("test", "test")
parameters = pika.ConnectionParameters(host="192.168.0.86",
virtual_host='vhosttest',
credentials=credentials)
connection = pika.BlockingConnection(parameters) # 连接 RabbitMQ
channel = connection.channel() # 创建频道
channel.basic_qos(prefetch_size=0, prefetch_count=1, all_channels=True) # 公平消费
# no_ack=True 开启自动确认,不然消费后的消息会一直留在队列里面
# no_ack = no_manual_ack = auto_ack;不手动应答,开启自动应答模式
channel.basic_consume(ConsumerCallback, queue='queuetest', no_ack=True)
print('Wait Message ...')
for i in range(3):
c = Consumer()
c.start()
channel.start_consuming() # 开始接收任务
if __name__ == '__main__':
Main()
【后记】
channel.basic_publish(exchange='',
routing_key='queuetest',
body=task)
【0.x 到 1.x 的迁移】
# 0.x 版本
pika.ConnectionParameters(host=Host,
virtual_host=VirtualHost,
credentials=pika.PlainCredentials(User, Pwd),
heartbeat_interval=0)
# 1.x 版本
pika.ConnectionParameters(host=MQHost,
virtual_host=MQVirtualHost,
credentials=credentials,
heartbeat=0)
# 0.x 版本
channel.basic_qos(prefetch_size=0, prefetch_count=1, all_channels=True)
# 1.x 版本
channel.basic_qos(prefetch_size=0, prefetch_count=1, global_qos=True)
# 0.x 版本
# no_ack = no_manual_ack = auto_ack;不手动应答,开启自动应答模式
channel.basic_consume(consumer_callback=ConsumerCallback, queue=MQQueueNode2Center, no_ack=False)
# 1.x 版本
channel.basic_consume(queue=MQQueueNode2Center, on_message_callback=ConsumerCallback, auto_ack=False)
【相关阅读】
***
walker
***