相关文章推荐
潇洒的弓箭  ·  基于滴滴云DC2 ...·  1 年前    · 

1.了解MQTT

MQTT官网地址: MQTT - 轻量级的物联网消息传输协议 | EMQ MQTT 是轻量级的物联网消息传输协议。EMQ 提供了通俗易懂的技术文章及简单易用的客户端工具,帮助开发者学习 MQTT 协议并快速入门 MQTT 客户端编程。 https://www.emqx.com/zh/mqtt

EMQ X 安装包下载官网(本文下载的是windows版本)如下,包括对应各种系统的安装方式: 免费试用 EMQ 产品 | EMQ 免费试用 EMQ X Cloud,或免费下载 EMQ X(企业版/开源版)、Neuron、NanoMQ,让 EMQ 产品带您快速开启万物互联的世界。 https://www.emqx.com/zh/try?product=broker

2.开发环境

idea版本:IntelliJ IDEA 2021.2.3

开发工具 IntelliJ IDEA 版本号 2021.2.3
mqtt服务端 EMQ X 版本号 emqx-windows-4.3.10
spring-boot 版本 2.5.7
JDK版本 1.8
系统 win10

3.工作流程(个人理解简版)

所有的客户端都可以是订阅端,或者是发布端。任何一个客户端发布一条某个主题的消息,都会通过服务端转发给每一个订阅了该主题的客户端。

4.MQTT消息服务质量

MQTT规定了3种消息等级(我觉的这个是mqtt最核心的东西),级别如下:

QoS 0:消息最多传递一次,不需要客户端给与回复,如果当时客户端不可用,则会丢失该消息。

QoS 1:消息传递至少 1 次,发布者会发布消息,并等待接收者的 PUBACK 报文的应答,在规定的时间内要收到接收者的应答,发布者若没收到应答,会将消息的 DUP 置为 1 并重发消息。所以Qos 1消息级别取决于接受者在规定时间内给与发布者反馈,若没有反馈,则会再次接受到消息。

QoS 2:消息仅传送一次,发布者发布 QoS 为 2 的消息之后,会将发布的消息储存起来并等待接收者回复 PUBREC 的消息,发送者收到 PUBREC 消息后,它就可以安全丢弃掉之前的发布消息,因为它已经知道接收者成功收到了消息。发布者会保存 PUBREC 消息并应答一个 PUBREL,等待接收者回复 PUBCOMP 消息,当发送者收到 PUBCOMP 消息之后会清空之前所保存的状态。QoS 2 消息的核心是接收者给发布者反馈两次接收结果,相当于一次接收,一次确认接收。

总结: QoS 0 消息只发一次,不在乎是否被别的客户端收到,只要发送了就算结束。QoS 1 消息需要消息接收者在规定时间内给予反馈,结束的标志是在发送后规定时间内收到反馈,否则就会一直发送。QoS 2 消息需要发送者和接收者双方互相进行消息确认,只要有一方没有确定就不会结束。

5.QoS 在发布与订阅中的区别

  • 当客户端 A 的发布 QoS 大于 客户端 B 的订阅 QoS 时,服务端向客户端 B 转发消息时使用的 QoS 为客户端 B 的订阅 QoS。
  • 当客户端 A 的发布 QoS 小于客户端 B 的订阅 QoS 时,服务端向客户端 B 转发消息时使用的 QoS 为客户端 A 的发布 QoS。
  • 总结:服务端给订阅端发送消息时,发布端的QoS 和 订阅端的QoS 谁小,就用谁的QoS级别。

6.代码实现

网上看了好多的spring boot集成mqtt实现发布和订阅的帖子,发现很多都是用的工厂实现,解读起来是在费劲,最后找到另一个帖子后终于是容易理解的内容,经过跟自身所处环境和开发习惯写了本贴。

灵感来自于: springboot集成mqtt发布订阅_sipengfei_的博客-CSDN博客

源码链接:https://pan.baidu.com/s/1-I2cANT1J54Rf__Wcm5t8Q
提取码:p7q2

6.1准备一个服务端

EMQ X 提供了一个公共 Broker 地址用于测试:broker.emqx.io:1883。本文使用的服务端是EMQ X,是windows版本,安装很简单,只需要把包解压到目标文件夹,然后在bin目录下用cmd命令执行emqx start,如下图。

windows版本

最后在浏览器地址栏输入: http://localhost:18083/ ,默认用户名称和密码是admin/public,登录后可以在设置中可以选择语言模式,默认是English,还可以设置为中文和日语,工具界面如下。

6.2 新建spring-boot项目springboot_publish作为消息发布者

在pom.xml文件中添加mqtt的jar支持

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-core</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>

在application.properties添加mqtt的相关配置

server.port=8081
#MQTT-服务端地址
publish.mqtt.host=tcp://localhost:1883
#MQTT-服务端用户名
publish.mqtt.username=admin
#MQTT-服务端密码
publish.mqtt.password=public
#MQTT-是否清理session
publish.mqtt.cleansession=false
#MQTT-当前客户端的唯一标识
publish.mqtt.clientid=mqtt_publish
#当前客户端的默认主题(大多数时候没什么用)
publish.mqtt.default_topic=测试
#发送超时时间
publish.mqtt.timeout=1000
#心跳时间
publish.mqtt.keepalive = 10
#连接超时时间
publish.mqtt.connectionTimeout=3000

建立MQTT服务端连接工具类MqttConfig,用于加载配置参数

