-
添加依赖
-
添加配置
-
工具类说明
-
订阅消息
-
发送消息
mqtt整合
最近有一个业务,要求连接多个非集群不同的mqtt服务,于是乎写了一个可根据配置动态配置的工具。starter-integration-mqtt整合了spring-integration-mqtt,只需添加配置,并实现消息订阅接口即可。可以实现订阅多个mqtt。
github源码:https://github.com/aLiang-xyl/integration-mqtt
添加依赖
<dependency>
<groupId>cn.xyliang</groupId>
<artifactId>integration-mqtt-starter</artifactId>
<version>0.0.2</version>
</dependency>
添加配置
-
mqtt:
config:
channel1: #通道名称,可自定义,订阅消息时需要该名称
url: [tcp://host1:1883, tcp://host1:1883] #mqtt的url
topics: [topic1, topic2] #监听的主题,和qos一一对应
qos: [1, 0] #监听主题的qos,和主题一一对应
username: admin #用户名
password: public #密码
timeout: 60 #连接超时时间,单位:秒
kep-alive-interval: 60 #心跳时间,单位:秒
async: true #发送消息时是否异步发送
client-id-append-ip: true #是否在clientId后面追加本机ip,因为clientid是唯一值,集群环境下不能使用相同的clientid,追加ip可解决该问题
consumer-client-id: consumer_client_test1 #consumer client id配置
producer-client-id: producer_client_test1 #producer client id配置
consumer-will: #consumer遗嘱消息配置
qos: 1 #遗嘱qos
topic: will_topic #遗嘱主题
payload: '{"id": "consumer_client_test1"}' #遗嘱内容
retained: false #是否发送保留消息
producer-will: #producer遗嘱消息配置
qos: 1 #遗嘱qos
topic: will_topic #遗嘱主题
payload: '{"id": "producer_client_test1"}' #遗嘱内容
retained: false #是否发送保留消息
channel2: #通道名称,第二个配置
url: [tcp://host1:1883, tcp://host1:1883]
topics: [topic1, topic2]
qos: [1, 0]
username: admin
password: public
timeout: 60
kep-alive-interval: 60
async: true
consumer-client-id: consumer_client_test2
producer-client-id: producer_client_test2
consumer-will:
qos: 1
topic: will_topic
payload: '{"id": "consumer_client_test2"}'
retained: false
producer-will:
qos: 1
topic: will_topic
payload: '{"id": "producer_client_test2"}'
retained: false
工具类说明
MqttUtils工具类,用来发送mqtt消息
-
package cn.xyliang.mqtt.utils;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import lombok.extern.log4j.Log4j2;
* 描述: mqtt工具类,可以根据通道名称发送消息
* @author xingyl
* @date 2020-04-01 10:16:35
@Log4j2
public class MqttUtils {
* qos 0
public static final int QOS_0 = 0;
* qos 1
public static final int QOS_1 = 1;
* qos 2
public static final int QOS_2 = 2;
private final static Map<String, MqttPahoMessageHandler> HANDLER_MAP = new HashMap<>(16);
public final static String CHANNEL_NAME_SUFFIX = "MqttPahoMessageHandler";
* 存放handler
* @param channelName
* @param handler
public static void put(String channelName, MqttPahoMessageHandler handler) {
HANDLER_MAP.put(channelName + CHANNEL_NAME_SUFFIX, handler);
* 发送消息
* @param topic 要发送的主题
* @param message 消息内容
* @param qos qos级别
* @param channelName 发送到指定的通道
public static void sendMessage(String topic, String message, int qos, String channelName) {
MqttPahoMessageHandler handler = getHandler(channelName);
Message<String> mqttMessage = MessageBuilder.withPayload(message).setHeader(MqttHeaders.TOPIC, topic)
.setHeader(MqttHeaders.QOS, qos).build();
handler.handleMessage(mqttMessage);
* 发送消息,默认qos级别为1
* @param topic 要发送的主题
* @param message 消息内容
* @param channelName 发送到指定的通道
public static void sendMessage(String topic, String message, String channelName) {
MqttPahoMessageHandler handler = getHandler(channelName);
Message<String> mqttMessage = MessageBuilder.withPayload(message).setHeader(MqttHeaders.TOPIC, topic)
.setHeader(MqttHeaders.QOS, QOS_1).build();
handler.handleMessage(mqttMessage);
* 发送消息
* @param mqttMessage 消息
* @param channelName 发送到指定的通道
public static void sendMessage(Message<String> mqttMessage, String channelName) {
MqttPahoMessageHandler handler = getHandler(channelName);
handler.handleMessage(mqttMessage);
* 如果只有一个通道将使用该通道发送消息
* @param topic
* @param message
* @param qos
public static void sendMessage(String topic, String message, int qos) {
MqttPahoMessageHandler handler = getDefaultHeadler();
Message<String> mqttMessage = MessageBuilder.withPayload(message).setHeader(MqttHeaders.TOPIC, topic)
.setHeader(MqttHeaders.QOS, qos).build();
handler.handleMessage(mqttMessage);
* 如果只有一个通道将使用该通道发送消息,默认qos级别为1
* @param topic
* @param message
public static void sendMessage(String topic, String message) {
MqttPahoMessageHandler handler = getDefaultHeadler();
Message<String> mqttMessage = MessageBuilder.withPayload(message).setHeader(MqttHeaders.TOPIC, topic)
.setHeader(MqttHeaders.QOS, QOS_1).build();
handler.handleMessage(mqttMessage);
* 如果只有一个通道将使用该通道发送消息,默认qos级别为1
* @param mqttMessage 消息信息
public static void sendMessage(Message<String> mqttMessage) {
MqttPahoMessageHandler handler = getDefaultHeadler();
handler.handleMessage(mqttMessage);
* 获取默认的handler
* @return
private static MqttPahoMessageHandler getDefaultHeadler() {
Collection<MqttPahoMessageHandler> values = HANDLER_MAP.values();
Iterator<MqttPahoMessageHandler> iterator = values.iterator();
MqttPahoMessageHandler handler = iterator.next();
if (handler == null) {
log.error("发送消息失败,无可用的headler");
throw new RuntimeException("发送消息失败,无可用的headler");
return handler;
* 根据通道获取handler
* @param channelName
* @return
private static MqttPahoMessageHandler getHandler(String channelName) {
MqttPahoMessageHandler handler = HANDLER_MAP.get(channelName + CHANNEL_NAME_SUFFIX);
if (handler == null) {
log.error("未查询到相应通道{}的handler,存在的通道名称{}", channelName, HANDLER_MAP.keySet());
throw new IllegalArgumentException("未查询到相应通道" + channelName + "的handler");
return handler;
订阅消息
订阅消息需要实现MessageHandler
接口
注意:
@ServiceActivator(inputChannel = "channel1")
中的配置必须和yml配置文件里的channel保持一致,一个channel配置对应一个MessageHandler接口实现。
-
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Component;
import lombok.extern.log4j.Log4j2;
* <p>描述:配置channel1消息处理 </p>
* @author xingyl
* @date 2020年3月27日 下午6:33:35
@Log4j2
@Component
public class MqttMessageHandler implements MessageHandler {
@ServiceActivator(inputChannel = "channel1")
@Override
public void handleMessage(Message<?> message) throws MessagingException {
log.info("收到消息---{}", message);
配置中每一个channel对应一个MessageHandler实现
MqttUtils工具类中封装了多个发送消息的方法
github源码:https://github.com/aLiang-xyl/integration-mqtt
————————————————
版权声明:本文为CSDN博主「阿良~」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/qq_20280007/article/details/105250986
springboot自动化配置mqtt,整合spring-integration-mqtt,连接多个mqttmqtt整合 添加依赖 添加配置 工具类说明 订阅消息 发送消息 mqtt整合 最近有一个业务,要求连接多个非集群不同的mqtt服务,于是乎写了一个可根据配置动态配置的工具。starter-integration-mqtt整合了spring-integration-mqtt,只需添加配置,并实现消息订阅接口即可。可以实现订阅多个mqtt。 github源码.
springboot
+idea+
java
+
mqtt
实现
订阅
者
订阅
消息
针对业务需要和硬件对接,使用
mqtt
来
处理
硬件的数据
java
实现
订阅
者
订阅
消息,以及
处理
硬件的数据
是根据业务场景写的demo,包括数据库什么的就不再上传,大家可以根据实际情况自行定义实体类以及数据库
物联网
项目的
MQTT
订阅
应用示例
一组简单的应用程序,可以从
MQTT
代理
订阅
主题
。 这些应用程序以多种编程
语言
编写,以提供适用于不同平台的示例。
js-
mqtt
-websockets-demo
一个简单的Web应用程序,可以使用websocket从
MQTT
代理
订阅
MQTT
主题
。 提供一个简单的用户界面,以输入主机名/ IP地址,端口号和
MQTT
主题
。
SpringBoot
整合
MQTT
发布和
订阅
消息
处理
上一篇内容讲述了单个
mqtt
客户端
连接
一个指定的
mqtt
服务器,一般来说是够用的,但最近一个项目需求是要接收
多个
mqtt
服务器的数据,如果还按上一篇文章的
处理
方法,就只能讲代码类复制
多个
这样实现,但这样做的后果就是代码不美观不便于维护,于是就整理了这篇文章,
springboot
用同一套代码实现
多个
mqtt
服务器的
连接
和管理:
大致结构,三个类文件即可实现
需要在mysql创建表(表结构如下):
硬件采集的数据传入EMQX平台(采用
MQTT
协议),
java
通过代码
连接
MQTT
服务器,进行采集数据接收、解析、业务
处理
、存储入库、数据展示。
MQTT
是基于发布(Publish)/
订阅
(Subscribe)模式来进行通信及数据交换的。
二、本文只讲解
java
连接
MQTT
服务器进行数据
处理
最近嘛,遇到了客户使用
MQTT
发布消息。而我们则需把这些消息获取并解析进我们自己的系统中,并且对这些数据进行存储,这些就需要我们熟悉
MQTT
的使用,话不多说,搞起。
因我们项目使用的是
SpringBoot
,因此在pom中添加
MQTT
的相关依赖,因仅涉及
订阅
,顾不考虑发布等。
<dependency>
<groupId>org.eclipse.paho</groupId>
import org.eclipse.paho.client.
mqtt
v3.
Mqtt
Client;
import org.eclipse.paho.client.
mqtt
v3.
Mqtt
ConnectOptions;
import org.eclipse.paho.cl