说明

本文用示例介绍SpringBoot如何使用RabbitMQ。

官网

​Spring AMQP 2.1.2.RELEASE 中文文档 - 1. Preface | Docs4dev​

注解

@RabbitListener

用在方法上

当监听到队列中有消息时则会进行接收并处理,如果不存在,会报错。

@Component
public class Receiver {
//也可监听多个队列:@RabbitListener(queues = {"hello", "hi"})
@RabbitListener(queues = "hello")
public void process(String hello) {
System.out.println ("Receiver : " + hello);
}
}

@RabbitListener 的 bindings 属性声明 Binding(若 RabbitMQ 中不存在该绑定所需要的 Queue、Exchange、RouteKey 则报错)

@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(value = "topic.exchange",durable = "true",type = "topic"),
value = @Queue(value = "consumer_queue",durable = "true"),
key = "key.#"
))
public void processMessage1(Message message) {
System.out.println(message);
}

用在类上

  1. 需配合 @RabbitHandler 注解一起使用
  2. @RabbitListener 标注在类上面表示当有收到消息的时候,就交给 @RabbitHandler 的方法处理,具体使用哪个方法处理,根据 MessageConverter 转换后的参数类型判断。
@Component
@RabbitListener(queues = "consumer_queue")
public class Receiver {
@RabbitHandler
public void processMessage1(String message) {
System.out.println(message);
}

@RabbitHandler
public void processMessage2(byte[] message) {
System.out.println(new String(message));
}
}

@Payload 与 @Headers

其他网址

​Spring AMQP 2.1.2.RELEASE 中文文档 - 3. Reference | Docs4dev​

​RabbitMQ(三) RabbitMQ高级整合应用 - 盲目的拾荒者 ​

简介

@Headers 必须通过Map接收。

//@Header("amqp_receivedRoutingKey") String rk 直接获取header中某一个key

获得消息中的 body 与 headers 信息

@RabbitListener(queues = "debug")
public void processMessage1(@Payload String body, @Headers Map<String,Object> headers) {
System.out.println("body:"+body);
System.out.println("Headers:"+headers);
}

获取单个 Header 属性

@RabbitListener(queues = "debug")
public void processMessage1(@Payload String body, @Header String token) {
System.out.println("body:"+body);
System.out.println("token:"+token);
}

绑定

其他网址

​SpringBoot 整合 rabbitmq​

简介

1.创建交换机/队列/绑定 实例都有两种方式

交换机(下边两种方式等价):

ExchangeBuilder.topicExchange(EXCHANGE_TOPIC_WELCOME).durable(true).build();
new TopicExchange(EXCHANGE_TOPIC_WELCOME, true, false)

队列(下边两种方式等价):

QueueBuilder.durable("Hi").build();
new Queue(QUEUE_HI, true)

绑定(下边两种方式等价):

注意:第一种的参数并不是字符串。

BindingBuilder.bind(helloQueue).to(welcomExchange).with("hello.#")
new Binding("Queue@hello", Binding.DestinationType.QUEUE,
"Exchange@topic.welcome", "hello.#", null)

推荐程度依次递减。

法1:通过配置类配置(简洁)

package com.example.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitRouterConfig {
public static final String QUEUE_HELLO = "Queue@hello";
public static final String QUEUE_HI = "Queue@hi";
public static final String EXCHANGE_TOPIC_WELCOME = "Exchange@topic.welcome";
public static final String ROUTINGKEY_HELLOS = "hello.#";

@Autowired
AmqpAdmin amqpAdmin;

@Bean
Object initBindingTest() {
amqpAdmin.declareExchange(new TopicExchange(EXCHANGE_TOPIC_WELCOME, true, false));

amqpAdmin.declareQueue(new Queue(QUEUE_HI, true));
amqpAdmin.declareQueue(new Queue(QUEUE_HELLO, true));

amqpAdmin.declareBinding(new Binding(QUEUE_HELLO, Binding.DestinationType.QUEUE,
EXCHANGE_TOPIC_WELCOME, ROUTINGKEY_HELLOS, null));

return new Object();
}
}

