• 添加依赖
  • 添加配置
  • 工具类说明
  • 订阅消息
  • 发送消息

    mqtt整合
    最近有一个业务,要求连接多个非集群不同的mqtt服务,于是乎写了一个可根据配置动态配置的工具。starter-integration-mqtt整合了spring-integration-mqtt,只需添加配置,并实现消息订阅接口即可。可以实现订阅多个mqtt。
    github源码:https://github.com/aLiang-xyl/integration-mqtt

    添加依赖

    <dependency>
        <groupId>cn.xyliang</groupId>
    	<artifactId>integration-mqtt-starter</artifactId>
    	<version>0.0.2</version>
    </dependency>

    添加配置

  • mqtt:
      config: 
        channel1:                                          #通道名称,可自定义,订阅消息时需要该名称
          url: [tcp://host1:1883, tcp://host1:1883]        #mqtt的url
          topics: [topic1, topic2]                         #监听的主题,和qos一一对应
          qos: [1, 0]                                      #监听主题的qos,和主题一一对应
          username: admin                                  #用户名
          password: public                                 #密码
          timeout: 60                                      #连接超时时间,单位:秒
          kep-alive-interval: 60                           #心跳时间,单位:秒
          async: true                                      #发送消息时是否异步发送
          client-id-append-ip: true                        #是否在clientId后面追加本机ip,因为clientid是唯一值,集群环境下不能使用相同的clientid,追加ip可解决该问题
          consumer-client-id: consumer_client_test1        #consumer client id配置
          producer-client-id: producer_client_test1        #producer client id配置
          consumer-will:                                   #consumer遗嘱消息配置
            qos: 1                                         #遗嘱qos
            topic: will_topic                              #遗嘱主题
            payload: '{"id": "consumer_client_test1"}'     #遗嘱内容
            retained: false                                #是否发送保留消息
          producer-will:                                   #producer遗嘱消息配置
            qos: 1                                         #遗嘱qos
            topic: will_topic                              #遗嘱主题
            payload: '{"id": "producer_client_test1"}'     #遗嘱内容
            retained: false                                #是否发送保留消息
        channel2:                                          #通道名称,第二个配置
          url: [tcp://host1:1883, tcp://host1:1883]
          topics: [topic1, topic2]
          qos: [1, 0]
          username: admin
          password: public
          timeout: 60
          kep-alive-interval: 60
          async: true
          consumer-client-id: consumer_client_test2
          producer-client-id: producer_client_test2
          consumer-will: 
            qos: 1
            topic: will_topic
            payload: '{"id": "consumer_client_test2"}'
            retained: false
          producer-will: 
            qos: 1
            topic: will_topic
            payload: '{"id": "producer_client_test2"}'
            retained: false
    

    工具类说明

    MqttUtils工具类,用来发送mqtt消息

  • package cn.xyliang.mqtt.utils;
    import java.util.Collection;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;
    import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
    import org.springframework.integration.mqtt.support.MqttHeaders;
    import org.springframework.integration.support.MessageBuilder;
    import org.springframework.messaging.Message;
    import lombok.extern.log4j.Log4j2;
     * 描述: mqtt工具类,可以根据通道名称发送消息
     * @author xingyl
     * @date 2020-04-01 10:16:35
    @Log4j2
    public class MqttUtils {
    	 * qos 0
    	public static final int QOS_0 = 0;
    	 * qos 1
    	public static final int QOS_1 = 1;
    	 * qos 2
    	public static final int QOS_2 = 2;
    	private final static Map<String, MqttPahoMessageHandler> HANDLER_MAP = new HashMap<>(16);
    	public final static String CHANNEL_NAME_SUFFIX = "MqttPahoMessageHandler";
    	 * 存放handler
    	 * @param channelName
    	 * @param handler
    	public static void put(String channelName, MqttPahoMessageHandler handler) {
    		HANDLER_MAP.put(channelName + CHANNEL_NAME_SUFFIX, handler);
    	 * 发送消息
    	 * @param topic       要发送的主题
    	 * @param message     消息内容
    	 * @param qos         qos级别
    	 * @param channelName 发送到指定的通道
    	public static void sendMessage(String topic, String message, int qos, String channelName) {
    		MqttPahoMessageHandler handler = getHandler(channelName);
    		Message<String> mqttMessage = MessageBuilder.withPayload(message).setHeader(MqttHeaders.TOPIC, topic)
    				.setHeader(MqttHeaders.QOS, qos).build();
    		handler.handleMessage(mqttMessage);
    	 * 发送消息,默认qos级别为1
    	 * @param topic       要发送的主题
    	 * @param message     消息内容
    	 * @param channelName 发送到指定的通道
    	public static void sendMessage(String topic, String message, String channelName) {
    		MqttPahoMessageHandler handler = getHandler(channelName);
    		Message<String> mqttMessage = MessageBuilder.withPayload(message).setHeader(MqttHeaders.TOPIC, topic)
    				.setHeader(MqttHeaders.QOS, QOS_1).build();
    		handler.handleMessage(mqttMessage);
    	 * 发送消息
    	 * @param mqttMessage 消息
    	 * @param channelName 发送到指定的通道
    	public static void sendMessage(Message<String> mqttMessage, String channelName) {
    		MqttPahoMessageHandler handler = getHandler(channelName);
    		handler.handleMessage(mqttMessage);
    	 * 如果只有一个通道将使用该通道发送消息
    	 * @param topic
    	 * @param message
    	 * @param qos
    	public static void sendMessage(String topic, String message, int qos) {
    		MqttPahoMessageHandler handler = getDefaultHeadler();
    		Message<String> mqttMessage = MessageBuilder.withPayload(message).setHeader(MqttHeaders.TOPIC, topic)
    				.setHeader(MqttHeaders.QOS, qos).build();
    		handler.handleMessage(mqttMessage);
    	 * 如果只有一个通道将使用该通道发送消息,默认qos级别为1
    	 * @param topic
    	 * @param message
    	public static void sendMessage(String topic, String message) {
    		MqttPahoMessageHandler handler = getDefaultHeadler();
    		Message<String> mqttMessage = MessageBuilder.withPayload(message).setHeader(MqttHeaders.TOPIC, topic)
    				.setHeader(MqttHeaders.QOS, QOS_1).build();
    		handler.handleMessage(mqttMessage);
    	 * 如果只有一个通道将使用该通道发送消息,默认qos级别为1
    	 * @param mqttMessage 消息信息
    	public static void sendMessage(Message<String> mqttMessage) {
    		MqttPahoMessageHandler handler = getDefaultHeadler();
    		handler.handleMessage(mqttMessage);
    	 * 获取默认的handler
    	 * @return
    	private static MqttPahoMessageHandler getDefaultHeadler() {
    		Collection<MqttPahoMessageHandler> values = HANDLER_MAP.values();
    		Iterator<MqttPahoMessageHandler> iterator = values.iterator();
    		MqttPahoMessageHandler handler = iterator.next();
    		if (handler == null) {
    			log.error("发送消息失败,无可用的headler");
    			throw new RuntimeException("发送消息失败,无可用的headler");
    		return handler;
    	 * 根据通道获取handler
    	 * @param channelName
    	 * @return
    	private static MqttPahoMessageHandler getHandler(String channelName) {
    		MqttPahoMessageHandler handler = HANDLER_MAP.get(channelName + CHANNEL_NAME_SUFFIX);
    		if (handler == null) {
    			log.error("未查询到相应通道{}的handler,存在的通道名称{}", channelName, HANDLER_MAP.keySet());
    			throw new IllegalArgumentException("未查询到相应通道" + channelName + "的handler");
    		return handler;
    

    订阅消息

    订阅消息需要实现MessageHandler 接口
    注意: @ServiceActivator(inputChannel = "channel1") 中的配置必须和yml配置文件里的channel保持一致,一个channel配置对应一个MessageHandler接口实现。

  • import org.springframework.integration.annotation.ServiceActivator;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.MessageHandler;
    import org.springframework.messaging.MessagingException;
    import org.springframework.stereotype.Component;
    import lombok.extern.log4j.Log4j2;
     * <p>描述:配置channel1消息处理 </p>
     * @author xingyl
     * @date 2020年3月27日 下午6:33:35
    @Log4j2
    @Component
    public class MqttMessageHandler implements MessageHandler {
    	@ServiceActivator(inputChannel = "channel1")
    	@Override
    	public void handleMessage(Message<?> message) throws MessagingException {
    		log.info("收到消息---{}", message);
    

    配置中每一个channel对应一个MessageHandler实现

    发送消息

    MqttUtils工具类中封装了多个发送消息的方法

    github源码:https://github.com/aLiang-xyl/integration-mqtt

    ————————————————
    版权声明:本文为CSDN博主「阿良~」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
    原文链接:https://blog.csdn.net/qq_20280007/article/details/105250986

springboot自动化配置mqtt,整合spring-integration-mqtt,连接多个mqttmqtt整合 添加依赖 添加配置 工具类说明 订阅消息 发送消息 mqtt整合 最近有一个业务,要求连接多个非集群不同的mqtt服务,于是乎写了一个可根据配置动态配置的工具。starter-integration-mqtt整合了spring-integration-mqtt,只需添加配置,并实现消息订阅接口即可。可以实现订阅多个mqtt。 github源码.
springboot +idea+ java + mqtt 实现 订阅 订阅 消息 针对业务需要和硬件对接,使用 mqtt 处理 硬件的数据 java 实现 订阅 订阅 消息,以及 处理 硬件的数据 是根据业务场景写的demo,包括数据库什么的就不再上传,大家可以根据实际情况自行定义实体类以及数据库
物联网 项目的 MQTT 订阅 应用示例 一组简单的应用程序,可以从 MQTT 代理 订阅 主题 。 这些应用程序以多种编程 语言 编写,以提供适用于不同平台的示例。 js- mqtt -websockets-demo 一个简单的Web应用程序,可以使用websocket从 MQTT 代理 订阅 MQTT 主题 。 提供一个简单的用户界面,以输入主机名/ IP地址,端口号和 MQTT 主题
SpringBoot 整合 MQTT 发布和 订阅 消息 处理 上一篇内容讲述了单个 mqtt 客户端 连接 一个指定的 mqtt 服务器,一般来说是够用的,但最近一个项目需求是要接收 多个 mqtt 服务器的数据,如果还按上一篇文章的 处理 方法,就只能讲代码类复制 多个 这样实现,但这样做的后果就是代码不美观不便于维护,于是就整理了这篇文章, springboot 用同一套代码实现 多个 mqtt 服务器的 连接 和管理: 大致结构,三个类文件即可实现 需要在mysql创建表(表结构如下): 硬件采集的数据传入EMQX平台(采用 MQTT 协议), java 通过代码 连接 MQTT 服务器,进行采集数据接收、解析、业务 处理 、存储入库、数据展示。 MQTT 是基于发布(Publish)/ 订阅 (Subscribe)模式来进行通信及数据交换的。 二、本文只讲解 java 连接 MQTT 服务器进行数据 处理
最近嘛,遇到了客户使用 MQTT 发布消息。而我们则需把这些消息获取并解析进我们自己的系统中,并且对这些数据进行存储,这些就需要我们熟悉 MQTT 的使用,话不多说,搞起。 因我们项目使用的是 SpringBoot ,因此在pom中添加 MQTT 的相关依赖,因仅涉及 订阅 ,顾不考虑发布等。 <dependency> <groupId>org.eclipse.paho</groupId> import org.eclipse.paho.client. mqtt v3. Mqtt Client; import org.eclipse.paho.client. mqtt v3. Mqtt ConnectOptions; import org.eclipse.paho.cl