事務消息__最佳實踐_消息服務-阿裏雲
背景描述
有時候我們需要實現本地操作和消息發送的事務一致性功能。即:消息發送成功,則本地操作成功;反之,如果消息發送失敗,本地操作失敗(成功也需要rollback)。保證不出現操作成功但消息發送失敗;或者操作失敗但消息發送成功的情況;
另外,消費端,我們也希望消息一定被成功處理一次,不會因為消息端程序崩潰而導致消息沒有成功處理,進而需要人工重置消費進度。
解決方案
利用消息服務MNS的延遲消息功能來實現。
準備工作
創建兩個隊列:
1.事務消息隊列
消息的有效期小於消息延遲時間。即如果生產者不主動修改(提交)消息可見時間,消息對消費者不可見;
2.操作日誌隊列
記錄事務消息的操作記錄信息。消息延遲時間為事務操作超時時間。日誌隊列中的消息確認(刪除)後將對消費者不可見。
具體步驟
1.發送一條事務準備消息到事務消息隊列;
2.寫操作日誌信息到操作日誌隊列,日誌中包含步驟1消息的消息句柄;
3.執行本地事務操作;
4.如果步驟3成功,提交消息(消息對消費者可見);反之,回滾消息;
5.確認步驟2中的操作日誌(刪除該日誌消息);
6.步驟4後,消費者可以接收到事務消息;
7.消費者處理消息;
8.消費者確認刪除消息;
如下圖:
異常分析:
生產者異常(例如:進程重啟):
A.讀取操作日誌隊列超時未確認日誌
B.檢查事務結果
C.如果檢查得到事務已經成功,則提交消息(重複提交無副作用,同一句柄的消息隻能成功提交一次)
D.確認操作日誌
消費者異常(例如:進程重啟):
消息服務提供至少保證消費一次的特性,隻要步驟8不成功,消息在一段時間後可以繼續可見,被當前消費者或者其他消費者處理。
消息服務不可達(例如:斷網)
消息發送和接收處理狀態以及操作日誌都在消息服務端,消息服務本身具備高可靠和高可用的特點,所以隻要網絡恢複,事務可以繼續,能保證隻要生產者:操作成功,則消費者一定能夠拿到消息並處理成功;或操作失敗, 則消費者收不到消息的最終一致性。
代碼實現:
MNS最新的Java SDK(1.1.5)中的TransactionQueue支持上述事務消息方案。使用者隻需要在TransactionOperations和TransactionChecker 兩個接口添加業務操作和檢查邏輯,就可以方便的實現事務消息。
Demo 代碼
public class TransactionMessageDemo{
public class MyTransactionChecker implements TransactionChecker
{
public boolean checkTransactionStatus(Message message)
{
boolean checkResult = false;
String messageHandler = message.getReceiptHandle();
try{
//TODO: check if the messageHandler related transaction is success.
checkResult = true;
}catch(Exception e)
{
checkResult = false;
}
return checkResult;
}
}
public class MyTransactionOperations implements TransactionOperations
{
public boolean doTransaction(Message message)
{
boolean transactionResult = false;
String messageHandler = message.getReceiptHandle();
String messageBody = message.getMessageBody();
try{
//TODO: do your local transaction according to the messageHandler and messageBody here.
transactionResult = true;
}catch(Exception e)
{
transactionResult = false;
}
return transactionResult;
}
}
public static void main(String[] args) {
System.out.println("Start TransactionMessageDemo");
String transQueueName = "transQueueName";
String accessKeyId = ServiceSettings.getMNSAccessKeyId();
String accessKeySecret = ServiceSettings.getMNSAccessKeySecret();
String endpoint = ServiceSettings.getMNSAccountEndpoint();
CloudAccount account = new CloudAccount(accessKeyId, accessKeySecret, endpoint);
MNSClient client = account.getMNSClient(); //this client need only initialize once
// create queue for transaction queue.
QueueMeta queueMeta = new QueueMeta();
queueMeta.setQueueName(transQueueName);
queueMeta.setPollingWaitSeconds(15);
TransactionMessageDemo demo = new TransactionMessageDemo();
TransactionChecker transChecker = demo.new MyTransactionChecker();
TransactionOperations transOperations = demo.new MyTransactionOperations();
TransactionQueue transQueue = client.createTransQueue(queueMeta, transChecker);
// do transaction.
Message msg = new Message();
String messageBody = "TransactionMessageDemo";
msg.setMessageBody(messageBody);
transQueue.sendTransMessage(msg, transOperations);
// delete queue and close client if we won't use them.
transQueue.delete();
// close the client at the end.
client.close();
System.out.println("End TransactionMessageDemo");
}
}
最後更新:2016-11-23 16:04:12
上一篇:
超大消息傳輸__最佳實踐_消息服務-阿裏雲
下一篇:
日誌查詢工具__開發者工具_消息服務-阿裏雲
常見錯誤說明__附錄_大數據計算服務-阿裏雲
發送短信接口__API使用手冊_短信服務-阿裏雲
接口文檔__Android_安全組件教程_移動安全-阿裏雲
運營商錯誤碼(聯通)__常見問題_短信服務-阿裏雲
設置短信模板__使用手冊_短信服務-阿裏雲
OSS 權限問題及排查__常見錯誤及排除_最佳實踐_對象存儲 OSS-阿裏雲
消息通知__操作指南_批量計算-阿裏雲
設備端快速接入(MQTT)__快速開始_阿裏雲物聯網套件-阿裏雲
查詢API調用流量數據__API管理相關接口_API_API 網關-阿裏雲
使用STS訪問__JavaScript-SDK_SDK 參考_對象存儲 OSS-阿裏雲