Overview
This article uses examples to show how to use RabbitMQ with Spring Boot.
Official documentation
Spring AMQP 2.1.2.RELEASE Chinese documentation - 1. Preface | Docs4dev
Annotations
@RabbitListener
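On a method
When a message arrives on the listened queue, the method receives and processes it. If the queue does not exist, an error is reported.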
@Component
public class Receiver {
    @RabbitListener(queues = "hello")
    public void process(String hello) {
        System.out.println("Receiver : " + hello);
    }
}
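The bindings attribute of @RabbitListener declares a Binding. When an AmqpAdmin (RabbitAdmin) bean is present, as with Spring Boot auto-configuration, the Queue, Exchange, and Binding are declared on the broker if they do not already exist; without an admin bean they must already exist, otherwise an error is reported.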
@RabbitListener(bindings = @QueueBinding(
        exchange = @Exchange(value = "topic.exchange", durable = "true", type = "topic"),
        value = @Queue(value = "consumer_queue", durable = "true"),
        key = "key.#"
))
public void processMessage1(Message message) {
    System.out.println(message);
}
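On a class
- Must be used together with the @RabbitHandler annotation.
- When @RabbitListener is placed on a class, each received message is handed to one of the methods annotated with @RabbitHandler. Which method handles it is decided by the type of the payload after MessageConverter conversion.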
@Component
@RabbitListener(queues = "consumer_queue")
public class Receiver {
    @RabbitHandler
    public void processMessage1(String message) {
        System.out.println(message);
    }

    @RabbitHandler
    public void processMessage2(byte[] message) {
        System.out.println(new String(message));
    }
}
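To make the "chosen by payload type" rule concrete, here is a minimal plain-Java sketch of the idea. It is not Spring AMQP's implementation; the class HandlerDispatchSketch and its methods are hypothetical names invented for this illustration. It only shows that a String payload and a byte[] payload end up in different handlers.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/** Toy dispatcher (hypothetical, not Spring code): picks a handler by the payload's runtime type. */
final class HandlerDispatchSketch {

    // Registered handlers, keyed by the payload type they accept.
    private final Map<Class<?>, Function<Object, String>> handlers = new LinkedHashMap<>();

    /** Registers a handler for one payload type, similar in spirit to one @RabbitHandler method. */
    <T> void register(Class<T> type, Function<T, String> handler) {
        handlers.put(type, payload -> handler.apply(type.cast(payload)));
    }

    /** Finds the first handler whose type matches the payload and invokes it. */
    String dispatch(Object payload) {
        for (Map.Entry<Class<?>, Function<Object, String>> entry : handlers.entrySet()) {
            if (entry.getKey().isInstance(payload)) {
                return entry.getValue().apply(payload);
            }
        }
        throw new IllegalStateException("No handler for " + payload.getClass().getName());
    }

    /** Builds a dispatcher with the same two payload types as the Receiver above. */
    static HandlerDispatchSketch example() {
        HandlerDispatchSketch dispatcher = new HandlerDispatchSketch();
        dispatcher.register(String.class, s -> "String handler: " + s);
        dispatcher.register(byte[].class, b -> "byte[] handler: " + new String(b, StandardCharsets.UTF_8));
        return dispatcher;
    }

    public static void main(String[] args) {
        HandlerDispatchSketch dispatcher = example();
        System.out.println(dispatcher.dispatch("hi"));
        System.out.println(dispatcher.dispatch("hi".getBytes(StandardCharsets.UTF_8)));
    }
}
```

In the real listener, which type arrives depends on the configured MessageConverter and the message's content type, so the sketch only covers the final selection step.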
@Payload and @Headers
Other links
Spring AMQP 2.1.2.RELEASE Chinese documentation - 3. Reference | Docs4dev
RabbitMQ (3): Advanced RabbitMQ Integration - 盲目的拾荒者
Introduction
@Headers must be received as a Map.
// @Header("amqp_receivedRoutingKey") String rk reads a single key from the headers directly
Get the body and headers of a message
@RabbitListener(queues = "debug")
public void processMessage1(@Payload String body, @Headers Map<String, Object> headers) {
    System.out.println("body:" + body);
    System.out.println("Headers:" + headers);
}
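Get a single header
@RabbitListener(queues = "debug")
public void processMessage1(@Payload String body, @Header("token") String token) {
    // Naming the header explicitly avoids relying on parameter names being kept at compile time.
    System.out.println("body:" + body);
    System.out.println("token:" + token);
}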
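Binding
Other links
Spring Boot integration with RabbitMQ
Introduction
Exchange, queue, and binding instances can each be created in two ways.
Exchange (the two forms below are equivalent):
    ExchangeBuilder.topicExchange(EXCHANGE_TOPIC_WELCOME).durable(true).build();
    new TopicExchange(EXCHANGE_TOPIC_WELCOME, true, false);
Queue (the two forms below are equivalent):
    QueueBuilder.durable(QUEUE_HI).build();
    new Queue(QUEUE_HI, true);
Binding (the two forms below are equivalent):
Note: the arguments of the first form are objects (a Queue and an Exchange), not strings.
    BindingBuilder.bind(helloQueue).to(welcomeExchange).with("hello.#");
    new Binding("Queue@hello", Binding.DestinationType.QUEUE,
            "Exchange@topic.welcome", "hello.#", null);
The configuration methods below are listed in decreasing order of recommendation.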
Method 1: configuration class (concise)
package com.example.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitRouterConfig {
    public static final String QUEUE_HELLO = "Queue@hello";
    public static final String QUEUE_HI = "Queue@hi";
    public static final String EXCHANGE_TOPIC_WELCOME = "Exchange@topic.welcome";
    public static final String ROUTINGKEY_HELLOS = "hello.#";

    @Autowired
    AmqpAdmin amqpAdmin;

    @Bean
    Object initBindingTest() {
        amqpAdmin.declareExchange(new TopicExchange(EXCHANGE_TOPIC_WELCOME, true, false));
        amqpAdmin.declareQueue(new Queue(QUEUE_HI, true));
        amqpAdmin.declareQueue(new Queue(QUEUE_HELLO, true));
        amqpAdmin.declareBinding(new Binding(QUEUE_HELLO, Binding.DestinationType.QUEUE,
                EXCHANGE_TOPIC_WELCOME, ROUTINGKEY_HELLOS, null));
        return new Object();
    }
}
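amqpAdmin.declareBinding
Takes a Binding object as its argument. For reference, the parameters used when declaring an exchange are:
- exchange: the exchange name.
- type: the exchange type. The BuiltinExchangeType enum defines four types: DIRECT("direct"), FANOUT("fanout"), TOPIC("topic"), HEADERS("headers").
- durable: whether the exchange is persistent (true) or not (false). A durable exchange is stored on disk, so its definition is not lost when the server restarts.
- autoDelete: whether the exchange is deleted automatically (true) or not (false). Automatic deletion requires that at least one queue or exchange has been bound to this exchange; the exchange is deleted once all of those queues or exchanges have been unbound from it.
- internal: whether the exchange is internal (true) or not (false). Clients cannot publish directly to an internal exchange; messages can reach it only by being routed from another exchange.
- arguments: additional structured parameters, such as alternate-exchange for a backup exchange. Queues accept arguments in the same way, for example a message time-to-live. Note that x-message-ttl is a queue argument, so it is passed when declaring the queue rather than the binding:
    Map<String, Object> params = new HashMap<>();
    params.put("x-message-ttl", 2000); // message TTL in milliseconds
    amqpAdmin.declareQueue(new Queue(QUEUE_HELLO, true, false, false, params));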
Method 2: configuration class (verbose)
Suitable when there are only a few queues and exchanges.
package com.lly.order.message;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMqConfig {
    public final static String QUEUE_DIRECT = "Queue@direct";
    public final static String QUEUE_TOPIC_ONE = "Queue@topic_one";
    public final static String TOPIC_QUEUE_TWO = "Queue@topic_two";
    public final static String QUEUE_FANOUT_ONE = "Queue@fanout_one";
    public final static String QUEUE_FANOUT_TWO = "Queue@fanout_two";
    public final static String EXCHANGE_TOPIC = "Exchange@topic";
    public final static String EXCHANGE_FANOUT = "Exchange@fanout";
    public final static String ROUTINGKEY_TOPIC_ONE = "hello.key";
    public final static String ROUTINGKEY_TOPIC_TWO = "*.key";

    @Bean
    public Queue directQueue() {
        return new Queue(QUEUE_DIRECT, true);
    }

    @Bean
    public Queue topicQueueOne() {
        return new Queue(QUEUE_TOPIC_ONE, true);
    }

    @Bean
    public Queue topicQueueTwo() {
        return new Queue(TOPIC_QUEUE_TWO, true);
    }

    @Bean
    public Queue fanoutQueueOne() {
        return new Queue(QUEUE_FANOUT_ONE, true);
    }

    @Bean
    public Queue fanoutQueueTwo() {
        return new Queue(QUEUE_FANOUT_TWO, true);
    }

    @Bean
    public TopicExchange topExchange() {
        return new TopicExchange(EXCHANGE_TOPIC);
    }

    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(EXCHANGE_FANOUT);
    }

    @Bean
    public Binding topicExchangeBingingOne() {
        return BindingBuilder.bind(topicQueueOne()).to(topExchange()).with(ROUTINGKEY_TOPIC_ONE);
    }

    @Bean
    public Binding topicExchangeBingingTwo() {
        // Uses topExchange(), the only topic exchange bean defined in this class.
        return BindingBuilder.bind(topicQueueTwo()).to(topExchange()).with(ROUTINGKEY_TOPIC_TWO);
    }

    @Bean
    public Binding fanoutExchangeBingingOne() {
        return BindingBuilder.bind(fanoutQueueOne()).to(fanoutExchange());
    }

    @Bean
    public Binding fanoutExchangeBingingTwo() {
        return BindingBuilder.bind(fanoutQueueTwo()).to(fanoutExchange());
    }
}
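The binding keys used in these configurations (hello.key, *.key, hello.#) follow the topic exchange rules: words are separated by dots, * stands for exactly one word, and # stands for zero or more words. The following plain-Java sketch illustrates those rules; it is not RabbitMQ's implementation, and the class name TopicMatcher is made up for this example.

```java
/** Illustrative matcher for topic binding keys (hypothetical helper, not broker code). */
final class TopicMatcher {

    /** Returns true if routingKey matches pattern; '*' is exactly one word, '#' is zero or more words. */
    static boolean matches(String pattern, String routingKey) {
        String[] p = pattern.isEmpty() ? new String[0] : pattern.split("\\.", -1);
        String[] k = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.", -1);
        return matches(p, 0, k, 0);
    }

    // Recursive word-by-word comparison starting at pattern word i and key word j.
    private static boolean matches(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern exhausted: match only if the key is exhausted too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words: try every possible continuation point.
            for (int next = j; next <= k.length; next++) {
                if (matches(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // key exhausted but the pattern still needs a word
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return matches(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("hello.#", "hello.a"));   // true
        System.out.println(matches("*.key", "hello.key"));   // true
        System.out.println(matches("*.key", "a.b.key"));     // false
    }
}
```

With these rules, a message published with routing key hello.key matches both hello.key and *.key, so in Method 2 it would be delivered to both topic queues.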
Method 3: declare through annotations in code
@Component
public class Receiver {
    @RabbitListener(queues = "hello")
    public void process(String hello) {
        System.out.println("Receiver : " + hello);
    }

    @RabbitListener(bindings = @QueueBinding(
            exchange = @Exchange(value = "Exchange@topic.Hello", durable = "true", type = "topic"),
            value = @Queue(value = "Queue@Hello", durable = "true"),
            key = "key.#"
    ))
    public void processMessage1(Message message) {
        System.out.println(message);
    }
}
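Method 4: configure through the RabbitMQ management web UI
Add an exchange:
http://localhost:15672/#/exchanges
// e.g. Exchange@topic.Hello
Add a queue:
http://localhost:15672/#/queues
// e.g. Queue@Hello
Add a routing key to the exchange:
http://localhost:15672/#/exchanges
=> click the exchange name => Bindings => add the queue and routing key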
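Configuration
RabbitTemplate configuration
| Example | Purpose |
| --- | --- |
| rabbitTemplate.setConfirmCallback(rabbitConfirmCallback); | Sets the confirm callback. |
| rabbitTemplate.setReturnCallback(rabbitReturnCallback); | Sets the return callback. |
| rabbitTemplate.setMandatory(true); | When mandatory is true and the exchange cannot find a suitable queue for the message based on its type and the message's routing key, the broker returns the message to the producer with basic.return. When mandatory is false, the broker simply drops the message in that case. |
| rabbitTemplate.setUsePublisherConnection(true); | Uses a separate connection for publishing, so that a producer blocked for any reason does not also block consumers. |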
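The effect of the mandatory flag can be illustrated with a small plain-Java model. This is only a sketch of the semantics described in the table, not RabbitTemplate or broker code; MandatorySketch and its members are hypothetical names, and routing is reduced to "the routing key equals a known queue name".

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy model of mandatory publishing (hypothetical, not Spring or RabbitMQ code). */
final class MandatorySketch {
    private final Set<String> knownQueues = new HashSet<>();
    // Messages handed back to the producer, standing in for the return callback.
    final List<String> returned = new ArrayList<>();
    private boolean mandatory;

    void declareQueue(String name) {
        knownQueues.add(name);
    }

    void setMandatory(boolean mandatory) {
        this.mandatory = mandatory;
    }

    /** Returns "ROUTED", "RETURNED" (unroutable, mandatory) or "DROPPED" (unroutable, not mandatory). */
    String publish(String routingKey, String body) {
        if (knownQueues.contains(routingKey)) {
            return "ROUTED";
        }
        if (mandatory) {
            returned.add(body); // the producer gets the message back
            return "RETURNED";
        }
        return "DROPPED"; // silently discarded
    }

    public static void main(String[] args) {
        MandatorySketch broker = new MandatorySketch();
        broker.declareQueue("Queue@hi");
        System.out.println(broker.publish("no.such.queue", "m1")); // DROPPED
        broker.setMandatory(true);
        System.out.println(broker.publish("no.such.queue", "m2")); // RETURNED
        System.out.println(broker.publish("Queue@hi", "m3"));      // ROUTED
    }
}
```

In a real application the returned message arrives asynchronously in the return callback registered on the template, so the producer should not expect it as a return value of the send call.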
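Example
Working with RabbitMQ
Start RabbitMQ
Run on the command line:
rabbitmq-server.bat
// On the first start, also run: rabbitmq-plugins.bat enable rabbitmq_management
Configure an administrator user
Username: admin  Password: 123456
Permissions: granted on virtual host /
For details, see: RabbitMQ系列--综述_feiying0canglang的博客
Code
Dependency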
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
application.yml
server:
#  port: 9100
  port: 9101
spring:
  application:
#    name: demo-rabbitmq-sender
    name: demo-rabbitmq-receiver
  rabbitmq:
    host: localhost
    port: 5672
    username: admin
    password: 123456
#    virtualHost: /
Queue configuration
package com.example.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitRouterConfig {
    public static final String QUEUE_HELLO = "Queue@hello";
    public static final String QUEUE_HI = "Queue@hi";
    public static final String EXCHANGE_TOPIC_WELCOME = "Exchange@topic.welcome";
    public static final String ROUTINGKEY_HELLOS = "hello.#";

    @Autowired
    AmqpAdmin amqpAdmin;

    @Bean
    Object initBindingTest() {
        amqpAdmin.declareExchange(new TopicExchange(EXCHANGE_TOPIC_WELCOME, true, false));
        amqpAdmin.declareQueue(new Queue(QUEUE_HI, true));
        amqpAdmin.declareQueue(new Queue(QUEUE_HELLO, true));
        amqpAdmin.declareBinding(new Binding(QUEUE_HELLO, Binding.DestinationType.QUEUE,
                EXCHANGE_TOPIC_WELCOME, ROUTINGKEY_HELLOS, null));
        return new Object();
    }
}
Controller
package com.example.controller;

import com.example.config.RabbitRouterConfig;
import com.example.mq.Sender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

import java.time.LocalDateTime;

@RestController
public class HelloController {
    @Autowired
    private Sender sender;

    // Routing key only: goes to the default exchange, which routes by queue name.
    @PostMapping("/hi")
    public void hi() {
        sender.send(RabbitRouterConfig.QUEUE_HI, "hi1 message:" + LocalDateTime.now());
    }

    // Routing key only, but no queue is named "hello.a".
    @PostMapping("/hello1")
    public void hello1() {
        sender.send("hello.a", "hello1 message:" + LocalDateTime.now());
    }

    // Exchange plus routing key: routed through the topic binding "hello.#".
    @PostMapping("/hello2")
    public void hello2() {
        sender.send("Exchange@topic.welcome", "hello.b", "hello2 message:" + LocalDateTime.now());
    }
}
sender
package com.example.mq;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class Sender {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    // Publishes to the template's default exchange with the given routing key.
    public void send(String routingKey, String message) {
        this.rabbitTemplate.convertAndSend(routingKey, message);
    }

    // Publishes to the named exchange with the given routing key.
    public void send(String exchange, String routingKey, String message) {
        this.rabbitTemplate.convertAndSend(exchange, routingKey, message);
    }
}
receiver
package com.example.mq;

import com.example.config.RabbitRouterConfig;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class Receiver {
    @RabbitListener(queues = RabbitRouterConfig.QUEUE_HELLO)
    public void hello(String hello) {
        System.out.println("Receiver(hello) : " + hello);
    }

    @RabbitListener(queues = RabbitRouterConfig.QUEUE_HI)
    public void hi(String payload) {
        System.out.println("Receiver(hi) : " + payload);
        // Deliberately thrown to show what happens when a listener fails.
        throw new RuntimeException("manually thrown exception");
    }
}
Basic test
Run the sender
server:
  port: 9100
spring:
  application:
    name: demo-rabbitmq-sender
  rabbitmq:
    host: localhost
    port: 5672
    username: admin
    password: 123456
Run the receiver
server:
  port: 9101
spring:
  application:
    name: demo-rabbitmq-receiver
  rabbitmq:
    host: localhost
    port: 5672
    username: admin
    password: 123456
MQ management page
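Test receiving only the data (payload)
Request with Postman:
POST http://localhost:9100/hi
Receiver output:
Receiver(hi) : hi message:2020-10-21T18:52:24.766
Request with Postman:
POST http://localhost:9100/hello1
Receiver output: nothing is printed
That is, a message sent with only a routing key goes to the default exchange, which routes by queue name; no queue is named hello.a, so the message is dropped. To reach a queue through its binding to an exchange, both the exchange and the routing key must be specified.
Request with Postman:
POST http://localhost:9100/hello2
Receiver output: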
Receiver(hello) : hello2 message:2020-10-21T18:52:41.434
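The three results above can be reproduced with a toy routing model in plain Java. This is only a sketch under simplifying assumptions, not a real broker: RoutingSketch is a hypothetical class, the default exchange is represented by an empty exchange name, and pattern matching supports only exact keys and a trailing ".#".

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy routing model for the three requests (hypothetical, not RabbitMQ code). */
final class RoutingSketch {
    // Queue name -> messages delivered to that queue.
    private final Map<String, List<String>> queues = new HashMap<>();
    // Each binding is {exchange, pattern, queue}.
    private final List<String[]> bindings = new ArrayList<>();

    void declareQueue(String name) {
        queues.put(name, new ArrayList<>());
    }

    void bind(String queue, String exchange, String pattern) {
        bindings.add(new String[] {exchange, pattern, queue});
    }

    /** Publishes a message and returns the number of queues it was routed to. */
    int publish(String exchange, String routingKey, String body) {
        int delivered = 0;
        if (exchange.isEmpty()) {
            // Default exchange: the routing key must equal a queue name.
            List<String> queue = queues.get(routingKey);
            if (queue != null) {
                queue.add(body);
                delivered++;
            }
            return delivered;
        }
        for (String[] binding : bindings) {
            if (binding[0].equals(exchange) && matches(binding[1], routingKey)) {
                queues.get(binding[2]).add(body);
                delivered++;
            }
        }
        return delivered;
    }

    // Simplified matching: exact keys and a trailing ".#" only.
    private static boolean matches(String pattern, String key) {
        if (pattern.endsWith(".#")) {
            String prefix = pattern.substring(0, pattern.length() - 2);
            return key.equals(prefix) || key.startsWith(prefix + ".");
        }
        return pattern.equals(key);
    }

    List<String> messages(String queue) {
        return queues.get(queue);
    }

    /** Same topology as RabbitRouterConfig: two queues, one topic binding. */
    static RoutingSketch example() {
        RoutingSketch broker = new RoutingSketch();
        broker.declareQueue("Queue@hi");
        broker.declareQueue("Queue@hello");
        broker.bind("Queue@hello", "Exchange@topic.welcome", "hello.#");
        return broker;
    }

    public static void main(String[] args) {
        RoutingSketch broker = example();
        System.out.println(broker.publish("", "Queue@hi", "hi"));                          // 1
        System.out.println(broker.publish("", "hello.a", "hello1"));                       // 0
        System.out.println(broker.publish("Exchange@topic.welcome", "hello.b", "hello2")); // 1
    }
}
```

The second call delivers to zero queues, which corresponds to the request that produced no receiver output.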
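Detailed data test
Test receiving payload/message/channel
Use the second version of the receiver code, in which the listener methods also receive the Message and the Channel and print them.
Request with Postman:
POST http://localhost:9100/hi
Receiver output: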
Receiver(hi):
payload:hi1 message:2020-10-22T15:41:11.796
message:(Body:'hi1 message:2020-10-22T15:41:11.796' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=Queue@hi, deliveryTag=1, consumerTag=amq.ctag-K0Yka5vHxrq6JzNXlg3ncQ, consumerQueue=Queue@hi])
channel:Cached Rabbit Channel: AMQChannel(amqp://admin@127.0.0.1:5672/,1), conn: Proxy@4f4c88f9 Shared Rabbit Connection: SimpleConnection@7c52fc81 [delegate=amqp://admin@127.0.0.1:5672/, localPort= 62067]
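Request with Postman:
POST http://localhost:9100/hello1
Receiver output: nothing is printed
As before: a message sent with only a routing key goes to the default exchange, so to reach a queue bound to an exchange, both the exchange and the routing key must be specified.
Request with Postman:
POST http://localhost:9100/hello2
Receiver output: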
Receiver(hello):
payload:hello2 message:2020-10-22T15:42:13.126
message:(Body:'hello2 message:2020-10-22T15:42:13.126' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=Exchange@topic.welcome, receivedRoutingKey=hello.b, deliveryTag=1, consumerTag=amq.ctag-yZryIR801zNV5f0f9mJd9g, consumerQueue=Queue@hello])
channel:Cached Rabbit Channel: AMQChannel(amqp://admin@127.0.0.1:5672/,2), conn: Proxy@4f4c88f9 Shared Rabbit Connection: SimpleConnection@7c52fc81 [delegate=amqp://admin@127.0.0.1:5672/, localPort= 62067]