48
windows
發送事務消息__Java SDK_TCP 接入(專業)_消息隊列 MQ-阿裏雲
目前支持的域包括公網測試、華東1、華北2、華東2、華南1。
MQ事務消息交互流程如下:
發送事務消息包含以下兩個步驟:
發送半消息及執行本地事務
package com.alibaba.webx.TryHsf.app1;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.transaction.LocalTransactionExecuter;
import com.aliyun.openservices.ons.api.transaction.TransactionProducer;
import com.aliyun.openservices.ons.api.transaction.TransactionStatus;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
public class TransactionProducerClient {
private final static Logger log = ClientLogger.getLog(); // 用戶需要設置自己的log, 記錄日誌便於排查問題
public static void main(String[] args) throws InterruptedException {
final BusinessService businessService = new BusinessService(); // 本地業務Service
Properties properties = new Properties();
properties.put(PropertyKeyConst.ProducerId, ""); // 您在控製台創建的Producer ID
properties.put(PropertyKeyConst.AccessKey, ""); // 阿裏雲身份驗證,在阿裏雲服務器管理控製台創建
properties.put(PropertyKeyConst.SecretKey, ""); // 阿裏雲身份驗證,在阿裏雲服務器管理控製台創建
//公有雲生產環境:https://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal
//公有雲公測環境:https://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet
//杭州金融雲環境:https://jbponsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal
//杭州深圳雲環境:https://mq4finance-sz.addr.aliyun.com:8080/rocketmq/nsaddr4client-internal
properties.put(PropertyKeyConst.ONSAddr,
"https://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal");//此處以公有雲生產環境為例
TransactionProducer producer = ONSFactory.createTransactionProducer(properties,
new LocalTransactionCheckerImpl());
producer.start();
Message msg = new Message("Topic", "TagA", "Hello MQ transaction===".getBytes());
// 輸入您在控製台創建的Topic
SendResult sendResult = producer.send(msg, new LocalTransactionExecuter() {
@Override
public TransactionStatus execute(Message msg, Object arg) {
// 消息ID(有可能消息體一樣,但消息ID不一樣, 當前消息ID在控製台無法查詢)
String msgId = msg.getMsgID();
// 消息體內容進行crc32, 也可以使用其它的如MD5
long crc32Id = HashUtil.crc32Code(msg.getBody());
// 消息ID和crc32id主要是用來防止消息重複
// 如果業務本身是冪等的, 可以忽略, 否則需要利用msgId或crc32Id來做冪等
// 如果要求消息絕對不重複, 推薦做法是對消息體body使用crc32或md5來防止重複消息
Object businessServiceArgs = new Object();
TransactionStatus transactionStatus = TransactionStatus.Unknow;
try {
boolean isCommit =
businessService.execbusinessService(businessServiceArgs);
if (isCommit) {
// 本地事務成功、提交消息
transactionStatus = TransactionStatus.CommitTransaction;
} else {
// 本地事務失敗、回滾消息
transactionStatus = TransactionStatus.RollbackTransaction;
}
} catch (Exception e) {
log.error("Message Id:{}", msgId, e);
}
System.out.println(msg.getMsgID());
log.warn("Message Id:{}transactionStatus:{}", msgId, transactionStatus.name());
return transactionStatus;
}
}, null);
// demo example 防止進程退出(實際使用不需要這樣)
TimeUnit.MILLISECONDS.sleep(Integer.MAX_VALUE);
}
}
提交事務消息狀態
當本地事務執行完成(執行成功或執行失敗),需要通知服務器當前消息的事務狀態。通知方式有以下兩種:
- 執行本地事務完成後提交
- 執行本地事務一直沒提交狀態,等待服務器回查消息的事務狀態
事務狀態有以下三種:
- TransactionStatus.CommitTransaction 提交事務,允許訂閱方消費該消息
- TransactionStatus.RollbackTransaction 回滾事務,消息將被丟棄不允許消費
- TransactionStatus.Unknow 無法判斷狀態,期待MQ Broker向發送方再次詢問該消息對應的本地事務的狀態
import com.alibaba.rocketmq.client.producer.LocalTransactionState;
public class LocalTransactionCheckerImpl implements LocalTransactionChecker {
private final static Logger log = ClientLogger.getLog();
final BusinessService businessService = new BusinessService();
@Override
public TransactionStatus check(Message msg) {
//消息ID(有可能消息體一樣,但消息ID不一樣, 當前消息屬於Half 消息,所以消息ID在控製台無法查詢)
String msgId = msg.getMsgID();
//消息體內容進行crc32, 也可以使用其它的方法如MD5
long crc32Id = HashUtil.crc32Code(msg.getBody());
//消息ID、消息本 crc32Id主要是用來防止消息重複
//如果業務本身是冪等的, 可以忽略, 否則需要利用msgId或crc32Id來做冪等
//如果要求消息絕對不重複, 推薦做法是對消息體使用crc32或md5來防止重複消息.
//業務自己的參數對象, 這裏隻是一個示例, 實際需要用戶根據情況來處理
Object businessServiceArgs = new Object();
TransactionStatus transactionStatus = TransactionStatus.Unknow;
try {
boolean isCommit = businessService.checkbusinessService(businessServiceArgs);
if (isCommit) {
//本地事務已成功、提交消息
transactionStatus = TransactionStatus.CommitTransaction;
} else {
//本地事務已失敗、回滾消息
transactionStatus = TransactionStatus.RollbackTransaction;
}
} catch (Exception e) {
log.error("Message Id:{}", msgId, e);
}
log.warn("Message Id:{}transactionStatus:{}", msgId, transactionStatus.name());
return transactionStatus;
}
}
工具類
import java.util.zip.CRC32;
public class HashUtil {
public static long crc32Code(byte[] bytes) {
CRC32 crc32 = new CRC32();
crc32.update(bytes);
return crc32.getValue();
}
}
事務回查機製說明
發送事務消息為什麼必須要實現回查Check機製?
當步驟(1)中Half消息發送完成,但本地事務返回狀態為TransactionStatus.Unknow時,或者應用退出導致本地事務未提交任何狀態時,從MQ Broker的角度看,這條Half狀態的消息的狀態是未知的,因此MQ Broker會定期要求發送方能Check該Half狀態消息,並上報其最終狀態。
Check被回調時,業務邏輯都需要做些什麼?
MQ事務消息的check方法裏麵,應該寫一些檢查事務一致性的邏輯。MQ發送事務消息時需要實現LocalTransactionChecker接口,用來處理MQ Broker主動發起的本地事務狀態回查請求;因此在事務消息的Check方法中,需要完成兩件事情:
(1) 檢查該Half消息對應的本地事務的狀態(commited or rollback)
(2) 向MQ Broker提交該Half消息本地事務的狀態
最後更新:2016-11-23 18:53:59
上一篇:
發送普通消息(三種方式)__Java SDK_TCP 接入(專業)_消息隊列 MQ-阿裏雲
下一篇:
發送延時消息__Java SDK_TCP 接入(專業)_消息隊列 MQ-阿裏雲
下拉提示__應用高級配置_產品使用手冊_開放搜索-阿裏雲
SnatEntrySetType__數據類型_API參考_專有網絡 VPC-阿裏雲
APP概覽列表__APP相關_API 列表_OpenAPI 2.0_移動推送-阿裏雲
1.2 阿裏雲企業郵箱-管理篇__雲郵箱快速開始_阿裏雲郵箱 體驗_體驗館-阿裏雲
阿裏雲雲服務器 ECS曆史
頁麵如何解除屏蔽?__站點檢測_產品常見問題_阿裏綠網-阿裏雲
雲服務器 ECS使用鏡像創建實例
新增服務器__服務器管理_快速入門_數據管理-阿裏雲
馬雲叫你來捉妖,阿裏雲棲大會本周三正式召開了,去年馬雲提出新零售催生了三江購物,今年呢?
怎樣授權一個子用戶管理兩台指定的ECS實例___雲服務器(ECS)授權問題_授權常見問題_訪問控製-阿裏雲
相關內容
常見錯誤說明__附錄_大數據計算服務-阿裏雲
發送短信接口__API使用手冊_短信服務-阿裏雲
接口文檔__Android_安全組件教程_移動安全-阿裏雲
運營商錯誤碼(聯通)__常見問題_短信服務-阿裏雲
設置短信模板__使用手冊_短信服務-阿裏雲
OSS 權限問題及排查__常見錯誤及排除_最佳實踐_對象存儲 OSS-阿裏雲
消息通知__操作指南_批量計算-阿裏雲
設備端快速接入(MQTT)__快速開始_阿裏雲物聯網套件-阿裏雲
查詢API調用流量數據__API管理相關接口_API_API 網關-阿裏雲
使用STS訪問__JavaScript-SDK_SDK 參考_對象存儲 OSS-阿裏雲