MQTTnet的C#版的开源MQTT通讯库,支持MQTT Server和Client,并提供各种类型的连接方法Demo。
MQTTnet库3.1升级到4.0,并不完全兼容,在连接方式构建、事件订阅等方面需要修改。
2、MQTTnet 3.1 Client编程
using部分
using MQTTnet;
using MQTTnet.Client.Options;
using MQTTnet.Client;
using MQTTnet.Protocol;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Client.Receiving;
using MQTTnet.Client.Publishing;
using MQTTnet.Exceptions;
连接与事件订阅
MqttClient mqttClient;
private async Task MqttClientStart()
var mqttFactory = new MqttFactory();
var options = new MqttClientOptions
ClientId = "clientid_pascalming",
ProtocolVersion = MQTTnet.Formatter.MqttProtocolVersion.V311,
ChannelOptions = new MqttClientTcpOptions
Server = "127.0.0.1",
Port = 1883
if (options.ChannelOptions == null)
throw new InvalidOperationException();
options.Credentials = new MqttClientCredentials
Username = "mqttusername",
Password = "mqttpassword"
options.CleanSession = false;
//心跳检测时间,单位:秒
options.KeepAlivePeriod = TimeSpan.FromSeconds(30);
mqttClient = mqttFactory.CreateMqttClient() as MqttClient;
mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(OnMqttClientConnected);
mqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(OnMqttClientDisConnected);
mqttClient.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(OnSubscriberMessageReceived);
await mqttClient.ConnectAsync(options);
Console.WriteLine($"客户端[{options.ClientId}]尝试连接...");
catch (Exception ex)
Console.WriteLine($"客户端尝试连接出错.>{ex.Message}");
private async Task ClientStop()
if (mqttClient == null) return;
await mqttClient.DisconnectAsync();
mqttClient = null;
catch (Exception ex)
Console.WriteLine($"客户端尝试断开Server出错. {ex.Message}");
private void OnSubscriberMessageReceived(MqttApplicationMessageReceivedEventArgs obj)
Console.WriteLine($"OnSubscriberMessageReceived..");
private void OnMqttClientDisConnected(MqttClientDisconnectedEventArgs obj)
Console.WriteLine($"Mqtt Client DisConnected.");
private void OnMqttClientConnected(MqttClientConnectedEventArgs obj)
Console.WriteLine($"Mqtt Client Connected.");
//质量:AtMostOnce(0,最多一次)/AtLeastOnce(1,最少一次)/ExactlyOnce(2,只一次)
string payload = "hello,from pascalming!";
Task<MqttClientPublishResult> task = mqttClient.PublishAsync("/topic/pascalming/v1",payload,MqttQualityOfServiceLevel.AtLeastOnce,true);
task.Wait();
3、MQTTnet 4.0 Client编程
using部分
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
连接与事件订阅
private IMqttClient mqttClient;
private void MqttConnectAsync()
mqttIsConnected = false;
var mqttFactory = new MqttFactory();
//使用Build构建
var mqttClientOptions = new MqttClientOptionsBuilder()
.WithTcpServer("127.0.0.1", 1883)
.WithProtocolVersion(MQTTnet.Formatter.MqttProtocolVersion.V311)
.WithClientId("clientid_pascalming")
.WithCleanSession(false)
.WithKeepAlivePeriod(TimeSpan.FromSeconds(30))
.WithCredentials("user", "password")
.Build();
mqttClient = mqttFactory.CreateMqttClient();
//与3.1对比,事件订阅名称和接口已经变化
mqttClient.DisconnectedAsync += MqttClient_DisconnectedAsync;
mqttClient.ConnectedAsync += MqttClient_ConnectedAsync;
mqttClient.ApplicationMessageReceivedAsync += MqttClient_ApplicationMessageReceivedAsync;
Task task = mqttClient.ConnectAsync(mqttClientOptions, CancellationToken.None);
task.Wait();
catch (Exception ex)
Console.WriteLine($"Mqtt客户端尝试连接出错:{ex.Message}");
private Task MqttClient_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
return Task.CompletedTask;
private Task MqttClient_ConnectedAsync(MqttClientConnectedEventArgs arg)
Console.WriteLine($"Mqtt客户端连接成功.");
return Task.CompletedTask;
private Task MqttClient_DisconnectedAsync(MqttClientDisconnectedEventArgs arg)
Console.WriteLine($"Mqtt客户端连接断开");
return Task.CompletedTask;
string payload = "hello,from pascalming!";
string mqttTopic="/topic/pascalming/v1";
var applicationMessage = new MqttApplicationMessageBuilder()
.WithTopic(mqttTopic)
.WithPayload(payload)
.Build();
Task<MqttClientPublishResult> task = mqttClient.PublishAsync(applicationMessage);
task.Wait();
4、并发执行与短暂断开数据不丢失处理
客户端上报数据,上面的示例使用的是等待执行模式,也可以改为异步执行提高并发度。数据的计数需要使用安全的Interlocked.Increment和Interlocked.Increment执行。
数据上报后只有两种情况:执行成功或出错,这个需要等待异步执行结果的反馈。这是可以用计数器控制发送数据、执行成功和出错的计数,并控制这个差值的窗口大小。
上报出错的数据,需要保存,然后再次重连后重新上报。
5、参考资料
MQTTnet的Demo示例 dotnet/MQTTnet · GitHub
基于MQTTnet 3.0.12实现MQTT服务器和客户端
MQTTnet的C#版的开源MQTT通讯库,支持MQTTServer和Client,并提供各种类型的连接方法Demo。MQTTnet库3.1升级到4.0,并不完全兼容,在连接方式构建、事件订阅等方面需要修改。
WithAtLeastOnceQoS()//至少一次。.WithAtLeastOnceQoS()//至少一次。/// level 2: 只有一次的传输。/// level 0:最多一次的传输。/// level 1:至少一次的传输。
C#如何MQTTnet实现MQTT协议操作设备
前言: MQTT广泛应用于工业物联网、智能家居、各类智能制造或各类自动化场景等。MQTT是一个基于客户端-服务器的消息发布/订阅传输协议,在很多受限的环境下,比如说机器与机器通信、机器与物联网通信等。好了,科普的废话不多说,下面直接通过.NET环境来实现一套MQTT通信demo,实现服务端与客户端的双边消息发布与订阅的功能和演示。
开发环境:
VS2022 + .NET 6 + Webapi /...
(1)修改etc/acl.conf
设置allow所有用户订阅$SYS/brokers/+/clients/#主题
{allow, {user, "dashboard"}, subscribe, ["$SYS/#"]}.
{allow, {ipaddr, "127.0.0.1"}, pubsub, ["$SYS/#", "#"]}.
{allow, all, subscribe...
“c”这个字母可以有多种含义和功能。以下是几个常见的解释:
1. “C”作为英文字母中的一个字母,它是拉丁字母中的第三个字母。字母“C”在英语中有不同的发音,可以是/k/音或/s/音。它在单词中经常出现,比如cat(猫)、car(汽车)和city(城市)等等。
2. “C”也是罗马数字中的一个表示数字的符号,表示数字100。例如,CC表示200。
3. 在音乐中,“C”是西洋音乐中的一个音符。在标准调式中,C是最低音,同时也是所有其他音符的基准音。C音的频率为每秒约261.63次振动。
4. “C”还可以表示一种编程语言:C语言。C语言是一种广泛应用于计算机编程的高级编程语言,它简洁、高效,并且具有强大的编程功能。
5. 在科学中,“C”可以表示摄氏度的单位,它是用来测量温度的一种常用度量单位。
总而言之,“C”是一个多功能的字母,它在不同领域有不同的用途和含义。无论是作为一个字母、数字、音符、编程语言,还是单位,它都扮演着重要的角色。
### 回答2:
关于"c"的问题,需要更具体的指导才能提供准确的回答。因为"c"可以是许多不同的意思,例如:
1. C语言:C语言是一种通用的计算机编程语言,由Dennis M. Ritchie在20世纪70年代开发。它被广泛用于开发操作系统、嵌入式系统和其他应用程序。C语言以其简洁、高效和强大的功能而闻名,并成为许多其他编程语言的基础。
2. C++:C++是一种面向对象的编程语言,是C语言的扩展。它在C语言的基础上加入了许多新的特性和功能,包括类、继承、多态等。C++被广泛应用于游戏开发、图形界面设计和高性能计算领域。
3. 合同:在法律上,字母"C"可以代表合同。合同是一种协议,用于明确双方之间的权利和义务。合同可以是书面的、口头的或默示的,用于规范各种交易和合作关系。
4. 钢琴键盘上的C:在音乐中,字母"C"代表钢琴键盘上的一个音符。C是音乐的基础音符之一,也是调音的参考音。
这仅仅是"c"的一些可能解释,如果您有其他特定的问题,欢迎继续提问。
CSDN-Ada助手:
C# MQTTnet 3.1升级到MQTTnet 4.0 Client编程变化
wenlong0601: