MQTT与EMQ X
MQTT主题
MQTT主题本质上是一个UTF-8编码的字符串,是MQTT协议进行消息路由的基础。MQTT主题类似URL路径,使用斜杠 / 进行分层。
为了避免歧义且易于理解,通常不建议主题以 / 开头或结尾,例如 /chat 或 chat/。
不同于消息队列(比如Kafka和Pulsar)中的主题,MQTT主题不需要提前创建,MQTT客户端在订阅或发布时即自动的创建了主题,开发者无需再关心主题的创建,并且也不需要手动删除主题。
下图是一个简单的MQTT订阅与发布流程, APP 1 订阅了sensor/2/temperature 主题后,将能接收到 Sensor 2 发布到该主题的消息。
MQTT主题通配符
- 单层通配符 + :单层通配符用于匹配单个层级的主题。例如,home/+/temperature 可以匹配 home/livingroom/temperature 和 home/bedroom/temperature 等。在使用单层通配符时,单层通配符必须占据整个层级
- 多层通配符 # :多层通配符用于匹配多个层级的主题。例如,home/# 可以匹配所有以 home/ 开头的主题,如 home/livingroom/temperature 和 home/bedroom/light 等。在使用多层通配符时,它必须占据整个层级并且 必须是主题的最后一个字符
- 零层通配符 $ :零层通配符用于匹配系统保留的特殊主题,如控制主题和状态主题。在MQTT中,一些特定的主题以美元符号 $ 开头,用于系统级别的消息传输和控制。例如,$SYS/broker/uptime 表示获取代理服务器的运行时间。
系统主题
以 $SYS/ 开头的主题为系统主题,被保留给服务器用来发布一些特定的消息,比如服务器的运行时间、客户端的上下线事件通知、当前连接的客户端数量等等。我们一般将这些主题称为系统主题,客户端可以订阅这些系统主题来获取服务器的有关信息。
不同场景中的主题设计
(1)智能家居
比如我们用传感器监测卧室、客厅以及厨房的温度、湿度和空气质量,可以设计以下几个主题:
myhome/bedroom/temperature
myhome/bedroom/humidity
myhome/bedroom/airquality
myhome/livingroom/temperature
myhome/livingroom/humidity
myhome/livingroom/airquality
myhome/kitchen/temperature
myhome/kitchen/humidity
myhome/kitchen/airquality
接下来,可以通过订阅 myhome/bedroom/+ 主题获取卧室的温度、湿度及空气质量数据,订阅 myhome/+/temperature 主题获取三个房间的温度数据,订阅 myhome/# 获取所有的数据。
(2)充电桩
充电桩的上行主题格式为 ocpp/cp/${cid}/notify/${action},下行主题格式为 ocpp/cp/${cid}/reply/${action}。
ocpp/cp/cp001/notify/bootNotification // 充电桩上线时向该主题发布上线请求
ocpp/cp/cp001/notify/startTransaction // 向该主题发布充电请求
ocpp/cp/cp001/reply/bootNotification // 充电桩上线前需订阅该主题接收上线应答
ocpp/cp/cp001/reply/startTransaction // 充电桩发起充电请求前需订阅该主题接收充电请求应答
(3)即时消息
chat/user/${user_id}/inbox // 一对一聊天:用户上线后订阅该收件箱主题,将能接收到好友发送给自己的消息。给好友回复消息时,只需要将该主题的 user_id 换为好友的的id即可
chat/group/${group_id}/inbox // 群聊:用户加群成功后,可订阅该主题获取对应群组的消息,回复群聊时直接给该主题发布消息即可
req/user/${user_id}/add // 添加好友:可向该主题发布添加好友的申请(user_id 为对方的id),接收好友请求:用户可订阅该主题(user_id 为自己的id)接收其他用户发起的好友请求。
resp/user/${user_id}/add // 接收好友请求的回复:用户添加好友前,需订阅该主题接收请求结果(user_id 为自己的id)。回复好友申请:用户向该主题发送消息表明是否同意好友申请(user_id 为对方的id)。
user/${user_id}/state // 用户在线状态:用户可以订阅该主题获取好友的在线状态。
MQTT主题常见问题及解答
主题的层级及长度有什么限制吗?
MQTT协议规定主题的长度为两个字节,因此主题最多可包含 65,535 个字符。建议主题层级为7个以内。使用较短的主题名称和较少的主题层级意味着较少的资源消耗,例如 my-home/room1/data 比 my/home/room1/data 更好。
服务器对主题数量有限制吗?
不同消息服务器对最大主题数量的支持各不一致,但主题数量越多将会消耗越多的服务器内存。考虑到连接到MQTT Broker的设备数量一般较多,我们建议一个客户端订阅的主题数量最好控制在10个以内。
通配符主题订阅与普通主题订阅性能是否一致?
通配符主题订阅的性能弱于普通主题订阅,且会消耗更多的服务器资源,用户可根据实际业务情况选择订阅类型。
同一个主题能被共享订阅与普通订阅同时使用吗?
可以,但是不建议同时使用。
常见的MQTT主题使用建议有哪些?
- 不建议使用 # 订阅所有主题
- 不建议主题以 / 开头或结尾,例如 /chat 或 chat/
- 不建议在主题里添加空格及非ASCII特殊字符
- 同一主题层级内建议使用下 划线 _ 或 横杆 - 连接单词或者使用 驼峰命名
- 尽量使用较少的主题层级
- 当使用通配符时,将唯一值的主题层(例如设备号)越靠近第一层越好。例如,device/00000001/command/# 比device/command/00000001/# 更好
MQTT服务质量QoS
很多时候,使用MQTT协议的设备都运行在网络受限的环境下,而只依靠底层的TCP传输协议,并不能完全保证消息的可靠到达。因此,MQTT提供了QoS机制,其核心是设计了多种消息交互机制来提供不同的服务质量,来满足用户在各种场景下对消息可靠性的要求。每条消息都可以在发布时独立设置自己的 QoS。
MQTT定义了三个QoS等级,分别为:
- QoS 0 :最多交付一次,消息可能丢失
- QoS 1 :至少交付一次,消息可以保证到达,但是可能重复
- QoS 2 :恰好交付一次,消息保证到达,并且不会重复
其中,使用QoS 0可能丢失消息,使用QoS 1可以保证收到消息,但消息可能重复,使用QoS 2可以保证消息既不丢失也不重复。QoS等级从低到高,不仅意味着消息可靠性的提升,也意味着传输复杂程度的提升。
在一个完整的从发布者到订阅者的消息投递流程中,QoS等级是由发布者在PUBLISH报文中指定的,大部分情况下Broker向订阅者转发消息时都会维持原始的QoS不变。不过也有一些例外的情况,根据订阅者的订阅要求,消息的QoS等级可能会在转发的时候发生 降级 。例如,订阅者在订阅时要求Broker可以向其转发的消息的最大QoS等级为QoS 1,那么后续所有QoS 2消息都会降级至QoS 1转发给此订阅者,而所有QoS 0和QoS 1消息则会保持原始的QoS等级转发。
MQTT中每个QoS等级的具体原理
QoS 0 最多交付一次
QoS 0 是最低的QoS等级。QoS 0消息即发即弃,不需要等待确认,不需要存储和重传,因此对于接收方来说,永远都不需要担心收到重复的消息。
为什么QoS 0消息会丢失?
当我们使用QoS 0传递消息时, 消息的可靠性完全依赖于底层的TCP协议 。而TCP只能保证在连接稳定不关闭的情况下消息的可靠到达,一旦出现连接关闭、重置,仍有可能丢失当前处于网络链路或操作系统底层缓冲区中的消息,这也是QoS 0消息最主要的丢失场景。
QoS 1 至少交付一次
为了保证消息到达,QoS 1加入了 应答与重传机制 ,发送方只有在收到接收方的 PUBACK报文 以后,才能认为消息投递成功,在此之前,发送方需要存储该PUBLISH报文以便下次重传。QoS 1需要在PUBLISH报文中设置 Packet ID ,而作为响应的PUBACK报文,则会使用与PUBLISH报文相同的Packet ID, 以便发送方收到后删除正确的PUBLISH报文缓存 。
为什么QoS 1消息会重复?
对于发送方来说,没收到PUBACK报文分为以下两种情况:
- PUBLISH未到达接收方
- PUBLISH已经到达接收方,接收方的PUBACK报文还未到达发送方
在第一种情况下,发送方虽然重传了PUBLISH报文,但是对于接收方来说,实际上仍然仅收到了一次消息。但是在第二种情况下,在发送方重传时,接收方已经收到过这个PUBLISH报文,这就导致接收方将收到重复的消息。
虽然重传时PUBLISH报文中的 DUP标志会被设置为1,用以表示这是一个重传的报文 。但是接收方并不能因此假定自己曾经接收过这个消息,仍然需要将其视作一个全新的消息。
这是因为对于接收方来说,可能存在以下两种情况:
第一种情况,发送方由于没有收到PUBACK报文而重传了PUBLISH报文。此时,接收方收到的前后两个PUBLISH报文使用了相同的Packet ID,并且第二个PUBLISH报文的DUP标志为1,此时它确实是一个重复的消息。
第二种情况,第一个PUBLISH报文已经完成了投递,1024这个Packet ID重新变为可用状态。发送方使用这个Packet ID发送了一个全新的PUBLISH报文,但这一次报文未能到达对端,所以发送方后续重传了这个PUBLISH报文。这就使得虽然接收方收到的第二个PUBLISH报文同样是相同的Packet ID,并且DUP为1,但确实是一个全新的消息。
由于我们无法区分这两种情况,所以只能让接收方将这些PUBLISH报文都当作全新的消息来处理。因此当我们使用QoS 1时,消息的重复在协议层面上是无法避免的。
甚至在比较极端的情况下,例如Broker从发布方收到了重复的PUBLISH报文,而在将这些报文转发给订阅方的过程中,再次发生重传,这将导致订阅方最终收到更多的重复消息。
在下图表示的例子中,虽然发布者的本意只是发布一条消息,但对接收方来说,最终却收到了三条相同的消息:
以上,就是QoS 1保证消息到达带来的副作用。
QoS 2 恰好交付一次
QoS 2解决了QoS 0、1消息可能丢失或者重复的问题,但相应地,它也带来了最复杂的交互流程和最高的开销。每一次的QoS 2消息投递,都要求发送方与接收方进行至少两次请求/响应流程。
首先,发送方存储并发送QoS为2的PUBLISH报文以启动一次QoS 2消息的传输,然后等待接收方回复 PUBREC报文 。这一部分与QoS 1基本一致,只是响应报文从PUBACK变成了PUBREC。
当发送方收到PUBREC报文,即可确认对端已经收到了PUBLISH报文,发送方将不再需要重传这个报文,并且也不能再重传这个报文。所以此时发送方可以删除本地存储的PUBLISH报文,然后发送一个PUBREL报文,通知对端自己准备将本次使用的Packet ID标记为可用了。与PUBLISH报文一样,我们需要确保PUBREL报文到达对端,所以也需要一个响应报文,并且这个PUBREL报文需要被存储下来以便后续重传。
当接收方收到PUBREL报文,也可以确认在这一次的传输流程中不会再有重传的PUBLISH报文到达,因此回复PUBCOMP报文表示自己也准备好将当前的Packet ID用于新的消息了。
当发送方收到PUBCOMP报文,这一次的QoS 2消息传输就算正式完成了。在这之后,发送方可以再次使用当前的Packet ID发送新的消息,而接收方再次收到使用这个Packet ID的PUBLISH报文时,也会将它视为一个全新的消息。
为什么QoS 2消息不会重复?
QoS 2消息保证不会丢失的逻辑与QoS 1相同,所以这里我们就不再重复了。
与QoS 1相比,QoS 2新增了PUBREL报文和PUBCOMP报文的流程,也正是这个新增的流程带来了消息不会重复的保证。
在我们更进一步之前,我们先快速回顾一下QoS 1消息无法避免重复的原因。
当我们使用QoS 1消息时,对接收方来说,回复完PUBACK这个响应报文以后Packet ID就重新可用了,也不管响应是否确实已经到达了发送方。所以就无法得知之后到达的,携带了相同Packet ID的PUBLISH报文,到底是发送方因为没有收到响应而重传的,还是发送方因为收到了响应所以重新使用了这个Packet ID发送了一个全新的消息。
所以,消息去重的关键就在于,通信双方如何正确地同步释放Packet ID,换句话说,不管发送方是重传消息还是发布新消息,一定是和对端达成共识了的。
而QoS 2中增加的PUBREL流程,正是提供了帮助通信双方协商Packet ID何时可以重用的能力。
QoS 2规定,发送方只有在收到PUBREC报文之前可以重传PUBLISH报文。一旦收到PUBREC报文并发出PUBREL报文,发送方就进入了Packet ID释放流程,不可以再使用当前Packet ID重传PUBLISH报文。同时,在收到对端回复的PUBCOMP报文确认双方都完成Packet ID释放之前,也不可以使用当前Packet ID发送新的消息。
因此,对于接收方来说,能够以PUBREL报文为界限,凡是在PUBREL报文之前到达的PUBLISH报文,都必然是重复的消息;而凡是在PUBREL报文之后到达的PUBLISH报文,都必然是全新的消息。
一旦有了这个前提,我们就能够在协议层面完成QoS 2消息的去重。
不同QoS的适用场景和注意事项
QoS 0
QoS 0的缺点是可能会丢失消息,消息丢失的频率依赖于你所处的网络环境,并且可能使你错过断开连接期间的消息,不过优点是投递的效率较高。所以我们通常选择使用QoS 0传输一些高频且不那么重要的数据,比如 传感器数据,周期性更新 ,即使遗漏几个周期的数据也可以接受。
QoS 1
QoS 1可以保证消息到达,所以适合传输一些较为重要的数据,比如下达关键指令、更新重要的有实时性要求的状态等。但因为QoS 1还可能会导致消息重复,所以当我们选择使用QoS 1时,还需要 能够处理消息的重复,或者能够允许消息的重复 。在我们决定使用QoS 1并且不对其进行去重处理之前,我们需要先了解,允许消息的重复,可能意味着什么。如果我们不对QoS 1进行去重处理,我们可能会遭遇这种情况,发布方以1、2的顺序发布消息,但最终订阅方接收到的消息顺序可能是1、2、1、2。如果1表示开灯指令,2表示关灯指令,我想大部分用户都不会接受自己仅仅进行了开灯然后关灯的操作,结果灯在开和关的状态来回变化。
QoS 2
QoS 2既可以保证消息到达,也可以保证消息不会重复,但传输成本最高。如果我们不愿意自行实现去重方案,并且能够接受QoS 2带来的额外开销,那么QoS 2将是一个合适的选择。通常我们会在 金融、航空 等行业场景下会更多地见到QoS 2的使用。
关于MQTT QoS的Q&A
如何为QoS 1消息去重?
在我们介绍QoS 1的时候讲到,QoS 1消息的重复在协议层面上是无法避免的。所以如果我们想要对QoS 1消息进行去重,只能从业务层面入手。一个比较常用且简单的方法是, 在每个PUBLISH报文的Payload中都带上一个时间戳或者一个单调递增的计数 ,这样上层业务就可以根据当前收到消息中的时间戳或计数是否大于自己上一次接收的消息中的时间戳或计数来判断这是否是一个新消息。
何时向后分发QoS 2消息?
我们已经了解到,QoS 2的流程是非常长的,为了不影响消息的实时性,我们可以在第一次收到PUBLISH报文时,就启动消息的向后分发。当然一旦开始向后分发,后续收到在PUBREL报文之前到达的PUBLISH报文,都不能再重复分发操作,以免消息重复。
不同QoS的性能有差距么?
以EMQX为例,在相同的硬件配置下进行点对点通信,通常QoS 0与QoS 1能够达到的吞吐比较接近,不过QoS 1的CPU占用会略高于QoS 0,负载较高时,QoS 1的消息延迟也会进一步增加。而QoS 2能够达到的吞吐一般仅为QoS 0、1的一半左右。
会话
QoS 只是设计了消息可靠到达的理论机制,而会话则确保了 QoS 1、2 的协议流程得以真正实现。会话是客户端与服务端之间的有状态交互,它可以仅持续和网络连接一样长的时间,也可以跨越多个网络连接存在,我们通常将后者称为持久会话。我们可以选择让连接从已存在的会话中恢复,也可以选择从一个全新的会话开始。
不稳定的网络及有限的硬件资源是物联网应用需要面对的两大难题,MQTT 客户端与服务器的连接可能随时会因为网络波动及资源限制而异常断开。为了解决网络连接断开对通信造成的影响,MQTT 协议提供了持久会话功能。
MQTT 客户端在发起到服务器的连接时,可以设置是否创建一个持久会话。持久会话会保存一些重要的数据,以使会话能在多个网络连接中继续。持久会话主要有以下三个作用:
- 避免因网络中断导致需要反复订阅带来的额外开销。
- 避免错过离线期间的消息。
- 确保 QoS 1 和 QoS 2 的消息质量保证不被网络中断影响。
持久会话需要存储哪些数据?
通过上文我们知道持久会话需要存储一些重要的数据,以使会话能被恢复。这些数据有的存储在客户端,有的则存储在服务端。
客户端中存储的会话数据:
- 已发送给服务端,但是还没有完成确认的 QoS 1 与 QoS 2 消息。
- 从服务端收到的,但是还没有完成确认的 QoS 2 消息。
服务端中存储的会话数据:
- 会话是否存在,即使会话状态其余部分为空。
- 已发送给客户端,但是还没有完成确认的 QoS 1 与 QoS 2 消息。
- 等待传输给客户端的 QoS 0 消息(可选),QoS 1 与 QoS 2 消息。
- 从客户端收到的,但是还没有完成确认的 QoS 2 消息,遗嘱消息和遗嘱延时间隔。
MQTT Clean Session 的使用
Clean Session 是用来控制会话状态生命周期的标志位,为
true
时表示创建一个新的会话,在客户端断开连接时,会话将自动销毁。为
false
时表示创建一个持久会话,在客户端断开连接后会话仍然保持,直到会话超时注销。
注意: 持久会话能被恢复的前提是客户端使用固定的 Client ID 再次连接,如果 Client ID 是动态的,那么连接成功后将会创建一个新的持久会话。
如下为EMQX的 Dashboard,可以看到图中的连接虽然是断开状态,但是因为它是持久会话,所以仍然能被查看到,并且可以在 Dashboard 中手动清除该会话。
同时,EMQX 也支持在 Dashboard 中设置 Session 相关参数。
MQTT 3.1.1 没有规定持久会话应该在什么时候过期,如果仅从协议层面理解的话,这个持久会话应该永久存在。但在实际场景中这并不现实,因为它会非常占用服务端的资源,所以服务端通常不会完全遵循协议来实现,而是向用户提供一个全局配置来限制会话的过期时间。
比如 EMQ 提供的 免费的公共 MQTT 服务器 设置的会话过期时间为 5 分钟,最大消息数为 1000 条,且不保存 QoS 0 消息。
MQTT 5.0 中的会话改进
MQTT 5.0 中将 Clean Session 拆分成了 Clean Start 与 Session Expiry Interval。Clean Start 用于指定连接时是创建一个全新的会话还是尝试复用一个已存在的会话,Session Expiry Interval 用于指定网络连接断开后会话的过期时间。
Clean Start 为
true
时表示必须丢弃任何已存在的会话,并创建一个全新的会话;为
false
时表示必须使用与 Client ID 关联的会话来恢复与客户端的通信(除非会话不存在)。
Session Expiry Interval 解决了 MQTT 3.1.1 中持久会话永久存在造成的服务器资源浪费问题。设置为 0 或未设置,表示断开连接时会话即到期;设置为大于 0 的数值,则表示会话在网络连接关闭后会保持多少秒;设置为
0xFFFFFFFF
表示会话永远不会过期。
关于 MQTT 会话的 Q&A
当会话结束后,保留消息还存在么?
MQTT 保留消息不是会话状态的一部分,它们不会在会话结束时被删除。
客户端如何知道当前会话是被恢复的会话?
MQTT 协议从 v3.1.1 开始,就为 CONNACK 报文设计了 Session Present 字段。当服务器返回的该字段值为 1 时,表示当前连接将会复用服务器保存的会话。 客户端可通过该字段值决定在连接成功后是否需要重新订阅 。
使用持久会话时有哪些建议?
- 不能使用动态 Client ID,需要保证客户端每次连接的 Client ID 都是固定的。
- 根据服务器性能、网络状况、客户端类型等合理评估会话过期时间。设置过长会占用更多的服务端资源,设置过短会导致未重连成功会话就失效。
- 当客户端确定不再需要会话时,可使用 Clean Session 为 true 进行重连,重连成功后再断开连接。如果是 MQTT 5.0 则可 在断开连接时直接设置 Session Expiry Interval 为 0,表示连接断开后会话即失效 。
保留消息
EMQX 实现了 MQTT 的保留消息功能。可以通过 retain 字段将要发布到某个主题的消息标记为“保留消息”,并将其保存为 EMQX 上的持久消息。当任何新的订阅者订阅与保留消息的主题匹配的主题时,他们会立即接收到该消息,即使该消息是在他们订阅该主题之前发布的。EMQX 仅存储每个主题的最后一个保留消息。 保留消息的默认过期时间是永不过期,除非用户手动删除该消息。
Retain as Published
:这一选项用来指定服务端向客户端转发消息时是否要保留其中的 RETAIN 标识,注意这一选项不会影响保留消息中的 RETAIN 标识。因此当 Retain As Publish 选项被设置为 0 时,客户端直接依靠消息中的 RETAIN 标识来区分这是一个正常的转发消息还是一个保留消息,而不是去判断消息是否是自己订阅后收到的第一个消息(转发消息甚至可能会先于保留消息被发送,视不同 Broker 的具体实现而定)。
Retain Handling
:这一选项用来指定订阅建立时服务端是否向客户端发送保留消息:
- Retain Handling 等于 0,只要客户端订阅成功,服务端就发送保留消息。
- Retain Handling 等于 1,客户端订阅成功且该订阅此前不存在,服务端才发送保留消息。毕竟有些时候客户端重新发起订阅可能只是为了改变一下 QoS,并不意味着它想再次接收保留消息。
- Retain Handling 等于 2,即便客户订阅成功,服务端也不会发送保留消息。
保留消息列表
在 监控 -> 保留消息 页面中,用户可以查看系统中的所有保留消息,包括主题、QoS、发布时间和客户端 ID。页面还提供了 查看 Payload 和 删除 这两个操作选项用来查看保留消息 Payload 以及删除保留消息。用户可以使用 刷新 按钮刷新列表,并使用 设置 按钮访问保留消息设置页面。
默认情况下,将保存三种类型的保留消息系统主题。如果是集群环境,则会根据其他节点名称保留不同系统主题的保留消息,例如:
- $SYS/brokers/+/sysdescr:当前 EMQX 节点的系统描述
- $SYS/brokers/+/version:当前 EMQX 节点的版本号
- $SYS/brokers:当前 EMQX 的节点数量和名称
删除保留消息
要删除 EMQX 中的保留消息,用户可以在客户端向保留消息的主题发布一个空消息,或在 EMQX Dashboard 中,简单地点击保留消息列表页面上的 删除 按钮来删除保留消息。此外,用户还可以在保留消息配置页面上设置保留消息的过期时间,使其在过期时自动删除。
遗嘱消息
发布订阅模式的特性决定了,除了服务器以外没有客户端能够感知到某个客户端从通信网络中离开。而遗嘱消息则为连接意外断开的客户端提供了向其他客户端发出通知的能力。
客户端可以在连接时向服务器设置自己的遗嘱消息,服务器将在客户端异常断开后立即或延迟一段时间后发布这个遗嘱消息。而订阅了对应遗嘱主题的客户端,将收到这个遗嘱消息,并且采取相应的措施,例如更新该客户端的在线状态等等。
EMQX 实现了 MQTT 的遗嘱消息功能。如果为客户端设置了遗嘱消息,在客户端意外断开连接时,EMQX 将把遗嘱消息发送给相关的订阅者,以便订阅者可以得知并更新客户端状态。
共享订阅
共享订阅是 MQTT 5.0 引入的新特性,用于在多个订阅者之间实现订阅的负载均衡,MQTT 5.0规定的共享订阅主题以 $share 开头。
下图中,3个订阅者用共享订阅的方式订阅了同一个主题 $share/g/topic,其中topic 是它们订阅的真实主题名,而 $share/g/ 是共享订阅前缀(g/ 是群组名,可为任意UTF-8编码字符串)。
EMQX 实现了 MQTT 的共享订阅功能。共享订阅是一种订阅模式,用于在多个订阅者之间实现负载均衡。客户端可以分为多个订阅组,消息仍然会被转发到所有订阅组,但每个订阅组内只有一个客户端接收消息。您可以为一组订阅者的原始主题添加前缀以启用共享订阅。EMQX 支持两种格式的共享订阅前缀,分别为带群组的共享订阅(前缀为
$share/<group-name>/
)和不带群组的共享订阅(前缀为
$queue/
)。两种共享订阅格式示例如下:
前缀格式 | 示例 | 前缀 | 真实主题名 |
---|---|---|---|
带群组格式 | $share/abc/t/1 | $share/abc/ | t/1 |
不带群组格式 | $queue/t/1 | $queue/ | t/1 |
您可以使用客户端工具连接 EMQX 并尝试这个消息服务。 本节介绍了共享订阅的机制并演示了如何使用 MQTTX Desktop 和 MQTTX CLI 来模拟客户端尝试通过共享订阅来接收消息。
带群组的共享订阅
您可以通过在原始主题前添加
$share/<group-name>
前缀为分组的订阅者启用共享订阅。组名可以是任意字符串。EMQX 同时将消息转发给不同的组,属于同一组的订阅者可以使用负载均衡接收消息。
例如,如果订阅者
s1
、
s2
和
s3
是组
g1
的成员,订阅者
s4
和
s5
是组
g2
的成员,而所有订阅者都订阅了原始主题
t1
,共享订阅的主题必须是
$share/g1/t1
和
$share/g2/t1
。当 EMQX 发布消息
msg1
到原始主题
t1
时:
-
EMQX 将
msg1
发送给g1
和g2
两个组。 -
s1
、s2
、s3
中的一个订阅者将接收msg1
。 -
s4
和s5
中的一个订阅者将接收msg1
。
不带群组的共享订阅
以
$queue/
为前缀的共享订阅是不带群组的共享订阅。它是
$share
订阅的一种特例。您可以将其理解为所有订阅者都在一个订阅组中,如
$share/$queue
。
共享订阅与会话
在 MQTT 客户端中,共享订阅和持久会话功能的概念存在矛盾,因此无法同时使用这两个特性。
如果您正在使用共享订阅功能,则必须通过将
clean_session
参数设置为
true
来启用 clean session 功能。
持久会话功能(
clean_session=false
)确保订阅者在重新连接后可以立即恢复数据流,而不会丢失任何消息。这对于确保消息传递的可靠性至关重要。通过将
clean_session
参数设置为
false
,即使客户端离线,会话仍将持续存在,使设备可以继续接收消息。然而,由于设备处于离线状态,可能无法及时处理接收到的消息,导致消息在会话中随着时间的推移不断积累。
当共享订阅功能被启用,同一组内的另一个设备接管离线设备的数据流,但它不会收到任何已积累的消息,因为这些消息属于原始设备的会话。 因此,如果设备长时间保持离线状态,持久会话的消息缓冲区可能会溢出,导致消息丢失。这种情况会影响负载平衡,并最终导致内存和存储资源耗尽,对系统的稳定性和性能产生负面影响。
排它订阅
排它订阅是 EMQX 支持的 MQTT 扩展功能。排它订阅允许对主题进行互斥订阅, 一个主题同一时刻仅被允许存在一个订阅者 ,在当前订阅者未取消订阅前,其他订阅者都将无法订阅对应主题。
要进行排它订阅,您需要为主题名称添加前缀,如以下表格中的示例:
示例 | 前缀 | 真实主题名 |
---|---|---|
$exclusive/t/1 | $exclusive/ | t/1 |
当某个客户端
A
订阅
$exclusive/t/1
后,其他客户端再订阅
$exclusive/t/1
时都会失败,直到
A
取消了对
$exclusive/t/1
的订阅为止。
注意
:
排它订阅必须使用
$exclusive/
前缀,在上面的示例中,其他客户端依然可以通过
t/1
成功进行订阅。
订阅失败错误码
错误码 | 原因 |
---|---|
0x8F | 使用了 $exclusive/,但并未开启排它订阅 |
0x97 | 已经有客户端订阅了该主题 |
通过配置文件配置排它订阅
目前只能在配置文件中配置排它订阅,不支持通过 Dashboard 配置。
排它订阅默认未开启,可在
etc/emqx.conf
中配置:
mqtt.exclusive_subscription {
enable = true
}
延迟发布
延迟发布是 EMQX 支持的 MQTT 扩展功能。当客户端使用特殊主题前缀 $delayed/{DelayInteval} 发布消息时,将触发延迟发布功能,可以实现按照用户配置的时间间隔延迟发布消息。
延迟发布主题的具体格式如下:
$delayed/{DelayInterval}/{TopicName}
-
$delayed
:使用$delay
作为主题前缀的消息都将被视为需要延迟发布的消息,延迟间隔由下一主题层级中的内容决定 -
{DelayInterval}
:指定该 MQTT 消息延迟发布的时间间隔,单位是秒,允许的最大间隔是 4294967 秒, 如果{DelayInterval}
无法被解析为一个整型数字,EMQX 将丢弃该消息,客户端不会收到任何信息 -
{TopicName}
:MQTT 消息的主题名称
例如:
-
$delayed/15/x/y
:15 秒后将 MQTT 消息发布到主题x/y
-
$delayed/60/a/b
:1 分钟后将 MQTT 消息发布到a/b
-
$delayed/3600/$SYS/topic
:1 小时后将 MQTT 消息发布到$SYS/topic
自动订阅
自动订阅是 EMQX 支持的 MQTT 扩展功能。自动订阅能够给 EMQX 设置多个规则, 在设备成功连接后按照规则为其订阅指定主题,不需要额外发起订阅 。在 EMQX 5.0 之前,该功能叫做代理订阅。
通过 Dashboard 配置自动订阅
- 打开 EMQX Dashboard,在左侧导航菜单中,点击 管理 -> 代理订阅 。
- 在 代理订阅 页面,点击右上角的 + 添加 按钮。
-
在弹出的对话框中,在
主题
文本框中输入测试主题
a/1
。其他设置保持默认值。
- 主题 : 输入客户端自动订阅的主题。
-
QoS
: 指定主题的服务质量。选项:
0
、1
和2
。 -
No local
: 选项:
False
或True
。 -
保留发布
: 指定是否保留使用指定主题发送的消息。选项:
False
或True
。 -
保留处理
: 选项:
0
、1
和2
。
点击对话框中的
添加
按钮。自动订阅主题
a/1
创建成功。
现在自动订阅功能已启用,新的订阅者一旦连接到代理服务器,将自动订阅主题
a/1
。
主题重写
很多物联网设备不支持重新配置或升级,修改设备业务主题会非常困难。
主题重写功能可以帮助使这种业务升级变得更容易:通过给 EMQX 设置一套规则,它可以在订阅、发布时改变将原有主题重写为新的目标主题。
保留消息 和 延迟发布 也可以与主题重写结合使用。例如,当用户想使用延迟发布时,他们可以使用主题重写来将消息重定向到所需的主题。
由于发布/订阅授权检查会在主题重写之前执行,所以只要确保重写之前的主题能够通过 ACL 检查即可。
配置主题重写规则
EMQX 的主题重写规则需要用户自行配置,用户可以自行添加多条主题重写规则,规则的数量没有限制,但由于任何携带主题的 MQTT 报文都需要匹配一遍重写规则,因此此功能在高吞吐场景下带来的性能损耗与规则数量是成正比的,用户需要谨慎地使用此功能。
每条主题重写规则的格式如下:
rewrite = [
action: "all"
source_topic: "x/#"
dest_topic: "x/y/z/$1"
re: "^x/y/(.+)$"
]
每个重写规则由 过滤器 、 正则表达式 和 目标表达式 组成。
重写规则分为
publish
、
subscribe
和
all
规则,
publish
规则匹配 PUBLISH 报文携带的主题,
subscribe
规则匹配 SUBSCRIBE、UNSUBSCRIBE 报文携带的主题。
all
规则对 PUBLISH、SUBSCRIBE 和 UNSUBSCRIBE 报文携带的主题都生效。
在启用主题重写的前提下,当收到 MQTT 数据包(如带有主题的PUBLISH消息)时,EMQX 将使用数据包中的主题来依次匹配配置文件中规则的主题过滤器部分。匹配成功之后,正则表达式就会被用来提取主题中的信息,然后用目标表达式替换旧的主题,生成一个新的主题。
目标表达式可以使用
$N
格式的变量来匹配从正则表达式中提取的元素。
$N
的值是指从正则表达式中提取的第 N 个元素,例如,
$1
是正则表达式提取的第一个元素。
同时,表达式中也可以使用
${clientid}
代表
客户端Id
, 使用
${username}
代表
客户端用户名
。
注意:EMQX 会按照配置文件中规则配置的顺序来执行主题重写。当一个主题可以同时匹配多个主题重写规则的主题过滤器时,EMQX 仅使用第一个匹配的规则来重写该主题。
如果规则中的正则表达式与 MQTT 数据包的主题不匹配,则重写失败,其他规则将不会被用来重写。因此,需要仔细设计 MQTT 数据包主题和主题重写规则。
示例
假设
etc/emqx.conf
文件中已经添加了以下主题重写规则:
rewrite = [
action: "all"
source_topic: "y/+/z/#"
dest_topic: "y/z/$2"
re: "^y/(.+)/z/(.+)$"
action: "all"
source_topic: "x/#"
dest_topic: "z/y/x/$1"
re: "^x/y/(.+)$"
action: "all"
source_topic: "x/y/+"
dest_topic: "z/y/$1"
re: "^x/y/(\d+)$"
]
如果订阅五个主题:
y/a/z/b
,
y/def
,
x/1/2
,
x/y/2
, 和
x/y/z
。
-
y/def
不符合任何主题过滤器,所以它不执行主题重写,只是订阅y/def
主题。 -
y/a/z/b
匹配y/+/z/#
主题过滤器,EMQX 执行第一条规则,并通过正则表达式匹配元素[a、b]
,将匹配的第二个元素带入y/z/$2
,并实际订阅主题y/z/b
。 -
x/1/2
匹配x/#
主题过滤器,EMQX 执行第二个规则。它不通过正则表达式匹配元素,不执行主题重写,并实际订阅了x/1/2
的主题。 -
x/y/2
同时匹配x/#
和x/y/+
两个主题过滤器,EMQX 以相反的顺序读取配置,所以它优先匹配第三个。通过正则替换,它实际上订阅了z/y/2
主题。 -
x/y/z
同时匹配x/#
和x/y/+
两个主题过滤器,EMQX 以相反的顺序读取配置,所以优先级匹配第三个。该元素没有通过正则表达式进行匹配,没有进行主题重写,它实际上订阅了x/y/z
主题。需要注意的是,即使第三条的正则表达式匹配失败,它也不会再匹配第二条的规则。
规则引擎
规则引擎是 EMQX 内置基于 SQL 的数据处理组件,搭配 数据桥接 使用无需编写代码即可实现一站式的 IoT 数据提取、过滤、转换、存储与处理,以加速应用集成和业务创新。
规则的组成
规则描述了 数据来源 、 数据处理过程 、 处理结果去向 三个方面:
- 数据来源 :规则的数据源可以是消息或事件,也可以是外部的数据系统。规则通过 SQL 的 FROM 子句指定数据的来源;
- 数据处理过程 :规则通过 SQL 语句和函数来描述数据的处理过程。SQL 的 WHERE 子句用于过滤数据,SELECT 子句以及 SQL 函数用于提取和转换数据;
- 处理结果去向 :规则可以定义一个或多个动作来处理 SQL 的输出结果。如果 SQL 执行通过,规则将按顺序执行相应的动作,比如将处理结果存储到数据库、或者重新发布到另一个 MQTT 主题等。
规则 SQL 语句简介
SQL 语句用于指定规则的数据来源、定义数据处理过程等。下面给出了一个 SQL 语句的例子:
SELECT
payload.data as d
"t/#"
WHERE
clientid = "foo"
-
数据来源:主题为
t/#
的消息 -
数据处理过程:如果发送消息的客户端 ID 为
foo
,则从消息内容中选出data
字段并赋值给新的变量d
"." 语法要求数据必须是 JSON 或者 Map 类型,如果是其他数据类型,须要使用 SQL 函数做数据类型转换。
动作
动作是用于处理规则的输出结果的组件,决定了数据的最终去向。
目前规则支持以下两种动作:
-
内置动作:目前仅有两种内置动作:消息重发布(
republish
) 和控制台输出(console
)。 - 数据桥接:数据桥接是通往外部数据系统的通道,规则可以直接使用数据桥接的 ID 作为动作, 将规则的输出交给数据桥接处理。
消息重发布
消息重发布动作用来发布一条新的 MQTT 消息,适用于需要向设备发送下行消息的场景。
“消息重发布”动作不会阻止原来的消息的投递。举例来说,如果一条 "a/1" 消息通过规则触发了重发布动作, 并发出一条新的消息 "a/2",那么 "a/1" 消息仍然会被投递到订阅了该主题的客户端。
消息重发布动作里面,可以自定义消息的消息内容、主题、QoS 等参数,可以用
${field-name}
的形式引用规则输出里的字段作为参数值。
控制台输出动作
控制台输出动作用于查看规则的输出结果,结果将以日志的形式打印到控制台里。
如果是用
emqx console
启动的 EMQX,结果将打印到前台。 如果用是
emqx start
启动的 EMQX,结果将打印到 EMQX 日志路径下的
erlang.log.*
文件里。
输出格式
输出第一行打印
[rule action]
头和规则的 ID。 从第二行开始打印分为两部分:
-
Action Data
部分为规则的输出结果,Action Data
中包含的字段都可以在动作的参数里面以${field-name}
的形式引用。 -
Envs
为动作可用的环境变量信息。环境变量信息包含该数据源的所有可用字段,以及其他跟本次动作执行相关的内部信息。
输出格式示例:
[rule action] rule_id1
Action Data: #{key1 => val1}
Envs: #{key1 => val1, key2 => val2}
控制台输出动作仅用于规则测试过程中的调试,用于生产环境将引起性能问题。
规则的典型应用场景举例
- 动作监听:智慧家庭智能门锁开发中,门锁会因为网络、电源故障、人为破坏等原因离线导致功能异常,使用规则配置监听离线事件向应用服务推送该故障信息,可以在接入层实现第一时间的故障检测的能力
- 数据筛选:车联网的卡车车队管理,车辆传感器采集并上报了大量运行数据,应用平台仅关注车速大于 40 km/h 时的数据,此场景下可以使用规则对消息进行条件过滤,向业务消息队列写入满足条件的数据
- 消息路由:智能计费应用中,终端设备通过不同主题区分业务类型,可通过配置规则将计费业务的消息接入计费消息队列并在消息抵达设备端后发送确认通知到业务系统,非计费信息接入其他消息队列,实现业务消息路由配置
- 消息编解码:其他公共协议 / 私有 TCP 协议接入、工控行业等应用场景下,可以通过规则的本地处理函数(可在 EMQX 上定制开发)做二进制 / 特殊格式消息体的编解码工作;亦可通过规则的消息路由将相关消息流向外部计算资源如函数计算进行处理(可由用户自行开发处理逻辑),将消息转为业务易于处理的 JSON 格式,简化项目集成难度、提升应用快速开发交付能力
获取设备上下线信息
通用方式
- MQTT协议-遗嘱机制 :MQTT 协议层面获取设备下线状态,可利用 MQTT 协议遗嘱机制获取设备状态,设备连接时启用遗嘱,在连接到Broker时,设置 Will-Topic、Will-Payload等属性,当Client异常断开时(即设备没有发送Disconnect报文),Broker会发布遗嘱 Topic 与 Payload,但遗嘱机制明显弊端是, 只有异常断开才会获得下线状态。
- MQTT协议-主题设计 :从MQTT协议侧,来获取设备上下线状态,更好的技巧和解决方法,可在 MQTT 协议 Topic 侧做设计,可以为 “presence” 进行主题设计。如,“presence/connect/client-id” ,当设备上线时,对其发布上线消息,当设备正常离线时(即设备发送Disconnect报文)对主题”presence/disconnect/client-id“发布其离线消息。
EMQ X实现方式
以上两种方式,是依赖MQTT 协议的遗嘱机制,或是主题层面的设计来获取设备在线状态,可适用任何实现MQTT 协议的Broker,但 EMQ君,在这里介绍如何简单、快速从EMQ X Broker获取设备在线状态的三种方式。
EMQ X 系统主题
EMQ X Broker 上下线状态主题:
- 上线主题:$SYS/brokers/${node}/clients/${clientId}/connected,当任意客户端上线时,EMQ就会向该主题发布消息
- 下线主题:$SYS/brokers/${node}/clients/${clientId}/disconnected,当任意客户端下线时,EMQ就会向该主题发布消息
node、clientid 分别指具体节点名、设备的客户端id,在这里我们可以采用主题通配符模式直接订阅一个主题即可:$SYS/brokers/+/clients/# 或者使用 $SYS/brokers/+/clients/+/connected 和 $SYS/brokers/+/clients/+/disconnected
如果想监听系统主题上下线,需要类似如下的ACL配置:
{allow, all, subscribe, ["$SYS/brokers/+/clients/#"]}.
connected 事件消息的 Payload 解析成 JSON 格式如下:
{
"username": "foo",
"ts": 1625572213873,
"sockport": 1883,
"proto_ver": 4,
"proto_name": "MQTT",
"keepalive": 60,
"ipaddress": "127.0.0.1",
"expiry_interval": 0,
"connected_at": 1625572213873,
"connack": 0,
"clientid": "emqtt-8348fe27a87976ad4db3",
"clean_start": true
}
disconnected 事件消息的 Payload 解析成 JSON 格式如下:
{
"username": "foo",
"ts": 1625572213873,
"sockport": 1883,
"reason": "tcp_closed",
"proto_ver": 4,
"proto_name": "MQTT",
"ipaddress": "127.0.0.1",
"disconnected_at": 1625572213873,
"clientid": "emqtt-8348fe27a87976ad4db3"
}
LWT遗嘱机制和Emqx SYS主题通知效果对比:
机制\事件 | connect | disconnect | 异常disconnect |
---|---|---|---|
$SYS/brokers/${node}/clients/${clientId}/connected | ✔ | ||
$SYS/brokers/${node}/clients/${clientId}/disconnected | ✔ | ✔ | |
LWT | ✔ |
通过实际测试对比发现,LWT仅支持异常disconnect的通知,而emqx的disconnected系统主题可以捕捉到所有离线通知(主动diconnect、异常disconnect),即emqx的系统主题即可完整支持客户端上线、下线监听的需求。可以根据实际的需求选择适当的机制。
EMQ X Web Hook插件
EMQ X Broker官方提供了多种插件,其中emqx-web-hook(EMQ X 2.0版本为emq-web-hook)可将MQTT 消息桥接到用户所指定Web Server,其中包括设备上、下线状态,在Dashboard插件管理或终端emqx_ctl(2.0为emqttd_ctl) 启动emqx_web_hook,如有设备上下线,即可获得设备上、下状态数据 ,以下是通过emq-web-hook桥接Web Server上下数据示例:
POST / HTTP/1.1
content-type: application/json
content-length: 93
host: 127.0.0.1:8087
connection: keep-alive
{"action":"client_connected","client_id":"mqttjs_bc06a34f41","username":"admin","conn_ack":0}
下线消息:
POST / HTTP/1.1
content-type: application/json
content-length: 101
host: 127.0.0.1:8087
connection: keep-alive
{"action":"client_disconnected","client_id":"mqttjs_bc06a34f41","username":"admin","reason":"normal"}
EMQ X Enterprise - 直接存取数据库
以上两种方式,在 EMQ X 开源社区版已经支持,商业化版本EMQ X Enterprise可将 MQTT消息(订阅关系、设备在线状态、离线消息、保留消息)更高效存储到后端数据库(MySQL、PostgreSQL、Cassandra、Redis、MongoDB)、消息中间件(Kafka、RabbitMQ),用户可以通过直接查询数据库中相关的数据就得到设备上下线的状态信息。
MQTT客户端接入
使用 Go SDK 连接 MQTT Broker
Eclipse Paho MQTT Go Client 为 Eclipse Paho 项目下的 Go 语言版客户端库,该库能够连接到 MQTT Broker 以发布消息,订阅主题并接收已发布的消息,支持完全异步的操作模式。
客户端依赖于 Google 的 proxy 和 websockets 软件包,通过以下命令完成安装:
go get github.com/eclipse/paho.mqtt.golang
MQTT Go 使用示例(emqx官网示例)
本示例包含 Go 语言的 Paho MQTT 连接 EMQX,并进行消息收发完整代码:
package main
import (
"fmt"
mqtt "github.com/eclipse/paho.mqtt.golang"
"log"
"time"
var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
fmt.Printf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {
fmt.Println("Connected")
var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
fmt.Printf("Connect lost: %v", err)
func main() {
var broker = "broker.emqx.io"
var port = 1883
opts := mqtt.NewClientOptions()
opts.AddBroker(fmt.Sprintf("tcp://%s:%d", broker, port))
opts.SetClientID("go_mqtt_client")
opts.SetUsername("emqx")
opts.SetPassword("public")
opts.SetDefaultPublishHandler(messagePubHandler)
opts.OnConnect = connectHandler
opts.OnConnectionLost = connectLostHandler
client := mqtt.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
panic(token.Error())
sub(client)
publish(client)
client.Disconnect(250)
func publish(client mqtt.Client) {
num := 10
for i := 0; i < num; i++ {
text := fmt.Sprintf("Message %d", i)
token := client.Publish("topic/test", 0, false, text)
token.Wait()
time.Sleep(time.Second)
func sub(client mqtt.Client) {