48
阿里云
发送事务消息__Java SDK_TCP 接入(专业)_消息队列 MQ-阿里云
目前支持的域包括公网测试、华东1、华北2、华东2、华南1。
MQ事务消息交互流程如下:
发送事务消息包含以下两个步骤:
发送半消息及执行本地事务
package com.alibaba.webx.TryHsf.app1;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.transaction.LocalTransactionExecuter;
import com.aliyun.openservices.ons.api.transaction.TransactionProducer;
import com.aliyun.openservices.ons.api.transaction.TransactionStatus;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
public class TransactionProducerClient {
private final static Logger log = ClientLogger.getLog(); // 用户需要设置自己的log, 记录日志便于排查问题
public static void main(String[] args) throws InterruptedException {
final BusinessService businessService = new BusinessService(); // 本地业务Service
Properties properties = new Properties();
properties.put(PropertyKeyConst.ProducerId, ""); // 您在控制台创建的Producer ID
properties.put(PropertyKeyConst.AccessKey, ""); // 阿里云身份验证,在阿里云服务器管理控制台创建
properties.put(PropertyKeyConst.SecretKey, ""); // 阿里云身份验证,在阿里云服务器管理控制台创建
//公有云生产环境:https://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal
//公有云公测环境:https://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet
//杭州金融云环境:https://jbponsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal
//杭州深圳云环境:https://mq4finance-sz.addr.aliyun.com:8080/rocketmq/nsaddr4client-internal
properties.put(PropertyKeyConst.ONSAddr,
"https://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal");//此处以公有云生产环境为例
TransactionProducer producer = ONSFactory.createTransactionProducer(properties,
new LocalTransactionCheckerImpl());
producer.start();
Message msg = new Message("Topic", "TagA", "Hello MQ transaction===".getBytes());
// 输入您在控制台创建的Topic
SendResult sendResult = producer.send(msg, new LocalTransactionExecuter() {
@Override
public TransactionStatus execute(Message msg, Object arg) {
// 消息ID(有可能消息体一样,但消息ID不一样, 当前消息ID在控制台无法查询)
String msgId = msg.getMsgID();
// 消息体内容进行crc32, 也可以使用其它的如MD5
long crc32Id = HashUtil.crc32Code(msg.getBody());
// 消息ID和crc32id主要是用来防止消息重复
// 如果业务本身是幂等的, 可以忽略, 否则需要利用msgId或crc32Id来做幂等
// 如果要求消息绝对不重复, 推荐做法是对消息体body使用crc32或md5来防止重复消息
Object businessServiceArgs = new Object();
TransactionStatus transactionStatus = TransactionStatus.Unknow;
try {
boolean isCommit =
businessService.execbusinessService(businessServiceArgs);
if (isCommit) {
// 本地事务成功、提交消息
transactionStatus = TransactionStatus.CommitTransaction;
} else {
// 本地事务失败、回滚消息
transactionStatus = TransactionStatus.RollbackTransaction;
}
} catch (Exception e) {
log.error("Message Id:{}", msgId, e);
}
System.out.println(msg.getMsgID());
log.warn("Message Id:{}transactionStatus:{}", msgId, transactionStatus.name());
return transactionStatus;
}
}, null);
// demo example 防止进程退出(实际使用不需要这样)
TimeUnit.MILLISECONDS.sleep(Integer.MAX_VALUE);
}
}
提交事务消息状态
当本地事务执行完成(执行成功或执行失败),需要通知服务器当前消息的事务状态。通知方式有以下两种:
- 执行本地事务完成后提交
- 执行本地事务一直没提交状态,等待服务器回查消息的事务状态
事务状态有以下三种:
- TransactionStatus.CommitTransaction 提交事务,允许订阅方消费该消息
- TransactionStatus.RollbackTransaction 回滚事务,消息将被丢弃不允许消费
- TransactionStatus.Unknow 无法判断状态,期待MQ Broker向发送方再次询问该消息对应的本地事务的状态
import com.alibaba.rocketmq.client.producer.LocalTransactionState;
public class LocalTransactionCheckerImpl implements LocalTransactionChecker {
private final static Logger log = ClientLogger.getLog();
final BusinessService businessService = new BusinessService();
@Override
public TransactionStatus check(Message msg) {
//消息ID(有可能消息体一样,但消息ID不一样, 当前消息属于Half 消息,所以消息ID在控制台无法查询)
String msgId = msg.getMsgID();
//消息体内容进行crc32, 也可以使用其它的方法如MD5
long crc32Id = HashUtil.crc32Code(msg.getBody());
//消息ID、消息本 crc32Id主要是用来防止消息重复
//如果业务本身是幂等的, 可以忽略, 否则需要利用msgId或crc32Id来做幂等
//如果要求消息绝对不重复, 推荐做法是对消息体使用crc32或md5来防止重复消息.
//业务自己的参数对象, 这里只是一个示例, 实际需要用户根据情况来处理
Object businessServiceArgs = new Object();
TransactionStatus transactionStatus = TransactionStatus.Unknow;
try {
boolean isCommit = businessService.checkbusinessService(businessServiceArgs);
if (isCommit) {
//本地事务已成功、提交消息
transactionStatus = TransactionStatus.CommitTransaction;
} else {
//本地事务已失败、回滚消息
transactionStatus = TransactionStatus.RollbackTransaction;
}
} catch (Exception e) {
log.error("Message Id:{}", msgId, e);
}
log.warn("Message Id:{}transactionStatus:{}", msgId, transactionStatus.name());
return transactionStatus;
}
}
工具类
import java.util.zip.CRC32;
public class HashUtil {
public static long crc32Code(byte[] bytes) {
CRC32 crc32 = new CRC32();
crc32.update(bytes);
return crc32.getValue();
}
}
事务回查机制说明
发送事务消息为什么必须要实现回查Check机制?
当步骤(1)中Half消息发送完成,但本地事务返回状态为TransactionStatus.Unknow时,或者应用退出导致本地事务未提交任何状态时,从MQ Broker的角度看,这条Half状态的消息的状态是未知的,因此MQ Broker会定期要求发送方能Check该Half状态消息,并上报其最终状态。
Check被回调时,业务逻辑都需要做些什么?
MQ事务消息的check方法里面,应该写一些检查事务一致性的逻辑。MQ发送事务消息时需要实现LocalTransactionChecker接口,用来处理MQ Broker主动发起的本地事务状态回查请求;因此在事务消息的Check方法中,需要完成两件事情:
(1) 检查该Half消息对应的本地事务的状态(commited or rollback)
(2) 向MQ Broker提交该Half消息本地事务的状态
最后更新:2016-11-23 18:53:59
上一篇:
发送普通消息(三种方式)__Java SDK_TCP 接入(专业)_消息队列 MQ-阿里云
下一篇:
发送延时消息__Java SDK_TCP 接入(专业)_消息队列 MQ-阿里云
下拉提示__应用高级配置_产品使用手册_开放搜索-阿里云
SnatEntrySetType__数据类型_API参考_专有网络 VPC-阿里云
APP概览列表__APP相关_API 列表_OpenAPI 2.0_移动推送-阿里云
1.2 阿里云企业邮箱-管理篇__云邮箱快速开始_阿里云邮箱 体验_体验馆-阿里云
阿里云云服务器 ECS历史
页面如何解除屏蔽?__站点检测_产品常见问题_阿里绿网-阿里云
云服务器 ECS使用镜像创建实例
新增服务器__服务器管理_快速入门_数据管理-阿里云
马云叫你来捉妖,阿里云栖大会本周三正式召开了,去年马云提出新零售催生了三江购物,今年呢?
怎样授权一个子用户管理两台指定的ECS实例___云服务器(ECS)授权问题_授权常见问题_访问控制-阿里云
相关内容
常见错误说明__附录_大数据计算服务-阿里云
发送短信接口__API使用手册_短信服务-阿里云
接口文档__Android_安全组件教程_移动安全-阿里云
运营商错误码(联通)__常见问题_短信服务-阿里云
设置短信模板__使用手册_短信服务-阿里云
OSS 权限问题及排查__常见错误及排除_最佳实践_对象存储 OSS-阿里云
消息通知__操作指南_批量计算-阿里云
设备端快速接入(MQTT)__快速开始_阿里云物联网套件-阿里云
查询API调用流量数据__API管理相关接口_API_API 网关-阿里云
使用STS访问__JavaScript-SDK_SDK 参考_对象存储 OSS-阿里云