首发于 IT技术

Springboot2(28)集成rabbitmq实现延时消息

rabbitmq实现消息的确认机制和延时消息的发送

消息生产者代码实现的主要配置

@Configuration

@Slf4j

public class PrividerRabbitmqConfig {

@Resource

private RabbitTemplate rabbitTemplate;

/**

* 定制化amqp模版 可根据需要定制多个

*

*

* 此处为模版类定义 Jackson消息转换器

* ConfirmCallback接口用于实现消息发送到RabbitMQ交换器后接收ack回调 即消息发送到exchange ack

* ReturnCallback接口用于实现消息发送到RabbitMQ 交换器,但无相应队列与交换器绑定时的回调 即消息发送不到任何一个队列中 ack

*

* @return the amqp template

*/

@Bean

public AmqpTemplate amqpTemplate() {

//使用jackson 消息转换器

rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());

rabbitTemplate.setEncoding("UTF-8");

//开启returncallback yml 需要 配置 publisher-returns: true

rabbitTemplate.setMandatory(true);

rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {

log.info ("消息:{} 发送失败, 应答码:{} 原因:{} 交换机: {} 路由键: {}", replyCode, replyText, exchange, routingKey);

});

//消息确认 yml 需要配置 publisher-returns: true

rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {

if (ack) {

log.debug("消息发送到exchange成功,id: {}", correlationData.getId());

} else {

log.info ("消息发送到exchange失败,原因: {}", cause);

}

});

return rabbitTemplate;

}

/* --------------------------------Direct exchange test--------------------------------------------- */

/**

* 声明Direct交换机 支持持久化.

*

* @return the exchange

*/

@Bean("directExchange")

public Exchange directExchange() {

return ExchangeBuilder.directExchange("DIRECT_EXCHANGE").durable(true).build();

}

/**

* 声明一个队列 支持持久化.

*

* @return the queue

*/

@Bean("directQueue")

public Queue directQueue() {

return QueueBuilder.durable("DIRECT_QUEUE").build();

}

/**

* 通过绑定键 将指定队列绑定到一个指定的交换机 .

*

* @param queue the queue

* @param exchange the exchange

* @return the binding

*/

@Bean

public Binding directBinding(@Qualifier("directQueue") Queue queue, @Qualifier("directExchange") Exchange exchange) {

return BindingBuilder.bind(queue).to(exchange).with("DIRECT_ROUTING_KEY").noargs();

}

/* ----------------------------------------Fanout exchange test------------------------------------------ */

/**

* 声明 fanout 交换机.

*

* @return the exchange

*/

@Bean("fanoutExchange")

public FanoutExchange fanoutExchange() {

return (FanoutExchange) ExchangeBuilder.fanoutExchange("FANOUT_EXCHANGE").durable(true).build();

}

/**

* Fanout queue A.

*

* @return the queue

*/

@Bean("fanoutQueueA")

public Queue fanoutQueueA() {

return QueueBuilder.durable("FANOUT_QUEUE_A").build();

}

/**

* Fanout queue B .

*

* @return the queue

*/

@Bean("fanoutQueueB")

public Queue fanoutQueueB() {

return QueueBuilder.durable("FANOUT_QUEUE_B").build();

}

/**

* 绑定队列A 到Fanout 交换机.

*

* @param queue the queue

* @param fanoutExchange the fanout exchange

* @return the binding

*/

@Bean

public Binding bindingA(@Qualifier("fanoutQueueA") Queue queue, @Qualifier("fanoutExchange") FanoutExchange fanoutExchange) {

return BindingBuilder.bind(queue).to(fanoutExchange);

}

/**

* 绑定队列B 到Fanout 交换机.

*

* @param queue the queue

* @param fanoutExchange the fanout exchange

* @return the binding

*/

@Bean

public Binding bindingB(@Qualifier("fanoutQueueB") Queue queue, @Qualifier("fanoutExchange") FanoutExchange fanoutExchange) {

return BindingBuilder.bind(queue).to(fanoutExchange);

}

/**

* 超时队列

* 消息被拒绝(basic.reject/ basic.nack)并且requeue=false

* 消息TTL过期(参考:RabbitMQ之TTL(Time-To-Live 过期时间))

* 队列达到最大长度

* @return

*/

@Bean("deadLetterQueue")

public Queue deadLetterQueue() {

Map<String, Object> arguments = new HashMap<>();

arguments.put("x-dead-letter-exchange", "DIRECT_EXCHANGE");

arguments.put("x-dead-letter-routing-key", "DIRECT_ROUTING_KEY");

Queue queue = new Queue("DEAD_LETTER_QUEUE",true,false,false,arguments);

return queue;

}

@Bean

