阅读777 返回首页    go 财经资讯


SDK接口简介__数据订阅_用户指南_数据传输-阿里云

数据传输的数据订阅功能需要通过数据传输提供的SDK进行增量数据的订阅及消费。

在使用SDK消费之前,需要现在数据传输控制台创建需要订阅的RDS实例的订阅通道

当订阅通道创建完成后,使用SDK可以实时订阅订阅通道中的增量数据。目前:

  1. 数据传输只提供JAVA版本SDK,SDK下载地址
  2. 一个订阅通道只能被一个SDK消费,如果启动多个SDK连接同一个订阅通道时,只能有一个SDK进程拉取到增量数据。如果有多个下游SDK需要订阅同一个RDS的增量数据。那么需要为每个下游SDK创建一个订阅通道。

SDK中定义了多种类对象,本小节简单介绍SDK的这些类的接口定义。

RegionContex接口定义

  • setAccessKey(accessKey)

    设置安全凭证,参数为需要订阅数据的订阅通道对应的阿里云账号的AccessKey。

  • setSecret(AccessKeySecret)

    设置安全凭证,参数为阿里云账号对应的AccessKeySecret。可以到AK页面创建并获取。

  • setUsePublicIp(usePublicIp)

    配置SDK运行服务器是否使用公网订阅数据。如果通过公网订阅数据,那么参数usePublicIp参数为True,否则为False。

    数据订阅可以通过内网进行订阅,但是SDK在建立订阅连接之前需要先跟数据传输管控系统通信获取订阅通道的物理连接地址,SDK跟数据传输管控系统需要通过互联网通信,所以即使通过内网订阅数据,SDK部署服务器也需要挂载公网IP。

ClusterClient接口定义

  • void addConcurrentListener(ClusterListener arg0)

    添加下游监听者,监听者加入到一个ClusterClient中,才可以订阅订阅通道中的增量数据。

    参数ClusterListener arg0 为类ClusterListener的对象。

  • void askForGUID(String arg0)

    请求某个订阅通道的增量数据,参数String arg0 为订阅通道的ID,需要到数据传输控制台获取,如下图标识。

    获取订阅通道ID

  • List<ClusterListener> getConcurrentListeners()

    获取这个ClusterClient中的监听者列表,接口返回类型为List <ClusterListener >。

  • void start()

    启动SDK客户端,开始订阅增量数据。

  • void stop()

    停止SDK客户端,停止订阅增量数据。由于SDK中拉取数据和回调notify的是同一个线程执行的,如果notify的消费代码中有信号不可打断的功能时,那么stop函数可能不能正常关闭掉客户端。

ClusterListener接口定义

  • void notify(List<ClusterMessage> arg0)

    这个函数主要用于定义增量数据的消费,当SDK接受到数据时,会通过notify通知ClusterListner消费数据。例如示例demo的消费方式,就是将订阅数据打印到屏幕上。

    这个函数输入参数类型为:List <ClusterMessage >, 其中ClusterMessage为订阅数据存储的结构对象,具体定义详见ClusterMessage接口定义。

ClusterMessage接口定义

每个ClusterMessage保存RDS中的一个事务的数据记录,事务中的每条记录通过Record保存,本小节介绍ClusterMessage的主要接口函数。

  • Record getRecord()

    这个接口从ClusterMessage中获取一条变更记录。这个变更记录表示RDS binlog文件中的每一条记录,例如begin ,commit,update,insert等。

  • void ackAsConsumed

    为了简化下游SDK进程容灾,数据订阅服务端支持SDK的消费位点保存,当下游SDK异常宕机并重启后,会自动从上次异常退出的最后一个消费位点继续订阅并消费数据。

    在message消费完成后,需要调用这个接口向数据传输服务端汇报一个ACK,通知服务端更新下游SDK的消费位点, 保证SDK异常重启后消费数据的完整性。

Record接口定义

Record代表订阅的RDS binlog中的每条记录,例如begin, commit,update等。

  • String getAttribute(String key)

    这个函数可以获取Record中主要的一些属性值。传入参数为属性名,返回这个属性的值。

    可以调用这个函数获取属性值的属性名及对应的属性值如下表:

key 说明
record_id 这条Record的ID,这个ID在订阅过程中不保证递增
instance 这条Record对应的数据库实例的连接地址,格式为:IP:Port
source_type 这条Record对应数据库实例的引擎类型,目前取值为:mysql
source_category 这条Record的类型,目前取值为:full_recorded
timestamp 这条Record落binlog的时间,这个时间同时也是这条SQL在RDS中执行的时间
checkpoint 这条Record对应的binlog文件的位点,格式为:file_offset@file_name,filen_name为binlog文件的数字后缀
record_type 这条Record对应的操作类型,主要取值包括:insert/update/delete/replace/ddl/begin/commit/heartbeat
db 这条Record更新表,对应的数据库名
table_name 这条Record更新表的表名
record_recording 这条Record对应的编码
primary 这条Record更新表的主键列名
fields_enc 这条Record每个字段值的编码,各个字段之间用逗号隔开,如果非字符类型那么取值为空
  • Type getOpt()

    获取这条记录的变更类型,包括:

    insert、delete、update、replace、ddl、begin、commit、heartbeat。

    其中heartbeat为数据传输内部定义的心跳表,主要用于检查订阅通道是否健康,理论上每秒都会产生一条 heartbeat。

  • String getCheckpoint()

    获取这条变更记录在binlog中的位点,返回的位点格式为:
    binlog_offset@binlog_fid。

    其中binlog_offset为变更记录在binlog文件中的偏移量,binlog_fid为binlog文件的数字后缀,例如binlog文件名为mysql-bin.0008,那么binlog_fid为8。

  • String gettimestamp()

    获取这条变更记录在binlog中记录的运行时间戳。

  • String getDbname()

    获取这条变更记录修改的表所对应的数据库库名。

  • String getTablename()

    获取这条表更记录修改表对应的表名。

  • String getPrimaryKeys()

    获取这条变更记录对应的主键列名,如果是联合主键,那么这些列名之间用逗号分隔。

  • DBType getDbType()

    获得订阅实例的数据库类型,目前数据传输仅支持RDS MySQL,所以这个值为MySQL。

  • String getServerId()

    获取这条变更记录对应的RDS MySQL实例运行进程的IP:PORT。

  • int getFieldCount()

    获取这条变更记录的字段Field的个数。

  • List<Field> getFieldList()

    这个函数的返回结果的数据类型为List <Field >。

    List<Field> 包含了这条变更记录对应表的所有字段的定义及变更前后的镜像值,Field对象的定义详见Field接口定义

  • Boolean isFirstInLogevent()

    判断这条Record 是否数据库批量变更中的第一条事务日志,如果是的话返回True,否则返回False。

Field接口定义

Field类定义了每个字段的编码、类型、字段名、字段值及是否为主键等属性,本小节介绍Field类的各个接口定义。

  • String getEncoding()

    获取这个字段值的编码格式。

  • String getFieldname()

    获取这个字段的名称。

  • Type getType()

    获取这个字段的数据类型,Type的定义具体参见下面的字段类型定义。

  • ByteString getValue()

    获取这个字段的值,返回类型为ByteString,当值为空时,返回NULL。

  • Boolean isPrimary()

    判断这个字段是否是表的主键列,如果是返回True,否则返回False。

最后更新:2016-11-23 16:03:55

  上一篇:go 查看订阅数据__数据订阅_用户指南_数据传输-阿里云
  下一篇:go SDK快速入门__数据订阅_用户指南_数据传输-阿里云