Springboot2(28)集成rabbitmq实现延时消息
rabbitmq实现消息的确认机制和延时消息的发送
消息生产者代码实现的主要配置
@Configuration
@Slf4j
public class PrividerRabbitmqConfig {
@Resource
private RabbitTemplate rabbitTemplate;
/**
* 定制化amqp模版 可根据需要定制多个
*
*
* 此处为模版类定义 Jackson消息转换器
* ConfirmCallback接口用于实现消息发送到RabbitMQ交换器后接收ack回调 即消息发送到exchange ack
* ReturnCallback接口用于实现消息发送到RabbitMQ 交换器,但无相应队列与交换器绑定时的回调 即消息发送不到任何一个队列中 ack
*
* @return the amqp template
*/
@Bean
public AmqpTemplate amqpTemplate() {
//使用jackson 消息转换器
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
rabbitTemplate.setEncoding("UTF-8");
//开启returncallback yml 需要 配置 publisher-returns: true
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
http:// log.info ("消息:{} 发送失败, 应答码:{} 原因:{} 交换机: {} 路由键: {}", replyCode, replyText, exchange, routingKey);
});
//消息确认 yml 需要配置 publisher-returns: true
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
log.debug("消息发送到exchange成功,id: {}", correlationData.getId());
} else {
http:// log.info ("消息发送到exchange失败,原因: {}", cause);
}
});
return rabbitTemplate;
}
/* --------------------------------Direct exchange test--------------------------------------------- */
/**
* 声明Direct交换机 支持持久化.
*
* @return the exchange
*/
@Bean("directExchange")
public Exchange directExchange() {
return ExchangeBuilder.directExchange("DIRECT_EXCHANGE").durable(true).build();
}
/**
* 声明一个队列 支持持久化.
*
* @return the queue
*/
@Bean("directQueue")
public Queue directQueue() {
return QueueBuilder.durable("DIRECT_QUEUE").build();
}
/**
* 通过绑定键 将指定队列绑定到一个指定的交换机 .
*
* @param queue the queue
* @param exchange the exchange
* @return the binding
*/
@Bean
public Binding directBinding(@Qualifier("directQueue") Queue queue, @Qualifier("directExchange") Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("DIRECT_ROUTING_KEY").noargs();
}
/* ----------------------------------------Fanout exchange test------------------------------------------ */
/**
* 声明 fanout 交换机.
*
* @return the exchange
*/
@Bean("fanoutExchange")
public FanoutExchange fanoutExchange() {
return (FanoutExchange) ExchangeBuilder.fanoutExchange("FANOUT_EXCHANGE").durable(true).build();
}
/**
* Fanout queue A.
*
* @return the queue
*/
@Bean("fanoutQueueA")
public Queue fanoutQueueA() {
return QueueBuilder.durable("FANOUT_QUEUE_A").build();
}
/**
* Fanout queue B .
*
* @return the queue
*/
@Bean("fanoutQueueB")
public Queue fanoutQueueB() {
return QueueBuilder.durable("FANOUT_QUEUE_B").build();
}
/**
* 绑定队列A 到Fanout 交换机.
*
* @param queue the queue
* @param fanoutExchange the fanout exchange
* @return the binding
*/
@Bean
public Binding bindingA(@Qualifier("fanoutQueueA") Queue queue, @Qualifier("fanoutExchange") FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queue).to(fanoutExchange);
}
/**
* 绑定队列B 到Fanout 交换机.
*
* @param queue the queue
* @param fanoutExchange the fanout exchange
* @return the binding
*/
@Bean
public Binding bindingB(@Qualifier("fanoutQueueB") Queue queue, @Qualifier("fanoutExchange") FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queue).to(fanoutExchange);
}
/**
* 超时队列
* 消息被拒绝(basic.reject/ basic.nack)并且requeue=false
* 消息TTL过期(参考:RabbitMQ之TTL(Time-To-Live 过期时间))
* 队列达到最大长度
* @return
*/
@Bean("deadLetterQueue")
public Queue deadLetterQueue() {
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", "DIRECT_EXCHANGE");
arguments.put("x-dead-letter-routing-key", "DIRECT_ROUTING_KEY");
Queue queue = new Queue("DEAD_LETTER_QUEUE",true,false,false,arguments);
return queue;
}
@Bean
public Binding deadLetterBinding(@Qualifier("deadLetterQueue") Queue queue, @Qualifier("directExchange") Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("DIRECT_ROUTING_KEY2").noargs();
}
/**
* 优先级队列
* @return
*/
/* @Bean("priorityQueue")*/
public Queue priorityQueue(){
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-max-priority",10); //队列的属性参数 有10个优先级别
Queue queue = new Queue("DEAD_LETTER_QUEUE",true,false,false,arguments);
return queue;
}
/* @Bean
public Binding priorityBinding(@Qualifier("priorityQueue") Queue queue, @Qualifier("directExchange") Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("DIRECT_ROUTING_KEY").noargs();
}*/
}
消息消费者代码实现的主要配置
@Configuration
public class ConsumerRabbitmqConfig {
@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); //开启手动 ack
return factory;
}
}
1 发送确认
application.yml添加配置
spring.rabbitmq.publisher-confirms: true #将channel信道设置成confirm模式
设置消息确认会影响并发性能,导致消息掉失。因为每个connection最多支持2048个channel,当channel达到2048时,会报错org.springframework.amqp.AmqpResourceNotAvailableException: The channelMax limit is reached. Try later。
@Bean
public AmqpTemplate amqpTemplate() {
//使用jackson 消息转换器
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
rabbitTemplate.setEncoding("UTF-8");
//开启returncallback yml 需要 配置 publisher-returns: true
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
http:// log.info ("消息:{} 发送失败, 应答码:{} 原因:{} 交换机: {} 路由键: {}", replyCode, replyText, exchange, routingKey);
});
//消息确认 yml 需要配置 publisher-returns: true
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
log.debug("消息发送到exchange成功,id: {}", correlationData.getId());
} else {
http:// log.info ("消息发送到exchange失败,原因: {}", cause);
}
});
return rabbitTemplate;
}
rabbitTemplate.setConfirmCallback()
消息发送到 Broker 后触发回调,确认消息是否到达 Broker 服务器,也就是只确认是否正确到达 Exchange 中
rabbitTemplate.setReturnCallback()
通过实现 ReturnCallback 接口,启动消息失败返回,比如路由不到队列时触发回调
同一个连接不同channel使用事务和发布确认
2 消费确认
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual
RabbitListenerContainerFactory 中进行开启手动 ack
@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); //开启手动 ack
return factory;
}
确认消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
3 发送消息
CorrelationData correlationData = new CorrelationData( UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("FANOUT_EXCHANGE", "",new UserData(userName),correlationData);
4 超时队列
/**
* 超时队列
* 消息被拒绝(basic.reject/ basic.nack)并且requeue=false
* 消息TTL过期(参考:RabbitMQ之TTL(Time-To-Live 过期时间))
* 队列达到最大长度
* @return
*/
@Bean("deadLetterQueue")
public Queue deadLetterQueue() {
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", "DIRECT_EXCHANGE");
arguments.put("x-dead-letter-routing-key", "DIRECT_ROUTING_KEY");
Queue queue = new Queue("DEAD_LETTER_QUEUE",true,false,false,arguments);
return queue;
}
/**
* 发送超时消息
* @return
*/
@RequestMapping("/deadLetter")
@ResponseBody
public R deadLetterMsg(){
MessagePostProcessor processor = new MessagePostProcessor(){
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration("10000" );//10秒后超时
message.getMessageProperties().setPriority(1);
return message;
}
};
rabbitTemplate.convertAndSend("DIRECT_EXCHANGE","DIRECT_ROUTING_KEY2", "333", processor);
return R.ok();
}
通过超时队列达到消息延时发送的效果。当消息发送到超时队列deadLetterQueue经过一定时间超时后,消息会路由到绑定的exchange(DIRECT_EXCHANGE)。而绑定exchange的队列会得到消息。
测试
请求地址: http:// 127.0.0.1:8080/deadLett er
消息发送
到收到消息