一:前言概述

去年更新RabbitMQ系列文章讲解使用过程中重要知识点,示例都是采用原生Java客户端完成。实际开发过程中大部分使用者都是与Spring进行集成,通过SpringAMQP运用RabbitMQ。接下来几篇文章将会从用法、源码角度讲解SpringAMQP生产级别应用

二:调用论证

    <!--定义消息转换器-->
    <bean id="jackson2JsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/>
    <!--定义队列监听-->
    <rabbit:listener-container connection-factory="providerConnection" message-converter="jackson2JsonMessageConverter">
        <rabbit:listener id="baseConsumer" queues="baseQueue" ref="commonConsumer" method="consumerListener"/>
    </rabbit:listener-container>
复制代码
@Component
public class CommonConsumer {
    public void consumerListener(byte[] message){
        System.out.println("=======使用字节数组进行处理============");
        System.out.println(new String(message));
    public void consumerListener(String message){
        System.out.println("=======使用字符串进行处理============");
        System.out.println(message);
    public void consumerListener(Map message){
        System.out.println("=======使用Map集合进行处理============");
        System.out.println(message);
    public void consumerListener(List<TestPojo> message){
        System.out.println("=======使用List集合进行处理============");
        System.out.println(message);
    public void consumerListener(TestPojo message){
        System.out.println("=======使用实体类集合进行处理============");
        System.out.println(message);
复制代码
2.1 默认byte[]处理

SpringAMQP针对RabbitMQ中的消息封装为对象Message,当生产者未做任何处理时消费者将处理为byte[]进行消费

        TestPojo messageBody = new TestPojo("zsl",23);
        Gson gson = new Gson();
        String bodyJson = gson.toJson(messageBody);
        MessageProperties properties = new MessageProperties();
        Message message = new Message(bodyJson.getBytes(),properties);
        rabbitTemplate.convertAndSend("baseFanoutExchange","",message);
        return "success";
复制代码
2.2 优化为Map

经过研究发现当Message设置MessageProperties对象的 contentType 属性 为 application/json 后消费者这会将其处理为Map使用

        TestPojo messageBody = new TestPojo("zsl",23);
        Gson gson = new Gson();
        String bodyJson = gson.toJson(messageBody);
        MessageProperties properties = new MessageProperties();
        // 定义ContentType为application/json消息消费处理为map
        properties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
        Message message = new Message(bodyJson.getBytes(),properties);
        rabbitTemplate.convertAndSend("baseFanoutExchange","",message);
        return "success";
复制代码
2.3 优化为List

当消息体为当个对象时处理为Map没有问题,因为JSON数据转换的过程中对象都会被默认的映射为key-value形式的Map。如果消息是一个集合,自然会被处理为List

        TestPojo messageBody1 = new TestPojo("zsl",23);
        TestPojo messageBody2 = new TestPojo("zsl",23);
        TestPojo messageBody3 = new TestPojo("zsl",23);
        List<TestPojo> messageBodys = new ArrayList<>(3);
        messageBodys.add(messageBody1);
        messageBodys.add(messageBody2);
        messageBodys.add(messageBody3);
        Gson gson = new Gson();
        String bodyJson = gson.toJson(messageBodys);
        MessageProperties properties = new MessageProperties();
        // 如果传入List消费将处理为List
        properties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
        Message message = new Message(bodyJson.getBytes(),properties);
        rabbitTemplate.convertAndSend("baseFanoutExchange","",message);
        return "success";
复制代码
2.4 优化为对象

经过 数组 -> Map/List 处理对于开发者而言并没有丝毫意义,并不是理想的终极目标。就想在入参处通过对象处理消息体,网上很多文章包括官网教程都会告诉你有以下两个方法:

  • 生产者直接指定消费者项目中消息对应对象的完全路径:生产者与消费者耦合严重
  • 生产者配置映射关系,消费者根据映射关系转换:推荐,完全解耦
  • 第一种方式就不再赘述,毕竟肯定在生产中是不可能使用上的,第二种方式首先需要更改Jackson2JsonMessageConvert如下配置:

        <!--定义消息转换映射-->
        <bean id="defaultJackson2JavaTypeMapper" class="org.springframework.amqp.support.converter.DefaultJackson2JavaTypeMapper">
            <property name="idClassMapping">
                    <entry key="TestPojo">
                        <value type="java.lang.Class">com.zsl.rabbitmq.pojo.TestPojo</value>
                    </entry>
                </map>
            </property>
        </bean>
        <!--定义消息转换器-->
        <bean id="jackson2JsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter">
            <property name="javaTypeMapper" ref="defaultJackson2JavaTypeMapper"/>
        </bean>
    复制代码

    然后在生产者发送消息时需要指定MessageProperties的Header属性 __TypeId__ 为映射关系的key,如下所示:

            TestPojo messageBody = new TestPojo("zsl",23);
            Gson gson = new Gson();
            String bodyJson = gson.toJson(messageBody);
            MessageProperties properties = new MessageProperties();
            properties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
            // 定义__TypeId__映射关系消费处理为对象
            properties.getHeaders().put("__TypeId__","TestPojo");
            Message message = new Message(bodyJson.getBytes(),properties);
            rabbitTemplate.convertAndSend("baseFanoutExchange","",message);
            return "success";
    复制代码

    三:源码论证

    任何技术学习都离不开它的落脚点 源码 ,接下来通过Jackson2JsonMessageConvert源码阅读弄清楚一下三个问题:

  • 默认返回byte[]
  • ContentType作用
  • __TypeId__作用
  • 3.1 默认byte[]

    看到Jackson2JsonMessageConvert的抽象类AbstractJackson2MessageConverter,其中有个方法叫 fromMessage

  • 当contentType为null时直接取出Message的body返回,自然是byte[]
  • 3.2 ContentType作用
  • debug跟一下你会发现第二个判断条件就是在判断是否将其进行JSON处理,所以当contentType属性为设置为application/json时还是会直接返回byte数组
  • 3.3 TypeId 作用

    当生产者消息中带有 TypeId__ 继续debug跟进发现会进入源码204行分支,这个分支中就会获取到映射的对象Class实例,然后进行JSON的转换返回

    分类:
    后端
  •