相关文章推荐

emqx是一种基于mqtt协议实现的消息框架,目前很多地方已经开始使用。简单的emqx安装配置,其实就可以作为一个broker来使用,客户端只需要通过ip、端口、用户名、密码、clientid就可以连接了,至于发送消息,直接在发送的时候指定topic即可。

简单的emqx安装配置,使用的协议是:mqtt:tcp,使用的url是tcp://ip:1883。这种方式,其实可以很容易被模拟(理论上的可能),所以有了mqtts:tls协议,一般使用ssl协议来实现。这也是本文所要阐述的问题。

我在配置ssl证书的时候,服务启动没问题,最后客户端连接,死活连接失败。最后看了 官方的文档 ,按照那个文档很容易就配置成功,并且客户端连接也成功了。

其实我大概知道我配置失败的原因,可能是因为证书中的一个跟主机ip相关的设置问题,但是这个问题我按照一些博客的提示做了修改,但是后来还是没有成功,很郁闷,最后按照官方配置,很快设置好了。思路基本一模一样,而且配置证书的步骤也都类似,先生成私钥,然后生成证书请求,最后根据根证书生成服务端证书和客户端证书。

emqx的下载安装这里不做过多的说明,在官网直接下载,解压然后运行bin/emqx start就可以了。

emqx安装目录下有一个etc/certs目录,它自带了一些根证书、服务端证书、客户端证书,但是为了保险起见,最好自己根据自己的主机地址生成。

下面基本是按照emqx官方的一篇博客来配置的,就是主机地址做了修改。

以下的证书配置,是自签名证书,不是花钱购买的那种服务器证书。

1、生成根证书私钥

openssl genrsa -out ca.key 2048

    2、生成根证书

openssl req -x509 -new -nodes -key ca.key -sha256 -days 3650 -out ca.pem

    这一步会有交互提示,需要输入省份、城市、公司、组织、姓名、右键,随便填写就可以,这里的信息不会影响根证书使用。 

    3、生成服务端证书私钥

openssl genrsa -out emqx.key 2048

     4、生成服务端证书请求

    按照官方博客的意思,这里需要手动创建一个openssl.cnf的文件,其实主要就是设置IP地址

[req]
default_bits  = 2048
distinguished_name = req_distinguished_name
req_extensions = req_ext
x509_extensions = v3_req
prompt = no
[req_distinguished_name]
countryName = CN
stateOrProvinceName = Hubei
localityName = Wuhan
organizationName = EMQX
commonName = CA
[req_ext]
subjectAltName = @alt_names
[v3_req]
subjectAltName = @alt_names
[alt_names]
IP.1 = 192.168.226.100
DNS.1 = 192.168.226.100

    如果你的服务器是虚拟机,只需要修改IP.1的值即可。

    执行如下命令生成服务端证书请求:

openssl req -new -key ./emqx.key -config openssl.cnf -out emqx.csr

    私钥的生成,就使用了openssl.cnf配置文件 ,所以我之前的配置失败,应该是这里的IP设置问题。但是我确实根据某些博客的提示,设置subj "/CN=192.168.226.100",但是不生效。

     5、生成服务端证书

openssl x509 -req -in ./emqx.csr -CA ca.pem -CAkey ca.key -CAcreateserial -out emqx.pem -days 3650 -sha256 -extensions v3_req -extfile openssl.cnf

     6、生成客户端证书私钥

openssl genrsa -out client.key 2048

     7、生成客户端证书请求

openssl req -new -key client.key -out client.csr -subj "/C=CN/ST=Zhejiang/L=Hangzhou/O=EMQX/CN=client"

      8、生成客户端证书

openssl x509 -req -days 3650 -in client.csr -CA ca.pem -CAkey ca.key -CAcreateserial -out client.pem

一顿操作之后certs目录下生成的文件:

以上,只是生成了相关证书,在emqx的配置文件etc/emqx.conf中还需要做相关设置:

