消息生产者构造好 Message 之后,就会将 Message 发送到指定的 Exchange (交换机),再根据 Exchange 的类型及 routing-key 将消息路由到相应的 queue 中,最后被监听该 queue 的消费者消费,大致如下图:

不过每次发消息都要自己构造 Message 对象比较麻烦。Spring-AMQP 允许我们直接使用自定义的类,然后会利用指定好的 MessageConverter 将自定义的类转换为 Message 进行发送,在接收时也会利用 MessageConverter 将接收到的 Message 对象转成需要的对象。Spring-AMQP 提供了多种 MessageConverter,比如 SimpleMessageConverter,SerializerMessageConverter,Jackson2JsonMessageConverter,MarshallingMessageConverter等等,如果发送的消息对象不是 Message 实例,并且没有指定 MessageConverter 的话,默认用 SimpleMessageConverter。以上各种 MessageConverter 归根结底都是实现了 MessageConverter 接口,该接口只有两个方法:

这两个方法一个是在发送消息时将我们的消息对象转换成标准的 Message 对象,另一个是在接受消息时将 Message 对象转换为相应的对象。
比较常用的 Converter 就是 Jackson2JsonMessageConverter(以下简称 JsonMessageConverter),在发送消息时,它会先将自定义的消息类序列化成json格式,再转成byte构造 Message,在接收消息时,会将接收到的 Message 再反序列化成自定义的类。大致流程如下图:
流程2 流程2

不过使用 JsonMessageConverter 时有一个小问题,在 不对它进行任何改造的前提下 ,发送消息的类和接受消息的类必须是一样的,不仅是要里面的字段一样,类名一样,连类的包路径都要一样。

所以当系统1使用 JsonMessageConverter 发送消息类A给系统2时,系统2可以有如下几种方式来接收:

  • 1.依赖系统1的jar包,直接使用类A来接收
  • 2.不依赖系统1的jar包,自己建一个和A一模一样的类,连名称,包路径都一样
  • 3.负责监听 queue 的类实现 MessageListener 接口,直接接收 Message 类,再自己转换

上面三个方法都不是很好,按照正常的想法,我们肯定是期望系统2直接使用自己的类来接收就可以了,只要与A类的字段名一样即可。那有没有方法可以让系统2既不依赖无用的jar包,也不用建立个与自己命名规范不相符的类, 也无需自己转换呢?

要解决这个问题,就要先看看 JsonMessageConverter 是如何将 Message 进行反序列化的。
在 JsonMessageConverter 的 fromMessage 方法中有这么一段:

就是说默认情况下,JsonMessageConverter 使用的 ClassMapper 是 DefaultJackson2JavaTypeMapper ,在转换时通过 Message 的 Properties 来获取要转换的目标类的类型。通过 Debug 可以发现,目标类的类型是存储在 Message 的 Proterties 的 一个 headers 的 Map 中,Key 叫“__TypeId__”。所以只要想办法在传输消息时更改__TypeId__的值即可。

下面是解决办法,在消息的生产者端为 JsonMessageConverter, 设置一个自定义的 ClassMapper,重写 fromClass 方法,将 __TypeId__ 的值设为消费端用来接收的类的路径+名称。当然了,也可以在消费者端重写toClass方法,直接返回想要转换的目标类的类类型。两种选一种就可以。

消息队列算是各个系统间通信比较常见的方式了。我们公司用的是是基于 AMQP 协议的 RabbitMq。在 Spring-AMQP 中比较重要的类就是 Message,因为要发送的消息必须要构造成一个 Message 对象来进行传输。Message 对象包括两部分 Body 和 Properties,Body 就是真正要发送的消息内容,Properties 就是和消息相关的一些属性(消息头,
该示例通过 rabbitmq _delayed_ message _exchange 插件实现 自定义 延时时间的延时队列。 示例是纯净的,只引入了需要的架包 启动示例时,请确保MQ已经安装了延时插件(附件里带有插件及安装说明)以及示例的MQ相关的配置是否和你的环境一致 启动成功后访问 ip:port/mq/push(参数 msg 消息;delayTime 延时毫秒时间) 测试,在后端控制台 查看效果
RabbitMQ 提供了几种特性,牺牲了一点性能代价,提供了可靠性的保证。持久化当 RabbitMQ 退出时,默认会将消息和队列都清除,所以需要在第一次声明队列和发送消息时指定其持久化属性为true,这样 RabbitMQ 会将队列、消息和状态存到 RabbitMQ 本地的数据库,重启后会恢复。java: 注:当声明的队列已经存在时,尝试重新定义它的durable是不生效的。接收应答客户端接收消息的模式默认是自动应答,但是通过设置autoAck为false可以让客户端主动应答消息。当客户端拒绝此消息或者未应答便断开连接时,就会使得此消息重新入队(在版本2.7.0以前是到重新加入到队尾,2.7.0及以后是保留
1.依赖系统1的jar包,直接使用类A来接收 2.不依赖系统1的jar包,自己建一个和A一模一样的类,连名称,包路径都一样 3.负责监听 queue 的类实现 Message Listener 接口,直接接收 Message 类,再自己转 显然都不够好也不是自己想要的 在 Json Message Con...
AmqpTemplate还定义了几种发送和接收消息的方法,这些消息委托给 Message Converter Message Converter 为每个方向提供了一个方法:一个用于转换为消息,另一个用于从消息转化。注意,在转换为消息时,还可以提供对象之外的属性。对象参数通常对应于消息体。下面的清单显示了 Message Converter 接口定义: public interface Message ...
Message 对象发送消息 import org.springframework.amqp.core. Message ; import org.springframework.amqp.core. Message Properties; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.core.RabbitTemplate; import
RabbitMQ ,接受函数是指用于接收和处理消息的函数。当一个消息被发送到 RabbitMQ 的一个队列时,消费者可以使用一个接受函数来订阅该队列,并接收并处理队列 的消息。接收函数通常会包含以下步骤: 1. 连接到 RabbitMQ 服务器 2. 声明要使用的队列 3. 定义接收消息的回调函数 4. 启动事件循环,等待消息到达并调用回调函数来处理它们 接收函数可以使用多种编程语言来实现,例如 Python、Java、C# 等等。通过使用接收函数,开发人员可以轻松地编写应用程序来处理 RabbitMQ 的消息,并将其作为一个分布式系统的一部分。