閱讀585 返回首頁    go 微信


事務消息__最佳實踐_消息服務-阿裏雲

背景描述

有時候我們需要實現本地操作和消息發送的事務一致性功能。即:消息發送成功,則本地操作成功;反之,如果消息發送失敗,本地操作失敗(成功也需要rollback)。保證不出現操作成功但消息發送失敗;或者操作失敗但消息發送成功的情況;

另外,消費端,我們也希望消息一定被成功處理一次,不會因為消息端程序崩潰而導致消息沒有成功處理,進而需要人工重置消費進度。

解決方案

利用消息服務MNS的延遲消息功能來實現。

準備工作

創建兩個隊列:

1.事務消息隊列

消息的有效期小於消息延遲時間。即如果生產者不主動修改(提交)消息可見時間,消息對消費者不可見;

2.操作日誌隊列

記錄事務消息的操作記錄信息。消息延遲時間為事務操作超時時間。日誌隊列中的消息確認(刪除)後將對消費者不可見。

具體步驟

1.發送一條事務準備消息到事務消息隊列;

2.寫操作日誌信息到操作日誌隊列,日誌中包含步驟1消息的消息句柄;

3.執行本地事務操作;

4.如果步驟3成功,提交消息(消息對消費者可見);反之,回滾消息;

5.確認步驟2中的操作日誌(刪除該日誌消息);

6.步驟4後,消費者可以接收到事務消息;

7.消費者處理消息;

8.消費者確認刪除消息;

如下圖:

異常分析:

生產者異常(例如:進程重啟):

A.讀取操作日誌隊列超時未確認日誌

B.檢查事務結果

C.如果檢查得到事務已經成功,則提交消息(重複提交無副作用,同一句柄的消息隻能成功提交一次)

D.確認操作日誌

消費者異常(例如:進程重啟):

消息服務提供至少保證消費一次的特性,隻要步驟8不成功,消息在一段時間後可以繼續可見,被當前消費者或者其他消費者處理。

消息服務不可達(例如:斷網)

消息發送和接收處理狀態以及操作日誌都在消息服務端,消息服務本身具備高可靠和高可用的特點,所以隻要網絡恢複,事務可以繼續,能保證隻要生產者:操作成功,則消費者一定能夠拿到消息並處理成功;或操作失敗, 則消費者收不到消息的最終一致性。

事務圖片

代碼實現:

MNS最新的Java SDK(1.1.5)中的TransactionQueue支持上述事務消息方案。使用者隻需要在TransactionOperations和TransactionChecker 兩個接口添加業務操作和檢查邏輯,就可以方便的實現事務消息。

Demo 代碼

  1. public class TransactionMessageDemo{
  2. public class MyTransactionChecker implements TransactionChecker
  3. {
  4. public boolean checkTransactionStatus(Message message)
  5. {
  6. boolean checkResult = false;
  7. String messageHandler = message.getReceiptHandle();
  8. try{
  9. //TODO: check if the messageHandler related transaction is success.
  10. checkResult = true;
  11. }catch(Exception e)
  12. {
  13. checkResult = false;
  14. }
  15. return checkResult;
  16. }
  17. }
  18. public class MyTransactionOperations implements TransactionOperations
  19. {
  20. public boolean doTransaction(Message message)
  21. {
  22. boolean transactionResult = false;
  23. String messageHandler = message.getReceiptHandle();
  24. String messageBody = message.getMessageBody();
  25. try{
  26. //TODO: do your local transaction according to the messageHandler and messageBody here.
  27. transactionResult = true;
  28. }catch(Exception e)
  29. {
  30. transactionResult = false;
  31. }
  32. return transactionResult;
  33. }
  34. }
  35. public static void main(String[] args) {
  36. System.out.println("Start TransactionMessageDemo");
  37. String transQueueName = "transQueueName";
  38. String accessKeyId = ServiceSettings.getMNSAccessKeyId();
  39. String accessKeySecret = ServiceSettings.getMNSAccessKeySecret();
  40. String endpoint = ServiceSettings.getMNSAccountEndpoint();
  41. CloudAccount account = new CloudAccount(accessKeyId, accessKeySecret, endpoint);
  42. MNSClient client = account.getMNSClient(); //this client need only initialize once
  43. // create queue for transaction queue.
  44. QueueMeta queueMeta = new QueueMeta();
  45. queueMeta.setQueueName(transQueueName);
  46. queueMeta.setPollingWaitSeconds(15);
  47. TransactionMessageDemo demo = new TransactionMessageDemo();
  48. TransactionChecker transChecker = demo.new MyTransactionChecker();
  49. TransactionOperations transOperations = demo.new MyTransactionOperations();
  50. TransactionQueue transQueue = client.createTransQueue(queueMeta, transChecker);
  51. // do transaction.
  52. Message msg = new Message();
  53. String messageBody = "TransactionMessageDemo";
  54. msg.setMessageBody(messageBody);
  55. transQueue.sendTransMessage(msg, transOperations);
  56. // delete queue and close client if we won't use them.
  57. transQueue.delete();
  58. // close the client at the end.
  59. client.close();
  60. System.out.println("End TransactionMessageDemo");
  61. }
  62. }

最後更新:2016-11-23 16:04:12

  上一篇:go 超大消息傳輸__最佳實踐_消息服務-阿裏雲
  下一篇:go 日誌查詢工具__開發者工具_消息服務-阿裏雲