listener.ssl.external.keyfile = etc/certs/emqx.key
listener.ssl.external.certfile = etc/certs/emqx.pem
listener.ssl.external.cacertfile = etc/certs/ca.pem
listener.ssl.external.verify = verify_peer
listener.ssl.external.fail_if_no_peer_cert = true

    这几个主要的配置在原来的emqx.conf中都有,只需要打开或者修改配置值即可,主要是证书的名字。

    重启emqx服务,验证。bin/emqx restart

     这里使用mqttbox来验证,其实mqtt.x也是可以的,客户端需要的文件:ca.pem,client.pem,client.key 

     普通的mqtt/tcp协议连接配置:

    mqtts/tls协议连接配置:

    配置需要注意的地方在上图中已经标识出来了,协议类型:mqtts/tls ,证书类型:自签名证书,ca文件:ca.pem,客户端证书:client.pem,客户端私钥:client.key。

     配置完成,保存,可以看到连接状态变为绿色,如果是红色,那么就需要检查证书是否配置正确,包括生成和在emqx.conf配置文件中的设置。

     发送消息测试:

  下面通过java代码来验证,这里使用springboot+spring-integration-mqtt来实现。

  maven工程的pom.xml

<dependency>
    	<groupId>org.springframework.boot</groupId>
    	<artifactId>spring-boot-starter-integration</artifactId>
    </dependency>
    <dependency>
    	<groupId>org.springframework.integration</groupId>
    	<artifactId>spring-integration-mqtt</artifactId>
    </dependency>
    <dependency>
    	<groupId>org.eclipse.paho</groupId>
    	<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    	<version>1.2.5</version>
    </dependency>
    <dependency>
            <groupId>org.bouncycastle</groupId>
            <artifactId>bcpkix-jdk15on</artifactId>
            <version>1.47</version>
     </dependency>

    application-dev.yml

mqtt:
  serverURIs: ssl://192.168.226.100:8883
  username: admin
  password: public
  client:
    id: ${random.value}
  topic: test

     MqttConfig.java

