1.自制证书

官网配置全过程,里面有自制证书方法
https://www.emqx.cn/blog/emqx-server-ssl-tls-secure-connection-configuration-guide
pyton-mqtt-emqx启用ssl证书认证后使用连接demo_文件拷贝

2.SSL/TLS 启用及验证

在 EMQ X 中 mqtt:ssl 的默认监听端口为 8883。

EMQ X 配置
将前文中通过 OpenSSL 工具生成的 emqx.pem、emqx.key 及 ca.pem 文件拷贝到 EMQ X 的 etc/certs/ 目录下,并参考如下配置修改 emqx.conf

## listener.ssl.$name is the IP address and port that the MQTT/SSL
## Value: IP:Port | Port
listener.ssl.external = 8883
## Path to the file containing the user's private PEM-encoded key.
## Value: File
listener.ssl.external.keyfile = etc/certs/emqx.key
## 注意:如果 emqx.pem 是证书链,请确保第一个证书是服务器的证书,而不是 CA 证书。
## Path to a file containing the user certificate.
## Value: File
listener.ssl.external.certfile = etc/certs/emqx.pem
## 注意:ca.pem 用于保存服务器的中间 CA 证书和根 CA 证书。可以附加其他受信任的 CA,用来进行客户端证书验证。
## Path to the file containing PEM-encoded CA certificates. The CA certificates
## Value: File
listener.ssl.external.cacertfile = etc/certs/ca.pem
3.show code 
import datetime
import functools
import json
import os
import ssl
import time
import traceback
import paho.mqtt.client as mqclient
import threading
class MQTTClientInfoX:
    def __init__(self, brokerIP: str, brokerPort: int, clientId: str, user: str, pwd: str, certPath: str):
        self.brokerIP = brokerIP
        self.brokerPort = brokerPort
        self.clientId = clientId
        self.user = user
        self.pwd = pwd
        self.certPath = certPath
        # FIXME: how to annotation list?
        self.clientCfgs = list()
        self.eIds = None
    def __repr__(self):
        # user/password@127.0.0.1:8080, sys_robot, [<XXX: a->1, b->2>]
        return "<MQTT {}/{}@{}:{}, {}\n{}\n{}>".format(self.user, self.pwd, self.brokerIP, self.brokerPort,
                                                       self.clientId, self.certPath, self.clientCfgs)
