参考资料:

http://mqtt.p2hp.com/mqtt311

https://mcxiaoke.gitbooks.io/mqtt-cn/content/mqtt/02-ControlPacketFormat.html

https://blog.csdn.net/anxianfeng55555/article/details/80908795

MQTT简介

​ MQTT是一种基于发布/订阅模式的轻量级通讯协议,该协议构建在TCP/IP协议上。 MQTT最大的有点在于可以以极少的代码和有限的带宽,为远程设备提供实时可靠的消息服务。做为一种低开销、低带宽占用的即时通讯协议,MQTT在物联网、小型设备、移动应用等方面有广泛应用。

  • 开放消息协议,简单易实现
  • 发布订阅模式,一对多消息发布
  • 基于TCP/IP网络连接,提供有序,无损,双向连接
  • 2字节固定报头,2字节心跳报文,最小化传输开销和协议交换,有效减少网络流量
  • 消息QoS支持,可靠传输保证
  • 物联网M2M通信,物联网大数据采集
  • Android消息推送,WEB消息推送
  • 智能硬件、智能家具、智能电器
  • 车联网通信,电动车站桩采集
  • 智慧城市、远程医疗、远程教育
  • 电力、石油与能源等行业市场
  • MQTT控制报文的结构

    ​ MQTT通过交换一些预定义的MQTT控制报文来工作,每条MQTT命令消息的消息头都包含一个固定的报头,有些消息会携带一个可变报文头和一个负荷。消息格式如下:

    |固定包头,存在于所有MQTT控制包
    |可变包头,存在于某些MQTT控制包
    |载荷,存在于某些MQTT控制包
    

    固定报文头(Fixed Header)

    ​ MQTT固定报文头最少有两个字节,第一个字节包含消息类型(Message Type)和QoS级别等标志位。第二个字节开始是剩余长度字段,该长度是后面的可变报文头加消息负载的总长度,该字段最多允许四个字节。

    ​ 剩余长度使用了一种可变长度的结构来编码,这种结构使用单一字节表示0-127的值。大于127的值如下处理。每个字节的低7位用来编码数据,最高位用来表示是否还有后续字节。因此每个字节可以编码128个值,再加上一个标识位。剩余长度最多可以用四个字节来表示。

    ​ 例如十进制的数字64可以被编码成一个单独的字节,十进制为64,八进制为0x40。十进制数字321(=65+2×128)被编码为两个字节,低位在前。第一个字节是65+128 = 193。注意最高位的128表示后面至少还有一个字节。第二个字节是2,表示2*127。(翻译注:321 = 11000001 00000010,第一个字节是“标识符后面还有一个字节”+65,第二个字节是“标识符后面没有字节了”+256)。

    可变报文头(Variable Header)

    ​ 可变报文头主要包含协议名、协议版本、连接标志(Connect Flags)、心跳间隔时间(Keep Alive timer)、连接返回码(Connect Return Code)、主题名(Topic Name)等

    有效负荷(Payload)

    ​ 可以理解为消息主题(body)

    ​ 当MQTT发送的消息类型是CONNECT(连接)、PUBLISH(发布)、SUBSCRIBE(订阅)、SUBACK(订阅确认)、则会带有负荷。

    ​ 各种类型消息的控制报文参考:https://mcxiaoke.gitbooks.io/mqtt-cn/content/mqtt/03-ControlPackets.html

    MQTT的消息类型(Message Type)(控制报文类型)

    报文流动方向

    QoS 1:至少分发一次。服务器的消息接收由PUBACK消息进行确认,如果通信链路或发送设备异常,或者指定时间内没有收到确认消息,发送端会重发这条在消息头中设置了DUP位的消息。QoS 2:只分发一次。这是最高级别的消息传递,消息丢失和重复都是不可接受的,使用这个服务质量等级会有额外的开销。

    通过下面的例子可以更深刻的理解上面三个传输质量等级。
    比如目前流行的共享单车智能锁,智能锁可以定时使用QoS level 0质量消息请求服务器,发送单车的当前位置,如果服务器没收到也没关系,反正过一段时间又会再发送一次。之后用户可以通过App查询周围单车位置,找到单车后需要进行解锁,这时候可以使用QoS level 1质量消息,手机App不断的发送解锁消息给单车锁,确保有一次消息能达到以解锁单车。最后用户用完单车后,需要提交付款表单,可以使用QoS level 2质量消息,这样确保只传递一次数据,否则用户就会多付钱了。

    Springboot整合MQTT实现消息发布和订阅

    一、在Linux上搭建MQTT服务

    1.1、打开EMQ官网:https://www.emqx.io/cn/products/broker

    1.2、点击开始试用

    1.3、选择服务器对应版本

    1.4、复制下载命令到ssh工具中执行

    ​ 下载完成

    1.5、下载完成后执行安装命令

    1.6、安装成功后执行命令:

    sudo emqx start
    

    出现以下信息表示启动成功

    1.7、测试

    ​ 浏览器访问ip:18083进入管理界面,默认账号为admin,密码为public

    二、MQTT服务搭建完成后使用Springboot整合MQTT协议

    2.1、创建一个maven项目

    2.2、在父工程下创建一个Springboot项目作为消息提供者,导入以下依赖

    <!--mqtt相关依赖-->
    <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-stream</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-integration</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-mqtt</artifactId>
    </dependency>
    

    2.3、修改配置文件

    spring:
      application:
        name: provider
      #MQTT配置信息
      mqtt:
        #MQTT服务端地址,端口默认为1883,如果有多个,用逗号隔开,如tcp://127.0.0.1:1883,tcp://192.168.60.133:1883
        url: tcp://ip:1883
        username: admin
        password: public
        #客户端id(不能重复)
        client:
          id: provider-id
        #MQTT默认的消息推送主题,实际可在调用接口时指定
        default:
          topic: topic
    server:
      port: 8081
    

    2.4、消息发布者客户端配置

    package com.xct.mqttprovider.mqtt;
    import lombok.extern.slf4j.Slf4j;
    import org.eclipse.paho.client.mqttv3.*;
    import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Configuration;
     * @Author: xct
     * @Date: 2021/7/30 15:32
     * @Description:
    @Configuration
    @Slf4j
    public class MqttProviderConfig {
        @Value("${spring.mqtt.username}")
        private String username;
        @Value("${spring.mqtt.password}")
        private String password;
        @Value("${spring.mqtt.url}")
        private String hostUrl;
        @Value("${spring.mqtt.client.id}")
        private String clientId;
        @Value("${spring.mqtt.default.topic}")
        private String defaultTopic;
         * 客户端对象
        private MqttClient client;
         * 客户端连接服务端
         * @author xct
         * @param
         * @return void
         * @date 2021/7/30 16:01
        public void connect(){
            try {
                //创建MQTT客户端对象
                client = new MqttClient(hostUrl,clientId,new MemoryPersistence());
                //连接设置
                MqttConnectOptions options = new MqttConnectOptions();
                //是否清空session,设置为false表示服务器会保留客户端的连接记录(订阅主题,qos),客户端重连之后能获取到服务器在客户端断开连接期间推送的消息
                //设置为true表示每次连接到服务端都是以新的身份
                options.setCleanSession(true);
                //设置连接用户名
                options.setUserName(username);
                //设置连接密码
                options.setPassword(password.toCharArray());
                //设置超时时间,单位为秒
                options.setConnectionTimeout(100);
                //设置心跳时间 单位为秒,表示服务器每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线
                options.setKeepAliveInterval(20);
                //设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息
                options.setWill("willTopic",(clientId + "与服务器断开连接").getBytes(),0,false);
                //设置回调
                client.setCallback(new MqttProviderCallBack());
                client.connect(options);
            } catch (MqttException e) {
                e.printStackTrace();
        public void publish(int qos,boolean retained,String topic,String message){
            MqttMessage mqttMessage = new MqttMessage();
            mqttMessage.setQos(qos);
            mqttMessage.setRetained(retained);
            mqttMessage.setPayload(message.getBytes());
            //主题目的地,用于发布/订阅消息
            MqttTopic mqttTopic = client.getTopic(topic);
            //提供一种机制来跟踪消息的传递进度。
            //用于在以非阻塞方式(在后台运行)执行发布时跟踪消息的传递进度
            MqttDeliveryToken token;
            try {
                //将指定消息发布到主题,但不等待消息传递完成。返回的token可用于跟踪消息的传递状态。
                //一旦此方法干净地返回,消息就已被客户端接受发布。当连接可用时,将在后台完成消息传递。
                token = mqttTopic.publish(mqttMessage);
                token.waitForCompletion();
            } catch (MqttException e) {
                e.printStackTrace();
    

    2.5、消息发布客户端回调

    package com.xct.mqttprovider.mqtt;
    import org.eclipse.paho.client.mqttv3.IMqttAsyncClient;
    import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
    import org.eclipse.paho.client.mqttv3.MqttCallback;
    import org.eclipse.paho.client.mqttv3.MqttMessage;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.stereotype.Component;
     * @Author: xct
     * @Date: 2021/7/30 16:00
     * @Description:
    @Configuration
    public class MqttProviderCallBack implements MqttCallback {
        @Value("${spring.mqtt.client.id}")
        private String clientId;
         * 与服务器断开连接的回调
         * @author xct
         * @param throwable
         * @return void
         * @date 2021/7/30 16:19
        @Override
        public void connectionLost(Throwable throwable) {
            System.out.println(clientId + "与服务器断开连接");
         * 消息到达的回调
         * @author xct
         * @param s
         * @param mqttMessage
         * @return void
         * @date 2021/7/30 16:19
        @Override
        public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
         * 消息发布成功的回调
         * @author xct
         * @param iMqttDeliveryToken
         * @return void
         * @date 2021/7/30 16:20
        @Override
        public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
            IMqttAsyncClient client = iMqttDeliveryToken.getClient();
            System.out.println(client.getClientId() + "发布消息成功!");
    

    2.6、创建控制器测试发布消息

    package com.xct.mqttprovider.controller;
    import com.xct.mqttprovider.mqtt.MqttProviderConfig;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Controller;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.ResponseBody;
     * @Author: xct
     * @Date: 2021/7/30 16:26
     * @Description:
    @Controller
    public class SendController {
        @Autowired
        private MqttProviderConfig providerClient;
        @RequestMapping("/sendMessage")
        @ResponseBody
        public String sendMessage(int qos,boolean retained,String topic,String message){
            try {
                providerClient.publish(qos,retained,topic,message);
                return "发送成功";
            }catch (Exception e){
                e.printStackTrace();
                return "发送失败";
    

    2.7、在父工程下创建一个Springboot项目作为消息消费者,导入以下依赖

    <!--mqtt相关依赖-->
    <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-stream</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-integration</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-mqtt</artifactId>
    </dependency>
    

    2.8、配置文件

    spring:
      application:
        name: consumer
      #MQTT配置信息
      mqtt:
        #MQTT服务端地址,端口默认为1883,如果有多个,用逗号隔开,如tcp://127.0.0.1:1883,tcp://192.168.60.133:1883
        url: tcp://ip:1883
        username: admin
        password: public
        #客户端id(不能重复)
        client:
          id: consumer-id
        #MQTT默认的消息推送主题,实际可在调用接口时指定
        default:
          topic: topic
    server:
      port: 8082
    

    2.9、消费者客户端配置

    package com.xct.mqttconsumer.mqtt;
    import lombok.extern.slf4j.Slf4j;
    import org.eclipse.paho.client.mqttv3.*;
    import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Configuration;
    import javax.annotation.PostConstruct;
     * @Author: xct
     * @Date: 2021/7/30 17:06
     * @Description:
    @Configuration
    public class MqttConsumerConfig {
        @Value("${spring.mqtt.username}")
        private String username;
        @Value("${spring.mqtt.password}")
        private String password;
        @Value("${spring.mqtt.url}")
        private String hostUrl;
        @Value("${spring.mqtt.client.id}")
        private String clientId;
        @Value("${spring.mqtt.default.topic}")
        private String defaultTopic;
         * 客户端对象
        private MqttClient client;
         * 在bean初始化后连接到服务器
         * @author xct
         * @param
         * @return void
         * @date 2021/7/30 16:48
        @PostConstruct
        public void init(){
            connect();
         * 客户端连接服务端
         * @author xct
         * @param
         * @return void
         * @date 2021/7/30 16:01
        public void connect(){
            try {
                //创建MQTT客户端对象
                client = new MqttClient(hostUrl,clientId,new MemoryPersistence());
                //连接设置
                MqttConnectOptions options = new MqttConnectOptions();
                //是否清空session,设置为false表示服务器会保留客户端的连接记录,客户端重连之后能获取到服务器在客户端断开连接期间推送的消息
                //设置为true表示每次连接到服务端都是以新的身份
                options.setCleanSession(true);
                //设置连接用户名
                options.setUserName(username);
                //设置连接密码
                options.setPassword(password.toCharArray());
                //设置超时时间,单位为秒
                options.setConnectionTimeout(100);
                //设置心跳时间 单位为秒,表示服务器每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线
                options.setKeepAliveInterval(20);
                //设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息
                options.setWill("willTopic",(clientId + "与服务器断开连接").getBytes(),0,false);
                //设置回调
                client.setCallback(new MqttConsumerCallBack());
                client.connect(options);
                //订阅主题
                //消息等级,和主题数组一一对应,服务端将按照指定等级给订阅了主题的客户端推送消息
                int[] qos = {1,1};
                String[] topics = {"topic1","topic2"};
              	//订阅主题
                client.subscribe(topics,qos);
            } catch (MqttException e) {
                e.printStackTrace();
         * 断开连接
         * @author xct
         * @param
         * @return void
         * @date 2021/8/2 09:30
        public void disConnect(){
            try {
                client.disconnect();
            } catch (MqttException e) {
                e.printStackTrace();
         * 订阅主题
         * @author xct
         * @param topic
         * @param qos
         * @return void
         * @date 2021/7/30 17:12
        public void subscribe(String topic,int qos){
            try {
                client.subscribe(topic,qos);
            } catch (MqttException e) {
                e.printStackTrace();
    

    3.0、 消息消费者客户端回调

    package com.xct.mqttconsumer.mqtt;
    import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
    import org.eclipse.paho.client.mqttv3.MqttCallback;
    import org.eclipse.paho.client.mqttv3.MqttMessage;
     * @Author: xct
     * @Date: 2021/7/30 17:06
     * @Description:
    public class MqttConsumerCallBack implements MqttCallback {
         * 客户端断开连接的回调
         * @author xct
         * @param throwable
         * @return void
         * @date 2021/7/30 17:14
        @Override
        public void connectionLost(Throwable throwable) {
            System.out.println("与服务器断开连接,可重连");
         * 消息到达的回调
         * @author xct
         * @param topic
         * @param message
         * @return void
         * @date 2021/7/30 17:14
        @Override
        public void messageArrived(String topic, MqttMessage message) throws Exception {
            System.out.println(String.format("接收消息主题 : %s",topic));
            System.out.println(String.format("接收消息Qos : %d",message.getQos()));
            System.out.println(String.format("接收消息内容 : %s",new String(message.getPayload())));
            System.out.println(String.format("接收消息retained : %b",message.isRetained()));
         * 消息发布成功的回调
         * @author xct
         * @param iMqttDeliveryToken
         * @return void
         * @date 2021/7/30 17:14
        @Override
        public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
    

    3.1、控制器提供手动建立连接和断开连接方法

    package com.xct.mqttconsumer.controller;
    import com.xct.mqttconsumer.mqtt.MqttConsumerConfig;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Controller;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.ResponseBody;
     * @Author: xct
     * @Date: 2021/7/30 17:20
     * @Description:
    @Controller
    public class TestController {
        @Autowired
        private MqttConsumerConfig client;
        @Value("${spring.mqtt.client.id}")
        private String clientId;
        @RequestMapping("connect")
        @ResponseBody
        public String connect(){
            client.connect();
            return clientId + "连接到服务器";
        @RequestMapping("disConnect")
        @ResponseBody
        public String disConnect(){
            client.disConnect();
            return clientId + "与服务器断开连接";
    

    3.2、测试

    分别启动两个项目,可以在管理界面看到创建的两个客户端

    调用发布消息接口发布消息

    消费者控制台打印

    3.3、客户端断线消息恢复

    把消费者与服务端断开连接

    再调用发布消息接口发送两条消息到topic1,然后再把消费者连接到服务端

    控制台没有东西打印

    修改消费者客户端配置,把setCleanSession改为false

    重启项目,把消费者客户端断开连接,调用发布消息接口发布两条消息,再把消费者和服务端连接上