相关文章推荐
温柔的铁板烧  ·  Exchange ...·  1 周前    · 
冷冷的滑板  ·  总溶解固体_百度百科·  1 年前    · 
傻傻的键盘  ·  凰女攻略漫画 - 抖音·  2 年前    · 
耍酷的柳树  ·  求诙谐幽默的言情小说,最好是穿越的,重生的, ...·  2 年前    · 
威武的炒粉  ·  《人生大事》登陆院线,首批影评出炉,朱一龙呼 ...·  2 年前    · 
huangchen1210  ·  武汉弘芯半导体公司怎么样? - 知乎·  5 年前    · 
Code  ›  02springboot整合RabbitMQ - 紫月java
rabbitmq 博客园 消息队列 exchange
https://www.cnblogs.com/ziyue7575/p/e7c7760d19558739257fbb795cf2cccf.html
长情的大熊猫
11 月前

SpringBoot整合RabbitMQ
消息发送方
配置
pom:RabbitMQ依赖
application.properties配置application.yml
config
发送简单消息
一对一
一对多发送
多对多发送
发送对象Object
Topic Exchange,根据routing_key发送
Fanout Exchange广播模式或者订阅模式
发送参数讲解
消息实体类
发送消息
测试
接收消息(consumer)
配置文件
entity/config
@RabbitListener消费端监听
接受消息
注解使用application参数
TOC

SpringBoot整合RabbitMQ

