引子:最近在做物联网这一块,需求是根据传感器推送过来的数据,动态实时展示到前端页面
由于没做过这一块,之前只搭建过mqtt服务器,经过一番波折之后,了解到传感器的数据传送原理是这样的:
首选传感器(目前用的是温湿度传感器测试)通电后,将收集到的数据推送到支持mqtt通讯的转换器上面,转换器链接自己的mqtt服务器,并且对默认订阅和推送主题进行写码之后,在自己的项目中链接上mqtt服务器,并且订阅该转换器默认推送主题,启动项目后,每隔一秒,转换器就会收到传感器推送的数据,然后通过主题推送到项目里面来.
最坑的来了
由于转换器上默认mqtt是透传的,就是说收到什么内容就会推送什么内容,然后传感器是使用RS485输出,用的是MODBUS-RTU数据帧格式,也就是说项目中接收到的是字节流,但是spring-mqtt默认配置不支持接收字节流文件,所以导致控制台输出的数据一直是乱码.
下面是开启spring-mqtt支持接收字节流数据方式
1.引入maven
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
2.inbound函数中开启bytes转换器
// 设置转换器,接收bytes
DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();
converter.setPayloadAsBytes(true);
adapter.setConverter(converter);
3.收到的消息转换为byte[ ]
// 如果不设置转换器这里强转byte[]会报错
byte[] payLoad = (byte[])message.getPayload();
4,拿到byte[]数组之后就好办啦,可以转换为16进制字符串
String s = HexUtils.bytes2HexString(bytes)
5.根据报文示例对16进制字符串进行解析 拿到想要的数据
* @param b 字节数组
* @return 16进制字符串
* @throws
* @Title:bytes2HexString
* @Description:字节数组转16进制字符串
public static String bytes2HexString(byte[] b) {
StringBuffer result = new StringBuffer();
for (int i = 0; i < b.length; i++) {
result.append(String.format("%02X", b[i]));
result.append(' ');
return result.toString().trim();
* @param src 16进制字符串
* @return 字节数组
* @throws
* @Title:hexString2Bytes
* @Description:16进制字符串转字节数组
public static byte[] hexString2Bytes(String src) {
int l = src.length() / 2;
byte[] ret = new byte[l];
for (int i = 0; i < l; i++) {
ret[i] = Integer.valueOf(src.substring(i * 2, i * 2 + 2), 16).byteValue();
return ret;
* 获取湿度
public static Double getHum(byte[]bytes) {
String s = HexUtils.bytes2HexString(bytes);
String substring1 = s.substring(9, 11);
String substring2 = s.substring(12, 14);
double humidity = (double) Integer.parseInt(substring1 + substring2, 16) / 10;
return humidity;
* 获取温度
public static Double getTem(byte[]bytes) {
String s = HexUtils.bytes2HexString(bytes);
String substring3 = s.substring(15, 17);
String substring4 = s.substring(18, 20);
double Temp = (double) Integer.parseInt(substring3 + substring4, 16) / 10;
return Temp;
最后贴上两段关键代码 开启强转
@Bean
public MessageProducer inbound() {
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter(host, clientId, mqttClientFactory());
adapter.setCompletionTimeout(5000);
// 设置转换器,接收bytes
DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();
converter.setPayloadAsBytes(true);
adapter.setConverter(converter);
adapter.addTopic(topic, 2);
adapter.setOutputChannel(mqttInputChannel());
//恢复间隔–控制在故障之后适配器会尝试重新连接的时间间隔,默认为10000ms(10s)
adapter.setRecoveryInterval(10000);
logger.info("中央客户端clientId:{},正在连接,EMQ地址:{},使用端口号:{}", clientId, host, "1884");
return adapter;
@Bean
//ServiceActivator注解表明当前方法用于处理MQTT消息,inputChannel参数指定了用于接收消息信息的channel。
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
return message -> {
String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
String qos = message.getHeaders().get("mqtt_receivedQos").toString();
System.out.println("接收消息主题:" + topic);
System.out.println("接收消息Qos:" + qos);
Double hum = SensorCoverUtils.getHum(message.getPayload());
Double tem = SensorCoverUtils.getTem(message.getPayload());
System.out.println("温度是" + tem);
System.out.println("湿度是" + hum);
很感谢某起大佬的耐心指点,又是远程,又是找资料和解读,虽然过程中很想放弃,但还是坚持了下来,在成长的道路上又迈进了坚实的一步,结果总是美好的.
引子:最近在做物联网这一块,需求是根据传感器推送过来的数据,动态实时展示到前端页面由于没做过这一块,之前只搭建过mqtt服务器,经过一番波折之后,了解到传感器的数据传送原理是这样的:首选传感器(目前用的是温湿度传感器测试)通电后,将收集到的数据推送到支持mqtt通讯的转换器上面,转换器链接自己的mqtt服务器,并且对默认订阅和推送主题进行写码之后,在自己的项目中链接上mqtt服务器,并且订阅该转换器默认推送主题,启动项目后,每隔一秒,转换器就会收到传感器推送的数据,然后通过主题推送到项目里面来.
MQTT一种物联网数据传输协议,构建在TCP之上,采用发布与订阅的模式进行数据交互,发布与订阅是两个独立的连接通道,这里采用spring-integration-mqt来实现发布与订阅MQTT,与直接采用MQTT的SDK相对要简单许多,服务端采用ActiveMQ来支持MQTT的消息服务并实现消息转发。
首先需要引入spring-integration-mqt的包,这里只需要引入这一个包即可。
<dependency>
<groupId>org.s...
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency&
MQTT是一款针对机对机(M2M)通信的,非常轻量级的的消息订阅、发布协议。它适用于一些系统资源和网络带宽非常有限的情况下的远程连接。MQTT-Client提供一个ASL 2.0证书下的MQTT接口。在网络连接失败时,它能够自动地重新连接服务器并尝试恢复会话。应用程序能够使用阻塞API、基于Future的API和回调API,共三种接口形式。
在Maven中引用MQTT-Clien...
最近接收第三方发送的UDP数据包,通过wirshark抓包后使用其原始数据(16进制),在自己写的UDPclient/server测试成功,在生产环境中,出现了接收数据乱码的问题。
基于Spring Boot2.1.5.RELEASE,jdk 1.8开发
相关jar包:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-.
mqtt整合
最近有一个业务,要求连接多个非集群不同的mqtt服务,于是乎写了一个可根据配置动态配置的工具。starter-integration-mqtt整合了spring-integration-mqtt,只需添加配置,并实现消息订阅接口即可。可以实现订阅多个mqtt。
github源码.
wedSocketMessageHandler(e:MessageEvent){
let data = this.messageFilter(e.data);
let message = JSON.parse(data);
if(data==null){
return
if(message.action=="edit"){
if(message.status==
void write(int c) 写入单个字符
void write(char[] cbuf)写入单个字符数组
abstract void write(char[] cbuf,int off,int len) 写入字符数组的某一部分,off数组的开始索引,len写入的字符个数
void write(String str)写入字符串
void write(Stri