I'm using Spring Integration and its MQTT support. Following the Spring Integration documentation, I wrote a simple test case that publishes a message on an MQTT topic. The documentation is located here: http://docs.spring.io/spring-integration/reference/html/mqtt.html#_configuring_with_java_configuration_15

I'm using these versions:

  • Spring 4.3.4
  • Spring Integration 4.3.5

I built this simple configuration class:

    @Configuration
    @IntegrationComponentScan
    public class CommunicationServerApplication {

        // mqttServerUris, mqttUsername, mqttTopic, etc. are fields whose declarations are omitted from the post
        @Bean
        public MqttPahoClientFactory mqttClientFactory() {
            DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
            factory.setServerURIs(mqttServerUris);
            // Only set credentials when both are configured
            if (StringUtils.hasText(mqttUsername) && StringUtils.hasText(mqttPassword)) {
                factory.setUserName(mqttUsername);
                factory.setPassword(mqttPassword);
            }
            factory.setConnectionTimeout(mqttConnectionTimeout);
            factory.setKeepAliveInterval(mqttKeepAliveInterval);
            factory.setPersistence(new MqttDefaultFilePersistence(mqttPersistenceFileDirectory));
            return factory;
        }

        @Bean
        @ServiceActivator(inputChannel = "mqttOutboundChannel", autoStartup = "true")
        public MessageHandler mqttOutbound() {
            // Fall back to a random client id when none is configured
            String clientId = mqttClientId;
            if (!StringUtils.hasText(clientId)) {
                clientId = UUID.randomUUID().toString();
            }
            MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId, mqttClientFactory());
            messageHandler.setAsync(true);
            messageHandler.setDefaultTopic(mqttTopic);
            if (mqttQos >= 0 && mqttQos <= 2) {
                messageHandler.setDefaultQos(mqttQos);
            }
            return messageHandler;
        }

        @Bean
        public MessageChannel mqttOutboundChannel() {
            DirectChannel dc = new DirectChannel();
            return dc;
        }

        @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
        public interface MqttMsgproducer {
            void sendToMqtt(String data);
        }
    }
    

Then I used this simple test case:

    @ContextConfiguration(value = { "classpath:app-ctx.xml" })
    @RunWith(SpringJUnit4ClassRunner.class)
    public class SimpleMqttTestSuite {

        private static final Logger logger = LoggerFactory.getLogger(SimpleMqttTestSuite.class.getName());

        @Autowired
        private MqttMsgproducer sender;

        @Test
        public void startServerTest() {
            try {
                sender.sendToMqtt("Hello");
            } catch (Exception e) {
                // The failure is only logged, not rethrown
                logger.error("Error", e);
            }
        }
    }
    

My app-ctx.xml is:

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
        xmlns:p="http://www.springframework.org/schema/p" xmlns:tx="http://www.springframework.org/schema/tx"
        xmlns:mvc="http://www.springframework.org/schema/mvc" xmlns:oxm="http://www.springframework.org/schema/oxm"
        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd  
        http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd 
        http://www.springframework.org/schema/oxm http://www.springframework.org/schema/oxm/spring-oxm.xsd 
        http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc.xsd">
        <context:component-scan base-package="it.olegna.test.integration" />
        <context:property-placeholder location="classpath:configuration.properties"
            order="0" ignore-resource-not-found="true" ignore-unresolvable="true" />    
    </beans>
    

When I run this test, I get the following error:

    2016-12-20 10:46:33,889 49967 [nioEventLoopGroup-3-1] ERROR - Errore 
    org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'org.springframework.context.support.GenericApplicationContext@2e6a8155.mqttOutboundChannel'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
        at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:81) ~[spring-integration-core-4.3.5.RELEASE.jar:4.3.5.RELEASE]
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423) ~[spring-integration-core-4.3.5.RELEASE.jar:4.3.5.RELEASE]
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373) ~[spring-integration-core-4.3.5.RELEASE.jar:4.3.5.RELEASE]
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.3.4.RELEASE.jar:4.3.4.RELEASE]
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.3.4.RELEASE.jar:4.3.4.RELEASE]
        at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) ~[spring-messaging-4.3.4.RELEASE.jar:4.3.4.RELEASE]
        at org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:143) ~[spring-messaging-4.3.4.RELEASE.jar:4.3.4.RELEASE]
        at org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:135) ~[spring-messaging-4.3.4.RELEASE.jar:4.3.4.RELEASE]
        at org.springframework.integration.gateway.MessagingGatewaySupport.send(MessagingGatewaySupport.java:375) ~[spring-integration-core-4.3.5.RELEASE.jar:4.3.5.RELEASE]
        at org.springframework.integration.gateway.GatewayProxyFactoryBean.invokeGatewayMethod(GatewayProxyFactoryBean.java:477) ~[spring-integration-core-4.3.5.RELEASE.jar:4.3.5.RELEASE]
        at org.springframework.integration.gateway.GatewayProxyFactoryBean.doInvoke(GatewayProxyFactoryBean.java:429) ~[spring-integration-core-4.3.5.RELEASE.jar:4.3.5.RELEASE]
        at org.springframework.integration.gateway.GatewayProxyFactoryBean.invoke(GatewayProxyFactoryBean.java:420) ~[spring-integration-core-4.3.5.RELEASE.jar:4.3.5.RELEASE]
        at org.springframework.integration.gateway.GatewayCompletableFutureProxyFactoryBean.invoke(GatewayCompletableFutureProxyFactoryBean.java:65) ~[spring-integration-core-4.3.5.RELEASE.jar:4.3.5.RELEASE]
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179) ~[spring-aop-4.3.4.RELEASE.jar:4.3.4.RELEASE]
        at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:213) ~[spring-aop-4.3.4.RELEASE.jar:4.3.4.RELEASE]
    

I can't figure out what I'm missing in the configuration. Can anybody give me a tip?

Thank you

Angelo

Not an answer to your question, but probably useful for some who search for the error message in the title: I had a similar error when following the MQTT example from the Spring 5 documentation. I had to replace adapter.setOutputChannel(mqttInputChannel()) with adapter.setOutputChannelName("mqttInputChannel") to make it work. – Henrik Solgaard Apr 18, 2018 at 21:42

@EnableIntegration fix has nothing to do with subscribing within a channel bean definition – Eduardo Mar 12, 2020 at 13:57

It is unusual (rare) to ever subscribe to a channel from user code; normally, the framework will automatically subscribe consumers when they are started (usually automatically). – Gary Russell Mar 13, 2020 at 13:35

@GaryRussell does the EnableIntegration annotation need to go where my channels are defined? I'm still receiving the dispatcher error along with this error sometimes; it seems to me they are related. – Beez Nov 3, 2020 at 21:41

I solved my issue.

The problem was that I had created the channel, but no handler had been subscribed to it.
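To make the failure mode concrete, here is a minimal plain-Java sketch of a point-to-point channel that refuses to dispatch when nothing is subscribed. It is only a model under stated assumptions, not Spring Integration's implementation: `SimpleDirectChannel` and `DirectChannelSketch` are hypothetical names, and the real `DirectChannel` does more (for example, load balancing across subscribers).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java model of a point-to-point channel; NOT Spring Integration's code.
public class DirectChannelSketch {

    // Hypothetical stand-in for DirectChannel: hands each message to one subscriber.
    static class SimpleDirectChannel {
        private final String name;
        private final List<Consumer<String>> subscribers = new ArrayList<>();

        SimpleDirectChannel(String name) {
            this.name = name;
        }

        void subscribe(Consumer<String> handler) {
            subscribers.add(handler);
        }

        // Fails when nobody is subscribed, mirroring the error in the question.
        void send(String payload) {
            if (subscribers.isEmpty()) {
                throw new IllegalStateException(
                        "Dispatcher has no subscribers for channel '" + name + "'");
            }
            // Simplification: always use the first subscriber.
            subscribers.get(0).accept(payload);
        }
    }

    public static void main(String[] args) {
        SimpleDirectChannel channel = new SimpleDirectChannel("mqttOutboundChannel");
        try {
            channel.send("Hello"); // no handler yet, so this throws
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
        channel.subscribe(payload -> System.out.println("published: " + payload));
        channel.send("Hello"); // now reaches the handler
    }
}
```

In this model the first `send` fails and the second succeeds only because a handler was subscribed in between, which is the same gap the stack trace points at.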

In my application class I did the following:

    @Bean
    public MessageChannel mqttOutboundChannel() {
        DirectChannel dc = new DirectChannel();
        // Manually subscribe the message handler to the channel
        dc.subscribe(mqttOutbound());
        return dc;
    }
    

As you can see, I now manually subscribe the mqttOutbound bean (the message handler) to the channel.

With this change everything works.

I hope this helps.

Angelo

UPDATE AFTER Gary Russell's ANSWER

As suggested by Gary Russell, I no longer subscribe to the channel manually.

Instead, I added the @EnableIntegration annotation.
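As far as I understand it, @EnableIntegration registers the infrastructure that processes messaging annotations such as @ServiceActivator; without it, the annotated handler bean exists but is never subscribed to its input channel. The sketch below models that idea in plain Java with reflection. Everything in it is hypothetical (`Activator`, `Config`, `Context`, and the `processAnnotations` flag are illustration-only names), and it is not how Spring Integration is actually implemented.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Plain-Java model of annotation-driven wiring; NOT Spring Integration's code.
public class AnnotationWiringSketch {

    // Hypothetical stand-in for @ServiceActivator.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Activator {
        String inputChannel();
    }

    // Hypothetical configuration class with one annotated handler method.
    static class Config {
        final List<String> published = new ArrayList<>();

        @Activator(inputChannel = "mqttOutboundChannel")
        public void mqttOutbound(String payload) {
            published.add(payload);
        }
    }

    // Hypothetical application context: channel name -> subscribed handlers.
    static class Context {
        private final Map<String, List<Consumer<String>>> channels = new HashMap<>();

        // processAnnotations models the step that the enabling annotation switches on.
        Context(Object config, boolean processAnnotations) {
            if (!processAnnotations) {
                return; // annotated methods are never subscribed
            }
            for (Method m : config.getClass().getDeclaredMethods()) {
                Activator a = m.getAnnotation(Activator.class);
                if (a == null) {
                    continue;
                }
                channels.computeIfAbsent(a.inputChannel(), k -> new ArrayList<>())
                        .add(payload -> invoke(m, config, payload));
            }
        }

        void send(String channel, String payload) {
            List<Consumer<String>> handlers = channels.get(channel);
            if (handlers == null || handlers.isEmpty()) {
                throw new IllegalStateException(
                        "Dispatcher has no subscribers for channel '" + channel + "'");
            }
            handlers.get(0).accept(payload);
        }

        private static void invoke(Method m, Object target, String payload) {
            try {
                m.invoke(target, payload);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException(e);
            }
        }
    }

    public static void main(String[] args) {
        Config config = new Config();
        try {
            new Context(config, false).send("mqttOutboundChannel", "Hello");
        } catch (IllegalStateException e) {
            System.out.println("without processing: " + e.getMessage());
        }
        new Context(config, true).send("mqttOutboundChannel", "Hello");
        System.out.println("with processing: " + config.published);
    }
}
```

The point of the model is that the handler and the channel are identical in both runs; only the annotation-processing step decides whether they get connected.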

My application class is now the following:

    @Configuration
    @IntegrationComponentScan
    @EnableIntegration
    public class CommunicationServerApplication {

        // mqttServerUris, mqttUsername, mqttTopic, etc. are fields whose declarations are omitted from the post
        @Bean
        public MqttPahoClientFactory mqttClientFactory() {
            DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
            factory.setServerURIs(mqttServerUris);
            // Only set credentials when both are configured
            if (StringUtils.hasText(mqttUsername) && StringUtils.hasText(mqttPassword)) {
                factory.setUserName(mqttUsername);
                factory.setPassword(mqttPassword);
            }
            factory.setConnectionTimeout(mqttConnectionTimeout);
            factory.setKeepAliveInterval(mqttKeepAliveInterval);
            factory.setPersistence(new MqttDefaultFilePersistence(mqttPersistenceFileDirectory));
            return factory;
        }

        @Bean
        @ServiceActivator(inputChannel = "mqttOutboundChannel", autoStartup = "true")
        public MessageHandler mqttOutbound() {
            // Fall back to a random client id when none is configured
            String clientId = mqttClientId;
            if (!StringUtils.hasText(clientId)) {
                clientId = UUID.randomUUID().toString();
            }
            MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId, mqttClientFactory());
            messageHandler.setAsync(true);
            messageHandler.setDefaultTopic(mqttTopic);
            if (mqttQos >= 0 && mqttQos <= 2) {
                messageHandler.setDefaultQos(mqttQos);
            }
            return messageHandler;
        }

        // No manual subscribe here: the framework wires mqttOutbound to this channel
        @Bean
        public MessageChannel mqttOutboundChannel() {
            DirectChannel dc = new DirectChannel();
            return dc;
        }

        @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
        public interface MqttMsgproducer {
            void sendToMqtt(String data);
        }
    }
            
