1、SSL/TLS简介
SSL(Secure Sockets Layer,安全套接层)是网景公司(Netscape)提出的用于保证服务端(server)与客户端(client)之间安全通信的一种协议。该协议位于TCP/IP协议与各应用层协议之间,即SSL独立于各应用层协议,因此各应用层协议可以透明地调用SSL来保证自身传输的安全性。目前,SSL被大量应用于HTTP的安全通信中;MQTT协议与HTTP协议同样属于应用层协议,因此也可以像HTTP协议一样使用SSL为自己的通信提供安全保证。
SSL与TLS(Transport Layer Security,传输层安全协议)之间的关系:TLS是IETF(Internet Engineering Task Force,Internet工程任务组)制定的一种新的协议,它建立在SSL 3.0协议规范之上,是SSL 3.0的后续版本。TLS与SSL 3.0之间存在着显著的差别,主要是它们所支持的加密算法不同,所以TLS与SSL 3.0不能互操作。
2、使用Openssl创建tls证书
SSL在身份认证过程中需要有一个双方都信任的CA签发的证书,CA签发证书是需要收费的,但是在测试过程中,可以自己产生一个CA,然后用自己产生的CA签发证书,下面的mosquitto的ssl功能的测试过程就是采用这一方式,其过程如下:
步骤一:
产生自己的CA
openssl req -new -x509 -days 36500 -extensions v3_ca -keyout ca.key -out ca.crt
openssl x509 -in ca.crt -out ca.pem -outform PEM
步骤二:
产生服务端证书
openssl genrsa -des3 -out server.key 2048
openssl req -out server.csr -key server.key -new
openssl x509 -req -in server.csr -CA ca.crt -CAkey ca.key -CAcreateserial -out server.crt -days 36500
步骤三:
产生客户端证书
openssl genrsa -out client-key.pem 2048
openssl req -out client.csr -key client-key.pem -new
openssl x509 -req -in client.csr -CA ca.crt -CAkey ca.key -CAcreateserial -out client-crt.pem -days 36500
经过上面8条命令后,即可生成所需的所有证书文件,其中:
客户端使用:ca.pem、client-crt.pem、client-key.pem
服务端使用:ca.crt、server.crt、server.key
3、
mosquitto.conf配置如下:
4、golang客户端测试代码
1 package cmd
3 import (
4 "crypto/tls"
5 "crypto/x509"
6 fmt "fmt"
7 "io/ioutil"
8 "os"
9 "time"
11 "github.com/apex/log"
12 MQTT "github.com/eclipse/paho.mqtt.golang"
13 )
15 var ctx log.Interface
17 const QoS = 0x02
// init prints a marker line when the package is initialised.
func init() {
	fmt.Printf("init mqtt test\n")
}
24 func RunMqttClient() {
25 fmt.Printf("Run mqtt test\n")
26 var logLevel = log.InfoLevel
27 ctx = &log.Logger{
28 Level: logLevel,
29 Handler: NewLogHanler(os.Stdout),
30 }
32 mqttClient := NewClient(
33 ctx,
34 "ttnhdl",
35 "",
36 "",
37 fmt.Sprintf("ssl://%s", "192.168.195.201:8883"),
38 )
40 var err = mqttClient.Connect()
41 if err != nil {
42 ctx.WithError(err).Fatal("Could not connect to MQTT")
43 fmt.Printf("Could not connect to MQTT\n")
44 } else {
45 fmt.Printf("Success connect to MQTT\n")
46 }
48 mqttClient.PublishUplink("test", "hello mqtt!")
49 mqttClient.SubscribeUplink("test")
51 for true {
53 }
54 }
56 // Client connects to the MQTT server and can publish/subscribe on uplink, downlink and activations from devices
57 type Client interface {
58 Connect() error
59 Disconnect()
61 IsConnected() bool
63 // Uplink pub/sub
64 PublishUplink(topic string, msg string) Token
65 SubscribeUplink(topic string) Token
66 }
// Token represents an asynchronous MQTT operation whose completion can be
// awaited and whose outcome can be inspected.
type Token interface {
	// Wait blocks until the operation has finished.
	Wait() bool
	// WaitTimeout blocks for at most the given duration.
	WaitTimeout(time.Duration) bool
	// Error returns the error of the operation, or nil.
	Error() error
}
// simpleToken is a token that is already complete; it only carries the
// error (if any) of the operation it stands for.
type simpleToken struct {
	err error
}

// Wait always returns true.
func (t *simpleToken) Wait() bool {
	return true
}

// WaitTimeout always returns true, regardless of the duration.
func (t *simpleToken) WaitTimeout(_ time.Duration) bool {
	return true
}

