閱讀979 返回首頁    go 財經資訊


MQTT 客戶端收發 MQTT 消息__Java 接入示例_MQTT 接入(物聯)_消息隊列 MQ-阿裏雲

本文主要介紹如何使用 MQTT 客戶端收發 MQTT 消息,並給出示例代碼供前期開發測試參考,包括資源申請、環境準備、示例代碼、注意事項等。

注意:

本文給出的實例均基於 Eclipse Paho Java SDK 實現,SDK 下載請參見 MQTT 接入準備。如使用其他第三方的客戶端,請適當修改。

1. 資源申請

使用 MQ 提供的 MQTT 服務,首先需要核實應用中使用的 Topic 資源是否已經申請,如果沒有,請先去控製台申請 Topic,Group ID 等資源。

申請資源時需要根據需求選擇對應的 Region,例如 MQTT 需要使用華北2的接入點,那麼 Topic 等資源就在華北2 申請,資源申請具體請參見申請 MQ 資源

注意:MQTT 使用的多級子 Topic 不需要申請,代碼裏直接使用即可,沒有限製。

2. 環境準備

使用 MQTT 協議來收發消息,需要根據應用平台選擇合適的客戶端。本示例運行在 Java 平台,使用 Eclipse Paho Java SDK 構建。首先引入 Maven 依賴,POM 文件配置如下:

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.eclipse.paho</groupId>
  4. <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
  5. <version>1.0.2</version>
  6. </dependency>
  7. </dependencies>
  8. <repositories>
  9. <repository>
  10. <id>Eclipse Paho Repo</id>
  11. <url>https://repo.eclipse.org/content/repositories/paho-releases/</url>
  12. </repository>
  13. <repository>
  14. <id>snapshots-repo</id>
  15. <url>https://oss.sonatype.org/content/repositories/snapshots</url>
  16. <releases>
  17. <enabled>false</enabled>
  18. </releases>
  19. <snapshots>
  20. <enabled>true</enabled>
  21. </snapshots>
  22. </repository>
  23. </repositories>

3. MQTT 發送消息

本段示例代碼演示如何使用 MQTT 客戶端發送普通消息和 P2P 的點對點消息,其中用到的工具 MacSignature 參考下文。

  1. public class MQTTSendMsg {
  2. public static void main(String[] args) throws IOException {
  3. /**
  4. * 設置MQTT的接入點,請根據應用所在環境選擇合適的Region,不支持跨Region訪問
  5. */
  6. final String broker ="tcp://mqtt-test.cn-qingdao.aliyuncs.com:1883";
  7. /**
  8. * 設置阿裏雲的AccessKey,用於鑒權
  9. */
  10. final String acessKey ="XXXXXX";
  11. /**
  12. * 設置阿裏雲的SecretKey,用於鑒權
  13. */
  14. final String secretKey ="XXXXXXX";
  15. /**
  16. * 發消息使用的一級Topic,需要先在MQ控製台裏申請
  17. */
  18. final String topic ="XXXX";
  19. /**
  20. * MQTT的ClientID,一般由兩部分組成,GroupID@@@DeviceID
  21. * 其中GroupID在MQ控製台裏申請
  22. * DeviceID由應用方設置,可能是設備編號等,需要唯一,否則服務端拒絕重複的ClientID連接
  23. */
  24. final String clientId ="GID_XXX@@@ClientID_XXXX";
  25. String sign;
  26. MemoryPersistence persistence = new MemoryPersistence();
  27. try {
  28. final MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
  29. final MqttConnectOptions connOpts = new MqttConnectOptions();
  30. System.out.println("Connecting to broker: " + broker);
  31. /**
  32. * 計算簽名,將簽名作為MQTT的password。
  33. * 簽名的計算方法,參考工具類MacSignature,第一個參數是ClientID的前半部分,即GroupID
  34. * 第二個參數阿裏雲的SecretKey
  35. */
  36. sign = MacSignature.macSignature(clientId.split("@@@")[0], secretKey);
  37. connOpts.setUserName(acessKey);
  38. connOpts.setServerURIs(new String[] { broker });
  39. connOpts.setPassword(sign.toCharArray());
  40. connOpts.setCleanSession(false);
  41. connOpts.setKeepAliveInterval(100);
  42. sampleClient.setCallback(new MqttCallback() {
  43. public void connectionLost(Throwable throwable) {
  44. System.out.println("mqtt connection lost");
  45. throwable.printStackTrace();
  46. while(!sampleClient.isConnected()){
  47. try {
  48. sampleClient.connect(connOpts);
  49. } catch (MqttException e) {
  50. e.printStackTrace();
  51. }
  52. try {
  53. Thread.sleep(1000);
  54. } catch (InterruptedException e) {
  55. e.printStackTrace();
  56. }
  57. }
  58. }
  59. public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
  60. System.out.println("messageArrived:" + topic + "------" + new String(mqttMessage.getPayload()));
  61. }
  62. public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
  63. System.out.println("deliveryComplete:" + iMqttDeliveryToken.getMessageId());
  64. }
  65. });
  66. sampleClient.connect(connOpts);
  67. for (int i = 0; i < 10; i++) {
  68. try {
  69. String scontent = new Date()+"MQTT Test body" + i;
  70. final MqttMessage message = new MqttMessage(scontent.getBytes());
  71. message.setQos(0);
  72. System.out.println(i+" pushed at "+new Date()+" "+ scontent);
  73. /**
  74. *消息發送到某個主題Topic,所有訂閱這個Topic的設備都能收到這個消息。
  75. * 遵循MQTT的發布訂閱規範,Topic也可以是多級Topic。此處設置了發送到二級Topic
  76. */
  77. sampleClient.publish(topic+"/notice/", message);
  78. /**
  79. * 如果發送P2P消息,二級Topic必須是“p2p”,三級Topic是目標的ClientID
  80. * 此處設置的三級Topic需要是接收方的ClientID
  81. */
  82. String p2pTopic =topic+"/p2p/GID_mqttdelay3@@@DEVICEID_001";
  83. sampleClient.publish(p2pTopic,message);
  84. } catch (Exception e) {
  85. e.printStackTrace();
  86. }
  87. }
  88. } catch (Exception me) {
  89. me.printStackTrace();
  90. }
  91. }
  92. }

4. MQTT 接收消息

本段代碼演示如何使用 MQTT 客戶端訂閱消息,接收普通的消息以及點對點消息。

  1. public class MQTTRecvMsg {
  2. public static void main(String[] args) throws IOException {
  3. /**
  4. * 設置MQTT的接入點,請根據應用所在環境選擇合適的Region,不支持跨Region訪問
  5. */
  6. final String broker ="tcp://mqtt-test.cn-qingdao.aliyuncs.com:1883";
  7. /**
  8. * 設置阿裏雲的AccessKey,用於鑒權
  9. */
  10. final String acessKey ="XXXXXX";
  11. /**
  12. * 設置阿裏雲的SecretKey,用於鑒權
  13. */
  14. final String secretKey ="XXXXXXX";
  15. /**
  16. * 發消息使用的一級Topic,需要先在MQ控製台裏申請
  17. */
  18. final String topic ="XXXX";
  19. /**
  20. * MQTT的ClientID,一般由兩部分組成,GroupID@@@DeviceID
  21. * 其中GroupID在MQ控製台裏申請
  22. * DeviceID由應用方設置,可能是設備編號等,需要唯一,否則服務端拒絕重複的ClientID連接
  23. */
  24. final String clientId ="GID_XXXX@@@ClientID_XXXXXX";
  25. String sign;
  26. MemoryPersistence persistence = new MemoryPersistence();
  27. try {
  28. final MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
  29. final MqttConnectOptions connOpts = new MqttConnectOptions();
  30. System.out.println("Connecting to broker: " + broker);
  31. /**
  32. * 計算簽名,將簽名作為MQTT的password
  33. * 簽名的計算方法,參考工具類MacSignature,第一個參數是ClientID的前半部分,即GroupID
  34. * 第二個參數阿裏雲的SecretKey
  35. */
  36. sign = MacSignature.macSignature(clientId.split("@@@")[0], secretKey);
  37. /**
  38. * 設置訂閱方訂閱的Topic集合,此處遵循MQTT的訂閱規則,可以是一級Topic,二級Topic,P2P消息不需要顯式訂閱
  39. */
  40. final String[] topicFilters=new String[]{topic+"/notice/"};
  41. final int[]qos={0};
  42. connOpts.setUserName(acessKey);
  43. connOpts.setServerURIs(new String[] { broker });
  44. connOpts.setPassword(sign.toCharArray());
  45. connOpts.setCleanSession(false);
  46. connOpts.setKeepAliveInterval(100);
  47. sampleClient.setCallback(new MqttCallback() {
  48. public void connectionLost(Throwable throwable) {
  49. System.out.println("mqtt connection lost");
  50. throwable.printStackTrace();
  51. while(!sampleClient.isConnected()){
  52. try {
  53. sampleClient.connect(connOpts);
  54. sampleClient.subscribe(topicFilters,qos);
  55. } catch (MqttException e) {
  56. e.printStackTrace();
  57. }
  58. try {
  59. Thread.sleep(1000);
  60. } catch (InterruptedException e) {
  61. e.printStackTrace();
  62. }
  63. }
  64. }
  65. public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
  66. System.out.println("messageArrived:" + topic + "------" + new String(mqttMessage.getPayload()));
  67. }
  68. public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
  69. System.out.println("deliveryComplete:" + iMqttDeliveryToken.getMessageId());
  70. }
  71. });
  72. sampleClient.connect(connOpts);
  73. sampleClient.subscribe(topicFilters,qos);
  74. Thread.sleep(Integer.MAX_VALUE);
  75. } catch (Exception me) {
  76. me.printStackTrace();
  77. }
  78. }
  79. }

