一:前言概述
去年更新RabbitMQ系列文章讲解使用过程中重要知识点,示例都是采用原生Java客户端完成。实际开发过程中大部分使用者都是与Spring进行集成,通过SpringAMQP运用RabbitMQ。接下来几篇文章将会从用法、源码角度讲解SpringAMQP生产级别应用
二:调用论证
<!--定义消息转换器-->
<bean id="jackson2JsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/>
<!--定义队列监听-->
<rabbit:listener-container connection-factory="providerConnection" message-converter="jackson2JsonMessageConverter">
<rabbit:listener id="baseConsumer" queues="baseQueue" ref="commonConsumer" method="consumerListener"/>
</rabbit:listener-container>
复制代码
@Component
public class CommonConsumer {
public void consumerListener(byte[] message){
System.out.println("=======使用字节数组进行处理============");
System.out.println(new String(message));
public void consumerListener(String message){
System.out.println("=======使用字符串进行处理============");
System.out.println(message);
public void consumerListener(Map message){
System.out.println("=======使用Map集合进行处理============");
System.out.println(message);
public void consumerListener(List<TestPojo> message){
System.out.println("=======使用List集合进行处理============");
System.out.println(message);
public void consumerListener(TestPojo message){
System.out.println("=======使用实体类集合进行处理============");
System.out.println(message);
复制代码
2.1 默认byte[]处理
SpringAMQP针对RabbitMQ中的消息封装为对象Message,当生产者未做任何处理时消费者将处理为byte[]进行消费
TestPojo messageBody = new TestPojo("zsl",23);
Gson gson = new Gson();
String bodyJson = gson.toJson(messageBody);
MessageProperties properties = new MessageProperties();
Message message = new Message(bodyJson.getBytes(),properties);
rabbitTemplate.convertAndSend("baseFanoutExchange","",message);
return "success";
复制代码
2.2 优化为Map
经过研究发现当Message设置MessageProperties对象的
contentType
属性 为
application/json
后消费者这会将其处理为Map使用
TestPojo messageBody = new TestPojo("zsl",23);
Gson gson = new Gson();
String bodyJson = gson.toJson(messageBody);
MessageProperties properties = new MessageProperties();
// 定义ContentType为application/json消息消费处理为map
properties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
Message message = new Message(bodyJson.getBytes(),properties);
rabbitTemplate.convertAndSend("baseFanoutExchange","",message);
return "success";
复制代码
2.3 优化为List
当消息体为当个对象时处理为Map没有问题,因为JSON数据转换的过程中对象都会被默认的映射为key-value形式的Map。如果消息是一个集合,自然会被处理为List
TestPojo messageBody1 = new TestPojo("zsl",23);
TestPojo messageBody2 = new TestPojo("zsl",23);
TestPojo messageBody3 = new TestPojo("zsl",23);
List<TestPojo> messageBodys = new ArrayList<>(3);
messageBodys.add(messageBody1);
messageBodys.add(messageBody2);
messageBodys.add(messageBody3);
Gson gson = new Gson();
String bodyJson = gson.toJson(messageBodys);
MessageProperties properties = new MessageProperties();
// 如果传入List消费将处理为List
properties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
Message message = new Message(bodyJson.getBytes(),properties);
rabbitTemplate.convertAndSend("baseFanoutExchange","",message);
return "success";
复制代码
2.4 优化为对象
经过
数组 -> Map/List
处理对于开发者而言并没有丝毫意义,并不是理想的终极目标。就想在入参处通过对象处理消息体,网上很多文章包括官网教程都会告诉你有以下两个方法:
第一种方式就不再赘述,毕竟肯定在生产中是不可能使用上的,第二种方式首先需要更改Jackson2JsonMessageConvert如下配置:
<!--定义消息转换映射-->
<bean id="defaultJackson2JavaTypeMapper" class="org.springframework.amqp.support.converter.DefaultJackson2JavaTypeMapper">
<property name="idClassMapping">
<entry key="TestPojo">
<value type="java.lang.Class">com.zsl.rabbitmq.pojo.TestPojo</value>
</entry>
</map>
</property>
</bean>
<!--定义消息转换器-->
<bean id="jackson2JsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter">
<property name="javaTypeMapper" ref="defaultJackson2JavaTypeMapper"/>
</bean>
复制代码
然后在生产者发送消息时需要指定MessageProperties的Header属性
__TypeId__
为映射关系的key,如下所示:
TestPojo messageBody = new TestPojo("zsl",23);
Gson gson = new Gson();
String bodyJson = gson.toJson(messageBody);
MessageProperties properties = new MessageProperties();
properties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
// 定义__TypeId__映射关系消费处理为对象
properties.getHeaders().put("__TypeId__","TestPojo");
Message message = new Message(bodyJson.getBytes(),properties);
rabbitTemplate.convertAndSend("baseFanoutExchange","",message);
return "success";
复制代码
三:源码论证
任何技术学习都离不开它的落脚点
源码
,接下来通过Jackson2JsonMessageConvert源码阅读弄清楚一下三个问题:
3.1 默认byte[]
看到Jackson2JsonMessageConvert的抽象类AbstractJackson2MessageConverter,其中有个方法叫
fromMessage
3.2 ContentType作用
3.3 TypeId 作用
当生产者消息中带有
TypeId__
继续debug跟进发现会进入源码204行分支,这个分支中就会获取到映射的对象Class实例,然后进行JSON的转换返回