閱讀971 返回首頁    go 阿裏雲 go 技術社區[雲棲]


新老DataHub遷移手冊

DataHub服務用戶遷移文檔

前言

原Odps版內測DataHub(下文統稱為老DataHub服務),於2016年11月21日起已經處於維護狀態,新版DataHub屆時已經開啟公測,公測至今已有半年以上時間,我們決定開始逐步下線老DataHub服務,老版部分用戶需要遷移至新版DataHub。

新版本具有更多的特性,性能功能都有不少提升,可以同時支持數據一份數據同步到Odps、OSS、ElasticSearch等多個不同服務中,且提供WebConsole控製台進行更簡單的操作。

準備工作

本文檔針對使用Logstash、Fluentd、Flume以及使用SDK寫入老DataHub服務的用戶,提供遷移到新服務的指引,過程中遇到任何困難可以聯係我們

dingtalk

新版DataHub相關文檔

DataHub產品使用文檔

DataHub控製台

創建新datahub project

新版DataHub中存在項目空間-Project概念,與Odps中Project類似,但是不等於Odps中的Project,為了方便管理,我們建議遷移時在DataHub中創建與Odps Project同名的Project(不同名稱也可以)

  • 登錄DataHub官網控製台,使用阿裏雲賬號登錄;
  • 點擊創建Project,輸入名稱及描述,點擊創建(Project描述中建議攜帶Project用處及Owner的郵箱或聯係方式)

創建新DataHub topic

新版DataHub存在主題-Topic的概念,與Odps的Table類似,但是不等於Odps的Table,通常如果是需要導入數據到Odps的話,需要為每張表創建一個Topic,且字段類型、順序與名稱需要一致,Odps中的分區字段當做普通的Topic字段處理,新版DataHub會根據該分區字段再DataHub中的數據值,將數據同步到Odps離線表中。

例如:

MaxCompute表: table_test(f1 string, f2 string, f3 double) partitioned by (ds string, pt string)
對應Topic應為如下的Schema:
Topic: topic_test(f1 string, f2 string, f3 double, ds string, pt string)

創建Topic可以通過以下方式:

  • 若Topic數量較少,可以再WebConsole控製台,進入Project頁麵後點擊創建Topic按鈕,選擇從MaxCompute導入,輸入配置信息後勾選“自動創建DataConnector”,點擊“導入表結構”即可導入odps表對應的格式,確認格式無誤後選擇Shard數量及生命周期, Shard數量建議與老服務一樣,生命周期建議3天,點擊創建即可。
  • 若Topic過多,可以使用遷移工具DataHub表結構遷移工具,工具將對列表中的所有表創建對應Topic及Connector。

DataHub與MaxCompute字段類型對應表

MaxCompute表中的類型 DataHub Topic中的類型
STRING STRING
DOUBLE DOUBLE
BIGINT BIGINT
DATETIME TIMESTAMP (注:以微秒為度量單位)
BOOLEAN BOOLEAN
DECIMAL 不支持
MAP 不支持
ARRAY 不支持

映射Odps分區

老DataHub在寫入數據時需要直接指定分區,如果是通過fluend或logstash等插件寫入的用戶是需要配置分區信息或者通過某個時間字段轉為固定格式作為分區

新版DataHub在這一行為上有所改變,Odps表的分區字段再DataHub中將會變成一個普通字段,後台Connector同步任務在同步數據到Odps表時會根據分區字段比如pt具體每條記錄的值寫入Odps對應分區中。

例如:

MaxCompute表: table_test(f1 string, f2 string, f3 double) partitioned by (ds string, pt string)
對應Topic應為如下的Schema:
Topic: topic_test(f1 string, f2 string, f3 double, ds string, pt string)
數據1: ("test", "test", "0.14", "a1", "20170405")
數據2: ("test", "test", "0.14", "aa", "20170406")
則數據1將會同步到odps分區ds=a1,pt=20170405
則數據2將會同步到odps分區ds=a2,pt=20170406
  • 若使用插件導入,並且是通過字符串轉換為固定格式的分區值的用戶,新的插件需要使用fluentd/logstash的filter功能,對分區字段的值進行轉換,具體使用方式可以參考這些開源工具的官方文檔

不同類型接入方式遷移

使用Java SDK

需要換成新版本DataHub的SDK,Mvn依賴變化

原依賴

<dependency>
    <groupId>com.aliyun.odps</groupId>
    <artifactId>odps-sdk-core</artifactId>
    <version>0.xxx</version>
</dependency>

新依賴

