閱讀777 返回首頁    go 財經資訊


SDK接口簡介__數據訂閱_用戶指南_數據傳輸-阿裏雲

數據傳輸的數據訂閱功能需要通過數據傳輸提供的SDK進行增量數據的訂閱及消費。

在使用SDK消費之前,需要現在數據傳輸控製台創建需要訂閱的RDS實例的訂閱通道

當訂閱通道創建完成後,使用SDK可以實時訂閱訂閱通道中的增量數據。目前:

  1. 數據傳輸隻提供JAVA版本SDK,SDK下載地址
  2. 一個訂閱通道隻能被一個SDK消費,如果啟動多個SDK連接同一個訂閱通道時,隻能有一個SDK進程拉取到增量數據。如果有多個下遊SDK需要訂閱同一個RDS的增量數據。那麼需要為每個下遊SDK創建一個訂閱通道。

SDK中定義了多種類對象,本小節簡單介紹SDK的這些類的接口定義。

RegionContex接口定義

  • setAccessKey(accessKey)

    設置安全憑證,參數為需要訂閱數據的訂閱通道對應的阿裏雲賬號的AccessKey。

  • setSecret(AccessKeySecret)

    設置安全憑證,參數為阿裏雲賬號對應的AccessKeySecret。可以到AK頁麵創建並獲取。

  • setUsePublicIp(usePublicIp)

    配置SDK運行服務器是否使用公網訂閱數據。如果通過公網訂閱數據,那麼參數usePublicIp參數為True,否則為False。

    數據訂閱可以通過內網進行訂閱,但是SDK在建立訂閱連接之前需要先跟數據傳輸管控係統通信獲取訂閱通道的物理連接地址,SDK跟數據傳輸管控係統需要通過互聯網通信,所以即使通過內網訂閱數據,SDK部署服務器也需要掛載公網IP。

ClusterClient接口定義

  • void addConcurrentListener(ClusterListener arg0)

    添加下遊監聽者,監聽者加入到一個ClusterClient中,才可以訂閱訂閱通道中的增量數據。

    參數ClusterListener arg0 為類ClusterListener的對象。

  • void askForGUID(String arg0)

    請求某個訂閱通道的增量數據,參數String arg0 為訂閱通道的ID,需要到數據傳輸控製台獲取,如下圖標識。

    獲取訂閱通道ID

  • List<ClusterListener> getConcurrentListeners()

    獲取這個ClusterClient中的監聽者列表,接口返回類型為List <ClusterListener >。

  • void start()

    啟動SDK客戶端,開始訂閱增量數據。

  • void stop()

    停止SDK客戶端,停止訂閱增量數據。由於SDK中拉取數據和回調notify的是同一個線程執行的,如果notify的消費代碼中有信號不可打斷的功能時,那麼stop函數可能不能正常關閉掉客戶端。

ClusterListener接口定義

  • void notify(List<ClusterMessage> arg0)

    這個函數主要用於定義增量數據的消費,當SDK接受到數據時,會通過notify通知ClusterListner消費數據。例如示例demo的消費方式,就是將訂閱數據打印到屏幕上。

    這個函數輸入參數類型為:List <ClusterMessage >, 其中ClusterMessage為訂閱數據存儲的結構對象,具體定義詳見ClusterMessage接口定義。

ClusterMessage接口定義

每個ClusterMessage保存RDS中的一個事務的數據記錄,事務中的每條記錄通過Record保存,本小節介紹ClusterMessage的主要接口函數。

  • Record getRecord()

    這個接口從ClusterMessage中獲取一條變更記錄。這個變更記錄表示RDS binlog文件中的每一條記錄,例如begin ,commit,update,insert等。

  • void ackAsConsumed

    為了簡化下遊SDK進程容災,數據訂閱服務端支持SDK的消費位點保存,當下遊SDK異常宕機並重啟後,會自動從上次異常退出的最後一個消費位點繼續訂閱並消費數據。

    在message消費完成後,需要調用這個接口向數據傳輸服務端匯報一個ACK,通知服務端更新下遊SDK的消費位點, 保證SDK異常重啟後消費數據的完整性。

Record接口定義

Record代表訂閱的RDS binlog中的每條記錄,例如begin, commit,update等。

  • String getAttribute(String key)

    這個函數可以獲取Record中主要的一些屬性值。傳入參數為屬性名,返回這個屬性的值。

    可以調用這個函數獲取屬性值的屬性名及對應的屬性值如下表:

key 說明
record_id 這條Record的ID,這個ID在訂閱過程中不保證遞增
instance 這條Record對應的數據庫實例的連接地址,格式為:IP:Port
source_type 這條Record對應數據庫實例的引擎類型,目前取值為:mysql
source_category 這條Record的類型,目前取值為:full_recorded
timestamp 這條Record落binlog的時間,這個時間同時也是這條SQL在RDS中執行的時間
checkpoint 這條Record對應的binlog文件的位點,格式為:file_offset@file_name,filen_name為binlog文件的數字後綴
record_type 這條Record對應的操作類型,主要取值包括:insert/update/delete/replace/ddl/begin/commit/heartbeat
db 這條Record更新表,對應的數據庫名
table_name 這條Record更新表的表名
record_recording 這條Record對應的編碼
primary 這條Record更新表的主鍵列名
fields_enc 這條Record每個字段值的編碼,各個字段之間用逗號隔開,如果非字符類型那麼取值為空
  • Type getOpt()

    獲取這條記錄的變更類型,包括:

    insert、delete、update、replace、ddl、begin、commit、heartbeat。

    其中heartbeat為數據傳輸內部定義的心跳表,主要用於檢查訂閱通道是否健康,理論上每秒都會產生一條 heartbeat。

  • String getCheckpoint()

    獲取這條變更記錄在binlog中的位點,返回的位點格式為:
    binlog_offset@binlog_fid。

    其中binlog_offset為變更記錄在binlog文件中的偏移量,binlog_fid為binlog文件的數字後綴,例如binlog文件名為mysql-bin.0008,那麼binlog_fid為8。

  • String gettimestamp()

    獲取這條變更記錄在binlog中記錄的運行時間戳。

  • String getDbname()

    獲取這條變更記錄修改的表所對應的數據庫庫名。

  • String getTablename()

    獲取這條表更記錄修改表對應的表名。

  • String getPrimaryKeys()

    獲取這條變更記錄對應的主鍵列名,如果是聯合主鍵,那麼這些列名之間用逗號分隔。

  • DBType getDbType()

    獲得訂閱實例的數據庫類型,目前數據傳輸僅支持RDS MySQL,所以這個值為MySQL。

  • String getServerId()

    獲取這條變更記錄對應的RDS MySQL實例運行進程的IP:PORT。

  • int getFieldCount()

    獲取這條變更記錄的字段Field的個數。

  • List<Field> getFieldList()

    這個函數的返回結果的數據類型為List <Field >。

    List<Field> 包含了這條變更記錄對應表的所有字段的定義及變更前後的鏡像值,Field對象的定義詳見Field接口定義

  • Boolean isFirstInLogevent()

    判斷這條Record 是否數據庫批量變更中的第一條事務日誌,如果是的話返回True,否則返回False。

Field接口定義

Field類定義了每個字段的編碼、類型、字段名、字段值及是否為主鍵等屬性,本小節介紹Field類的各個接口定義。

  • String getEncoding()

    獲取這個字段值的編碼格式。

  • String getFieldname()

    獲取這個字段的名稱。

  • Type getType()

    獲取這個字段的數據類型,Type的定義具體參見下麵的字段類型定義。

  • ByteString getValue()

    獲取這個字段的值,返回類型為ByteString,當值為空時,返回NULL。

  • Boolean isPrimary()

    判斷這個字段是否是表的主鍵列,如果是返回True,否則返回False。

最後更新:2016-11-23 16:03:55

  上一篇:go 查看訂閱數據__數據訂閱_用戶指南_數據傳輸-阿裏雲
  下一篇:go SDK快速入門__數據訂閱_用戶指南_數據傳輸-阿裏雲