979
阿裏雲
MQTT 客戶端收發 MQTT 消息__Java 接入示例_MQTT 接入(物聯)_消息隊列 MQ-阿裏雲
本文主要介紹如何使用 MQTT 客戶端收發 MQTT 消息,並給出示例代碼供前期開發測試參考,包括資源申請、環境準備、示例代碼、注意事項等。
注意:
本文給出的實例均基於 Eclipse Paho Java SDK 實現,SDK 下載請參見 MQTT 接入準備。如使用其他第三方的客戶端,請適當修改。
1. 資源申請
使用 MQ 提供的 MQTT 服務,首先需要核實應用中使用的 Topic 資源是否已經申請,如果沒有,請先去控製台申請 Topic,Group ID 等資源。
申請資源時需要根據需求選擇對應的 Region,例如 MQTT 需要使用華北2的接入點,那麼 Topic 等資源就在華北2 申請,資源申請具體請參見申請 MQ 資源。
注意:MQTT 使用的多級子 Topic 不需要申請,代碼裏直接使用即可,沒有限製。
2. 環境準備
使用 MQTT 協議來收發消息,需要根據應用平台選擇合適的客戶端。本示例運行在 Java 平台,使用 Eclipse Paho Java SDK 構建。首先引入 Maven 依賴,POM 文件配置如下:
<dependencies>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.0.2</version>
</dependency>
</dependencies>
<repositories>
<repository>
<id>Eclipse Paho Repo</id>
<url>https://repo.eclipse.org/content/repositories/paho-releases/</url>
</repository>
<repository>
<id>snapshots-repo</id>
<url>https://oss.sonatype.org/content/repositories/snapshots</url>
<releases>
<enabled>false</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
3. MQTT 發送消息
本段示例代碼演示如何使用 MQTT 客戶端發送普通消息和 P2P 的點對點消息,其中用到的工具 MacSignature 參考下文。
public class MQTTSendMsg {
public static void main(String[] args) throws IOException {
/**
* 設置MQTT的接入點,請根據應用所在環境選擇合適的Region,不支持跨Region訪問
*/
final String broker ="tcp://mqtt-test.cn-qingdao.aliyuncs.com:1883";
/**
* 設置阿裏雲的AccessKey,用於鑒權
*/
final String acessKey ="XXXXXX";
/**
* 設置阿裏雲的SecretKey,用於鑒權
*/
final String secretKey ="XXXXXXX";
/**
* 發消息使用的一級Topic,需要先在MQ控製台裏申請
*/
final String topic ="XXXX";
/**
* MQTT的ClientID,一般由兩部分組成,GroupID@@@DeviceID
* 其中GroupID在MQ控製台裏申請
* DeviceID由應用方設置,可能是設備編號等,需要唯一,否則服務端拒絕重複的ClientID連接
*/
final String clientId ="GID_XXX@@@ClientID_XXXX";
String sign;
MemoryPersistence persistence = new MemoryPersistence();
try {
final MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
final MqttConnectOptions connOpts = new MqttConnectOptions();
System.out.println("Connecting to broker: " + broker);
/**
* 計算簽名,將簽名作為MQTT的password。
* 簽名的計算方法,參考工具類MacSignature,第一個參數是ClientID的前半部分,即GroupID
* 第二個參數阿裏雲的SecretKey
*/
sign = MacSignature.macSignature(clientId.split("@@@")[0], secretKey);
connOpts.setUserName(acessKey);
connOpts.setServerURIs(new String[] { broker });
connOpts.setPassword(sign.toCharArray());
connOpts.setCleanSession(false);
connOpts.setKeepAliveInterval(100);
sampleClient.setCallback(new MqttCallback() {
public void connectionLost(Throwable throwable) {
System.out.println("mqtt connection lost");
throwable.printStackTrace();
while(!sampleClient.isConnected()){
try {
sampleClient.connect(connOpts);
} catch (MqttException e) {
e.printStackTrace();
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
System.out.println("messageArrived:" + topic + "------" + new String(mqttMessage.getPayload()));
}
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
System.out.println("deliveryComplete:" + iMqttDeliveryToken.getMessageId());
}
});
sampleClient.connect(connOpts);
for (int i = 0; i < 10; i++) {
try {
String scontent = new Date()+"MQTT Test body" + i;
final MqttMessage message = new MqttMessage(scontent.getBytes());
message.setQos(0);
System.out.println(i+" pushed at "+new Date()+" "+ scontent);
/**
*消息發送到某個主題Topic,所有訂閱這個Topic的設備都能收到這個消息。
* 遵循MQTT的發布訂閱規範,Topic也可以是多級Topic。此處設置了發送到二級Topic
*/
sampleClient.publish(topic+"/notice/", message);
/**
* 如果發送P2P消息,二級Topic必須是“p2p”,三級Topic是目標的ClientID
* 此處設置的三級Topic需要是接收方的ClientID
*/
String p2pTopic =topic+"/p2p/GID_mqttdelay3@@@DEVICEID_001";
sampleClient.publish(p2pTopic,message);
} catch (Exception e) {
e.printStackTrace();
}
}
} catch (Exception me) {
me.printStackTrace();
}
}
}
4. MQTT 接收消息
本段代碼演示如何使用 MQTT 客戶端訂閱消息,接收普通的消息以及點對點消息。
public class MQTTRecvMsg {
public static void main(String[] args) throws IOException {
/**
* 設置MQTT的接入點,請根據應用所在環境選擇合適的Region,不支持跨Region訪問
*/
final String broker ="tcp://mqtt-test.cn-qingdao.aliyuncs.com:1883";
/**
* 設置阿裏雲的AccessKey,用於鑒權
*/
final String acessKey ="XXXXXX";
/**
* 設置阿裏雲的SecretKey,用於鑒權
*/
final String secretKey ="XXXXXXX";
/**
* 發消息使用的一級Topic,需要先在MQ控製台裏申請
*/
final String topic ="XXXX";
/**
* MQTT的ClientID,一般由兩部分組成,GroupID@@@DeviceID
* 其中GroupID在MQ控製台裏申請
* DeviceID由應用方設置,可能是設備編號等,需要唯一,否則服務端拒絕重複的ClientID連接
*/
final String clientId ="GID_XXXX@@@ClientID_XXXXXX";
String sign;
MemoryPersistence persistence = new MemoryPersistence();
try {
final MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
final MqttConnectOptions connOpts = new MqttConnectOptions();
System.out.println("Connecting to broker: " + broker);
/**
* 計算簽名,將簽名作為MQTT的password
* 簽名的計算方法,參考工具類MacSignature,第一個參數是ClientID的前半部分,即GroupID
* 第二個參數阿裏雲的SecretKey
*/
sign = MacSignature.macSignature(clientId.split("@@@")[0], secretKey);
/**
* 設置訂閱方訂閱的Topic集合,此處遵循MQTT的訂閱規則,可以是一級Topic,二級Topic,P2P消息不需要顯式訂閱
*/
final String[] topicFilters=new String[]{topic+"/notice/"};
final int[]qos={0};
connOpts.setUserName(acessKey);
connOpts.setServerURIs(new String[] { broker });
connOpts.setPassword(sign.toCharArray());
connOpts.setCleanSession(false);
connOpts.setKeepAliveInterval(100);
sampleClient.setCallback(new MqttCallback() {
public void connectionLost(Throwable throwable) {
System.out.println("mqtt connection lost");
throwable.printStackTrace();
while(!sampleClient.isConnected()){
try {
sampleClient.connect(connOpts);
sampleClient.subscribe(topicFilters,qos);
} catch (MqttException e) {
e.printStackTrace();
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
System.out.println("messageArrived:" + topic + "------" + new String(mqttMessage.getPayload()));
}
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
System.out.println("deliveryComplete:" + iMqttDeliveryToken.getMessageId());
}
});
sampleClient.connect(connOpts);
sampleClient.subscribe(topicFilters,qos);
Thread.sleep(Integer.MAX_VALUE);
} catch (Exception me) {
me.printStackTrace();
}
}
}
上文代碼用到的工具類 MacSignature.java 如下:
public class MacSignature {
/**
* @param text 要簽名的文本
* @param secretKey 阿裏雲MQ SecretKey
* @return 加密後的字符串
* @throws InvalidKeyException
* @throws NoSuchAlgorithmException
*/
public static String macSignature(String text, String secretKey) throws InvalidKeyException, NoSuchAlgorithmException {
Charset charset = Charset.forName("UTF-8");
String algorithm = "HmacSHA1";
Mac mac = Mac.getInstance(algorithm);
mac.init(new SecretKeySpec(secretKey.getBytes(charset), algorithm));
byte[] bytes = mac.doFinal(text.getBytes(charset));
return new String(Base64.encodeBase64(bytes), charset);
}
/**
* 發送方簽名方法
*
* @param clientId MQTT ClientID
* @param secretKey 阿裏雲MQ SecretKey
* @return 加密後的字符串
* @throws NoSuchAlgorithmException
* @throws InvalidKeyException
*/
public static String publishSignature(String clientId, String secretKey) throws NoSuchAlgorithmException, InvalidKeyException {
return macSignature(clientId, secretKey);
}
/**
* 訂閱方簽名方法
*
* @param topics 要訂閱的Topic集合
* @param clientId MQTT ClientID
* @param secretKey 阿裏雲MQ SecretKey
* @return 加密後的字符串
* @throws NoSuchAlgorithmException
* @throws InvalidKeyException
*/
public static String subSignature(List<String> topics, String clientId, String secretKey) throws NoSuchAlgorithmException, InvalidKeyException {
Collections.sort(topics); //以字典順序排序
String topicText = "";
for (String topic : topics) {
topicText += topic + "n";
}
String text = topicText + clientId;
return macSignature(text, secretKey);
}
/**
* 訂閱方簽名方法
*
* @param topic 要訂閱的Topic
* @param clientId MQTT ClientID
* @param secretKey 阿裏雲MQ SecretKey
* @return 加密後的字符串
* @throws NoSuchAlgorithmException
* @throws InvalidKeyException
*/
public static String subSignature(String topic, String clientId, String secretKey) throws NoSuchAlgorithmException, InvalidKeyException {
List<String> topics = new ArrayList<String>();
topics.add(topic);
return subSignature(topics, clientId, secretKey);
}
}
最後更新:2016-11-29 09:51:55
上一篇:
MQ 客戶端收發 MQTT 消息__Java 接入示例_MQTT 接入(物聯)_消息隊列 MQ-阿裏雲
下一篇:
SSL 方式接入示例__Java 接入示例_MQTT 接入(物聯)_消息隊列 MQ-阿裏雲
院士潘建偉:阿裏雲與中科院聯合發布量子計算雲平台,計算能力有望躍升100萬倍
創建災備實例__擴展實例_快速入門(MySQL)_雲數據庫 RDS 版-阿裏雲
日誌服務監控__雲服務監控_用戶指南_雲監控-阿裏雲
8.3 在BI工具中連接和使用分析型數據庫__第八章 在生產中使用分析型數據庫_使用手冊_分析型數據庫-阿裏雲
BI示例教程__數加體驗館_數加平台介紹-阿裏雲
請求簽名機製__調用方式_API使用手冊_消息服務-阿裏雲
從這裏開始__快速入門_數據管理-阿裏雲
步驟3:管理證書__快速入門_證書服務-阿裏雲
SQL腳本開發__Intelij 開發插件_工具_大數據計算服務-阿裏雲
E-MapReduce 快速開始__創建 E-MapReduce_快速入門_E-MapReduce-阿裏雲
相關內容
常見錯誤說明__附錄_大數據計算服務-阿裏雲
發送短信接口__API使用手冊_短信服務-阿裏雲
接口文檔__Android_安全組件教程_移動安全-阿裏雲
運營商錯誤碼(聯通)__常見問題_短信服務-阿裏雲
設置短信模板__使用手冊_短信服務-阿裏雲
OSS 權限問題及排查__常見錯誤及排除_最佳實踐_對象存儲 OSS-阿裏雲
消息通知__操作指南_批量計算-阿裏雲
設備端快速接入(MQTT)__快速開始_阿裏雲物聯網套件-阿裏雲
查詢API調用流量數據__API管理相關接口_API_API 網關-阿裏雲
使用STS訪問__JavaScript-SDK_SDK 參考_對象存儲 OSS-阿裏雲