阅读148 返回首页    go 阿里云


MQ 客户端收发 MQTT 消息__Java 接入示例_MQTT 接入(物联)_消息队列 MQ-阿里云

MQ 除了提供标准的 MQTT 服务之外,还支持使用 MQ 客户端来收发 MQTT 消息,实现 MQ 私有协议和 MQTT 协议之间的数据互通。这种方式一般用于 MQTT 移动端设备上传的数据的后期处理,即在 ECS 上部署 MQ 的客户端,分析处理 MQTT 上传的数据。

MQ 消息和 MQTT 消息的对应关系如下图所示:

mq2mqtt

使用 MQ 客户端的相关说明,请参见 MQ TCP Java SDK 接入

1. MQ 客户端发送 MQTT 消息

本小节介绍如何使用 MQ 的 SDK 向 MQTT 设备发送消息。使用 MQ 客户端向 MQTT 设备发消息的方式和普通 MQ 客户端发消息没有区别,只是要根据需求,将 MQTT 的二级 Topic 设置到 MQ 的消息属性中。

  1. public class ONSSendMsg {
  2. public static void main(String[] args) throws InterruptedException {
  3. /**
  4. * 设置阿里云的AccessKey,用于鉴权
  5. */
  6. final String acessKey ="XXXXXX";
  7. /**
  8. * 设置阿里云的SecretKey,用于鉴权
  9. */
  10. final String secretKey ="XXXXXXX";
  11. /**
  12. * 发消息使用的一级Topic,需要先在MQ控制台里申请
  13. */
  14. final String topic ="MQTTTestTopic";
  15. /**
  16. * ProducerID,需要先在MQ控制台里申请
  17. */
  18. final String producerId ="PID_MQTTTestTopic";
  19. Properties properties =new Properties();
  20. properties.put(PropertyKeyConst.ProducerId, producerId);
  21. properties.put(PropertyKeyConst.AccessKey, acessKey);
  22. properties.put(PropertyKeyConst.SecretKey, secretKey);
  23. Producer producer = ONSFactory.createProducer(properties);
  24. producer.start();
  25. byte[] body=new byte[1024];
  26. final Message msg = new Message(
  27. topic,//MQ消息的Topic,需要事先申请
  28. "TagTest",//MQ Tag,可以进行消息过滤
  29. body);//消息体,和MQTT的body对应
  30. /**
  31. * 使用MQ客户端给MQTT设备发送P2P消息时,需要在MQ消息中设置mqttSecondTopic属性
  32. * 设置的值是“/p2p/”+目标ClientID
  33. */
  34. String targetClientID="GID_MQTTTestTopic@@@DeviceID_0001";
  35. msg.putUserProperties("mqttSecondTopic", "/p2p/"+targetClientID);
  36. //发送消息,只要不抛异常就是成功。
  37. SendResult sendResult = producer.send(msg);
  38. System.out.println(sendResult);
  39. /**
  40. * 如果仅仅发送Pub/Sub消息,则只需要设置实际MQTT订阅的Topic即可,支持设置二级Topic
  41. */
  42. msg.putUserProperties("mqttSecondTopic", "/notice/");
  43. SendResult result =producer.send(msg);
  44. producer.shutdown();
  45. System.exit(0);
  46. }
  47. }

2. MQ 客户端接收 MQTT 消息

本小节介绍如何使用 MQ 的 SDK 接收来自 MQTT 设备发送的消息。使用 MQ 客户端接收 MQTT 设备的消息和普通 MQ 客户端收消息没有区别,MQ 客户端只需要订阅 MQTT 的一级 Topic 即可。

  1. public class ONSRecvMsg {
  2. public static void main(String[] args) throws InterruptedException {
  3. /**
  4. * 设置阿里云的AccessKey,用于鉴权
  5. */
  6. final String acessKey ="XXXXXX";
  7. /**
  8. * 设置阿里云的SecretKey,用于鉴权
  9. */
  10. final String secretKey ="XXXXXXX";
  11. /**
  12. * 收消息使用的一级Topic,需要先在MQ控制台里申请
  13. */
  14. final String topic ="MQTTTestTopic";
  15. /**
  16. * ConsumerID ,需要先在MQ控制台里申请
  17. */
  18. final String consumerID ="GID_MQTTTestTopic";
  19. Properties properties =new Properties();
  20. properties.put(PropertyKeyConst.ConsumerId, consumerID);
  21. properties.put(PropertyKeyConst.AccessKey, acessKey);
  22. properties.put(PropertyKeyConst.SecretKey, secretKey);
  23. Consumer consumer =ONSFactory.createConsumer(properties);
  24. /**
  25. * 此处MQ客户端只需要订阅MQTT的一级Topic即可
  26. */
  27. consumer.subscribe(topic, "*", new MessageListener() {
  28. public Action consume(Message message, ConsumeContext consumeContext) {
  29. System.out.println("recv msg:"+message);
  30. return Action.CommitMessage;
  31. }
  32. });
  33. consumer.start();
  34. System.out.println("[Case Normal Consumer Init] Ok");
  35. Thread.sleep(Integer.MAX_VALUE);
  36. consumer.shutdown();
  37. System.exit(0);
  38. }

最后更新:2016-11-23 16:04:20

  上一篇:go JavaScript 发消息示例__JavaScript 接入示例_MQTT 接入(物联)_消息队列 MQ-阿里云
  下一篇:go MQTT 客户端收发 MQTT 消息__Java 接入示例_MQTT 接入(物联)_消息队列 MQ-阿里云