amqpAdmin.declareBinding

需要一个Binding对象作为参数

  • exchange:交换器名称
  • type:交换器类型。BuiltinExchangeType枚举类,有以下4中类型交换器:DIRECT(“direct”), FANOUT(“fanout”), TOPIC(“topic”), HEADERS(“headers”)
  • durable:设置是否持久化。true:持久化,false:非持久化。持久化可以将交换器存盘,在服务器重启时不会丢失相关消息。
  • autoDelete:设置是否自动删除。true:自动删除,false:不自动删除。自动删除的前提是至少有一个队列或交换器与这个交换器绑定,之后所有与这个交换器绑定的队列或交换器都与此交换器解绑。
  • internal:设置是否内置的。true:内置交换器,false:非内置交换器。内置交换器,客户端无法直接发送消息到这个交换器中,只能通过交换器路由到交换器这种方式。
  • arguments:其他一些结构化参数。如备份交换器:alternate-exchange、超时时间。示例配置超时时间方法:
Map<String, Object> params = new HashMap();
params.put("x-message-ttl", 2000);
amqpAdmin.declareBinding(new Binding(QUEUE_HELLO, Binding.DestinationType.QUEUE,
EXCHANGE_TOPIC_WELCOME, ROUTINGKEY_HELLOS, params));

法2:通过配置类配置(繁琐)

适用于队列和交换器不多时

package com.lly.order.message;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMqConfig {
public final static String QUEUE_DIRECT = "Queue@direct";
public final static String QUEUE_TOPIC_ONE = "Queue@topic_one";
public final static String TOPIC_QUEUE_TWO = "Queue@topic_two";
public final static String QUEUE_FANOUT_ONE = "Queue@fanout_one";
public final static String QUEUE_FANOUT_TWO = "Queue@fanout_two";

public final static String EXCHANGE_TOPIC = "Exchange@topic";
public final static String EXCHANGE_FANOUT = "Exchange@fanout";

public final static String ROUTINGKEY_TOPIC_ONE = "hello.key";
public final static String ROUTINGKEY_TOPIC_TWO = "*.key";

// direct模式队列
@Bean
public Queue directQueue() {
return new Queue(QUEUE_DIRECT, true);
}

// topic 订阅者模式队列
@Bean
public Queue topicQueueOne() {
return new Queue(QUEUE_TOPIC_ONE, true);
}
@Bean
public Queue topicQueueTwo() {
return new Queue(TOPIC_QUEUE_TWO, true);
}

// fanout 广播者模式队列
@Bean
public Queue fanoutQueueOne() {
return new Queue(QUEUE_FANOUT_ONE, true);
}
@Bean
public Queue fanoutQueueTwo() {
return new Queue(QUEUE_FANOUT_TWO, true);
}

// topic 交换器
@Bean
public TopicExchange topExchange() {
return new TopicExchange(EXCHANGE_TOPIC);
}

// fanout 交换器
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange(EXCHANGE_FANOUT);
}

// 订阅者模式绑定
@Bean
public Binding topicExchangeBingingOne() {
return BindingBuilder.bind(topicQueueOne()).to(topExchange()).with(ROUTINGKEY_TOPIC_ONE);
}

@Bean
public Binding topicExchangeBingingTwo() {
return BindingBuilder.bind(topicQueueTwo()).to(topicExchange()).with(ROUTINGKEY_TOPIC_TWO);
}

// 广播模式绑定
@Bean
public Binding fanoutExchangeBingingOne() {
return BindingBuilder.bind(fanoutQueueOne()).to(fanoutExchange());
}
@Bean
public Binding fanoutExchangeBingingTwo() {
return BindingBuilder.bind(fanoutQueueTwo()).to(fanoutExchange());
}
}

法3:通过代码配置