public Binding deadLetterBinding(@Qualifier("deadLetterQueue") Queue queue, @Qualifier("directExchange") Exchange exchange) {

return BindingBuilder.bind(queue).to(exchange).with("DIRECT_ROUTING_KEY2").noargs();

}

/**

* 优先级队列

* @return

*/

/* @Bean("priorityQueue")*/

public Queue priorityQueue(){

Map<String, Object> arguments = new HashMap<>();

arguments.put("x-max-priority",10); //队列的属性参数 有10个优先级别

Queue queue = new Queue("DEAD_LETTER_QUEUE",true,false,false,arguments);

return queue;

}

/* @Bean

public Binding priorityBinding(@Qualifier("priorityQueue") Queue queue, @Qualifier("directExchange") Exchange exchange) {

return BindingBuilder.bind(queue).to(exchange).with("DIRECT_ROUTING_KEY").noargs();

}*/

}


消息消费者代码实现的主要配置

@Configuration

public class ConsumerRabbitmqConfig {

@Bean

public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){

SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();

factory.setConnectionFactory(connectionFactory);

factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); //开启手动 ack

return factory;

}

}


1 发送确认

application.yml添加配置

spring.rabbitmq.publisher-confirms: true #将channel信道设置成confirm模式


设置消息确认会影响并发性能,导致消息掉失。因为每个connection最多支持2048个channel,当channel达到2048时,会报错org.springframework.amqp.AmqpResourceNotAvailableException: The channelMax limit is reached. Try later。

@Bean

public AmqpTemplate amqpTemplate() {

//使用jackson 消息转换器

rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());

rabbitTemplate.setEncoding("UTF-8");

//开启returncallback yml 需要 配置 publisher-returns: true

rabbitTemplate.setMandatory(true);

rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {

log.info ("消息:{} 发送失败, 应答码:{} 原因:{} 交换机: {} 路由键: {}", replyCode, replyText, exchange, routingKey);

});

//消息确认 yml 需要配置 publisher-returns: true

rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {

if (ack) {

log.debug("消息发送到exchange成功,id: {}", correlationData.getId());

} else {

log.info ("消息发送到exchange失败,原因: {}", cause);

}

});

return rabbitTemplate;

}


rabbitTemplate.setConfirmCallback()

消息发送到 Broker 后触发回调,确认消息是否到达 Broker 服务器,也就是只确认是否正确到达 Exchange 中

rabbitTemplate.setReturnCallback()

通过实现 ReturnCallback 接口,启动消息失败返回,比如路由不到队列时触发回调

同一个连接不同channel使用事务和发布确认

2 消费确认

spring:

rabbitmq:

listener:

simple:

acknowledge-mode: manual


RabbitListenerContainerFactory 中进行开启手动 ack

@Bean

public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){

SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();

factory.setConnectionFactory(connectionFactory);

factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); //开启手动 ack

return factory;

}


确认消息

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);


3 发送消息

CorrelationData correlationData = new CorrelationData( UUID.randomUUID().toString());

rabbitTemplate.convertAndSend("FANOUT_EXCHANGE", "",new UserData(userName),correlationData);


4 超时队列

/**

* 超时队列

* 消息被拒绝(basic.reject/ basic.nack)并且requeue=false

* 消息TTL过期(参考:RabbitMQ之TTL(Time-To-Live 过期时间))

* 队列达到最大长度

* @return

*/

@Bean("deadLetterQueue")

public Queue deadLetterQueue() {

Map<String, Object> arguments = new HashMap<>();

arguments.put("x-dead-letter-exchange", "DIRECT_EXCHANGE");

arguments.put("x-dead-letter-routing-key", "DIRECT_ROUTING_KEY");

Queue queue = new Queue("DEAD_LETTER_QUEUE",true,false,false,arguments);

return queue;

}


/**

* 发送超时消息

* @return

*/

@RequestMapping("/deadLetter")

@ResponseBody

public R deadLetterMsg(){

MessagePostProcessor processor = new MessagePostProcessor(){

public Message postProcessMessage(Message message) throws AmqpException {

message.getMessageProperties().setExpiration("10000" );//10秒后超时

message.getMessageProperties().setPriority(1);

return message;

}

};

rabbitTemplate.convertAndSend("DIRECT_EXCHANGE","DIRECT_ROUTING_KEY2", "333", processor);

return R.ok();

}


通过超时队列达到消息延时发送的效果。当消息发送到超时队列deadLetterQueue经过一定时间超时后,消息会路由到绑定的exchange(DIRECT_EXCHANGE)。而绑定exchange的队列会得到消息。

测试

请求地址: 127.0.0.1:8080/deadLett

消息发送




到收到消息


发布于 2020-05-01 22:34

文章被以下专栏收录