import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.security.KeyFactory;
import java.security.KeyStore;
import java.security.PrivateKey;
import java.security.cert.Certificate;
import java.security.cert.CertificateFactory;
import java.security.spec.PKCS8EncodedKeySpec;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.TrustManagerFactory;
import org.apache.commons.codec.binary.Base64;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import com.alibaba.fastjson.JSONObject;
/**
 * MQTT客户端直连阿里云物联网平台,基于Eclipse Paho开发。
 * 基于X.509认证接入文档:https://help.aliyun.com/document_detail/140588.html
 */
public class IotMqttClientWithAuthByX509 {
// Region ID. Only China East 2 (Shanghai) currently supports X.509 authentication.
// Never reassigned, so it is declared final.
private static final String regionId = "cn-shanghai";
// Device certificate path.
private String certPath = "";
// Device certificate private-key path.
private String privateKeyPath = "";
// Private-key password; fixed to the empty string.
private String privateKeyPassword = "";
// Topic on which the X.509 authentication response arrives.
// It does not need to be created or subscribed to; it is used directly.
private static final String AUTH_TOPIC = "/ext/auth/identity/response";
// Device productKey delivered by IoT Platform; leave empty.
// volatile: assigned on the MQTT callback thread and read from the main thread.
private static volatile String productKey = "";
// Device deviceName delivered by IoT Platform; leave empty.
// volatile: assigned on the MQTT callback thread and read from the main thread.
private static volatile String deviceName = "";
// MQTT client; stays null until connect(...) has created it.
private MqttClient sampleClient = null;
* 建立MQTT连接
* @param certPath 证书路径
* @param privateKeyPath 私钥路径
* @param privateKeyPassword 私钥密码,目前固定为空
public void connect(String certPath, String privateKeyPath, String privateKeyPassword) {
this.certPath = certPath;
this.privateKeyPath = privateKeyPath;
this.privateKeyPassword = privateKeyPassword;
// 接入域名
String broker = "ssl://x509.itls." + regionId + ".aliyuncs.com:1883";
// 表示客户端ID,建议使用设备的MAC地址或SN码,64字符内。
String clientId = ".";
// 只支持securemode=2,表示使用TLS。
String clientOpts = "|securemode=2|";
// MQTT接入客户端ID
String mqttClientId = clientId + clientOpts;
// 建立MQTT连接。使用X.509证书认证,所以不需要username和password。
connect(broker, mqttClientId, "", "");
* 建立MQTT连接
* @param serverURL 连接服务器地址
* @param clientId MQTT接入客户端ID
* @param username MQTT接入用户名
* @param password MQTT接入密码
protected void connect(String serverURL, String clientId, String username, String password) {
try {
MemoryPersistence persistence = new MemoryPersistence();
sampleClient = new MqttClient(serverURL, clientId, persistence);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setMqttVersion(4);// MQTT 3.1.1
connOpts.setUserName(username);// 用户名
connOpts.setPassword(password.toCharArray());// 密码
connOpts.setSocketFactory(createSSLSocket()); // 使用TLS,需要下载根证书root.crt,设置securemode=2。
connOpts.setCleanSession(false); // 不清理离线消息。qos=1的消息,在设备离线期间会保存在云端。
connOpts.setAutomaticReconnect(false); // 本demo关闭自动重连。强烈建议生产环境开启自动重连。
connOpts.setKeepAliveInterval(300); // 设置心跳,建议300秒。
// 先设置回调。如果是先connect,后设置回调,可能会导致消息到达时回调还没准备好,这样消息可能会丢失。
sampleClient.setCallback(new MqttCallback() {
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
// 只处理X.509认证返回信息
if (AUTH_TOPIC.equals(topic)) {
JSONObject json = JSONObject
.parseObject(new String(message.getPayload(), StandardCharsets.UTF_8));
productKey = json.getString("productKey");
deviceName = json.getString("deviceName");
} else {
// 处理其他下行消息,强烈建议另起线程处理,以免回调堵塞。
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
@Override
public void connectionLost(Throwable cause) {
System.out.println("Connecting to broker: " + serverURL);
sampleClient.connect(connOpts);
System.out.print("Connected: clientId=" + clientId);
System.out.println(",username=" + username + ",password=" + password);
} catch (MqttException e) {
System.out.print("connect failed: clientId=" + clientId);
System.out.println(",username=" + username + ",password=" + password);
System.out.println("reason " + e.getReasonCode());
System.out.println("msg " + e.getMessage());
System.out.println("loc " + e.getLocalizedMessage());
System.out.println("cause " + e.getCause());
System.out.println("except " + e);
e.printStackTrace();
} catch (Exception e) {
System.out.print("connect exception: clientId=" + clientId);
System.out.println(",username=" + username + ",password=" + password);
System.out.println("msg " + e.getMessage());
e.printStackTrace();
* 发布消息,默认qos=0
* @param topic 发布消息的Topic
* @param payload 发布的消息内容
public void publish(String topic, String payload) {
byte[] content = payload.getBytes(StandardCharsets.UTF_8);
publish(topic, 0, content);
* 发布消息
* @param topic 发布消息的Topic
* @param qos 消息等级,平台支持qos=0和qos=1,不支持qos=2。
* @param payload 发布的消息内容
public void publish(String topic, int qos, byte[] payload) {
MqttMessage message = new MqttMessage(payload);
message.setQos(qos);
try {
sampleClient.publish(topic, message);
System.out.println("Message published: topic=" + topic + ",qos=" + qos);
} catch (MqttException e) {
System.out.println("publish failed: topic=" + topic + ",qos=" + qos);
System.out.println("reason " + e.getReasonCode());
System.out.println("msg " + e.getMessage());
System.out.println("loc " + e.getLocalizedMessage());
System.out.println("cause " + e.getCause());
System.out.println("except " + e);
e.printStackTrace();
protected SSLSocketFactory createSSLSocket() throws Exception {
// 物联网平台根证书,可以从官网文档中下载https://help.aliyun.com/document_detail/73742.html
// 设备X.509证书,可以从控制台设备信息中下载。
// CA certificate is used to authenticate server
InputStream in = IotMqttClientWithAuthByX509.class.getResourceAsStream("/root.crt");
CertificateFactory cf = CertificateFactory.getInstance("X.509");
Certificate ca = cf.generateCertificate(in);
in.close();
KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
keyStore.load(null, null);
keyStore.setCertificateEntry("ca", ca);
TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
tmf.init(keyStore);
// client key and certificates are sent to server so it can authenticate us
InputStream certIn = IotMqttClientWithAuthByX509.class.getResourceAsStream(certPath);
CertificateFactory certCf = CertificateFactory.getInstance("X.509");
Certificate certCa = certCf.generateCertificate(certIn);
certIn.close();
KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
ks.load(null, null);
ks.setCertificateEntry("certificate", certCa);
PrivateKey privateKey = getPrivateKey(privateKeyPath);
ks.setKeyEntry("private-key", privateKey, privateKeyPassword.toCharArray(), new Certificate[] { certCa });
KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
kmf.init(ks, privateKeyPassword.toCharArray());
SSLContext context = SSLContext.getInstance("TLSV1.2");
context.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
SSLSocketFactory socketFactory = context.getSocketFactory();
return socketFactory;
private PrivateKey getPrivateKey(String path) throws Exception {
byte[] buffer = Base64.decodeBase64(getPem(path));
PKCS8EncodedKeySpec keySpec = new PKCS8EncodedKeySpec(buffer);
KeyFactory keyFactory = KeyFactory.getInstance("RSA");
return keyFactory.generatePrivate(keySpec);
private String getPem(String path) throws Exception {
InputStream in = IotMqttClientWithAuthByX509.class.getResourceAsStream(path);
BufferedReader br = new BufferedReader(new InputStreamReader(in));
String readLine = null;
StringBuilder sb = new StringBuilder();
while ((readLine = br.readLine()) != null) {
if (readLine.charAt(0) == '-') {
continue;
} else {
sb.append(readLine);
sb.append('\r');
in.close();
return sb.toString();
* 连接成功后,物联网平台会下发productKey和deviceName信息到/ext/auth/identity/response,用于组装Topic进行消息收发。
* @param args
public static void main(String[] args) {
IotMqttClientWithAuthByX509 client = new IotMqttClientWithAuthByX509();
// 填写设备证书路径信息
client.connect("您的设备证书路径", "您的证书私钥路径", "");
// 连接成功之后,休眠两秒,为保证接收云端下发的productKey和deviceName,不然消息收发Topic的productKey和deviceName字段可能为空。
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
// 发送消息,验证连接是否成功。
String updateTopic = "/" + productKey + "/" + deviceName + "/user/update";
client.publish(updateTopic, "hello mqtt with X.509 auth");
在以上代码中配置设备证书和证书私钥的路径。本示例工程 X509-demo 的 resource 目录下证书文件为 x509device.cer、devicex509_pkcs8.key。
......
// 设备证书
private String certPath = "/x509device.cer";
// 设备证书私钥
private String privateKeyPath = "/devicex509_pkcs8.key";
......
......
public static void main(String[] args) {
......
// 填写设备证书路径信息
client.connect("/x509device.cer", "/devicex509_pkcs8.key", "");
......
运行IotMqttClientWithAuthByX509.java主程序。
运行成功日志如下:
Connecting to broker: ssl://x509.itls.cn-shanghai.aliyuncs.com:1883
Connected: clientId=.|securemode=2|,username=,password=
Message published: topic=/a1****m/X509device/user/update,qos=0
您可登录物联网平台控制台,在公共实例下查看设备状态和日志。
选择设备管理 > 设备,可看到该设备的状态显示为在线。
选择监控运维 > 日志服务,可查看相关日志。