閱讀473 返回首頁    go 小米MIX


創建RDS到MaxCompute數據實時同步作業__實時同步_用戶指南_數據傳輸-阿裏雲

本小節介紹如何使用數據傳輸服務快速創建RDS實例到MaxCompute實例間的實時同步作業,實現在線(RDS)到離線係統(MaxCompute)的數據實時同步,進一步為數據實時分析奠定基礎。

支持功能

數據源

  • 支持同一個阿裏雲賬號下RDS MySQL實例到MaxCompute實例的數據實時同步。
  • 支持不同阿裏雲賬號下的RDS MySQL實例到MaxCompute實例的數據實時同步。
  • 支持的RDS實例包括,經典網絡和VPC網絡兩種網絡模式。

同步對象

  • 隻支持表的同步,不支持其他非表對象的同步。

同步原理

原理

如上圖所示,整個同步過程分為兩步:
(1) 全量初始化, 這個步驟將RDS MySQL中已經存在的全量數據初始化到MaxCompute中。對於同步的每個表,全量初始化的數據都會獨立存儲在MaxCompute中的全量基線表中,這個表的默認格式為:源表名_base。例如表 t1,那麼全量基線表在MaxCompute中存儲的表名為:t1_dts_base。這個存儲表名前綴可以根據需要變更,您可以在配置任務時,修改表在MaxCompute存儲的名稱。

(2) 增量數據同步,這個步驟將RDS MySQL產生的增量數據數據實時同步到MaxCompute中。並存儲在增量日誌表中,每個同步表對應一個增量日誌表。增量日誌表在MaxCompute中存儲的表名的默認格式為:源表名_log。這個存儲表名前綴可以根據需要變更,您可以在配置任務時,修改表在MaxCompute存儲的名稱。

增量日誌表除了存儲更新數據,它還會存儲一些元信息,增量日誌表的表結構定義如下:

record_id operation_flag utc_timestamp before_flag after_flag col1 …. colN
1 I 1476258462 N Y 1 ….. JustInsert
2 U 1476258463 Y N 1 ….. JustInsert
2 U 1476258463 N Y 1 ….. JustUpdate
3 D 1476258464 Y N 1 ….. JustUpdate

其中:
record_id: 這條增量日誌的唯一標識,唯一遞增。如果變更類型為update,那麼增量更新會被拆分成2條,一條Insert,一條Delete。那麼這兩條記錄的record_id相同。
operation_flag: 標示這條增量日誌的操作類型。取值包括:
I : insert 操作
D : delete 操作
U : update 操作

dts_utc_timestamp: 這條增量日誌的操作時間戳,為這個更新操作記錄binlog的時間戳。這個時間戳為UTC時間。
before_flag: 表示這條增量日誌後麵帶的各個column值是否更新前的值。取值包括:Y 和 N。當後麵的column為更新前的值時,before_flag=Y, 當後麵的column值為更新後的值時,before_flag=N。
after_flag:表示這條增量日誌後麵帶的各個column值是否更新後的值。取值包括:Y 和 N。 當後麵的column為更新前的值時,after_flag=N,當後麵的column值為更新後的值時,after_flag=Y。

對於不同的操作類型,增量日誌中的before_flag和after_flag定義如下:

1) 操作類型為:insert

record_id operation_flag utc_timestamp before_flag after_flag col1 …. colN
1 I 1476258462 N Y 1 ….. JustInsert

當操作類型為insert時,後麵的所有column值為新插入的記錄值,即為更新後的值。所以before_flag=N, after_flag=Y。

2) 操作類型為:update

record_id operation_flag utc_timestamp before_flag after_flag col1 …. colN
2 U 1476258463 Y N 1 ….. JustInsert
2 U 1476258463 N Y 1 ….. JustUpdate

當操作類型為update時,會將update操作拆為2條增量日誌。這兩條增量日誌的record_id ,operation_flag 及dts_utc_timestamp相同。
第一條日誌記錄了更新前的值,所以before_flag=Y, after_flag=N。
第二條日誌記錄了更新後的值,所以before_flag=N, after_flag=Y。

3) 操作類型為:delete

record_id operation_flag dts_utc_timestamp before_flag after_flag col1 …. colN
3 D 1476258464 Y N 1 ….. JustUpdate

當操作類型為delete時,後麵的所有column值為被刪除的記錄值,即為更新前的值。所以before_flag=Y, after_flag=N。

(3) RDS->MaxCompute數據同步,對於每個同步表,都會在MaxCompute中生成一個全量基線表+一個增量日誌表,所以如果需要在MaxCompute中獲取某個時刻某張表的全量數據,就需要merge這張表的全量基線表和增量日誌表。具體實現方法後麵會詳細講解。

下麵詳細介紹RDS到MaxCompute數據實時同步作業的配置流程。

同步作業配置流程

下麵詳細介紹RDS到MaxCompute數據實時同步作業的配置流程。

1.購買同步鏈路

進入數據傳輸服務控製台,進入數據同步頁麵,點擊控製台右上角“創建同步作業” 開始作業配置。

在鏈路配置之前需要購買一個同步鏈路。同步鏈路目前支持包年包月及按量付費兩種付費模式,可以根據需要選擇不同的付費模式。

在購買頁麵需要配置的參數包括:

  • 源實例
    同步作業的源實例類型,目前隻支持RDS For MySQL。
  • 源地域
    源地域為同步實例的源RDS實例所在地域。
  • 目標實例
    目標實例為同步作業的目標實例類型,目前支持 RDS For MySQL, MaxCompute(原ODPS),DataHub。配置RDS->MaxCompute同步鏈路時,目標實例選擇:MaxCompute 即可。
  • 目標地域
    由於MaxCompute目前隻在上海地區售賣,所以目標地域選擇上海。
  • 實例規格
    實例規格影響了鏈路的同步性能,可以根據業務性能選擇合適的規則。
  • 網絡類型
    RDS->MaxCompute支持通過公網、私網 同步數據。如果源RDS沒有公網連接地址,那麼網絡類型隻能選擇 私網
  • 數量
    數量為一次性購買的同步鏈路的數量,如果購買的是按量付費實例,一次最多購買99條鏈路。

當購買完同步實例,返回數據傳輸控製台,點擊新購鏈路右側的“配置同步作業” 開始鏈路配置。

2.同步鏈路連接信息配置。

在這一步主要配置:

  • 同步作業名稱
    同步作業名稱沒有唯一性要求,主要為了更方便識別具體的作業,建議選擇一個有業務意義的作業名稱,方便後續的鏈路查找及管理。

  • 實例ID配置

在這個步驟中需要配置源RDS實例的實例ID,及目標MaxCompute實例的project。配置的MaxCompute project 必須屬於登錄DTS的阿裏雲賬號的資源。

實例連接信息

當這些內容配置完成後,可以點擊授權白名單並進入下一步

3.授權RDS實例白名單

這個步驟,主要是將給DTS服務賬號授權MaxCompute寫權限,讓DTS能夠將數據同步複製到MaxCompute中。

步驟2

授權權限包括對project的:
CreateTable
CreateInstance
CreateResource
CreateJob
List
為了保證同步作業的穩定性,在同步過程中,請勿將寫權限回收。當白名單授權後,點擊下一步,進入同步賬號創建。

當授權完成後,即進入同步對象選擇。

4.選擇同步對象

當MaxCompute賬號授權完成後,即進入同步表及同步初始化的相關配置。

步驟3

在這個步驟中,需要配置 同步初始化 和 同步表。其中:
(1)同步初始化
同步初始化選項包括: 結構初始化 和 全量數據初始化。
結構初始化是指對於待同步的表,在MaxCompute中創建對應的表,完成表結構定義。
全量數據初始化是指對於待同步的表,將曆史數據初始化到MaxCompute中。
配置任務時,建議同時選擇 結構初始化+全量數據初始化。

(2) 同步表選擇

同步表隻能選擇某些表,不能直接選擇整個庫。對於同步的表,可以修改表在MaxCompute對應的全量基線表及增量日誌表的表名前綴。如需修改,可以點擊右邊已選擇對象後麵的編輯按鈕,進入修改界麵。步驟4

當配置完同步對象後,進入同步初始化配置。

5.預檢查

當上麵所有選項配置完成後,即進入啟動之前的預檢查。具體檢查項內容詳見本文最後的 預檢查內容 一節。
當同步作業配置完成後,數據傳輸服務會進行限製預檢查,當預檢查通過後,可以點擊 確定 按鈕,啟動同步作業。

當同步作業啟動之後,即進入同步作業列表。此時剛啟動的作業處於同步初始化狀態。初始化的時間長度依賴於源實例中同步對象的數據量大小。當初始化完成後同步鏈路即進入同步中的狀態,此時源跟目標實例的同步鏈路才真正建立完成。

當同步任務進入 同步中 時,可以在MaxCompute中可以查詢出jiangliu_test對應的全量基線表和增量日誌表:
表信息

至此,完成RDS->MaxCompute數據實時同步作業的配置。

全量數據合並方案

本小節介紹,如何根據同步到MaxCompute中的全量基線表和增量日誌數據得到某個時刻表的全量數據。
DTS提供通過MaxCompute SQL實現全量數據合並的能力。

通過MaxCompute SQL merge 全量基線數據 和 增量日誌表 得到時刻t的全量數據。MaxCompute SQL的寫法如下:

  1. insert overwrite table result_storage_table
  2. select col1,
  3. col2,
  4. colN
  5. from(
  6. select row_number() over(partition by t.primary_key_column
  7. order by record_id desc, after_flag desc) as row_number, record_id, operation_flag, after_flag, col1,col2,colN
  8. from(
  9. select incr.record_id, incr.operation_flag, incr.after_flag, incr.col1, incr.col2,incr.colN
  10. from table_log incr
  11. where utc_timestamp< timestmap
  12. union all
  13. select 0 as record_id, 'I' as operation_flag, 'Y' as after_flag, base.col1, base.col2,base.colN
  14. from table_base base) t) gt
  15. where record_num=1
  16. and after_flag='Y'

上麵代碼中的幾個變量意義如下:
1) result_storage_table 表示全量merge結果集的存儲表的表名
2) col1, col2,colN 表示同步表中列的列名
3) primary_key_column 表示同步表中的主鍵列的列名
4) table_log 表示增量日誌表
5) table_base 表示全量基線表
6) timestmap 表示需要merge哪個時刻的全量數據

例如對於上麵配置任務中的jiangliu_test表,jiangliu_test_20161010_base 為jiangliu_test對應的全量基線表,jiangliu_test_20161010_log 為jiangliu_test對應的增量日誌表。

jiangliu_test的表結構定義為:
表結構

那麼查詢時間戳1476263486 時刻,jiangliu_test表的全量數據的MaxCompute SQL如下:

  1. insert overwrite table jiangliu_test_1476263486
  2. select id,
  3. name
  4. from(
  5. select row_number() over(partition by t.id
  6. order by record_id desc, after_flag desc) as row_number, record_id, operation_flag, after_flag, id, name
  7. from(
  8. select incr.record_id, incr.operation_flag, incr.after_flag, incr.id, incr.name
  9. from jiangliu_test_20161010_log incr
  10. where utc_timestamp< 1476263486
  11. union all
  12. select 0 as record_id, 'I' as operation_flag, 'Y' as after_flag, base.id, base.name
  13. from jiangliu_test_20161010_base base) t) gt
  14. where gt.row_number= 1
  15. and gt.after_flag= 'Y' ;

您也可以通過大數據開發套件,在後續的計算分析操作之前,添加全量數據Merge節點,當全量數據merge完成後,可自動調度起後續的計算分析節點。同時可以配置調度周期,進行周期性的數據離線分析。

至此完成RDS->MaxCompute數據同步任務配置及全量數據合並。

最後更新:2016-12-07 10:50:45

  上一篇:go 創建RDS實例間數據實時同步作業__實時同步_用戶指南_數據傳輸-阿裏雲
  下一篇:go RDS到DataHub數據實時同步__實時同步_用戶指南_數據傳輸-阿裏雲