閱讀173 返回首頁    go 英雄聯盟


ADSWriter__Writer插件_使用手冊_數據集成-阿裏雲

1 快速介紹

歡迎ADS加入CDP生態圈!ADSWriter插件實現了其他數據源向ADS寫入功能,現有CDP所有的數據源均可以無縫接入ADS,實現數據快速導入ADS。

ADS寫入預計支持兩種實現方式:

  • ADSWriter 支持向ODPS中轉落地導入ADS方式,優點在於當數據量較大時(>1KW),可以以較快速度進行導入,缺點引入了ODPS作為落地中轉,因此牽涉三方係統(CDP、ADS、ODPS)鑒權認證。
  • ADSWriter 同時支持向ADS直接寫入的方式,優點在於小批量數據寫入能夠較快完成(<1KW),缺點在於大數據導入較慢。

注意:

如果從ODPS導入數據到ADS,請用戶提前在源ODPS的Project中授權ADS Build賬號具有讀取你源表ODPS的權限,同時,ODPS源表創建人和ADS寫入屬於同一個阿裏雲賬號。

如果從非ODPS導入數據到ADS,請用戶提前在目的端ADS空間授權ADS Build賬號具備Load data權限。

以上涉及ADS Build賬號請聯係ADS管理員提供。

2 實現原理

ADS寫入預計支持兩種實現方式:

2.1 Load模式

CDP 將數據導入ADS為當前導入任務分配的ADS項目表,隨後CDP通知ADS完成數據加載。該類數據導入方式實際上是寫ADS完成數據同步,由於ADS是分布式存儲集群,因此該通道吞吐量較大,可以支持TB級別數據導入。

image

  1. CDP底層得到明文的 jdbc://host:port/dbname + username + password + table, 以此連接ADS, 執行show grants; 前置檢查該用戶是否有ADS中目標表的Load Data或者更高的權限。注意,此時ADSWriter使用用戶填寫的ADS用戶名+密碼信息完成登錄鑒權工作。

  2. 檢查通過後,通過ADS中目標表的元數據反向生成ODPS DDL,在ODPS中間project中,以ADSWriter的賬戶建立ODPS表(非分區表,生命周期設為1-2Day), 並調用ODPSWriter把數據源的數據寫入該ODPS表中。

    注意,這裏需要使用中轉ODPS的賬號AK向中轉ODPS寫入數據。

  3. 寫入完成後,以中轉ODPS賬號連接ADS,發起Load Data From ‘odps://中轉project/中轉table/' [overwrite] into adsdb.adstable [partition (xx,xx=xx)]; 這個命令返回一個Job ID需要記錄。

    注意,此時ADS使用自己的Build賬號訪問中轉ODPS,因此需要中轉ODPS對這個Build賬號提前開放讀取權限。

  4. 連接ADS一分鍾一次輪詢執行 select state from information_schema.job_instances where job_id like ‘$Job ID’,查詢狀態,注意這個第一個一分鍾可能查不到狀態記錄。

  5. Success或者Fail後返回給用戶,然後刪除中轉ODPS表,任務結束。

上述流程是從其他非ODPS數據源導入ADS流程,對於ODPS導入ADS流程使用如下流程:

image

2.2 Insert模式

CDP 將數據直連ADS接口,利用ADS暴露的INSERT接口直寫到ADS。該類數據導入方式寫入吞吐量較小,不適合大批量數據寫入。有如下注意點:

  • ADSWriter使用JDBC連接直連ADS,並隻使用了JDBC Statement進行數據插入。ADS不支持PreparedStatement,故ADSWriter隻能單行多線程進行寫入。
  • ADSWriter支持篩選部分列,列換序等功能,即用戶可以填寫列。
  • 考慮到ADS負載問題,建議ADSWriter Insert模式建議用戶使用TPS限流,最高在1W TPS。
  • ADSWriter在所有Task完成寫入任務後,Job Post單例執行flush工作,保證數據在ADS整體更新。

3 功能說明

