1.了解MQTT
MQTT官网地址:
MQTT - 轻量级的物联网消息传输协议 | EMQ
MQTT 是轻量级的物联网消息传输协议。EMQ 提供了通俗易懂的技术文章及简单易用的客户端工具,帮助开发者学习 MQTT 协议并快速入门 MQTT 客户端编程。
https://www.emqx.com/zh/mqtt
EMQ X 安装包下载官网(本文下载的是windows版本)如下,包括对应各种系统的安装方式:
免费试用 EMQ 产品 | EMQ
免费试用 EMQ X Cloud,或免费下载 EMQ X(企业版/开源版)、Neuron、NanoMQ,让 EMQ 产品带您快速开启万物互联的世界。
https://www.emqx.com/zh/try?product=broker
2.开发环境
idea版本:IntelliJ IDEA 2021.2.3
开发工具
|
IntelliJ IDEA
|
版本号
|
2021.2.3
|
mqtt服务端
|
EMQ X
|
版本号
|
emqx-windows-4.3.10
|
spring-boot 版本
|
2.5.7
|
|
|
JDK版本
|
1.8
|
|
|
系统
|
win10
|
|
|
3.工作流程(个人理解简版)
所有的客户端都可以是订阅端,或者是发布端。任何一个客户端发布一条某个主题的消息,都会通过服务端转发给每一个订阅了该主题的客户端。
4.MQTT消息服务质量
MQTT规定了3种消息等级(我觉的这个是mqtt最核心的东西),级别如下:
QoS 0:消息最多传递一次,不需要客户端给与回复,如果当时客户端不可用,则会丢失该消息。
QoS 1:消息传递至少 1 次,发布者会发布消息,并等待接收者的 PUBACK 报文的应答,在规定的时间内要收到接收者的应答,发布者若没收到应答,会将消息的 DUP 置为 1 并重发消息。所以Qos 1消息级别取决于接受者在规定时间内给与发布者反馈,若没有反馈,则会再次接受到消息。
QoS 2:消息仅传送一次,发布者发布 QoS 为 2 的消息之后,会将发布的消息储存起来并等待接收者回复 PUBREC 的消息,发送者收到 PUBREC 消息后,它就可以安全丢弃掉之前的发布消息,因为它已经知道接收者成功收到了消息。发布者会保存 PUBREC 消息并应答一个 PUBREL,等待接收者回复 PUBCOMP 消息,当发送者收到 PUBCOMP 消息之后会清空之前所保存的状态。QoS 2 消息的核心是接收者给发布者反馈两次接收结果,相当于一次接收,一次确认接收。
总结:
QoS 0 消息只发一次,不在乎是否被别的客户端收到,只要发送了就算结束。QoS 1 消息需要消息接收者在规定时间内给予反馈,结束的标志是在发送后规定时间内收到反馈,否则就会一直发送。QoS 2 消息需要发送者和接收者双方互相进行消息确认,只要有一方没有确定就不会结束。
5.QoS 在发布与订阅中的区别
-
当客户端 A 的发布 QoS 大于 客户端 B 的订阅 QoS 时,服务端向客户端 B 转发消息时使用的 QoS 为客户端 B 的订阅 QoS。
-
当客户端 A 的发布 QoS 小于客户端 B 的订阅 QoS 时,服务端向客户端 B 转发消息时使用的 QoS 为客户端 A 的发布 QoS。
-
总结:服务端给订阅端发送消息时,发布端的QoS 和 订阅端的QoS 谁小,就用谁的QoS级别。
6.代码实现
网上看了好多的spring boot集成mqtt实现发布和订阅的帖子,发现很多都是用的工厂实现,解读起来是在费劲,最后找到另一个帖子后终于是容易理解的内容,经过跟自身所处环境和开发习惯写了本贴。
灵感来自于:
springboot集成mqtt发布订阅_sipengfei_的博客-CSDN博客
源码链接:https://pan.baidu.com/s/1-I2cANT1J54Rf__Wcm5t8Q
提取码:p7q2
6.1准备一个服务端
EMQ X
提供了一个公共 Broker 地址用于测试:broker.emqx.io:1883。本文使用的服务端是EMQ X,是windows版本,安装很简单,只需要把包解压到目标文件夹,然后在bin目录下用cmd命令执行emqx start,如下图。
windows版本
最后在浏览器地址栏输入:
http://localhost:18083/
,默认用户名称和密码是admin/public,登录后可以在设置中可以选择语言模式,默认是English,还可以设置为中文和日语,工具界面如下。
6.2 新建spring-boot项目springboot_publish作为消息发布者
在pom.xml文件中添加mqtt的jar支持
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-core</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
在application.properties添加mqtt的相关配置
server.port=8081
#MQTT-服务端地址
publish.mqtt.host=tcp://localhost:1883
#MQTT-服务端用户名
publish.mqtt.username=admin
#MQTT-服务端密码
publish.mqtt.password=public
#MQTT-是否清理session
publish.mqtt.cleansession=false
#MQTT-当前客户端的唯一标识
publish.mqtt.clientid=mqtt_publish
#当前客户端的默认主题(大多数时候没什么用)
publish.mqtt.default_topic=测试
#发送超时时间
publish.mqtt.timeout=1000
#心跳时间
publish.mqtt.keepalive = 10
#连接超时时间
publish.mqtt.connectionTimeout=3000
建立MQTT服务端连接工具类MqttConfig,用于加载配置参数
@Configuration
@ConfigurationProperties(MQTTConfig.PREFIX)
public class MQTTConfig {
//指定配置文件application-local.properties中的属性名前缀
public static final String PREFIX = "publish.mqtt";
private String host;
private String clientid;
private String username;
private String password;
private boolean cleansession;
private String default_topic;
private int timeout;
private int keepalive;
private int connectionTimeout;
public String getHost() {
return host;
public void setHost(String host) {
this.host = host;
public String getClientid() {
return clientid;
public void setClientid(String clientid) {
this.clientid = clientid;
public String getUsername() {
return username;
public void setUsername(String username) {
this.username = username;
public String getPassword() {
return password;
public void setPassword(String password) {
this.password = password;
public String getDefault_topic() {
return default_topic;
public void setDefault_topic(String default_topic) {
this.default_topic = default_topic;
public int getTimeout() {
return timeout;
public void setTimeout(int timeout) {
this.timeout = timeout;
public int getKeepalive() {
return keepalive;
public void setKeepalive(int keepalive) {
this.keepalive = keepalive;
public int getConnectionTimeout() {
return connectionTimeout;
public void setConnectionTimeout(int connectionTimeout) {
this.connectionTimeout = connectionTimeout;
public boolean isCleansession() {
return cleansession;
public void setCleansession(boolean cleansession) {
this.cleansession = cleansession;
建立MQTT服务端连接类MqttConnect,本文不用工厂(关键没理解那种方式,汗...)
* 创建MqttConnectOptions连接对象
@Component
public class MqttConnect {
@Autowired
private MQTTConfig config;
public MqttConnect(MQTTConfig config) {
this.config = config;
public MqttConnectOptions getOptions() {
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(config.isCleansession());
options.setUserName(config.getUsername());
options.setPassword(config.getPassword().toCharArray());
options.setConnectionTimeout(config.getConnectionTimeout());
//设置心跳
options.setKeepAliveInterval(config.getKeepalive());
return options;
public MqttConnectOptions getOptions(MqttConnectOptions options) {
options.setCleanSession(options.isCleanSession());
options.setUserName(options.getUserName());
options.setPassword(options.getPassword());
options.setConnectionTimeout(options.getConnectionTimeout());
options.setKeepAliveInterval(options.getKeepAliveInterval());
return options;
建立消息发送类MQTTServer类,实现主题消息的发送,以及订阅主题、取消订阅主题
package com.example.springboot_publish.fabuzhe;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
* 发布端:主要实现发布消息和订阅主题,接收消息在回调类PushCallback中
* 要发布消息的时候只需要调用sendMQTTMessage方法就OK了
@Service
public class MQTTServer {
private static final Logger LOGGER = LoggerFactory.getLogger(MQTTServer.class);
/* 订阅者客户端对象 */
private MqttClient subsribeClient;
* 发布者客户端对象
* 这里订阅者和发布者的MqttClient对象分别命名是为了让发布者和订阅者分开,
* 如果订阅者和发布者都用一个MqttClient链接对象,则会出现两方都订阅了某个主题后,
* 谁发送了消息,都会自己接收到自己发的消息,所以分开写,里面主要就是回调类的设置setCallback
private MqttClient publishClient;
/* 主题对象 */
public MqttTopic topic;
/* 消息内容对象 */
public MqttMessage message;
@Autowired
private MqttConnect mqttConnect;
@Autowired
private MQTTConfig config;
public MQTTServer() {
LOGGER.info("8081上线了");
* 发布者客户端和服务端建立连接
public MqttClient publishConnect() {
//防止重复创建MQTTClient实例
try {
if (publishClient==null) {
//先让客户端和服务器建立连接,MemoryPersistence设置clientid的保存形式,默认为以内存保存
publishClient = new MqttClient(config.getHost(), config.getClientid(), new MemoryPersistence());
//发布消息不需要回调连接
//client.setCallback(new PushCallback());
MqttConnectOptions options = mqttConnect.getOptions();
//判断拦截状态,这里注意一下,如果没有这个判断,是非常坑的
if (!publishClient.isConnected()) {
publishClient.connect(options);
LOGGER.info("---------------------连接成功");
}else {//这里的逻辑是如果连接成功就重新连接
publishClient.disconnect();
publishClient.connect(mqttConnect.getOptions(options));
LOGGER.info("---------------------连接成功");
} catch (MqttException e) {
LOGGER.info(e.toString());
return publishClient;
* 订阅端的链接方法,关键是回调类的设置,要对订阅的主题消息进行处理
* 断线重连方法,如果是持久订阅,重连时不需要再次订阅
* 如果是非持久订阅,重连是需要重新订阅主题 取决于options.setCleanSession(true);
* true为非持久订阅
public void subsribeConnect() {
try {
//防止重复创建MQTTClient实例
if (subsribeClient==null) {
//clientId不能和其它的clientId一样,否则会出现频繁断开连接和重连的问题
subsribeClient = new MqttClient(config.getHost(), config.getClientid(), new MemoryPersistence());// MemoryPersistence设置clientid的保存形式,默认为以内存保存
//如果是订阅者则添加回调类,发布不需要,PushCallback类在后面,继续往下看
subsribeClient.setCallback(new PushCallback(MQTTServer.this));
MqttConnectOptions options = mqttConnect.getOptions();
//判断拦截状态,这里注意一下,如果没有这个判断,是非常坑的
if (!subsribeClient.isConnected()) {
subsribeClient.connect(options);
}else {//这里的逻辑是如果连接成功就重新连接
subsribeClient.disconnect();
subsribeClient.connect(mqttConnect.getOptions(options));
LOGGER.info("----------客户端连接成功");
} catch (MqttException e) {
LOGGER.info(e.getMessage(), e);
* 把组装好的消息发出去
* @param topic
* @param message
* @return
public boolean publish(MqttTopic topic , MqttMessage message) {
MqttDeliveryToken token = null;
try {
//把消息发送给对应的主题
token = topic.publish(message);
token.waitForCompletion();
//检查发送是否成功
boolean flag = token.isComplete();
StringBuffer sbf = new StringBuffer(200);
sbf.append("给主题为'"+topic.getName());
sbf.append("'发布消息:");
if (flag) {
sbf.append("成功!消息内容是:"+new String(message.getPayload()));
} else {
sbf.append("失败!");
LOGGER.info(sbf.toString());
} catch (MqttException e) {
LOGGER.info(e.toString());
return token.isComplete();
* MQTT发送指令:主要是组装消息体
* @param topic 主题
* @param data 消息内容
* @param qos 消息级别
public void sendMQTTMessage(String topic, String data, int qos) {
try {
this.publishClient = publishConnect();
this.topic = this.publishClient.getTopic(topic);
message = new MqttMessage();
//消息等级
//level 0:消息最多传递一次,不再关心它有没有发送到对方,也不设置任何重发机制
//level 1:包含了简单的重发机制,发送消息之后等待接收者的回复,如果没收到回复则重新发送消息。这种模式能保证消息至少能到达一次,但无法保证消息重复
//level 2: 有了重发和重复消息发现机制,保证消息到达对方并且严格只到达一次
message.setQos(qos);
//如果重复消费,则把值改为true,然后发送一条空的消息,之前的消息就会覆盖,然后在改为false
message.setRetained(false);
message.setPayload(data.getBytes());
//将组装好的消息发出去
publish(this.topic, message);
} catch (Exception e) {
LOGGER.info(e.toString());
e.printStackTrace();
* 订阅端订阅消息
* @param topic 要订阅的主题
* @param qos 订阅消息的级别
public void init(String topic, int qos) {
//建立连接
subsribeConnect();
//以某个消息级别订阅某个主题
try {
subsribeClient.subscribe(topic, qos);
} catch (MqttException e) {
LOGGER.info(e.getMessage(), e);
* 订阅端取消订阅消息
* @param topic 要订阅的主题
public void unionInit(String topic) {
//建立连接
subsribeConnect();
//取消订阅某个主题
try {
//MQTT 协议中订阅关系是持久化的,因此如果不需要订阅某些 Topic,需要调用 unsubscribe 方法取消订阅关系。
subsribeClient.unsubscribe(topic);
} catch (MqttException e) {
LOGGER.info(e.getMessage(), e);
编写消息接收回调类PushCallback
package com.example.springboot_publish.fabuzhe;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* 主要用来接收和处理订阅主题的消息
public class PushCallback implements MqttCallback {
private static final Logger LOGGER = LoggerFactory.getLogger(PushCallback.class);
private MQTTServer mqttServer;
public PushCallback(MQTTServer mqttServer) {
this.mqttServer = mqttServer;
public void connectionLost(Throwable cause) {
// 连接丢失后,一般在这里面进行重连
LOGGER.info("---------------------连接断开,可以做重连");
mqttServer.subsribeConnect();
while (true){
try {
//如果没有发生异常说明连接成功,如果发生异常,则死循环
Thread.sleep(1000);
break;
}catch (Exception e){
continue;
* 发送消息,消息到达后处理方法
* @param token
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete---------" + token.isComplete());
* 接收所订阅的主题的消息并处理
* @param topic
* @param message
public void messageArrived(String topic, MqttMessage message) throws Exception {
// subscribe后得到的消息会执行到这里面
String result = new String(message.getPayload(),"UTF-8");
System.out.println("接收消息主题 : " + topic);
System.out.println("接收消息Qos : " + message.getQos());
System.out.println("接收消息内容 : " + result);
//这里可以针对收到的消息做处理,比如持久化
建立Controller层PublishController
package com.example.springboot_publish.fabuzhe;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
@RestController
@RequestMapping(value = "/testPublish")
public class PublishController {
@Resource
private MQTTServer mqttserver;
@RequestMapping(value = "testPublish")
public String testPublish(String topic, String msg, int qos) {
mqttserver.sendMQTTMessage(topic, msg, qos);
String data = "发送了一条主题是‘"+topic+"’,内容是:"+msg+",消息级别 "+qos;
return data;
* 订阅主题
* @param topic 主题
* @param qos 消息级别
* @return
@RequestMapping(value = "testSubsribe")
public String testSubsribe(String topic, int qos) {
mqttserver.init(topic, qos);
return "订阅主题'"+topic+"'成功";
到此,服务端就搭建完了,当然,服务端也可以是消费端。
启动测试,在浏览器输入,http://localhost:8081/testPublish/testPublish?topic=test&qos=2&msg=%E6%88%91%E6%98%AF8081,%E6%88%91%E5%8F%91%E5%87%BA%E4%BA%86%E6%B6%88%E6%81%AF%EF%BC%81%EF%BC%81%EF%BC%81http://localhost:8081/testPublish/testPublish?topic=test&qos=2&msg=我是8081,我发出了消息http://localhost:8081/testPublish/testPublish?topic=test&qos=2&msg=%E6%88%91%E6%98%AF8081,%E6%88%91%E5%8F%91%E5%87%BA%E4%BA%86%E6%B6%88%E6%81%AF%EF%BC%81%EF%BC%81%EF%BC%81
控制台打印了如下内容
此时,EMQ X服务端监控信息如下
6.3.新建spring-boot项目springboot_subsribe作为消息订阅者
pom.xml文件引用和发布端引用的内容相同。配置文件中除了publish.mqtt.clientid,还有端口号之外,其它一模一样,端口号就不解释了,clientid不能一样是因为每一个客户端的clientid是不能和其它客户端重复的,切记,切记,切记......
先将MqttConfig和MqttConnect这两个类从发布端拿过来,不需要做任何改动;
建立MQTTSubsribe订阅者Service类,实际上跟上面的MQTTServer内容和作用是一样的
package com.example.springboot_subsribe.subsribe;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
* 发布端:主要实现发布消息和订阅主题,接收消息在回调类PushCallback中
* 要发布消息的时候只需要调用sendMQTTMessage方法就OK了
@Service
public class MQTTSubsribe {
private static final Logger LOGGER = LoggerFactory.getLogger(MQTTSubsribe.class);
@Autowired
private MQTTConfig mqttConfig;
/* 订阅者客户端对象 */
private MqttClient subsribeClient;
* 发布者客户端对象
* 这里订阅者和发布者的MqttClient对象分别命名是为了让发布者和订阅者分开,
* 如果订阅者和发布者都用一个MqttClient链接对象,则会出现两方都订阅了某个主题后,
* 谁发送了消息,都会自己接收到自己发的消息,所以分开写,里面主要就是回调类的设置setCallback
private MqttClient publishClient;
/* 主题对象 */
public MqttTopic topic;
/* 消息内容对象 */
public MqttMessage message;
@Autowired
private MqttConnect mqttConnect;
public MQTTSubsribe() {
LOGGER.info("8082上线了");
* 发布端的链接方法,只发布消息,发布者不需要回调消息类
* @return
public MqttClient publishConnect() {
try {
if (publishClient==null) {
publishClient = new MqttClient(mqttConfig.getHost(), mqttConfig.getClientid(), new MemoryPersistence());
MqttConnectOptions options = mqttConnect.getOptions();
//判断拦截状态,这里注意一下,如果没有这个判断,是非常坑的
if (!publishClient.isConnected()) {
publishClient.connect(options);
}else {//这里的逻辑是如果连接成功就重新连接
publishClient.disconnect();
publishClient.connect(mqttConnect.getOptions(options));
LOGGER.info("-----回调-----客户端连接成功");
} catch (MqttException e) {
LOGGER.info(e.getMessage(), e);
return publishClient;
* 方法实现说明(这个算是订阅端的链接方法,因为要接收消息):
* 断线重连方法,如果是持久订阅,重连是不需要再次订阅
* 如果是非持久订阅,重连是需要重新订阅主题 取决于options.setCleanSession(true);
* true为非持久订阅
public void subsribeConnect() {
try {
//防止重复创建MQTTClient实例
if (subsribeClient==null) {
//clientId不能和其它的clientId一样,否则会出现频繁断开连接和重连的问题
subsribeClient = new MqttClient(mqttConfig.getHost(), mqttConfig.getClientid(), new MemoryPersistence());// MemoryPersistence设置clientid的保存形式,默认为以内存保存
//如果是订阅者则添加回调类,发布不需要,PushCallback类在后面,继续往下看
subsribeClient.setCallback(new PushCallback(MQTTSubsribe.this));
MqttConnectOptions options = mqttConnect.getOptions();
//判断拦截状态,这里注意一下,如果没有这个判断,是非常坑的
if (!subsribeClient.isConnected()) {
subsribeClient.connect(options);
}else {//这里的逻辑是如果连接成功就重新连接
subsribeClient.disconnect();
subsribeClient.connect(mqttConnect.getOptions(options));
LOGGER.info("----------客户端连接成功");
} catch (MqttException e) {
LOGGER.info(e.getMessage(), e);
* 订阅端订阅消息
* @param topic 要订阅的主题
* @param qos 订阅消息的级别
public void init(String topic, int qos) {
//建立连接
subsribeConnect();
//以某个消息级别订阅某个主题
subscribe(topic, qos);
* 订阅端取消订阅消息
* @param topic 要订阅的主题
public void unionInit(String topic) {
//建立连接
subsribeConnect();
//取消订阅某个主题
unsuSubscribe(topic);
* 订阅某个主题
* @param topic .
* @param qos .
public void subscribe(String topic, int qos) {
try {
subsribeClient.subscribe(topic,2);
} catch (MqttException e) {
LOGGER.info(e.getMessage(), e);
* 取消订阅某个主题
* @param topic 要取消的主题
public void unsuSubscribe(String topic) {
try {
//MQTT 协议中订阅关系是持久化的,因此如果不需要订阅某些 Topic,需要调用 unsubscribe 方法取消订阅关系。
subsribeClient.unsubscribe(topic);
} catch (MqttException e) {
LOGGER.info(e.getMessage(), e);
* 消息内容发送
* @param topic
* @param message
* @return
public boolean publish(MqttTopic topic , MqttMessage message) {
MqttDeliveryToken token = null;
try {
//把消息发送给对应的主题
token = topic.publish(message);
token.waitForCompletion();
//检查发送是否成功
boolean flag = token.isComplete();
StringBuffer sbf = new StringBuffer(200);
sbf.append("给主题为'"+topic.getName());
sbf.append("'发布消息:");
if (flag) {
sbf.append("成功!消息内容是:"+new String(message.getPayload()));
} else {
sbf.append("失败!");
LOGGER.info(sbf.toString());
} catch (MqttException e) {
LOGGER.info(e.toString());
return token.isComplete();
* MQTT发送指令
* @param topic 主题
* @param data 消息内容
* @param qos 消息级别
public void sendMQTTMessage(String topic, String data, int qos) {
try {
//MQTTSubsribe server = new MQTTSubsribe();
//server.client = server.connectCallback();
this.publishClient = this.publishConnect();
this.topic = this.publishClient.getTopic(topic);
this.message = new MqttMessage();
//消息等级
//level 0:消息最多传递一次,不再关心它有没有发送到对方,也不设置任何重发机制
//level 1:包含了简单的重发机制,发送消息之后等待接收者的回复,如果没收到回复则重新发送消息。这种模式能保证消息至少能到达一次,但无法保证消息重复
//level 2: 有了重发和重复消息发现机制,保证消息到达对方并且严格只到达一次
message.setQos(qos);
//如果重复消费,则把值改为true,然后发送一条空的消息,之前的消息就会覆盖,然后在改为false
message.setRetained(false);
message.setPayload(data.getBytes());
publish(this.topic, message);
} catch (Exception e) {
LOGGER.info(e.toString());
e.printStackTrace();
编写回调连接PushCallback类并实现MqttCallback接口,跟上面的PushCallback一样
package com.example.springboot_subsribe.subsribe;
import org.eclipse.paho.client.mqttv3.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* 主要用来接收和处理订阅主题的消息
public class PushCallback implements MqttCallback {
private static final Logger LOGGER = LoggerFactory.getLogger(PushCallback.class);
private MQTTSubsribe mqttSubsribe;
public PushCallback(MQTTSubsribe subsribe) {
this.mqttSubsribe = subsribe;
public void connectionLost(Throwable cause) {
// 连接丢失后,一般在这里面进行重连
LOGGER.info("---------------------连接断开,可以做重连");
mqttSubsribe.subsribeConnect();
while (true){
try {
//如果没有发生异常说明连接成功,如果发生异常,则死循环
Thread.sleep(1000);
break;
}catch (Exception e){
continue;
* 发送消息,消息到达后处理方法
* @param token
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete---------" + token.isComplete());
* 接收所订阅的主题的消息并处理
* @param topic
* @param message
public void messageArrived(String topic, MqttMessage message) throws Exception {
// subscribe后得到的消息会执行到这里面
String result = new String(message.getPayload(),"UTF-8");
System.out.println("接收消息主题 : " + topic);
System.out.println("接收消息Qos : " + message.getQos());
System.out.println("接收消息内容 : " + result);
//这里可以针对收到的消息做处理,比如持久化
编写测试controller类SubsribeController
package com.example.springboot_subsribe.subsribe;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping(value = "/SubsribeController")
public class SubsribeController {
@Autowired
private MQTTSubsribe mqttSubsribe;
* 订阅主题
* @param topic 主题
* @param qos 消息级别
* @return
@RequestMapping(value = "testSubsribe")
public String testSubsribe(String topic, int qos) {
mqttSubsribe.init(topic, qos);
return "订阅'"+topic+"'成功";
* 退订主题
* @param topic 主题
* @return
@RequestMapping(value = "testUnsvSubsribe")
public String testUnsvSubsribe(String topic) {
mqttSubsribe.unionInit(topic);
return "取消订阅'"+topic+"'成功";
@RequestMapping(value = "testPublish")
public String testPublish(String topic, String msg, int qos) {
mqttSubsribe.sendMQTTMessage(topic, msg, qos);
String data = "发送了一条主题是‘"+topic+"’,内容是:"+msg+",消息级别 "+qos;
return data;
到此,编码就完成了,启动项目后再浏览器测试订阅,在浏览器输入:
http://localhost:8082/SubsribeController/testSubsribe?topic=test&qos=2
这个时候再用http://localhost:8081/testPublish/testPublish?topic=test&qos=2&msg=我是8081,我发出了消息发布主题为test的消息,springboot_subsribe服务就会把消息接收到,如下图控制台输出
服务端页面显示了两台客户端,mqtt_subsribe订阅了主题‘test’,效果如下
接下来,我们从发布端发条信息来看
http://localhost:8081/testPublish/testPublish?topic=测试发布&qos=2&msg=消息发出成功了吗
控制台输出如下
取消订阅的调用就不在这里详细叙说了,调用方法跟发布主题类似。
此时,服务端页面展示如下
可以看出订阅已经没了,,,,, 本文结束!
结束语,其实服务端是否用emq x 都行,我刚开始试验用的active也是一样代码,区别就是服务监控平台不同。
本文介绍了使用idea工具搭建spring boot项目,并集成mqtt消息服务,进行消息发布与消息订阅接收处理,使用EMQ X作为服务端。使用基础方式实现,有助于菜鸟学习并理解。
现在是:2023年3月5日19:03:49在上一篇文章中,我介绍了如何在服务器中安装消息服务器,这是在操作协议的时候必不可少的步骤,今天我们就来看看如何将服务集成到项目中。刚开始在集成的时候,也在网上看了些资料,也遇到了些坑,最后参考的是这篇文章,然后加上自己的简单修改,以及博主的悉心指导,最后终于实现了我预期的效果。
参考文章连接:点击这里注意,在实现mqtt的时候,一定要先启动emqx消息服务器的服务,关于emqx的安装与使用,可以移步到这里点击这里
下面我们来看看实现代码。为了模拟的更加真实点儿,我这
Windows上Mqtt服务器搭建与使用客户端工具MqttBox进行测试:
https://blog.csdn.net/BADAO_LIUMANG_QIZHI/article/details/112305328
在上面搭建好了MQTT服务器以及客户端工具MqttBox之后,怎样在SpringBoot中实现订阅主题接收消息和发布主题推送消息的功能。
博客:https://blog.csdn.net/badao_liumang_qizhi
关注公众号
霸道的程序猿
获取编程相关电子书、教程
公司中项目大多是物联网项目,需要跟设备进行交互,用到的协议比较多,如NB/
MQTT/LWM2M/COAP等,项目中不可避免用到了
MQTT协议,本文介绍
springboot项目
MQTT客户端实现,不多说直接上可执行代码。
一、EMQ官网java sdk demo,如果只需要用到一个客户端,可以参照下官网demo,修改下应用用项目
1、pom.xml依赖引用
<dependency>
<groupId>org.eclipse.paho</groupId>
最近在做MQTT对接,然后发送消息,然后参考网上的实战文章进行了一下整理。
文章主要参考自(https://www.codetd.com/article/13550340),然后自己做了些许更改。
1、整合准备
SpringBoot:2.2.2.RELEASE
MQTT平台:EMQX4.4.1(Docker运行)
虚拟机服务器:Centos7(192.168.56.102 )
发送端:cloud-mqtt-send8001
接收端:cloud-mqtt-accept8002
2、发送端:cl
这玩意能干什么?我只能说,这是一个物联网的方案,能通过java客户端监听来自单片机发送的消息,单片机有什么消息?常见的有:比如持久性的传感器数据上报,这你得1s上传一次吧,还有一些控制设备的信息,比如灯,电机之类一次操作持续运转的东西。
写在前面:
曾经用过的一种需要接入Internet的物联网方案(这是目前的主流):
我之前就受益与免费的云平台(云服务器),比如我以前博客里介绍过了的巴法云平台,还有我以前用过的小熊派华为云平台,它们的好处显而易见,就是不需要你去搭建服务器,不需要去了解数据传输的各种
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.
mqttv3</artifactId>
<version>1.1.0...
spring-boot集成mqtt,实现发布和订阅(工厂模式)
花九猫的对象: