基于环信MQTT消息云,Java版MQTT客户端快速实现消息收发

1 年前

本文介绍Java版MQTT 客户端,如何连接环信MQTT消息云快速实现消息的自收自发。

一、前提条件

1.部署Java开发环境

安装IDE。您可以使用 IntelliJ IDEA 或者 Eclipse ,本文以IntelliJ IDEA为例。
下载安装 JDK

2.导入项目依赖

在IntelliJ IDEA中创建工程,并确认pom.xml中包含以下依赖。

    commons-codec
    commons-codec
    1.10
    org.eclipse.paho
    org.eclipse.paho.client.mqttv3
    1.2.2
    org.apache.httpcomponents
    httpclient
    4.5.2
     com.alibaba
     fastjson
     1.2.76

二、实现流程

1、获取鉴权

为保障客户安全性需求,环信MQTT消息云服务为客户提供【token+clientID】方式实现鉴权认证,其中 AppID (clientID中的关键字段)及 token标识 获取流程如下:

【登录console】
欢迎您登录环信云console控制台,在此控制台中,为您提供应用列表、解决方案、DEMO体验以及常见问题等功能。
在应用列表中,若您未在APP中开通MQTT业务,可参见APP MQTT开通流程
若APP已开通MQTT业务,可在应用列表中选中Appname,点击【查看】操作,进入应用详情。



【获取AppID及连接地址】
进入【查看】后,点击左侧菜单栏【MQTT】->【服务概览】,在下图红色方框内获取当前AppID及服务器连接地址。


【获取token】
为实现对用户管控及接入安全性,环信云console提供用户认证功能,支持对用户账户的增、删、改、查以及为每个用户账户分配唯一token标识,获取token标识可选择以下两种形式。
形式一:console控制台获取(管理员视角)
* 点击左侧菜单栏【应用概览】->【用户认证】页面,点击【创建IM用户】按钮,增添新的账户信息(包 括用户名及密码)。
* 创建成功后,在【用户ID】列表中选中账户,点击【查看token】按钮获取当前账户token信息。


形式二:客户端代码获取(客户端视角)
* 获取域名:点击左侧菜单栏【即时通讯】->【服务概览】页面,查看下图中token域名、org_name、app_name。


* 拼接URL:获取token URL格式为:http:/ /token域名/org_name/app_name/token。
* 用户名/密码:使用【用户ID】列表中已有账户的用户名及密码,例“用户名:test/密码:test123”。

客户端获取token代码示例如下:

public static void main() 
    // 获取token的URL
    http://{token域名}/{org_name}/{app_name}/token
    // 获取token
    String token = "";
    // 取token
    try (final CloseableHttpClient httpClient = HttpClients.createDefault())
        final HttpPost httpPost = new HttpPost("http://{token域名}/{org_name}/{app_name}/token");
        Map params = new HashMap<>();
        params.put("grant_type", "password");
        params.put("username", "test");
        params.put("password", "test123");
        //设置请求体参数
        StringEntity entity = new StringEntity(JSONObject.toJSONString(params), Charset.forName("utf-8"));
        entity.setContentEncoding("utf-8");
        httpPost.setEntity(entity);
        //设置请求头部
        httpPost.setHeader("Content-Type", "application/json");
        //执行请求,返回请求响应
        try (final CloseableHttpResponse response = httpClient.execute(httpPost)
            //请求返回状态码
            int statusCode = response.getStatusLine().getStatusCode();
            //请求成功
            if (statusCode == HttpStatus.SC_OK && statusCode <= HttpStatus.SC_TEMPORARY_REDIRECT) 
                //取出响应体
                final HttpEntity entity2 = response.getEntity();
                //从响应体中解析出token
                String responseBody = EntityUtils.toString(entity2, "utf-8");
                JSONObject jsonObject = JSONObject.parseObject(responseBody);
                token = jsonObject.getString("access_token");
                 //请求失败
                throw new ClientProtocolException("请求失败,响应码为:" + statusCode);
    catch (IOException e) 
        e.printStackTrace();
}

返回结果

