1 什么是MQTT?
mqtt
(Message Queuing Telemetry Transport,消息队列遥测传输)是 IBM 开发的一个即时通讯协议,有可能成为物联网的重要组成部分。MQTT 是基于二进制消息的发布/订阅编程模式的消息协议,如今已经成为 OASIS 规范,由于规范很简单,非常适合需要低功耗和网络带宽有限的 IoT 场景。
2 MQTTnet
MQTTnet
是一个基于MQTT协议的高度专业的.NET库,它同时提供MQTT client和MQTT server(broker),支持v3.1.0、v3.1.1和v5.0.0的标准MQTT协议。
3 MQTTnet支持范围
.Net Standard 1.3+
.Net Core 1.1+
.Net Core App 1.1+
.Net Framework 4.5.2+(x86,x64,AnyCPU)
Mono 5.2+
Universal Windows Platform(UWP) 10.0.10240+(x86,x64,ARM,AnyCPU,Windows 10 IoT Core)
Xamarin.Android 7.5+
Xamarin.iOS 10.14+
4 创建服务器
MQTT服务器也称为"消息代理"(Broker),可以是一个应用程序或一台设备。它位于消息发布者和订阅者之间,可以:
(1)接受来自客户的网络连接;
(2)接受客户发布的应用信息;
(3)处理来自客户端的订阅和退订请求;
(4)向订阅的客户转发应用程序消息。
服务器创建一个控制台应用,可选>>控制台应用(.NET Core)创建新项目MqttNetServer,代码如下:
1 using MQTTnet;
2 using MQTTnet.Protocol;
3 using MQTTnet.Server;
4 using Newtonsoft.Json;
5 using System;
6 using System.Collections.Generic;
7 using System.IO;
8 using System.Reflection;
9 using System.Security.Cryptography.X509Certificates;
10 using System.Text;
11 using System.Threading.Tasks;
13 namespace MqttServerTest
14 {
/// <summary>
/// Console host for a minimal MQTTnet broker. It listens on TCP port 1884 and
/// logs client connects/disconnects, connection validation, published messages
/// and subscriptions to standard output.
/// </summary>
class Program
{
    /// <summary>The broker instance; assigned at the start of <see cref="Main"/>.</summary>
    public static IMqttServer mqttServer;

    /// <summary>
    /// Builds the broker options, starts the broker, waits for the operator to
    /// press Enter and then stops the broker.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static void Main(string[] args)
    {
        mqttServer = new MQTTnet.MqttFactory().CreateMqttServer();

        mqttServer.UseClientConnectedHandler(e =>
        {
            Console.WriteLine("***new connect:" + e.ClientId);
        });
        mqttServer.UseClientDisconnectedHandler(e =>
        {
            Console.WriteLine("*** disconnect:" + e.ClientId);
        });

        // Optional TLS set-up (combine with the two WithEncryption* calls below):
        //var currentPath = Path.GetDirectoryName(Assembly.GetExecutingAssembly().Location);
        //var certificate = new X509Certificate2(Path.Combine(currentPath, "certificate.pfx"), "yourPassword", X509KeyStorageFlags.Exportable);

        var optionsBuilder = new MqttServerOptionsBuilder()
            .WithConnectionBacklog(100)
            .WithDefaultEndpointPort(1884)
            .WithConnectionValidator(c =>
            {
                // Examples of rejecting a connection:
                //if (c.ClientId.Length < 10)
                //{
                //    c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedIdentifierRejected;
                //    return;
                //}
                //if (c.Username != "mySecretUser" || c.Password != "mySecretPassword")
                //{
                //    c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
                //    return;
                //}

                // This sample accepts every client.
                c.ReturnCode = MqttConnectReturnCode.ConnectionAccepted;
                Console.WriteLine("***connect validator:" + c.ClientId);
            })
            //.WithEncryptionCertificate(certificate.Export(X509ContentType.Pfx))
            //.WithEncryptionSslProtocol(SslProtocols.Tls12)
            .WithApplicationMessageInterceptor(context =>
            {
                // Examples of rewriting or blocking a message:
                //if (context.ApplicationMessage.Topic == "my/custom/topic")
                //{
                //    context.ApplicationMessage.Payload = Encoding.UTF8.GetBytes("The server injected payload.");
                //}
                //if (context.ClientId != "Someone")
                //{
                //    context.AcceptPublish = false;
                //    return;
                //}
                // It is also possible to read the payload and extend it, for example by adding a
                // timestamp when the IoT device has no clock of its own.

                context.AcceptPublish = true;

                // Payload is a byte[]; concatenating it directly would print the type name
                // instead of the message text, so decode it first. A message may carry no payload.
                var payload = context.ApplicationMessage?.Payload;
                var text = payload == null ? string.Empty : Encoding.UTF8.GetString(payload);
                Console.WriteLine("***Message:" + text);
            })
            .WithSubscriptionInterceptor(context =>
            {
                // Examples of rejecting a subscription:
                //if (context.TopicFilter.Topic.StartsWith("admin/foo/bar") && context.ClientId != "theAdmin")
                //{
                //    context.AcceptSubscription = false;
                //}
                //if (context.TopicFilter.Topic.StartsWith("the/secret/stuff") && context.ClientId != "Imperator")
                //{
                //    context.AcceptSubscription = false;
                //    context.CloseConnection = true;
                //}

                context.AcceptSubscription = true;
                Console.WriteLine("***Subscript:" + context.TopicFilter);
            })
            //.WithStorage(new RetainedMessageHandler())
            ;

        var options = optionsBuilder.Build();

        //// Alternative way of setting the storage:
        //options.Storage = new RetainedMessageHandler();

        // Block until the broker is actually listening, so a start-up failure
        // (for example the port being in use) surfaces here on the main thread.
        // A console application has no synchronization context, so blocking is safe.
        StartServer(options).GetAwaiter().GetResult();

        Console.WriteLine("Press Enter to exit.");
        Console.ReadLine();

        // Shut the broker down before the process exits.
        mqttServer.StopAsync().GetAwaiter().GetResult();
    }

    /// <summary>Starts <see cref="mqttServer"/> with the given options.</summary>
    /// <param name="options">Broker options produced by <c>MqttServerOptionsBuilder</c>.</param>
    /// <returns>A task that completes once <c>StartAsync</c> has completed.</returns>
    public static async Task StartServer(IMqttServerOptions options)
    {
        await mqttServer.StartAsync(options);
    }
}
// The implementation of the storage:
// This code uses the JSON library "Newtonsoft.Json".
/// <summary>
/// Stores the broker's retained messages as a JSON document in a fixed file.
/// </summary>
public class RetainedMessageHandler : IMqttServerStorage
{
    private const string Filename = "C:\\MQTT\\RetainedMessages.json";

    /// <summary>Serializes <paramref name="messages"/> to JSON and overwrites the storage file.</summary>
    /// <param name="messages">The retained messages to store.</param>
    /// <returns>An already-completed task; the file is written synchronously.</returns>
    public Task SaveRetainedMessagesAsync(IList<MqttApplicationMessage> messages)
    {
        // File.WriteAllText does not create missing folders, so make sure the
        // target directory exists (no-op when it already does).
        var directory = Path.GetDirectoryName(Filename);
        if (!string.IsNullOrEmpty(directory))
        {
            Directory.CreateDirectory(directory);
        }

        File.WriteAllText(Filename, JsonConvert.SerializeObject(messages));

        // Task.FromResult is kept instead of Task.CompletedTask so the sample
        // still compiles against .NET Framework 4.5.2.
        return Task.FromResult(0);
    }

    /// <summary>Reads the retained messages back from the storage file.</summary>
    /// <returns>
    /// The stored messages, or an empty list when the file does not exist or
    /// contains no JSON list.
    /// </returns>
    public Task<IList<MqttApplicationMessage>> LoadRetainedMessagesAsync()
    {
        IList<MqttApplicationMessage> retainedMessages = null;

        if (File.Exists(Filename))
        {
            var json = File.ReadAllText(Filename);
            // DeserializeObject yields null for an empty file or a literal "null".
            retainedMessages = JsonConvert.DeserializeObject<List<MqttApplicationMessage>>(json);
        }

        return Task.FromResult(retainedMessages ?? new List<MqttApplicationMessage>());
    }
}
152 }
代码直接运行起来,就是一个简单的Mqtt server。
5 创建xamarin APP
一个使用MQTT协议的应用程序或者设备,它总是建立到服务器的网络连接。客户端可以:
(1)发布其他客户端可能会订阅的信息;
(2)订阅其它客户端发布的消息;
(3)退订或删除应用程序的消息;
(4)断开与服务器连接。
在VS中新建一个xamarin.Forms的移动应用,创建好后在Nuget上搜索mqttnet,添加对MQTTnet包的引用。更改代码如下:
<?xml version="1.0" encoding="utf-8" ?>
<!--
    MQTT demo page: subscribe to a topic, publish a message to a topic, and
    show connection events and received messages in a log.
    The code-behind appends newline-terminated lines to txtReceiveMessage, so
    the log is a multi-line, read-only Editor rather than a single-line Entry.