<dependency>
    <groupId>com.aliyun.datahub</groupId>
    <artifactId>aliyun-sdk-datahub</artifactId>
    <version>2.3.0-public</version>
</dependency>

Client初始化

原Client初始化步驟

Account account = new AliyunAccount(accessId, accessKey);
odps = new Odps(account);
odps.setDefaultProject(project);
odps.setEndpoint(odpsEndpoint);
DatahubClient client = new DatahubClient(odps, project, table, datahubEndpoint);
client.loadShard(shardNumber);
client.waitForShardLoad();

新Client初始化步驟

AliyunAccount account = new AliyunAccount(accessId, accessKey);
DatahubConfiguration conf = new DatahubConfiguration(account, datahubEndpoint);
DatahubClient client = new DatahubClient(conf);

獲取Shard列表

原獲取Shard列表及狀態方式

HashMap<Long, DatahubClient.ShardState> shardStatus = client.getShardStatus();

新方式

ListShardResult listShardResult = client.listShard(projectName, topicName);

寫入數據

原寫入方式

DatahubWriter writer = client.openDatahubWriter(shardId);
TableSchema schema = client.getStreamSchema();
DatahubRecordPack recordPack = new DatahubRecordPack(schema);

/* Write another 20 records recordPack into another partition */
for (int i = 0; i < 20; i++) {
    Record record = makeRecord(schema);
    recordPack.append(record);
}

partSpec = "pt='20150809'";
packId = writer.write(new PartitionSpec(partSpec), recordPack)
    .getPackId();
System.out.println("record append to the pack: " + packId);

新寫入方式

List<RecordEntry> recordEntries = new ArrayList<RecordEntry>();
RecordEntry entry = new RecordEntry(schema);
entry.setString(0, "Test");
entry.setBigint(1, 5L);
entry.setShardId(shardId);
recordEntries.add(entry);
PutRecordsResult result = client.putRecords(projectName, topicName, recordEntries);
if (result.getFailedRecordCount() != 0) {
    List<ErrorEntry> errors = result.getFailedRecordError();
    // deal with result.getFailedRecords()
}

完整寫入新DataHub示例代碼

使用Fluentd

通過Fluend插件寫入數據的用戶,遷移除了上述準備工作外,還需進行三個步驟

  • 更換,安裝新插件包
  • 根據配置文件對比,修改現有配置文件
  • 使用新配置文件重新啟動fluend進程

插件包更換

新版Fluentd插件使用文檔

原安裝語句

gem install fluent-plugin-aliyun-odps

新安裝語句(也可按照新版文檔提供的一鍵安裝包安裝logstash)

gem install fluent-plugin-datahub

配置對比

部分配置不需更改,更改match 部分配置即可。

老服務配置項 新服務配置項 備注
type type 需要從aliyun_odps改為dataHub
aliyun_access_id access_id 雲賬號accessid
aliyun_access_key access_key 雲賬號accesskey
aliyun_odps_hub_endpoint endpoint Datahub服務域名,需要改為新服務的域名
aliyun_odps_endpoint 不再需要
buffer_chunk_limit buffer_chunk_limit 不需要變化,但是新配置不能超過3MB
buffer_queue_limit buffer_queue_limit 不需要變化
flush_interval flush_interval 不需要變化
project project_name datahub的Project,非odps project
table topic_name datahub的topic,非odps table
fields column_names 指定需要采集的列
partition 不再需要
time_format 不再需要
shard_number 不再需要
enable_fast_crc 不再需要
retry_time retry_time 重試次數
retry_interval retry_interval 重試間隔
abandon_mode 不再需要

新增配置

新服務配置項 備注
dirty_data_continue true/false遇到增數據是否繼續,若為true 遇到髒數據會重試,重試次數用完,會將髒數據寫入髒數據文件
dirty_data_file 指定髒數據文件的位置
put_data_batch_size 每1000條record寫一次DataHub
shard_id 指定shard_id寫入,默認round-robin方式寫入
shard_keys 指定用作分區key,用key值hash後作為寫入shard的索引

[TODO] 能否放一個新老的diff文件example

使用Logstash

通過Logstash插件寫入數據的用戶,遷移除了上述準備工作外,還需進行三個步驟

  • 更換,安裝新插件包
  • 根據配置文件對比,修改現有配置文件
  • 使用新配置文件重新啟動Logstash進程

插件包更換

新版Logstash插件使用文檔

配置對比

input部分配置不需更改,更改output部分配置即可。

