引子:最近在做物联网这一块,需求是根据传感器推送过来的数据,动态实时展示到前端页面
首选传感器(目前用的是温湿度传感器测试)通电后,将收集到的数据推送到支持mqtt通讯的转换器上面,转换器链接自己的mqtt服务器,并且对默认订阅和推送主题进行写码之后,在自己的项目中链接上mqtt服务器,并且订阅该转换器默认推送主题,启动项目后,每隔一秒,转换器就会收到传感器推送的数据,然后通过主题推送到项目里面来
最坑的来了
由于转换器上默认mqtt是透传的,就是说收到什么内容就会推送什么内容,然后传感器是使用RS485输出,用的是MODBUS-RTU数据帧格式,也就是说项目中接收到的是
字节流
,但是spring-mqtt默认配置不支持接收字节流文件,所以导致控制台输出的数据一直是乱码.
下面是开启spring-mqtt支持接收字节流数据方式
1.引入maven
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
2.inbound函数中开启bytes转换器
// 设置转换器,接收bytes
DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();
converter.setPayloadAsBytes(true);
adapter.setConverter(converter);
3.收到的消息转换为byte[ ]
// 如果不设置转换器这里强转byte[]会报错
byte[] payLoad = (byte[])message.getPayload();
4,拿到byte[]数组之后就好办啦,可以转换为16进制字符串
String s = HexUtils.bytes2HexString(bytes)
5.根据报文示例对16进制字符串进行解析 拿到想要的数据
最后贴上两段关键代码
@Bean
public MessageProducer inbound() {
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter(host, clientId, mqttClientFactory());
adapter.setCompletionTimeout(5000);
// 设置转换器,接收bytes
DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();
converter.setPayloadAsBytes(true);
adapter.setConverter(converter);
adapter.addTopic(topic, 2);
adapter.setOutputChannel(mqttInputChannel());
//恢复间隔–控制在故障之后适配器会尝试重新连接的时间间隔,默认为10000ms(10s)
adapter.setRecoveryInterval(10000);
logger.info("中央客户端clientId:{},正在连接,EMQ地址:{},使用端口号:{}", clientId, host, "1884");
return adapter;
@Bean
//ServiceActivator注解表明当前方法用于处理MQTT消息,inputChannel参数指定了用于接收消息信息的channel。
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
return message -> {
String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
String qos = message.getHeaders().get("mqtt_receivedQos").toString();
System.out.println("接收消息主题:" + topic);
System.out.println("接收消息Qos:" + qos);
Double hum = SensorCoverUtils.getHum(message.getPayload());
Double tem = SensorCoverUtils.getTem(message.getPayload());
System.out.println("温度是" + tem);
System.out.println("湿度是" + hum);
* @param b 字节数组
* @return 16进制字符串
* @throws
* @Title:bytes2HexString
* @Description:字节数组转16进制字符串
public static String bytes2HexString(byte[] b) {
StringBuffer result = new StringBuffer();
for (int i = 0; i < b.length; i++) {
result.append(String.format("%02X", b[i]));
result.append(' ');
return result.toString().trim();
* @param src 16进制字符串
* @return 字节数组
* @throws
* @Title:hexString2Bytes
* @Description:16进制字符串转字节数组
public static byte[] hexString2Bytes(String src) {
int l = src.length() / 2;
byte[] ret = new byte[l];
for (int i = 0; i < l; i++) {
ret[i] = Integer.valueOf(src.substring(i * 2, i * 2 + 2), 16).byteValue();
return ret;
* 获取湿度
public static Double getHum(byte[]bytes) {
String s = HexUtils.bytes2HexString(bytes);
String substring1 = s.substring(9, 11);
String substring2 = s.substring(12, 14);
double humidity = (double) Integer.parseInt(substring1 + substring2, 16) / 10;
return humidity;
* 获取温度
public static Double getTem(byte[]bytes) {
String s = HexUtils.bytes2HexString(bytes);
String substring3 = s.substring(15, 17);
String substring4 = s.substring(18, 20);
double Temp = (double) Integer.parseInt(substring3 + substring4, 16) / 10;
return Temp;
很感谢某起大佬的耐心指点,又是远程,又是找资料和解读,虽然过程中很想放弃,但还是坚持了下来,在成长的道路上又迈进了坚实的一步,结果总是美好的.
引子:最近在做物联网这一块,需求是根据传感器推送过来的数据,动态实时展示到前端页面由于没做过这一块,之前只搭建过mqtt服务器,经过一番波折之后,了解到传感器的数据传送原理是这样的:首选传感器(目前用的是温湿度传感器测试)通电后,将收集到的数据推送到支持mqtt通讯的转换器上面,转换器链接自己的mqtt服务器,并且对默认订阅和推送主题进行写码之后,在自己的项目中链接上mqtt服务器,并且订阅该转换器默认推送主题,启动项目后,每隔一秒,转换器就会收到传感器推送的数据,然后通过主题推送到项目里面来最
MQTT一种物联网数据传输协议,构建在TCP之上,采用发布与订阅的模式进行数据交互,发布与订阅是两个独立的连接通道,这里采用spring-integration-mqt来实现发布与订阅MQTT,与直接采用MQTT的SDK相对要简单许多,服务端采用ActiveMQ来支持MQTT的消息服务并实现消息转发。
首先需要引入spring-integration-mqt的包,这里只需要引入这一个包即可。
<dependency>
<groupId>org.s...
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
import cn.hutool.core.util.ArrayUtil;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factor.
网上说是因为 client ID 重复,最开始是不相信的,因为我测试只启动了一个客户端。但是却怎么都定位不到异常原因,用重新回到 client ID 重复的这个思路上来:
因为程序里同时作为订阅者和.