閱讀627 返回首頁    go 阿裏雲


MySQLReader__Reader插件_使用手冊_數據集成-阿裏雲

1 快速介紹

MySQLReader插件實現了從Mysql讀取數據。在底層實現上,MySQLReader通過JDBC連接遠程Mysql數據庫,並執行相應的sql語句將數據從mysql庫中SELECT出來。

2 實現原理

簡而言之,MySQLReader通過JDBC連接器連接到遠程的Mysql數據庫,並根據用戶配置的信息生成查詢SELECT SQL語句並發送到遠程Mysql數據庫,並將該SQL執行返回結果使用DataX自定義的數據類型拚裝為抽象的數據集,並傳遞給下遊Writer處理。

對於用戶配置Table、Column、Where的信息,MySQLReader將其拚接為SQL語句發送到Mysql數據庫;對於用戶配置querySql信息,Mysql直接將其發送到Mysql數據庫。

3 功能說明

3.1 配置樣例
  • 使用instance配置一個從Mysql數據庫同步抽取數據到ODPS的作業:
  1. {
  2. "type": "job",
  3. "traceId": "您可以在這裏填寫您作業的追蹤ID,建議使用業務名+您的作業ID",
  4. "version": "1.0",
  5. "configuration": {
  6. "reader": {
  7. "plugin": "mysql",
  8. "parameter": {
  9. "instanceName": "datasync001",
  10. "database": "database",
  11. "table": "table",
  12. "splitPk": "db_id",
  13. "username": "datasync001",
  14. "password": "xxxxxx",
  15. "column": [
  16. "*"
  17. ],
  18. "where": "Date(add_time) = '2014-06-01'"
  19. }
  20. },
  21. "writer": {
  22. "plugin": "odps",
  23. "parameter": {
  24. "accessId": "bazhen.csy",
  25. "accessKey": "xxxxxxx",
  26. "project": "project",
  27. "table": "table",
  28. "column": [
  29. "*"
  30. ],
  31. "partition": "pt=20150101"
  32. }
  33. }
  34. }
  35. }
  • 使用JDBC方式配置一個從Mysql數據庫同步抽取數據到ODPS的作業:
  1. {
  2. "type": "job",
  3. "traceId": "您可以在這裏填寫您作業的追蹤ID,建議使用業務名+您的作業ID",
  4. "version": "1.0",
  5. "configuration": {
  6. "reader": {
  7. "plugin": "mysql",
  8. "parameter": {
  9. "jdbcUrl": "jdbc:mysql://ip:port/database",
  10. "table": "table",
  11. "splitPk": "db_id",
  12. "username": "datasync001",
  13. "password": "xxxxxx",
  14. "column": [
  15. "*"
  16. ],
  17. "where": "Date(add_time) = '2014-06-01'"
  18. }
  19. },
  20. "writer": {
  21. "plugin": "odps",
  22. "parameter": {
  23. "accessId": "bazhen.csy",
  24. "accessKey": "xxxxxxx",
  25. "project": "project",
  26. "table": "table",
  27. "column": [
  28. "*"
  29. ],
  30. "partition": "pt=20150101"
  31. }
  32. }
  33. }
  34. }
  • 使用多個instance配置一個從Mysql數據庫同步抽取分庫分表數據到ODPS的作業:
  1. {
  2. "type": "job",
  3. "traceId": "您可以在這裏填寫您作業的追蹤ID,建議使用業務名+您的作業ID",
  4. "version": "1.0",
  5. "configuration": {
  6. "reader": {
  7. "plugin": "mysql",
  8. "parameter": {
  9. "connection": [
  10. {
  11. "table": [
  12. "tbl1",
  13. "tbl2",
  14. "tbl3"
  15. ],
  16. "instanceName": "inst1",
  17. "database": "db"
  18. },
  19. {
  20. "table": [
  21. "tbl4",
  22. "tbl5",
  23. "tbl6"
  24. ],
  25. "instanceName": "inst2",
  26. "database": "db"
  27. }
  28. ],
  29. "splitPk": "db_id",
  30. "username": "datasync001",
  31. "password": "xxxxxx",
  32. "column": [
  33. "*"
  34. ],
  35. "where": "Date(add_time) = '2014-06-01'"
  36. }
  37. },
  38. "writer": {
  39. "plugin": "odps",
  40. "parameter": {
  41. "accessId": "bazhen.csy",
  42. "accessKey": "xxxxxxx",
  43. "project": "project",
  44. "table": "table",
  45. "column": [
  46. "*"
  47. ],
  48. "partition": "pt=20150101"
  49. }
  50. }
  51. }
  52. }
  • 使用多個jdbc配置一個從Mysql數據庫同步抽取分庫分表數據到ODPS的作業:
  1. {
  2. "type": "job",
  3. "traceId": "您可以在這裏填寫您作業的追蹤ID,建議使用業務名+您的作業ID",
  4. "version": "1.0",
  5. "configuration": {
  6. "reader": {
  7. "plugin": "mysql",
  8. "parameter": {
  9. "connection": [
  10. {
  11. "table": [
  12. "tbl1",
  13. "tbl2",
  14. "tbl3"
  15. ],
  16. "jdbcUrl": "jdbc:mysql://ip:port/database1"
  17. },
  18. {
  19. "table": [
  20. "tbl4",
  21. "tbl5",
  22. "tbl6"
  23. ],
  24. "jdbcUrl": "jdbc:mysql://ip:port/database2"
  25. }
  26. ],
  27. "splitPk": "db_id",
  28. "username": "datasync001",
  29. "password": "xxxxxx",
  30. "column": [
  31. "*"
  32. ],
  33. "where": "Date(add_time) = '2014-06-01'"
  34. }
  35. },
  36. "writer": {
  37. "plugin": "odps",
  38. "parameter": {
  39. "accessId": "bazhen.csy",
  40. "accessKey": "xxxxxxx",
  41. "project": "project",
  42. "table": "table",
  43. "column": [
  44. "*"
  45. ],
  46. "partition": "pt=20150101"
  47. }
  48. }
  49. }
  50. }
3.2 參數說明
  • instanceName

    • 描述: 阿裏雲RDS實例名稱(Instance名稱)。用戶使用該配置指定RDS的Instance名稱,CDP將翻譯為底層執行的jdbcUrl連接串連接。

      instanceName指定的是RDS實例。發給類似Mysql實例,指定了數據源的IP+Port,需要和database配合使用。例如,在RDS WebConsole頁麵中點擊【基本信息】,其右側詳情的名稱即是用戶擁有RDS的instanceName。

      image

    • 必選:是

    • 默認值:無

  • jdbcUrl

    • 描述:對於CDP私有雲場景、數據庫遷移場景等,其本身數據源是普通Mysql數據庫不是RDS。對於這類使用場景,用戶可以指定jdbc信息直連。

      jdbcUrl和instanceName/database兩類信息概念上是等同的,因此隻能配置其一。如果兩者均配置,CDP默認將使用jdbcUrl信息。

      考慮到公有雲上用戶RDS DNS名稱、內網等信息可能存在更改的情況,強烈建議公有雲用戶不使用jdbcUrl配置項。

    • 必選:是

    • 默認值:無

  • database

    • 描述: 阿裏雲RDS實例下的數據庫名稱。
    • 必選:是
    • 默認值:無
  • username

    • 描述:數據源的用戶名
    • 必選:是
    • 默認值:無
  • password

    • 描述:數據源的密碼
    • 必選:是
    • 默認值:無
  • table

    • 描述:所選取的需要同步的表名稱,一個CDP Job隻能同步一張表。
    • 必選:是
    • 默認值:無
  • column

    • 描述:所配置的表中需要同步的列名集合,使用JSON的數組描述字段信息。用戶使用星號代表默認使用所有列配置,例如[‘*’]。

      支持列裁剪,即列可以挑選部分列進行導出。

      支持列換序,即列可以不按照表schema信息進行導出。

      支持常量配置,用戶需要按照Mysql SQL語法格式:

      [“id”, “`table`“, “1”, “‘bazhen.csy’”, “‘null’”, “to_char(a + 1)”, “2.3” , “true”]

      id為普通列名,`table`為包含保留在的列名,1為整形數字常量,’bazhen.csy’為字符串常量,null為空指針,to_char(a + 1)為表達式,2.3為浮點數,true為布爾值。

      column必須用戶顯示指定同步的列集合,不允許為空!

    • 必選:是

    • 默認值:無

  • splitPk

    • 描述:MySQLReader進行數據抽取時,如果指定splitPk,表示用戶希望使用splitPk代表的字段進行數據分片,DataX因此會啟動並發任務進行數據同步,這樣可以大大提供數據同步的效能。

      推薦splitPk用戶使用表主鍵,因為表主鍵通常情況下比較均勻,因此切分出來的分片也不容易出現數據熱點。

      目前splitPk僅支持整型數據切分,不支持字符串、浮點、日期等其他類型。如果用戶指定其他非支持類型,同步任務將報錯;

      如果splitPk不填寫,包括不提供splitPk或者splitPk值為空,DataX視作使用單通道同步該表數據。

    • 必選:否

    • 默認值:無

  • where

    • 描述:篩選條件。例如在做測試時,可以將where條件指定為limit 10;在實際業務場景中,往往會選擇當天的數據進行同步,可以將where條件指定為gmt_create > $bizdate ;where條件可以有效地進行業務增量同步。如果不填寫where語句,包括不提供where的key或者value,DataX均視作同步全量數據。
    • 必選:否
    • 默認值:無
  • querySql

    • 描述:在有些業務場景下,where這一配置項不足以描述所篩選的條件,用戶可以通過該配置型來自定義篩選SQL。當用戶配置了這一項之後,DataX係統就會忽略tables,columns這些配置型,直接使用這個配置項的內容對數據進行篩選,例如需要進行多表join後同步數據,使用select a,b from table_a join table_b on table_a.id = table_b.id;當用戶配置querySql時,MySQLReader直接忽略table、column、where條件的配置,querySql優先級大於table、column、where選項。
    • 必選:否
    • 默認值:無
