閱讀910 返回首頁    go 微信


發送普通消息(三種方式)__Java SDK_TCP 接入(專業)_消息隊列 MQ-阿裏雲

MQ 發送普通消息有三種實現方式:可靠同步發送、可靠異步發送、單向(Oneway)發送。本文介紹了每種實現的原理、使用場景以及三種實現的異同,同時提供了代碼示例以供參考。

  • 可靠同步發送

    原理:同步發送是指消息發送方發出數據後,會在收到接收方發回響應之後才發下一個數據包的通訊方式。sync-send

應用場景:此種方式應用場景非常廣泛,例如重要通知郵件、報名短信通知、營銷短信係統等。

  • 可靠異步發送

    原理:異步發送是指發送方發出數據後,不等接收方發回響應,接著發送下個數據包的通訊方式。MQ 的異步發送,需要用戶實現異步發送回調接口(SendCallback),在執行消息的異步發送時,應用不需要等待服務器響應即可直接返回,通過回調接口接收務器響應,並對服務器的響應結果進行處理。

    async_send

    應用場景:異步發送一般用於鏈路耗時較長,對 RT 響應時間較為敏感的業務場景,例如用戶視頻上傳後通知啟動轉碼服務,轉碼完成後通知推送轉碼結果等。

  • 單向(Oneway)發送

    原理:單向(Oneway)發送特點為隻負責發送消息,不等待服務器回應且沒有回調函數觸發,即隻發送請求不等待應答。此方式發送消息的過程耗時非常短,一般在微秒級別。

    oneway

    應用場景:適用於某些耗時非常短,但對可靠性要求並不高的場景,例如日誌收集。

下表概括了三者的特點和主要區別。

發送TPS 發送結果反饋 可靠性
同步發送 不丟失
異步發送 不丟失
單向發送 最快 可能丟失

示例代碼

同步發送

  1. public class ProducerTest {
  2. public static void main(String[] args) {
  3. Properties properties = new Properties();
  4. properties.put(PropertyKeyConst.ProducerId, "XXX");//您在控製台創建的Producer ID
  5. properties.put(PropertyKeyConst.AccessKey,"XXX");// AccessKey 阿裏雲身份驗證,在阿裏雲服務器管理控製台創建
  6. properties.put(PropertyKeyConst.SecretKey, "XXX");// SecretKey 阿裏雲身份驗證,在阿裏雲服務器管理控製台創建
  7. properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");//設置發送超時時間,單位毫秒
  8. //公有雲生產環境:https://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal
  9. //公有雲公測環境:https://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet
  10. //杭州金融雲環境:https://jbponsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal
  11. //杭州深圳雲環境:https://mq4finance-sz.addr.aliyun.com:8080/rocketmq/nsaddr4client-internal
  12. properties.put(PropertyKeyConst.ONSAddr,
  13. "https://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal");//此處以公有雲生產環境為例
  14. Producer producer = ONSFactory.createProducer(properties);
  15. // 在發送消息前,必須調用start方法來啟動Producer,隻需調用一次即可。
  16. producer.start();
  17. //循環發送消息
  18. for (int i = 0; i < 100; i++){
  19. Message msg = new Message( //
  20. // Message Topic
  21. "TopicTestMQ",
  22. // Message Tag 可理解為Gmail中的標簽,對消息進行再歸類,方便Consumer指定過濾條件在MQ服務器過濾
  23. "TagA",
  24. // Message Body 可以是任何二進製形式的數據, MQ不做任何幹預,
  25. // 需要Producer與Consumer協商好一致的序列化和反序列化方式
  26. "Hello MQ".getBytes());
  27. // 設置代表消息的業務關鍵屬性,請盡可能全局唯一。
  28. // 以方便您在無法正常收到消息情況下,可通過阿裏雲服務器管理控製台查詢消息並補發。
  29. // 注意:不設置也不會影響消息正常收發
  30. msg.setKey("ORDERID_" + i);
  31. // 同步發送消息,隻要不拋異常就是成功
  32. SendResult sendResult = producer.send(msg);
  33. System.out.println(sendResult);
  34. }
  35. // 在應用退出前,銷毀Producer對象
  36. // 注意:如果不銷毀也沒有問題
  37. producer.shutdown();
  38. }
  39. }

