MQTTnet 详解:在 VS2017 中通过 MQTTnet 创建 MQTT 服务端和客户端
大家好,又见面了,我是你们的朋友全栈君。
服务端:
using MQTTnet;
using MQTTnet.Server;
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net;
using System.ServiceProcess;
using System.Text;
using System.Threading.Tasks;
using System.Timers;
namespace MQTTNetService
{
/// <summary>
/// Windows service that hosts an MQTTnet broker on TCP port 1883, appends every
/// received application message to <c>Msglog.txt</c>, and every five seconds appends
/// newly seen connected clients to <c>ClientLinklog.txt</c>. Both log files live in
/// the service's base directory.
/// </summary>
public partial class MQTTNetService : ServiceBase
{
    /// <summary>Number of remembered client IDs after which the de-duplication set is reset.</summary>
    private const int MaxTrackedClientIds = 1000;

    /// <summary>Guards <see cref="subClientIDs"/> and serialises log-file appends.</summary>
    private readonly object syncRoot = new object();

    /// <summary>
    /// Client IDs already written to the connection log, so that a client is not
    /// logged again on every timer tick. Cleared once it reaches
    /// <see cref="MaxTrackedClientIds"/> entries.
    /// </summary>
    private readonly HashSet<string> subClientIDs = new HashSet<string>();

    private MqttServer mqttServer = null;
    private System.Timers.Timer timer = null;

    /// <summary>
    /// Initialises the designer components and prepares (but does not start) the
    /// five-second timer that records connected clients.
    /// </summary>
    public MQTTNetService()
    {
        InitializeComponent();

        // The timer is only started in OnStart, once the broker is running.
        timer = new System.Timers.Timer();
        timer.AutoReset = true;
        timer.Interval = 5000;
        timer.Elapsed += new ElapsedEventHandler(GetSubClientSAndSetShow);
    }

    /// <summary>
    /// Starts the broker and then the client-logging timer.
    /// </summary>
    /// <param name="args">Service start arguments (unused).</param>
    protected override void OnStart(string[] args)
    {
        // Wait for the broker so that a start-up failure (for example the port being
        // in use) propagates out of OnStart instead of being lost.
        CreateMQTTServer().GetAwaiter().GetResult();
        timer.Start();
    }

    /// <summary>
    /// Stops the client-logging timer and shuts the broker down.
    /// </summary>
    protected override void OnStop()
    {
        timer.Stop();

        MqttServer server = mqttServer;
        mqttServer = null; // allows a later OnStart to create a fresh broker
        if (server != null)
        {
            server.StopAsync().GetAwaiter().GetResult();
        }
    }

    /// <summary>
    /// Creates and starts the MQTT broker if it is not already running.
    /// </summary>
    /// <returns>A task that completes once the broker has started.</returns>
    private async Task CreateMQTTServer()
    {
        if (mqttServer != null)
        {
            return;
        }

        var optionsBuilder = new MqttServerOptionsBuilder();
        // Listening port.
        optionsBuilder.WithDefaultEndpointPort(1883);
        // NOTE(review): to bind to a specific address, the builder (not the options
        // object) appears to offer WithDefaultEndpointBoundIPAddress(IPAddress);
        // IPAddress.Parse("") throws FormatException, so pass a real address.
        // Connection backlog can be tuned with optionsBuilder.WithConnectionBacklog(n).

        var server = (MqttServer)new MqttFactory().CreateMqttServer();

        // Log every message that a client publishes.
        server.ApplicationMessageReceived += (s, e) =>
        {
            string msg = "发送消息的客户端id:" + e.ClientId + "\n"
                + "发送时间:" + DateTime.Now + "\n"
                + "发送消息的主题:" + e.ApplicationMessage.Topic + "\n"
                + "发送的消息内容:" + Encoding.UTF8.GetString(e.ApplicationMessage.Payload ?? new byte[0]) + "\n"
                + "————————————————–\n";
            WriteMsgLog(msg);
        };

        // Start with the options that were actually configured above.
        await server.StartAsync(optionsBuilder.Build()).ConfigureAwait(false);

        // Publish the instance only after a successful start.
        mqttServer = server;
    }