上文代碼用到的工具類 MacSignature.java 如下:

  1. public class MacSignature {
  2. /**
  3. * @param text 要簽名的文本
  4. * @param secretKey 阿裏雲MQ SecretKey
  5. * @return 加密後的字符串
  6. * @throws InvalidKeyException
  7. * @throws NoSuchAlgorithmException
  8. */
  9. public static String macSignature(String text, String secretKey) throws InvalidKeyException, NoSuchAlgorithmException {
  10. Charset charset = Charset.forName("UTF-8");
  11. String algorithm = "HmacSHA1";
  12. Mac mac = Mac.getInstance(algorithm);
  13. mac.init(new SecretKeySpec(secretKey.getBytes(charset), algorithm));
  14. byte[] bytes = mac.doFinal(text.getBytes(charset));
  15. return new String(Base64.encodeBase64(bytes), charset);
  16. }
  17. /**
  18. * 發送方簽名方法
  19. *
  20. * @param clientId MQTT ClientID
  21. * @param secretKey 阿裏雲MQ SecretKey
  22. * @return 加密後的字符串
  23. * @throws NoSuchAlgorithmException
  24. * @throws InvalidKeyException
  25. */
  26. public static String publishSignature(String clientId, String secretKey) throws NoSuchAlgorithmException, InvalidKeyException {
  27. return macSignature(clientId, secretKey);
  28. }
  29. /**
  30. * 訂閱方簽名方法
  31. *
  32. * @param topics 要訂閱的Topic集合
  33. * @param clientId MQTT ClientID
  34. * @param secretKey 阿裏雲MQ SecretKey
  35. * @return 加密後的字符串
  36. * @throws NoSuchAlgorithmException
  37. * @throws InvalidKeyException
  38. */
  39. public static String subSignature(List<String> topics, String clientId, String secretKey) throws NoSuchAlgorithmException, InvalidKeyException {
  40. Collections.sort(topics); //以字典順序排序
  41. String topicText = "";
  42. for (String topic : topics) {
  43. topicText += topic + "n";
  44. }
  45. String text = topicText + clientId;
  46. return macSignature(text, secretKey);
  47. }
  48. /**
  49. * 訂閱方簽名方法
  50. *
  51. * @param topic 要訂閱的Topic
  52. * @param clientId MQTT ClientID
  53. * @param secretKey 阿裏雲MQ SecretKey
  54. * @return 加密後的字符串
  55. * @throws NoSuchAlgorithmException
  56. * @throws InvalidKeyException
  57. */
  58. public static String subSignature(String topic, String clientId, String secretKey) throws NoSuchAlgorithmException, InvalidKeyException {
  59. List<String> topics = new ArrayList<String>();
  60. topics.add(topic);
  61. return subSignature(topics, clientId, secretKey);
  62. }
  63. }

最後更新:2016-11-29 09:51:55

  上一篇:go MQ 客戶端收發 MQTT 消息__Java 接入示例_MQTT 接入(物聯)_消息隊列 MQ-阿裏雲
  下一篇:go SSL 方式接入示例__Java 接入示例_MQTT 接入(物聯)_消息隊列 MQ-阿裏雲