3.3 類型轉換

目前MySQLReader支持大部分Mysql類型,但也存在部分個別類型沒有支持的情況,請注意檢查你的類型。

下麵列出MySQLReader針對Mysql類型轉換列表:

DataX 內部類型 Mysql 數據類型
Long int, tinyint, smallint, mediumint, int, bigint
Double float, double, decimal
String varchar, char, tinytext, text, mediumtext, longtext
Date date, datetime, timestamp, time, year
Boolean bit, bool
Bytes tinyblob, mediumblob, blob, longblob, varbinary

請注意:

  • 除上述羅列字段類型外,其他類型均不支持。
  • MySQLReader將tinyint(1)視作整形。

4 性能報告

4.1 環境準備
4.1.1 數據特征

建表語句:

  1. CREATE TABLE `tc_biz_vertical_test_0000` (
  2. `biz_order_id` bigint(20) NOT NULL COMMENT 'id',
  3. `key_value` varchar(4000) NOT NULL COMMENT 'Key-value的內容',
  4. `gmt_create` datetime NOT NULL COMMENT '創建時間',
  5. `gmt_modified` datetime NOT NULL COMMENT '修改時間',
  6. `attribute_cc` int(11) DEFAULT NULL COMMENT '防止並發修改的標誌',
  7. `value_type` int(11) NOT NULL DEFAULT '0' COMMENT '類型',
  8. `buyer_id` bigint(20) DEFAULT NULL COMMENT 'buyerid',
  9. `seller_id` bigint(20) DEFAULT NULL COMMENT 'seller_id',
  10. PRIMARY KEY (`biz_order_id`,`value_type`),
  11. KEY `idx_biz_vertical_gmtmodified` (`gmt_modified`)

) ENGINE=InnoDB DEFAULT CHARSET=gbk COMMENT=’tc_biz_vertical’

