閱讀48 返回首頁    go windows


發送事務消息__Java SDK_TCP 接入(專業)_消息隊列 MQ-阿裏雲

目前支持的域包括公網測試、華東1、華北2、華東2、華南1。

MQ事務消息交互流程如下:

MQ事務消息交互流程

發送事務消息包含以下兩個步驟:

  1. 發送半消息及執行本地事務

    1. package com.alibaba.webx.TryHsf.app1;
    2. import com.aliyun.openservices.ons.api.Message;
    3. import com.aliyun.openservices.ons.api.PropertyKeyConst;
    4. import com.aliyun.openservices.ons.api.SendResult;
    5. import com.aliyun.openservices.ons.api.transaction.LocalTransactionExecuter;
    6. import com.aliyun.openservices.ons.api.transaction.TransactionProducer;
    7. import com.aliyun.openservices.ons.api.transaction.TransactionStatus;
    8. import java.util.Properties;
    9. import java.util.concurrent.TimeUnit;
    10. public class TransactionProducerClient {
    11. private final static Logger log = ClientLogger.getLog(); // 用戶需要設置自己的log, 記錄日誌便於排查問題
    12. public static void main(String[] args) throws InterruptedException {
    13. final BusinessService businessService = new BusinessService(); // 本地業務Service
    14. Properties properties = new Properties();
    15. properties.put(PropertyKeyConst.ProducerId, ""); // 您在控製台創建的Producer ID
    16. properties.put(PropertyKeyConst.AccessKey, ""); // 阿裏雲身份驗證,在阿裏雲服務器管理控製台創建
    17. properties.put(PropertyKeyConst.SecretKey, ""); // 阿裏雲身份驗證,在阿裏雲服務器管理控製台創建
    18. //公有雲生產環境:https://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal
    19. //公有雲公測環境:https://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet
    20. //杭州金融雲環境:https://jbponsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal
    21. //杭州深圳雲環境:https://mq4finance-sz.addr.aliyun.com:8080/rocketmq/nsaddr4client-internal
    22. properties.put(PropertyKeyConst.ONSAddr,
    23. "https://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal");//此處以公有雲生產環境為例
    24. TransactionProducer producer = ONSFactory.createTransactionProducer(properties,
    25. new LocalTransactionCheckerImpl());
    26. producer.start();
    27. Message msg = new Message("Topic", "TagA", "Hello MQ transaction===".getBytes());
    28. // 輸入您在控製台創建的Topic
    29. SendResult sendResult = producer.send(msg, new LocalTransactionExecuter() {
    30. @Override
    31. public TransactionStatus execute(Message msg, Object arg) {
    32. // 消息ID(有可能消息體一樣,但消息ID不一樣, 當前消息ID在控製台無法查詢)
    33. String msgId = msg.getMsgID();
    34. // 消息體內容進行crc32, 也可以使用其它的如MD5
    35. long crc32Id = HashUtil.crc32Code(msg.getBody());
    36. // 消息ID和crc32id主要是用來防止消息重複
    37. // 如果業務本身是冪等的, 可以忽略, 否則需要利用msgId或crc32Id來做冪等
    38. // 如果要求消息絕對不重複, 推薦做法是對消息體body使用crc32或md5來防止重複消息
    39. Object businessServiceArgs = new Object();
    40. TransactionStatus transactionStatus = TransactionStatus.Unknow;
    41. try {
    42. boolean isCommit =
    43. businessService.execbusinessService(businessServiceArgs);
    44. if (isCommit) {
    45. // 本地事務成功、提交消息
    46. transactionStatus = TransactionStatus.CommitTransaction;
    47. } else {
    48. // 本地事務失敗、回滾消息
    49. transactionStatus = TransactionStatus.RollbackTransaction;
    50. }
    51. } catch (Exception e) {
    52. log.error("Message Id:{}", msgId, e);
    53. }
    54. System.out.println(msg.getMsgID());
    55. log.warn("Message Id:{}transactionStatus:{}", msgId, transactionStatus.name());
    56. return transactionStatus;
    57. }
    58. }, null);
    59. // demo example 防止進程退出(實際使用不需要這樣)
    60. TimeUnit.MILLISECONDS.sleep(Integer.MAX_VALUE);
    61. }
    62. }
  2. 提交事務消息狀態

    當本地事務執行完成(執行成功或執行失敗),需要通知服務器當前消息的事務狀態。通知方式有以下兩種:

    • 執行本地事務完成後提交
    • 執行本地事務一直沒提交狀態,等待服務器回查消息的事務狀態

    事務狀態有以下三種:

    • TransactionStatus.CommitTransaction 提交事務,允許訂閱方消費該消息
    • TransactionStatus.RollbackTransaction 回滾事務,消息將被丟棄不允許消費
    • TransactionStatus.Unknow 無法判斷狀態,期待MQ Broker向發送方再次詢問該消息對應的本地事務的狀態
    1. import com.alibaba.rocketmq.client.producer.LocalTransactionState;
    2. public class LocalTransactionCheckerImpl implements LocalTransactionChecker {
    3. private final static Logger log = ClientLogger.getLog();
    4. final BusinessService businessService = new BusinessService();
    5. @Override
    6. public TransactionStatus check(Message msg) {
    7. //消息ID(有可能消息體一樣,但消息ID不一樣, 當前消息屬於Half 消息,所以消息ID在控製台無法查詢)
    8. String msgId = msg.getMsgID();
    9. //消息體內容進行crc32, 也可以使用其它的方法如MD5
    10. long crc32Id = HashUtil.crc32Code(msg.getBody());
    11. //消息ID、消息本 crc32Id主要是用來防止消息重複
    12. //如果業務本身是冪等的, 可以忽略, 否則需要利用msgId或crc32Id來做冪等
    13. //如果要求消息絕對不重複, 推薦做法是對消息體使用crc32或md5來防止重複消息.
    14. //業務自己的參數對象, 這裏隻是一個示例, 實際需要用戶根據情況來處理
    15. Object businessServiceArgs = new Object();
    16. TransactionStatus transactionStatus = TransactionStatus.Unknow;
    17. try {
    18. boolean isCommit = businessService.checkbusinessService(businessServiceArgs);
    19. if (isCommit) {
    20. //本地事務已成功、提交消息
    21. transactionStatus = TransactionStatus.CommitTransaction;
    22. } else {
    23. //本地事務已失敗、回滾消息
    24. transactionStatus = TransactionStatus.RollbackTransaction;
    25. }
    26. } catch (Exception e) {
    27. log.error("Message Id:{}", msgId, e);
    28. }
    29. log.warn("Message Id:{}transactionStatus:{}", msgId, transactionStatus.name());
    30. return transactionStatus;
    31. }
    32. }

