292
阿里云
广播拉取消息模型__最佳实践_消息服务-阿里云
背景描述
阿里云消息服务MNS 已经提供队列(queue)和主题(topic)两种模型。其中队列提供的是一对多的共享消息消费模型,采用客户端主动拉取(Pull)模式;主题模型提供一对多的广播消息消费模型,并且采用服务端主动推送(Push)模式。上面两种模型基本能满足我们大多数应用场景。
推送模式的好处是即时性能比较好,但是需要暴露客户端地址来接收服务端的消息推送。有些情况下,比如企业内网,我们无法暴露推送地址,希望改用拉取(Pull)的方式。虽然MNS不直接提供这种消费模型,但是我们可以结合主题和队列来实现一对多的拉取消息消费模型。具体方案如下:
解决方案
让主题将消息先推送到队列,然后由消费者从队列拉取消息。这样既可以做到1对多的广播消息,又不需要暴露消费者的地址;如下图所示:
接口说明
最新的Java SDK(1.1.5)中的CloudPullTopic 默认支持上述解决方案。其中MNSClient 提供下面两个接口来快速创建CloudPullTopic:
public CloudPullTopic createPullTopic(TopicMeta topicMeta, Vector<String> queueNameList, boolean needCreateQueue, QueueMeta queueMetaTemplate)
public CloudPullTopic createPullTopic(TopicMeta topicMeta, Vector<String> queueNameList)
其中,TopicMeta 是创建topic的meta 设置, queueNameList里指定topic消息推送的队列名列表;needCreateQueue表明queueNameList是否需要创建;queueMetaTemplate是创建queue需要的queue meta 参数设置;
Demo代码
CloudAccount account = new CloudAccount(accessKeyId, accessKeySecret, endpoint);
MNSClient client = account.getMNSClient();
// build consumer name list.
Vector<String> consumerNameList = new Vector<String>();
String consumerName1 = "consumer001";
String consumerName2 = "consumer002";
String consumerName3 = "consumer003";
consumerNameList.add(consumerName1);
consumerNameList.add(consumerName2);
consumerNameList.add(consumerName3);
QueueMeta queueMetaTemplate = new QueueMeta();
queueMetaTemplate.setPollingWaitSeconds(30);
try{
//producer code:
// create pull topic which will send message to 3 queues for consumer.
String topicName = "demo-topic-for-pull";
TopicMeta topicMeta = new TopicMeta();
topicMeta.setTopicName(topicName);
CloudPullTopic pullTopic = client.createPullTopic(topicMeta, consumerNameList, true, queueMetaTemplate);
//publish message and consume message.
String messageBody = "broadcast message to all the consumers:hello the world.";
// if we sent raw message,then should use getMessageBodyAsRawString to parse the message body correctly.
TopicMessage tMessage = new RawTopicMessage();
tMessage.setBaseMessageBody(messageBody);
pullTopic.publishMessage(tMessage);
// consumer code:
//3 consumers receive the message.
CloudQueue queueForConsumer1 = client.getQueueRef(consumerName1);
CloudQueue queueForConsumer2 = client.getQueueRef(consumerName2);
CloudQueue queueForConsumer3 = client.getQueueRef(consumerName3);
Message consumer1Msg = queueForConsumer1.popMessage(30);
if(consumer1Msg != null)
{
System.out.println("consumer1 receive message:" + consumer1Msg.getMessageBodyAsRawString());
}else{
System.out.println("the queue is empty");
}
Message consumer2Msg = queueForConsumer2.popMessage(30);
if(consumer2Msg != null)
{
System.out.println("consumer2 receive message:" + consumer2Msg.getMessageBodyAsRawString());
}else{
System.out.println("the queue is empty");
}
Message consumer3Msg = queueForConsumer3.popMessage(30);
if(consumer3Msg != null)
{
System.out.println("consumer3 receive message:" + consumer3Msg.getMessageBodyAsRawString());
}else{
System.out.println("the queue is empty");
}
// delete the fullTopic.
pullTopic.delete();
}catch(ClientException ce)
{
System.out.println("Something wrong with the network connection between client and MNS service."
+ "Please check your network and DNS availablity.");
ce.printStackTrace();
}
catch(ServiceException se)
{
/*you can get more MNS service error code in following link.
https://help.aliyun.com/document_detail/mns/api_reference/error_code/error_code.html?spm=5176.docmns/api_reference/error_code/error_response
*/
se.printStackTrace();
}
client.close();
最后更新:2016-11-23 17:16:08
上一篇:
Queue操作__历史协议_API使用手册_消息服务-阿里云
下一篇:
消息处理时长自适应__最佳实践_消息服务-阿里云
缩略后填充__图片缩放_老版图片服务手册_对象存储 OSS-阿里云
编辑索引__结构管理_DMS for MongoDB_用户指南(NoSQL)_数据管理-阿里云
阿里云为西安交大带来了这个新学院:收入逆天!秒变超级英雄!
如何开通大数据开发套件__快速开始_大数据开发套件-阿里云
云服务器 ECS镜像复制FAQ
查询策略下绑定的API__流量控制相关接口_API_API 网关-阿里云
通用图像分析服务简介__通用图像分析服务_人工智能图像类-阿里云
OSS 参考使用说明__开发准备_开发人员指南_E-MapReduce-阿里云
云服务器 ECS 按量付费云盘和可用区FAQ
数据表管理__数据管理手册_用户操作指南_大数据开发套件-阿里云
相关内容
常见错误说明__附录_大数据计算服务-阿里云
发送短信接口__API使用手册_短信服务-阿里云
接口文档__Android_安全组件教程_移动安全-阿里云
运营商错误码(联通)__常见问题_短信服务-阿里云
设置短信模板__使用手册_短信服务-阿里云
OSS 权限问题及排查__常见错误及排除_最佳实践_对象存储 OSS-阿里云
消息通知__操作指南_批量计算-阿里云
设备端快速接入(MQTT)__快速开始_阿里云物联网套件-阿里云
查询API调用流量数据__API管理相关接口_API_API 网关-阿里云
使用STS访问__JavaScript-SDK_SDK 参考_对象存储 OSS-阿里云