@Component
public class Receiver {
@RabbitListener(queues = "hello")
public void process(String hello) {
System.out.println ("Receiver : " + hello);
}

@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(value = "Exchange@topic.Hello",durable = "true",type = "topic"),
value = @Queue(value = "Queue@Hello",durable = "true"),
key = "key.#"
))
public void processMessage1(Message message) {
System.out.println(message);
}
}

法4:通过MQ网页配置

添加交换器:​ ​http://localhost:15672/#/exchanges​ ​   //例如:Exchange@topic.Hello

添加队列:​ ​http://localhost:15672/#/queues​ ​            //例如:Queue@Hello

交换器添加路由键:​ ​http://localhost:15672/#/exchanges​ ​=> 点击交换器名字=> Binding=> 添加队列与路由

RabbitMQ--SpringBoot--整合/使用/实例_spring

RabbitMQ--SpringBoot--整合/使用/实例_java_02

配置

RabbitTemplate配置


实例



作用



rabbitTemplate.setConfirmCallback(rabbitConfirmCallback);



设置confirm回调函数。



rabbitTemplate.setReturnCallback(rabbitReturnCallback);



设置return回调函数。



rabbitTemplate.setMandatory(true);



当mandatory设置为true时,若exchange根据自身类型和消息routingKey无法找到一个合适的queue存储消息,那么broker会调用basic.return方法将消息返还给生产者。当mandatory设置为false时,出现上述情况broker会直接将消息丢弃。



rabbitTemplate.setUsePublisherConnection(true);



使用单独的发送连接,避免生产者由于各种原因阻塞而导致消费者同样阻塞


实例

操作RabbitMQ

启动RabbitMQ

命令行运行:

rabbitmq-server.bat

//若是第一次开启,还要运行:rabbitmq-plugins.bat enable rabbitmq_management

配置一个管理员用户

用户名:admin   密码:123456

权限配置为:/

详见 :​ ​RabbitMQ系列--综述_feiying0canglang的博客​

代码

依赖

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

application.yml

server:
# port: 9100
port: 9101
spring:
application:
# name: demo-rabbitmq-sender
name: demo-rabbitmq-receiver
rabbitmq:
host: localhost
port: 5672
username: admin
password: 123456
# virtualHost: /

队列配置

package com.example.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitRouterConfig {
public static final String QUEUE_HELLO = "Queue@hello";
public static final String QUEUE_HI = "Queue@hi";
public static final String EXCHANGE_TOPIC_WELCOME = "Exchange@topic.welcome";
public static final String ROUTINGKEY_HELLOS = "hello.#";

@Autowired
AmqpAdmin amqpAdmin;

@Bean
Object initBindingTest() {
amqpAdmin.declareExchange(new TopicExchange(EXCHANGE_TOPIC_WELCOME, true, false));

amqpAdmin.declareQueue(new Queue(QUEUE_HI, true));
amqpAdmin.declareQueue(new Queue(QUEUE_HELLO, true));

amqpAdmin.declareBinding(new Binding(QUEUE_HELLO, Binding.DestinationType.QUEUE,
EXCHANGE_TOPIC_WELCOME, ROUTINGKEY_HELLOS, null));

return new Object();
}
}

Controller

package com.example.controller;

import com.example.config.RabbitConfig;
import com.example.mq.Sender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

import java.time.LocalDateTime;

@RestController
public class HelloController {
@Autowired
private Sender sender;

@PostMapping("/hi")
public void hi() {
sender.send(RabbitConfig.QUEUE_HI, "hi1 message:" + LocalDateTime.now());
}

@PostMapping("/hello1")
public void hello1() {
sender.send("hello.a", "hello1 message:" + LocalDateTime.now());
}

@PostMapping("/hello2")
public void hello2() {
sender.send("Exchange@topic.welcome","hello.b", "hello2 message:" + LocalDateTime.now());
}
}

sender

package com.example.mq;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

@Component
public class Sender {
@Autowired
private AmqpTemplate rabbitTemplate;

public void send(String routingKey, String message) {
this.rabbitTemplate.convertAndSend(routingKey, message);
}

public void send(String exchange, String routingKey, String message) {
this.rabbitTemplate.convertAndSend(exchange, routingKey, message);
}
}

