openssl x509 -req -days 3650 -in client.csr -CA ca.pem -CAkey ca.key -CAcreateserial -out client.pem
一顿操作之后certs目录下生成的文件:
以上,只是生成了相关证书,在emqx的配置文件etc/emqx.conf中还需要做相关设置:
listener.ssl.external.keyfile = etc/certs/emqx.key
listener.ssl.external.certfile = etc/certs/emqx.pem
listener.ssl.external.cacertfile = etc/certs/ca.pem
listener.ssl.external.verify = verify_peer
listener.ssl.external.fail_if_no_peer_cert = true
这几个主要的配置在原来的emqx.conf中都有,只需要打开或者修改配置值即可,主要是证书的名字。
重启emqx服务,验证。bin/emqx restart
这里使用mqttbox来验证,其实MQTTX也是可以的,客户端需要的文件:ca.pem,client.pem,client.key
普通的mqtt/tcp协议连接配置:
mqtts/tls协议连接配置:
配置需要注意的地方在上图中已经标识出来了,协议类型:mqtts/tls ,证书类型:自签名证书,ca文件:ca.pem,客户端证书:client.pem,客户端私钥:client.key。
配置完成,保存,可以看到连接状态变为绿色,如果是红色,那么就需要检查证书是否配置正确,包括生成和在emqx.conf配置文件中的设置。
发送消息测试:
下面通过java代码来验证,这里使用springboot+spring-integration-mqtt来实现。
maven工程的pom.xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcpkix-jdk15on</artifactId>
<version>1.47</version>
</dependency>
application-dev.yml
mqtt:
  serverURIs: ssl://192.168.226.100:8883
  username: admin
  password: public
  client:
    id: ${random.value}
  topic: test