class SimpleMqttClientCallElevTest(threading.Thread):
    def __init__(self, mqttClientInfo: MQTTClientInfoX, clientId):
        threading.Thread.__init__(self, name="mqttElevClient_{}".format(time.time()))
        self.mqsCI = mqttClientInfo
        self.connectedCB = None
        self.disconnectedCB = None
        self.mqttMsgRecvCB = None
        self.keepAliveInterval = None
        self.clientId = clientId
        self._isConnected = False
        self.sn = 1
    ''' should set before start '''
    def setCallback(self, connectedCB, disconnectedCB, mqttMsgRecvCB):
        self.connectedCB = connectedCB
        self.disconnectedCB = disconnectedCB
        self.mqttMsgRecvCB = mqttMsgRecvCB
    ''' should set before start '''
    def setKeepAlive(self, keepAliveInterval):
        self.keepAliveInterval = keepAliveInterval
    @property
    def isConnected(self):
        return self._isConnected
    def run(self):
        # FIXME: if clientId is different, multiple clients will request same topics, so use fixed number: 1
        # self.startClient("mqttElevClient_{}_{}_{}".format(MM.TOPIC_KEY_APP, MM.TOPIC_KEY_BUILDING_ID, 1))
        self.startClient(self.clientId)
    def startClient(self, clientId):
        print("starting mqttElevClient...")
        self.clientConn = mqclient.Client(client_id=clientId, clean_session=False)
        self.clientConn.username_pw_set(self.mqsCI.user, self.mqsCI.pwd)
        print("MQTT u:p -> {}:{}".format(self.mqsCI.user, '*' * len(self.mqsCI.pwd)))
        if self.mqsCI.certPath is not None:
            print('certPath validation completed', self.mqsCI.certPath)
            # self.clientConn.tls_set(ca_certs, certfile, keyfile, cert_reqs, tls_version, ciphers)
            if not os.path.exists(self.mqsCI.certPath):
                self.mqsCI.certPath = None  # ssl.CERT_NONE
            # FIXME: local server need a ca_cert, but server side does not need
            self.clientConn.tls_set(ca_certs=self.mqsCI.certPath, tls_version=ssl.PROTOCOL_TLSv1_2)
            # disables peer verification
            self.clientConn.tls_insecure_set(True)
            print('certPath is cer', self.mqsCI.certPath)
        else:
            print("ssl/tls is disabled")
        self.clientConn.on_connect = functools.partial(self.on_connect)
        self.clientConn.on_message = functools.partial(self.mqttMsgRecvCB)
        self.clientConn.on_disconnect = functools.partial(self.on_disconnect)
        self.clientConn.on_log = functools.partial(self.on_log)
        keepAliveTime = 30 if self.keepAliveInterval is None or self.keepAliveInterval < 0 else self.keepAliveInterval
        # -----------------------------------------------------------------------------
        while True:
            try:
                print(" connecting to MQTT broker: {}:{}".format(self.mqsCI.brokerIP, self.mqsCI.brokerPort))
                self.clientConn.connect(self.mqsCI.brokerIP, self.mqsCI.brokerPort, keepAliveTime)
                break
            except Exception as e:
                # traceback.print_exc()
                print('startClient :{}'.format(e))
                time.sleep(5)
        self.clientConn.loop_forever()
    def on_disconnect(self, client, userdata, rc):
        print("connection to MQTT broker is broken, {}:{}".format(self.mqsCI.brokerIP, self.mqsCI.brokerPort))
        if self.disconnectedCB is not None:
            try:
                self.disconnectedCB(client, userdata)
            except:
                traceback.print_exc()
        self._isConnected = False
    ''' connection event listener '''
    def on_connect(self, client, userdata, flags, rc):
        if rc == 0:
            print("============================================")
            print("MQTT broker {}:{} - connected !!!".format(self.mqsCI.brokerIP, self.mqsCI.brokerPort))
            print("============================================")
            if self.connectedCB is not None:
                try:
                    self.connectedCB(client, userdata)
                except:
                    traceback.print_exc()
            self._isConnected = True
        else:
            print("XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX")
            print(
                "MQTT broker {}:{} - Failed to connect, rc->{} !!!".format(self.mqsCI.brokerIP, self.mqsCI.brokerPort,
                                                                           rc))
            print("XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX")
               5 - login failed for not_authorized
               4 - login failed for password_
            self.startClient(clientId)
            return
        # don't add blocking operation in event
    def on_log(self, client, userdata, level, buf):
        # print("log:{}".format(buf))
    def subscribeE(self, topic, qos):
        if self.isConnected:
            self.clientConn.subscribe(topic, qos)
        else:
            print("the connection is broken, subscribeE swallowed")
    def publishE(self, topic, payload, qos, retain):
        if self.isConnected:
            self.clientConn.publish(topic, payload, qos, retain)
            print('publish==========================')
        else:
            print("the connection is broken, publishE swallowed")
def mqttMsgRecvCB(client, userdata, msg):
    try:
        mqttMsg = json.loads(msg.payload.decode('utf-8'))
        print('recv mqttMsg:{}'.format(mqttMsg))
    except Exception as e:
        print("mqttMsgRecvCB:{}".format(e))
def connectedCB(client, userdata, topic):
    sub_topic1 = topic
    client.subscribe(sub_topic1)
    print('connectedCB subscribe ==> {}'.format(sub_topic1))
if __name__ == '__main__':
    brokerIP = '127.0.0.1'
    brokerPort = 8883
    clientId = 'mq_test_ca'
    user = 'mq_test_ca'
    pwd = '123456'
    basedir = os.path.abspath(os.path.dirname(__file__)).replace('\\', '/')
    certPath = basedir + '/cacert.pem'
    topic = '/demo/test/ca'
    mqCI = MQTTClientInfoX(brokerIP, brokerPort, clientId, user, pwd, certPath)
    mqttClient = SimpleMqttClientCallElevTest(mqCI, clientId)
    mqttClient.setCallback(functools.partial(connectedCB, topic=topic), None, mqttMsgRecvCB)
    mqttClient.setKeepAlive(30)
    mqttClient.start()
    while 1:
        try:
            time.sleep(1)
            cmd = {"sn": 10086, "t": datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), "f": 3, "p1": 120, }
            cmd_s = json.dumps(cmd)
            mqttClient.publishE(topic, payload=cmd_s, qos=0, retain=False)
        except Exception as e:
            print(e)