I have a legacy system that sends messages to RabbitMQ. The system uses only one queue, q.finance.invoice, but it publishes two types of message, and the message type is available in a header.

The first type

    Type : invoice.created
    {
      "field_1" : "",
      "field_2" : ""
    }

The second type

    Type : invoice.paid
    {
      "field_5" : "",
      "field_6" : ""
    }

So now my consumer needs to handle messages selectively based on their type. Spring has @RabbitHandler, which can do this... IF the message is published by Spring. I cannot use the @RabbitHandler annotation, though. I think that is because @RabbitHandler converts the message based on the __TypeId__ header, which does not exist on messages from the legacy system.

How can I simulate this @RabbitHandler behaviour (handling data based on its type)?

So I use @RabbitListener to consume messages. But @RabbitListener takes all types of message. Another reason we use @RabbitListener is that our error handler depends on Message and Channel. The basic method signature we have looks like this:

    @RabbitListener(queues = "q.finance.invoice")
    public void listenInvoicePaid(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
        // convert message body JSON string to object
        // process it
    }

I'm trying to do a manual reject based on type, which works. But I'm sure it will not scale when I have many listeners or queues:

    import java.io.IOException;

    import org.apache.commons.lang3.StringUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.support.AmqpHeaders;
    import org.springframework.messaging.handler.annotation.Header;
    import org.springframework.stereotype.Service;

    import com.rabbitmq.client.Channel;

    @Service
    public class InvoiceListenerOnMethod {

        private static final Logger log = LoggerFactory.getLogger(InvoiceListenerOnMethod.class);

        @RabbitListener(queues = "q.finance.invoice")
        public void listenInvoiceCreated(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag)
                throws IOException {
            if (!StringUtils.equalsIgnoreCase("invoice.created", message.getMessageProperties().getType())) {
                log.warn("[on Method] Rejecting invoice created : {}", message);
                // Not our type: reject and requeue so the other listener can take it
                channel.basicReject(tag, true);
                return;
            }

            log.info("[on Method] Listening invoice created : {}", message);
        }

        @RabbitListener(queues = "q.finance.invoice")
        public void listenInvoicePaid(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag)
                throws IOException {
            if (!StringUtils.equalsIgnoreCase("invoice.paid", message.getMessageProperties().getType())) {
                log.warn("[on Method] Rejecting invoice paid : {}", message);
                // Not our type: reject and requeue so the other listener can take it
                channel.basicReject(tag, true);
                return;
            }

            log.info("[on Method] Listening invoice paid : {}", message);
        }
    }

See, the point is that when I have 4 messages (paid-paid-created-created), the listeners can run more than 4 times, because we cannot control which listener takes which message. So it can go like this for listenInvoicePaid():

  • reject()
  • reject()
  • ack()
  • reject()
  • ack()

In the same way, multiple reject() calls before an ack() can also happen in listenInvoiceCreated(). So in total I can have something like 10 listener calls before all the messages are properly processed.
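
To see why the call count exceeds the message count, here is a toy simulation in plain Java. It is only a sketch under simplifying assumptions that are not from the post: the broker alternates strictly between the two listeners, a rejected message returns to the head of the queue, and every message type has a matching listener. RequeueSimulation and countDeliveries are hypothetical names, and no RabbitMQ API is involved.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

// Toy model (hypothetical, not RabbitMQ code): two listeners share one queue,
// deliveries alternate between them, and a listener requeues any message
// whose type it does not handle.
class RequeueSimulation {

    /**
     * Returns the total number of deliveries needed to drain the queue.
     * Assumes every message type has a listener; otherwise the loop never ends.
     */
    static int countDeliveries(List<String> messageTypes, List<String> listenerTypes) {
        Deque<String> queue = new ArrayDeque<>(messageTypes);
        int deliveries = 0;
        int next = 0; // index of the listener that receives the next delivery

        while (!queue.isEmpty()) {
            String type = queue.pollFirst();
            String listenerType = listenerTypes.get(next);
            next = (next + 1) % listenerTypes.size();
            deliveries++;

            if (!listenerType.equals(type)) {
                // Models basicReject(tag, true): the message goes back to the head of the queue
                queue.addFirst(type);
            }
            // Otherwise the listener handles the message and it leaves the queue
        }
        return deliveries;
    }

    public static void main(String[] args) {
        List<String> messages = List.of("invoice.paid", "invoice.paid", "invoice.created", "invoice.created");
        List<String> listeners = List.of("invoice.created", "invoice.paid");
        // prints "messages = 4, deliveries = 7"
        System.out.println("messages = " + messages.size()
                + ", deliveries = " + countDeliveries(messages, listeners));
    }
}
```

In this model the four messages need 7 deliveries instead of 4. A real broker with prefetch and concurrent consumers can behave differently, so the actual count may be higher.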

Any suggestion to fix the code?

You can add a MessagePostProcessor to the container factory's afterReceivePostProcessors property. In the post processor, you can examine the JSON body and set the __TypeId__ header to the appropriate class name.

See this answer for an example.

Hi Gary, in JMS we have operations like jmsQueueTemplate.receiveSelected, through which we can fetch only specific messages from the queue. Do we have something similar in the RabbitMQ/AMQP library? I was looking into the APIs of RabbitTemplate but was not able to find any equivalent method. – Abdul Mohsin Jul 29, 2022 at 14:39

Possible Implementation

Here is the naive if-else way, thanks Mark. This is your suggestion (1st alternative). As for the 2nd alternative, I can't do it, because the publisher is a legacy system whose code I don't have.

    @RabbitListener(queues = "q.finance.invoice")
    public void listenInvoiceCreated(@Payload String message, @Header(AmqpHeaders.DELIVERY_TAG) long tag,
            @Header("type") String type) throws IOException {
        // Pick the handler from the "type" header set by the legacy publisher
        if (StringUtils.equalsIgnoreCase(type, "invoice.paid")) {
            log.info("Delegate to invoice paid handler");
        } else if (StringUtils.equalsIgnoreCase(type, "invoice.created")) {
            log.info("Delegate to invoice created handler");
        } else {
            log.info("Delegate to default handler");
        }
    }
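
If the number of types grows, the if-else chain could be replaced with a lookup table of handlers. The following is a minimal, Spring-free sketch of that idea. InvoiceDispatcher, register and dispatch are hypothetical names; the listener method would call dispatch(type, message) with the header value and payload it already receives.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical dispatcher: maps the value of the "type" header to a handler.
class InvoiceDispatcher {

    private final Map<String, Consumer<String>> handlers = new HashMap<>();
    private final Consumer<String> defaultHandler;

    InvoiceDispatcher(Consumer<String> defaultHandler) {
        this.defaultHandler = defaultHandler;
    }

    /** Registers a handler; keys are lower-cased so lookups ignore case. */
    void register(String type, Consumer<String> handler) {
        handlers.put(type.toLowerCase(Locale.ROOT), handler);
    }

    /** Passes the body to the handler for the type, or to the default handler. */
    void dispatch(String type, String body) {
        String key = (type == null) ? "" : type.toLowerCase(Locale.ROOT);
        handlers.getOrDefault(key, defaultHandler).accept(body);
    }

    public static void main(String[] args) {
        InvoiceDispatcher dispatcher = new InvoiceDispatcher(
                body -> System.out.println("Delegate to default handler"));
        dispatcher.register("invoice.paid",
                body -> System.out.println("Delegate to invoice paid handler"));
        dispatcher.register("invoice.created",
                body -> System.out.println("Delegate to invoice created handler"));

        dispatcher.dispatch("invoice.paid", "{}");   // paid handler
        dispatcher.dispatch("something.else", "{}"); // default handler
    }
}
```

Adding a new message type then means one more register call instead of another else-if branch.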
    

2nd implementation alternative

Here is what I implemented, thanks to Gary. I think this is a cleaner approach. Next I only need to extract the message post processor into another class for maintainability, so I won't clutter my @RabbitListener.

Configuration File

    import java.util.Optional;

    import org.apache.commons.lang3.StringUtils;
    import org.springframework.amqp.AmqpException;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessagePostProcessor;
    import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    import com.course.finance.message.invoice.InvoiceCreatedMessage;
    import com.course.finance.message.invoice.InvoicePaidMessage;

    @Configuration
    public class RabbitmqConfig {

        @Bean(name = "rabbitListenerContainerFactory")
        public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(
                SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            configurer.configure(factory, connectionFactory);

            // Translate the legacy "type" header into the __TypeId__ header used for conversion
            factory.setAfterReceivePostProcessors(new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    // The header may be missing, so do not call toString() on a null value
                    var type = Optional.ofNullable(message.getMessageProperties().getHeaders().get("type"))
                            .map(Object::toString).orElse(null);
                    String typeId = null;

                    if (StringUtils.equalsIgnoreCase(type, "invoice.paid")) {
                        typeId = InvoicePaidMessage.class.getName();
                    } else if (StringUtils.equalsIgnoreCase(type, "invoice.created")) {
                        typeId = InvoiceCreatedMessage.class.getName();
                    }

                    // Only set __TypeId__ when the type is recognized
                    Optional.ofNullable(typeId).ifPresent(t -> message.getMessageProperties().setHeader("__TypeId__", t));
                    return message;
                }
            });

            return factory;
        }

        @Bean
        Jackson2JsonMessageConverter jsonMessageConverter() {
            return new Jackson2JsonMessageConverter();
        }

        @Bean
        RabbitTemplate rabbitTemplate(Jackson2JsonMessageConverter converter, ConnectionFactory connectionFactory) {
            RabbitTemplate template = new RabbitTemplate(connectionFactory);
            // Reuse the injected converter bean instead of creating a second instance
            template.setMessageConverter(converter);
            return template;
        }
    }
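
The type-to-class mapping inside the post processor could be pulled out into a small helper, which keeps the configuration class short when more message types are added. This is a minimal sketch in plain Java, assuming a hypothetical TypeIdResolver class (it is not part of Spring AMQP). Class names are passed as strings here only so that the example compiles on its own.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper: resolves the legacy "type" header to a value for __TypeId__.
class TypeIdResolver {

    private final Map<String, String> typeIds = new HashMap<>();

    /** Registers a mapping; keys are lower-cased so lookups ignore case. */
    TypeIdResolver register(String type, String className) {
        typeIds.put(type.toLowerCase(Locale.ROOT), className);
        return this;
    }

    /** Accepts the raw header value, which may be null or not a String. */
    Optional<String> resolve(Object typeHeader) {
        if (typeHeader == null) {
            return Optional.empty();
        }
        String key = typeHeader.toString().toLowerCase(Locale.ROOT);
        return Optional.ofNullable(typeIds.get(key));
    }

    public static void main(String[] args) {
        TypeIdResolver resolver = new TypeIdResolver()
                .register("invoice.paid", "com.course.finance.message.invoice.InvoicePaidMessage")
                .register("invoice.created", "com.course.finance.message.invoice.InvoiceCreatedMessage");

        System.out.println(resolver.resolve("invoice.paid"));    // mapped class name
        System.out.println(resolver.resolve("invoice.unknown")); // Optional.empty
    }
}
```

Inside postProcessMessage, the result would then be applied with resolve(...).ifPresent(t -> message.getMessageProperties().setHeader("__TypeId__", t)), as in the configuration above.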
    

Listener

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Service;

    import com.course.finance.message.invoice.InvoiceCreatedMessage;
    import com.course.finance.message.invoice.InvoicePaidMessage;

    @Service
    @RabbitListener(queues = "q.finance.invoice")
    public class InvoiceListener {

        private static final Logger log = LoggerFactory.getLogger(InvoiceListener.class);

        // Chosen when the converted payload is an InvoiceCreatedMessage
        @RabbitHandler
        public void listenInvoiceCreated(InvoiceCreatedMessage message) {
            log.info("Listening invoice created : {}", message);
        }

        // Chosen when the converted payload is an InvoicePaidMessage
        @RabbitHandler
        public void listenInvoicePaid(InvoicePaidMessage message) {
            log.info("Listening invoice paid : {}", message);
        }

        // Fallback when no other handler matches
        @RabbitHandler(isDefault = true)
        public void listenDefault(Message message) {
            log.info("Default invoice listener : {}", message.getMessageProperties().getHeaders());
        }
    }
    

I haven't worked with the Spring integration for RabbitMQ, but all in all, the idea of having a single queue that handles different message types sounds problematic:

Many consumers will potentially get messages of types that they can't handle and will have to reject them, so the message goes back to RabbitMQ, and then again and again... The performance of the whole cluster can degrade because of this.

So I think there are two paths you can follow:

  • Implement a single listener that can handle both types of messages. There is no need to change RabbitMQ, but it can be a challenging refactoring on the Java side.

  • Fortunately, RabbitMQ is very flexible when it comes to routing messages. Configure an exchange to route messages of type A to queue A and messages of type B to queue B, based on a routing key, a header, or whatever. There are different types of exchanges in RabbitMQ, and you will surely find a configuration that works for you.

I personally would go with the second path.

Well, this is a legacy app for which I don't have the source code, so exchange & routing is not possible. But I think the first idea should work; it's just some kind of naive code using if-else. – Timothy Jun 20, 2019 at 10:10

As for the second idea, basically you don't need the source code of the legacy app; you can do the routing inside the RabbitMQ server itself (using its UI or command line)... – Mark Bramnik Jun 20, 2019 at 10:16
