765
微信
SDK快速入门__数据订阅_用户指南_数据传输-阿里云
这一节,您将学到如何用DTS Java SDK完成一些基本的操作。
初始化RegionContext
RegionContext 主要用于保存设置安全认证信息及访问网络模式设置。下面代码显示如何初始化RegionContext,设置安全认证凭证及网络访问模式。
import java.util.List;
import com.aliyun.drc.clusterclient.RegionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MainClass
{
public static void main(String[] args) throws Exception {
// 创建一个RegionContext
RegionContext context = new RegionContext();
// 配置阿里云账号的AccessKey及AccessKeySecret
context.setAccessKey("<AccessKey>");
context.setSecret("<AccessKeySecret>");
// 运行SDK的服务器是否使用公网IP连接DTS订阅通道
context.setUsePublicIp(true);
// 下面为其他调用代码 ……
…………
}
}
如果要使用SDK,必须先初始化RegionContext,配置连接订阅通道的安全认证等信息。上面的接口setAccessKey设置的是阿里云账号的AccessKey
setSecret 设置的是阿里云账号的AccessKeySecret
AccessKey及AccessKeySecret是由阿里云的系统直接分配给用户的,称为ID对,用户标识用户,可到阿里云用户中心创建获取。
setUsePublicIp 是告诉DTS,您本地SDK运行服务器是否用公网IP连接订阅通道。如果设置为true,那么订阅数据流走公网,否则走内网。
初始化ClusterClient
SDK连接订阅通道,接受增量数据等操作都是通过类ClusterClient来完成的,下面代码创建了一个ClusterClient
import java.util.List;
import com.aliyun.drc.clusterclient.ClusterClient;
import com.aliyun.drc.clusterclient.DefaultClusterClient;
import com.aliyun.drc.clusterclient.RegionContext;
public class MainClass
{
public static void main(String[] args) throws Exception {
// 创建一个RegionContext
RegionContext context = new RegionContext();
context.setAccessKey("<AccessKey>");
context.setSecret("<AccessKeySecret>");
context.setUsePublicIp(true);
// 创建订阅消费者
final ClusterClient client = new DefaultClusterClient(context);
// 下面是一些其他调用代码
……………
}
}
初始化Listener
消费数据的功能通过类Listener来实现。初始化完ClusterClient,需要添加listener,Listener要定义notify函数来接受订阅数据并进行数据消费。下面的代码中实现了最简单的消费逻辑,将订阅到的数据打印到屏幕。
import com.aliyun.drc.clusterclient.ClusterClient;
import com.aliyun.drc.clusterclient.ClusterListener;
import com.aliyun.drc.clusterclient.DefaultClusterClient;
import com.aliyun.drc.clusterclient.RegionContext;
import com.aliyun.drc.clusterclient.message.ClusterMessage;
public class MainClass
{
public static void main(String[] args) throws Exception {
// 初始化一个RegionContext对象
………
//初始化一个ClusterClient对象
………
ClusterListener listener = new ClusterListener(){
@Override
public void notify(List<ClusterMessage> messages) throws Exception {
for (ClusterMessage message : messages) {
//打印订阅到的增量数据
System.out.println(message.getRecord() + ":" + message.getRecord().getTablename() + ":"
+ message.getRecord().getOpt());
//消费完数据后向DTS汇报ACK,必须调用
message.ackAsConsumed();
}
}
}
}
DTS实现了SDK的数据消费时间点保存到DTS服务端的机制,简化用户使用SDK时,实现SDK容灾的复杂度。
上面示例代码中的 askAsConsumed()接口就是将SDK消费的最新一条数据的位点及时间戳汇报给DTS服务端。汇报了时间戳信息,如果SDK意外宕机重启后,会自动从DTS服务端获取这个消费时间点,然后从这个时间点重启,解决数据重复问题。
启动ClusterClient
import java.util.List;
import com.aliyun.drc.clusterclient.ClusterClient;
import com.aliyun.drc.clusterclient.ClusterListener;
import com.aliyun.drc.clusterclient.DefaultClusterClient;
import com.aliyun.drc.clusterclient.RegionContext;
import com.aliyun.drc.clusterclient.message.ClusterMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MainClass
{
public static void main(String[] args) throws Exception {
//初始化RegionContext
…………
//初始化ClusterClient
…………
//初始化ClusterListener
…………
// 添加监听者
client.addConcurrentListener(listener);
// 设置请求的订阅通道ID
client.askForGUID("dts_rdsrjiei2u2afnb_DSF");
// 启动后台线程, 注意这里不会阻塞, 主线程不能退出
client.start();
}
上面代码中接口askForGUID设置这个client需要请求的订阅通道ID。这个订阅通道ID从DTS控制台上获取。一旦配置了订阅通道ID,那么这个SDK就能获取这个订阅通道中的增量数据。
在启动client之前,需要将监听者listener添加到client中,这样当client从订阅通道中拉取到增量数据时,会同步回调用listener的notify方法开始进行数据消费。
最后更新:2016-11-23 16:03:55
上一篇:
SDK接口简介__数据订阅_用户指南_数据传输-阿里云
下一篇:
订阅数据SQL封装简介__数据订阅_用户指南_数据传输-阿里云
基本介绍__运维中心手册_用户操作指南_大数据开发套件-阿里云
高防CNAME接入流程__运维问题_产品常见问题_DDoS 高防IP-阿里云
邮件推送常见退信代码___使用问题_邮件推送-阿里云
想给马云当学生吗?阿里云大数据学院落户青岛!北方第一所,5年培养5000名人才…
定位及解决 HSF 问题__HSF 常见问题_开发常见问题_产品常见问题_企业级分布式应用服务 EDAS-阿里云
EDAS 账号合并计费说明___服务条款和价格说明_企业级分布式应用服务 EDAS-阿里云
扩容多分区磁盘__扩容磁盘概览_磁盘_用户指南_云服务器 ECS-阿里云
1.6 新增用户和授权__快速入门_分析型数据库-阿里云
调整集群规模__集群_API参考_E-MapReduce-阿里云
测试域名解析是否生效的方法__解析生效_产品使用问题_云解析-阿里云
相关内容
常见错误说明__附录_大数据计算服务-阿里云
发送短信接口__API使用手册_短信服务-阿里云
接口文档__Android_安全组件教程_移动安全-阿里云
运营商错误码(联通)__常见问题_短信服务-阿里云
设置短信模板__使用手册_短信服务-阿里云
OSS 权限问题及排查__常见错误及排除_最佳实践_对象存储 OSS-阿里云
消息通知__操作指南_批量计算-阿里云
设备端快速接入(MQTT)__快速开始_阿里云物联网套件-阿里云
查询API调用流量数据__API管理相关接口_API_API 网关-阿里云
使用STS访问__JavaScript-SDK_SDK 参考_对象存储 OSS-阿里云