
I'm trying to get a transacted Camel route (JMS -> HTTP4) working, but the message is not moved to ActiveMQ.DLQ when an exception is thrown, and I can't see why.

The example below illustrates what happens when the server hosting the REST service is down and the message cannot be delivered.

I get the expected exception:

2018-01-18 12:30:50:962-[Camel (LRM-Relay) thread #5 - JmsConsumer[myIncomingQueue]] WARN  o.a.c.s.s.TransactionErrorHandler - Transaction rollback (0x30a1c779) redelivered(false) for (MessageId: ID:MGR-MacBook-Pro.local-51837-1516262355358-4:2:1:1:16 on ExchangeId: ID-MGR-MacBook-Pro-local-1516275047663-0-1) caught: java.net.ConnectException: Cannot connect to CORE REST  
2018-01-18 12:30:50:965-[Camel (LRM-Relay) thread #5 - JmsConsumer[myIncomingQueue]] WARN  o.a.c.c.j.EndpointMessageListener - Execution of JMS message listener failed. Caused by: [org.apache.camel.RuntimeCamelException - java.net.ConnectException: Cannot connect to CORE REST] 
org.apache.camel.RuntimeCamelException: java.net.ConnectException: Cannot connect to CORE REST …

But the message is consumed and removed from the queue. My assumption was that using a transacted route in Camel with ActiveMQ would prevent this and move the message to ActiveMQ.DLQ.

I have read chapter 9 of Camel in Action, 1st Ed., and googled, but haven't found a solution to my problem.

I know I can define my own TransactionErrorHandler() and store messages in a queue of my choice, but I was under the impression that this was the default behavior when using transacted…

I'm using a standalone ActiveMQ 5.15.2 with a vanilla installation and configuration.
Camel 2.20.1
Java 8_144 on MacOS 10.13.2

My config:

@Configuration
public class Config {

    /** The Camel context. */
    final CamelContext camelContext;

    /** The metric registry (declaration restored; the constructor assigns it). */
    final MetricRegistry metricRegistry;

    /** The broker URL. */
    @Value("${jms.broker.url}")
    private String brokerURL;

    /**
     * Instantiates a new Config.
     *
     * @param camelContext   the sisyfos context
     * @param metricRegistry the metric registry
     */
    @Autowired
    public Config(final CamelContext camelContext, final MetricRegistry metricRegistry) {
        this.camelContext = camelContext;
        this.metricRegistry = metricRegistry;
    }

    @Bean
    public ActiveMQConnectionFactory activeMQConnectionFactory() {
        final ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
        activeMQConnectionFactory.setBrokerURL(brokerURL);
        return activeMQConnectionFactory;
    }

    /** @return the pooled connection factory */
    @Bean
    @Primary
    public PooledConnectionFactory pooledConnectionFactory() {
        final PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
        pooledConnectionFactory.setMaxConnections(8);
        pooledConnectionFactory.setMaximumActiveSessionPerConnection(500);
        pooledConnectionFactory.setConnectionFactory(activeMQConnectionFactory());
        return pooledConnectionFactory;
    }

    /** @return the JMS configuration */
    @Bean
    public JmsConfiguration jmsConfiguration() {
        final JmsConfiguration jmsConfiguration = new JmsConfiguration();
        jmsConfiguration.setConnectionFactory(pooledConnectionFactory());
        jmsConfiguration.setTransacted(true);
        jmsConfiguration.setTransactionManager(transactionManager());
        jmsConfiguration.setConcurrentConsumers(10);
        return jmsConfiguration;
    }

    /** @return the JMS transaction manager */
    @Bean
    public JmsTransactionManager transactionManager() {
        final JmsTransactionManager transactionManager = new JmsTransactionManager();
        transactionManager.setConnectionFactory(pooledConnectionFactory());
        return transactionManager;
    }

    /** @return the ActiveMQ component */
    @Bean
    public ActiveMQComponent activeMQComponent(JmsConfiguration jmsConfiguration,
                                               PooledConnectionFactory pooledConnectionFactory,
                                               JmsTransactionManager transactionManager) {
        final ActiveMQComponent activeMQComponent = new ActiveMQComponent();
        activeMQComponent.setConfiguration(jmsConfiguration);
        activeMQComponent.setTransacted(true);
        activeMQComponent.setUsePooledConnection(true);
        activeMQComponent.setConnectionFactory(pooledConnectionFactory);
        activeMQComponent.setTransactionManager(transactionManager);
        return activeMQComponent;
    }
}

My Route:

@Component
public class SendToCore extends SpringRouteBuilder {

    @Override
    public void configure() throws Exception {
        Logger.getLogger(SendToCore.class).info("Sending to CORE");

        // No retries if the first attempt fails due to a connection error
        interceptSendToEndpoint("http4:*")
                .choice()
                .when(header("JMSRedelivered").isEqualTo("false"))
                .throwException(new ConnectException("Cannot connect to CORE REST"))
                .end();

        from("activemq:queue:myIncomingQueue")
                .transacted()
                .setHeader(Exchange.CONTENT_TYPE, constant("application/xml"))
                .to("http4:localhost/myRESTservice")
                .log("${header.CamelHttpResponseCode}")
                .end();
    }
}

You might find redundant declarations in some of the beans; that is me trying to resolve the issue…

Adding a link to a Github repo of mine with a small test project illustrating this:
https://github.com/hakuseki/transacted

This is probably a Spring Boot auto-configuration problem.

If messages get lost instead of going to the DLQ, Camel's ActiveMQ component is auto-committing them instead of waiting until the work is done.
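To make the difference concrete, here is a minimal in-memory sketch of the two consumption modes. It is a toy model, not the ActiveMQ, JMS, or Camel API: `ToyBroker`, `consumeAutoAck`, and `consumeTransacted` are hypothetical names, and the redelivery limit of 6 is an assumption that mirrors what I understand to be ActiveMQ's default `maximumRedeliveries`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

/** Toy in-memory model of message consumption; NOT the ActiveMQ or JMS API. */
public class DeliveryModel {

    /** Hypothetical stand-in for a broker with one queue and one dead-letter queue. */
    public static final class ToyBroker {
        public final Deque<String> queue = new ArrayDeque<>();
        public final List<String> deadLetters = new ArrayList<>();
        public final int maxRedeliveries;

        public ToyBroker(int maxRedeliveries) {
            this.maxRedeliveries = maxRedeliveries;
        }

        /** Auto-acknowledge: the message is gone as soon as it is handed to the listener. */
        public void consumeAutoAck(Consumer<String> listener) {
            String msg = queue.poll();
            if (msg == null) {
                return;
            }
            try {
                listener.accept(msg);
            } catch (RuntimeException e) {
                // Already acknowledged: nothing is left to redeliver or dead-letter.
            }
        }

        /** Transacted: a failure rolls back and the broker redelivers up to the limit. */
        public void consumeTransacted(Consumer<String> listener) {
            String msg = queue.poll();
            if (msg == null) {
                return;
            }
            // One initial delivery plus maxRedeliveries redeliveries.
            for (int attempt = 0; attempt <= maxRedeliveries; attempt++) {
                try {
                    listener.accept(msg);
                    return; // commit: the message is removed for good
                } catch (RuntimeException e) {
                    // rollback: the broker keeps the message and tries again
                }
            }
            deadLetters.add(msg); // redeliveries exhausted
        }
    }

    public static void main(String[] args) {
        // Listener that always fails, like the route when the REST service is down.
        Consumer<String> failing = m -> {
            throw new IllegalStateException("Cannot connect to CORE REST");
        };

        ToyBroker autoAck = new ToyBroker(6); // 6 is an assumed limit
        autoAck.queue.add("test");
        autoAck.consumeAutoAck(failing);
        System.out.println("auto-ack: queue=" + autoAck.queue + " dlq=" + autoAck.deadLetters);

        ToyBroker transacted = new ToyBroker(6);
        transacted.queue.add("test");
        transacted.consumeTransacted(failing);
        System.out.println("transacted: queue=" + transacted.queue + " dlq=" + transacted.deadLetters);
    }
}
```

In the auto-acknowledge case the message ends up in neither the queue nor the dead-letter list, which matches the symptom in the question. In the transacted case it lands in the dead-letter list once the redeliveries are used up.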

Update: Steps to make your example work with Java Config

Notice: my config does not have a transaction manager because it is not needed in your case. Instead, just set transacted to true and lazyCreateTransactionManager to false on the ActiveMQComponent. That gives you a "local" transaction with your broker, which is all you need.

  • I removed .transacted() from your route (it needs a transaction manager, which is not required for a "JMS local-transacted" route)
  • I commented out your error handler in the route class (it needs a transaction manager; you can use the default error handler)
  • Disable auto-configuration for JMS and ActiveMQ in MainApplication: @SpringBootApplication(exclude = { JmsAutoConfiguration.class, ActiveMQAutoConfiguration.class})
  • Replace your Java config with the following one (adapted from this question: "ConnectionFactory get destroyed before camel")
  • Java configuration:

    @Value("${jms.broker.url}")
    String brokerURL;

    @Bean
    public ActiveMQConnectionFactory connectionFactory() {
        final ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
        activeMQConnectionFactory.setBrokerURL(brokerURL);
        return activeMQConnectionFactory;
    }

    @Bean
    @Primary
    public PooledConnectionFactory pooledConnectionFactory(ConnectionFactory cf) {
        final PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
        pooledConnectionFactory.setMaxConnections(1);
        pooledConnectionFactory.setConnectionFactory(cf);
        return pooledConnectionFactory;
    }

    @Bean(name = "activemq")
    @ConditionalOnClass(ActiveMQComponent.class)
    public ActiveMQComponent activeMQComponent(ConnectionFactory connectionFactory) {
        ActiveMQComponent activeMQComponent = new ActiveMQComponent();
        activeMQComponent.setConnectionFactory(connectionFactory);
        // Local JMS transaction with the broker; no transaction manager involved
        activeMQComponent.setTransacted(true);
        activeMQComponent.setLazyCreateTransactionManager(false);
        return activeMQComponent;
    }

Finally, just to "run" the route, I added a small Camel route test:

    @RunWith(CamelSpringBootRunner.class)
    @SpringBootTest(classes = MainApplication.class)
    public class SampleCamelApplicationTest {

        @Produce(uri = "activemq:queue:myIncomingQueue")
        protected ProducerTemplate template;

        @Test
        public void shouldProduceMessages() throws Exception {
            template.sendBody("test");
            Thread.sleep(20000); // wait for ActiveMQ redeliveries
        }
    }

If I run this test, the message goes to ActiveMQ.DLQ.

Hope this helps

Thx for your reply. I have tested it and it works, BUT I still cannot get it to work in Java DSL. Using the same config as your XML but in Java, I get no DLQ and a missing message. The main difference, as I see it, is that Camel's DefaultErrorHandler behaves differently with XML-based config than with the Java DSL… – Mikael Grevsten Jan 22 '18 at 11:42

Just updated my answer. I found another question and was able to adapt a working Java config from it. – burki Jan 22 '18 at 12:46

Just noticed that if you want Spring Boot to handle the lifecycle of the pool and configuration beans, you should not call their methods directly, but let them be provided as parameters in the method signature.

E.g. this

    public ActiveMQComponent activeMQComponent() {

should be

    public ActiveMQComponent activeMQComponent(JmsConfiguration config, ConnectionFactory cf, ...) {

Then Spring Boot will provide these beans to you.

As for why your transaction does not work, you can look at some of the transaction examples from the Camel in Action 2nd edition book: https://github.com/camelinaction/camelinaction2/tree/master/chapter12

Changed the ActiveMQComponent according to your recommendation but still not able to rescue the message to a DLQ. Read all relevant examples but still not able to see what's wrong with my code… Any more ideas? – Mikael Grevsten Jan 18 '18 at 19:30

Have you tried setting cacheLevelName=CACHE_CONSUMER? Also check your AMQ broker configuration to see how its DLQ settings have been configured. – Claus Ibsen Jan 19 '18 at 7:55

Thanks for the tip. Set the CacheLevel to CACHE_CONSUMER (3) with the same result. AMQ broker is vanilla, no changes to activemq.xml, on localhost – Mikael Grevsten Jan 19 '18 at 8:07

I wonder if some of the resources you inject via @Bean are not started, e.g. they have lifecycle methods for start/stop that Spring should invoke at some point. However, it may be that it has defaults for start/stop method naming. But maybe you can look into that – Claus Ibsen Jan 19 '18 at 8:15

OK, I added PreDestroy and PostConstruct on my involved beans with the same result… All looks well, correct Exception is thrown but the message is lost. – Mikael Grevsten Jan 19 '18 at 8:23