工具類

  1. import java.util.zip.CRC32;
  2. public class HashUtil {
  3. public static long crc32Code(byte[] bytes) {
  4. CRC32 crc32 = new CRC32();
  5. crc32.update(bytes);
  6. return crc32.getValue();
  7. }
  8. }

事務回查機製說明

  1. 發送事務消息為什麼必須要實現回查Check機製?

    當步驟(1)中Half消息發送完成,但本地事務返回狀態為TransactionStatus.Unknow時,或者應用退出導致本地事務未提交任何狀態時,從MQ Broker的角度看,這條Half狀態的消息的狀態是未知的,因此MQ Broker會定期要求發送方能Check該Half狀態消息,並上報其最終狀態。

  2. Check被回調時,業務邏輯都需要做些什麼?

    MQ事務消息的check方法裏麵,應該寫一些檢查事務一致性的邏輯。MQ發送事務消息時需要實現LocalTransactionChecker接口,用來處理MQ Broker主動發起的本地事務狀態回查請求;因此在事務消息的Check方法中,需要完成兩件事情:

    (1) 檢查該Half消息對應的本地事務的狀態(commited or rollback)

    (2) 向MQ Broker提交該Half消息本地事務的狀態

最後更新:2016-11-23 18:53:59

  上一篇:go 發送普通消息(三種方式)__Java SDK_TCP 接入(專業)_消息隊列 MQ-阿裏雲
  下一篇:go 發送延時消息__Java SDK_TCP 接入(專業)_消息隊列 MQ-阿裏雲