引子:最近在做物联网这一块,需求是根据传感器推送过来的数据,动态实时展示到前端页面

由于没做过这一块,之前只搭建过mqtt服务器,经过一番波折之后,了解到传感器的数据传送原理是这样的:

首选传感器(目前用的是温湿度传感器测试)通电后,将收集到的数据推送到支持mqtt通讯的转换器上面,转换器链接自己的mqtt服务器,并且对默认订阅和推送主题进行写码之后,在自己的项目中链接上mqtt服务器,并且订阅该转换器默认推送主题,启动项目后,每隔一秒,转换器就会收到传感器推送的数据,然后通过主题推送到项目里面来

最坑的来了

由于转换器上默认mqtt是透传的,就是说收到什么内容就会推送什么内容,然后传感器是使用RS485输出,用的是MODBUS-RTU数据帧格式,也就是说项目中接收到的是 字节流 ,但是spring-mqtt默认配置不支持接收字节流文件,所以导致控制台输出的数据一直是乱码.

下面是开启spring-mqtt支持接收字节流数据方式

1.引入maven

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>

2.inbound函数中开启bytes转换器

// 设置转换器,接收bytes
DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();
converter.setPayloadAsBytes(true);
adapter.setConverter(converter);

3.收到的消息转换为byte[ ]

// 如果不设置转换器这里强转byte[]会报错
byte[] payLoad = (byte[])message.getPayload();

4,拿到byte[]数组之后就好办啦,可以转换为16进制字符串

String s = HexUtils.bytes2HexString(bytes)

5.根据报文示例对16进制字符串进行解析 拿到想要的数据

最后贴上两段关键代码

@Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(host, clientId, mqttClientFactory());
        adapter.setCompletionTimeout(5000);
        // 设置转换器,接收bytes
        DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();
        converter.setPayloadAsBytes(true);
        adapter.setConverter(converter);
        adapter.addTopic(topic, 2);
        adapter.setOutputChannel(mqttInputChannel());
        //恢复间隔–控制在故障之后适配器会尝试重新连接的时间间隔,默认为10000ms(10s)
        adapter.setRecoveryInterval(10000);
        logger.info("中央客户端clientId:{},正在连接,EMQ地址:{},使用端口号:{}", clientId, host, "1884");
        return adapter;
@Bean
    //ServiceActivator注解表明当前方法用于处理MQTT消息,inputChannel参数指定了用于接收消息信息的channel。
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return message -> {
            String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
            String qos = message.getHeaders().get("mqtt_receivedQos").toString();
            System.out.println("接收消息主题:" + topic);
            System.out.println("接收消息Qos:" + qos);
            Double hum = SensorCoverUtils.getHum(message.getPayload());
            Double tem = SensorCoverUtils.getTem(message.getPayload());
            System.out.println("温度是" + tem);
            System.out.println("湿度是" + hum);
     * @param b 字节数组
     * @return 16进制字符串
     * @throws
     * @Title:bytes2HexString
     * @Description:字节数组转16进制字符串
    public static String bytes2HexString(byte[] b) {
        StringBuffer result = new StringBuffer();
        for (int i = 0; i < b.length; i++) {
            result.append(String.format("%02X", b[i]));
            result.append(' ');
        return result.toString().trim();
     * @param src 16进制字符串
     * @return 字节数组
     * @throws
     * @Title:hexString2Bytes
     * @Description:16进制字符串转字节数组
    public static byte[] hexString2Bytes(String src) {
        int l = src.length() / 2;
        byte[] ret = new byte[l];
        for (int i = 0; i < l; i++) {
            ret[i] = Integer.valueOf(src.substring(i * 2, i * 2 + 2), 16).byteValue();
        return ret;
     * 获取湿度
    public static Double getHum(byte[]bytes) {
        String s = HexUtils.bytes2HexString(bytes);
        String substring1 = s.substring(9, 11);
        String substring2 = s.substring(12, 14);
        double humidity = (double) Integer.parseInt(substring1 + substring2, 16) / 10;
        return humidity;
     * 获取温度
    public static Double getTem(byte[]bytes) {
        String s = HexUtils.bytes2HexString(bytes);
        String substring3 = s.substring(15, 17);
        String substring4 = s.substring(18, 20);
        double Temp = (double) Integer.parseInt(substring3 + substring4, 16) / 10;
        return Temp;

很感谢某起大佬的耐心指点,又是远程,又是找资料和解读,虽然过程中很想放弃,但还是坚持了下来,在成长的道路上又迈进了坚实的一步,结果总是美好的.

引子:最近在做物联网这一块,需求是根据传感器推送过来的数据,动态实时展示到前端页面由于没做过这一块,之前只搭建过mqtt服务器,经过一番波折之后,了解到传感器的数据传送原理是这样的:首选传感器(目前用的是温湿度传感器测试)通电后,将收集到的数据推送到支持mqtt通讯的转换器上面,转换器链接自己的mqtt服务器,并且对默认订阅和推送主题进行写码之后,在自己的项目中链接上mqtt服务器,并且订阅该转换器默认推送主题,启动项目后,每隔一秒,转换器就会收到传感器推送的数据,然后通过主题推送到项目里面来最
MQTT一种物联网数据传输协议,构建在TCP之上,采用发布与订阅的模式进行数据交互,发布与订阅是两个独立的连接通道,这里采用spring-integration-mqt来实现发布与订阅MQTT,与直接采用MQTT的SDK相对要简单许多,服务端采用ActiveMQ来支持MQTT的消息服务并实现消息转发。 首先需要引入spring-integration-mqt的包,这里只需要引入这一个包即可。 <dependency> <groupId>org.s...
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-integration</artifactId> </dependency> <dependency>
import cn.hutool.core.util.ArrayUtil; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factor.
网上说是因为 client ID 重复,最开始是不相信的,因为我测试只启动了一个客户端。但是却怎么都定位不到异常原因,用重新回到 client ID 重复的这个思路上来: 因为程序里同时作为订阅者和.