異步發送

  1. public static void main(String[] args) {
  2. Properties properties = new Properties();
  3. properties.put(PropertyKeyConst.AccessKey, "DEMO_AK");// AccessKey 阿裏雲身份驗證,在阿裏雲服務器管理控製台創建
  4. properties.put(PropertyKeyConst.SecretKey, "DEMO_SK");// SecretKey 阿裏雲身份驗證,在阿裏雲服務器管理控製台創建
  5. properties.put(PropertyKeyConst.ProducerId, "DEMO_PID");//您在控製台創建的Producer ID
  6. properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");//設置發送超時時間,單位毫秒
  7. Producer producer = ONSFactory.createProducer(properties);
  8. // 在發送消息前,必須調用start方法來啟動Producer,隻需調用一次即可。
  9. producer.start();
  10. Message msg = new Message(
  11. // Message Topic
  12. "TopicTestMQ",
  13. // Message Tag,可理解為Gmail中的標簽,對消息進行再歸類,方便Consumer指定過濾條件在MQ服務器過濾
  14. "TagA",
  15. // Message Body,任何二進製形式的數據,MQ不做任何幹預,需要Producer與Consumer協商好一致的序列化和反序列化方式
  16. "Hello MQ".getBytes());
  17. // 設置代表消息的業務關鍵屬性,請盡可能全局唯一。以方便您在無法正常收到消息情況下,可通過MQ控製台查詢消息並補發。
  18. // 注意:不設置也不會影響消息正常收發
  19. msg.setKey("ORDERID_100");
  20. // 異步發送消息, 發送結果通過callback返回給客戶端。
  21. producer.sendAsync(msg, new SendCallback() {
  22. @Override
  23. public void onSuccess(final SendResult sendResult) {
  24. // 消費發送成功
  25. System.out.println("send message success. topic=" + sendResult.getTopic() + ", msgpln"> + sendResult.getMessageId());
  26. }
  27. @Override
  28. public void onException(OnExceptionContext context) {
  29. // 消息發送失敗
  30. System.out.println("send message failed. topic=" + context.getTopic() + ", msgpln"> + context.getMessageId());
  31. }
  32. });
  33. // 在callback返回之前即可取得msgId。
  34. System.out.println("send message async. topic=" + msg.getTopic() + ", msgpln"> + msg.getMsgID());
  35. // 在應用退出前,銷毀Producer對象。注意:如果不銷毀也沒有問題
  36. producer.shutdown();
  37. }

單向(Oneway)發送

  1. public static void main(String[] args) {
  2. Properties properties = new Properties();
  3. properties.put(PropertyKeyConst.AccessKey, "DEMO_AK");// AccessKey 阿裏雲身份驗證,在阿裏雲服務器管理控製台創建
  4. properties.put(PropertyKeyConst.SecretKey, "DEMO_SK");// SecretKey 阿裏雲身份驗證,在阿裏雲服務器管理控製台創建
  5. properties.put(PropertyKeyConst.ProducerId, "DEMO_PID");//您在控製台創建的Producer ID
  6. properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");//設置發送超時時間,單位毫秒
  7. Producer producer = ONSFactory.createProducer(properties);
  8. // 在發送消息前,必須調用start方法來啟動Producer,隻需調用一次即可。
  9. producer.start();
  10. //循環發送消息
  11. for (int i = 0; i < 100; i++){
  12. Message msg = new Message(
  13. // Message Topic
  14. "TopicTestMQ",
  15. // Message Tag,
  16. // 可理解為Gmail中的標簽,對消息進行再歸類,方便Consumer指定過濾條件在MQ服務器過濾
  17. "TagA",
  18. // Message Body
  19. // 任何二進製形式的數據,MQ不做任何幹預,需要Producer與Consumer協商好一致的序列化和反序列化方式
  20. "Hello MQ".getBytes());
  21. // 設置代表消息的業務關鍵屬性,請盡可能全局唯一。
  22. // 以方便您在無法正常收到消息情況下,可通過阿裏雲服務器管理控製台查詢消息並補發。
  23. // 注意:不設置也不會影響消息正常收發
  24. msg.setKey("ORDERID_" + i);
  25. // oneway發送消息,隻要不拋異常就是成功
  26. producer.sendOneway(msg);
  27. }
  28. // 在應用退出前,銷毀Producer對象
  29. // 注意:如果不銷毀也沒有問題
  30. producer.shutdown();
  31. }

最後更新:2016-11-23 16:04:04

  上一篇:go 日誌配置__Java SDK_TCP 接入(專業)_消息隊列 MQ-阿裏雲
  下一篇:go 發送事務消息__Java SDK_TCP 接入(專業)_消息隊列 MQ-阿裏雲