單行記錄類似於:

  1. biz_order_id: 888888888
  2. key_value: ;orderIds:20148888888,2014888888813800;
  3. gmt_create: 2011-09-24 11:07:20

gmt_modified: 2011-10-24 17:56:34

attribute_cc: 1

  1. value_type: 3
  2. buyer_id: 8888888
  3. seller_id: 1
4.1.2 機器參數
  • 執行DataX的機器參數為:
    1. cpu: 24核 Intel(R) Xeon(R) CPU E5-2630 0 @ 2.30GHz
    2. mem: 48GB
    3. net: 千兆雙網卡
    4. disc: DataX 數據不落磁盤,不統計此項
  • Mysql數據庫機器參數為:
    1. cpu: 32核 Intel(R) Xeon(R) CPU E5-2650 v2 @ 2.60GHz
    2. mem: 256GB
    3. net: 千兆雙網卡
    4. disc: BTWL419303E2800RGN INTEL SSDSC2BB800G4 D2010370
4.1.3 DataX jvm 參數

-Xms1024m -Xmx1024m -XX:+HeapDumpOnOutOfMemoryError

4.2 測試報告
4.2.1 單表測試報告
通道數 是否按照主鍵切分 DataX速度(Rec/s) DataX機器網卡進入流量(MB/s) DataX機器運行負載 DB網卡流出流量(MB/s) DB運行負載
1 183185 29 0.6 31 0.6
1 183185 29 0.6 31 0.6
4 183185 29 0.6 31 0.6
4 329733 58 0.8 60 0.76
8 183185 29 0.6 31 0.6
8 549556 115 1.46 120 0.78

說明:

  1. 這裏的單表,主鍵類型為 bigint(20),範圍為:190247559466810-570722244711460,從主鍵範圍劃分看,數據分布均勻。
  2. 對單表如果沒有安裝主鍵切分,那麼配置通道個數不會提升速度,效果與1個通道一樣。
4.2.2 分表測試報告(2個分庫,每個分庫16張分表,共計32張分表)
通道數 DataX速度(Rec/s) DataX機器網卡進入流量(MB/s) DataX機器運行負載 DB網卡流出流量(MB/s) DB運行負載
1 202241 31.5 1.0 32 1.1
4 726358 123.9 3.1 132 3.6
8 1074405 197 5.5 205 5.1
16 1227892 229.2 8.1 233 7.3

5 約束限製

5.1 主備同步數據恢複問題

主備同步問題指Mysql使用主從災備,備庫從主庫不間斷通過binlog恢複數據。由於主備數據同步存在一定的時間差,特別在於某些特定情況,例如網絡延遲等問題,導致備庫同步恢複的數據與主庫有較大差別,導致從備庫同步的數據不是一份當前時間的完整鏡像。

CDP如果同步的是阿裏雲提供RDS,是直接從主庫讀取數據,不存在數據恢複問題。但是會引入主庫負載問題,請注意流控配置。

5.2 一致性約束

Mysql在數據存儲劃分中屬於RDBMS係統,對外可以提供強一致性數據查詢接口。例如當一次同步任務啟動運行過程中,當該庫存在其他數據寫入方寫入數據時,MySQLReader完全不會獲取到寫入更新數據,這是由於數據庫本身的快照特性決定的。關於數據庫快照特性,請參看MVCC Wikipedia

上述是在MySQLReader單線程模型下數據同步一致性的特性,由於MySQLReader可以根據用戶配置信息使用了並發數據抽取,因此不能嚴格保證數據一致性:當MySQLReader根據splitPk進行數據切分後,會先後啟動多個並發任務完成數據同步。由於多個並發任務相互之間不屬於同一個讀事務,同時多個並發任務存在時間間隔。因此這份數據並不是完整的一致的數據快照信息。

