相关文章推荐
气宇轩昂的啄木鸟  ·  RabbitMQ消息处理避坑指南:为什么你的 ...·  1 周前    · 
坐怀不乱的绿茶  ·  RabbitMQ消息处理避坑指南:为什么你的 ...·  1 周前    · 
可爱的烤红薯  ·  【RabbitMQ】消息可靠性投递开发者社区·  1 周前    · 
怕考试的木耳  ·  软件分享库合集链接汇总推荐_蓝奏云软件分享链 ...·  10 月前    · 
精明的日记本  ·  江西省发布第三批非法集资严重失信人名单 ...·  1 年前    · 
奔放的梨子  ·  Creating Word ...·  2 年前    · 
奔跑的苦咖啡  ·  地藏菩萨本愿经讲记(第十三卷)·  2 年前    · 
愤怒的菠萝  ·  异兽魔都(林田球创作的系列漫画)_搜狗百科·  2 年前    · 
Code  ›  【RabbitMQ】消息可靠性投递开发者社区
info 消息队列 交换机 rabbitmq
https://cloud.tencent.com/developer/article/2357341
可爱的烤红薯
1 周前
后端码匠

【RabbitMQ】消息可靠性投递

原创
腾讯云
开发者社区
文档 建议反馈 控制台
首页
学习
活动
专区
圈层
工具
MCP广场
文章/答案/技术大牛
发布
后端码匠
社区首页 > 专栏 > 【RabbitMQ】消息可靠性投递

【RabbitMQ】消息可靠性投递

原创
作者头像
后端码匠
修改 于 2023-11-12 15:59:09
修改 于 2023-11-12 15:59:09
547 0
举报
文章被收录于专栏: 后端码匠 后端码匠

RabbitMQ消息可靠性投递

什么是消息的可靠性投递?即保证消息百分百发送到消息队列中去,消息发送端需要接受到mq服务端接受到消息的确认应答。除此之外还应有完善的消息补偿机制,发送失败的消息可以再感知并二次处理。 生产者到交换机通过 confirmCallback , 交换机到队列通过 returnCallback

当前环境

RabbitMQConfig