{
    "access_token": "YWMtN8a0oqV3EeuF0AmiqRgEh-grzF8zZk2Wp8GS3pF-orDW_F-gj3kR6os3h_oz3ROQAwMAAAF5BxhGlwBPGgAvTR8vDrdVsDPNZMQj0fFjv7EaohgZhzMHM9ncVLE30g",
    "expires_in": 5184000,
    "user": 
        "uuid": "d6fc5fa0-8f79-11ea-8b37-87fa33dd1390",
        "type": "user",
        "created": 1588756404898,
        "modified": 1588756404898,
        "username": "test",
        "activated": true
 access_token即为要获取的token

2、初始化

在IntelliJ IDEA工程中创建MQTT客户端,客户端初始配置包括创建clientID,topic名称,QoS质量,连接地址等信息。

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static org.eclipse.paho.client.mqttv3.MqttConnectOptions.MQTT_VERSION_3_1_1;
public class MqttDemoStarter 
    public static void main(String[] args) throws MqttException, InterruptedException {
        * 用户指定
        String deviceId = "xxxxx-xxxx-xxxxx-xxxxx-xxxxx";
        * 从console控制台获取
        String appId = "1NQ1E9";
         * 设置接入点,进入console控制台获取
        String endpoint = "1NQ1E9.sandbox.mqtt.chat";
         * MQTT客户端ID,由业务系统分配,需要保证每个TCP连接都不一样,保证全局唯一,如果不同的客户端对象(TCP连接)使用了相同的clientId会导致之前的连接断开。
         * clientId由两部分组成,格式为DeviceID@appId,其中DeviceID由业务方自己设置,appId在console控制台创建,clientId总长度不得超过64个字符。
        String clientId = deviceId + "@" + appId;
         * 需要订阅或发送消息的topic名称
         * 如果使用了没有创建或者没有被授权的Topic会导致鉴权失败,服务端会断开客户端连接。
        final String myTopic = "myTopic";
         * QoS参数代表传输质量,可选0,1,2。详细信息,请参见名词解释。
        final int qosLevel = 0;
        final MemoryPersistence memoryPersistence = new MemoryPersistence();
         * 客户端协议和端口。客户端使用的协议和端口必须匹配,如果是ws或者wss,使用http://;如果是mqtt或者mqtts,使用tcp://
        final MqttClient mqttClient = new MqttClient("tcp://" + endpoint + ":1883", clientId, memoryPersistence);
         * 设置客户端发送超时时间,防止无限阻塞。
        mqttClient.setTimeToWait(5000);
        final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>());
}

3、连接服务器

配置连接密码、cleansession标志、心跳间隔、超时时间等信息,调用connect()函数连接至环信MQTT消息云。

MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
         * 用户名,在console中注册
        mqttConnectOptions.setUserName("test");
         * 用户密码为第一步中申请的token
        mqttConnectOptions.setPassword(token.toCharArray());
        mqttConnectOptions.setCleanSession(true);
        mqttConnectOptions.setKeepAliveInterval(90);
        mqttConnectOptions.setAutomaticReconnect(true);
        mqttConnectOptions.setMqttVersion(MQTT_VERSION_3_1_1);
        mqttConnectOptions.setConnectionTimeout(5000);
        mqttClient.connect(mqttConnectOptions);
        //暂停1秒钟,等待连接订阅完成
        Thread.sleep(1000);


4、订阅【subscribe】

【订阅主题】

当客户端成功连接环信MQTT消息云后,需尽快向服务器发送订阅主题消息。

mqttClient.setCallback(new MqttCallbackExtended() {
        * 连接完成回调方法
        * @param b
        * @param s
        @Override
        public void connectComplete(boolean b, String s) {
            * 客户端连接成功后就需要尽快订阅需要的Topic。
            System.out.println("connect success");
            executorService.submit(() -> {
                try {
                    final String[] topicFilter = {myTopic};
                    final int[] qos = {qosLevel};
                    mqttClient.subscribe(topicFilter, qos);
                } catch (Exception e) {
                    e.printStackTrace();
});

【取消订阅】

mqttClient.unsubscribe(new String[]{myTopic});

【接收消息】

配置接收消息回调方法,从环信MQTT消息云接收订阅消息。

mqttClient.setCallback(new MqttCallbackExtended() {
        * 接收消息回调方法
        * @param s
        * @param mqttMessage
        @Override
        public void messageArrived(String s, MqttMessage mqttMessage) {
            System.out.println("receive msg from topic " + s + " , body is " + new String(mqttMessage.getPayload()));
});

5、发布【publish】

配置发送消息回调方法,向环信MQTT消息云中指定topic发送消息。

for (int i = 0; i < 10; i++) {
        * 构建一个Mqtt消息
        MqttMessage message = new MqttMessage("hello world pub sub msg".getBytes());
        //设置传输质量
        message.setQos(qosLevel);
        * 发送普通消息时,Topic必须和接收方订阅的Topic一致,或者符合通配符匹配规则。
        mqttClient.publish(myTopic, message);
}

6、结果验证

connect success
send msg succeed topic is : myTopic
send msg succeed topic is : myTopic
send msg succeed topic is : myTopic
send msg succeed topic is : myTopic
send msg succeed topic is : myTopic
send msg succeed topic is : myTopic
send msg succeed topic is : myTopic
send msg succeed topic is : myTopic
send msg succeed topic is : myTopic
send msg succeed topic is : myTopic
receive msg from topic myTopic , body is hello world pub sub msg
receive msg from topic myTopic , body is hello world pub sub msg
receive msg from topic myTopic , body is hello world pub sub msg
receive msg from topic myTopic , body is hello world pub sub msg