最近做了一个MQTT通讯的服务,参考了网上的一些例子,代码分享一下。

MQTT通讯帮助类:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using MQTTnet;
using MQTTnet.Core;
using MQTTnet.Core.Client;
using MQTTnet.Core.Packets;
using MQTTnet.Core.Protocol;
using Newtonsoft.Json;

namespace MQTTService.Common
{
/// <summary>
/// MQTT通讯
/// </summary>
public partial class MQTTnetHelper : BaseJobLog
{
// Underlying MQTTnet client; remains null until ConnectMqttServerAsync creates it.
private  MqttClient mqttClient = null;

/// <summary>
/// Gets whether the MQTT client currently reports an active connection.
/// Returns false when no client has been created yet, instead of throwing a NullReferenceException.
/// </summary>
public  bool ConnectState { get { return mqttClient != null && mqttClient.IsConnected; } }

/// <summary>
/// Connects to an MQTT broker over TCP. On first use the client is created and its
/// message-received, connected and disconnected handlers are attached.
/// Connection failures are caught and written to the trace output; they are not rethrown,
/// so callers should inspect ConnectState afterwards.
/// </summary>
/// <param name="url">Broker address, assigned to the options' Server property.</param>
/// <param name="port">Broker port, assigned to the options' Port property as-is (may be null).</param>
/// <param name="clientId">Client identifier sent to the broker.</param>
/// <param name="username">User name for broker authentication.</param>
/// <param name="password">Password for broker authentication.</param>
/// <returns>A task that completes when the connect attempt has finished, successfully or not.</returns>
public  async Task ConnectMqttServerAsync(string url,int? port, string clientId, string username, string password)
{
    if (mqttClient == null)
    {
        // Direct cast instead of 'as': an unexpected client type fails with a clear
        // InvalidCastException rather than a NullReferenceException on the next line.
        mqttClient = (MqttClient)new MqttClientFactory().CreateMqttClient();
        mqttClient.ApplicationMessageReceived += MqttClient_ApplicationMessageReceived;
        mqttClient.Connected += MqttClient_Connected;
        mqttClient.Disconnected += MqttClient_Disconnected;
    }

    try
    {
        var options = new MqttClientTcpOptions
        {
            Server = url,
            Port = port,
            ClientId = clientId,
            UserName = username,
            Password = password,
            CleanSession = true
        };

        // Nothing after the await needs the caller's context; not capturing it keeps
        // callers that block on this task from deadlocking.
        await mqttClient.ConnectAsync(options).ConfigureAwait(false);
    }
    catch (Exception ex)
    {
        // Previously this message was built and then discarded, hiding every connect failure.
        var msg = "连接到MQTT服务器失败!" + Environment.NewLine + ex.Message + Environment.NewLine;
        System.Diagnostics.Trace.TraceError(msg);
    }
}

/// <summary>
/// Handler for the client's Connected event. It only composes a status line;
/// the text is not written or returned anywhere.
/// </summary>
private  void MqttClient_Connected(object sender, EventArgs e)
{
    // The composed notice is currently discarded.
    string connectedNotice = string.Concat("已连接到MQTT服务器!", Environment.NewLine);
}

/// <summary>
/// Handler for the client's Disconnected event. It only composes a status line;
/// the text is not written or returned anywhere.
/// </summary>
private  void MqttClient_Disconnected(object sender, EventArgs e)
{
    // The composed notice is currently discarded.
    string disconnectedNotice = string.Concat("已断开MQTT连接!", Environment.NewLine);
}

/// <summary>
/// Handler for incoming application messages. Decodes the payload as UTF-8 and
/// composes a display line; the text is not written or returned anywhere.
/// </summary>
private  void MqttClient_ApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e)
{
    string payloadText = Encoding.UTF8.GetString(e.ApplicationMessage.Payload);
    // The composed line is currently discarded.
    string receivedLine = ">> " + payloadText + Environment.NewLine;
}

/// <summary>
/// Starts a subscription with a single topic filter at AtMostOnce quality of service.
/// The returned task is not awaited or observed.
/// </summary>
private  void Subscribe_ClickAsync(object sender, EventArgs e)
{
    // NOTE(review): the topic filter is an empty string; a broker will presumably
    // reject it, so a real topic likely needs to be supplied here - confirm.
    var filters = new List<TopicFilter>();
    filters.Add(new TopicFilter("", MqttQualityOfServiceLevel.AtMostOnce));

    // Fire-and-forget: failures of the subscribe call are not surfaced.
    mqttClient.SubscribeAsync(filters);
}

/// <summary>
/// Serializes <paramref name="data"/> to JSON, encodes it as UTF-8 and publishes it
/// as an application message on <paramref name="topic"/>.
/// </summary>
/// <param name="topic">Topic given to the application message.</param>
/// <param name="data">Object serialized with JsonConvert.SerializeObject to form the payload.</param>
/// <returns>The task returned by the client's PublishAsync call; it is not awaited here.</returns>
public  Task DataPublishAsync(string topic, object data)
{
var strdata = JsonConvert.SerializeObject(data);
var byteData = Encoding.UTF8.GetBytes(strdata);
// Built with AtMostOnce quality of service; the trailing 'false' is presumably the
// retain flag - TODO confirm against the MqttApplicationMessage constructor.
var appMsg = new MqttApplicationMessage(topic, byteData, MqttQualityOfServiceLevel.AtMostOnce, false);
// NOTE(review): mqttClient is dereferenced without a null check, so calling this before
// ConnectMqttServerAsync has created the client would throw.
var  result = mqttClient.PublishAsync(appMsg);
return result;

MQTT连接上报类:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using MQTTService.Models;
using Newtonsoft.Json;

namespace MQTTService.Common
{
public class ConnectOrPushHelper : BaseJob
{
// MQTT helper instance used by MQTTConnect and Push; created once per ConnectOrPushHelper.
private MQTTnetHelper mqttHelper = new MQTTnetHelper();

/// <summary>
/// Opens the MQTT connection described by <paramref name="deviceAuthModel"/> and reports
/// whether the helper ended up connected.
/// </summary>
/// <param name="deviceAuthModel">
/// Device and broker details. CommonConfig.ApiIP_MQTT / CommonConfig.ApiPort_MQTT take
/// precedence over the model's broker address and port when they are non-blank.
/// </param>
/// <returns>
/// true when the helper reports a live connection; false for missing or invalid input,
/// a failed connect attempt, or when the connection is not established within the timeout.
/// </returns>
public bool MQTTConnect(DeviceAuthModel deviceAuthModel)
{
    // Upper bound on how long this call blocks waiting for the connection, and the polling step.
    const int connectTimeoutMs = 5000;
    const int pollIntervalMs = 20;

    if (deviceAuthModel.IsNull()) return false;

    var device = deviceAuthModel.deviceInfo;
    var mq = deviceAuthModel.mqInfo;
    if (device == null || mq == null)
    {
        _log.Info("MQTT连接失败,deviceInfo或mqInfo为空");
        return false;
    }

    var IP = CommonConfig.ApiIP_MQTT.IsNotNullOrWhiteSpace() ? CommonConfig.ApiIP_MQTT : mq.mqIp;
    var port = CommonConfig.ApiPort_MQTT.IsNotNullOrWhiteSpace() ? CommonConfig.ApiPort_MQTT : mq.mqPort;

    // A malformed port is reported as a failed connect instead of an unhandled FormatException.
    int portNumber;
    if (!int.TryParse(port, out portNumber))
    {
        _log.Info(string.Format("MQTT连接失败,端口无效,IP:{0},Port:{1}", IP, port));
        return false;
    }

    var clientId = device.deviceName + "@" + device.deviceSecret;

    // The device secret and the MQTT password are masked so credentials never reach the log file.
    _log.Info(string.Format("MQTT连接,IP:{0},Port:{1},clientId:{2},mqUsername:{3},mqPassword:{4}", IP, port, device.deviceName + "@***", mq.mqUsername, "***"));

    var connectTask = mqttHelper.ConnectMqttServerAsync(IP, portNumber, clientId, mq.mqUsername, mq.mqPassword);

    // Poll until the attempt finishes, the client reports connected, or the timeout elapses.
    // A single fixed 120 ms sleep reported failure for any connect that took longer than that.
    var waitedMs = 0;
    while (!connectTask.IsCompleted && !mqttHelper.ConnectState && waitedMs < connectTimeoutMs)
    {
        System.Threading.Thread.Sleep(pollIntervalMs);
        waitedMs += pollIntervalMs;
    }

    if (connectTask.IsFaulted)
    {
        // Observe the exception so it is logged rather than left on an unobserved task.
        _log.Info(string.Format("MQTT连接失败,IP:{0},Port:{1},deviceName:{2},错误信息:{3}", IP, port, device.deviceName, connectTask.Exception.GetBaseException().Message));
        return false;
    }

    if (!mqttHelper.ConnectState)
    {
        // Only non-secret identifiers are logged; the full model contains credentials.
        _log.Info(string.Format("MQTT连接失败,IP:{0},Port:{1},deviceName:{2}", IP, port, device.deviceName));
        return false;
    }

    return true;
}

/// <summary>
/// Publishes <paramref name="reportModel"/> over MQTT and reports whether the publish
/// completed without error.
/// </summary>
/// <typeparam name="T">Type of the payload object.</typeparam>
/// <param name="url">Value passed to the MQTT helper as the publish topic.</param>
/// <param name="deviceName">Device identifier; used only in the failure log entry.</param>
/// <param name="reportModel">Payload object; serialized to JSON for logging and by the helper for sending.</param>
/// <returns>
/// true when the publish task finished successfully within the timeout;
/// false when it faulted, timed out, or any other exception occurred.
/// </returns>
public bool Push<T>(string url,string deviceName, T reportModel)
{
    // Upper bound on how long this call blocks waiting for the publish to finish.
    const int publishTimeoutMs = 5000;

    string payloadJson = null;
    try
    {
        payloadJson = JsonConvert.SerializeObject(reportModel);
        _log.Info(string.Format("MQTT推送数据,url:{0},数据:{1}", url, payloadJson));

        var publishTask = mqttHelper.DataPublishAsync(url, reportModel);

        // Wait for the publish to finish. Inspecting Status right after starting the task
        // almost never sees a fault, because the task is still running at that point.
        // Wait rethrows a fault as AggregateException, handled below.
        if (!publishTask.Wait(publishTimeoutMs))
        {
            _log.Info(string.Format("MQTT推送数据失败,url:{0},错误信息:{1},推送数据:{2}", url, "发布超时", payloadJson));
            return false;
        }

        return true;
    }
    catch (AggregateException ex)
    {
        // Unwrap so the log shows the real cause instead of "One or more errors occurred".
        _log.Info(string.Format("MQTT推送数据失败,url:{0},错误信息:{1},推送数据:{2}", url, ex.GetBaseException().Message, payloadJson));
        return false;
    }
    catch (Exception ex)
    {
        _log.Info(string.Format("发布失败,编号:{0},异常信息:{1}", deviceName, ex.Message));
        return false;
    }
}

Http请求帮助类:

using log4net;
using Newtonsoft.Json;
using RestSharp;
using System;
using System.Diagnostics;

namespace MQTTService.Common
{
public class BaseJob
{
// Prefix prepended to the url by Post<T>; never assigned in the code visible here.
private string host;
// log4net logger; public, and used directly by derived classes.
public ILog _log;

/// <summary>
/// Creates the job and binds its logger to the BaseJob type.
/// </summary>
public BaseJob()
{
    Type loggerOwner = typeof(BaseJob);
    _log = LogManager.GetLogger(loggerOwner);
}

// Serializes registration of the permissive certificate callback across threads.
private static readonly object certificateCallbackGate = new object();

// Single shared delegate instance so it can be removed and re-added without piling up copies.
private static readonly System.Net.Security.RemoteCertificateValidationCallback acceptAnyServerCertificate =
    (sender, certificate, chain, sslPolicyErrors) => true;

/// <summary>
/// Sends <paramref name="data"/> as a JSON POST body to <paramref name="url"/> and returns the raw response content.
/// </summary>
/// <param name="url">Absolute request URL.</param>
/// <param name="data">Object serialized to JSON and sent as the request body.</param>
/// <returns>The response content, or an empty string when an exception is thrown while sending.</returns>
public string Post(string url, object data)
{
    var client = new RestClient(url);
    if (url.IndexOf("https://", StringComparison.OrdinalIgnoreCase) >= 0)
    {
        // SECURITY NOTE(review): this accepts ANY server certificate for the whole process,
        // which disables TLS server authentication. Kept for backward compatibility; replace
        // with proper certificate trust where possible.
        // Remove-then-add keeps exactly one registration; appending a fresh lambda on every
        // call made the callback chain grow without bound.
        lock (certificateCallbackGate)
        {
            System.Net.ServicePointManager.ServerCertificateValidationCallback -= acceptAnyServerCertificate;
            System.Net.ServicePointManager.ServerCertificateValidationCallback += acceptAnyServerCertificate;
        }
    }

    var request = new RestRequest(Method.POST);
    try
    {
        // Serialize once and reuse for both the request body and the log line.
        var body = JsonConvert.SerializeObject(data);
        request.AddHeader("cache-control", "no-cache");
        request.AddHeader("content-type", "application/json");
        request.AddParameter("application/json", body, ParameterType.RequestBody);

        var sw = Stopwatch.StartNew();
        var response = client.Execute<dynamic>(request);
        sw.Stop();

        // Only failed calls are logged, together with the elapsed time in milliseconds.
        if (response.ErrorMessage.IsNotNullOrWhiteSpace())
        {
            _log.Info(url + ",传参:" + body + ",错误信息:" + response.ErrorMessage + ",接口执行时间:" + sw.ElapsedMilliseconds);
        }

        return response.Content;
    }
    catch (Exception e)
    {
        _log.Error(e);
        _log.Info(url + ",传参:" + JsonConvert.SerializeObject(request.Parameters));
        return string.Empty;
    }
}

/// <summary>
/// Sends <paramref name="data"/> as a JSON POST body to the host-prefixed <paramref name="url"/>
/// and returns the response deserialized as <typeparamref name="T"/>.
/// Every completed call is logged with its arguments, raw response and elapsed milliseconds.
/// </summary>
/// <typeparam name="T">Type the response is deserialized into.</typeparam>
/// <param name="url">Request path; the host field is prepended to it.</param>
/// <param name="data">Object serialized to JSON and sent as the request body.</param>
/// <returns>The deserialized response data, or default(T) when an exception is thrown.</returns>
public T Post<T>(string url, object data) where T : new()
{
    url = host + url;
    var restClient = new RestClient(url);
    var restRequest = new RestRequest(Method.POST);
    try
    {
        restRequest.AddHeader("cache-control", "no-cache");
        restRequest.AddHeader("content-type", "application/json");

        string jsonBody = JsonConvert.SerializeObject(data);
        restRequest.AddParameter("application/json", jsonBody, ParameterType.RequestBody);

        Stopwatch timer = Stopwatch.StartNew();
        var restResponse = client_Execute<T>(restClient, restRequest);
        timer.Stop();

        string logLine = url + ",传参:" + jsonBody + ",返回结果:" + restResponse.Content + ",接口执行时间:" + timer.ElapsedMilliseconds;
        _log.Info(logLine);
        return restResponse.Data;
    }
    catch (Exception e)
    {
        _log.Error(e);
        string failureLine = url + ",传参: " + JsonConvert.SerializeObject(restRequest.Parameters);
        _log.Info(failureLine);
        return default(T);
    }
}

// Thin wrapper around RestClient.Execute<T>, kept separate so the timed section reads as one call.
private static IRestResponse<TResult> client_Execute<TResult>(RestClient restClient, RestRequest restRequest) where TResult : new()
{
    return restClient.Execute<TResult>(restRequest);
}

Windows上 Mqtt 服务器搭建与使用客户端工具 Mqtt Box进行测试: https://blog.csdn.net/BADAO_LIUMANG_QIZHI/article/details/112305328 SpringBoot整合 MQTT 服务器 实现 消息的发送与订阅(推送消息与接收推送): https://blog.csdn.net/BADAO_LIUMANG_QIZHI/article/details/112394731 在上面 实现 了将 MQTT 服务器搭建成功并且在SpringBoot中
在上篇文章中利用Apollo创建了 MQTT 服务端,但仅有一个服务端是没有意义的,只有将服务端和客户端结合起来使用才能发挥 MQTT 协议的特性,所以本篇的内容是创建 MQTT 客户端。由于本人对.Net平台相对熟悉,所以将使用 MQTT Net类库结合WPF创建一个客户端。 1.需求分析 MQTT 协议的基本特性是使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合,同时基于TCP/IP能够提供多种...
mqtt 的功能请看百度百科, MQTT _百度百科。这里简单说一下, mqtt 的功能就是一个发布/订阅的功能例如,接下来,我们做一个例子。现在有A服务器作为发布方,B客户端作为订阅方。如果A发布了消息,那么会向B发送消息,反之亦然,从而进行通信的功能。一、A服务端代码1.建立一个.net5的web api项目2.nuget引用 mqtt net3. Mqtt Service代码 4. Mqtt HostService代码 5.在Startup注入代码 6.增加一个服务器发送消息的方法 二、B客户端1.建立一
技术架构:Asp.NET CORE 3.1 MVC + SQLserver + Redis等 开发语言: C# 6.0、JavaScript 前端框架:JQuery、EasyUI、Bootstrap 后端框架:MVC、SQLSugar等 数 据 库:SQLserver 2012