    #region Logging
    /// <summary>
    /// Appends a received-message entry to <c>Msglog.txt</c>.
    /// </summary>
    /// <param name="msg">Text to append.</param>
    private void WriteMsgLog(string msg)
    {
        AppendLog("Msglog.txt", msg);
    }

    /// <summary>
    /// Appends a client-connection entry to <c>ClientLinklog.txt</c>.
    /// </summary>
    /// <param name="msg">Text to append.</param>
    private void WriteClientLinkLog(string msg)
    {
        AppendLog("ClientLinklog.txt", msg);
    }

    /// <summary>
    /// Appends a timestamped line to a log file in the service's base directory,
    /// creating the file if it does not exist.
    /// </summary>
    /// <param name="fileName">Log file name, relative to the base directory.</param>
    /// <param name="msg">Text to append after the timestamp.</param>
    private void AppendLog(string fileName, string msg)
    {
        string path = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, fileName);
        string line = DateTime.Now.ToString() + " " + msg + Environment.NewLine;
        try
        {
            // Serialise writers: callbacks may arrive on several threads at once.
            lock (syncRoot)
            {
                File.AppendAllText(path, line);
            }
        }
        catch (IOException ex)
        {
            // A logging failure must not take the broker down.
            Trace.TraceError("Failed to write {0}: {1}", path, ex);
        }
        catch (UnauthorizedAccessException ex)
        {
            Trace.TraceError("Failed to write {0}: {1}", path, ex);
        }
    }

    /// <summary>
    /// Timer callback: writes one connection-log entry for every connected client
    /// that has not been logged since the ID set was last cleared.
    /// </summary>
    /// <param name="sender">The timer raising the event.</param>
    /// <param name="e">Elapsed event data (unused).</param>
    private async void GetSubClientSAndSetShow(object sender, ElapsedEventArgs e)
    {
        MqttServer server = mqttServer;
        if (server == null)
        {
            return;
        }

        try
        {
            var clients = await server.GetConnectedClientsAsync().ConfigureAwait(false);
            foreach (var item in clients)
            {
                bool isNew;
                lock (syncRoot)
                {
                    isNew = subClientIDs.Add(item.ClientId);
                }

                if (!isNew)
                {
                    continue;
                }

                string msg = "接收客户端ID:" + item.ClientId + "\n"
                    + "接收时间:" + DateTime.Now + "\n"
                    + "协议版本:" + item.ProtocolVersion + "\n"
                    + "最后收到的非保持活包" + item.LastNonKeepAlivePacketReceived + "\n"
                    + "最后收到的包" + item.LastPacketReceived + "\n"
                    + "挂起的应用程序消息" + item.PendingApplicationMessages + "\n"
                    + "————————————————" + "\n";
                WriteClientLinkLog(msg);
            }

            lock (syncRoot)
            {
                if (subClientIDs.Count >= MaxTrackedClientIds)
                {
                    subClientIDs.Clear();
                }
            }
        }
        catch (Exception ex)
        {
            // async void: nothing upstream can observe the exception, so record it here.
            Trace.TraceError("Failed to record connected clients: {0}", ex);
        }
    }
    #endregion
}
}
以上服务端没有对客户端进行身份校验:无论客户端标识是什么,只要发起连接就会被接受,因此还不够完善(可以通过 MqttServerOptionsBuilder 的 WithConnectionValidator 对 ClientId、用户名和密码进行校验)。
客户端1(.NET Core 控制台程序):仅用于简单测试,负责接收消息
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
using System;
using System.Text;
using System.Threading.Tasks;
namespace mqttclienttest0101
{
/// <summary>
/// Console test client: connects to the broker, subscribes to every topic ("#"),
/// prints each received message, and publishes one test message after the user
/// presses Enter.
/// </summary>
class Program
{
    static MqttClient mqttClient = null;