receiver

package com.example.mq;

import com.example.config.RabbitRouterConfig;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Component
public class Receiver {
@RabbitListener(queues = RabbitRouterConfig.QUEUE_HELLO)
public void hello(String hello) {
System.out.println ("Receiver(hello) : " + hello);
}

@RabbitListener(queues = RabbitRouterConfig.QUEUE_HI)
public void hi(String payload) {
System.out.println ("Receiver(hi) : " + payload);
throw new RuntimeException("手动造的异常");
}

// @RabbitListener(queues = RabbitRouterConfig.QUEUE_HELLO)
// public void helloAll(@Payload String payload, Message message, Channel channel) {
// System.out.println("Receiver(hello):");
// System.out.println("payload:" + payload);
// System.out.println("message:" + message);
// System.out.println("channel:" + channel);
// }
// @RabbitListener(queues = RabbitRouterConfig.QUEUE_HI)
// public void hiAll(@Payload String payload, Message message, Channel channel) {
// System.out.println("Receiver(hi):");
// System.out.println("payload:" + payload);
// System.out.println("message:" + message);
// System.out.println("channel:" + channel);
// }
}

基础测试

运行sender

server:
port: 9100
# port: 9101
spring:
application:
name: demo-rabbitmq-sender
# name: demo-rabbitmq-receiver
rabbitmq:
host: localhost
port: 5672
username: admin
password: 123456
# virtualHost: /

运行receiver

server:
# port: 9100
port: 9101
spring:
application:
# name: demo-rabbitmq-sender
name: demo-rabbitmq-receiver
rabbitmq:
host: localhost
port: 5672
username: admin
password: 123456
# virtualHost: /

MQ页面

RabbitMQ--SpringBoot--整合/使用/实例_spring_03

RabbitMQ--SpringBoot--整合/使用/实例_持久化_04

RabbitMQ--SpringBoot--整合/使用/实例_spring_05

测试只获得数据(payload)

postman访问: ​Node Exporter​

receiver打印结果

Receiver(hi) : hi message:2020-10-21T18:52:24.766

postman访问: ​Node Exporter​

receiver打印结果:无打印

即:若队列已经与交换器绑定,则必须指定交换器和路由键。

postman访问: ​Node Exporter​

receiver打印结果

Receiver(hello) : hello2 message:2020-10-21T18:52:41.434

测试详细数据

测试获得payload/message/channel

使用receiver的第二种代码。

postman访问: ​Node Exporter​

receiver打印结果

Receiver(hi):
payload:hi1 message:2020-10-22T15:41:11.796
message:(Body:'hi1 message:2020-10-22T15:41:11.796' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=Queue@hi, deliveryTag=1, consumerTag=amq.ctag-K0Yka5vHxrq6JzNXlg3ncQ, consumerQueue=Queue@hi])
channel:Cached Rabbit Channel: AMQChannel(amqp://admin@127.0.0.1:5672/,1), conn: Proxy@4f4c88f9 Shared Rabbit Connection: SimpleConnection@7c52fc81 [delegate=amqp://admin@127.0.0.1:5672/, localPort= 62067]

postman访问: ​Node Exporter​

receiver打印结果:无打印

即:若队列已经与交换器绑定,则必须指定交换器和路由键。

postman访问: ​Node Exporter​

receiver打印结果

Receiver(hello):
payload:hello2 message:2020-10-22T15:42:13.126
message:(Body:'hello2 message:2020-10-22T15:42:13.126' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=Exchange@topic.welcome, receivedRoutingKey=hello.b, deliveryTag=1, consumerTag=amq.ctag-yZryIR801zNV5f0f9mJd9g, consumerQueue=Queue@hello])
channel:Cached Rabbit Channel: AMQChannel(amqp://admin@127.0.0.1:5672/,2), conn: Proxy@4f4c88f9 Shared Rabbit Connection: SimpleConnection@7c52fc81 [delegate=amqp://admin@127.0.0.1:5672/, localPort= 62067]