979
windows
MQTT 客户端收发 MQTT 消息__Java 接入示例_MQTT 接入(物联)_消息队列 MQ-阿里云
本文主要介绍如何使用 MQTT 客户端收发 MQTT 消息,并给出示例代码供前期开发测试参考,包括资源申请、环境准备、示例代码、注意事项等。
注意:
本文给出的实例均基于 Eclipse Paho Java SDK 实现,SDK 下载请参见 MQTT 接入准备。如使用其他第三方的客户端,请适当修改。
1. 资源申请
使用 MQ 提供的 MQTT 服务,首先需要核实应用中使用的 Topic 资源是否已经申请,如果没有,请先去控制台申请 Topic,Group ID 等资源。
申请资源时需要根据需求选择对应的 Region,例如 MQTT 需要使用华北2的接入点,那么 Topic 等资源就在华北2 申请,资源申请具体请参见申请 MQ 资源。
注意:MQTT 使用的多级子 Topic 不需要申请,代码里直接使用即可,没有限制。
2. 环境准备
使用 MQTT 协议来收发消息,需要根据应用平台选择合适的客户端。本示例运行在 Java 平台,使用 Eclipse Paho Java SDK 构建。首先引入 Maven 依赖,POM 文件配置如下:
<dependencies>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.0.2</version>
</dependency>
</dependencies>
<repositories>
<repository>
<id>Eclipse Paho Repo</id>
<url>https://repo.eclipse.org/content/repositories/paho-releases/</url>
</repository>
<repository>
<id>snapshots-repo</id>
<url>https://oss.sonatype.org/content/repositories/snapshots</url>
<releases>
<enabled>false</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
3. MQTT 发送消息
本段示例代码演示如何使用 MQTT 客户端发送普通消息和 P2P 的点对点消息,其中用到的工具 MacSignature 参考下文。
public class MQTTSendMsg {
public static void main(String[] args) throws IOException {
/**
* 设置MQTT的接入点,请根据应用所在环境选择合适的Region,不支持跨Region访问
*/
final String broker ="tcp://mqtt-test.cn-qingdao.aliyuncs.com:1883";
/**
* 设置阿里云的AccessKey,用于鉴权
*/
final String acessKey ="XXXXXX";
/**
* 设置阿里云的SecretKey,用于鉴权
*/
final String secretKey ="XXXXXXX";
/**
* 发消息使用的一级Topic,需要先在MQ控制台里申请
*/
final String topic ="XXXX";
/**
* MQTT的ClientID,一般由两部分组成,GroupID@@@DeviceID
* 其中GroupID在MQ控制台里申请
* DeviceID由应用方设置,可能是设备编号等,需要唯一,否则服务端拒绝重复的ClientID连接
*/
final String clientId ="GID_XXX@@@ClientID_XXXX";
String sign;
MemoryPersistence persistence = new MemoryPersistence();
try {
final MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
final MqttConnectOptions connOpts = new MqttConnectOptions();
System.out.println("Connecting to broker: " + broker);
/**
* 计算签名,将签名作为MQTT的password。
* 签名的计算方法,参考工具类MacSignature,第一个参数是ClientID的前半部分,即GroupID
* 第二个参数阿里云的SecretKey
*/
sign = MacSignature.macSignature(clientId.split("@@@")[0], secretKey);
connOpts.setUserName(acessKey);
connOpts.setServerURIs(new String[] { broker });
connOpts.setPassword(sign.toCharArray());
connOpts.setCleanSession(false);
connOpts.setKeepAliveInterval(100);
sampleClient.setCallback(new MqttCallback() {
public void connectionLost(Throwable throwable) {
System.out.println("mqtt connection lost");
throwable.printStackTrace();
while(!sampleClient.isConnected()){
try {
sampleClient.connect(connOpts);
} catch (MqttException e) {
e.printStackTrace();
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
System.out.println("messageArrived:" + topic + "------" + new String(mqttMessage.getPayload()));
}
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
System.out.println("deliveryComplete:" + iMqttDeliveryToken.getMessageId());
}
});
sampleClient.connect(connOpts);
for (int i = 0; i < 10; i++) {
try {
String scontent = new Date()+"MQTT Test body" + i;
final MqttMessage message = new MqttMessage(scontent.getBytes());
message.setQos(0);
System.out.println(i+" pushed at "+new Date()+" "+ scontent);
/**
*消息发送到某个主题Topic,所有订阅这个Topic的设备都能收到这个消息。
* 遵循MQTT的发布订阅规范,Topic也可以是多级Topic。此处设置了发送到二级Topic
*/
sampleClient.publish(topic+"/notice/", message);
/**
* 如果发送P2P消息,二级Topic必须是“p2p”,三级Topic是目标的ClientID
* 此处设置的三级Topic需要是接收方的ClientID
*/
String p2pTopic =topic+"/p2p/GID_mqttdelay3@@@DEVICEID_001";
sampleClient.publish(p2pTopic,message);
} catch (Exception e) {
e.printStackTrace();
}
}
} catch (Exception me) {
me.printStackTrace();
}
}
}
4. MQTT 接收消息
本段代码演示如何使用 MQTT 客户端订阅消息,接收普通的消息以及点对点消息。
public class MQTTRecvMsg {
public static void main(String[] args) throws IOException {
/**
* 设置MQTT的接入点,请根据应用所在环境选择合适的Region,不支持跨Region访问
*/
final String broker ="tcp://mqtt-test.cn-qingdao.aliyuncs.com:1883";
/**
* 设置阿里云的AccessKey,用于鉴权
*/
final String acessKey ="XXXXXX";
/**
* 设置阿里云的SecretKey,用于鉴权
*/
final String secretKey ="XXXXXXX";
/**
* 发消息使用的一级Topic,需要先在MQ控制台里申请
*/
final String topic ="XXXX";
/**
* MQTT的ClientID,一般由两部分组成,GroupID@@@DeviceID
* 其中GroupID在MQ控制台里申请
* DeviceID由应用方设置,可能是设备编号等,需要唯一,否则服务端拒绝重复的ClientID连接
*/
final String clientId ="GID_XXXX@@@ClientID_XXXXXX";
String sign;
MemoryPersistence persistence = new MemoryPersistence();
try {
final MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
final MqttConnectOptions connOpts = new MqttConnectOptions();
System.out.println("Connecting to broker: " + broker);
/**
* 计算签名,将签名作为MQTT的password
* 签名的计算方法,参考工具类MacSignature,第一个参数是ClientID的前半部分,即GroupID
* 第二个参数阿里云的SecretKey
*/
sign = MacSignature.macSignature(clientId.split("@@@")[0], secretKey);
/**
* 设置订阅方订阅的Topic集合,此处遵循MQTT的订阅规则,可以是一级Topic,二级Topic,P2P消息不需要显式订阅
*/
final String[] topicFilters=new String[]{topic+"/notice/"};
final int[]qos={0};
connOpts.setUserName(acessKey);
connOpts.setServerURIs(new String[] { broker });
connOpts.setPassword(sign.toCharArray());
connOpts.setCleanSession(false);
connOpts.setKeepAliveInterval(100);
sampleClient.setCallback(new MqttCallback() {
public void connectionLost(Throwable throwable) {
System.out.println("mqtt connection lost");
throwable.printStackTrace();
while(!sampleClient.isConnected()){
try {
sampleClient.connect(connOpts);
sampleClient.subscribe(topicFilters,qos);
} catch (MqttException e) {
e.printStackTrace();
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
System.out.println("messageArrived:" + topic + "------" + new String(mqttMessage.getPayload()));
}
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
System.out.println("deliveryComplete:" + iMqttDeliveryToken.getMessageId());
}
});
sampleClient.connect(connOpts);
sampleClient.subscribe(topicFilters,qos);
Thread.sleep(Integer.MAX_VALUE);
} catch (Exception me) {
me.printStackTrace();
}
}
}
上文代码用到的工具类 MacSignature.java 如下:
public class MacSignature {
/**
* @param text 要签名的文本
* @param secretKey 阿里云MQ SecretKey
* @return 加密后的字符串
* @throws InvalidKeyException
* @throws NoSuchAlgorithmException
*/
public static String macSignature(String text, String secretKey) throws InvalidKeyException, NoSuchAlgorithmException {
Charset charset = Charset.forName("UTF-8");
String algorithm = "HmacSHA1";
Mac mac = Mac.getInstance(algorithm);
mac.init(new SecretKeySpec(secretKey.getBytes(charset), algorithm));
byte[] bytes = mac.doFinal(text.getBytes(charset));
return new String(Base64.encodeBase64(bytes), charset);
}
/**
* 发送方签名方法
*
* @param clientId MQTT ClientID
* @param secretKey 阿里云MQ SecretKey
* @return 加密后的字符串
* @throws NoSuchAlgorithmException
* @throws InvalidKeyException
*/
public static String publishSignature(String clientId, String secretKey) throws NoSuchAlgorithmException, InvalidKeyException {
return macSignature(clientId, secretKey);
}
/**
* 订阅方签名方法
*
* @param topics 要订阅的Topic集合
* @param clientId MQTT ClientID
* @param secretKey 阿里云MQ SecretKey
* @return 加密后的字符串
* @throws NoSuchAlgorithmException
* @throws InvalidKeyException
*/
public static String subSignature(List<String> topics, String clientId, String secretKey) throws NoSuchAlgorithmException, InvalidKeyException {
Collections.sort(topics); //以字典顺序排序
String topicText = "";
for (String topic : topics) {
topicText += topic + "n";
}
String text = topicText + clientId;
return macSignature(text, secretKey);
}
/**
* 订阅方签名方法
*
* @param topic 要订阅的Topic
* @param clientId MQTT ClientID
* @param secretKey 阿里云MQ SecretKey
* @return 加密后的字符串
* @throws NoSuchAlgorithmException
* @throws InvalidKeyException
*/
public static String subSignature(String topic, String clientId, String secretKey) throws NoSuchAlgorithmException, InvalidKeyException {
List<String> topics = new ArrayList<String>();
topics.add(topic);
return subSignature(topics, clientId, secretKey);
}
}
最后更新:2016-11-29 09:51:55
上一篇:
MQ 客户端收发 MQTT 消息__Java 接入示例_MQTT 接入(物联)_消息队列 MQ-阿里云
下一篇:
SSL 方式接入示例__Java 接入示例_MQTT 接入(物联)_消息队列 MQ-阿里云
院士潘建伟:阿里云与中科院联合发布量子计算云平台,计算能力有望跃升100万倍
创建灾备实例__扩展实例_快速入门(MySQL)_云数据库 RDS 版-阿里云
日志服务监控__云服务监控_用户指南_云监控-阿里云
8.3 在BI工具中连接和使用分析型数据库__第八章 在生产中使用分析型数据库_使用手册_分析型数据库-阿里云
BI示例教程__数加体验馆_数加平台介绍-阿里云
请求签名机制__调用方式_API使用手册_消息服务-阿里云
从这里开始__快速入门_数据管理-阿里云
步骤3:管理证书__快速入门_证书服务-阿里云
SQL脚本开发__Intelij 开发插件_工具_大数据计算服务-阿里云
E-MapReduce 快速开始__创建 E-MapReduce_快速入门_E-MapReduce-阿里云
相关内容
常见错误说明__附录_大数据计算服务-阿里云
发送短信接口__API使用手册_短信服务-阿里云
接口文档__Android_安全组件教程_移动安全-阿里云
运营商错误码(联通)__常见问题_短信服务-阿里云
设置短信模板__使用手册_短信服务-阿里云
OSS 权限问题及排查__常见错误及排除_最佳实践_对象存储 OSS-阿里云
消息通知__操作指南_批量计算-阿里云
设备端快速接入(MQTT)__快速开始_阿里云物联网套件-阿里云
查询API调用流量数据__API管理相关接口_API_API 网关-阿里云
使用STS访问__JavaScript-SDK_SDK 参考_对象存储 OSS-阿里云