参考: https://blog.csdn.net/ztx114/article/details/78410727

  • 引入相关依赖
  • 对application.properties进行配置
  • 消息发送方

    pom:RabbitMQ依赖

    amqp依赖即可

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

    application.properties配置application.yml

  • publisher-confirms ,实现一个监听器用于监听Broker端给我们返回的确认请求: RabbitTemplate.ConfirmCalback ;
  • publisher-returns ,保证消息对Broker端是可达的,如果出现路由键不可达的情况,则使用监听器对不可达的消息进行后续的处理,保证消息的路由成功: RabbitTemplate.ReturnCalback ;
  • 注意,在发送消息的时候对template进行配置 mandatory=true 保证监听有效
    生产端还可以配置其他属性,比如发送重试,超时时间、次数、间隔等

    spring:
      rabbitmq: #配置RabbitMQ
        addresses: 192.168.221.128:5672 #地址
        username: guest
        password: guest
        virtual-host: /  #虚拟机路径,/是默认地址
        publisher-confirms: true
        #connection-timeout:15000 #连接超时时间
        #publisher-returns: true
        template:
          mandatory: true
      http:
        encoding:
          charset:UTF-8
      jackson:
        date-format:yyyy-wMM-dd HH:mm:ss
        time-zone:GMT+8
        default-property-inclusion:NON NULL
    server:
      port: 8001
      servlet:  #端口号
        context-path: /  #项目路径

    config

    @Configuration
    @ComponentScan({"com.bfxy.springboot.*"})//扫描所有的包
    public class MainConfig {}

    发送简单消息

    简单消息,队列必须对应,发送者和接收者的 queue name 必须一致,不然不能接收;
    消息只能被接收一次

    @Configuration
    public class RabbitConfig {
        @Bean
        public Queue Queue() {
            return new Queue("hello");
    
    @Component
    public class HelloSender {
        @Autowired
        private AmqpTemplate rabbitTemplate;
        public void send() {
            String context = "hello " + new Date();
            System.out.println("Sender : " + context);
            this.rabbitTemplate.convertAndSend("hello", context);//发送消息
    
    @Component
    @RabbitListener(queues = "hello") //监听队列
    public class HelloReceiver {
        @RabbitHandler //接收消息
        public void process(String hello) {
            System.out.println("Receiver  : " + hello);
    

    一对多发送

    多个接受者,每条消息被其中一个接收(只能接受一次)
    比如,将一条消息发送10遍多个接收方的时候,会轮询的接收

            String context = "hello " + new Date();
            for (int i = 0; i < 10; i++) {
                this.rabbitTemplate.convertAndSend("hello", context+"    ");
    

    多对多发送

    和一对多一样,接收端仍然会均匀接收到消息

    发送对象Object

    与发送普通消息一样

    this.rabbitTemplate.convertAndSend("object", user);
    //接受者
    @RabbitHandler
    public void process(User user) {
        System.out.println("Receiver object : " + user);
    

    Topic Exchange,根据routing_key发送

    一个消息可以被设置了routing_key的队列接收,每个队列都可以接收

  • 设置queue,exchange,bingding
  • @Configuration
    public class TopicRabbitConfig {
        final static String message = "topic.receive1";
        final static String messages = "topic.receive2";
        @Bean
        public Queue queueMessage() {//设置queue
            return new Queue(TopicRabbitConfig.message);
        @Bean
        public Queue queueMessages() {//设置queue
            return new Queue(TopicRabbitConfig.messages);
        @Bean
        TopicExchange exchange() {//设置交换机
            return new TopicExchange("exchange");
        @Bean  //交换机设置bingding,设置路由key
        //queueMessage只匹配"topic.message"队列
        Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
            return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
        @Bean //交换机设置bingding,设置路由key
        //queueMessages同时匹配两个队列(topic.#)
        Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
            return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
    
     //发送消息的时候要设置具体的交换和路由key
    public void send1() {
        String context = "hi, i am message 1";
        System.out.println("Sender : " + context);
        this.rabbitTemplate.convertAndSend("exchange", "topic.message", context);//两个队列都可以收到消息
    public void send2() {
        String context = "hi, i am messages 2";
        System.out.println("Sender : " + context);
        this.rabbitTemplate.convertAndSend("exchange", "topic.messages", context);//只有queueMessages可以收到消息(topic.#)
    
        @RabbitListener(queues = "topic.receive1")
        @RabbitHandler
        public void process1(String hello) {
            System.out.println("Receiver1  : " + hello);
        @RabbitListener(queues = "topic.receive2")
        @RabbitHandler
        public void process2(String hello) {
            System.out.println("Receiver2  : " + hello);
    

    Fanout Exchange广播模式或者订阅模式

    Fanout 就是我们熟悉的广播模式或者订阅模式,给Fanout交换机发送消息,绑定了这个交换机的所有队列都收到这个消息。

  • 设置队列,绑定交换机
  • @Configuration
    public class FanoutRabbitConfig {
         //设置了3个队列
        @Bean
        public Queue AMessage() {
            return new Queue("fanout.A");
        @Bean
        public Queue BMessage() {
            return new Queue("fanout.B");
        @Bean
        public Queue CMessage() {
            return new Queue("fanout.C");
     //设置了交换机
        @Bean
        FanoutExchange fanoutExchange() {
            return new FanoutExchange("fanoutExchange");
     //绑定(将这三个队列都绑定到这个交换机上)
        @Bean
        Binding bindingExchangeA(Queue AMessage,FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(AMessage).to(fanoutExchange);
        @Bean
        Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(BMessage).to(fanoutExchange);
        @Bean
        Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(CMessage).to(fanoutExchange);
    
    public void send() {
            String context = "hi, fanout msg ";
            System.out.println("Sender : " + context);
            this.rabbitTemplate.convertAndSend("fanoutExchange","", context);
    

    接收的时候,三个队列都会受到消息

    发送参数讲解

  • convertAndSend
    • (Object var1):消息体
    • (String var1, Object var2) 队列,消息体
    • (String var1, String var2, Object var3)
  • 消息实体类

    RabbitMQ发送对象必须序列化:

    package com.bfxy.springboot.entity;
    import java.io.Serializable;
    public class Order implements Serializable {
        private static final long serialVersionUID = 9111357402963030257L;
        private String id;
        private String name;
        private String messageId;//消息id
    
    import java.util.Map;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
    import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
    import org.springframework.amqp.rabbit.support.CorrelationData;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.MessageHeaders;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.stereotype.Component;
    import com.bfxy.springboot.entity.Order;
     * RabbitMQ发送消息
    @Component 
    public class RabbitSender {
        //自动注入RabbitTemplate模板类
        @Autowired 
        private RabbitTemplate rabbitTemplate;
        //回调函数: confirm确认     
        final ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
            @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            //ack是结果,true成功,false异常
            //cause是异常的时候返回的异常信息
                System.err.println("correlationData: " + correlationData);
                System.err.println("ack: " + ack);
                if (!ack) {//若是异常
                    System.err.println("异常处理....");
                //若是没有异常,一般接下来是修改数据库
        //回调函数: return返回  (路由成功,不会调用这个方法)   
        final ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
            //message,重试的错误码,重试提示,交换机,路由键
            @Override public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText, String exchange, String routingKey) {
                System.err.println("return exchange: " + exchange + ", routingKey: " + routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);
        //发送消息方法调用: 构建Message消息
        public void send(Object message, Map < String, Object > properties) throws Exception {
            MessageHeaders mhs = new MessageHeaders(properties);//设置消息自定义参数
            Message msg = MessageBuilder.createMessage(message, mhs);//设置消息
            rabbitTemplate.setConfirmCallback(confirmCallback);//设置回调函数
            rabbitTemplate.setReturnCallback(returnCallback);//设置return回调方法
            //id + 时间戳 全局唯一 
            CorrelationData correlationData = new CorrelationData("1234567890");
            //发送消息,交换机,路由键,消息,消息id
            rabbitTemplate.convertAndSend("exchange-1", "springboot.abc", msg, correlationData);
        //发送消息方法调用: 构建自定义对象消息,直接发送entity消息
        public void sendOrder(Order order) throws Exception {
            rabbitTemplate.setConfirmCallback(confirmCallback);
            rabbitTemplate.setReturnCallback(returnCallback);
            //id + 时间戳 全局唯一 
            CorrelationData correlationData = new CorrelationData("0987654321");
            rabbitTemplate.convertAndSend("exchange-2", "springboot.def", order, correlationData);
    
        @Autowired
        private RabbitSender rabbitSender;
        private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        @Test
        public void testSender1() throws Exception {
             Map<String, Object> properties = new HashMap<>();
             properties.put("number", "12345");
             properties.put("send_time", simpleDateFormat.format(new Date()));
             rabbitSender.send("Hello RabbitMQ For Spring Boot!", properties);
        @Test
        public void testSender2() throws Exception {
             Order order = new Order("001", "第一个订单");
             rabbitSender.sendOrder(order);
    

    消息发送成功,返回确认时,只进入ConfirmCallback
    消息发送失败,返回确认时,先进入ReturnCallback,再进入ConfirmCallback

    接收消息(consumer)

    不建议使用事务

  • 首先配置手工确认模式,用于ACK的手工处理,这样我们可以保证消息的可靠性送达,或者再消费端消费失败的时候可以做到重回队列、根据业务记录日志等处理
  • 可以设置消费端的监听个数和最大个数,用于控制消费端的并发情况
  • ## RabbitMQ连接配置(基本配置--必须的)
    spring.rabbitmq.addresses=192.168.0.105:5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    spring.rabbitmq.virtual-host=/
    spring.rabbitmq.connection-timeout=15000
    # RabbitMQ消费配置(接受消息的配置--监听)
    # 基本并发:5
    spring.rabbitmq.listener.simple.concurrency=5
    # 最大并发:10
    spring.rabbitmq.listener.simple.max-concurrency=10
    # 签收模式:manual手动签收--auto自动签收
    spring.rabbitmq.listener.simple.acknowledge-mode=manual
    # 限流策略:同一时间只有1条消息发送过来消费(防止消息太多,内存溢出)
    spring.rabbitmq.listener.simple.prefetch=1
    # Server配置
    server.servlet.context-path=/
    server.port=8082
    #若是不用web项目,下方的不用配置
    spring.http.encoding.charset=UTF-8
    spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
    spring.jackson.time-zone=GMT+8
    spring.jackson.default-property-inclusion=NON_NULL

    entity/config

    entity最好是与生产端一致的entity(上方的order)
    config:与发送端一致(包不一定)

    @RabbitListener消费端监听

    @Rabbitlistener 是一个组合注解,里面可以注解配置@QueueBinding、@Queue、@Exchange直接通过这个组合注解一次性搞定消费端交换机、队列、绑定、路由、并且配置监听功能等

    //com.rabbitmq.client.Channel;
    @Component
    public class OrderReceiver {
         * 接收消息
         *使用注解进行监听(绑定消息队列等)
         * @param order   消息体内容
         * @param headers 消息头内容
         * @param channel 网络信道
         * @throws Exception 异常
        /*key自懂创建
         @Queue绑定队列,durable持久化
         @Exchange绑定交换机,durable持久化,type = "topic"交换机类型
         key路由key
    ignoreDeclarationExceptions 支持过滤
        @RabbitListener(bindings = @QueueBinding(
                value = @Queue(value = "order-queue",durable = "true"),
                exchange = @Exchange(name = "order-exchange",type = "topic",durable="true",ignoreDeclarationExceptions = "true"),
                key = "order.*"
        @RabbitHandler //接收消息
        //是将message拆开了
        //@Payload消息体,
        //Headers消息信息
        //由于上方设置手工签收,必须设置Channel(是RabbitMQ的那个)
        public void onOrderMessage(@Payload Order order, @Headers Map<String, Object> headers, Channel channel) throws Exception {
            // 消费者操作(收到消息的处理)
            System.out.println("收到消息:");
            System.out.println("订单信息:" + order.toString());
            // 手动签收消息
            Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
                //通知RabbitMQ消息已签收
            //deliveryTag从头取
            //是否批量签收
            channel.basicAck(deliveryTag, false);
    
        @RabbitListener(.....)
        @RabbitHandler
        public void onMessage(Message message, Channel channel) throws Exception {
            System.err.println("--------------------------------------");
            System.err.println("消费端Payload: " + message.getPayload());//消息体
            Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);//唯一id
            //手工ACK
            channel.basicAck(deliveryTag, false);
    

    注解使用application参数

    //yaml
    spring.rabbitmq.listener.order.queue.name=queue-2
    spring.rabbitmq.listener.order.queue.durable=true
    spring.rabbitmq.listener.order.exchange.name=exchange-2
    spring.rabbitmq.listener.order.exchange.durable=true
    spring.rabbitmq.listener.order.exchange.type=topic
    spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions=true
    spring.rabbitmq.listener.order.key=springboot.*
    //注解使用
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "${spring.rabbitmq.listener.order.queue.name}", 
            durable="${spring.rabbitmq.listener.order.queue.durable}"),
            exchange = @Exchange(value = "${spring.rabbitmq.listener.order.exchange.name}", 
            durable="${spring.rabbitmq.listener.order.exchange.durable}", 
            type= "${spring.rabbitmq.listener.order.exchange.type}", 
            ignoreDeclarationExceptions = "${spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions}"),
            key = "${spring.rabbitmq.listener.order.key}"
    

    此时,发送端运行一次发送消息,接收端会直接接收到:

    若是没有设置ACK,消息就不会自动签收,



    来自为知笔记(Wiz)


     
    推荐文章
    温柔的铁板烧  ·  Exchange Server中的通讯簿策略过程 | Microsoft Learn
    1 周前
    冷冷的滑板  ·  总溶解固体_百度百科
    1 年前
    傻傻的键盘  ·  凰女攻略漫画 - 抖音
    2 年前
    耍酷的柳树  ·  求诙谐幽默的言情小说,最好是穿越的,重生的,青梅竹马的(男主很腹黑)_百度知道
    2 年前
    威武的炒粉  ·  《人生大事》登陆院线,首批影评出炉,朱一龙呼声高涨|人生大事|朱一龙|殡葬_新浪新闻
    2 年前
    huangchen1210  ·  武汉弘芯半导体公司怎么样? - 知乎
    5 年前
    今天看啥   ·   Py中国   ·   codingpro   ·   小百科   ·   link之家   ·   卧龙AI搜索
    删除内容请联系邮箱 2879853325@qq.com
    Code - 代码工具平台
    © 2024 ~ 沪ICP备11025650号