相关文章推荐
打篮球的自行车  ·  氧气瓶_百度百科·  1 年前    · 
聪明的手电筒  ·  SpringBatch从入门到精通-5 ...·  2 年前    · 
儒雅的炒饭  ·  数组,图片,二进制数据互转_cv2 ...·  2 年前    · 
文雅的移动电源  ·  URLencode 特殊字符 转义 ...·  2 年前    · 
非常酷的啄木鸟  ·  断桥余生_热点_新京报电子报·  2 年前    · 
Code  ›  Mqtt C实现记录,流程分析_mqtt-c_bossanovo的博客
char mosquitto char函数 mqtt
https://blog.csdn.net/u011228598/article/details/86570035
没有腹肌的野马
2 年前
    • 1. 访问模式
    • 2. 主要数据结构
      • 1. MQTTAsync
      • 2. MQTTAsyncs
      • 3. MQTTAsync_message
      • 4. MQTTAsync_responseOptions
      • 5. List
    • 2. 流程
      • 1. 订阅
      • 2. 发布
    • 3. Mqttc信令发送流程(以Connect为例)
      • 1. 信令入队
      • 2. 信令出队
    • 4. Mqtt C消息接受流程

    1. 访问模式

    Mqtt C中分为 同步访问 和 异步访问 模式

    • 同步访问的时候,请求会阻塞到访问处,知道有结果返回;
    • 异步访问的时候,会将请求委托给Mqtt c client然后直接返回(零等待),最后结果返回之后,会回调对应的回调函数。
    • 文章主要讲异步模式

    2. 主要数据结构

    1. MQTTAsync

    • MQTTAsync 是一个void*类型的指针,表示一个 Mqtt client(context) 句柄, 即 MQTTAsyncs
    * A handle representing an MQTT client. A valid client handle is available * following a successful call to MQTTAsync_create(). typedef void * MQTTAsync ;

    2. MQTTAsyncs

    用于封装请求的信息,包括URL,端口,回调函数等访问参数

    typedef struct MQTTAsync_struct
    	char* serverURI; //服务器url
    	int ssl;
    	int websocket;
    	Clients* c;
    	/* "Global", to the client, callback definitions */
    	//连接失效的回调函数
        //函数原型: typedef void MQTTAsync_connectionLost(void* context, char* cause);
    	MQTTAsync_connectionLost* cl; 
        //消息到来的回调函数
        //原型:typedef int MQTTAsync_messageArrived(void* context, char* topicName, int topicLen,
        // MQTTAsync_message* message);
    	MQTTAsync_messageArrived* ma; 
        //消息发送成功的回调通知函数
        //原型:typedef void MQTTAsync_deliveryComplete(void* context, MQTTAsync_token token);
    	MQTTAsync_deliveryComplete* dc; 
    	void* clContext; /* the context to be associated with the conn lost callback*/
    	void* maContext; /* the context to be associated with the msg arrived callback*/
    	void* dcContext; /* the context to be associated with the deliv complete callback*/
        //连接成功的回调函数
        //原型:typedef void MQTTAsync_connected(void* context, char* cause);
    	MQTTAsync_connected* connected;
    	void* connected_context; /* the context to be associated with the connected callback*/
        //连接断开的回调函数
    	//原型:typedef void MQTTAsync_disconnected(void* context, MQTTProperties* properties,
    		//enum MQTTReasonCodes reasonCode);
    	MQTTAsync_disconnected* disconnected; 
    	void* disconnected_context; /* the context to be associated with the disconnected callback*/
    	/* Each time connect is called, we store the options that were used.  These are reused in
    	   any call to reconnect, or an automatic reconnect attempt */
    	MQTTAsync_command  connect;		/* Connect operation properties */
    	MQTTAsync_command disconnect;		/* Disconnect operation properties */
    	MQTTAsync_command* pending_write;       /* Is there a socket write pending? */
    	List* responses;
    	unsigned int command_seqno;
    	MQTTPacket* pack;
    	/* added for offline buffering */
    	MQTTAsync_createOptions* createOptions;
    	int shouldBeConnected;
    	/* added for automatic reconnect */
    	int automaticReconnect;
    	int minRetryInterval;
    	int maxRetryInterval;
    	int serverURIcount;
    	char** serverURIs;
    	int connectTimeout;
    	int currentInterval;
    	START_TIME_TYPE lastConnectionFailedTime;
    	int retrying;
    	int reconnectNow;
    	/* MQTT V5 properties */
    	MQTTProperties* connectProps;
    	MQTTProperties* willProps;
    } MQTTAsyncs;
    

    3. MQTTAsync_message

    Mqtt的Message结构体

    typedef struct
    	/** The eyecatcher for this structure.  must be MQTM. */
    	char struct_id[4];
    	/** The version number of this structure.  Must be 0 or 1.
    	 *  0 indicates no message properties */
    	int struct_version;
    	/** The length of the MQTT message payload in bytes. */
    	int payloadlen;
    	/** A pointer to the payload of the MQTT message. */
    	void* payload;
         * The quality of service (QoS) assigned to the message.
         * There are three levels of QoS:
         * <DT><B>QoS0</B></DT>
         * <DD>Fire and forget - the message may not be delivered</DD>
         * <DT><B>QoS1</B></DT>
         * <DD>At least once - the message will be delivered, but may be
         * delivered more than once in some circumstances.</DD>
         * <DT><B>QoS2</B></DT>
         * <DD>Once and one only - the message will be delivered exactly once.</DD>
         * </DL>
    	int qos;
         * The retained flag serves two purposes depending on whether the message
         * it is associated with is being published or received.
         * <b>retained = true</b><br>
         * For messages being published, a true setting indicates that the MQTT
         * server should retain a copy of the message. The message will then be
         * transmitted to new subscribers to a topic that matches the message topic.
         * For subscribers registering a new subscription, the flag being true
         * indicates that the received message is not a new one, but one that has
         * been retained by the MQTT server.
         * <b>retained = false</b> <br>
         * For publishers, this indicates that this message should not be retained
         * by the MQTT server. For subscribers, a false setting indicates this is
         * a normal message, received as a result of it being published to the
         * server.
    	int retained;
          * The dup flag indicates whether or not this message is a duplicate.
          * It is only meaningful when receiving QoS1 messages. When true, the
          * client application should take appropriate action to deal with the
          * duplicate message.
    	int dup;
    	/** The message identifier is normally reserved for internal use by the
          * MQTT client and server.
    	int msgid;
    	 * The MQTT V5 properties associated with the message.
    	MQTTProperties properties;
    } MQTTAsync_message;
    

    4. MQTTAsync_responseOptions

    • MQTTAsync_responseOptions表示每次操作的响应配置,包括 操作成功,操作失败 等操作;
    • 同类的还有:
    • MQTTAsync_disconnectOptions : 断开连接选项
    • MQTTAsync_connectOptions: 连接选项
    • MQTTAsync_init_options:初始化选项 等等
    typedef struct MQTTAsync_responseOptions
    	/** The eyecatcher for this structure.  Must be MQTR */
    	char struct_id[4];
    	/** The version number of this structure.  Must be 0 or 1
    	 *   if 0, no MQTTV5 options */
    	int struct_version;
        * A pointer to a callback function to be called if the API call successfully
        * completes.  Can be set to NULL, in which case no indication of successful
        * completion will be received.
    	MQTTAsync_onSuccess* onSuccess;
        * A pointer to a callback function to be called if the API call fails.
        * Can be set to NULL, in which case no indication of unsuccessful
        * completion will be received.
    	MQTTAsync_onFailure* onFailure;
        * A pointer to any application-specific context. The
        * the <i>context</i> pointer is passed to success or failure callback functions to
        * provide access to the context information in the callback.
    	void* context;
        * A token is returned from the call.  It can be used to track
        * the state of this request, both in the callbacks and in future calls
        * such as ::MQTTAsync_waitForCompletion.
    	MQTTAsync_token token;
        * A pointer to a callback function to be called if the API call successfully
        * completes.  Can be set to NULL, in which case no indication of successful
        * completion will be received.
    	MQTTAsync_onSuccess5* onSuccess5;
        * A pointer to a callback function to be called if the API call successfully
        * completes.  Can be set to NULL, in which case no indication of successful
        * completion will be received.
    	MQTTAsync_onFailure5* onFailure5;
    	 * MQTT V5 input properties
    	MQTTProperties properties;
    	 * MQTT V5 subscribe options, when used with subscribe only.
    	MQTTSubscribe_options subscribeOptions;
    	 * MQTT V5 subscribe option count, when used with subscribeMany only.
    	 * The number of entries in the subscribe_options_list array.
    	int subscribeOptionsCount;
    	 * MQTT V5 subscribe option array, when used with subscribeMany only.
    	MQTTSubscribe_options* subscribeOptionsList;
    } MQTTAsync_responseOptions;
    

    5. List

    List是一个链表, Mqttc中的commands信令队列就是用该链表存储/获取的

    • static List* commands = NULL; //Commands信令队列;
    • static List* handles = NULL; //clients队列,记录所有的client(MQTTAsync)对象;
    * Structure to hold all data for one list typedef struct ListElement *first, /**< first element in the list */ *last, /**< last element in the list */ *current; /**< current element in the list, for iteration */ int count; /**< no of items */ size_t size; /**< heap storage used */ } List;

    2. 流程

    • Mqtt c的同步没什么可说的,类似于调用一个RPC接口一样的,主要看异步请求的流程
    • Mqtt C实现的src/samples文件夹下有异步的subscribe和publish的例子,从这里可以作为切入点来看MQTTC
      的请求流程

    1. 订阅

    1. MQTTAsync_subscribe.c
    //MQTTAsync_subscribe.c
    //main()
    int main(int argc, char* argv[])
        //[*1] 创建一个MQTTAsync的handle
    	MQTTAsync client; 
    	//设置连接选项参数,这里默认给了初始化的参数,mqtt-c自己提供的
    	MQTTAsync_connectOptions conn_opts =
    
    
    
    
        
     MQTTAsync_connectOptions_initializer;//[*2]
        //设置断开连接选项参数,这里默认给了初始化的参数,mqtt-c自己提供的
    	MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer;//[*3s]
    	int rc;
    	int ch;
        //真正创建一个MQTTAsync 的结构体(这里其实是MQTTAsyncs结构体,下面解释)
    	MQTTAsync_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL);
        //设置回调函数
    	MQTTAsync_setCallbacks(
    	client/*MQTTAsync handle*/,
    	client/*context*/,
    	connlost/*MQTTAsync_connectionLost*/,
    	msgarrvd/*MQTTAsync_messageArrived*/,
    	NULL/*MQTTAsync_deliveryComplete*/);
    	conn_opts.keepAliveInterval = 20; //心跳间隔
    	conn_opts.cleansession = 1; //在connect/disconnect的时候,是否清除上一个连接的seesion状态信息缓存
    	                           //(重启一个新的)。这个session状态用来确认消息质量保证(至少一次或者只有一次)
    	conn_opts.onSuccess = onConnect; //连接成功的回调函数
    	conn_opts.onFailure = onConnectFailure; //连接失败的回调函数
    	conn_opts.context = client; //上下文对象,即MQTTAsync
    	//开始请求连接
    	if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
    		printf("Failed to start connect, return code %d\n", rc);
    		exit(EXIT_FAILURE);
    	while	(!subscribed)
    		#if defined(WIN32)
    			Sleep(100);
    		#else
    		    //等待10ms, 一定时间内确保subscribe动作完成
    			usleep(10000L);
    		#endif
    	if (finished)
    		goto exit;
    		ch = getchar();
    	} while (ch!='Q' && ch != 'q');
    	disc_opts.onSuccess = onDisconnect;
    	if ((rc = MQTTAsync_disconnect(client, &disc_opts)) != MQTTASYNC_SUCCESS)
    		printf("Failed to start disconnect, return code %d\n", rc);
    		exit(EXIT_FAILURE);
     	while	(!disc_finished)
    		#if defined(WIN32)
    			Sleep(100);
    		#else
    			usleep(10000L);
    		#endif
    exit:
    	MQTTAsync_destroy(&client);
     	return rc;
    //--------------------------------------------------------------------------------------
    //连接成功回调
    void onConnect(void* context, MQTTAsync_successData* response)
    	MQTTAsync client = (MQTTAsync)context;
    	MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;//获取默认的Option
    	int rc;
    	printf("Successful connection\n");
    	printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n"
               "Press Q<Enter> to quit\n\n", TOPIC, CLIENTID, QOS);
    	opts.onSuccess = onSubscribe; //订阅成功时回调
    	                             //原型:typedef void MQTTAsync_onSuccess(void* context, 
    	                                  //MQTTAsync_successData* response);
    	opts.onFailure = onSubscribeFailure; //订阅失败时回调
    	                                     //原型:typedef void MQTTAsync_onFailure(void* context, 
    	                                         // MQTTAsync_failureData* response);
    	opts.context = client;
    	deliveredtoken = 0;
        //开始订阅
    	if ((rc = MQTTAsync_subscribe(client, TOPIC, QOS, &opts)) != MQTTASYNC_SUCCESS)
    		printf("Failed to start subscribe, return code %d\n", rc);
    		exit(EXIT_FAILURE);
    //--------------------------------------------------------------------------------------
    //订阅成功
    void onSubscribe(void* context, MQTTAsync_successData* response)
    	printf("Subscribe succeeded\n");
    	subscribed = 1; //标记订阅成功
    
    1. MQTTAsync_create
    int MQTTAsync_create(MQTTAsync* handle, const char* serverURI, const char* clientId,
    		int persistence_type, void* persistence_context)
    	return MQTTAsync_createWithOptions(handle, serverURI, clientId, persistence_type,
    		persistence_context, NULL);
    //--------------------------------------------------------------------------------------
    int MQTTAsync_createWithOptions(MQTTAsync* handle, const char* serverURI, const char* clientId,
    		int persistence_type, void* persistence_context,  MQTTAsync_createOptions* options)
        int rc = 0;
    	MQTTAsyncs *m = NULL;
    	...
    	//初始化全局对象
    	if (!global_initialized)
    		#if defined(HEAP_H)
    			Heap_initialize();
    		#endif
    		Log_initialize((Log_nameValue*)MQTTAsync_getVersionInfo());
    		bstate->clients = ListInitialize(); // 初始化ClientStates列表
    		Socket_outInitialize();
    		Socket_setWriteCompleteCallback(MQTTAsync_writeComplete); //Socket写入完成回调
    		handles = ListInitialize(); // 创建handles队列
    		commands = ListInitialize(); //command列表,代表mqtt信令请求的队列
    	...
    	m = malloc(sizeof(MQTTAsyncs));
    
    
    
    
        
    
    	*handle = m;//为handle申请内存,解释类型为MQTTAsyncs
        memset(m, '\0', sizeof(MQTTAsyncs));
        //为m设置内容
        m->serverURI = MQTTStrdup(serverURI);
    	m->responses = ListInitialize();
    	//添加该handle到handles链表队列中
    	ListAppend(handles, m, sizeof(MQTTAsyncs));
    	m->c = malloc(sizeof(Clients));
    	memset(m->c, '\0', sizeof(Clients));
    	m->c->context = m;
    	m->c->outboundMsgs = ListInitialize();
    	m->c->inboundMsgs = ListInitialize();
    	m->c->messageQueue = ListInitialize();
    	m->c->clientID = MQTTStrdup(clientId);
    	m->c->MQTTVersion = MQTTVERSION_DEFAULT;
    	m->shouldBeConnected = 0;
    	if (options)
    		m->createOptions = malloc(sizeof(MQTTAsync_createOptions));
    		memcpy(m->createOptions, options, sizeof(MQTTAsync_createOptions));
    		m->c->MQTTVersion = options->MQTTVersion;
    	...
    	//将新建的client,添加到ClientStates列表中
    	ListAppend(bstate->clients, m->c, sizeof(Clients) + 3*sizeof(List));
       //m代替handle,设置完成了所有请求的参数,MqttAsync对象创建完毕(MqttAsyncs)
    

    Subscribe的过程: 创建连接的参数 -> 然后连接 -> 连接成功 -> 订阅 -> 订阅成功 -> 标记订阅状态(成功);

    2. 发布

    //MQTTAsync_publish.c
    //main()
    //流程和Subscribe一样,在main中主要是connect操作,请求mqtt服务器连接
    int main(int argc, char* argv[])
    	MQTTAsync client;
    	MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
    	int rc;
    	MQTTAsync_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL);
    	MQTTAsync_setCallbacks(client, NULL, connlost, NULL, NULL);
    	conn_opts.keepAliveInterval = 20;
    	conn_opts.cleansession = 1;
    	conn_opts.onSuccess = onConnect;
    	conn_opts.onFailure = onConnectFailure;
    	conn_opts.context = client;
    	if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
    		printf("Failed to start connect, return code %d\n", rc);
    		exit(EXIT_FAILURE);
    	printf("Waiting for publication of %s\n"
             "on topic %s for client with ClientID: %s\n",
             PAYLOAD, TOPIC, CLIENTID);
    	while (!finished)
    		#if defined(WIN32)
    			Sleep(100);
    		#else
    			usleep(10000L);
    		#endif
    	MQTTAsync_destroy(&client);
     	return rc;
    //--------------------------------------------------------------------------------------
    //连接成功
    void onConnect(void* context, MQTTAsync_successData* response)
    	MQTTAsync client = (MQTTAsync)context;
    	MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
        //创建要发送的Message MQTTAsync_message
    	MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
    	int rc;
    	printf("Successful connection\n");
    	//发送成功的回调
    	opts.onSuccess = onSend;
    	opts.context = client;
        //设置发送内容(payload)
    	pubmsg.payload = PAYLOAD;
    	pubmsg.payloadlen = (int)strlen(PAYLOAD);
    	pubmsg.qos = QOS;//设置质量等级保证
    	pubmsg.retained = 0;
    	deliveredtoken = 0;
    	if ((rc = MQTTAsync_sendMessage(client, TOPIC, &pubmsg, &opts)) != MQTTASYNC_SUCCESS)
    		printf("Failed to start sendMessage, return code %d\n", rc);
    		exit(EXIT_FAILURE);
    //--------------------------------------------------------------------------------------
    //发送成功(该用例中发送一次成功后直接断开了连接)
    void onSend(void* context, MQTTAsync_successData* response)
    	MQTTAsync client = (MQTTAsync)context;
    	//创建断开连接选项
    	MQTTAsync_disconnectOptions opts = MQTTAsync_disconnectOptions_initializer;
    	int rc;
    	printf("Message with token value %d delivery confirmed\n", response->token);
    	opts.onSuccess = onDisconnect; //断开连接成功
    	opts.context = client;
        //断开连接
    	if ((rc = MQTTAsync_disconnect(client, &opts)) != MQTTASYNC_SUCCESS)
    		printf("Failed to start sendMessage, return code %d\n", rc);
    		exit(EXIT_FAILURE);
    

    3. Mqttc信令发送流程(以Connect为例)

    1. 信令入队

    //MQTTAsync.c
    
    
    
    
        
    
    int MQTTAsync_connect(MQTTAsync handle, const MQTTAsync_connectOptions* options)
    	MQTTAsyncs* m = handle; //将void* 转换为MQTTAsyncs* 准备填充数据
    	int rc = MQTTASYNC_SUCCESS;
    	MQTTAsync_queuedCommand* conn; //执行单位, 这里代表连接
    	...
    	//将options的内容填充给m->connect
    	m->connect.onSuccess = options->onSuccess;
    	m->connect.onFailure = options->onFailure;
    	...
    	if (sendThread_state != STARTING && sendThread_state != RUNNING)
    		MQTTAsync_lock_mutex(mqttasync_mutex);
    		sendThread_state = STARTING;
    		//启动信令发送线程
    		Thread_start(MQTTAsync_sendThread, NULL);
    		MQTTAsync_unlock_mutex(mqttasync_mutex);
    	if (receiveThread_state != STARTING && receiveThread_state != RUNNING)
    		MQTTAsync_lock_mutex(mqttasync_mutex);
    		receiveThread_state = STARTING;
    		//启动信令接收线程
    		Thread_start(MQTTAsync_receiveThread, handle);
    		MQTTAsync_unlock_mutex(mqttasync_mutex);
    	...
    	//将options的内容填充给m->c(Clients)
    	m->c->keepAliveInterval = options->keepAliveInterval;
    	...
    	//填充m的其他参数
    	...
    	//为MQTTAsync_queuedCommand申请内存,初始化并将m赋值给conn->client;
    	conn = malloc(sizeof(MQTTAsync_queuedCommand));
    	memset(conn, '\0', sizeof(MQTTAsync_queuedCommand));
    	conn->client = m;
    	//将options的内容(回调等)赋值给conn->command;
        if (options)
    		conn->command.onSuccess = options->onSuccess;
    		conn->command.onFailure = options->onFailure;
    		conn->command.onSuccess5 = options->onSuccess5;
    		conn->command.onFailure5 = options->onFailure5;
    		conn->command.context = options->context;
    	conn->command.type = CONNECT;
    	conn->command.details.conn.currentURI = 0;
        //将conn入队到commands列表(MQTTAsync_createWithOptions中初始化的全局列表)
    	rc = MQTTAsync_addCommand(conn, sizeof(conn));
    	...
    //--------------------------------------------------------------------------------------
    static int MQTTAsync_addCommand(MQTTAsync_queuedCommand* command, int command_size)
        int rc = 0;
        ...
        if ((command->command.type != CONNECT) || (command->client->c->connect_state == NOT_IN_PROGRESS))
    		command->command.start_time = MQTTAsync_start_clock();
    	if (command->command.type == CONNECT ||
    		(command->command.type == DISCONNECT && command->command.details.dis.internal))
    	{//CONNECT  / DISCONNECT 的情况下
    		MQTTAsync_queuedCommand* head = NULL;
    		if (commands->first)
    			head = (MQTTAsync_queuedCommand*)(commands->first->content);
    		if (head != NULL && head->client == command->client && head->command.type == command->command.type)
    			MQTTAsync_freeCommand(command); /* ignore duplicate connect or disconnect command */
    		    //对于Connect或者Disconnect优先处理, 将command插入到指定的index上,这里是插入到队头;
    			ListInsert(commands, command, command_size, commands->first); /* add to the head of the list */
    	    //非Connect的情况下,将command续接到队尾;
    		ListAppend(commands, command, command_size);
    		...
    	...
    	//通知发送线程接触等待状态,开始处理发送队列中的信令给Mqtt服务器
    	// 这里对应唤醒的等待线程是MQTTAsync_sendThread
    	rc = Thread_signal_cond(send_cond);
    	...
    

    2. 信令出队

    在入队完毕之后,会唤醒MQTTAsync_sendThread并解除等待状态

    //MQTTAsync.c
    static thread_return_type WINAPI MQTTAsync_sendThread(void* n)
    	...
    	while (!tostop)
    		int rc;
    		while (commands->count > 0)
    		    //循环处理commands队列中的信令,处理完成后,跳出循环等待,一直到队列有内容,会被唤醒并继续处理
    			if (MQTTAsync_processCommand() == 0)
    				break;  /* no commands were processed, so go into a wait */
    ...
            //队列为空之后,等待...
    		if ((rc = Thread_wait_cond(send_cond, 1)) != 0 && rc != ETIMEDOUT)
    			Log(LOG_ERROR, -1, "Error %d waiting for condition variable", rc);
    ...
    	...
    	return 0;
    

    MQTTAsync_processCommand 函数中,就是从队列获取一个command信令发送给MQTT服务器,然后返回,并继续循环处理下一个,直到队列为空,返回0;

    4. Mqtt C消息接受流程

    接受流程在MQTTAsync_receiveThread 线程中, 不断的轮询, 以linux的selector 作为监听, 查询是否有已经准备好的消息的socket(有内容写入),然后解析, 分发接收到的信息.

    文章目录1. 访问模式2. 主要数据结构1. MQTTAsyncs2. 流程1. 访问模式Mqtt C中分为同步访问和异步访问模式同步访问的时候,请求会阻塞到访问处,知道有结果返回;异步访问的时候,会将请求委托给Mqtt c client然后直接返回(零等待),最后结果返回之后,会回调对应的回调函数。2. 主要数据结构1. MQTTAsyncs用于封装请求的信息,包括URL,端...
    最近做了个中移物联平台的项目,设备需要用到MQTTS来完成通讯,所以使用到了paho-mqtt-c的开源的MQTT协议栈库。 https://www.eclipse.org/paho/index.php?page=downloads.php 非常全,各种语言的源码都有,根据自己需求下载,我这里需要的是C/C++的协议栈。 由于我只需要做一个客户端,做的又是嵌入式设备,源码太多也没用,就精简了一下。 实际上只用了几个重要的 MQTTClient.h MQTTConnect.h MQTTFormat.h mqtt client在connect Broker成功后往往紧接着subscribe topics。 IDA反编译了一段mqtt client,找到subscribe部分的伪代码,力图复现代码里真实subscribe内容。 IDA 相关内容如下 # path to compiler and utilities # specify the cross compiler SET(CMAKE_C_COMPILER aarch64-himix200-linux-gcc) # Name of the target platform SET(CMAKE_SYSTEM_NAME Linux) SET(CMAKE_SYSTEM_PROCE
    MQTT客户端实现(嵌入式) 最新最容易使用的mqtt封装。 MQTT客户端实现(使用Eclipse Paho C库,进行了二次封装)。 在Linkit7688单片机上运行测试通过,附例程。 https://download.csdn.net/download/skyformat99/9969954 MQTT 客户端C语言编好的库和例子(example) https://downlo...
    paho.mqtt.c 是Eclipse编写的开源mqtt c库,支持Posix标准操作系统(如Linux,Android,Mac)和windows操作系统。 Paho MQTT C客户端支持全部MQTT协议客户端特性,它使用ANSI标准C编写。 实际上这个库提供两套API,分别是"同步"的MQTTClient和“异步”的MQTTAsync,同步API目的是更加简单、更加有用的。为了达到这个目的,部分操作将被阻塞,直到这个操作完成,这样程序的框架更加简单。 相反,在异步模式中只有一个调用会..
    MQTT介绍 MQTT是一个客户端服务端架构的发布/订阅模式的消息传输协议。它的设计思想是轻巧、开放、简单、规范,易于实现。这些特点使得它对很多场景来说都是很好的选择,特别是对于受限的环境如机器与机器的通信(M2M)以及物联网环境(IoT)。 开放消息协议,简单易实现 发布订阅模式,一对多消息发布 基于TCP/IP网络连接 1字节固定报头,2字节心跳报文,报文结构紧凑 消息QoS支持,可靠传输保证 MQTT协议广泛应用于物联网、移动互联网、智能硬件、车联网、电力能源等领域。 物联网M2M通
    本规范中用到的关键字 必须 MUST, 不能 MUST NOT, 要求 REQUIRED, 将会 SHALL, 不会 SHALL NOT, 应该 SHOULD, 不应该 SHOULD NOT, 推荐 RECOMMENDED, 可以 MAY, 可选 OPTIONAL 都是按照 IETF RFC 2119 [RFC2119] 中的描述解释。
    在paho-mqtt-c提供的demo中,有paho_c_sub.c和paho_c_pub.c两个demo,使用了加密连接(秘钥、证书,用户名和密码)。可以基于这两个demo来进行调试,查看参数如何设置。 1. 首先确保机器上安装了openssl的库。 2. 在cmakelists.txt中,使能sshl和samples, SET(PAHO_WITH_SSL TRUE CACHE BOOL "Flag that defines whether to build ssl-enabled binaries
    mqtt在connectionLost和connectComplete不断切换 是之前连接了mqtt,虽然是执行disconnect了,但再次人为主动重新连接的话就会出现在connectionLost和connectComplete不断切换 。 原因是clientId未变更,建议把clientId设置成带System.currentTimeMillis()的内容,问题解决。
    mqtt 是轻量级基于代理的发布/订阅的消息传输协议,设计思想是开放,简单,轻量级,且易于实现,这些优点使得他受用于任何环境 该协议的特点有:  使用发布/订阅消息的模式,提供一对多的消息发布,解除应用程序耦合 对负载内容屏蔽的消息传输 使用TCP/IO 提供的网络连接 有三种消息发布服务质量:   "至多一次",消息发布完全依赖底层TCP/IP 网络,会发生消息丢失或者重复,
    ### 回答1: MQTT是一种轻量级的消息传输协议,适用于物联网设备之间的通信。在Linux系统下,可以使用C语言实现MQTT协议。具体实现可以参考MQTT C Client库,该库提供了一些API函数,可以方便地实现MQTT客户端的开发。在使用该库时,需要注意一些细节,如连接服务器、订阅主题、发布消息等。同时,还需要了解MQTT协议的一些基本概念,如QoS、保留消息、遗嘱消息等。总之,使用C语言实现MQTT协议需要一定的技术水平和经验,但是可以为物联网设备的通信提供可靠的支持。 ### 回答2: MQTT(Message Queuing Telemetry Transport)是一种轻量级的消息传输协议,适用于物联网设备之间的通信。MQTT协议采用“发布/订阅”模式,订阅者可以在发布者发送消息时接收到消息,从而实现设备间的实时通信。 在Linux系统下,可以使用C语言实现MQTT协议。下面是MQTT C语言库的实现步骤: 1. 下载MQTT库 在Linux系统中,可以下载paho.mqtt.c库。在终端中输入以下命令: git clone https://github.com/eclipse/paho.mqtt.c.git 这个库是Eclipse提供的一个MQTT C语言客户端库,可以在Linux系统中实现MQTT协议的通信。 2. 编写代码 可以使用C语言编写MQTT协议通信库。具体实现使用Eclipse的Mosquitto库。 3. MQTT库的程序结构 使用MQTT库必须有一个主程序结构,主要包括初始化连接,连接与订阅等操作。主程序结构如下所示: #include <stdio.h> #include <mosquitto.h> int main() struct mosquitto *mosq = NULL; int rc; mosquitto_lib_init(); mosq = mosquitto_new("subscriber-test", true, NULL); if (mosq) { rc = mosquitto_connect(mosq, "broker.hivemq.com", 1883, 60); if (rc == MOSQ_ERR_SUCCESS) { mosquitto_subscribe(mosq, NULL, "test", 0); mosquitto_message_callback_set(mosq, my_message_callback); mosquitto_loop_forever(mosq, -1, 1); } else { printf("Failed to connect, return code %d\n", rc); } else { printf("Failed to create mosquitto client object\n"); mosquitto_lib_cleanup(); return 0; 上述代码中mosquitto_new()函数创建了一个MQTT客户端,并连接到了MQTT服务器broker.hivemq.com。然后使用mosquitto_subscribe()函数订阅了“test”主题,并使用mosquitto_message_callback_set()函数设置回调函数。 4. MQTT的回调函数 当MQTT订阅了消息时,服务器会发送消息到客户端。此时需要使用客户端的回调函数响应MQTT服务器的消息。回调函数如下: void my_message_callback(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *message) printf("New message with data %s\n", (char *)message->payload); mosquitto_disconnect(mosq); 上述代码中的回调函数使用了mosquitto_disconnect()函数断开了MQTT连接。 5. 编译代码 使用gcc编译MQTT代码,如下: gcc -lmosquitto -o subscriber-test subscriber-test.c 6. 运行代码 在终端中运行MQTT代码,如下: ./subscriber-test 通过上面的步骤,就可以在Linux系统中使用C语言实现MQTT协议的通信。 ### 回答3: MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是一种轻量级的发布/订阅模式的消息传递协议,旨在实现设备与设备之间的通信。它广泛应用于物联网、传感器网络等领域。而在Linux环境下,可以通过C语言实现MQTT协议。 首先,我们需要安装mqtt库。常用的mqtt库包括libmosquitto、paho等。这里我们以libmosquitto为例进行介绍。在Ubuntu系统下,可以通过以下命令安装: sudo apt-get install libmosquitto-dev 安装完成后,我们可以通过以下的C代码来实现mqtt的发布和订阅: #include <stdio.h> #include <stdlib.h> #include <mosquitto.h> void on_connect(struct mosquitto *mosq, void *obj, int rc) { printf("Connected\n"); void on_message(struct mosquitto *mosq, void *obj, const struct mosquitto_message *msg) { printf("Received message: %s\n", msg->payload); int main (int argc, char *argv[]) { struct mosquitto *mosq = NULL; mosquitto_lib_init(); mosq = mosquitto_new("ClientID", true, NULL); mosquitto_connect_callback_set(mosq, on_connect); mosquitto_message_callback_set(mosq, on_message); mosquitto_connect(mosq, "localhost", 1883, 60); mosquitto_subscribe(mosq, NULL, "topic", 0); mosquitto_publish(mosq, NULL, "topic", strlen("message"), "message", 0, false); mosquitto_loop_forever(mosq, -1, 1); mosquitto_destroy(mosq); mosquitto_lib_cleanup(); return 0; 在这里,我们首先通过调用mosquitto_lib_init()函数初始化mqtt库,在mosq = mosquitto_new("ClientID", true, NULL)中创建一个新的mqtt客户端,mosquitto_connect()连接到消息代理服务器。然后,通过调用mosquitto_subscribe()和mosquitto_publish()函数来订阅和发布消息。最后,在mosquitto_loop_forever()函数中开启mqtt客户端的接收循环。 在以上代码中,我们也可以通过on_connect()函数和on_message()函数来处理连接和接收消息的回调。这些回调函数可以很好地帮助我们处理mqtt消息。 需要注意的是,使用C语言实现mqtt时,需要手动处理许多细节,包括消息头的处理,连接和消息超时的处理等。因此,如果您没有足够的基础,建议先熟练掌握mqtt的基本概念和用法,再开始写代码。 总之,通过C语言实现mqtt协议可以为物联网和传感器网络的开发提供更多的灵活性和定制性。虽然C语言的编写难度较大,但是它可以提高代码的效率,并提供更多的控制和自定义功能。
 
推荐文章
打篮球的自行车  ·  氧气瓶_百度百科
1 年前
聪明的手电筒  ·  SpringBatch从入门到精通-5 数据源配置相关 - 知乎
2 年前
儒雅的炒饭  ·  数组,图片,二进制数据互转_cv2 数组转图片_**星光*的博客-CSDN博客
2 年前
文雅的移动电源  ·  URLencode 特殊字符 转义 遇上的坑-阿里云开发者社区
2 年前
非常酷的啄木鸟  ·  断桥余生_热点_新京报电子报
2 年前
今天看啥   ·   Py中国   ·   codingpro   ·   小百科   ·   link之家   ·   卧龙AI搜索
删除内容请联系邮箱 2879853325@qq.com
Code - 代码工具平台
© 2024 ~ 沪ICP备11025650号