阅读765 返回首页    go 微信


SDK快速入门__数据订阅_用户指南_数据传输-阿里云

这一节,您将学到如何用DTS Java SDK完成一些基本的操作。

初始化RegionContext

RegionContext 主要用于保存设置安全认证信息及访问网络模式设置。下面代码显示如何初始化RegionContext,设置安全认证凭证及网络访问模式。

  1. import java.util.List;
  2. import com.aliyun.drc.clusterclient.RegionContext;
  3. import org.slf4j.Logger;
  4. import org.slf4j.LoggerFactory;
  5. public class MainClass
  6. {
  7. public static void main(String[] args) throws Exception {
  8. // 创建一个RegionContext
  9. RegionContext context = new RegionContext();
  10. // 配置阿里云账号的AccessKey及AccessKeySecret
  11. context.setAccessKey("<AccessKey>");
  12. context.setSecret("<AccessKeySecret>");
  13. // 运行SDK的服务器是否使用公网IP连接DTS订阅通道
  14. context.setUsePublicIp(true);
  15. // 下面为其他调用代码 ……
  16. …………
  17. }
  18. }

如果要使用SDK,必须先初始化RegionContext,配置连接订阅通道的安全认证等信息。上面的接口setAccessKey设置的是阿里云账号的AccessKey
setSecret 设置的是阿里云账号的AccessKeySecret
AccessKey及AccessKeySecret是由阿里云的系统直接分配给用户的,称为ID对,用户标识用户,可到阿里云用户中心创建获取。
setUsePublicIp 是告诉DTS,您本地SDK运行服务器是否用公网IP连接订阅通道。如果设置为true,那么订阅数据流走公网,否则走内网。

初始化ClusterClient

SDK连接订阅通道,接受增量数据等操作都是通过类ClusterClient来完成的,下面代码创建了一个ClusterClient

  1. import java.util.List;
  2. import com.aliyun.drc.clusterclient.ClusterClient;
  3. import com.aliyun.drc.clusterclient.DefaultClusterClient;
  4. import com.aliyun.drc.clusterclient.RegionContext;
  5. public class MainClass
  6. {
  7. public static void main(String[] args) throws Exception {
  8. // 创建一个RegionContext
  9. RegionContext context = new RegionContext();
  10. context.setAccessKey("<AccessKey>");
  11. context.setSecret("<AccessKeySecret>");
  12. context.setUsePublicIp(true);
  13. // 创建订阅消费者
  14. final ClusterClient client = new DefaultClusterClient(context);
  15. // 下面是一些其他调用代码
  16. ……………
  17. }
  18. }

初始化Listener

消费数据的功能通过类Listener来实现。初始化完ClusterClient,需要添加listener,Listener要定义notify函数来接受订阅数据并进行数据消费。下面的代码中实现了最简单的消费逻辑,将订阅到的数据打印到屏幕。

  1. import com.aliyun.drc.clusterclient.ClusterClient;
  2. import com.aliyun.drc.clusterclient.ClusterListener;
  3. import com.aliyun.drc.clusterclient.DefaultClusterClient;
  4. import com.aliyun.drc.clusterclient.RegionContext;
  5. import com.aliyun.drc.clusterclient.message.ClusterMessage;
  6. public class MainClass
  7. {
  8. public static void main(String[] args) throws Exception {
  9. // 初始化一个RegionContext对象
  10. ………
  11. //初始化一个ClusterClient对象
  12. ………
  13. ClusterListener listener = new ClusterListener(){
  14. @Override
  15. public void notify(List<ClusterMessage> messages) throws Exception {
  16. for (ClusterMessage message : messages) {
  17. //打印订阅到的增量数据
  18. System.out.println(message.getRecord() + ":" + message.getRecord().getTablename() + ":"
  19. + message.getRecord().getOpt());
  20. //消费完数据后向DTS汇报ACK,必须调用
  21. message.ackAsConsumed();
  22. }
  23. }
  24. }
  25. }

DTS实现了SDK的数据消费时间点保存到DTS服务端的机制,简化用户使用SDK时,实现SDK容灾的复杂度。
上面示例代码中的 askAsConsumed()接口就是将SDK消费的最新一条数据的位点及时间戳汇报给DTS服务端。汇报了时间戳信息,如果SDK意外宕机重启后,会自动从DTS服务端获取这个消费时间点,然后从这个时间点重启,解决数据重复问题。

启动ClusterClient

  1. import java.util.List;
  2. import com.aliyun.drc.clusterclient.ClusterClient;
  3. import com.aliyun.drc.clusterclient.ClusterListener;
  4. import com.aliyun.drc.clusterclient.DefaultClusterClient;
  5. import com.aliyun.drc.clusterclient.RegionContext;
  6. import com.aliyun.drc.clusterclient.message.ClusterMessage;
  7. import org.slf4j.Logger;
  8. import org.slf4j.LoggerFactory;
  9. public class MainClass
  10. {
  11. public static void main(String[] args) throws Exception {
  12. //初始化RegionContext
  13. …………
  14. //初始化ClusterClient
  15. …………
  16. //初始化ClusterListener
  17. …………
  18. // 添加监听者
  19. client.addConcurrentListener(listener);
  20. // 设置请求的订阅通道ID
  21. client.askForGUID("dts_rdsrjiei2u2afnb_DSF");
  22. // 启动后台线程, 注意这里不会阻塞, 主线程不能退出
  23. client.start();
  24. }

上面代码中接口askForGUID设置这个client需要请求的订阅通道ID。这个订阅通道ID从DTS控制台上获取。一旦配置了订阅通道ID,那么这个SDK就能获取这个订阅通道中的增量数据。
在启动client之前,需要将监听者listener添加到client中,这样当client从订阅通道中拉取到增量数据时,会同步回调用listener的notify方法开始进行数据消费。

最后更新:2016-11-23 16:03:55

  上一篇:go SDK接口简介__数据订阅_用户指南_数据传输-阿里云
  下一篇:go 订阅数据SQL封装简介__数据订阅_用户指南_数据传输-阿里云