針對多線程的一致性快照需求,在技術上目前無法實現,隻能從工程角度解決,工程化的方式存在取舍,我們提供幾個解決思路給用戶,用戶可以自行選擇:

  1. 使用單線程同步,即不再進行數據切片。缺點是速度比較慢,但是能夠很好保證一致性。
  2. 關閉其他數據寫入方,保證當前數據為靜態數據,例如,鎖表、關閉備庫同步等等。缺點是可能影響在線業務。
5.3 數據庫編碼問題

Mysql本身的編碼設置非常靈活,包括指定編碼到庫、表、字段級別,甚至可以均不同編碼。優先級從高到低為字段、表、庫、實例。我們不推薦數據庫用戶設置如此混亂的編碼,最好在庫級別就統一到UTF-8。

MySQLReader底層使用JDBC進行數據抽取,JDBC天然適配各類編碼,並在底層進行了編碼轉換。因此MySQLReader不需用戶指定編碼,可以自動獲取編碼並轉碼。

對於Mysql底層寫入編碼和其設定的編碼不一致的混亂情況,MySQLReader對此無法識別,對此也無法提供解決方案,對於這類情況,導出有可能為亂碼

5.4 增量數據同步

MySQLReader使用JDBC SELECT語句完成數據抽取工作,因此可以使用SELECT…WHERE…進行增量數據抽取,方式有多種:

  • 數據庫在線應用寫入數據庫時,填充modify字段為更改時間戳,包括新增、更新、刪除(邏輯刪)。對於這類應用,MySQLReader隻需要WHERE條件跟上一同步階段時間戳即可。
  • 對於新增流水型數據,MySQLReader可以WHERE條件後跟上一階段最大自增ID即可。

對於業務上無字段區分新增、修改數據情況,MySQLReader也無法進行增量數據同步,隻能同步全量數據。

5.5 SQL安全性

MySQLReader提供querySql語句交給用戶自己實現SELECT抽取語句,MySQLReader本身對querySql不做任何安全性校驗。這塊交由DataX用戶方自己保證。

FAQ


Q: MySQLReader同步報錯,報錯信息為XXX

A: 網絡或者權限問題,請使用mysql命令行測試:

  1. mysql -u<username> -p<password> -h<ip> -D<database> -e "select * from <表名>"

如果上述命令也報錯,那可以證實是環境問題,請聯係你的DBA。


Q: 我想同步Mysql增量數據,怎麼配置?

A: MySQLReader必須業務支持增量字段DataX才能同步增量,例如在淘寶大部分業務表中,通過gmt_modified字段表征這條記錄的最新修改時間,那麼DataX MySQLReader隻需要配置where條件為

  1. "where": "Date(add_time) = '2014-06-01'"

Q: 上述bizdate代表什麼意思? 我每天需要同步的gmt_modified值肯定不一樣的,如何做到每天使用不同變量值

A: CDP支持自定義變量,請參考CDP Console 中有關自定義變量章節說明。


Q: 我有1億條數據需要同步,大概需要同步多長時間

A: 和數據庫配置、DataX機器負載相關,請參考上述性能章節


Q: 為什麼不推薦使用默認列*, 會有問題嗎

A: 如果你限定了DataX同步a,b,c三列,那麼數據庫在添加列,修改列,刪除列情況,DataX可以做到要麼立即報錯,要麼兼容老情況。如果直接配置星號,上遊的數據庫變量會立刻影響下遊數據!而且可能會在下遊任務運行到一段時間才報錯,可能已經引入了大量髒數據。


Q: MySQLReader同步出現亂碼,怎麼處理

A: 通常情況下是你同步的數據庫沒有按照規範配置編碼,比如數據庫配置的編碼是UTF8,但是底層物理文件實際存放的編碼是GBK,DataX按照UTF-8讀取數據就會出現亂碼。此時你應該尋求你的DBA修改庫的編碼格式。

最後更新:2016-11-30 16:25:49

  上一篇:go Job主體配置__作業配置說明_使用手冊_數據集成-阿裏雲
  下一篇:go SQLServerReader__Reader插件_使用手冊_數據集成-阿裏雲