相关文章推荐
飞翔的雪糕  ·  DataIntegrityViolation ...·  1 年前    · 
粗眉毛的牛排  ·  wordpress - After ...·  1 年前    · 
冷静的帽子  ·  Notify Update·  1 年前    · 

最近在做IM(及时通信)相关的应用,长连接(消息收发)采用的Mqtt5,mqtt支持会话保存(cleanStart)、会话超期(sessionExpire)、Qos等设置,可以实现移动端弱网络(或网络断开后再重连)的离线消息接收。
之所以选择Mqtt协议,因为mqtt是一种极其轻量级的发布/订阅消息传输协议(专为受限设备和低带宽、高延迟或不可靠的网络而设计),且代码体积小、功耗低,适合移动设备、车机等终端,且需要支持手机、车机等在网络信号不稳定(弱网、断网、进隧道没有网络等)且之后再恢复网络时,可以继续收发消息、且可以收到之前离线时消息的补充推送。关于离线消息的补充推送亦可由IM服务端自己控制,但若Mqtt协议原生支持离线推送,岂不是省的开发者再去自己处理。同时秉承着用新不用旧的观点,果断选用Mqtt5而弃用Mqtt3,Mqtt5相较于Mqtt3有了很多升级,如:原因代码(PUBACK / PUBREC)、共享订阅、会话过期、请求/响应模式(ResponseTopic, CorrelationData)、Will Delay等。

关于Mqtt的服务端、客户端选型可参考如下链接:
Mqtt官网
Mqtt中文网
Mqtt Server端
Mqtt Client端

实际开发过程中,Server端选用的Emq,Client端选用的HiveMq,二者均支持Mqtt5。

Mqtt5支持离线消息接收的几个核心设置:
ClientId
CleanStart: false
SessionExpiry
Qos:2

ClientId 用于唯一标识用户session。
CleanStart 设置为0,表示创建一个持久会话,在客户端断开连接时,会话仍然保持并保存离线消息,直到会话超时注销。CleanStart设置为1,表示创建一个新的临时会话,在客户端断开时,会话自动销毁。
SessionExpiry 即指定在CleanStart为0时,会话的保存时长,如果客户端未在用户定义的时间段内连接,则可以丢弃状态(例如,订阅和缓冲的消息)而无需进行清理。
Qos 即消息的Quality of Service,若要支持离线消息,需要订阅端、发布端Qos >= 1

如ClientId=1, CleanStart=false, SessionExpiry=3600s, Qos=2即指定clientId=1的会话为持久会话,用户在离线后3600s的的离线消息都会被Mqtt服务器保存,用户在离线时间不超过3600s且再次以ClientId=1重新上线时,是可以收到离线期间消息的补充推送的,同时Qos=2(exactly once)保证消息只会被客户端收到一次且一定一次。

以HiveMq客户端代码为例:

* 构建mqtt客户端连接 * @param clientId 客户端ID * @return public Mqtt5BlockingClient buildMqtt5Client ( String clientId ) { /** blocking客户端 */ Mqtt5BlockingClient client = Mqtt5Client . builder ( ) . identifier ( clientId ) . serverHost ( mqttConfig . getInternalHost ( ) ) . serverPort ( mqttConfig . getPort ( ) ) //自动重连(指数级延迟重连(起始延迟1s,之后每次2倍,到2分钟封顶) delay : 1s-> 2s -> 4s -> ... -> 2min) . automaticReconnectWithDefaultConfig ( ) . buildBlocking ( ) ; /** Emqx JWT认证 */ String authJwt = jwtManager . generateMqttAuthJwt ( clientId ) ; Mqtt5SimpleAuth auth = Mqtt5SimpleAuth . builder ( ) . password ( authJwt . getBytes ( ) ) . build ( ) ; logger . info ( "连接mqtt参数:internalHost={}, port={}, clientId={}, authPassword={}" , mqttConfig . getInternalHost ( ) , mqttConfig . getPort ( ) , clientId , authJwt ) ; Mqtt5ConnAck connAck = null ; try { connAck = client . connectWith ( ) . simpleAuth ( auth ) /** cleanSession=false */ . cleanStart ( false ) /** session 7天过期 */ . sessionExpiryInterval ( MqttConsts . SESSION_EXPIRATION ) . send ( ) ; logger . info ( "连接mqtt结果:reasonCode={}, reasonStr={}, reasonInfo={}" , connAck . getReasonCode ( ) , connAck . getReasonString ( ) , connAck . getResponseInformation ( ) ) ; } catch ( Mqtt5ConnAckException e ) { logger . warn ( "连接mqtt - ack异常 - " . concat ( e . getMessage ( ) ) ) ; connAck = e . getMqttMessage ( ) ; } catch ( ConnectionFailedException cfe ) { logger . warn ( "连接mqtt - fail异常 - " . concat ( cfe . getMessage ( ) ) ) ; throw new MqttException ( "连接mqtt - fail异常 - " . concat ( cfe . getMessage ( ) ) ) ; } catch ( Exception e ) { logger . warn ( "连接mqtt - 系统异常!" , e ) ; throw new MqttException ( "连接mqtt - 系统异常" ) ; if ( connAck . getReasonCode ( ) . isError ( ) ) { throw new MqttException ( "Mqtt5连接失败 - " . concat ( connAck . getReasonCode ( ) . toString ( ) ) ) ; return client ; * 订阅主题、处理消息 * @param topic 主题 * @param callback 消息处理回调 public void subscribeWith ( String topic , Consumer < Mqtt5Publish > callback ) { //订阅主题 asyncClient . subscribeWith ( ) . topicFilter ( topic ) //Qos: 2(EXACTLY_ONCE) . qos ( MqttQos . EXACTLY_ONCE ) //消费主题消息(异步) . callback ( callback ) . send ( ) ;

以上的几个核心设置:
clientId,
cleanStart=fasle,
sessionExpiry > 0,
Qos>=1,
缺一不可,少一项设置便无法实现离线消息的接受。

MQTT v5.0添加了以下特性会话过期把清理会话标志拆分成新开始标志(指示会话应该在不使用现有会话的情况下开始)和会话过期间隔标志(指示连接断开之后会话保留的时间)。会话过期间隔时间可以在断开时修改... 来自: hui6075的博客 概述微消息队列MQ for IoT在处理离线消息时,为了简化离线消息获取机制,微消息队列系统在客户端成功建立连接并通过权限校验后,会自动加载离线消息并下发到客户端,但是实际在使用过程中会出现消费端启动... 来自: weixin_34342207的博客 学习mqtt协议和emqttd开源项目http://emqtt.com/emqttd源码版本号是v1.1.3。http://emqtt.com/downloads/11131、-module(emqt... 来自: $firecat的代码足迹$ public class MQTTPublish implements MqttCallback { //public static final String HOST = "tcp://10.0.0.250:1884"; public static final String HOST = "tcp://192.168.67.130:61613"; public static final String TOPIC = "MQTTtest"; private static final String clientid ="publisher"; private static final String str = "ad钙奶"; private static MqttClient client; private MqttTopic topic; private String userName = "admin"; private String passWord = "password"; private MqttMessage message; public MQTTPublish() throws MqttException { client = new MqttClient(HOST, clientid, new MemoryPersistence()); connect(); private void connect() { MqttConnectOptions options = new MqttConnectOptions(); options.setCleanSession(false); options.setUserName(userName); options.setPassword(passWord.toCharArray()); options.setConnectionTimeout(10); options.setKeepAliveInterval(20); try { client.setCallback(this); client.connect(options); topic = client.getTopic(TOPIC); } catch (Exception e) { e.printStackTrace(); public void publish(MqttMessage message) throws MqttPersistenceException, MqttException{ MqttDeliveryToken token = topic.publish(message); token.waitForCompletion(); System.out.println("Token is complete:" + token.isComplete()); public static void main(String[] args) throws MqttException { MQTTPublish mqttpub = new MQTTPublish(); mqttpub.message = new MqttMessage(); mqttpub.message.setQos(2); mqttpub.message.setRetained(true); mqttpub.message.setPayload(str.getBytes()); mqttpub.publish(mqttpub.message); System.out.println("Ratained state:" + mqttpub.message.isRetained()); client.disconnect(); System.out.println("Disconnected"); System.exit(0); @Override public void connectionLost(Throwable arg0) { // TODO Auto-generated method stub public void deliveryComplete(IMqttDeliveryToken token) { try { System.out.println("deliveryComplete---------"+ token.isComplete()); } catch (Exception e) { e.printStackTrace(); @Override public void messageArrived(String arg0, MqttMessage arg1) throws Exception { // TODO Auto-generated method stub public class MQTTSubscribe implements MqttCallback { //public static final String HOST = "tcp://10.0.0.250:1884"; public static final String HOST = "tcp://192.168.67.130:61613"; public static final String TOPIC = "MQTTtest"; private static final String clientid = "subscriber"; private MqttClient client; private MqttConnectOptions options; private String userName = "admin"; private String passWord = "password"; private ScheduledExecutorService scheduler; public void startReconnect() { scheduler = Executors.newSingleThreadScheduledExecutor(); scheduler.scheduleAtFixedRate(new Runnable() { public void run() { if (!client.isConnected()) { try { client.connect(options); } catch (MqttSecurityException e) { e.printStackTrace(); } catch (MqttException e) { e.printStackTrace(); }, 0 * 1000, 10 * 1000, TimeUnit.MILLISECONDS); private void start() { try { client = new MqttClient(HOST, clientid, new MemoryPersistence()); options = new MqttConnectOptions(); options.setCleanSession(false); options.setUserName(userName); options.setPassword(passWord.toCharArray()); options.setConnectionTimeout(10); options.setKeepAliveInterval(20); client.setCallback(this); // MqttTopic topic = client.getTopic(TOPIC); // options.setWill(topic, "close".getBytes(), 2, true); client.connect(options); int[] Qos = {2}; String[] topic1 = {TOPIC}; client.subscribe(topic1, Qos); } catch (Exception e) { e.printStackTrace(); public static void main(String[] args) throws MqttException { MQTTSubscribe client = new MQTTSubscribe(); client.start(); public void connectionLost(Throwable cause) { System.out.println("Connection lost, reconnect please!"); @Override public void deliveryComplete(IMqttDeliveryToken token) { try { System.out.println("deliveryComplete---------"+ token.isComplete()); } catch (Exception e) { e.printStackTrace(); @Override public void messageArrived(String topic, MqttMessage message) throws Exception { System.out.println("Message arrived on topic:"+topic); System.out.println("Message arrived on QoS:"+message.getQos()); System.out.println("Message arrived on content:"+new String(message.getPayload())); MQTT实现消息接收(接收消息需实现MqttSimpleCallback接口并实现它的publishArrived方法)必须注册接收消息方法:mqttClient.registerSimpleHand... 来自: weixin_34124651的博客 QoS、Retain、Clean Session一段简单说明QoSQoS:Pub=0,Sub=0功能快捷键合理的创建标题,有助于目录的生成如何改变文本的样式插入链接与图片如何插入一段漂亮的代码片生成一... 来自: zhuo_lee_new的博客 ESP8266 家庭自动化项目中文版目录​​​​​​​现在让我们使用 mosquitto_pub 发布消息并在 ESP8266 中接收它。为此,ESP8266 需要订阅 mosquitto_pub 将... 来自: countofdane的博客 Retained 消息Retained 消息是指在 PUBLISH 数据包中 Retain 标识设为 1 的消息,Broker 收到这样的 PUBLISH 包以后,将保存这个消息,当有一个新的订阅者订... 来自: solo_jm的博客 点击上面↑「爱开发」关注我们每晚10点,捕获技术思考和创业资源洞察什么是ThreadLocalThreadLocal是一个本地线程副本变量工具类,各个线程都拥有一份线程私...... 来自: 爱开发 由于我之前一直强调数据结构以及算法学习的重要性,所以就有一些读者经常问我,数据结构与算法应该要学习到哪个程度呢?,说实话,这个问题我不知道要怎么回答你,主要取决于你想学习到哪些程度,不过针对这个问题,... 来自: 帅地 你知道的越多,你不知道的越多 点赞再看,养成习惯 GitHub上已经开源 https://github.com/JavaFamily 有一线大厂面试点脑图、个人联系方式和人才交流群,欢迎Star和... 来自: 敖丙 作者 |胡书敏责编 | 刘静出品 | CSDN(ID:CSDNnews)本人目前在一家知名外企担任架构师,而且最近八年来,在多家外企和互联网公司担任Java技术面试官,前后累计面试了有两三百位候选人。... 来自: CSDN资讯 我清晰的记得,刚买的macbook pro回到家,开机后第一件事情,就是上了淘宝网,花了500元钱,找了一个上门维修电脑的师傅,上门给我装了一个windows系统。。。。。。表砍我。。。当时买mac的... 来自: qq_45036710的博客 二哥,你好,我想知道一般程序猿都如何接私活,我也想接,能告诉我一些方法吗?上面是一个读者“烦不烦”问我的一个问题。其实不止是“烦不烦”,还有很多读者问过我类似这样的问题。我接的私活不算多,挣到的钱也没... 来自: 沉默王二 Java编程规约命名风格命名风格类名使用UpperCamelCase风格方法名,参数名,成员变量,局部变量都统一使用lowerCamelcase风格常量命名全部大写,单词间用下划线隔开, 力求语义表达... 来自: Chova的博客 小编是一个理科生,不善长说一些废话。简单介绍下原理然后直接上代码。使用的工具(Python+pycharm2019.3+selenium+xpath+chromedriver)其中要使用pycharm... 来自: qq_43764365的博客 前奏:今天2B哥和大家分享一位前几天面试的一位应聘者,工作4年26岁,统招本科。以下就是他的简历和面试情况。基本情况:专业技能:1、 熟悉Sping了解SpringMVC、SpringBoo... 来自: HarderXin的专栏 爬虫,从本质上来说,就是利用程序在网上拿到对我们有价值的数据。爬虫能做很多事,能做商业分析,也能做生活助手,比如:分析北京近两年二手房成交均价是多少?广州的Python工程师平均薪资是多少?北京哪家餐... 来自: LoraRae的博客 每周每日,分享Python实战代码,入门资料,进阶资料,基础语法,爬虫,数据分析,web网站,机器学习,深度学习等等。公众号回复【进群】沟通交流吧,QQ扫码进群学习吧微信群 QQ群 1.画圣诞树imp... 来自: Python家庭的博客 CPU对每个程序员来说,是个既熟悉又陌生的东西?如果你只知道CPU是中央处理器的话,那可能对你并没有什么用,那么作为程序员的我们,必须要搞懂的就是CPU这家伙是如何运行的,尤其要搞懂它里面的寄存器是怎... 来自: 编码之外的技术博客 一、背景二、爬取数据三、数据分析1、总人口2、男女人口比例3、人口城镇化4、人口增长率5、人口老化(抚养比)6、各省人口7、世界人口四、遇到的问题遇到的问题1、数据分页,需要获取从1949-2018年... 来自: 猪哥 第零关进入传送门开始第0关(游戏链接)请点击链接进入第1关: 连接在左边→ ←连接在右边看不到啊。。。。(只能看到一堆大佬做完的留名,也能看到菜鸡的我,在后面~~)直接fn+f12吧<span&... 来自: 永恒之蓝的博客 相信大家都已经收到国务院延长春节假期的消息,接下来,在家远程办公可能将会持续一段时间。但是问题来了。远程办公不是人在电脑前就当坐班了,相反,对于沟通效率,文件协作,以及信息安全都有着极高的要求。有着非... 来自: CSDN资讯 截止目前,我已经分享了如下几篇文章:一个程序在计算机中是如何运行的?超级干货!!!作为一个程序员,CPU的这些硬核知识你必须会!作为一个程序员,内存的这些硬核知识你必须懂!这些知识可以说是我们之前都不... 来自: 编码之外的技术博客 所有群全部吵翻天,朋友圈全部沦陷,公众号疯狂转发。这两周没怎么发原创,只发新闻,可能有人注意到了。我不是懒,是文章写了却没发,因为大家的关注力始终在这次的疫情上面,发了也没人看。当然,我...... 来自: 德国IT那些事 偶然间,在知乎上看到一个问题一时间,勾起了我深深的回忆。以前在厂里打过两次工,做过家教,干过辅导班,做过中介。零下几度的晚上,贴过广告,满脸、满手地长冻疮。再回首那段岁月,虽然苦,但让我学会了坚持和忍... 来自: 启舰 MyBatis 是一款优秀的持久层框架,它支持定制化 SQL、存储过程以及高级映射。MyBatis原本是apache的一个开源项目iBatis, 2010年该项目由apache software fo... 1.Javascript 语法.用途 javascript 在前端网页中占有非常重要的地位,可以用于验证表单,制作特效等功能,它是一种描述语言,也是一种基于对象(Object)和事件驱动并具有安全性... 今天,群里白垩老师问如何用python画武汉肺炎疫情地图。白垩老师是研究海洋生态与地球生物的学者,国家重点实验室成员,于不惑之年学习python,实为我等学习楷模。先前我并没有关注武汉肺炎的具体数据,... 灰鸽子( Huigezi),原本该软件适用于公司和家庭管理,其功能十分强大,不但能监视摄像头、键盘记录、监控桌面、文件操作等。还提供了黑客专用功能,如:伪装系统图标、随意更换启动项名称和表... 目前每天各大平台,如腾讯、今日头条都会更新疫情每日数据,他们的数据源都是一样的,主要都是通过各地的卫健委官网通报。 以全国、湖北和上海为例,分别为以下三个网站: 国家卫健委官网:http://w... 哇说起B站,在小九眼里就是宝藏般的存在,放年假宅在家时一天刷6、7个小时不在话下,更别提今年的跨年晚会,我简直是跪着看完的!! 最早大家聚在在B站是为了追番,再后来我在上面刷欧美新歌和漂亮小姐姐的舞蹈... Web播放器解决了在手机浏览器和PC浏览器上播放音视频数据的问题,让视音频内容可以不依赖用户安装App,就能进行播放以及在社交平台进行传播。在视频业务大数据平台中,播放数据的统计分析非常重要,所以We... 本文知识点较多,篇幅较长,请耐心学习 MySQL已经成为时下关系型数据库产品的中坚力量,备受互联网大厂的青睐,出门面试想进BAT,想拿高工资,不会点MySQL优化知识,拿offer的成功率会大大下降... 我本人因为高中沉迷于爱情,导致学业荒废,后来高考,毫无疑问进入了一所普普通通的大学,实在惭愧???? 我又是那么好强,现在学历不行,没办法改变的事情了,所以,进入大学开始,我就下定决心,一定要让自己掌... 1.Matlab实现粒子群算法的程序代码:https://www.cnblogs.com/kexinxin/p/9858664.html matlab代码求解函数最优值:https://blog.cs... 很多人知道爬虫,也很想利用爬虫去爬取自己想要的数据,那么爬虫到底怎么用呢?今天就教大家编写一个简单的爬虫。 下面以爬取笔者的个人博客网站为例获取第一篇文章的标题名称,教大家学会一个简单的爬虫。 第一步... 作者:隋顺意 一段时间前,自己制作了一个库 “sui-math”。这其实是math的翻版。做完后,python既然可以轻易的完成任何的数学计算,何不用python开发一个小程序专门用以计算呢? 现在我... 本篇博客大部分内容摘自埃里克·马瑟斯所著的《Python编程:从入门到实战》(入门类书籍),采用举例的方式进行知识点提要 关于Python学习书籍推荐文章 《学习Python必备的8本书》 Pytho... 明天就是情人节了。这个情人节,注定是一个不能约会的情人节,但不能约会不代表不能浪漫。古人比我们出生早,那些浪漫的诗词早都被他们挖掘一空,比诗词我们肯定没有机会了。好在我们还有Python,不然都不知道... 计算机考研指导建议背景开始备考时间学校选择复习计划学科复习考研资料和平台心得杂杂答疑     我是广东双非本科计算机类专业,大一高数没学好,英语在大四最后一次考试里过了6级,专业课掌握情况尚好。... 国内的普通开发者对于掌握一门新的技术不知道从哪里下手,看哪些书。为了获得相关知识会关注各种公众号、购买各种视频课程来学习,但由于这些内容本身有碎片化的特点,效果往往不太理想。以至于付出了大量的时间到最... luo15242208310: [reply]Venry_[/reply] 可以通过getMP3Image获取图片字节数组byte[],如果你想在WEB页面展示,可以直接通过流将byte[]写出去就行了啊

jaudiotagger获取MP3... Venry_: 只是想显示封面,为什么要保存该图片,不可能读一首歌就保存一张图片吧?