    /// <summary>
    /// Connects, waits for Enter, publishes a retained QoS 2 test message and exits.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static void Main(string[] args)
    {
        // Wait for the initial connection attempt so its outcome is printed before the prompt.
        RunAsync().GetAwaiter().GetResult();
        Console.ReadLine();

        if (mqttClient != null)
        {
            var message = new MqttApplicationMessageBuilder();
            message.WithTopic("TopicTest");
            message.WithPayload("topictest001");
            message.WithExactlyOnceQoS();
            message.WithRetainFlag();
            try
            {
                // Wait for the publish; otherwise the process may exit before it is sent.
                mqttClient.PublishAsync(message.Build()).GetAwaiter().GetResult();
            }
            catch (Exception exception)
            {
                Console.WriteLine("### PUBLISHING FAILED ###" + Environment.NewLine + exception);
            }
        }

        Console.WriteLine("Hello World!");
    }

    /// <summary>
    /// Creates the client, wires up the receive/connect/disconnect handlers and makes
    /// the first connection attempt. Failures are written to the console.
    /// </summary>
    /// <returns>A task that completes after the first connection attempt.</returns>
    public static async Task RunAsync()
    {
        try
        {
            var options = new MqttClientOptions
            {
                // Guid.NewGuid() gives a unique ID; new Guid() is always the all-zero GUID,
                // so every instance would share one client ID.
                ClientId = Guid.NewGuid().ToString(),
                CleanSession = true,
                ChannelOptions = new MqttClientTcpOptions
                {
                    Server = "127.0.1.1"
                },
                // For WebSocket transport use MqttClientWebSocketOptions { Uri = "ws://localhost:59690/mqtt" } instead.
            };

            var factory = new MqttFactory();
            mqttClient = (MqttClient)factory.CreateMqttClient();

            mqttClient.ApplicationMessageReceived += (s, e) =>
            {
                Console.WriteLine("### RECEIVED APPLICATION MESSAGE ###");
                Console.WriteLine($"+ Topic = {e.ApplicationMessage.Topic}");
                // A message may carry no payload; treat that as empty.
                Console.WriteLine($"+ Payload = {Encoding.UTF8.GetString(e.ApplicationMessage.Payload ?? new byte[0])}");
                Console.WriteLine($"+ QoS = {e.ApplicationMessage.QualityOfServiceLevel}");
                Console.WriteLine($"+ Retain = {e.ApplicationMessage.Retain}");
                Console.WriteLine();
            };

            mqttClient.Connected += async (s, e) =>
            {
                Console.WriteLine("### CONNECTED WITH SERVER ###");
                try
                {
                    await mqttClient.SubscribeAsync(new TopicFilterBuilder().WithTopic("#").Build());
                    Console.WriteLine("### SUBSCRIBED ###");
                }
                catch (Exception exception)
                {
                    // async void handler: an escaping exception would be unobservable.
                    Console.WriteLine("### SUBSCRIBING FAILED ###" + Environment.NewLine + exception);
                }
            };

            mqttClient.Disconnected += async (s, e) =>
            {
                Console.WriteLine("### DISCONNECTED FROM SERVER ###");
                // Retry after five seconds.
                await Task.Delay(TimeSpan.FromSeconds(5));
                try
                {
                    await mqttClient.ConnectAsync(options);
                }
                catch
                {
                    Console.WriteLine("### RECONNECTING FAILED ###");
                }
            };

            try
            {
                await mqttClient.ConnectAsync(options);
            }
            catch (Exception exception)
            {
                Console.WriteLine("### CONNECTING FAILED ###" + Environment.NewLine + exception);
            }

            Console.WriteLine("### WAITING FOR APPLICATION MESSAGES ###");
        }
        catch (Exception exception)
        {
            Console.WriteLine(exception);
        }
    }
}
}
客户端2(WinForms 程序):负责发送消息
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Windows.Forms;
namespace MqttClientTest01
{
public partial class Form1 : Form
{
private MqttClient mqttClient = null;
/// <summary>
/// Creates the form and runs the designer-generated component initialisation.
/// </summary>
public Form1()
{
InitializeComponent();
}
/// <summary>
/// Load event handler for the form; intentionally does nothing.
/// </summary>
/// <param name="sender">Event source.</param>
/// <param name="e">Event data (unused).</param>
private void Form1_Load(object sender, EventArgs e)
{
}
/// <summary>
/// Click handler for the "connect to server" button: starts a connection attempt
/// via <see cref="RunAsync"/>.
/// </summary>
/// <param name="sender">Event source.</param>
/// <param name="e">Event data (unused).</param>
private void but_linkserver_Click(object sender, EventArgs e)
{
    RunAsync();

    // Alternative, kept for reference: configure the client with the options builder.
    //
    // Create a new MQTT client.
    //var factory = new MqttFactory();
    //mqttClient = factory.CreateMqttClient() as MqttClient;
    // Use the builder to create TCP-based options.
    //var options = new MqttClientOptionsBuilder();
    //options.WithClientId(Guid.NewGuid().ToString());
    //options.WithTcpServer(txtb_serverip.Text.Trim(), Convert.ToInt32(txtb_serverport.Text.Trim()));
    //options.WithTls();
    //options.WithCleanSession();
    //var task = mqttClient.ConnectAsync(options.Build());
}
/// <summary>
/// Connects a new MQTT client to the server and port entered on the form, and shows
/// the outcome in <c>lab_linkstatus</c> (plus the connect time in <c>lab_linktimer</c>
/// on success).
/// </summary>
/// <remarks>
/// Declared <c>async void</c> so existing callers that invoke it as a statement keep
/// working; every exception is therefore handled inside the method.
/// </remarks>
public async void RunAsync()
{
    int port;
    if (!int.TryParse(txtb_serverport.Text.Trim(), out port) || port < 1 || port > 65535)
    {
        lab_linkstatus.Text = "端口无效!";
        return;
    }

    try
    {
        // Drop a previous connection so repeated clicks do not leave old clients connected.
        if (mqttClient != null && mqttClient.IsConnected)
        {
            await mqttClient.DisconnectAsync();
        }

        var options = new MqttClientOptions
        {
            ClientId = Guid.NewGuid().ToString(),
            CleanSession = true,
            ChannelOptions = new MqttClientTcpOptions
            {
                Server = txtb_serverip.Text.Trim(),
                Port = port,
                BufferSize = 20 * 400,
            },
            // For WebSocket transport use MqttClientWebSocketOptions { Uri = "ws://localhost:59690/mqtt" } instead.
        };

        var factory = new MqttFactory();
        mqttClient = (MqttClient)factory.CreateMqttClient();

        // Report success only once the connection has actually been established;
        // the task returned by ConnectAsync is never null, so testing it proves nothing.
        await mqttClient.ConnectAsync(options);
        lab_linkstatus.Text = "连接成功!";
        lab_linktimer.Text = DateTime.Now.ToString();
    }
    catch (Exception exception)
    {
        // A WinForms app has no visible console, so surface the failure on the form.
        lab_linkstatus.Text = "连接失败:" + exception.Message;
    }
}
/// <summary>
/// TextChanged event handler for the user-name text box; intentionally does nothing.
/// </summary>
/// <param name="sender">Event source.</param>
/// <param name="e">Event data (unused).</param>
private void tb_username_TextChanged(object sender, EventArgs e)
{
}
/// <summary>
/// Click handler for the "send" button: publishes the text in <c>rtb_pubmsg</c> to the
/// topic in <c>txtb_msgtopic</c> as a retained QoS 2 message. Does nothing when no
/// client has been created yet.
/// </summary>
/// <param name="sender">Event source.</param>
/// <param name="e">Event data (unused).</param>
private async void but_clientsend_Click(object sender, EventArgs e)
{
    if (mqttClient == null)
    {
        return;
    }

    var message = new MqttApplicationMessageBuilder();
    message.WithTopic(txtb_msgtopic.Text.Trim());
    message.WithPayload(rtb_pubmsg.Text.Trim());
    message.WithExactlyOnceQoS();
    message.WithRetainFlag();

    try
    {
        // Await the publish so that a failure (for example, not connected) is observed.
        await mqttClient.PublishAsync(message.Build());
    }
    catch (Exception exception)
    {
        MessageBox.Show(exception.Message);
    }
}
private async void but_submsg_Click(object sender, EventArgs k)
{
if (mqttClient != null)
{