148
阿里云
MQ 客户端收发 MQTT 消息__Java 接入示例_MQTT 接入(物联)_消息队列 MQ-阿里云
MQ 除了提供标准的 MQTT 服务之外,还支持使用 MQ 客户端来收发 MQTT 消息,实现 MQ 私有协议和 MQTT 协议之间的数据互通。这种方式一般用于 MQTT 移动端设备上传的数据的后期处理,即在 ECS 上部署 MQ 的客户端,分析处理 MQTT 上传的数据。
MQ 消息和 MQTT 消息的对应关系如下图所示:
使用 MQ 客户端的相关说明,请参见 MQ TCP Java SDK 接入。
1. MQ 客户端发送 MQTT 消息
本小节介绍如何使用 MQ 的 SDK 向 MQTT 设备发送消息。使用 MQ 客户端向 MQTT 设备发消息的方式和普通 MQ 客户端发消息没有区别,只是要根据需求,将 MQTT 的二级 Topic 设置到 MQ 的消息属性中。
public class ONSSendMsg {
public static void main(String[] args) throws InterruptedException {
/**
* 设置阿里云的AccessKey,用于鉴权
*/
final String acessKey ="XXXXXX";
/**
* 设置阿里云的SecretKey,用于鉴权
*/
final String secretKey ="XXXXXXX";
/**
* 发消息使用的一级Topic,需要先在MQ控制台里申请
*/
final String topic ="MQTTTestTopic";
/**
* ProducerID,需要先在MQ控制台里申请
*/
final String producerId ="PID_MQTTTestTopic";
Properties properties =new Properties();
properties.put(PropertyKeyConst.ProducerId, producerId);
properties.put(PropertyKeyConst.AccessKey, acessKey);
properties.put(PropertyKeyConst.SecretKey, secretKey);
Producer producer = ONSFactory.createProducer(properties);
producer.start();
byte[] body=new byte[1024];
final Message msg = new Message(
topic,//MQ消息的Topic,需要事先申请
"TagTest",//MQ Tag,可以进行消息过滤
body);//消息体,和MQTT的body对应
/**
* 使用MQ客户端给MQTT设备发送P2P消息时,需要在MQ消息中设置mqttSecondTopic属性
* 设置的值是“/p2p/”+目标ClientID
*/
String targetClientID="GID_MQTTTestTopic@@@DeviceID_0001";
msg.putUserProperties("mqttSecondTopic", "/p2p/"+targetClientID);
//发送消息,只要不抛异常就是成功。
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
/**
* 如果仅仅发送Pub/Sub消息,则只需要设置实际MQTT订阅的Topic即可,支持设置二级Topic
*/
msg.putUserProperties("mqttSecondTopic", "/notice/");
SendResult result =producer.send(msg);
producer.shutdown();
System.exit(0);
}
}
2. MQ 客户端接收 MQTT 消息
本小节介绍如何使用 MQ 的 SDK 接收来自 MQTT 设备发送的消息。使用 MQ 客户端接收 MQTT 设备的消息和普通 MQ 客户端收消息没有区别,MQ 客户端只需要订阅 MQTT 的一级 Topic 即可。
public class ONSRecvMsg {
public static void main(String[] args) throws InterruptedException {
/**
* 设置阿里云的AccessKey,用于鉴权
*/
final String acessKey ="XXXXXX";
/**
* 设置阿里云的SecretKey,用于鉴权
*/
final String secretKey ="XXXXXXX";
/**
* 收消息使用的一级Topic,需要先在MQ控制台里申请
*/
final String topic ="MQTTTestTopic";
/**
* ConsumerID ,需要先在MQ控制台里申请
*/
final String consumerID ="GID_MQTTTestTopic";
Properties properties =new Properties();
properties.put(PropertyKeyConst.ConsumerId, consumerID);
properties.put(PropertyKeyConst.AccessKey, acessKey);
properties.put(PropertyKeyConst.SecretKey, secretKey);
Consumer consumer =ONSFactory.createConsumer(properties);
/**
* 此处MQ客户端只需要订阅MQTT的一级Topic即可
*/
consumer.subscribe(topic, "*", new MessageListener() {
public Action consume(Message message, ConsumeContext consumeContext) {
System.out.println("recv msg:"+message);
return Action.CommitMessage;
}
});
consumer.start();
System.out.println("[Case Normal Consumer Init] Ok");
Thread.sleep(Integer.MAX_VALUE);
consumer.shutdown();
System.exit(0);
}
最后更新:2016-11-23 16:04:20
上一篇:
JavaScript 发消息示例__JavaScript 接入示例_MQTT 接入(物联)_消息队列 MQ-阿里云
下一篇:
MQTT 客户端收发 MQTT 消息__Java 接入示例_MQTT 接入(物联)_消息队列 MQ-阿里云
云服务器ECS监控__云服务监控_用户指南_云监控-阿里云
Android 收发消息示例__Android 接入示例_MQTT 接入(物联)_消息队列 MQ-阿里云
批量修改解析记录__批量管理接口_API文档_云解析-阿里云
售后支持服务条款__服务条款_售后支持计划_支持计划-阿里云
搜索指定发布信息__发布管理相关接口_Open API_消息队列 MQ-阿里云
请求状态__常用指标_使用手册_性能测试-阿里云
SDK-Release__Spark_开发人员指南_E-MapReduce-阿里云
采集方式__loghub-采集_用户指南_日志服务-阿里云
解读物联网圈重磅成人礼!阿里云Link物联网平台出击“智联网”!
云解析(免费版)服务条款__服务条款_产品简介_云解析-阿里云
相关内容
常见错误说明__附录_大数据计算服务-阿里云
发送短信接口__API使用手册_短信服务-阿里云
接口文档__Android_安全组件教程_移动安全-阿里云
运营商错误码(联通)__常见问题_短信服务-阿里云
设置短信模板__使用手册_短信服务-阿里云
OSS 权限问题及排查__常见错误及排除_最佳实践_对象存储 OSS-阿里云
消息通知__操作指南_批量计算-阿里云
设备端快速接入(MQTT)__快速开始_阿里云物联网套件-阿里云
查询API调用流量数据__API管理相关接口_API_API 网关-阿里云
使用STS访问__JavaScript-SDK_SDK 参考_对象存储 OSS-阿里云