The inbound channel adapter is implemented by the MqttPahoMessageDrivenChannelAdapter.
For convenience, you can configure it by using the namespace.
A minimal configuration might be as follows:
<bean id="clientFactory"
        class="org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory">
    <property name="connectionOptions">
        <bean class="org.eclipse.paho.client.mqttv3.MqttConnectOptions">
            <property name="userName" value="${mqtt.username}"/>
            <property name="password" value="${mqtt.password}"/>
        </bean>
    </property>
</bean>

<int-mqtt:message-driven-channel-adapter id="mqttInbound"
    client-id="${mqtt.default.client.id}.src"
    url="${mqtt.url}"
    topics="sometopic"
    client-factory="clientFactory"
    channel="output"/>
The following listing shows the available attributes:

<int-mqtt:message-driven-channel-adapter id="oneTopicAdapter"
    client-id="foo" (1)
    url="tcp://localhost:1883" (2)
    topics="bar,baz" (3)
    qos="1,2" (4)
    converter="myConverter" (5)
    client-factory="clientFactory" (6)
    send-timeout="123" (7)
    error-channel="errors" (8)
    recovery-interval="10000" (9)
    manual-acks="false" (10)
    channel="out" />

(1) The client ID.
(2) The broker URL.
(3) A comma-separated list of topics from which this adapter receives messages.
(4) A comma-separated list of QoS values.
    It can be a single value that is applied to all topics or one value for each topic (in which case, the two lists must be the same length).
(5) An MqttMessageConverter (optional).
    By default, the DefaultPahoMessageConverter produces a message with a String payload and the following headers:
    mqtt_topic: The topic from which the message was received
    mqtt_duplicate: true if the message is a duplicate
    mqtt_qos: The quality of service
    You can configure the DefaultPahoMessageConverter to return the raw byte[] in the payload by declaring it as a <bean/> and setting the payloadAsBytes property to true.
(6) The client factory.
(7) The send() timeout.
    It applies only if the channel might block (such as a bounded QueueChannel that is currently full).
(8) The error channel.
    Downstream exceptions are sent to this channel, if supplied, in an ErrorMessage.
    The payload is a MessagingException that contains the failed message and cause.
(9) The recovery interval.
    It controls the interval at which the adapter attempts to reconnect after a failure.
    It defaults to 10000ms (ten seconds).
(10) The acknowledgment mode; set to true for manual acknowledgment.
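
The converter and error-channel attributes in the listing above refer to beans that must be declared elsewhere.
The following fragment is a minimal sketch of what those declarations might look like.
The names myConverter and errors come from the listing; the errorLogger ID and the choice of a logging adapter are illustrative assumptions, and the int prefix is assumed to be bound to the core Spring Integration namespace.

```xml
<!-- Converter referenced by converter="myConverter"; emits byte[] payloads instead of String -->
<bean id="myConverter"
      class="org.springframework.integration.mqtt.support.DefaultPahoMessageConverter">
    <property name="payloadAsBytes" value="true"/>
</bean>

<!-- Channel referenced by error-channel="errors" -->
<int:channel id="errors"/>

<!-- Assumption: log each ErrorMessage; replace with your own error handling -->
<int:logging-channel-adapter id="errorLogger"
                             channel="errors"
                             level="ERROR"
                             log-full-message="true"/>
```

With payloadAsBytes set to true, downstream components must expect a byte[] payload rather than a String.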
Starting with version 4.1, you can omit the URL.
Instead, you can provide the server URIs in the serverURIs property of the DefaultMqttPahoClientFactory.
Doing so enables, for example, connection to a highly available (HA) cluster.
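
A configuration without the url attribute might be sketched as follows.
This assumes that the serverURIs property accepts an array of URI strings, as described above; the host names, the haInbound ID, and the haClient client ID are placeholders.
Depending on the version in use, the server URIs may instead need to be set on the MqttConnectOptions, so check the API of your version before relying on this form.

```xml
<bean id="clientFactory"
      class="org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory">
    <!-- Placeholder broker addresses; the Paho client tries them in order -->
    <property name="serverURIs">
        <array>
            <value>tcp://broker1.example.com:1883</value>
            <value>tcp://broker2.example.com:1883</value>
        </array>
    </property>
</bean>

<!-- No url attribute: the adapter takes the server URIs from the factory -->
<int-mqtt:message-driven-channel-adapter id="haInbound"
    client-id="haClient"
    topics="sometopic"
    client-factory="clientFactory"
    channel="output"/>
```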
Starting with version 4.2.2, an MqttSubscribedEvent is published when the adapter successfully subscribes to the topics.
MqttConnectionFailedEvent events are published when the connection or subscription fails.
These events can be received by a bean that implements ApplicationListener.
Also, a new property called recoveryInterval controls the interval at which the adapter attempts to reconnect after a failure.
It defaults to 10000ms (ten seconds).
Prior to version 4.2.3, the client always unsubscribed when the adapter was stopped.
This was incorrect because, if the client QoS is greater than 0, the subscription must stay active so that messages arriving while the adapter is stopped are delivered on the next start.
This also requires setting the cleanSession property on the client factory to false.
It defaults to true.
Starting with version 4.2.3, the adapter does not unsubscribe (by default) if the cleanSession property is false.
This behavior can be overridden by setting the consumerCloseAction property on the factory.
It can have the following values: UNSUBSCRIBE_ALWAYS, UNSUBSCRIBE_NEVER, and UNSUBSCRIBE_CLEAN.
The last of these (the default) unsubscribes only if the cleanSession property is true.
To revert to the pre-4.2.3 behavior, use UNSUBSCRIBE_ALWAYS.
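
The following fragment is a minimal sketch of a setup that keeps the subscription alive across adapter restarts.
It assumes that cleanSession is set through the MqttConnectOptions nested in the factory, in the same way as the minimal configuration shown earlier; the bean ID, adapter ID, and client ID are illustrative.

```xml
<bean id="durableClientFactory"
      class="org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory">
    <property name="connectionOptions">
        <bean class="org.eclipse.paho.client.mqttv3.MqttConnectOptions">
            <!-- Keep the session so the broker retains the subscription while the adapter is stopped -->
            <property name="cleanSession" value="false"/>
        </bean>
    </property>
</bean>

<!-- QoS 1 so that messages published while the adapter is stopped are queued by the broker -->
<int-mqtt:message-driven-channel-adapter id="durableInbound"
    client-id="durableClient"
    url="tcp://localhost:1883"
    topics="sometopic"
    qos="1"
    client-factory="durableClientFactory"
    channel="output"/>
```

The client ID must stay the same between restarts, because the broker associates the retained session with it.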
Starting with version 5.0, the topic, qos, and retained properties are mapped to .RECEIVED_… headers (MqttHeaders.RECEIVED_TOPIC, MqttHeaders.RECEIVED_QOS, and MqttHeaders.RECEIVED_RETAINED), to avoid inadvertent propagation to an outbound message that (by default) uses the MqttHeaders.TOPIC, MqttHeaders.QOS, and MqttHeaders.RETAINED headers.
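
If a flow does need to publish to a topic derived from the one a message arrived on, the received header can be copied explicitly.
The following is a sketch only: the channel name toMqttOutbound and the replies/ prefix are hypothetical, the int prefix is assumed to be bound to the core Spring Integration namespace, and mqtt_topic is assumed to be the value of MqttHeaders.TOPIC.

```xml
<!-- Derive the outbound topic from the topic on which the message was received -->
<int:header-enricher input-channel="output" output-channel="toMqttOutbound">
    <!-- Assumption: "mqtt_topic" is the header name behind MqttHeaders.TOPIC -->
    <int:header name="mqtt_topic"
                expression="'replies/' + headers[T(org.springframework.integration.mqtt.support.MqttHeaders).RECEIVED_TOPIC]"/>
</int:header-enricher>
```

Publishing to a different topic than the one subscribed to, as the prefix does here, avoids the adapter receiving its own outbound messages.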
To determine the source of an event, use the following; you can check the bean name and/or the connect options (to access the server URIs etc).