I have a legacy system that sends messages to RabbitMQ.
The system uses only one queue:
q.finance.invoice
but it publishes two types of message, and the message type is available in a header.
The first type
Type : invoice.created
"field_1" : "",
"field_2" : "",
The second type
Type : invoice.paid
"field_5" : "",
"field_6" : "",
So now my consumer needs to handle each message selectively, based on its type.
Spring has @RabbitHandler, which can do this... IF the message is published by Spring.
I cannot use the @RabbitHandler annotation, though.
I think this is because @RabbitHandler converts the message based on the __TypeId__ header, which does not exist on messages from the legacy system.
How can I simulate this @RabbitHandler behaviour (dispatching data based on its type)?
So I use @RabbitListener to consume the messages.
But @RabbitListener takes all types of message.
Another reason we use @RabbitListener is that our error handler depends on Message and Channel.
The basic method signature we have looks like this:
@RabbitListener(queues = "q.finance.invoice")
public void listenInvoicePaid(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
    // convert message body JSON string to object
    // process it
}
I'm trying to do a manual reject based on type, which works. But I'm sure it will not scale when I have many listeners or queues:
import java.io.IOException;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;
import com.rabbitmq.client.Channel;
@Service
public class InvoiceListenerOnMethod {
    private static final Logger log = LoggerFactory.getLogger(InvoiceListenerOnMethod.class);

    @RabbitListener(queues = "q.finance.invoice")
    public void listenInvoiceCreated(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag)
            throws IOException {
        // Not our type: reject and requeue so that another listener can pick it up
        if (!StringUtils.equalsIgnoreCase("invoice.created", message.getMessageProperties().getType())) {
            log.warn("[on Method] Rejecting invoice created : {}", message);
            channel.basicReject(tag, true);
            return;
        }
        log.info("[on Method] Listening invoice created : {}", message);
    }

    @RabbitListener(queues = "q.finance.invoice")
    public void listenInvoicePaid(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag)
            throws IOException {
        // Not our type: reject and requeue so that another listener can pick it up
        if (!StringUtils.equalsIgnoreCase("invoice.paid", message.getMessageProperties().getType())) {
            log.warn("[on Method] Rejecting invoice paid : {}", message);
            channel.basicReject(tag, true);
            return;
        }
        log.info("[on Method] Listening invoice paid : {}", message);
    }
}
The point is that when I have 4 messages (paid-paid-created-created), the listeners can run more than 4 times, because we cannot control which listener takes which message. So for listenInvoicePaid() the sequence can look like this:
reject()
reject()
ack()
reject()
ack()
In the same way, multiple rejects before an ack can also happen in listenInvoiceCreated().
So in total I can end up with 10 or so listener invocations before all the messages are properly processed.
Any suggestions to fix the code?
You can add a MessagePostProcessor to the container factory's afterReceivePostProcessors property. In the post processor, you can examine the message (its headers or its JSON body) and set the __TypeId__ header to the appropriate class name.
See this answer for an example.
Possible Implementation
Here is the naive if-else way, thanks Mark. This is your suggestion (1st alternative).
As for the 2nd alternative, I can't do it because the publisher is a legacy system whose code I don't have.
@RabbitListener(queues = "q.finance.invoice")
public void listenInvoiceCreated(@Payload String message, @Header(AmqpHeaders.DELIVERY_TAG) long tag,
        @Header("type") String type) throws IOException {
    // Pick a handler based on the legacy "type" header
    if (StringUtils.equalsIgnoreCase(type, "invoice.paid")) {
        log.info("Delegate to invoice paid handler");
    } else if (StringUtils.equalsIgnoreCase(type, "invoice.created")) {
        log.info("Delegate to invoice created handler");
    } else {
        log.info("Delegate to default handler");
    }
}
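If the number of message types grows, the if-else chain can be swapped for a lookup table. The following is only a minimal sketch in plain Java, not something Spring AMQP provides: `TypeDispatcher`, `register` and `dispatch` are hypothetical names introduced here for illustration, and the handlers are assumed to take the raw JSON payload as a String.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical helper (not part of Spring AMQP): picks a handler by the legacy "type" header.
final class TypeDispatcher {
    private final Map<String, Consumer<String>> handlers = new HashMap<>();
    private final Consumer<String> defaultHandler;

    TypeDispatcher(Consumer<String> defaultHandler) {
        this.defaultHandler = defaultHandler;
    }

    // Registers a handler for one type; returns this so that calls can be chained.
    TypeDispatcher register(String type, Consumer<String> handler) {
        handlers.put(normalize(type), handler);
        return this;
    }

    // Calls the handler registered for the type, or the default handler if there is none.
    void dispatch(String type, String payload) {
        handlers.getOrDefault(normalize(type), defaultHandler).accept(payload);
    }

    // Lower-cases the type so that lookups ignore case; a missing header becomes "".
    private static String normalize(String type) {
        return type == null ? "" : type.toLowerCase(Locale.ROOT);
    }
}
```

Inside the listener method, a single call such as `dispatcher.dispatch(type, message)` would then take the place of the if-else chain, with one `register(...)` call per message type done once at startup.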
2nd implementation alternative
Here is what I implemented, thanks to Gary. I think this is a cleaner approach. Next, I only need to extract the message post processor into a separate class for maintainability, so it won't clutter my @RabbitListener.
Configuration File
import java.util.Optional;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.course.finance.message.invoice.InvoiceCreatedMessage;
import com.course.finance.message.invoice.InvoicePaidMessage;
@Configuration
public class RabbitmqConfig {
    @Bean(name = "rabbitListenerContainerFactory")
    public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(
            SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        // Translate the legacy "type" header into the __TypeId__ header used by the JSON converter
        factory.setAfterReceivePostProcessors(new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                // String.valueOf avoids a NullPointerException when the "type" header is missing
                var type = String.valueOf(message.getMessageProperties().getHeaders().get("type"));
                String typeId = null;
                if (StringUtils.equalsIgnoreCase(type, "invoice.paid")) {
                    typeId = InvoicePaidMessage.class.getName();
                } else if (StringUtils.equalsIgnoreCase(type, "invoice.created")) {
                    typeId = InvoiceCreatedMessage.class.getName();
                }
                // Unknown types keep no __TypeId__ header
                Optional.ofNullable(typeId).ifPresent(t -> message.getMessageProperties().setHeader("__TypeId__", t));
                return message;
            }
        });
        return factory;
    }

    @Bean
    Jackson2JsonMessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    RabbitTemplate rabbitTemplate(Jackson2JsonMessageConverter converter, ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        // Reuse the converter bean instead of creating a second instance
        template.setMessageConverter(converter);
        return template;
    }
}
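As a possible first step towards extracting the post processor, the type lookup itself can live in a small class with no Spring dependency, which also makes it easy to unit test. This is a sketch under assumptions: `LegacyTypeIdResolver` and `resolve` are hypothetical names, and the map of type names to class names is assumed to be supplied by the configuration.

```java
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

// Hypothetical helper: maps the legacy "type" header value to the class name for __TypeId__.
final class LegacyTypeIdResolver {
    private final Map<String, String> typeIds;

    // Keys are lower-cased once here so that lookups ignore case.
    // Two keys that differ only by case would make toUnmodifiableMap throw IllegalStateException.
    LegacyTypeIdResolver(Map<String, String> typeIds) {
        this.typeIds = typeIds.entrySet().stream()
                .collect(Collectors.toUnmodifiableMap(
                        e -> e.getKey().toLowerCase(Locale.ROOT),
                        Map.Entry::getValue));
    }

    // Returns the class name for the header value, or empty if the header is missing or unknown.
    Optional<String> resolve(Object typeHeader) {
        if (typeHeader == null) {
            return Optional.empty();
        }
        return Optional.ofNullable(typeIds.get(typeHeader.toString().toLowerCase(Locale.ROOT)));
    }
}
```

The body of postProcessMessage would then shrink to something like `resolver.resolve(message.getMessageProperties().getHeaders().get("type")).ifPresent(t -> message.getMessageProperties().setHeader("__TypeId__", t));` followed by `return message;`, and adding a new message type becomes one more map entry.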
Listener
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import com.course.finance.message.invoice.InvoiceCreatedMessage;
import com.course.finance.message.invoice.InvoicePaidMessage;
@Service
@RabbitListener(queues = "q.finance.invoice")
public class InvoiceListener {
    private static final Logger log = LoggerFactory.getLogger(InvoiceListener.class);

    @RabbitHandler
    public void listenInvoiceCreated(InvoiceCreatedMessage message) {
        log.info("Listening invoice created : {}", message);
    }

    @RabbitHandler
    public void listenInvoicePaid(InvoicePaidMessage message) {
        log.info("Listening invoice paid : {}", message);
    }

    // Fallback for messages whose type did not match any handler above
    @RabbitHandler(isDefault = true)
    public void listenDefault(Message message) {
        log.info("Default invoice listener : {}", message.getMessageProperties().getHeaders());
    }
}
I haven't worked with the Spring integration for RabbitMQ, but all in all, having a single queue that carries different message types sounds problematic:
Many consumers will potentially receive messages of types they can't handle and will have to reject them, so each message goes back to RabbitMQ, and then again and again... The performance of the whole cluster can degrade because of this.
So I think there are two paths you can follow:
Implement a single listener that can handle both types of message. There is no need to change RabbitMQ, but it can be a challenging refactoring on the Java side.
Fortunately, RabbitMQ is very flexible when it comes to routing messages. Configure an exchange to route messages of type A to queue A and messages of type B to queue B, based on the routing key, a header, or whatever fits; RabbitMQ offers several types of exchange, so you will most likely find a configuration that works for you.
I personally would go with the second path.