-->
<ContentPage xmlns="http://xamarin.com/schemas/2014/forms"
             xmlns:x="http://schemas.microsoft.com/winfx/2009/xaml"
             xmlns:local="clr-namespace:CatShell"
             x:Class="CatShell.MainPage">

    <StackLayout>
        <!-- Subscribe -->
        <Label Text="SubscribeTopic"/>
        <Entry x:Name="txtSubTopic" Placeholder="Subscribe Topic" />
        <Button Text="BtnSubscribe" Clicked="SubButton_Clicked"/>

        <!-- Publish -->
        <Label Text="PublishTopic"/>
        <Entry x:Name="txtPubTopic" Placeholder="Publish Topic"/>
        <Entry x:Name="txtSendMessage" Placeholder="Message"/>
        <Button Text="Publish" Clicked="PubButton_Clicked"/>

        <!-- Log of connection events and received messages -->
        <Editor x:Name="txtReceiveMessage"
                IsReadOnly="True"
                VerticalOptions="FillAndExpand"/>
    </StackLayout>

</ContentPage>
1 using MQTTnet;
2 using MQTTnet.Client;
3 using MQTTnet.Client.Options;
4 using System;
5 using System.Collections.Generic;
6 using System.Linq;
7 using System.Text;
8 using System.Threading;
9 using System.Threading.Tasks;
10 using Xamarin.Forms;
12 namespace CatShell
13 {
/// <summary>
/// Xamarin.Forms page that connects to an MQTT broker, lets the user subscribe
/// to a topic and publish messages, and logs events into <c>txtReceiveMessage</c>.
/// </summary>
public partial class MainPage : ContentPage
{
    public IMqttClient mqttClient;
    public IMqttClientOptions options;

    /// <summary>Creates the page, configures the MQTT client and starts connecting.</summary>
    public MainPage()
    {
        InitializeComponent();
        InitMqttClient();

        // Fire-and-forget is safe here: ConnectMqttServer handles its own exceptions.
        ConnectMqttServer();
    }

    /// <summary>
    /// Creates the MQTT client, registers the connected / disconnected /
    /// message-received handlers and builds the connection options.
    /// </summary>
    public void InitMqttClient()
    {
        // Create a new MQTT client.
        var factory = new MqttFactory();
        mqttClient = factory.CreateMqttClient();

        mqttClient.UseConnectedHandler(e =>
        {
            AppendLog(">> connect success.");
        });

        mqttClient.UseDisconnectedHandler(e =>
        {
            AppendLog(">> Disconnect.");
        });

        mqttClient.UseApplicationMessageReceivedHandler(e =>
        {
            // A message may carry no payload; GetString(null) would throw.
            var payload = e.ApplicationMessage.Payload;
            var text = payload == null ? string.Empty : Encoding.UTF8.GetString(payload);

            // Append like the other handlers do, instead of overwriting the log.
            AppendLog($">> {text}");
        });

        // Create TCP based options using the builder.
        options = new MqttClientOptionsBuilder()
            .WithClientId("Client4")
            .WithTcpServer("10.100.1.247", 1884) // Use TCP connection, port is optional
            //.WithWebSocketServer("broker.hivemq.com:8000/mqtt") // Use WebSocket connection.
            //.WithCredentials("bud", "%spencer%")
            //.WithTls()
            //.WithTls(new MqttClientOptionsBuilderTlsParameters
            //{
            //    UseTls = true,
            //    CertificateValidationCallback = (X509Certificate x, X509Chain y, SslPolicyErrors z, IMqttClientOptions o) =>
            //    {
            //        // TODO: Check conditions of certificate by using above parameters.
            //        return true;
            //    }
            //})
            .WithCleanSession()
            .Build();
    }

    /// <summary>
    /// Connects to the broker described by <see cref="options"/>. A failure is
    /// written to the log instead of being thrown, so the call may be left unawaited.
    /// </summary>
    /// <returns>A task that completes when the attempt has finished.</returns>
    public async Task ConnectMqttServer()
    {
        try
        {
            await mqttClient.ConnectAsync(options, CancellationToken.None); // Since 3.0.5 with CancellationToken
        }
        catch (Exception ex)
        {
            AppendLog($">> connect failed: {ex.Message}");
        }
    }

    /// <summary>Subscribes to the topic typed into <c>txtSubTopic</c>.</summary>
    private async void SubButton_Clicked(object sender, EventArgs e)
    {
        // Entry.Text is null until the user has typed something.
        string topic = txtSubTopic.Text?.Trim();

        if (string.IsNullOrEmpty(topic))
        {
            await DisplayAlert("MQTT", "订阅主题不能为空!", "OK");
            return;
        }

        if (!mqttClient.IsConnected)
        {
            await DisplayAlert("MQTT", "MQTT客户端尚未连接!", "OK");
            return;
        }

        try
        {
            // Subscribe to a topic
            await mqttClient.SubscribeAsync(new TopicFilterBuilder().WithTopic(topic).Build());
        }
        catch (Exception ex)
        {
            // An exception escaping an async void handler would crash the app.
            AppendLog($">> subscribe failed: {ex.Message}");
            return;
        }

        AppendLog($"已订阅[{topic}]主题");

        // Lock the topic field once subscribed (mirrors the WinForms client,
        // which disables its topic box at this point).
        txtSubTopic.IsReadOnly = true;
    }

    /// <summary>Publishes the text in <c>txtSendMessage</c> to the topic in <c>txtPubTopic</c>.</summary>
    private async void PubButton_Clicked(object sender, EventArgs e)
    {
        string topic = txtPubTopic.Text?.Trim();

        if (string.IsNullOrEmpty(topic))
        {
            await DisplayAlert("MQTT", "发布主题不能为空!", "OK");
            return;
        }

        if (!mqttClient.IsConnected)
        {
            await DisplayAlert("MQTT", "MQTT客户端尚未连接!", "OK");
            return;
        }

        string inputString = txtSendMessage.Text?.Trim() ?? string.Empty;

        try
        {
            await PublishMessages(topic, inputString);
        }
        catch (Exception ex)
        {
            AppendLog($">> publish failed: {ex.Message}");
        }
    }

    /// <summary>Publishes a retained message with QoS 2 (exactly once).</summary>
    /// <param name="topicMsg">Topic to publish to.</param>
    /// <param name="payloadMsg">Message text.</param>
    /// <returns>A task that completes when the publish has completed; it faults if publishing fails.</returns>
    public async Task PublishMessages(string topicMsg, string payloadMsg)
    {
        var message = new MqttApplicationMessageBuilder()
            .WithTopic(topicMsg)
            .WithPayload(payloadMsg)
            .WithExactlyOnceQoS()
            .WithRetainFlag()
            .Build();

        await mqttClient.PublishAsync(message);
    }

    /// <summary>
    /// Appends one line to the log on the UI thread. Safe to call from MQTT
    /// callbacks, which are not guaranteed to run on the UI thread.
    /// </summary>
    /// <param name="line">Text to append (a newline is added).</param>
    private void AppendLog(string line)
    {
        Device.BeginInvokeOnMainThread(() =>
        {
            txtReceiveMessage.Text = txtReceiveMessage.Text + line + Environment.NewLine;
        });
    }
}
133 }
代码运行起来,在APP上可以直接发信息。
6 创建winForm client(可选)
可以创建一个winForm来相互互动,在VS上新建一个windows窗体应用(.NET Framework),界面设计如下
后台代码如下:
1 using MQTTnet;
2 using MQTTnet.Client.Options;
3 using MQTTnet.Client;
4 using System;
5 using System.Collections.Generic;
6 using System.ComponentModel;
7 using System.Data;
8 using System.Drawing;
9 using System.Linq;
10 using System.Net.Security;
11 using System.Security.Cryptography.X509Certificates;
12 using System.Text;
13 using System.Threading;
14 using System.Threading.Tasks;
15 using System.Windows.Forms;
17 namespace MqttClientWin
18 {
/// <summary>
/// WinForms MQTT client: connects to the broker, lets the user subscribe to a
/// topic and publish messages, and logs events into <c>txtReceiveMessage</c>.
/// </summary>
public partial class Form1 : Form
{
    public IMqttClient mqttClient;
    public IMqttClientOptions options;

    /// <summary>Creates the form and configures the MQTT client.</summary>
    public Form1()
    {
        InitializeComponent();
        InitMqttClient();

        // Connect once the form is loading rather than in the constructor:
        // the MQTT callbacks marshal onto the UI thread, which requires the
        // window handle to exist. ConnectMqttServer handles its own exceptions,
        // so the returned task can be left unawaited.
        Load += (sender, e) => { ConnectMqttServer(); };
    }

    /// <summary>
    /// Creates the MQTT client, registers the message-received / connected /
    /// disconnected handlers and builds the connection options.
    /// </summary>
    public void InitMqttClient()
    {
        // Create a new MQTT client.
        var factory = new MqttFactory();
        mqttClient = factory.CreateMqttClient();

        mqttClient.UseApplicationMessageReceivedHandler(e =>
        {
            var payloadText = DecodePayload(e.ApplicationMessage.Payload);

            Console.WriteLine("### RECEIVED APPLICATION MESSAGE ###");
            Console.WriteLine($"+ Topic = {e.ApplicationMessage.Topic}");
            Console.WriteLine($"+ Payload = {payloadText}");
            Console.WriteLine($"+ QoS = {e.ApplicationMessage.QualityOfServiceLevel}");
            Console.WriteLine($"+ Retain = {e.ApplicationMessage.Retain}");
            Console.WriteLine();

            AppendLog($">> {payloadText}");

            //Task.Run(() => mqttClient.PublishAsync("hello/world"));
        });

        mqttClient.UseConnectedHandler(e =>
        {
            Console.WriteLine("### CONNECTED WITH SERVER ###");

            //// Subscribe to a topic
            //await mqttClient.SubscribeAsync(new TopicFilterBuilder().WithTopic("my/topic").Build());
            //Console.WriteLine("### SUBSCRIBED ###");

            AppendLog(">> connect success.");
        });

        mqttClient.UseDisconnectedHandler(e =>
        {
            AppendLog(">> Disconnect .");
        });

        // Create TCP based options using the builder.
        options = new MqttClientOptionsBuilder()
            .WithClientId("Client5")
            .WithTcpServer("10.100.1.247", 1884) // Use TCP connection, port is optional
            //.WithWebSocketServer("broker.hivemq.com:8000/mqtt") // Use WebSocket connection.
            //.WithCredentials("bud", "%spencer%")
            //.WithTls()
            //.WithTls(new MqttClientOptionsBuilderTlsParameters
            //{
            //    UseTls = true,
            //    CertificateValidationCallback = (X509Certificate x, X509Chain y, SslPolicyErrors z, IMqttClientOptions o) =>
            //    {
            //        // TODO: Check conditions of certificate by using above parameters.
            //        return true;
            //    }
            //})
            .WithCleanSession()
            .Build();
    }

    /// <summary>
    /// Connects to the broker described by <see cref="options"/>. A failure is
    /// written to the log instead of being thrown, so the call may be left unawaited.
    /// </summary>
    /// <returns>A task that completes when the attempt has finished.</returns>
    public async Task ConnectMqttServer()
    {
        try
        {
            await mqttClient.ConnectAsync(options, CancellationToken.None); // Since 3.0.5 with CancellationToken
        }
        catch (Exception ex)
        {
            Console.WriteLine("### CONNECTING FAILED ### " + ex.Message);
            AppendLog($">> connect failed: {ex.Message}");
        }
    }

    /// <summary>
    /// Registers a disconnected handler that waits five seconds and then tries
    /// to connect again. Not called anywhere in this class.
    /// </summary>
    /// <remarks>
    /// NOTE(review): this presumably replaces the disconnected handler set in
    /// <see cref="InitMqttClient"/> rather than adding to it — confirm against
    /// the MQTTnet version in use.
    /// </remarks>
    public void ReconnectMqttServer()
    {
        mqttClient.UseDisconnectedHandler(async e =>
        {
            Console.WriteLine("### DISCONNECTED FROM SERVER ###");
            await Task.Delay(TimeSpan.FromSeconds(5));

            try
            {
                await mqttClient.ConnectAsync(options, CancellationToken.None); // Since 3.0.5 with CancellationToken
            }
            catch (Exception ex)
            {
                Console.WriteLine("### RECONNECTING FAILED ###");
                Console.WriteLine(ex.Message);
            }
        });
    }

    /// <summary>Publishes a retained message with QoS 2 (exactly once).</summary>
    /// <param name="topicMsg">Topic to publish to.</param>
    /// <param name="payloadMsg">Message text.</param>
    /// <returns>A task that completes when the publish has completed; it faults if publishing fails.</returns>
    public async Task PublishMessages(string topicMsg, string payloadMsg)
    {
        var message = new MqttApplicationMessageBuilder()
            .WithTopic(topicMsg)
            .WithPayload(payloadMsg)
            .WithExactlyOnceQoS()
            .WithRetainFlag()
            .Build();

        await mqttClient.PublishAsync(message);
    }

    /// <summary>Subscribes to the topic typed into <c>txtSubTopic</c>.</summary>
    private async void BtnSubscribe_Click(object sender, EventArgs e)
    {
        string topic = txtSubTopic.Text.Trim();

        if (string.IsNullOrEmpty(topic))
        {
            MessageBox.Show("订阅主题不能为空!");
            return;
        }

        if (!mqttClient.IsConnected)
        {
            MessageBox.Show("MQTT客户端尚未连接!");
            return;
        }

        try
        {
            // Subscribe to a topic
            await mqttClient.SubscribeAsync(new TopicFilterBuilder().WithTopic(topic).Build());
        }
        catch (Exception ex)
        {
            // An exception escaping an async void handler would be unhandled.
            MessageBox.Show(ex.Message);
            return;
        }

        txtReceiveMessage.AppendText($"已订阅[{topic}]主题" + Environment.NewLine);
        txtSubTopic.Enabled = false;
        BtnSubscribe.Enabled = false;
    }

    /// <summary>Publishes the text in <c>txtSendMessage</c> to the topic in <c>txtPubTopic</c>.</summary>
    private async void BtnPublish_Click(object sender, EventArgs e)
    {
        string topic = txtPubTopic.Text.Trim();

        if (string.IsNullOrEmpty(topic))
        {
            MessageBox.Show("发布主题不能为空!");
            return;
        }

        if (!mqttClient.IsConnected)
        {
            MessageBox.Show("MQTT客户端尚未连接!");
            return;
        }

        string inputString = txtSendMessage.Text.Trim();

        try
        {
            await PublishMessages(topic, inputString);
        }
        catch (Exception ex)
        {
            MessageBox.Show(ex.Message);
        }
    }

    /// <summary>Decodes a message payload as UTF-8; a missing payload becomes an empty string.</summary>
    private static string DecodePayload(byte[] payload)
    {
        return payload == null ? string.Empty : Encoding.UTF8.GetString(payload);
    }

    /// <summary>
    /// Appends one line to the log from any thread. Does nothing when the
    /// window handle does not exist (before the form is shown or after it is closed).
    /// </summary>
    /// <param name="line">Text to append (a newline is added).</param>
    private void AppendLog(string line)
    {
        if (IsDisposed || !IsHandleCreated)
        {
            return;
        }

        try
        {
            // BeginInvoke rather than Invoke, so the MQTT callback thread is
            // never blocked waiting for the UI thread.
            BeginInvoke(new Action(() =>
            {
                txtReceiveMessage.AppendText(line + Environment.NewLine);
            }));
        }
        catch (InvalidOperationException)
        {
            // The form was closed between the check above and the call
            // (ObjectDisposedException derives from InvalidOperationException).
        }
    }
}
173 }
7 MQTT协议中的订阅、主题、会话
一、订阅(Subscription)
订阅包含主题筛选器(Topic Filter)和最大服务质量(QoS)。订阅会与一个会话(Session)关联。一个会话可以包含多个订阅。每一个会话中的每个订阅都有一个不同的主题筛选器。
二、会话(Session)
每个客户端与服务器建立连接后就是一个会话,客户端和服务器之间有状态交互。会话可以只存在于一次网络连接期间,也可能在客户端和服务器之间跨越多个连续的网络连接。
三、主题名(Topic Name)
连接到一个应用程序消息的标签,该标签与服务器的订阅相匹配。服务器会将消息发送给订阅所匹配标签的每个客户端。
四、主题筛选器(Topic Filter)
一个针对主题名的通配符筛选器,在订阅表达式中使用,表示订阅所匹配到的多个主题。
五、负载(Payload)
消息订阅者所具体接收的内容。
8 MQTT协议中的方法
MQTT协议中定义了一些方法(也被称为动作),用于表示对确定资源所进行的操作。这个资源可以代表预先存在的数据或动态生成的数据,这取决于服务器的实现。通常来说,资源指服务器上的文件或输出。主要方法有:
(1)Connect。等待与服务器建立连接。
(2)Disconnect。等待MQTT客户端完成所做的工作,并与服务器断开TCP/IP会话。
(3)Subscribe。等待完成订阅。
(4)UnSubscribe。等待服务器取消客户端的一个或多个topics订阅。
(5)Publish。MQTT客户端发送消息请求,发送完成后返回应用程序线程。