相关文章推荐
低调的红豆  ·  iOS ...·  1 年前    · 
迷茫的马克杯  ·  OCI runtime exec ...·  1 年前    · 
欢乐的铁链  ·  docker - Why error ...·  1 年前    · 
首发于 I am LiuShiYi

消息队列RabbitMQ-使用Python操作RabbitMQ

简单的消息队列

消息生产者向队列中发送message,消费者从消息队列中取出消息并且消费。

  • 生产者
# -*- coding=utf-8 -*-
import pika
if __name__ == '__main__':
    connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    channel = connection.channel()
    channel.queue_declare(queue="hello")
    channel.basic_publish(
        exchange='',
        routing_key="hello",
        body="hello world"
    connection.close()
  • 消费者
# -*- coding=utf-8 -*-
import pika
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
if __name__ == '__main__':
    connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    channel = connection.channel()
    channel.queue_declare(queue="hello")
    channel.basic_consume(queue='hello',
                          auto_ack=True,
                          on_message_callback=callback)
    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()
  • 消息确认

当客户端从队列中取出消息之后,可能需要一段时间才能处理完成,如果在这个过程中,客户端出错了,异常退出了,而数据还没有处理完成,那么非常不幸,这段数据就丢失了,因为rabbitMQ默认会把消息标记为已经完成,然后从队列中移除。

消息确认是客户端从RabbitMQ中取出消息,并完成处理之后,会发送一个Ack信号通知RabbitMQ,消除处理完成。

当RabbitMQ收到客户端的获取消息请求之后,或标记为处理中,当再次收到Ack之后,才会标记为已完成,然后从队列中删除。当RabbitMQ检测到客户端和自己断开链接之后,还没收到Ack,则会重新将消息放回消息队列,交给下一个客户端处理,保证消息不丢失,也就是说,RabbitMQ给了客户端足够长的时间来做数据处理。

上述代码中, auto_ack=True 表示自动发送确认消息,即使消息被处理失败,消息也会被消费掉,即发生数据丢失的情况。

如果设置 auto_ack=False 则需要显式确认消息(在未发送确认信信号之前,消息一直存在)。

显示调用确认消息:

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
        ch.basic_ack(delivery_tag = method.delivery_tag)  #发送ack消息

使用交换器

交换器主要负责从生产者那里接受push的消息,根据生产者的定义规则,投递到队列中,是生产者和队列的中间件。

使用fanout实现发布订阅者模型

发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在

所有的相关队列中。

  • 生产者
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='127.0.0.1', port=5672))
channel = connection.channel()
channel.exchange_declare(exchange='test123',type='fanout')  #定义一个exchange ,类型为fanout
rest = channel.queue_declare(exclusive=True)   #创建一个随机队列,并启用exchange
queue_name = rest.method.queue          #获取队列名
channel.queue_bind(exchange='test123',queue=queue_name)   #将随机队列名和exchange进行绑定
def callback(ch, method, properties, body):
    '''回调函数,处理从rabbitmq中取出的消息'''
    print(" [x] Received %r" % body)
    time.sleep(1)
    ch.basic_ack(delivery_tag = method.delivery_tag)  #发送ack消息
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,queue=queue_name,no_ack=False)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()    #开始监听 接受消息
  • 订阅者
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='127.0.0.1', port=5672))
channel = connection.channel()
channel.exchange_declare(exchange='test123',type='fanout')  #定义一个exchange ,类型为fanout
rest = channel.queue_declare(exclusive=True)   #创建一个随机队列,并启用exchange
queue_name = rest.method.queue          #获取队列名
channel.queue_bind(exchange='test123',queue=queue_name)   #将随机队列名和exchange进行绑定
def callback(ch, method, properties, body):
    '''回调函数,处理从rabbitmq中取出的消息'''
    print(" [x] Received %r" % body)
    time.sleep(1)
    ch.basic_ack(delivery_tag = method.delivery_tag)  #发送ack消息
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,queue=queue_name,no_ack=False)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()    #开始监听 接受消息

注意:

需先定义订阅者,启动订阅者,否则发布者publish到一个不存在的exchange是被禁止的。如果没有queue bindings exchange的话,msg是被丢弃的。

使用direct 实现根据关键字发布消息

消息发布订阅者模型是发布者发布一条消息,所有订阅者都可以收到。现在rabbitmq还支持根据关键字发送,在发送消息的时候使用routing_key参数指定关键字,rabbitmq的exchange会判断routing_key的值,然后只将消息转发至匹配的队列,注意,此时需要订阅者先创建队列。

配置参数为exchange的type=direct,然后定义routing_key即可。

  • 订阅者1:
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='127.0.0.1', port=5672))
channel = connection.channel()
channel.exchange_declare(exchange='test321',type='direct')  #定义一个exchange ,类型为fanout
rest = channel.queue_declare(exclusive=True)   #创建一个随机队列,并启用exchange
queue_name = rest.method.queue          #获取队列名
severities = ['error','warning','info']   #定义三个routing_key
for severity in severities:
    channel.queue_bind(exchange='test321', routing_key=severity,queue=queue_name)
def callback(ch, method, properties, body):
    '''回调函数,处理从rabbitmq中取出的消息'''
    print(" [x] Received %r" % body)
    time.sleep(1)
    ch.basic_ack(delivery_tag = method.delivery_tag)  #发送ack消息
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,queue=queue_name,no_ack=False)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()    #开始监听 接受消息
  • 订阅者2:
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='127.0.0.1', port=5672))
channel = connection.channel()
channel.exchange_declare(exchange='test321',type='direct')  #定义一个exchange ,类型为fanout
rest = channel.queue_declare(exclusive=True)   #创建一个随机队列,并启用exchange
queue_name = rest.method.queue          #获取队列名
severities = ['error','warning']   #定义两个routing_key
for severity in severities:
    channel.queue_bind(exchange='test321', routing_key=severity,queue=queue_name)
def callback(ch, method, properties, body):
    '''回调函数,处理从rabbitmq中取出的消息'''
    print(" [x] Received %r" % body)
    time.sleep(1)
    ch.basic_ack(delivery_tag = method.delivery_tag)  #发送ack消息
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,queue=queue_name,no_ack=False)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()    #开始监听 接受消息
  • 生产者:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='127.0.0.1', port=5672, ))     #定义连接池
channel = connection.channel()          #声明队列以向其发送消息消息
channel.exchange_declare(exchange='test321',type='direct')
channel.basic_publish(exchange='test321', routing_key='info', body='info msg',properties=pika.BasicProperties(delivery_mode=2))  #发送info msg到 info routing_key
channel.basic_publish(exchange='test321', routing_key='error', body='error msg',properties=pika.BasicProperties(delivery_mode=2)) #发送error msg到 error routing_key
print('send success msg[] to rabbitmq')
connection.close()   #关闭连接**
  • 效果:

发现订阅者1和订阅者2都收到error消息,但是只有订阅者1收到了info消息

订阅者1:
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received b'info msg'
 [x] Received b'error msg'
订阅者2:
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received b'error msg'

使用topic实现模糊匹配发布消息

Direct实现了根据自定义的routing_key来标示不同的queue,使用topic可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,exchange将传入”路由值“和 ”关键字“进行匹配,匹配成功,则将数据发送到指定队列。

匹配规则为:

  • # 表示可以匹配0个或多个单词
  • * 表示只能匹配一个单词


  • 订阅者1:使用#匹配
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='127.0.0.1', port=5672))
channel = connection.channel()
channel.exchange_declare(exchange='test333',type='topic')  #定义一个exchange ,类型为fanout
rest = channel.queue_declare(exclusive=True)   #创建一个随机队列,并启用exchange
queue_name = rest.method.queue          #获取队列名
channel.queue_bind(exchange='test333', routing_key='test.#',queue=queue_name)
def callback(ch, method, properties, body):
    '''回调函数,处理从rabbitmq中取出的消息'''
    print(" [x] Received %r" % body)
    time.sleep(1)
    ch.basic_ack(delivery_tag = method.delivery_tag)  #发送ack消息
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,queue=queue_name,no_ack=False)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()    #开始监听 接受消息
  • 订阅者2:使用*匹配
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='127.0.0.1', port=5672))
channel = connection.channel()
channel.exchange_declare(exchange='test333',type='topic')  #定义一个exchange ,类型为fanout
rest = channel.queue_declare(exclusive=True)   #创建一个随机队列,并启用exchange
queue_name = rest.method.queue          #获取队列名
channel.queue_bind(exchange='test333', routing_key='test.*',queue=queue_name)
def callback(ch, method, properties, body):
    '''回调函数,处理从rabbitmq中取出的消息'''
    print(" [x] Received %r" % body)
    time.sleep(1)
    ch.basic_ack(delivery_tag = method.delivery_tag)  #发送ack消息
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,queue=queue_name,no_ack=False)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()    #开始监听 接受消息
  • 生产者:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='127.0.0.1', port=5672, ))     #定义连接池
channel = connection.channel()          #声明队列以向其发送消息消息
channel.exchange_declare(exchange='test333',type='topic')
channel.basic_publish(exchange='test333', routing_key='test.123', body='test.123 msg',properties=pika.BasicProperties(delivery_mode=2))
channel.basic_publish(exchange='test333', routing_key='test.123.321', body=' test.123.321 msg',properties=pika.BasicProperties(delivery_mode=2))
print('send success msg[] to rabbitmq')
connection.close()   #关闭连接
  • 效果:
订阅者1:
 [*] Waiting for messages. To exit press CTRL+C