package com.huali.mec.receiver.config;
import java.util.Objects;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import com.huali.mec.receiver.util.SslUtil;
@IntegrationComponentScan
@Configuration
public class MqttConfig {
	public static final Logger log  = LoggerFactory.getLogger(MqttConfig.class);
	public static final String OUTPUT_CHANNEL = "mqttOutputChannel";
	public static final String INPUT_CHANNEL = "mqttInputChannel";
	@Value("${mqtt.username}")
	private String username ;
	@Value("${mqtt.password}")
	private String password ;
	@Value("${mqtt.serverURIs}")
	private String serverURIs ;
	@Value("${mqtt.client.id}")
	private String clientId;
	@Value("${mqtt.topic}")
	private String defaultTopic ;
	@Bean
	public MqttPahoClientFactory clientFactory() {
		final MqttConnectOptions options = new MqttConnectOptions();
		options.setServerURIs(new String[] {serverURIs});
		options.setUserName(username);
		options.setPassword(password.toCharArray());
		options.setKeepAliveInterval(10);
		options.setAutomaticReconnect(true);
		try {
			options.setSocketFactory(SslUtil.getSocketFactory("src/main/resources/ssl/ca.pem", "src/main/resources/ssl/client.pem", "src/main/resources/ssl/client.key", password));
		} catch (Exception e) {
			e.printStackTrace();
		final DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
		factory.setConnectionOptions(options);
		return factory;
	@Bean(value = OUTPUT_CHANNEL)
	public MessageChannel mqttOutChannel() {
		return new DirectChannel();
	@Bean
	@ServiceActivator(inputChannel = OUTPUT_CHANNEL)
	public MessageHandler mqttOutbound() {
		final MqttPahoMessageHandler handler = new MqttPahoMessageHandler(clientId, clientFactory());
		handler.setDefaultQos(1);
		handler.setDefaultRetained(false);
		handler.setDefaultTopic(defaultTopic);
		handler.setAsync(false);
		handler.setAsyncEvents(false);
		return handler;
	@Bean
	public MessageChannel mqttInputChannel() {
		return new DirectChannel();
	@Bean
	public MessageProducer inbound() {
		MqttPahoMessageDrivenChannelAdapter adapter = 
				new MqttPahoMessageDrivenChannelAdapter(
				clientId+"_inbound", clientFactory(), defaultTopic);
		adapter.setCompletionTimeout(3000);
		adapter.setConverter(new DefaultPahoMessageConverter());
		adapter.setQos(1);
		adapter.setOutputChannel(mqttInputChannel());
		return adapter;
	@Autowired
	private ApplicationEventPublisher eventPublisher ;
	@Bean
	@ServiceActivator(inputChannel = INPUT_CHANNEL)
	public MessageHandler handler() {
		return message -> {
			String topic = Objects.requireNonNull(message.getHeaders().get("mqtt_receivedTopic")).toString();
			log.info("topic :{} ,payload :{}",topic,message.getPayload().toString());
			//eventPublisher.publishEvent(new MqttEvent(this, topic, message.getPayload().toString()));

    SslUtil.java,读取客户端证书并生成SSLSocketFactory给MQTT连接使用

package com.huali.mec.receiver.util;
import java.io.ByteArrayInputStream;
import java.io.InputStreamReader;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.KeyPair;
import java.security.KeyStore;
import java.security.Security;
import java.security.cert.X509Certificate;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.TrustManagerFactory;
import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.bouncycastle.openssl.PEMReader;
import org.bouncycastle.openssl.PasswordFinder;
public class SslUtil {
	public static SSLSocketFactory getSocketFactory(final String caCrtFile,final String clientCrtFile,
			final String keyFile,final String password) throws Exception {
		Security.addProvider(new BouncyCastleProvider());
		//load ca certificate
		PEMReader reader = new PEMReader(new InputStreamReader(new ByteArrayInputStream(Files.readAllBytes(Paths.get(caCrtFile)))));
		X509Certificate caCert = (X509Certificate)reader.readObject();
		reader.close();
		//load client certificate
		reader = new PEMReader(new InputStreamReader(new ByteArrayInputStream(Files.readAllBytes(Paths.get(clientCrtFile)))));
		X509Certificate clientCert = (X509Certificate)reader.readObject();
		reader.close();
		//load client key
		reader = new PEMReader(new InputStreamReader(new ByteArrayInputStream(Files.readAllBytes(Paths.get(keyFile)))),
				new PasswordFinder() {
					@Override
					public char[] getPassword() {
						return password.toCharArray();
		KeyPair key = (KeyPair)reader.readObject();
		reader.close();
		KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
		keyStore.load(null, null);
		keyStore.setCertificateEntry("ca-certificate", caCert);
		TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
		tmf.init(keyStore);
		KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
		ks.load(null, null);
		ks.setCertificateEntry("certificate", clientCert);
		ks.setKeyEntry("private-key", key.getPrivate(), password.toCharArray(),new java.security.cert.Certificate[] {clientCert});
		KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
		kmf.init(ks,password.toCharArray());
		SSLContext context = SSLContext.getInstance("TLSv1.2");
		context.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
		return context.getSocketFactory();

客户端使用的三个证书文件放到resources/ssl目录下,ca.pem,client.pem,client.key

emqx是一种基于mqtt协议实现的消息框架,目前很多地方已经开始使用。简单的emqx安装配置,其实就可以作为一个broker来使用,客户端只需要通过ip、端口、用户名、密码、clientid就可以连接了,至于发送消息,直接在发送的时候指定topic即可。 简单的emqx安装配置,使用的协议是:mqtt:tcp,使用的url是tcp://ip:1883。这种方式,其实可以很容易被模拟(理论上的可能),所以有了mqtts:tls协议,一般使用ssl协议来实现。这也是本文所要阐述的问题。... 1.linux系统下,centos 7环境 2.使用docker运行emqtt服务器,参照:https://blog.csdn.net/u011089760/article/details/89892591
实现服务器 TLS 需要几个步骤: 1. 安装并配置 TLS 证书: 在使用 TLS 之前,您需要获得一个证书并配置服务器以使用它。您可以通过购买证书或使用 Let's Encrypt 免费证书获得证书。 2. 修改代码以启用 TLS: 您需要在服务器代码中添加代码,以启用 TLS 并将其与 epoll 结合起来。您需要在套接字上设置 TLS 参数,并在接受到客户端连接时协商 TLS 握手。 3. 测试 TLS 连接: 测试您的服务器以确保 TLS 已正确配置且能够安全地与客户端通信。 这些步骤中的每一步都有许多细节,您可能需要参考相关文档和教程来完成。您可以使用 OpenSSL 库或其他库来实现 TLS,但是您仍需要了解如何使用这些库与 epoll 配合使用。