相关文章推荐
有情有义的开水瓶  ·  WPF ...·  6 月前    · 

引子:最近在做物联网这一块,需求是根据传感器推送过来的数据,动态实时展示到前端页面

由于没做过这一块,之前只搭建过mqtt服务器,经过一番波折之后,了解到传感器的数据传送原理是这样的:

首选传感器(目前用的是温湿度传感器测试)通电后,将收集到的数据推送到支持mqtt通讯的转换器上面,转换器链接自己的mqtt服务器,并且对默认订阅和推送主题进行写码之后,在自己的项目中链接上mqtt服务器,并且订阅该转换器默认推送主题,启动项目后,每隔一秒,转换器就会收到传感器推送的数据,然后通过主题推送到项目里面来.

最坑的来了

由于转换器上默认mqtt是透传的,就是说收到什么内容就会推送什么内容,然后传感器是使用RS485输出,用的是MODBUS-RTU数据帧格式,也就是说项目中接收到的是字节流,但是spring-mqtt默认配置不支持接收字节流文件,所以导致控制台输出的数据一直是乱码.

下面是开启spring-mqtt支持接收字节流数据方式

1.引入maven

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>

2.inbound函数中开启bytes转换器

        // 设置转换器,接收bytes
        DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();
        converter.setPayloadAsBytes(true);
        adapter.setConverter(converter);

3.收到的消息转换为byte[ ]

        // 如果不设置转换器这里强转byte[]会报错
        byte[] payLoad = (byte[])message.getPayload();

4,拿到byte[]数组之后就好办啦,可以转换为16进制字符串

String s = HexUtils.bytes2HexString(bytes)

5.根据报文示例对16进制字符串进行解析 拿到想要的数据

* @param b 字节数组 * @return 16进制字符串 * @throws * @Title:bytes2HexString * @Description:字节数组转16进制字符串 public static String bytes2HexString(byte[] b) { StringBuffer result = new StringBuffer(); for (int i = 0; i < b.length; i++) { result.append(String.format("%02X", b[i])); result.append(' '); return result.toString().trim(); * @param src 16进制字符串 * @return 字节数组 * @throws * @Title:hexString2Bytes * @Description:16进制字符串转字节数组 public static byte[] hexString2Bytes(String src) { int l = src.length() / 2; byte[] ret = new byte[l]; for (int i = 0; i < l; i++) { ret[i] = Integer.valueOf(src.substring(i * 2, i * 2 + 2), 16).byteValue(); return ret; * 获取湿度 public static Double getHum(byte[]bytes) { String s = HexUtils.bytes2HexString(bytes); String substring1 = s.substring(9, 11); String substring2 = s.substring(12, 14); double humidity = (double) Integer.parseInt(substring1 + substring2, 16) / 10; return humidity; * 获取温度 public static Double getTem(byte[]bytes) { String s = HexUtils.bytes2HexString(bytes); String substring3 = s.substring(15, 17); String substring4 = s.substring(18, 20); double Temp = (double) Integer.parseInt(substring3 + substring4, 16) / 10; return Temp;

最后贴上两段关键代码 开启强转

@Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(host, clientId, mqttClientFactory());
        adapter.setCompletionTimeout(5000);
        // 设置转换器,接收bytes
        DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();
        converter.setPayloadAsBytes(true);
        adapter.setConverter(converter);
        adapter.addTopic(topic, 2);
        adapter.setOutputChannel(mqttInputChannel());
        //恢复间隔–控制在故障之后适配器会尝试重新连接的时间间隔,默认为10000ms(10s)
        adapter.setRecoveryInterval(10000);
        logger.info("中央客户端clientId:{},正在连接,EMQ地址:{},使用端口号:{}", clientId, host, "1884");
        return adapter;
@Bean
    //ServiceActivator注解表明当前方法用于处理MQTT消息,inputChannel参数指定了用于接收消息信息的channel。
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return message -> {
            String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
            String qos = message.getHeaders().get("mqtt_receivedQos").toString();
            System.out.println("接收消息主题:" + topic);
            System.out.println("接收消息Qos:" + qos);
            Double hum = SensorCoverUtils.getHum(message.getPayload());
            Double tem = SensorCoverUtils.getTem(message.getPayload());
            System.out.println("温度是" + tem);
            System.out.println("湿度是" + hum);

很感谢某起大佬的耐心指点,又是远程,又是找资料和解读,虽然过程中很想放弃,但还是坚持了下来,在成长的道路上又迈进了坚实的一步,结果总是美好的.

引子:最近在做物联网这一块,需求是根据传感器推送过来的数据,动态实时展示到前端页面由于没做过这一块,之前只搭建过mqtt服务器,经过一番波折之后,了解到传感器的数据传送原理是这样的:首选传感器(目前用的是温湿度传感器测试)通电后,将收集到的数据推送到支持mqtt通讯的转换器上面,转换器链接自己的mqtt服务器,并且对默认订阅和推送主题进行写码之后,在自己的项目中链接上mqtt服务器,并且订阅该转换器默认推送主题,启动项目后,每隔一秒,转换器就会收到传感器推送的数据,然后通过主题推送到项目里面来.
MQTT一种物联网数据传输协议,构建在TCP之上,采用发布与订阅的模式进行数据交互,发布与订阅是两个独立的连接通道,这里采用spring-integration-mqt来实现发布与订阅MQTT,与直接采用MQTT的SDK相对要简单许多,服务端采用ActiveMQ来支持MQTT的消息服务并实现消息转发。 首先需要引入spring-integration-mqt的包,这里只需要引入这一个包即可。 <dependency> <groupId>org.s...
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-integration</artifactId> </dependency> <dependency& MQTT是一款针对机对机(M2M)通信的,非常轻量级的的消息订阅、发布协议。它适用于一些系统资源和网络带宽非常有限的情况下的远程连接。MQTT-Client提供一个ASL 2.0证书下的MQTT接口。在网络连接失败时,它能够自动地重新连接服务器并尝试恢复会话。应用程序能够使用阻塞API、基于Future的API和回调API,共三种接口形式。 在Maven中引用MQTT-Clien...
最近接收第三方发送的UDP数据包,通过wirshark抓包后使用其原始数据(16进制),在自己写的UDPclient/server测试成功,在生产环境中,出现了接收数据乱码的问题。 基于Spring Boot2.1.5.RELEASE,jdk 1.8开发 相关jar包: <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-.
mqtt整合 最近有一个业务,要求连接多个非集群不同的mqtt服务,于是乎写了一个可根据配置动态配置的工具。starter-integration-mqtt整合了spring-integration-mqtt,只需添加配置,并实现消息订阅接口即可。可以实现订阅多个mqtt。 github源码. wedSocketMessageHandler(e:MessageEvent){ let data = this.messageFilter(e.data); let message = JSON.parse(data); if(data==null){ return if(message.action=="edit"){ if(message.status== void write(int c) 写入单个字符 void write(char[] cbuf)写入单个字符数组 abstract void write(char[] cbuf,int off,int len) 写入字符数组的某一部分,off数组的开始索引,len写入的字符个数 void write(String str)写入字符串 void write(Stri