The inbound channel adapter is implemented by the MqttPahoMessageDrivenChannelAdapter.
For convenience, you can configure it by using the namespace.
A minimal configuration might be as follows:
<bean id="clientFactory"
class="org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory">
<property name="connectionOptions">
<bean class="org.eclipse.paho.client.mqttv3.MqttConnectOptions">
<property name="userName" value="${mqtt.username}"/>
<property name="password" value="${mqtt.password}"/>
</bean>
</property>
</bean>
<int-mqtt:message-driven-channel-adapter id="mqttInbound"
client-id="${mqtt.default.client.id}.src"
url="${mqtt.url}"
topics="sometopic"
client-factory="clientFactory"
channel="output"/>
<int-mqtt:message-driven-channel-adapter id="oneTopicAdapter"
client-id="foo" (1)
url="tcp://localhost:1883" (2)
topics="bar,baz" (3)
qos="1,2" (4)
converter="myConverter" (5)
client-factory="clientFactory" (6)
send-timeout="123" (7)
error-channel="errors" (8)
recovery-interval="10000" (9)
manual-acks="false" (10)
channel="out" />
A comma-separated list of QoS values.
It can be a single value that is applied to all topics or a value for each topic (in which case, the lists must be the same length).
An MqttMessageConverter (optional).
By default, the default DefaultPahoMessageConverter produces a message with a String payload with the following headers:
mqtt_topic: The topic from which the message was received
mqtt_duplicate: true if the message is a duplicate
mqtt_qos: The quality of service
You can configure the DefaultPahoMessageConverter to return the raw byte[] in the payload by declaring it as a <bean/> and setting the payloadAsBytes property to true.
The send() timeout.
It applies only if the channel might block (such as a bounded QueueChannel that is currently full).
The error channel.
Downstream exceptions are sent to this channel, if supplied, in an ErrorMessage.
The payload is a MessagingException that contains the failed message and cause.
The recovery interval.
It controls the interval at which the adapter attempts to reconnect after a failure.
It defaults to 10000ms (ten seconds).
The acknowledgment mode; set to true for manual acknowledgment.
Starting with version 4.1, you can omit the URL.
Instead, you can provide the server URIs in the serverURIs property of the DefaultMqttPahoClientFactory.
Doing so enables, for example, connection to a highly available (HA) cluster.
Starting with version 4.2.2, an MqttSubscribedEvent is published when the adapter successfully subscribes to the topics.
MqttConnectionFailedEvent events are published when the connection or subscription fails.
These events can be received by a bean that implements ApplicationListener.
Also, a new property called recoveryInterval controls the interval at which the adapter attempts to reconnect after a failure.
It defaults to 10000ms (ten seconds).
Prior to version 4.2.3, the client always unsubscribed when the adapter was stopped.
This was incorrect because, if the client QOS is greater than 0, we need to keep the subscription active so that messages arriving
while the adapter is stopped are delivered on the next start.
This also requires setting the cleanSession property on the client factory to false.
It defaults to true.
Starting with version 4.2.3, the adapter does not unsubscribe (by default) if the cleanSession property is false.
This behavior can be overridden by setting the consumerCloseAction property on the factory.
It can have values: UNSUBSCRIBE_ALWAYS, UNSUBSCRIBE_NEVER, and UNSUBSCRIBE_CLEAN.
The latter (the default) unsubscribes only if the cleanSession property is true.
To revert to the pre-4.2.3 behavior, use UNSUBSCRIBE_ALWAYS.
Starting with version 5.0, the topic, qos, and retained properties are mapped to .RECEIVED_… headers (MqttHeaders.RECEIVED_TOPIC, MqttHeaders.RECEIVED_QOS, and MqttHeaders.RECEIVED_RETAINED), to avoid inadvertent propagation to an outbound message that (by default) uses the MqttHeaders.TOPIC, MqttHeaders.QOS, and MqttHeaders.RETAINED headers.
To determine the source of an event, use the following; you can check the bean name and/or the connect options (to access the server URIs etc).