@Configuration
@ConfigurationProperties(MQTTConfig.PREFIX)
public class MQTTConfig {
    //指定配置文件application-local.properties中的属性名前缀
    public static final String PREFIX = "publish.mqtt";
    private String host;
    private String clientid;
    private String username;
    private String password;
    private boolean cleansession;
    private String default_topic;
    private int timeout;
    private int keepalive;
    private int connectionTimeout;
    public String getHost() {
        return host;
    public void setHost(String host) {
        this.host = host;
    public String getClientid() {
        return clientid;
    public void setClientid(String clientid) {
        this.clientid = clientid;
    public String getUsername() {
        return username;
    public void setUsername(String username) {
        this.username = username;
    public String getPassword() {
        return password;
    public void setPassword(String password) {
        this.password = password;
    public String getDefault_topic() {
        return default_topic;
    public void setDefault_topic(String default_topic) {
        this.default_topic = default_topic;
    public int getTimeout() {
        return timeout;
    public void setTimeout(int timeout) {
        this.timeout = timeout;
    public int getKeepalive() {
        return keepalive;
    public void setKeepalive(int keepalive) {
        this.keepalive = keepalive;
    public int getConnectionTimeout() {
        return connectionTimeout;
    public void setConnectionTimeout(int connectionTimeout) {
        this.connectionTimeout = connectionTimeout;
    public boolean isCleansession() {
        return cleansession;
    public void setCleansession(boolean cleansession) {
        this.cleansession = cleansession;

建立MQTT服务端连接类MqttConnect,本文不用工厂(关键没理解那种方式,汗...)

* 创建MqttConnectOptions连接对象 @Component public class MqttConnect { @Autowired private MQTTConfig config; public MqttConnect(MQTTConfig config) { this.config = config; public MqttConnectOptions getOptions() { MqttConnectOptions options = new MqttConnectOptions(); options.setCleanSession(config.isCleansession()); options.setUserName(config.getUsername()); options.setPassword(config.getPassword().toCharArray()); options.setConnectionTimeout(config.getConnectionTimeout()); //设置心跳 options.setKeepAliveInterval(config.getKeepalive()); return options; public MqttConnectOptions getOptions(MqttConnectOptions options) { options.setCleanSession(options.isCleanSession()); options.setUserName(options.getUserName()); options.setPassword(options.getPassword()); options.setConnectionTimeout(options.getConnectionTimeout()); options.setKeepAliveInterval(options.getKeepAliveInterval()); return options;

建立消息发送类MQTTServer类,实现主题消息的发送,以及订阅主题、取消订阅主题

package com.example.springboot_publish.fabuzhe;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
 * 发布端:主要实现发布消息和订阅主题,接收消息在回调类PushCallback中
 * 要发布消息的时候只需要调用sendMQTTMessage方法就OK了
@Service
public class MQTTServer {
    private static final Logger LOGGER = LoggerFactory.getLogger(MQTTServer.class);
    /* 订阅者客户端对象 */
    private MqttClient subsribeClient;
     * 发布者客户端对象
     * 这里订阅者和发布者的MqttClient对象分别命名是为了让发布者和订阅者分开,
     * 如果订阅者和发布者都用一个MqttClient链接对象,则会出现两方都订阅了某个主题后,
     * 谁发送了消息,都会自己接收到自己发的消息,所以分开写,里面主要就是回调类的设置setCallback
    private MqttClient publishClient;
    /* 主题对象 */
    public MqttTopic topic;
    /* 消息内容对象 */
    public MqttMessage message;
    @Autowired
    private MqttConnect mqttConnect;
    @Autowired
    private MQTTConfig config;
    public MQTTServer() {
        LOGGER.info("8081上线了");
     * 发布者客户端和服务端建立连接
    public MqttClient publishConnect() {
        //防止重复创建MQTTClient实例
        try {
            if (publishClient==null) {
                //先让客户端和服务器建立连接,MemoryPersistence设置clientid的保存形式,默认为以内存保存
                publishClient = new MqttClient(config.getHost(), config.getClientid(), new MemoryPersistence());
                //发布消息不需要回调连接
                //client.setCallback(new PushCallback());
            MqttConnectOptions options = mqttConnect.getOptions();
            //判断拦截状态,这里注意一下,如果没有这个判断,是非常坑的
            if (!publishClient.isConnected()) {
                publishClient.connect(options);
                LOGGER.info("---------------------连接成功");
            }else {//这里的逻辑是如果连接成功就重新连接
                publishClient.disconnect();
                publishClient.connect(mqttConnect.getOptions(options));
                LOGGER.info("---------------------连接成功");
        } catch (MqttException e) {
            LOGGER.info(e.toString());
        return publishClient;
     * 订阅端的链接方法,关键是回调类的设置,要对订阅的主题消息进行处理
     * 断线重连方法,如果是持久订阅,重连时不需要再次订阅
     * 如果是非持久订阅,重连是需要重新订阅主题 取决于options.setCleanSession(true);
     * true为非持久订阅
    public void subsribeConnect() {
        try {
            //防止重复创建MQTTClient实例
            if (subsribeClient==null) {
                //clientId不能和其它的clientId一样,否则会出现频繁断开连接和重连的问题
                subsribeClient = new MqttClient(config.getHost(), config.getClientid(), new MemoryPersistence());// MemoryPersistence设置clientid的保存形式,默认为以内存保存
                //如果是订阅者则添加回调类,发布不需要,PushCallback类在后面,继续往下看
                subsribeClient.setCallback(new PushCallback(MQTTServer.this));
            MqttConnectOptions options = mqttConnect.getOptions();
            //判断拦截状态,这里注意一下,如果没有这个判断,是非常坑的
            if (!subsribeClient.isConnected()) {
                subsribeClient.connect(options);
            }else {//这里的逻辑是如果连接成功就重新连接
                subsribeClient.disconnect();
                subsribeClient.connect(mqttConnect.getOptions(options));
            LOGGER.info("----------客户端连接成功");
        } catch (MqttException e) {
            LOGGER.info(e.getMessage(), e);
     * 把组装好的消息发出去
     * @param topic
     * @param message
     * @return
    public boolean publish(MqttTopic topic , MqttMessage message) {
        MqttDeliveryToken token = null;
        try {
            //把消息发送给对应的主题
            token = topic.publish(message);
            token.waitForCompletion();
            //检查发送是否成功
            boolean flag = token.isComplete();
            StringBuffer sbf = new StringBuffer(200);
            sbf.append("给主题为'"+topic.getName());
            sbf.append("'发布消息:");
            if (flag) {
                sbf.append("成功!消息内容是:"+new String(message.getPayload()));
            } else {
                sbf.append("失败!");
            LOGGER.info(sbf.toString());
        } catch (MqttException e) {
            LOGGER.info(e.toString());
        return token.isComplete();
     * MQTT发送指令:主要是组装消息体
     * @param topic 主题
     * @param data 消息内容
     * @param qos 消息级别
    public void sendMQTTMessage(String topic, String data, int qos) {
        try {
            this.publishClient = publishConnect();
            this.topic = this.publishClient.getTopic(topic);
            message = new MqttMessage();
            //消息等级
            //level 0:消息最多传递一次,不再关心它有没有发送到对方,也不设置任何重发机制
            //level 1:包含了简单的重发机制,发送消息之后等待接收者的回复,如果没收到回复则重新发送消息。这种模式能保证消息至少能到达一次,但无法保证消息重复
            //level 2: 有了重发和重复消息发现机制,保证消息到达对方并且严格只到达一次
            message.setQos(qos);
            //如果重复消费,则把值改为true,然后发送一条空的消息,之前的消息就会覆盖,然后在改为false
            message.setRetained(false);
            message.setPayload(data.getBytes());
            //将组装好的消息发出去
            publish(this.topic, message);
        } catch (Exception e) {
            LOGGER.info(e.toString());
            e.printStackTrace();
     * 订阅端订阅消息
     * @param topic 要订阅的主题
     * @param qos 订阅消息的级别
    public void init(String topic, int qos) {
        //建立连接
        subsribeConnect();
        //以某个消息级别订阅某个主题
        try {
            subsribeClient.subscribe(topic, qos);
        } catch (MqttException e) {
            LOGGER.info(e.getMessage(), e);
     * 订阅端取消订阅消息
     * @param topic 要订阅的主题
    public void unionInit(String topic) {
        //建立连接
        subsribeConnect();
        //取消订阅某个主题
        try {
            //MQTT 协议中订阅关系是持久化的,因此如果不需要订阅某些 Topic,需要调用 unsubscribe 方法取消订阅关系。
            subsribeClient.unsubscribe(topic);
        } catch (MqttException e) {
            LOGGER.info(e.getMessage(), e);

编写消息接收回调类PushCallback

package com.example.springboot_publish.fabuzhe;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 * 主要用来接收和处理订阅主题的消息
public class PushCallback implements MqttCallback {
    private static final Logger LOGGER = LoggerFactory.getLogger(PushCallback.class);
    private MQTTServer mqttServer;
    public PushCallback(MQTTServer mqttServer) {
        this.mqttServer = mqttServer;
    public void connectionLost(Throwable cause) {
        // 连接丢失后,一般在这里面进行重连
        LOGGER.info("---------------------连接断开,可以做重连");
        mqttServer.subsribeConnect();
        while (true){
            try {
                //如果没有发生异常说明连接成功,如果发生异常,则死循环
                Thread.sleep(1000);
                break;
            }catch (Exception e){
                continue;
     * 发送消息,消息到达后处理方法
     * @param token
    public void deliveryComplete(IMqttDeliveryToken token) {
        System.out.println("deliveryComplete---------" + token.isComplete());
     * 接收所订阅的主题的消息并处理
     * @param topic
     * @param message
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        // subscribe后得到的消息会执行到这里面
        String result = new String(message.getPayload(),"UTF-8");
        System.out.println("接收消息主题 : " + topic);
        System.out.println("接收消息Qos : " + message.getQos());
        System.out.println("接收消息内容 : " + result);
        //这里可以针对收到的消息做处理,比如持久化

建立Controller层PublishController

package com.example.springboot_publish.fabuzhe;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
@RestController
@RequestMapping(value = "/testPublish")
public class PublishController {
    @Resource
    private MQTTServer mqttserver;
    @RequestMapping(value = "testPublish")
    public String testPublish(String topic, String msg, int qos) {
        mqttserver.sendMQTTMessage(topic, msg, qos);
        String data = "发送了一条主题是‘"+topic+"’,内容是:"+msg+",消息级别 "+qos;
        return data;
     * 订阅主题
     * @param topic 主题
     * @param qos 消息级别
     * @return
    @RequestMapping(value = "testSubsribe")
    public String testSubsribe(String topic, int qos) {
        mqttserver.init(topic, qos);
        return "订阅主题'"+topic+"'成功";

到此,服务端就搭建完了,当然,服务端也可以是消费端。

启动测试,在浏览器输入,http://localhost:8081/testPublish/testPublish?topic=test&qos=2&msg=%E6%88%91%E6%98%AF8081,%E6%88%91%E5%8F%91%E5%87%BA%E4%BA%86%E6%B6%88%E6%81%AF%EF%BC%81%EF%BC%81%EF%BC%81http://localhost:8081/testPublish/testPublish?topic=test&qos=2&msg=我是8081,我发出了消息http://localhost:8081/testPublish/testPublish?topic=test&qos=2&msg=%E6%88%91%E6%98%AF8081,%E6%88%91%E5%8F%91%E5%87%BA%E4%BA%86%E6%B6%88%E6%81%AF%EF%BC%81%EF%BC%81%EF%BC%81

控制台打印了如下内容

 此时,EMQ X服务端监控信息如下

6.3.新建spring-boot项目springboot_subsribe作为消息订阅者

pom.xml文件引用和发布端引用的内容相同。配置文件中除了publish.mqtt.clientid,还有端口号之外,其它一模一样,端口号就不解释了,clientid不能一样是因为每一个客户端的clientid是不能和其它客户端重复的,切记,切记,切记......

先将MqttConfig和MqttConnect这两个类从发布端拿过来,不需要做任何改动;

建立MQTTSubsribe订阅者Service类,实际上跟上面的MQTTServer内容和作用是一样的

package com.example.springboot_subsribe.subsribe;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
 * 发布端:主要实现发布消息和订阅主题,接收消息在回调类PushCallback中
 * 要发布消息的时候只需要调用sendMQTTMessage方法就OK了
@Service
public class MQTTSubsribe {
    private static final Logger LOGGER = LoggerFactory.getLogger(MQTTSubsribe.class);
    @Autowired
    private MQTTConfig mqttConfig;
    /* 订阅者客户端对象 */
    private MqttClient subsribeClient;
     * 发布者客户端对象
     * 这里订阅者和发布者的MqttClient对象分别命名是为了让发布者和订阅者分开,
     * 如果订阅者和发布者都用一个MqttClient链接对象,则会出现两方都订阅了某个主题后,
     * 谁发送了消息,都会自己接收到自己发的消息,所以分开写,里面主要就是回调类的设置setCallback
    private MqttClient publishClient;
    /* 主题对象 */
    public MqttTopic topic;
    /* 消息内容对象 */
    public MqttMessage message;
    @Autowired
    private MqttConnect mqttConnect;
    public MQTTSubsribe() {
        LOGGER.info("8082上线了");
     * 发布端的链接方法,只发布消息,发布者不需要回调消息类
     * @return
    public MqttClient publishConnect() {
        try {
            if (publishClient==null) {
                publishClient = new MqttClient(mqttConfig.getHost(), mqttConfig.getClientid(), new MemoryPersistence());
            MqttConnectOptions options = mqttConnect.getOptions();
            //判断拦截状态,这里注意一下,如果没有这个判断,是非常坑的
            if (!publishClient.isConnected()) {
                publishClient.connect(options);
            }else {//这里的逻辑是如果连接成功就重新连接
                publishClient.disconnect();
                publishClient.connect(mqttConnect.getOptions(options));
            LOGGER.info("-----回调-----客户端连接成功");
        } catch (MqttException e) {
            LOGGER.info(e.getMessage(), e);
        return publishClient;
     * 方法实现说明(这个算是订阅端的链接方法,因为要接收消息):
     * 断线重连方法,如果是持久订阅,重连是不需要再次订阅
     * 如果是非持久订阅,重连是需要重新订阅主题 取决于options.setCleanSession(true);
     * true为非持久订阅
    public void subsribeConnect() {
        try {
            //防止重复创建MQTTClient实例
            if (subsribeClient==null) {
                //clientId不能和其它的clientId一样,否则会出现频繁断开连接和重连的问题
                subsribeClient = new MqttClient(mqttConfig.getHost(), mqttConfig.getClientid(), new MemoryPersistence());// MemoryPersistence设置clientid的保存形式,默认为以内存保存
                //如果是订阅者则添加回调类,发布不需要,PushCallback类在后面,继续往下看
                subsribeClient.setCallback(new PushCallback(MQTTSubsribe.this));
            MqttConnectOptions options = mqttConnect.getOptions();
            //判断拦截状态,这里注意一下,如果没有这个判断,是非常坑的
            if (!subsribeClient.isConnected()) {
                subsribeClient.connect(options);
            }else {//这里的逻辑是如果连接成功就重新连接
                subsribeClient.disconnect();
                subsribeClient.connect(mqttConnect.getOptions(options));
            LOGGER.info("----------客户端连接成功");
        } catch (MqttException e) {
            LOGGER.info(e.getMessage(), e);
     * 订阅端订阅消息
     * @param topic 要订阅的主题
     * @param qos 订阅消息的级别
    public void init(String topic, int qos) {
        //建立连接
        subsribeConnect();
        //以某个消息级别订阅某个主题
        subscribe(topic, qos);
     * 订阅端取消订阅消息
     * @param topic 要订阅的主题
    public void unionInit(String topic) {
        //建立连接
        subsribeConnect();
        //取消订阅某个主题
        unsuSubscribe(topic);
     * 订阅某个主题
     * @param topic .
     * @param qos .
    public void subscribe(String topic, int qos) {
        try {
            subsribeClient.subscribe(topic,2);
        } catch (MqttException e) {
            LOGGER.info(e.getMessage(), e);
     * 取消订阅某个主题
     * @param topic 要取消的主题
    public void unsuSubscribe(String topic) {
        try {
            //MQTT 协议中订阅关系是持久化的,因此如果不需要订阅某些 Topic,需要调用 unsubscribe 方法取消订阅关系。
            subsribeClient.unsubscribe(topic);
        } catch (MqttException e) {
            LOGGER.info(e.getMessage(), e);
     * 消息内容发送
     * @param topic
     * @param message
     * @return
    public boolean publish(MqttTopic topic , MqttMessage message) {
        MqttDeliveryToken token = null;
        try {
            //把消息发送给对应的主题
            token = topic.publish(message);
            token.waitForCompletion();
            //检查发送是否成功
            boolean flag = token.isComplete();
            StringBuffer sbf = new StringBuffer(200);
            sbf.append("给主题为'"+topic.getName());
            sbf.append("'发布消息:");
            if (flag) {
                sbf.append("成功!消息内容是:"+new String(message.getPayload()));
            } else {
                sbf.append("失败!");
            LOGGER.info(sbf.toString());
        } catch (MqttException e) {
            LOGGER.info(e.toString());
        return token.isComplete();
     * MQTT发送指令
     * @param topic 主题
     * @param data 消息内容
     * @param qos 消息级别
    public void sendMQTTMessage(String topic, String data, int qos) {
        try {
            //MQTTSubsribe server = new MQTTSubsribe();
            //server.client = server.connectCallback();
            this.publishClient = this.publishConnect();
            this.topic = this.publishClient.getTopic(topic);
            this.message = new MqttMessage();
            //消息等级
            //level 0:消息最多传递一次,不再关心它有没有发送到对方,也不设置任何重发机制
            //level 1:包含了简单的重发机制,发送消息之后等待接收者的回复,如果没收到回复则重新发送消息。这种模式能保证消息至少能到达一次,但无法保证消息重复
            //level 2: 有了重发和重复消息发现机制,保证消息到达对方并且严格只到达一次
            message.setQos(qos);
            //如果重复消费,则把值改为true,然后发送一条空的消息,之前的消息就会覆盖,然后在改为false
            message.setRetained(false);
            message.setPayload(data.getBytes());
            publish(this.topic, message);
        } catch (Exception e) {
            LOGGER.info(e.toString());
            e.printStackTrace();

编写回调连接PushCallback类并实现MqttCallback接口,跟上面的PushCallback一样

package com.example.springboot_subsribe.subsribe;
import org.eclipse.paho.client.mqttv3.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 * 主要用来接收和处理订阅主题的消息
public class PushCallback implements MqttCallback {
    private static final Logger LOGGER = LoggerFactory.getLogger(PushCallback.class);
    private MQTTSubsribe mqttSubsribe;
    public PushCallback(MQTTSubsribe subsribe) {
        this.mqttSubsribe = subsribe;
    public void connectionLost(Throwable cause) {
        // 连接丢失后,一般在这里面进行重连
        LOGGER.info("---------------------连接断开,可以做重连");
        mqttSubsribe.subsribeConnect();
        while (true){
            try {
                //如果没有发生异常说明连接成功,如果发生异常,则死循环
                Thread.sleep(1000);
                break;
            }catch (Exception e){
                continue;
     * 发送消息,消息到达后处理方法
     * @param token
    public void deliveryComplete(IMqttDeliveryToken token) {
        System.out.println("deliveryComplete---------" + token.isComplete());
     * 接收所订阅的主题的消息并处理
     * @param topic
     * @param message
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        // subscribe后得到的消息会执行到这里面
        String result = new String(message.getPayload(),"UTF-8");
        System.out.println("接收消息主题 : " + topic);
        System.out.println("接收消息Qos : " + message.getQos());
        System.out.println("接收消息内容 : " + result);
        //这里可以针对收到的消息做处理,比如持久化

编写测试controller类SubsribeController

package com.example.springboot_subsribe.subsribe;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping(value = "/SubsribeController")
public class SubsribeController {
    @Autowired
    private MQTTSubsribe mqttSubsribe;
     * 订阅主题
     * @param topic 主题
     * @param qos 消息级别
     * @return
    @RequestMapping(value = "testSubsribe")
    public String testSubsribe(String topic, int qos) {
        mqttSubsribe.init(topic, qos);
        return "订阅'"+topic+"'成功";
     * 退订主题
     * @param topic 主题
     * @return
    @RequestMapping(value = "testUnsvSubsribe")
    public String testUnsvSubsribe(String topic) {
        mqttSubsribe.unionInit(topic);
        return "取消订阅'"+topic+"'成功";
    @RequestMapping(value = "testPublish")
    public String testPublish(String topic, String msg, int qos) {
        mqttSubsribe.sendMQTTMessage(topic, msg, qos);
        String data = "发送了一条主题是‘"+topic+"’,内容是:"+msg+",消息级别 "+qos;
        return data;

到此,编码就完成了,启动项目后再浏览器测试订阅,在浏览器输入:

http://localhost:8082/SubsribeController/testSubsribe?topic=test&qos=2

 这个时候再用http://localhost:8081/testPublish/testPublish?topic=test&qos=2&msg=我是8081,我发出了消息发布主题为test的消息,springboot_subsribe服务就会把消息接收到,如下图控制台输出

服务端页面显示了两台客户端,mqtt_subsribe订阅了主题‘test’,效果如下

 接下来,我们从发布端发条信息来看

 http://localhost:8081/testPublish/testPublish?topic=测试发布&qos=2&msg=消息发出成功了吗

控制台输出如下

 取消订阅的调用就不在这里详细叙说了,调用方法跟发布主题类似。

 此时,服务端页面展示如下

可以看出订阅已经没了,,,,,   本文结束!

结束语,其实服务端是否用emq x 都行,我刚开始试验用的active也是一样代码,区别就是服务监控平台不同。

本文介绍了使用idea工具搭建spring boot项目,并集成mqtt消息服务,进行消息发布与消息订阅接收处理,使用EMQ X作为服务端。使用基础方式实现,有助于菜鸟学习并理解。
现在是:2023年3月5日19:03:49在上一篇文章中,我介绍了如何在服务器中安装消息服务器,这是在操作协议的时候必不可少的步骤,今天我们就来看看如何将服务集成到项目中。刚开始在集成的时候,也在网上看了些资料,也遇到了些坑,最后参考的是这篇文章,然后加上自己的简单修改,以及博主的悉心指导,最后终于实现了我预期的效果。 参考文章连接:点击这里注意,在实现mqtt的时候,一定要先启动emqx消息服务器的服务,关于emqx的安装与使用,可以移步到这里点击这里 下面我们来看看实现代码。为了模拟的更加真实点儿,我这
Windows上Mqtt服务器搭建与使用客户端工具MqttBox进行测试: https://blog.csdn.net/BADAO_LIUMANG_QIZHI/article/details/112305328 在上面搭建好了MQTT服务器以及客户端工具MqttBox之后,怎样在SpringBoot中实现订阅主题接收消息和发布主题推送消息的功能。 博客:https://blog.csdn.net/badao_liumang_qizhi 关注公众号 霸道的程序猿 获取编程相关电子书、教程
公司中项目大多是物联网项目,需要跟设备进行交互,用到的协议比较多,如NB/MQTT/LWM2M/COAP等,项目中不可避免用到了MQTT协议,本文介绍springboot项目MQTT客户端实现,不多说直接上可执行代码。 一、EMQ官网java sdk demo,如果只需要用到一个客户端,可以参照下官网demo,修改下应用用项目 1、pom.xml依赖引用 <dependency> <groupId>org.eclipse.paho</groupId>
最近在做MQTT对接,然后发送消息,然后参考网上的实战文章进行了一下整理。 文章主要参考自(https://www.codetd.com/article/13550340),然后自己做了些许更改。 1、整合准备 SpringBoot:2.2.2.RELEASE MQTT平台:EMQX4.4.1(Docker运行) 虚拟机服务器:Centos7(192.168.56.102 ) 发送端:cloud-mqtt-send8001 接收端:cloud-mqtt-accept8002 2、发送端:cl
这玩意能干什么?我只能说,这是一个物联网的方案,能通过java客户端监听来自单片机发送的消息,单片机有什么消息?常见的有:比如持久性的传感器数据上报,这你得1s上传一次吧,还有一些控制设备的信息,比如灯,电机之类一次操作持续运转的东西。 写在前面: 曾经用过的一种需要接入Internet的物联网方案(这是目前的主流): 我之前就受益与免费的云平台(云服务器),比如我以前博客里介绍过了的巴法云平台,还有我以前用过的小熊派华为云平台,它们的好处显而易见,就是不需要你去搭建服务器,不需要去了解数据传输的各种
<dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.1.0...
[code=java] 2023-02-18 14:47:49.629 INFO 36724 --- [extShutdownHook] o.s.i.endpoint.EventDrivenConsumer : Removing {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel 2023-02-18 14:47:49.629 INFO 36724 --- [extShutdownHook] o.s.i.channel.PublishSubscribeChannel : Channel 'daenhook-api.errorChannel' has 0 subscriber(s). 2023-02-18 14:47:49.629 INFO 36724 --- [extShutdownHook] o.s.i.endpoint.EventDrivenConsumer : stopped bean '_org.springframework.integration.errorLogger' 2023-02-18 14:47:49.629 INFO 36724 --- [extShutdownHook] o.s.s.c.ThreadPoolTaskScheduler : Shutting down ExecutorService 'taskScheduler' [/code] 有遇到过这个错误吗 spring-boot集成mqtt,实现发布和订阅(工厂模式) 花九猫的对象: 你都不测试的的吗?