MqttConfig.java
package com.huali.mec.receiver.config;
import java.util.Objects;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import com.huali.mec.receiver.util.SslUtil;
@IntegrationComponentScan
@Configuration
public class MqttConfig {
public static final Logger log = LoggerFactory.getLogger(MqttConfig.class);
public static final String OUTPUT_CHANNEL = "mqttOutputChannel";
public static final String INPUT_CHANNEL = "mqttInputChannel";
@Value("${mqtt.username}")
private String username ;
@Value("${mqtt.password}")
private String password ;
@Value("${mqtt.serverURIs}")
private String serverURIs ;
@Value("${mqtt.client.id}")
private String clientId;
@Value("${mqtt.topic}")
private String defaultTopic ;
@Bean
public MqttPahoClientFactory clientFactory() {
final MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[] {serverURIs});
options.setUserName(username);
options.setPassword(password.toCharArray());
options.setKeepAliveInterval(10);
options.setAutomaticReconnect(true);
try {
options.setSocketFactory(SslUtil.getSocketFactory("src/main/resources/ssl/ca.pem", "src/main/resources/ssl/client.pem", "src/main/resources/ssl/client.key", password));
} catch (Exception e) {
e.printStackTrace();
final DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setConnectionOptions(options);
return factory;
@Bean(value = OUTPUT_CHANNEL)
public MessageChannel mqttOutChannel() {
return new DirectChannel();
@Bean
@ServiceActivator(inputChannel = OUTPUT_CHANNEL)
public MessageHandler mqttOutbound() {
final MqttPahoMessageHandler handler = new MqttPahoMessageHandler(clientId, clientFactory());
handler.setDefaultQos(1);
handler.setDefaultRetained(false);
handler.setDefaultTopic(defaultTopic);
handler.setAsync(false);
handler.setAsyncEvents(false);
return handler;
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
@Bean
public MessageProducer inbound() {
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter(
clientId+"_inbound", clientFactory(), defaultTopic);
adapter.setCompletionTimeout(3000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
adapter.setOutputChannel(mqttInputChannel());
return adapter;
@Autowired
private ApplicationEventPublisher eventPublisher ;
@Bean
@ServiceActivator(inputChannel = INPUT_CHANNEL)
public MessageHandler handler() {
return message -> {
String topic = Objects.requireNonNull(message.getHeaders().get("mqtt_receivedTopic")).toString();
log.info("topic :{} ,payload :{}",topic,message.getPayload().toString());
//eventPublisher.publishEvent(new MqttEvent(this, topic, message.getPayload().toString()));
SslUtil.java,读取客户端证书并生成SSLSocketFactory给MQTT连接使用
package com.huali.mec.receiver.util;
import java.io.ByteArrayInputStream;
import java.io.InputStreamReader;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.KeyPair;
import java.security.KeyStore;
import java.security.Security;
import java.security.cert.X509Certificate;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.TrustManagerFactory;
import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.bouncycastle.openssl.PEMReader;
import org.bouncycastle.openssl.PasswordFinder;
public class SslUtil {
public static SSLSocketFactory getSocketFactory(final String caCrtFile,final String clientCrtFile,
final String keyFile,final String password) throws Exception {
Security.addProvider(new BouncyCastleProvider());
//load ca certificate
PEMReader reader = new PEMReader(new InputStreamReader(new ByteArrayInputStream(Files.readAllBytes(Paths.get(caCrtFile)))));
X509Certificate caCert = (X509Certificate)reader.readObject();
reader.close();
//load client certificate
reader = new PEMReader(new InputStreamReader(new ByteArrayInputStream(Files.readAllBytes(Paths.get(clientCrtFile)))));
X509Certificate clientCert = (X509Certificate)reader.readObject();
reader.close();
//load client key
reader = new PEMReader(new InputStreamReader(new ByteArrayInputStream(Files.readAllBytes(Paths.get(keyFile)))),
new PasswordFinder() {
@Override
public char[] getPassword() {
return password.toCharArray();
KeyPair key = (KeyPair)reader.readObject();
reader.close();
KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
keyStore.load(null, null);
keyStore.setCertificateEntry("ca-certificate", caCert);
TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
tmf.init(keyStore);
KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
ks.load(null, null);
ks.setCertificateEntry("certificate", clientCert);
ks.setKeyEntry("private-key", key.getPrivate(), password.toCharArray(),new java.security.cert.Certificate[] {clientCert});
KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
kmf.init(ks,password.toCharArray());
SSLContext context = SSLContext.getInstance("TLSv1.2");
context.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
return context.getSocketFactory();
客户端使用的三个证书文件放到resources/ssl目录下,ca.pem,client.pem,client.key
emqx是一种基于mqtt协议实现的消息框架,目前很多地方已经开始使用。简单安装配置之后,emqx就可以作为一个broker来使用,客户端只需要通过ip、端口、用户名、密码、clientid就可以连接了,至于发送消息,直接在发送的时候指定topic即可。这种简单配置使用的协议是mqtt/tcp,使用的url是tcp://ip:1883。这种方式下,客户端的身份其实很容易被模拟(理论上的可能),所以有了mqtts/tls协议,即在mqtt之上使用ssl/tls来保护连接。这也是本文所要阐述的问题。...
1.linux系统下,centos 7环境
2.使用docker运行emqtt服务器,参照:https://blog.csdn.net/u011089760/article/details/89892591
实现服务器 TLS 需要几个步骤:
1. 安装并配置 TLS 证书: 在使用 TLS 之前,您需要获得一个证书并配置服务器以使用它。您可以通过购买证书或使用 Let's Encrypt 免费证书获得证书。
2. 修改代码以启用 TLS: 您需要在服务器代码中添加代码,以启用 TLS 并将其与 epoll 结合起来。您需要在套接字上设置 TLS 参数,并在接受到客户端连接时协商 TLS 握手。
3. 测试 TLS 连接: 测试您的服务器以确保 TLS 已正确配置且能够安全地与客户端通信。
这些步骤中的每一步都有许多细节,您可能需要参考相关文档和教程来完成。您可以使用 OpenSSL 库或其他库来实现 TLS,但是您仍需要了解如何使用这些库与 epoll 配合使用。