新老DataHub遷移手冊
DataHub服務用戶遷移文檔
前言
原Odps版內測DataHub(下文統稱為老DataHub服務),於2016年11月21日起已經處於維護狀態,新版DataHub屆時已經開啟公測,公測至今已有半年以上時間,我們決定開始逐步下線老DataHub服務,老版部分用戶需要遷移至新版DataHub。
新版本具有更多的特性,性能功能都有不少提升,可以同時支持數據一份數據同步到Odps、OSS、ElasticSearch等多個不同服務中,且提供WebConsole控製台進行更簡單的操作。
準備工作
本文檔針對使用Logstash、Fluentd、Flume以及使用SDK寫入老DataHub服務的用戶,提供遷移到新服務的指引,過程中遇到任何困難可以聯係我們
新版DataHub相關文檔
創建新datahub project
新版DataHub中存在項目空間-Project概念,與Odps中Project類似,但是不等於Odps中的Project,為了方便管理,我們建議遷移時在DataHub中創建與Odps Project同名的Project(不同名稱也可以)
- 登錄DataHub官網控製台,使用阿裏雲賬號登錄;
- 點擊創建Project,輸入名稱及描述,點擊創建(Project描述中建議攜帶Project用處及Owner的郵箱或聯係方式)
創建新DataHub topic
新版DataHub存在主題-Topic的概念,與Odps的Table類似,但是不等於Odps的Table,通常如果是需要導入數據到Odps的話,需要為每張表創建一個Topic,且字段類型、順序與名稱需要一致,Odps中的分區字段當做普通的Topic字段處理,新版DataHub會根據該分區字段再DataHub中的數據值,將數據同步到Odps離線表中。
例如:
MaxCompute表: table_test(f1 string, f2 string, f3 double) partitioned by (ds string, pt string)
對應Topic應為如下的Schema:
Topic: topic_test(f1 string, f2 string, f3 double, ds string, pt string)
創建Topic可以通過以下方式:
- 若Topic數量較少,可以再WebConsole控製台,進入Project頁麵後點擊創建Topic按鈕,選擇從MaxCompute導入,輸入配置信息後勾選“自動創建DataConnector”,點擊“導入表結構”即可導入odps表對應的格式,確認格式無誤後選擇Shard數量及生命周期, Shard數量建議與老服務一樣,生命周期建議3天,點擊創建即可。
- 若Topic過多,可以使用遷移工具DataHub表結構遷移工具,工具將對列表中的所有表創建對應Topic及Connector。
DataHub與MaxCompute字段類型對應表
MaxCompute表中的類型 | DataHub Topic中的類型 |
---|---|
STRING | STRING |
DOUBLE | DOUBLE |
BIGINT | BIGINT |
DATETIME | TIMESTAMP (注:以微秒為度量單位) |
BOOLEAN | BOOLEAN |
DECIMAL | 不支持 |
MAP | 不支持 |
ARRAY | 不支持 |
映射Odps分區
老DataHub在寫入數據時需要直接指定分區,如果是通過fluend或logstash等插件寫入的用戶是需要配置分區信息或者通過某個時間字段轉為固定格式作為分區
新版DataHub在這一行為上有所改變,Odps表的分區字段再DataHub中將會變成一個普通字段,後台Connector同步任務在同步數據到Odps表時會根據分區字段比如pt具體每條記錄的值寫入Odps對應分區中。
例如:
MaxCompute表: table_test(f1 string, f2 string, f3 double) partitioned by (ds string, pt string)
對應Topic應為如下的Schema:
Topic: topic_test(f1 string, f2 string, f3 double, ds string, pt string)
數據1: ("test", "test", "0.14", "a1", "20170405")
數據2: ("test", "test", "0.14", "aa", "20170406")
則數據1將會同步到odps分區ds=a1,pt=20170405
則數據2將會同步到odps分區ds=a2,pt=20170406
- 若使用插件導入,並且是通過字符串轉換為固定格式的分區值的用戶,新的插件需要使用fluentd/logstash的filter功能,對分區字段的值進行轉換,具體使用方式可以參考這些開源工具的官方文檔
不同類型接入方式遷移
使用Java SDK
需要換成新版本DataHub的SDK,Mvn依賴變化
原依賴
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-sdk-core</artifactId>
<version>0.xxx</version>
</dependency>
新依賴
<dependency>
<groupId>com.aliyun.datahub</groupId>
<artifactId>aliyun-sdk-datahub</artifactId>
<version>2.3.0-public</version>
</dependency>
Client初始化
原Client初始化步驟
Account account = new AliyunAccount(accessId, accessKey);
odps = new Odps(account);
odps.setDefaultProject(project);
odps.setEndpoint(odpsEndpoint);
DatahubClient client = new DatahubClient(odps, project, table, datahubEndpoint);
client.loadShard(shardNumber);
client.waitForShardLoad();
新Client初始化步驟
AliyunAccount account = new AliyunAccount(accessId, accessKey);
DatahubConfiguration conf = new DatahubConfiguration(account, datahubEndpoint);
DatahubClient client = new DatahubClient(conf);
獲取Shard列表
原獲取Shard列表及狀態方式
HashMap<Long, DatahubClient.ShardState> shardStatus = client.getShardStatus();
新方式
ListShardResult listShardResult = client.listShard(projectName, topicName);
寫入數據
原寫入方式
DatahubWriter writer = client.openDatahubWriter(shardId);
TableSchema schema = client.getStreamSchema();
DatahubRecordPack recordPack = new DatahubRecordPack(schema);
/* Write another 20 records recordPack into another partition */
for (int i = 0; i < 20; i++) {
Record record = makeRecord(schema);
recordPack.append(record);
}
partSpec = "pt='20150809'";
packId = writer.write(new PartitionSpec(partSpec), recordPack)
.getPackId();
System.out.println("record append to the pack: " + packId);
新寫入方式
List<RecordEntry> recordEntries = new ArrayList<RecordEntry>();
RecordEntry entry = new RecordEntry(schema);
entry.setString(0, "Test");
entry.setBigint(1, 5L);
entry.setShardId(shardId);
recordEntries.add(entry);
PutRecordsResult result = client.putRecords(projectName, topicName, recordEntries);
if (result.getFailedRecordCount() != 0) {
List<ErrorEntry> errors = result.getFailedRecordError();
// deal with result.getFailedRecords()
}
使用Fluentd
通過Fluend插件寫入數據的用戶,遷移除了上述準備工作外,還需進行三個步驟
- 更換,安裝新插件包
- 根據配置文件對比,修改現有配置文件
- 使用新配置文件重新啟動fluend進程
插件包更換
原安裝語句
gem install fluent-plugin-aliyun-odps
新安裝語句(也可按照新版文檔提供的一鍵安裝包安裝logstash)
gem install fluent-plugin-datahub
配置對比
部分配置不需更改,更改match 部分配置即可。
老服務配置項 | 新服務配置項 | 備注 |
---|---|---|
type | type | 需要從aliyun_odps改為dataHub |
aliyun_access_id | access_id | 雲賬號accessid |
aliyun_access_key | access_key | 雲賬號accesskey |
aliyun_odps_hub_endpoint | endpoint | Datahub服務域名,需要改為新服務的域名 |
aliyun_odps_endpoint | 無 | 不再需要 |
buffer_chunk_limit | buffer_chunk_limit | 不需要變化,但是新配置不能超過3MB |
buffer_queue_limit | buffer_queue_limit | 不需要變化 |
flush_interval | flush_interval | 不需要變化 |
project | project_name | datahub的Project,非odps project |
table | topic_name | datahub的topic,非odps table |
fields | column_names | 指定需要采集的列 |
partition | 無 | 不再需要 |
time_format | 無 | 不再需要 |
shard_number | 無 | 不再需要 |
enable_fast_crc | 無 | 不再需要 |
retry_time | retry_time | 重試次數 |
retry_interval | retry_interval | 重試間隔 |
abandon_mode | 無 | 不再需要 |
新增配置
新服務配置項 | 備注 |
---|---|
dirty_data_continue | true/false遇到增數據是否繼續,若為true 遇到髒數據會重試,重試次數用完,會將髒數據寫入髒數據文件 |
dirty_data_file | 指定髒數據文件的位置 |
put_data_batch_size | 每1000條record寫一次DataHub |
shard_id | 指定shard_id寫入,默認round-robin方式寫入 |
shard_keys | 指定用作分區key,用key值hash後作為寫入shard的索引 |
[TODO] 能否放一個新老的diff文件example
使用Logstash
通過Logstash插件寫入數據的用戶,遷移除了上述準備工作外,還需進行三個步驟
- 更換,安裝新插件包
- 根據配置文件對比,修改現有配置文件
- 使用新配置文件重新啟動Logstash進程
插件包更換
配置對比
input部分配置不需更改,更改output部分配置即可。
老服務配置項 | 新服務配置項 | 備注 |
---|---|---|
type | type | 需要從aliyun_odps改為dataHub |
aliyun_access_id | access_id | 雲賬號accessid |
aliyun_access_key | access_key | 雲賬號accesskey |
aliyun_odps_hub_endpoint | endpoint | Datahub服務域名,需要改為新服務的域名 |
aliyun_odps_endpoint | 無 | 不再需要 |
value_field | 無 | 不再需要 |
project | project_name | datahub的Project,非odps project |
table | topic_name | datahub的topic,非odps table |
partition | 無 | 不再需要 |
partition_time_format | 無 | 不再需要 |
shard_number | 無 | 不再需要 |
batch_size | 通過logstash啟動參數設置 | logstash -f <上述配置文件地址> -b 256 (256即為每次batch大小) |
batch_timeout | 無 | 不再需要 |
新增配置
新服務配置項 | 備注 |
---|---|
dirty_data_continue | true/false遇到增數據是否繼續,若為true 遇到髒數據會重試,重試次數用完,會將髒數據寫入髒數據文件 |
dirty_data_file | 指定髒數據文件的位置 |
put_data_batch_size | 每1000條record寫一次DataHub |
shard_keys | 數組類型,數據落shard的字段名稱,插件會根據這些字段的值計算hash將每條數據落某個shard, 注意shard_keys和shard_id都未指定,默認輪詢落shard |
shard_id | 所有數據落指定的shard,注意shard_keys和shard_id都未指定,默認輪詢落shard |
retry_times | 重試次數,-1為無限重試、0為不重試、>0表示需要有限次數, 默認值為-1 |
retry_interval | 下一次重試的間隔,單位為秒,默認值為5 |
使用Apache Flume
通過Flume工具寫入數據的用戶,遷移除了上述準備工作外,還需進行三個步驟
- 更換,安裝新Flume工具插件
- 根據配置文件對比,修改現有配置文件
- 使用新配置文件重新啟動Flume進程
插件更新
配置對比
老服務配置項 | 新服務配置項 | 備注 |
---|---|---|
a1.sinks.k1.type | a1.sinks.k1.type | 從com.aliyun.odps.flume.sink.OdpsSink改為com.aliyun.datahub.flume.sink.DatahubSink |
a1.sinks.k1.accessID | a1.sinks.k1.datahub.accessID | 雲賬號accessid |
a1.sinks.k1.accessKey | a1.sinks.k1.datahub.accessKey | 雲賬號accesskey |
a1.sinks.k1.odps.endPoint | a1.sinks.k1.datahub.endPoint | Datahub服務域名,需要改為新服務的域名 |
aliyun_odps_endpoint | 無 | 不再需要 |
a1.sinks.k1.odps.project | a1.sinks.k1.datahub.project | datahub的Project,非odps project |
a1.sinks.k1.odps.table | a1.sinks.k1.datahub.topic | datahub的topic,非odps table |
a1.sinks.k1.odps.partition | 無 | 不再需要 |
a1.sinks.k1.batchSize | a1.sinks.k1.batchSize | 批次大小 |
a1.sinks.k1.serializer | a1.sinks.k1.serializer | 無變化 |
a1.sinks.k1.serializer.delimiter | a1.sinks.k1.serializer.delimiter | 無變化 |
a1.sinks.k1.serializer.fieldnames | a1.sinks.k1.serializer.fieldnames | 無變化 |
a1.sinks.k1.serializer.charset | a1.sinks.k1.serializer.charset | 無變化 |
a1.sinks.k1.serializer.delimiter | a1.sinks.k1.serializer.delimiter | 無變化 |
a1.sinks.k1.shard.number | 無 | 不再需要 |
a1.sinks.k1.shard.maxTimeOut | a1.sinks.k1.shard.maxTimeOut | 無變化 |
a1.sinks.k1.autoCreatePartition | 無 | 不再需要 |
使用OGG
通過OGG工具寫入數據的用戶,遷移除了上述準備工作外,還需進行三個步驟
- 更換,安裝新OGG工具插件
- 根據配置文件對比,修改現有配置文件
- 使用新配置文件重新啟動OGG進程
插件更新
配置對比
老服務配置項 | 新服務配置項 | 備注 |
---|---|---|
gg.handlerlist | gg.handlerlist | 不需修改,仍然為ggdatahub |
gg.handler.ggdatahub.type | gg.handler.ggdatahub.type | 不需修改,仍然為com.aliyun.odps.ogg.handler.datahub.DatahubHandler |
gg.classpath | gg.classpath | YOUR_DATAHUB_HANDLER_DIRECTORY/datahub_lib/*改為{YOUR_HOME}/datahub-ogg-plugin/lib/* |
除以上配置外,其他DataHub相關配置均獨立到configure.xml文件配置,具體含義請參看新版OGG工具文檔。
最後更新:2017-06-21 18:02:24