老服務配置項 新服務配置項 備注
type type 需要從aliyun_odps改為dataHub
aliyun_access_id access_id 雲賬號accessid
aliyun_access_key access_key 雲賬號accesskey
aliyun_odps_hub_endpoint endpoint Datahub服務域名,需要改為新服務的域名
aliyun_odps_endpoint 不再需要
value_field 不再需要
project project_name datahub的Project,非odps project
table topic_name datahub的topic,非odps table
partition 不再需要
partition_time_format 不再需要
shard_number 不再需要
batch_size 通過logstash啟動參數設置 logstash -f <上述配置文件地址> -b 256 (256即為每次batch大小)
batch_timeout 不再需要

新增配置

新服務配置項 備注
dirty_data_continue true/false遇到增數據是否繼續,若為true 遇到髒數據會重試,重試次數用完,會將髒數據寫入髒數據文件
dirty_data_file 指定髒數據文件的位置
put_data_batch_size 每1000條record寫一次DataHub
shard_keys 數組類型,數據落shard的字段名稱,插件會根據這些字段的值計算hash將每條數據落某個shard, 注意shard_keys和shard_id都未指定,默認輪詢落shard
shard_id 所有數據落指定的shard,注意shard_keys和shard_id都未指定,默認輪詢落shard
retry_times 重試次數,-1為無限重試、0為不重試、>0表示需要有限次數, 默認值為-1
retry_interval 下一次重試的間隔,單位為秒,默認值為5

使用Apache Flume

通過Flume工具寫入數據的用戶,遷移除了上述準備工作外,還需進行三個步驟

  • 更換,安裝新Flume工具插件
  • 根據配置文件對比,修改現有配置文件
  • 使用新配置文件重新啟動Flume進程

插件更新

新版Flume工具文檔

配置對比

老服務配置項 新服務配置項 備注
a1.sinks.k1.type a1.sinks.k1.type 從com.aliyun.odps.flume.sink.OdpsSink改為com.aliyun.datahub.flume.sink.DatahubSink
a1.sinks.k1.accessID a1.sinks.k1.datahub.accessID 雲賬號accessid
a1.sinks.k1.accessKey a1.sinks.k1.datahub.accessKey 雲賬號accesskey
a1.sinks.k1.odps.endPoint a1.sinks.k1.datahub.endPoint Datahub服務域名,需要改為新服務的域名
aliyun_odps_endpoint 不再需要
a1.sinks.k1.odps.project a1.sinks.k1.datahub.project datahub的Project,非odps project
a1.sinks.k1.odps.table a1.sinks.k1.datahub.topic datahub的topic,非odps table
a1.sinks.k1.odps.partition 不再需要
a1.sinks.k1.batchSize a1.sinks.k1.batchSize 批次大小
a1.sinks.k1.serializer a1.sinks.k1.serializer 無變化
a1.sinks.k1.serializer.delimiter a1.sinks.k1.serializer.delimiter 無變化
a1.sinks.k1.serializer.fieldnames a1.sinks.k1.serializer.fieldnames 無變化
a1.sinks.k1.serializer.charset a1.sinks.k1.serializer.charset 無變化
a1.sinks.k1.serializer.delimiter a1.sinks.k1.serializer.delimiter 無變化
a1.sinks.k1.shard.number 不再需要
a1.sinks.k1.shard.maxTimeOut a1.sinks.k1.shard.maxTimeOut 無變化
a1.sinks.k1.autoCreatePartition 不再需要

使用OGG

通過OGG工具寫入數據的用戶,遷移除了上述準備工作外,還需進行三個步驟

  • 更換,安裝新OGG工具插件
  • 根據配置文件對比,修改現有配置文件
  • 使用新配置文件重新啟動OGG進程

插件更新

新版OGG工具文檔

配置對比

老服務配置項 新服務配置項 備注
gg.handlerlist gg.handlerlist 不需修改,仍然為ggdatahub
gg.handler.ggdatahub.type gg.handler.ggdatahub.type 不需修改,仍然為com.aliyun.odps.ogg.handler.datahub.DatahubHandler
gg.classpath gg.classpath YOUR_DATAHUB_HANDLER_DIRECTORY/datahub_lib/*改為{YOUR_HOME}/datahub-ogg-plugin/lib/*

除以上配置外,其他DataHub相關配置均獨立到configure.xml文件配置,具體含義請參看新版OGG工具文檔

最後更新:2017-06-21 18:02:24

  上一篇:go  《數據驅動安全:數據安全分析、可視化和儀表盤》一3.4 探索數據
  下一篇:go  《數據驅動安全:數據安全分析、可視化和儀表盤》一3.3 讀入數據