最近项目用到了mqtt,所以记录下SpringBoot集成MQTT的步骤和注意事项,整理一下知识,方便自己和他人。

一、pom文件里引入maven依赖jar包

<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>5.3.2.RELEASE</version>
</dependency>

二、在application.yml配置文件里加入mqtt配置信息

snake:
server:
mqttAddr: 162.11.22.33:1883
mqttUser: admin
mqttPwd: 111111

三、新建配置映射类SnakeServerProperties

@Data
@Component
@ConfigurationProperties(prefix = "snake.server")
public class SnakeServerProperties {

private String mqttPwd;
private String mqttAddr;
private String mqttUser;
}

四、新建mqtt连接的工厂类MqttFactory

里面会有一个getInstance方法,用来构造MqttClient。

这边需要注意的有两点:

1.这边要考虑mqtt断开自动重连的情况。
2.如果要分布式部署,clientId不能设置成一样的,不然会导致多个实例相同mqttClient抢占连接。这会导致明明获取到mqttClient,但发送消息却报异常。

import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;

/**
* @Author: 夏威夷8080
* @Date: 2022/8/12 15:38
*/
@Slf4j
@Component
public class MqttFactory {

private ConcurrentHashMap<String, MqttClient> clientMap = new ConcurrentHashMap<>();

@Autowired
private SnakeServerProperties snakeServerProperties;


/**
* 初始化客户端
*/
public MqttClient getInstance(String clientId) {
MqttClient client = null;
String key = clientId;
if (clientMap.get(key) == null) {
try {
client = new MqttClient("tcp://" + snakeServerProperties.getMqttAddr(), clientId);
// MQTT配置对象
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
// 设置自动重连, 其它具体参数可以查看MqttConnectOptions
mqttConnectOptions.setAutomaticReconnect(true);
// 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
// mqttConnectOptions.setCleanSession(true);
// 设置超时时间 单位为秒
mqttConnectOptions.setConnectionTimeout(10);
mqttConnectOptions.setUserName(snakeServerProperties.getMqttUser());
mqttConnectOptions.setPassword(snakeServerProperties.getMqttPwd().toCharArray());
// mqttConnectOptions.setServerURIs(new String[]{url});
// 设置会话心跳时间 单位为秒
mqttConnectOptions.setKeepAliveInterval(10);
// 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。
// mqttConnectOptions.setWill("willTopic", "offline".getBytes(), 2, false);

if (!client.isConnected()) {
client.connect(mqttConnectOptions);
}
log.info("MQTT创建client成功={}", JSONObject.toJSONString(client));
clientMap.put(key, client);
} catch (MqttException e) {
log.error("MQTT连接消息服务器[{}]失败", key + "-" + snakeServerProperties.getMqttAddr());
}
} else {
client = clientMap.get(key);
log.info("MQTT从map里获取到client={}", JSONObject.toJSONString(client));
if (!client.isConnected()) {
// 如果缓存里的client已经断开,则清除该缓存,再重新创建客户端连接
clientMap.remove(key);
this.getInstance(clientId);
}
}
return client;
}

}

五、新建MqttTemplate模板类

里面定义了发送消息和订阅消息两个方法。

import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;

/**
* @Author: 夏威夷8080
* @Date: 2022/8/12 15:47
*/
@Slf4j
@Component
public class MqttTemplate {

@Autowired
private MqttFactory mqttFactory;

/**
* 发送消息
*
* @param topic 主题
* @param data 消息内容
*/
public void send(String clientId, String topic, Object data) {
// 获取客户端实例
MqttClient client = mqttFactory.getInstance(clientId);
try {
// 转换消息为json字符串
String json = JSONObject.toJSONString(data);
log.info("MQTT主题[{}]发送消息...\r\n{}", topic, json);
client.publish(topic, new MqttMessage(json.getBytes(StandardCharsets.UTF_8)));
} catch (MqttException e) {
log.error("MQTT主题[{}]发送消息失败,{}", topic, Throwables.getStackTraceAsString(e));
}
}

/**
* 订阅主题
*
* @param topic 主题
* @param listener 消息监听处理器
*/
public void subscribe(String clientId, String topic, IMqttMessageListener listener) {
MqttClient client = mqttFactory.getInstance(clientId);
try {
log.info("MQTT订阅主题[{}]...", topic);
client.subscribe(topic, listener);
} catch (MqttException e) {
log.error("MQTT订阅主题[{}]失败,{}", topic, Throwables.getStackTraceAsString(e));
}
}

}

六、新建业务操作MQTT处理类MyHandler

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

/**
* @Author: 夏威夷8080
* @Date: 2022/8/12 15:59
*/
@Component
public class MyHandler {

@Autowired
private MqttTemplate mqttTemplate;

public void listener(String clientId, String topic) {
mqttTemplate.subscribe(clientId, topic, new SnakeMqttMessageListener());
}

public void send(String clientId, String topic, CmdRequest cmdRequest) {
mqttTemplate.send(clientId, topic, cmdRequest);
}

}

七、新建自己的mqtt消息监听处理类,需要实现IMqttMessageListener接口

这边无法直接注入bean,需要从applicationContext里拿

import cn.hutool.json.JSONUtil;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;

/**
* @Author: 夏威夷8080
* @Date: 2022/8/12 15:50
*/
@Slf4j
public class SnakeMqttMessageListener implements IMqttMessageListener {

/**
* 处理消息
*
* @param topic 主题
* @param mqttMessage 消息
*/
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
log.info("MQTT实时订阅主题[{}]发来消息[{}]", topic, new String(mqttMessage.getPayload()));
ApplicationContext applicationContext = SpringContextHolder.getApplicationContext();
xxxxxxx xxx = applicationContext.getBean(xxxx.class);
}

}

八、顺便把工具类也贴出来SpringContextHolder

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;

/**
* @author 夏威夷8080
* @date 2018/6/27 Spring 工具类
*/
@Slf4j
@Service
@Lazy(false)
public class SpringContextHolder implements ApplicationContextAware, DisposableBean {

private static ApplicationContext applicationContext = null;

/**
* 取得存储在静态变量中的ApplicationContext.
*/
public static ApplicationContext getApplicationContext() {
return applicationContext;
}

/**
* 实现ApplicationContextAware接口, 注入Context到静态变量中.
*/
@Override
public void setApplicationContext(ApplicationContext applicationContext) {
SpringContextHolder.applicationContext = applicationContext;
}

/**
* 从静态变量applicationContext中取得Bean, 自动转型为所赋值对象的类型.
*/
@SuppressWarnings("unchecked")
public static <T> T getBean(String name) {
return (T) applicationContext.getBean(name);
}

/**
* 从静态变量applicationContext中取得Bean, 自动转型为所赋值对象的类型.
*/
public static <T> T getBean(Class<T> requiredType) {
return applicationContext.getBean(requiredType);
}

/**
* 清除SpringContextHolder中的ApplicationContext为Null.
*/
public static void clearHolder() {
if (log.isDebugEnabled()) {
log.debug("清除SpringContextHolder中的ApplicationContext:" + applicationContext);
}
applicationContext = null;
}

/**
* 发布事件
* @param event
*/
public static void publishEvent(ApplicationEvent event) {
if (applicationContext == null) {
return;
}
applicationContext.publishEvent(event);
}

/**
* 实现DisposableBean接口, 在Context关闭时清理静态变量.
*/
@Override
public void destroy() {
SpringContextHolder.clearHolder();
}

}

OK,SpringBoot集成MQTT的步骤和注意事项到这边就介绍完了,希望对大家有帮助,有问题可以在下面留言。



java 重入锁原理 java重入锁有什么用

一、重入锁的定义:为什么会叫重入锁,顾名思义,表示这个锁可以返回被添加,就是一个线程可以多次获得一把锁,只要在最后的时候做相同次数的锁释放即可。Lock lock = new ReentrantLock(); lock.lock(); lock.lock(); try { //业务代码 } finally { lock.unlock(); lock.unlock(); }二、重入