// Error returns the stored error, or nil if there is none.
func (t *simpleToken) Error() error {
	return t.err
}
93 type defaultClient struct {
94 mqtt MQTT.Client
95 ctx log.Interface
96 }
98 func NewClient(ctx log.Interface, id, username, password string, brokers ...string) Client {
99 tlsconfig := NewTLSConfig()
101 mqttOpts := MQTT.NewClientOptions()
103 for _, broker := range brokers {
104 mqttOpts.AddBroker(broker)
105 }
107 mqttOpts.SetClientID("ypf_dewqfvcdeqfcdqwcdq")
108 mqttOpts.SetUsername(username)
109 mqttOpts.SetPassword(password)
111 // TODO: Some tuning of these values probably won't hurt:
112 mqttOpts.SetKeepAlive(30 * time.Second)
113 mqttOpts.SetPingTimeout(10 * time.Second)
115 // Usually this setting should not be used together with random ClientIDs, but
116 // we configured The Things Network's MQTT servers to handle this correctly.
117 mqttOpts.SetCleanSession(false)
119 mqttOpts.SetDefaultPublishHandler(func(client MQTT.Client, msg MQTT.Message) {
120 ctx.WithField("message", msg).Warn("Received unhandled message")
121 })
123 mqttOpts.SetConnectionLostHandler(func(client MQTT.Client, err error) {
124 ctx.WithError(err).Warn("Disconnected, reconnecting...")
125 })
127 mqttOpts.SetOnConnectHandler(func(client MQTT.Client) {
128 ctx.Debug("Connected")
129 })
131 mqttOpts.SetTLSConfig(tlsconfig)
133 return &defaultClient{
134 mqtt: MQTT.NewClient(mqttOpts),
135 ctx: ctx,
136 }
137 }
var (
	// ConnectRetries says how many times the client should retry a failed connection.
	ConnectRetries = 10
	// ConnectRetryDelay says how long the client should wait between retries.
	ConnectRetryDelay = time.Second
)
146 func (c *defaultClient) Connect() error {
147 if c.mqtt.IsConnected() {
148 return nil
149 }
150 var err error
151 for retries := 0; retries < ConnectRetries; retries++ {
152 token := c.mqtt.Connect()
153 token.Wait()
154 err = token.Error()
155 if err == nil {
156 break
157 }
158 <-time.After(ConnectRetryDelay)
159 }
160 if err != nil {
161 return fmt.Errorf("Could not connect: %s", err)
162 }
163 return nil
164 }
166 func (c *defaultClient) Disconnect() {
167 if !c.mqtt.IsConnected() {
168 return
169 }
170 c.mqtt.Disconnect(25)
171 }
173 func (c *defaultClient) IsConnected() bool {
174 return c.mqtt.IsConnected()
175 }
177 func (c *defaultClient) PublishUplink(topic string, msg string) Token {
178 return c.mqtt.Publish(topic, QoS, false, msg)
179 }
181 func (c *defaultClient) SubscribeUplink(topic string) Token {
182 return c.mqtt.Subscribe(topic, QoS, func(mqtt MQTT.Client, msg MQTT.Message) {
183 // Determine the actual topic
184 fmt.Printf("Success SubscribeUplink with msg:%s\n", msg.Payload())
185 })
186 }
// NewTLSConfig builds the client-side TLS configuration from the PEM files
// in the samplecerts directory (relative to the working directory):
//
//   - samplecerts/ca.pem          CA certificate(s) placed in RootCAs
//   - samplecerts/client-crt.pem  client certificate presented to the server
//   - samplecerts/client-key.pem  private key of the client certificate
//
// Loading the CA file is best-effort: a failure is reported on standard
// output and the configuration is built with an empty pool. A missing or
// invalid client certificate/key pair makes the function panic.
func NewTLSConfig() *tls.Config {
	// Import trusted certificates from the CA file.
	certpool := x509.NewCertPool()
	pemCerts, err := ioutil.ReadFile("samplecerts/ca.pem")
	switch {
	case err != nil:
		fmt.Printf("0. read pemCerts failed: %v\n", err)
	case !certpool.AppendCertsFromPEM(pemCerts):
		fmt.Println("0. no CA certificate found in samplecerts/ca.pem")
	default:
		fmt.Println("0. resd pemCerts Success")
	}

	// Import client certificate/key pair.
	cert, err := tls.LoadX509KeyPair("samplecerts/client-crt.pem", "samplecerts/client-key.pem")
	if err != nil {
		panic(err)
	}
	fmt.Println("1. resd cert Success")

	// Parse the leaf up front so the certificate details are available
	// on the returned configuration.
	cert.Leaf, err = x509.ParseCertificate(cert.Certificate[0])
	if err != nil {
		panic(err)
	}
	fmt.Println("2. resd cert.Leaf Success")

	// ClientAuth and ClientCAs are deliberately not set: they are
	// server-side fields and have no effect on a client.
	return &tls.Config{
		// RootCAs = certs used to verify the server certificate.
		RootCAs: certpool,
		// NOTE(review): InsecureSkipVerify disables verification of the
		// server's certificate chain and host name, so RootCAs is not
		// consulted and the connection is open to man-in-the-middle
		// attacks. Acceptable for a lab test only; set it to false once
		// the server certificate carries a matching subjectAltName.
		InsecureSkipVerify: true,
		// Certificates = list of certs the client sends to the server.
		Certificates: []tls.Certificate{cert},
	}
}
5、测试效果
服务端启动:
客户端运行:
6、JAVA版客户端实现
依赖:org.eclipse.paho.client.mqttv3、bcprov-jdk16-1.45.jar
MqttServiceClient代码:
1 package com.ypf.main;
3 import java.util.Properties;
5 import org.eclipse.paho.client.mqttv3.MqttCallback;
6 import org.eclipse.paho.client.mqttv3.MqttClient;
7 import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
8 import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
9 import org.eclipse.paho.client.mqttv3.MqttException;
10 import org.eclipse.paho.client.mqttv3.MqttMessage;
11 import org.eclipse.paho.client.mqttv3.MqttSecurityException;
12 import org.eclipse.paho.client.mqttv3.MqttTopic;
13 import org.eclipse.paho.client.mqttv3.internal.MemoryPersistence;
15 import com.ypf.mqtt.SslUtil;
17 /**
18 *
19 * @author LP by 2014-04-24
20 *
21 */
22 public class MqttServiceClient implements MqttCallback {
24 private static final String MQTT_HOST = "ssl://192.168.195.201:8884";
25 private static final String MQTT_CLIENT = "Test_";
26 public static String caFilePath = "D:/for-iot/LURA/src/mytest/samplecerts/ca.crt";
27 public static String clientCrtFilePath = "D:/for-iot/LURA/src/mytest/samplecerts/client.crt";
28 public static String clientKeyFilePath = "D:/for-iot/LURA/src/mytest/samplecerts/client.key";
30 public static MqttServiceClient mqttServiceClient = null;
32 private MqttClient client = null;
33 private MqttConnectOptions options = null;
35 /**
36 * 单例模式构造类
37 */
38 public static MqttServiceClient getInstance() {
39 if (mqttServiceClient == null) {
40 mqttServiceClient = new MqttServiceClient();
41 }
42 return mqttServiceClient;
43 }
45 private MqttServiceClient() {
46 System.out.println("init MQTTClientService");
47 init();
48 }
49 // The major API implementation follows :-
51 /**
52 * 初始化
53 */
54 private void init() {
55 try {
57 // host为主机名,test为clientid即连接MQTT的客户端ID,一般以客户端唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
58 client = new MqttClient(MQTT_HOST, MQTT_CLIENT, new MemoryPersistence());
59 // MQTT的连接设置
60 options = new MqttConnectOptions();
61 // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
62 options.setCleanSession(true);
63 // 设置连接的用户名
64 options.setUserName("ypf");
65 // 设置连接的密码
66 options.setPassword("ruijie".toCharArray());
67 // 设置超时时间 单位为秒
68 options.setConnectionTimeout(50);
69 // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
70 options.setKeepAliveInterval(30);
71 // TLS连接配置
72 options.setSocketFactory(
73 SslUtil.getSocketFactory(caFilePath, clientCrtFilePath, clientKeyFilePath, "cs123456"));
75 // 设置回调
76 client.setCallback(this);
78 } catch (Exception e) {
79 e.printStackTrace();
80 }
81 }
82 /**
83 * 连接到MQTT
84 */
85 void connect() {
86 System.out.println("Start connect----------");
87 try {
88 client.connect(options);
89 //订阅主题的方法,2为消息的质量
90 client.subscribe("+/#", 2);
91 //发送消息
92 publish("test", "撒打发水电费水电费");
93 } catch (Exception e) {
94 e.printStackTrace();
95 }
96 }
98 /**
99 * 断开连接到MQTT
100 */
101 public void disconnect() {
102 System.out.println("Start disconnect----------");
103 try {
104 client.disconnect();
105 } catch (MqttSecurityException e) {
106 e.printStackTrace();
107 } catch (MqttException e) {
108 e.printStackTrace();
109 }
110 }
112 /**
113 * 发布消息
114 * @param topic 主题
115 * @param msg 消息
116 */
117 public void publish(String topic, String msg) {
118 System.out.println("Start publish----------");
119 try {
120 MqttTopic mqttTopic = client.getTopic(topic);
121 //2为消息的质量
122 MqttDeliveryToken messageToken = mqttTopic.publish(msg.getBytes(), 2, true);
123 System.out.println("publish success==>"+messageToken.getMessage());
124 // client.publish(topic, 2, msg);
125 } catch (Exception e) {
126 e.printStackTrace();
127 }
128 }
131 // -------------------------------------------------回调方法------------------------------------------------------------//
133 /**
134 * 连接断开触发此方法
135 */
136 @Override
137 public void connectionLost(Throwable cause) {
138 System.out.println("Connection Lost---------->" + cause.getMessage());
139 }
141 /**
142 * 消息达到触发此方法
143 */
144 @Override
145 public void messageArrived(MqttTopic topic, MqttMessage message)
146 throws Exception {
147 System.out.println(topic + ":" + message.toString());
148 }
150 /**
151 * 消息发送成功触发此方法
152 */
153 @Override
154 public void deliveryComplete(MqttDeliveryToken token) {
155 try {
156 System.out.println("deliveryComplete---------" + token.getMessage());
157 } catch (MqttException e) {
158 e.printStackTrace();
159 }
160 }
163 public static void main(String[] args)throws Exception {
165 //MqttServiceClient.getInstance().disconnect();
166 MqttServiceClient.getInstance().connect();
168 new Thread() {
169 public void run() {
170 int count = 0;
171 while(true && count < 3) {
172 try {
173 Thread.sleep(1000*3);
174 } catch (InterruptedException e) {
175 e.printStackTrace();
176 }
177 MqttServiceClient.getInstance().publish("test1/ypf", "hello world ! count=" + count);
178 count ++;
179 }
180 };
181 }.start();
182 }
184 }
SslUtil代码:
1 package com.ypf.mqtt;
3 import java.io.ByteArrayInputStream;
4 import java.io.InputStreamReader;
5 import java.nio.file.Files;
6 import java.nio.file.Paths;
7 import java.security.KeyPair;
8 import java.security.KeyStore;
9 import java.security.Security;
10 import java.security.cert.X509Certificate;
12 import javax.net.ssl.KeyManagerFactory;
13 import javax.net.ssl.SSLContext;
14 import javax.net.ssl.SSLSocketFactory;
15 import javax.net.ssl.TrustManagerFactory;
17 import org.bouncycastle.jce.provider.BouncyCastleProvider;
18 import org.bouncycastle.openssl.*;
20 public class SslUtil {
21 public static SSLSocketFactory getSocketFactory(final String caCrtFile, final String crtFile, final String keyFile,
22 final String password) throws Exception {
23 Security.addProvider(new BouncyCastleProvider());
25 // load CA certificate
26 PEMReader reader = new PEMReader(
27 new InputStreamReader(new ByteArrayInputStream(Files.readAllBytes(Paths.get(caCrtFile)))));
28 X509Certificate caCert = (X509Certificate) reader.readObject();
29 reader.close();
31 // load client certificate
32 reader = new PEMReader(new InputStreamReader(new ByteArrayInputStream(Files.readAllBytes(Paths.get(crtFile)))));
33 X509Certificate cert = (X509Certificate) reader.readObject();
34 reader.close();
36 // load client private key
37 reader = new PEMReader(new InputStreamReader(new ByteArrayInputStream(Files.readAllBytes(Paths.get(keyFile)))),
38 new PasswordFinder() {
39 @Override
40 public char[] getPassword() {
41 return password.toCharArray();
42 }
43 });
44 KeyPair key = (KeyPair) reader.readObject();
45 reader.close();
47 // CA certificate is used to authenticate server
48 KeyStore caKs = KeyStore.getInstance(KeyStore.getDefaultType());
49 caKs.load(null, null);
50 caKs.setCertificateEntry("ca-certificate", caCert);
51 TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
52 tmf.init(caKs);
54 // client key and certificates are sent to server so it can authenticate
55 // us
56 KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
57 ks.load(null, null);
58 ks.setCertificateEntry("certificate", cert);
59 ks.setKeyEntry("private-key", key.getPrivate(), password.toCharArray(),
60 new java.security.cert.Certificate[] { cert });
61 KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
62 kmf.init(ks, password.toCharArray());
64 // finally, create SSL socket factory
65 SSLContext context = SSLContext.getInstance("TLSv1");
66 context.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
68 return context.getSocketFactory();
69 }