閱讀292 返回首頁    go 汽車大全


廣播拉取消息模型__最佳實踐_消息服務-阿裏雲

背景描述

阿裏雲消息服務MNS 已經提供隊列(queue)和主題(topic)兩種模型。其中隊列提供的是一對多的共享消息消費模型,采用客戶端主動拉取(Pull)模式;主題模型提供一對多的廣播消息消費模型,並且采用服務端主動推送(Push)模式。上麵兩種模型基本能滿足我們大多數應用場景。

推送模式的好處是即時性能比較好,但是需要暴露客戶端地址來接收服務端的消息推送。有些情況下,比如企業內網,我們無法暴露推送地址,希望改用拉取(Pull)的方式。雖然MNS不直接提供這種消費模型,但是我們可以結合主題和隊列來實現一對多的拉取消息消費模型。具體方案如下:

解決方案

讓主題將消息先推送到隊列,然後由消費者從隊列拉取消息。這樣既可以做到1對多的廣播消息,又不需要暴露消費者的地址;如下圖所示:

拉取

接口說明

最新的Java SDK(1.1.5)中的CloudPullTopic 默認支持上述解決方案。其中MNSClient 提供下麵兩個接口來快速創建CloudPullTopic:

  1. public CloudPullTopic createPullTopic(TopicMeta topicMeta, Vector<String> queueNameList, boolean needCreateQueue, QueueMeta queueMetaTemplate)
  2. public CloudPullTopic createPullTopic(TopicMeta topicMeta, Vector<String> queueNameList)

其中,TopicMeta 是創建topic的meta 設置, queueNameList裏指定topic消息推送的隊列名列表;needCreateQueue表明queueNameList是否需要創建;queueMetaTemplate是創建queue需要的queue meta 參數設置;

Demo代碼

  1. CloudAccount account = new CloudAccount(accessKeyId, accessKeySecret, endpoint);
  2. MNSClient client = account.getMNSClient();
  3. // build consumer name list.
  4. Vector<String> consumerNameList = new Vector<String>();
  5. String consumerName1 = "consumer001";
  6. String consumerName2 = "consumer002";
  7. String consumerName3 = "consumer003";
  8. consumerNameList.add(consumerName1);
  9. consumerNameList.add(consumerName2);
  10. consumerNameList.add(consumerName3);
  11. QueueMeta queueMetaTemplate = new QueueMeta();
  12. queueMetaTemplate.setPollingWaitSeconds(30);
  13. try{
  14. //producer code:
  15. // create pull topic which will send message to 3 queues for consumer.
  16. String topicName = "demo-topic-for-pull";
  17. TopicMeta topicMeta = new TopicMeta();
  18. topicMeta.setTopicName(topicName);
  19. CloudPullTopic pullTopic = client.createPullTopic(topicMeta, consumerNameList, true, queueMetaTemplate);
  20. //publish message and consume message.
  21. String messageBody = "broadcast message to all the consumers:hello the world.";
  22. // if we sent raw message,then should use getMessageBodyAsRawString to parse the message body correctly.
  23. TopicMessage tMessage = new RawTopicMessage();
  24. tMessage.setBaseMessageBody(messageBody);
  25. pullTopic.publishMessage(tMessage);
  26. // consumer code:
  27. //3 consumers receive the message.
  28. CloudQueue queueForConsumer1 = client.getQueueRef(consumerName1);
  29. CloudQueue queueForConsumer2 = client.getQueueRef(consumerName2);
  30. CloudQueue queueForConsumer3 = client.getQueueRef(consumerName3);
  31. Message consumer1Msg = queueForConsumer1.popMessage(30);
  32. if(consumer1Msg != null)
  33. {
  34. System.out.println("consumer1 receive message:" + consumer1Msg.getMessageBodyAsRawString());
  35. }else{
  36. System.out.println("the queue is empty");
  37. }
  38. Message consumer2Msg = queueForConsumer2.popMessage(30);
  39. if(consumer2Msg != null)
  40. {
  41. System.out.println("consumer2 receive message:" + consumer2Msg.getMessageBodyAsRawString());
  42. }else{
  43. System.out.println("the queue is empty");
  44. }
  45. Message consumer3Msg = queueForConsumer3.popMessage(30);
  46. if(consumer3Msg != null)
  47. {
  48. System.out.println("consumer3 receive message:" + consumer3Msg.getMessageBodyAsRawString());
  49. }else{
  50. System.out.println("the queue is empty");
  51. }
  52. // delete the fullTopic.
  53. pullTopic.delete();
  54. }catch(ClientException ce)
  55. {
  56. System.out.println("Something wrong with the network connection between client and MNS service."
  57. + "Please check your network and DNS availablity.");
  58. ce.printStackTrace();
  59. }
  60. catch(ServiceException se)
  61. {
  62. /*you can get more MNS service error code in following link.
  63. https://help.aliyun.com/document_detail/mns/api_reference/error_code/error_code.html?spm=5176.docmns/api_reference/error_code/error_response
  64. */
  65. se.printStackTrace();
  66. }
  67. client.close();

最後更新:2016-11-23 17:16:08

  上一篇:go Queue操作__曆史協議_API使用手冊_消息服務-阿裏雲
  下一篇:go 消息處理時長自適應__最佳實踐_消息服務-阿裏雲