RabbitMQ学习-----高级特性2
RabbitAdmin
注入spring容器中,可以很好的操控RabbitMQ.配置如下:
@Configuration
public class RabbitmqConfig {
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
cachingConnectionFactory.setAddresses("172.0.0.0:5672");
cachingConnectionFactory.setUsername("root");
cachingConnectionFactory.setPassword("root");
cachingConnectionFactory.setVirtualHost("/");
return cachingConnectionFactory;
* rabbitAdmin操作RabbbitMQ
* @param connectionFactory
* @return
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
//必须设置成true,否则spring容器不会加载rabbitmq
//rabbitadmin底层实现就是从spring 容器中获取Exchange/Binding/RountingKey以及Queue的@Bean声明
//然后使用RabbitTemplate的execute()方法执行对应的修改(添加一个交换机/删除一个绑定/清空一个队列)等一些列rabbitmq操作
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
- rabbitAdmin声明和绑定
public class TestController {
@Autowired
private RabbitAdmin rabbitAdmin;
// 每个模组都带一个测试服务 用于验证
public void test() {
//声明exchange
rabbitAdmin.declareExchange(new DirectExchange("test_direct", false, false));
rabbitAdmin.declareExchange(new TopicExchange("test_topic", false, false));
rabbitAdmin.declareExchange(new FanoutExchange("test_fanout", false, false));
//声明queue
rabbitAdmin.declareQueue(new Queue("test.direct.queue", false));
rabbitAdmin.declareQueue(new Queue("test.topic.queue", false));
rabbitAdmin.declareQueue(new Queue("test.fanout.queue", false));
//声明队列绑定,将队列绑定到哪个exchange上
//绑定的要素:队列,exchange,routingKey
//两种绑定方式:new Binding()和BindingBuilder
rabbitAdmin.declareBinding(new Binding("test.direct.queue", Binding.DestinationType.QUEUE, "test_direct", "direct", new HashMap<>()));
rabbitAdmin.declareBinding(BindingBuilder
.bind(new Queue("test.topic.queue", false)) //queue可以先声明,也可以在binding的时候声明
.to(new TopicExchange("test_topic", false, false))
.with("uer.#")); //指定路由key
//清空队列数据
rabbitAdmin.purgeQueue("test.topic.queue", false);
}
消息模板 rabbitTemplate
在与spring整合时需要实例化,在和springboot整合时添加配置即可
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSendMessage() throws Exception {
//1 创建消息
MessageProperties messageProperties = new MessageProperties();
messageProperties.getHeaders().put("desc", "信息描述..");
messageProperties.getHeaders().put("type", "自定义消息类型..");
Message message = new Message("Hello RabbitMQ".getBytes(), messageProperties);
//参数4可以对传进的消息进行加工设置
rabbitTemplate.convertAndSend("topic001", "spring.amqp", message, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
System.err.println("------添加额外的设置---------");
message.getMessageProperties().getHeaders().put("desc", "额外修改的信息描述");
message.getMessageProperties().getHeaders().put("attr", "额外新加的属性");
return message;
}
消息监听容器SimpleMessageListenerContainer
- 设置事务特性,事务管理器,事务属性,事务容量(并发),是否开启事务,回滚消息;
- 设置消费者数量,最小最大数量,批量消费;
- 设置消息确认模式,是否重回队列,异常捕获handler函数;
- 设置消费者标签生成策略,是否独占模式,消费者属性等;
- 设置监听器,消息转换器;
- 在运行中修改消费者数量的大小,接收消息模式等;
- 消息监听配置;
@Configuration
public class RabbitmqConfig {
@Bean
public TopicExchange exchange001() {
return new TopicExchange("topic001", true, false);
@Bean
public Queue queue001() {
return new Queue("queue001", true); //队列持久
@Bean
public Binding binding001() {
return BindingBuilder.bind(queue001()).to(exchange001()).with("spring.*");
@Bean
public TopicExchange exchange002() {
return new TopicExchange("topic002", true, false);
@Bean
public Queue queue002() {
return new Queue("queue002", true); //队列持久
@Bean
public Binding binding002() {
return BindingBuilder.bind(queue002()).to(exchange002()).with("rabbit.*");
@Bean
public Queue queue003() {
return new Queue("queue003", true); //队列持久
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
//设置监听队列
container.setQueues(queue001(), queue002(), queue003());
container.setConcurrentConsumers(1); //消费者数量
container.setMaxConcurrentConsumers(5); //最大消费者数量
container.setDefaultRequeueRejected(false); //重回队列
container.setAcknowledgeMode(AcknowledgeMode.AUTO); //签收模式
container.setConsumerTagStrategy(new ConsumerTagStrategy() { //消费端标签策略
@Override
public String createConsumerTag(String queue) {
return queue+"_"+ UUID.randomUUID();
container.setMessageListener(new ChannelAwareMessageListener() {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
//做消息的处理
String messageBody = new String(message.getBody());
System.out.println(messageBody);
return container;
}
消息监听适配器(messageListenerAdapter)
- 适配自定义的MessageDelegate类。我们就可以不采用监听的方式,采用适配的方式-----适配不同的消息处理方法
//自定义messageDelegate
public class MessageDelegate {
//自定义MessageListenerAdapter名称必须为handleMessage
public void handleMessage(byte[] messageBody) { //注意为字节数组
System.out.println("handleMessage默认方法,消息内容 String:" + new String(messageBody));
//rabbitmq配置
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueues(new Queue("test.direct.queue", false)); //设置监听队列
container.setConcurrentConsumers(1);
container.setMaxConcurrentConsumers(5);
container.setDefaultRequeueRejected(false); //重回队列
container.setAcknowledgeMode(AcknowledgeMode.AUTO); //签收模式
container.setConsumerTagStrategy(new ConsumerTagStrategy() { //消费端标签策略
@Override
public String createConsumerTag(String queue) {
return queue+"_"+ UUID.randomUUID();
container.setMessageListener(new ChannelAwareMessageListener() {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
//做消息的处理
String messageBody = new String(message.getBody());
System.out.println(messageBody);
//配置适配器
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("handleMessage"); //将message传递给MessageDelegate中的方法
container.setMessageListener(adapter);
//全局的转换器:
ContentTypeDelegatingMessageConverter convert = new ContentTypeDelegatingMessageConverter();
TextMessageConverter textConvert = new TextMessageConverter();
convert.addDelegate("text", textConvert);
convert.addDelegate("html/text", textConvert);
convert.addDelegate("xml/text", textConvert);
convert.addDelegate("text/plain", textConvert);
Jackson2JsonMessageConverter jsonConvert = new Jackson2JsonMessageConverter();
convert.addDelegate("json", jsonConvert);
convert.addDelegate("application/json", jsonConvert);
ImageMessageConverter imageConverter = new ImageMessageConverter();
convert.addDelegate("image/png", imageConverter);
convert.addDelegate("image", imageConverter);
PDFMessageConverter pdfConverter = new PDFMessageConverter();
convert.addDelegate("application/pdf", pdfConverter);
adapter.setMessageConverter(convert);
return container;
}
消息转化messageConvert
messageListenerAdapter使用不匹配的listenerMethod时报异常,需要使用MessageConverter将消息进行转换
- 自定义方法名
//自定义MessageDelegate类
public class MessageDelegate {
public void consumeMessage(byte[] messageBody) {
System.err.println("字节数组方法, 消息内容:" + new String(messageBody));
}
- TextMessageConverter转换器
//自定义MessageConverter
public class TextMessageConvert implements MessageConverter {
//java对象转化为message对象
@Override
public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
return new Message(object.toString().getBytes(), messageProperties);
//message对象转化为java对象
@Override
public Object fromMessage(Message message) throws MessageConversionException {
String contenType = message.getMessageProperties().getContentType();
if (StringUtils.isNotBlank(contenType) && contenType.contains("text")) {
return new String(message.getBody());
return message.getBody();
//修改RabbitMQConfig类
//配置适配器时添加messageconvert
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("handleMessage"); //MessageDelegate中的方法
adapter.setMessageConverter(new TextMessageConvert());
container.setMessageListener(adapter);
QueueOrTagToMethodName
将队列名称和方法名称进行一一适配,不同的队列使用不同的方法进行监听
//修改RabbitMQConfig类
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setMessageConverter(new TextMessageConverter());
Map<String, String> queueOrTagToMethodName = new HashMap<>();
queueOrTagToMethodName.put("queue001", "method1");
queueOrTagToMethodName.put("queue002", "method2");
adapter.setQueueOrTagToMethodName(queueOrTagToMethodName); //核心读取map中的queue,分配给不同的method
container.setMessageListener(adapter);
//---------------------------------分割线--------------------------------------------//
//修改messageDelegate
public class MessageDelegate {
public void method1(String messageBody) {
System.err.println("method1 收到消息内容:" + new String(messageBody));
public void method2(String messageBody) {
System.err.println("method2 收到消息内容:" + new String(messageBody));
消息转换器MessageConverter
正常情况下,消息是以二进制的数据形式进行转换的,如果需要将二进制数据转化为其他形式,就需要使用到MessageConverter----------实现该接口,重写toMessage和fromMessage.
- Jackson2JsonMessageConverter(Java对象转换)
//修改RabbitMQConfig
// 1.1 支持json格式的转换器
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");
//重点,加入json格式的转换器 json对应Map对象
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
adapter.setMessageConverter(jackson2JsonMessageConverter);
container.setMessageListener(adapter);
//----------------------------分割线---------------------------------------//
//修改messageListernContainer中的messageDelegater
public class MessageDelegate {
//json对应Map对象---该map接收json格式的数据
public void consumeMessage(Map messageBody) {
System.err.println("map方法, 消息内容:" + messageBody);
//------------------------------------分割线:修改生产者----------------------//
public void testSendJsonMessage() throws Exception {
Order order = new Order();
order.setId("001");
order.setName("消息订单");
order.setContent("描述信息");
ObjectMapper mapper = new ObjectMapper();
String json = mapper.writeValueAsString(order);
System.err.println("order 4 json: " + json);
MessageProperties messageProperties = new MessageProperties();
//这里注意一定要修改contentType为 application/json
messageProperties.setContentType("application/json");
Message message = new Message(json.getBytes(), messageProperties);
rabbitTemplate.send("topic001", "spring.order", message);
}
- DefaultJackson2JavaTypeMapper(java对象映射)
- 多对象映射装换
//1.3 DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter 支持java对象多映射转换
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
adapter.setMessageConverter(jackson2JsonMessageConverter);
//-----------------------------分割线----------------------------------------------//
//key表示标签 对应一个类的具体全路径。类和标签绑定之后,标签是order,意思就是转换成order类
Map<String, Class<?>> idClassMapping = new HashMap<String, Class<?>>();
idClassMapping.put("order", com.cp.spring.entity.Order.class);
idClassMapping.put("packaged", com.cp.spring.entity.Packaged.class);
javaTypeMapper.setIdClassMapping(idClassMapping);
//一层套一层
jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
adapter.setMessageConverter(jackson2JsonMessageConverter);
container.setMessageListener(adapter);
public class MessageDelegate {
//json对应Map对象
public void consumeMessage(Order order) {
System.err.println("order对象, 消息内容, id: " + order.getId() +
", name: " + order.getName() +
", content: "+ order.getContent());
public void consumeMessage(Packaged pack) {
System.err.println("package对象, 消息内容, id: " + pack.getId() +
", name: " + pack.getName() +
", content: "+ pack.getDescription());
}
- 全局转化
修改RabbitMQConfig类
//1.4 ext convert
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");
//全局的转换器:所有小的Converter都可以放到这个大的Converter中
ContentTypeDelegatingMessageConverter convert = new ContentTypeDelegatingMessageConverter();
TextMessageConverter textConvert = new TextMessageConverter();
//text走文本转换器
convert.addDelegate("text", textConvert);
convert.addDelegate("html/text", textConvert);
convert.addDelegate("xml/text", textConvert);
convert.addDelegate("text/plain", textConvert);
//json走json转换器
Jackson2JsonMessageConverter jsonConvert = new Jackson2JsonMessageConverter();
convert.addDelegate("json", jsonConvert);
convert.addDelegate("application/json", jsonConvert);
//图片走图片转换器
ImageMessageConverter imageConverter = new ImageMessageConverter();
convert.addDelegate("image/png", imageConverter);
convert.addDelegate("image", imageConverter);
//pdf走pdf转换器
PDFMessageConverter pdfConverter = new PDFMessageConverter();
convert.addDelegate("application/pdf", pdfConverter);
adapter.setMessageConverter(convert);
//--------------------------分割线---------------------imageConvert-----------------------//
public class ImageMessageConverter implements MessageConverter {
@Override
public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
throw new MessageConversionException(" convert error ! ");
@Override
public Object fromMessage(Message message) throws MessageConversionException {
System.err.println("-----------Image MessageConverter----------");
Object _extName = message.getMessageProperties().getHeaders().get("extName");
String extName = _extName == null ? "png" : _extName.toString();
byte[] body = message.getBody();
String fileName = UUID.randomUUID().toString();
String path = "d:/010_test/" + fileName + "." + extName;
File f = new File(path);
try {
Files.copy(new ByteArrayInputStream(body), f.toPath());
} catch (IOException e) {