3.show code
import datetime
import functools
import json
import os
import ssl
import time
import traceback
import paho.mqtt.client as mqclient
import threading
class MQTTClientInfoX:
def __init__(self, brokerIP: str, brokerPort: int, clientId: str, user: str, pwd: str, certPath: str):
self.brokerIP = brokerIP
self.brokerPort = brokerPort
self.clientId = clientId
self.user = user
self.pwd = pwd
self.certPath = certPath
# FIXME: how to annotation list?
self.clientCfgs = list()
self.eIds = None
def __repr__(self):
# user/password@127.0.0.1:8080, sys_robot, [<XXX: a->1, b->2>]
return "<MQTT {}/{}@{}:{}, {}\n{}\n{}>".format(self.user, self.pwd, self.brokerIP, self.brokerPort,
self.clientId, self.certPath, self.clientCfgs)
class SimpleMqttClientCallElevTest(threading.Thread):
def __init__(self, mqttClientInfo: MQTTClientInfoX, clientId):
threading.Thread.__init__(self, name="mqttElevClient_{}".format(time.time()))
self.mqsCI = mqttClientInfo
self.connectedCB = None
self.disconnectedCB = None
self.mqttMsgRecvCB = None
self.keepAliveInterval = None
self.clientId = clientId
self._isConnected = False
self.sn = 1
''' should set before start '''
def setCallback(self, connectedCB, disconnectedCB, mqttMsgRecvCB):
self.connectedCB = connectedCB
self.disconnectedCB = disconnectedCB
self.mqttMsgRecvCB = mqttMsgRecvCB
''' should set before start '''
def setKeepAlive(self, keepAliveInterval):
self.keepAliveInterval = keepAliveInterval
@property
def isConnected(self):
return self._isConnected
def run(self):
# FIXME: if clientId is different, multiple clients will request same topics, so use fixed number: 1
# self.startClient("mqttElevClient_{}_{}_{}".format(MM.TOPIC_KEY_APP, MM.TOPIC_KEY_BUILDING_ID, 1))
self.startClient(self.clientId)
def startClient(self, clientId):
print("starting mqttElevClient...")
self.clientConn = mqclient.Client(client_id=clientId, clean_session=False)
self.clientConn.username_pw_set(self.mqsCI.user, self.mqsCI.pwd)
print("MQTT u:p -> {}:{}".format(self.mqsCI.user, '*' * len(self.mqsCI.pwd)))
if self.mqsCI.certPath is not None:
print('certPath validation completed', self.mqsCI.certPath)
# self.clientConn.tls_set(ca_certs, certfile, keyfile, cert_reqs, tls_version, ciphers)
if not os.path.exists(self.mqsCI.certPath):
self.mqsCI.certPath = None # ssl.CERT_NONE
# FIXME: local server need a ca_cert, but server side does not need
self.clientConn.tls_set(ca_certs=self.mqsCI.certPath, tls_version=ssl.PROTOCOL_TLSv1_2)
# disables peer verification
self.clientConn.tls_insecure_set(True)
print('certPath is cer', self.mqsCI.certPath)
else:
print("ssl/tls is disabled")
self.clientConn.on_connect = functools.partial(self.on_connect)
self.clientConn.on_message = functools.partial(self.mqttMsgRecvCB)
self.clientConn.on_disconnect = functools.partial(self.on_disconnect)
self.clientConn.on_log = functools.partial(self.on_log)
keepAliveTime = 30 if self.keepAliveInterval is None or self.keepAliveInterval < 0 else self.keepAliveInterval
# -----------------------------------------------------------------------------
while True:
try:
print(" connecting to MQTT broker: {}:{}".format(self.mqsCI.brokerIP, self.mqsCI.brokerPort))
self.clientConn.connect(self.mqsCI.brokerIP, self.mqsCI.brokerPort, keepAliveTime)
break
except Exception as e:
# traceback.print_exc()
print('startClient :{}'.format(e))
time.sleep(5)
self.clientConn.loop_forever()
def on_disconnect(self, client, userdata, rc):
print("connection to MQTT broker is broken, {}:{}".format(self.mqsCI.brokerIP, self.mqsCI.brokerPort))
if self.disconnectedCB is not None:
try:
self.disconnectedCB(client, userdata)
except:
traceback.print_exc()
self._isConnected = False
''' connection event listener '''
def on_connect(self, client, userdata, flags, rc):
if rc == 0:
print("============================================")
print("MQTT broker {}:{} - connected !!!".format(self.mqsCI.brokerIP, self.mqsCI.brokerPort))
print("============================================")
if self.connectedCB is not None:
try:
self.connectedCB(client, userdata)
except:
traceback.print_exc()
self._isConnected = True
else:
print("XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX")
print(
"MQTT broker {}:{} - Failed to connect, rc->{} !!!".format(self.mqsCI.brokerIP, self.mqsCI.brokerPort,
rc))
print("XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX")
5 - login failed for not_authorized
4 - login failed for password_
self.startClient(clientId)
return
# don't add blocking operation in event
def on_log(self, client, userdata, level, buf):
# print("log:{}".format(buf))
def subscribeE(self, topic, qos):
if self.isConnected:
self.clientConn.subscribe(topic, qos)
else:
print("the connection is broken, subscribeE swallowed")
def publishE(self, topic, payload, qos, retain):
if self.isConnected:
self.clientConn.publish(topic, payload, qos, retain)
print('publish==========================')
else:
print("the connection is broken, publishE swallowed")
def mqttMsgRecvCB(client, userdata, msg):
try:
mqttMsg = json.loads(msg.payload.decode('utf-8'))
print('recv mqttMsg:{}'.format(mqttMsg))
except Exception as e:
print("mqttMsgRecvCB:{}".format(e))
def connectedCB(client, userdata, topic):
sub_topic1 = topic
client.subscribe(sub_topic1)
print('connectedCB subscribe ==> {}'.format(sub_topic1))
if __name__ == '__main__':
brokerIP = '127.0.0.1'
brokerPort = 8883
clientId = 'mq_test_ca'
user = 'mq_test_ca'
pwd = '123456'
basedir = os.path.abspath(os.path.dirname(__file__)).replace('\\', '/')
certPath = basedir + '/cacert.pem'
topic = '/demo/test/ca'
mqCI = MQTTClientInfoX(brokerIP, brokerPort, clientId, user, pwd, certPath)
mqttClient = SimpleMqttClientCallElevTest(mqCI, clientId)
mqttClient.setCallback(functools.partial(connectedCB, topic=topic), None, mqttMsgRecvCB)
mqttClient.setKeepAlive(30)
mqttClient.start()
while 1:
try:
time.sleep(1)
cmd = {"sn": 10086, "t": datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), "f": 3, "p1": 120, }
cmd_s = json.dumps(cmd)
mqttClient.publishE(topic, payload=cmd_s, qos=0, retain=False)
except Exception as e:
print(e)