3.1 配置樣例
  • 這裏使用一份從內存產生到ADS,使用Load模式進行導入的數據。
{
    "type": "job",
    "traceId": "您可以在這裏填寫您作業的追蹤ID,建議使用業務名+您的作業ID",
    "version": "1.0",
    "configuration": {
        "reader": {
            "plugin": "stream",
            "parameter": {
                "column": [
                    {
                        "type": "string",
                        "value": "filed"
                    },
                    {
                        "type": "long",
                        "value": 100
                    },
                    {
                        "dateFormat": "yyyy-MM-dd HH:mm:ss",
                        "type": "date",
                        "value": "2014-12-12 12:12:12"
                    },
                    {
                        "type": "bool",
                        "value": true
                    },
                    {
                        "type": "bytes",
                        "value": "byte string"
                    }
                ],
                "sliceRecordCount": 100000
            }
        },
        "writer": {
            "plugin": "ads",
            "parameter": {
                "writeMode": "load",
                "url": "127.0.0.1:3306",
                "schema": "schema",
                "table": "table",
                "username": "username",
                "password": "password",
                "partition": "",
                "lifeCycle": 2,
                "overWrite": true
            }
        }
    }
}
  • 這裏使用一份從內存產生到ADS,使用Insert模式進行導入的數據。
{
    "type": "job",
    "traceId": "您可以在這裏填寫您作業的追蹤ID,建議使用業務名+您的作業ID",
    "version": "1.0",
    "configuration": {
        "reader": {
            "plugin": "stream",
            "parameter": {
                "column": [
                    {
                        "type": "string",
                        "value": "filed"
                    },
                    {
                        "type": "long",
                        "value": 100
                    },
                    {
                        "dateFormat": "yyyy-MM-dd HH:mm:ss",
                        "type": "date",
                        "value": "2014-12-12 12:12:12"
                    },
                    {
                        "type": "bool",
                        "value": true
                    },
                    {
                        "type": "bytes",
                        "value": "byte string"
                    }
                ],
                "sliceRecordCount": 100000
            }
        },
        "writer": {
            "plugin": "ads",
            "parameter": {
                "writeMode": "insert",
                "url": "127.0.0.1:3306",
                "schema": "schema",
                "table": "table",
                "column": ["*"],
                "username": "username",
                "password": "password",
                "partition": "id,ds=2015"
            }
        }
    }
}
3.2 參數說明 (用戶配置規格)
  • url
    • 描述:ADS連接信息,格式為"ip:port"。
    • 必選:是
    • 默認值:無
  • schema
    • 描述:ADS的schema名稱。
    • 必選:是
    • 默認值:無
  • username
    • 描述:ADS對應的username,目前就是accessId
    • 必選:是
    • 默認值:無
  • password
    • 描述:ADS對應的password,目前就是accessKey
    • 必選:是
    • 默認值:無
  • table
    • 描述:目的表的表名稱。
    • 必選:是
    • 默認值:無
  • partition
    • 描述:目標表的分區名稱,當目標表為分區表,需要指定該字段。
    • 必選:否
    • 默認值:無
  • writeMode
    • 描述:支持Load和Insert兩種寫入模式
    • 必選:是
    • 默認值:無
  • column
    • 描述:目的表字段列表,可以為["*"],或者具體的字段列表,例如["a", "b", "c"]
    • 必選:是
    • 默認值:無
  • overWrite
    • 描述:ADS寫入是否覆蓋當前寫入的表,true為覆蓋寫入,false為不覆蓋(追加)寫入。當writeMode為Load,該值才會生效。
    • 必選:是
    • 默認值:無
  • lifeCycle
    • 描述:ADS 臨時表生命周期。當writeMode為Load時,該值才會生效。
    • 必選:是
    • 默認值:無
3.3 類型轉換
CDP 內部類型 ADS 數據類型
Long int, tinyint, smallint, int, bigint
Double float, double, decimal
String varchar
Date date
Boolean bool
Bytes

注意:

  • multivalue ADS支持multivalue類型,CDP對於該類型支持待定.

4 插件約束

如果Reader為ODPS,且ADSWriter寫入模式為Load模式時,ODPS的partition隻支持如下三種配置方式(以兩級分區為例):

"partition":["pt=*,ds=*"]  (讀取test表所有分區的數據)
"partition":["pt=1,ds=*"]  (讀取test表下麵,一級分區pt=1下麵的所有二級分區)
"partition":["pt=1,ds=hangzhou"] (讀取test表下麵,一級分區pt=1下麵,二級分區ds=hz的數據)

最後更新:2016-11-23 16:03:59

  上一篇:go OSSWriter__Writer插件_使用手冊_數據集成-阿裏雲
  下一篇:go TxtFileWriter__Writer插件_使用手冊_數據集成-阿裏雲