代码语言: java
复制
package cn.com.codingce.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
     * 交换机名称
    public static final String EXCHANGE_NAME = "health_hra3_exchange";
     * 队列名称
    public static final String QUEUE = "health_hra3_queue";
    @Bean
    public Exchange healthHra3Exchange() {
        // 创建交换机,durable代表持久化,使用Bean注入
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
    @Bean
    public Queue healthHra3Queue() {
        // 创建队列,使用Bean注入
        return QueueBuilder.durable(QUEUE).build();
     * 交换机和队列绑定关系
     * @param queue    上面注入的队列Bean,如果你的项目又多个,记得给Bean取名字
     * @param exchange 上面注入的交换机Bean
    @Bean
    public Binding healthHra3Binding(Queue queue, Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("health.#").noargs();
}

HealthHra3MQListener

代码语言: java
复制
package cn.com.codingce.listener;
import cn.com.codingce.config.RabbitMQConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Slf4j
@Component
@RabbitListener(queues = RabbitMQConfig.QUEUE) // 监听的队列名称
public class HealthHra3MQListener {
     * RabbitHandler会自动匹配消息类型(消息自动确认)
     * @param msg     发送的是String类型,这里用String进行接收,RabbitHandler会自动进行匹配
     * @param message
     * @throws IOException
    @RabbitHandler
    public void releaseCouponRecord(String msg, Message message) throws IOException {
        log.info("releaseCouponRecord into"); // 监听到消息:消息内容,msg=新HRA3报告来啦!!
        long msgTag = message.getMessageProperties().getDeliveryTag();
        log.info("监听到消息:消息内容,msg={}", msg); // 监听到消息:消息内容,msg=新HRA3报告来啦!!
        log.info("msgTag={}", msgTag); // msgTag=1
        log.info("message={}", message.toString()); // message=(Body:'新HRA3报告来啦!!' MessageProperties [headers={}, ……
}

yml

代码语言: yaml
复制
server:
  port: 9090
spring:
  application:
    # 微服务系统有意义, 养成好习惯, 先写出来
    name: rabbitmq-02-springboot
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /codingce
    # 新版,NONE值是禁用发布确认模式,是默认值,CORRELATED值是发布消息成功到交换器后会触发回调方法
    publisher-confirm-type: correlated
  thymeleaf.cache: false

可靠性投递confirmCallback

confirmCallback是生产者到交换机,可以理解为确认消息是否发送成功。新版依赖可靠性投递默认是关闭的,使用以下方法开启:

代码语言: shell
复制
#旧版,确认消息发送成功,通过实现ConfirmCallBack接口,消息发送到交换器Exchange后触发回调
spring.rabbitmq.publisher-confirms=true
#新版,NONE值是禁用发布确认模式,是默认值,CORRELATED值是发布消息成功到交换器后会触发回调方法
spring.rabbitmq.publisher-confirm-type: correlated

yml

代码语言: yaml
复制
server:
  port: 9090
spring:
  application:
    # 微服务系统有意义, 养成好习惯, 先写出来
    name: rabbitmq-02-springboot
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /codingce
    #新版,NONE值是禁用发布确认模式,是默认值,CORRELATED值是发布消息成功到交换器后会触发回调方法
    publisher-confirm-type: correlated
  thymeleaf.cache: false

编码实现confirmCallback

代码语言: java
复制
package cn.com.codingce.controller;
import cn.com.codingce.common.utils.R;
import cn.com.codingce.config.RabbitMQConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("api")
@Slf4j
public class SendController {
    @Autowired
    private RabbitTemplate rabbitTemplate;
     * 可靠性投递confirmCallback
     * @return
    @GetMapping("/confirmCallback")
    public R confirmCallback() {
        log.info("可靠性投递 confirmCallback");
          correlationData:配置
          ack:交换机是否收到消息,true是成功,false是失败
          cause:失败的原因
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            log.info("confirm==== ack={}", ack);
            log.info("confirm==== cause={}", cause);
            if (ack) {
                log.info("发送成功,{}", cause);
            } else {
                log.error("发送失败,{}", cause);
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "health.new", "新HRA3报告来了!!");
        return R.ok();
    @GetMapping(value = "/default", produces = "text/html;charset=utf-8")
    public String getDefault() {
        return "队列服务运行正常...";
}

LOG

代码语言: shell
复制
2023-11-12 15:48:31.782  INFO 6840 --- [nio-9090-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring DispatcherServlet 'dispatcherServlet'
2023-11-12 15:48:31.782  INFO 6840 --- [nio-9090-exec-1] o.s.web.servlet.DispatcherServlet        : Initializing Servlet 'dispatcherServlet'
2023-11-12 15:48:31.783  INFO 6840 --- [nio-9090-exec-1] o.s.web.servlet.DispatcherServlet        : Completed initialization in 1 ms
2023-11-12 15:48:42.146  INFO 6840 --- [nio-9090-exec-5] c.c.codingce.controller.SendController   : 可靠性投递 confirmCallback
2023-11-12 15:48:42.155  INFO 6840 --- [nectionFactory1] c.c.codingce.controller.SendController   : confirm==== ack=true
2023-11-12 15:48:42.156  INFO 6840 --- [nectionFactory1] c.c.codingce.controller.SendController   : confirm==== cause=null
2023-11-12 15:48:42.156  INFO 6840 --- [nectionFactory1] c.c.codingce.controller.SendController   : 发送成功,null
2023-11-12 15:48:42.159  INFO 6840 --- [ntContainer#0-1] c.c.c.listener.healthHra3MQListener      : releaseCouponRecord into
2023-11-12 15:48:42.159  INFO 6840 --- [ntContainer#0-1] c.c.c.listener.healthHra3MQListener      : 监听到消息:消息内容,msg=新HRA3报告来了!!
2023-11-12 15:48:42.159  INFO 6840 --- [ntContainer#0-1] c.c.c.listener.healthHra3MQListener      : msgTag=1
2023-11-12 15:48:42.159  INFO 6840 --- [ntContainer#0-1] c.c.c.listener.healthHra3MQListener      : message=(Body:'新HRA3报告来了!!' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=health_hra3_exchange, receivedRoutingKey=health.new, deliveryTag=1, consumerTag=amq.ctag-7HDzHnEZ0foZ_MrQmGqFYQ, consumerQueue=health_hra3_queue])

可靠性投递returnCallback

returnCallback交换机到队列,消息从交换器发送到对应队列失败时触发:

  • 第一步 开启returnCallback配置
代码语言: text
复制
spring.rabbitmq.publisher-returns=true #新版
  • 第二步 修改交换机投递到队列失败的策略
代码语言: text
复制
# 为true,则交换机处理消息到路由失败,则会返回给生产者
spring.rabbitmq.template.mandatory=true

yml

代码语言: yaml
复制
server:
  port: 9090
spring:
  application:
    # 微服务系统有意义, 养成好习惯, 先写出来
    name: rabbitmq-02-springboot
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /codingce
    # 新版,NONE值是禁用发布确认模式,是默认值,CORRELATED值是发布消息成功到交换器后会触发回调方法
    publisher-confirm-type: correlated
    #########################################################################################
    # 为true,则交换机处理消息到路由失败,则会返回给生产者
    publisher-returns: true
  thymeleaf.cache: false

编码实实现returnCallback

代码语言: java
复制
package cn.com.codingce.controller;
import cn.com.codingce.common.utils.R;
import cn.com.codingce.config.RabbitMQConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("api")
@Slf4j
public class SendController {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @GetMapping("/returnCallback")
    public R returnCallback() {
        log.info("交换机到队列通过returnCallback 可靠性投递 returnCallback");
        // 为true,则交换机处理消息到路由失败,则会返回给生产者,开启强制消息投递(mandatory为设置为true),但消息未被路由至任何一个queue,则回退一条消息
        rabbitTemplate.setReturnsCallback(returnedMessage -> {
            int code = returnedMessage.getReplyCode();
            log.info("returnCallback code={}", code);
            log.info("returnCallback returned={}", returnedMessage);
        // 这个routingKey是不存在的,它找不到这个路由,所以会出现异常从而触发上面的回调方法
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "xxx.health.new", "新HRA3报告来了!!");
        return R.ok();
    @GetMapping(value = "/default", produces = "text/html;charset=utf-8")
    public String getDefault() {
        return "队列服务运行正常...";
 
推荐文章
气宇轩昂的啄木鸟  ·  RabbitMQ消息处理避坑指南:为什么你的@RabbitHandler不生效?常见配置错误排查
1 周前
坐怀不乱的绿茶  ·  RabbitMQ消息处理避坑指南:为什么你的@RabbitHandler总是不生效?
1 周前
可爱的烤红薯  ·  【RabbitMQ】消息可靠性投递开发者社区
1 周前
怕考试的木耳  ·  软件分享库合集链接汇总推荐_蓝奏云软件分享链接网站汇总手机 - 骑士助手
10 月前
精明的日记本  ·  江西省发布第三批非法集资严重失信人名单 _ 防范金融风险 _ 南昌县人民政府
1 年前
奔放的梨子  ·  Creating Word Application using Excel VBA: Run-time error '429': ActiveX component can't create obje
2 年前
奔跑的苦咖啡  ·  地藏菩萨本愿经讲记(第十三卷)
2 年前
愤怒的菠萝  ·  异兽魔都(林田球创作的系列漫画)_搜狗百科
2 年前
今天看啥   ·   Py中国   ·   codingpro   ·   小百科   ·   link之家   ·   卧龙AI搜索
删除内容请联系邮箱 2879853325@qq.com
Code - 代码工具平台
© 2024 ~ 沪ICP备11025650号