MaxCompute實踐
一、寫在前麵
本人之前一直從事程序開發的(PHP、JAVA、Python)工作,在之前的工作經曆中有過一段時間配合Hadoop工程師的事務,但接觸的並不深,隻能說略知點皮毛,有點管中窺豹的感覺。
今年進了新公司,因為公司正在組建新的數據部門,非常有幸本人得以調入該部門,恰逢MaxCompute橫空出世,剛好因為我那時工作比較空閑,得以安排調研它的功能及測試是否符合我們的要求。
由於個人對大數據這塊的經驗不足,涉及的內容也不是太高端的,所得知識基本是通過閱讀文檔獲得,所以本文僅以初學者的角度去闡述,主要圍繞數據遷移這塊,其他部分還未來得急涉及,如有不正確的地方,還請指出,謝謝。
二、平台體係的選型
因初期數據量相對較小,使用Kettle進行抽取數據等工作,ETL的工作大部分在MySQL數據倉庫中完成。多種數據源使用Presto(集群)作為查詢中間鍵進行相應的數據分析。但隨著業務的瘋狂增長,數據表單表達到數億後,磁盤容量達數幾百GB時,數據要求的複雜度逐步提升,使用MySQL作為基礎數據倉庫的基石已經不足以應付,常出現查詢響應時間等待過長,甚至內存崩潰導致執行失敗的情況,極大的影響了工作效率。所以使用選擇一款大數據平台勢在必行。初期考慮使用Hadoop來做進行數據分析平台,恰逢MaxCompute橫空出世給我們一個新的選擇,在經過仔細比對後,考慮到公司新數據部門剛成立不久,Hadoop大數據相關人才儲備較為緊缺,如果等待Hadoop體係完成搭建並使用,可能需要更多的時間,而這勢必會拖慢之後的工作進度。而如果使用MaxCompute相對於自建Haoop數據平台要簡單,快速的多,但任何產品在正式使用前必然需要進行詳細的調研,所以我就開始調研的過程。
三、數據遷移
3.1 數據源類型介紹
目前我們主要有兩種數據源的數據需要放至MaxCompute中進行處理,一種為MySQL,一種為MongoDB。在此主要描述MySQL相關的,至於MongoDB嘛,看到最後你就會知道了^_^。
3.2 遷移工具的選擇
雖然MaxCompute有多種遷移方案可供選擇,但如果使用大數據開發套件上的腳本模式能夠有效完成的話,個人還是比較傾向使用大數據開發套件去完成,除非在此之上無法完成。因為相對於自己使用SDK直接同步或其他工具去實現,大數據開發平台要簡單快速的多,任務調度係統完善,任務運行情況相對清晰便於查錯,之後更多任務可以靈活組合搭配。
3.3 同步策略介紹
數據主要分為會變化的數據(有修改+有增加)與不會變化的數據(隻增加),官方文檔中是建議每天全量的同步策略,即每天的數據作為一個副本存至同步日期為分區,這樣做確實有很多的好處。
但實際情況可能由於數據量過大,每天同步可能會花更多的時間(測試過1億數據大概在3~4小時左右,腳本參數配置speed": { "concurrent": "1", "mbps": "1" }
),因為我們業務中不存在DELTE操作,所以我們這裏處理不管數據是否變化都使用每日增量的方式處理,最終按數據的創建日期存放在對應的分區(前期也不知道怎麼設計,如何設計最佳,所以就先設計為1級分區),雖然這樣需要多做一步合並操作,多耗費一些資源費用,但確實是實現我們的要求。
3.4 步驟介紹
在“大數據開發套件”中同步數據主要使用時間段的方式進行同步,主要分為兩個步驟,第一步為手動全量同步,第二步為自動每日增量。如果想讓數據的每條數據按數據的創建時間存放到對應的日期的話,則需要在此基礎上進行細化。針對不同的數據(會變化與不會變化)的處理方式也各有所不同,基本上是按照文檔中所描述的進行了,隻是稍微有些不同。
3.5 舉例說明
3.5.1 準備工作
假設當前日期為: 2017-07-13
-
Mysql表信息
數據表名: test
創建時間字段: created_at
更改時間字段: updated_at
時間字段值: unix時間戳
表結構定義如下:CREATE TABLE `test` ( `uid` int(11) NOT NULL AUTO_INCREMENT COMMENT '用戶ID', `name` varchar(10) NOT NULL COMMENT '姓名', `age` tinyint(4) NOT NULL COMMENT '年齡', `sex` tinyint(4) NOT NULL DEFAULT '0' COMMENT '性名(1=男, 2=女, 0=未選擇)', `created_at` int(11) NOT NULL COMMENT '創建時間', `updated_at` int(11) NOT NULL COMMENT '修改時間', PRIMARY KEY ( `uid`), KEY `created_at` ( `created_at`), KEY `updated_at` ( `updated_at`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='用戶信息表'
-
MaxCompute數據表
創建一張與這對應的MaxCompute數據表
數據表名: odps_test_history
分 區 名: pt
分 區 值: 數據的創建日期
表結構定義如下:CREATE TABLE odps_test_history ( uid BIGINT COMMENT '用戶ID', name STRING COMMENT '姓名', age BIGINT COMMENT '年齡', sex BIGINT COMMENT '性名(1=男, 2=女, 0=未選擇)', created_at BIGINT COMMENT '創建時間', updated_at BIGINT COMMENT '修改時' ) PARTITIONED BY ( pt STRING ) LIFECYCLE 100000;
命名約定
臨時表表名後綴:_history
增量表表名後綴:_ inc
3.5.1 同步不會變化的數據
1. 手動全量同步
通過腳本模式將小於當前日期(2017-07-13)的所有的數據一次性導入到一張臨時表中。
在數據開發中點擊測試運行,看到任務成功運行後,在任務設置調度參數中設置暫停(測試運行不會受到調度幹擾,直接運行)。
腳本配置如下:
{
"configuration": {
"reader": {
"plugin": "mysql",
"parameter": {
"datasource": "mysql_db001",
"column": [
"uid",
"name",
"age",
"sex",
"created_at",
"updated_at"
],
"where": "created_at < UNIX_TIMESTAMP('20170713')",
"splitPk": "uid",
"table": "test"
}
},
"writer": {
"plugin": "odps",
"parameter": {
"partition": "pt=history",
"truncate": true,
"datasource": "odps_first",
"column": [
"uid",
"name",
"age",
"sex",
"created_at",
"updated_at"
],
"table": "odps_test_history"
}
},
"setting": {
"errorLimit": {
"record": "0"
},
"speed": {
"concurrent": "1",
"mbps": "1"
}
}
},
"type": "job",
"version": "1.0"
}
2. 創建最終目標表
執行ODPS_SQL複製odps_test_history表並命名為odps_test作為最終目標表。
-- 創建目標表
CREATE TABLE odps_test LIKE odps_test_history;
3. 根據數據的時間進行動態分區
執行ODPS_SQL將數據導入到最終目標表中。
--- 將曆史表中的數據根據數據的創建的時間插入目標表對應的分區中
INSERT OVERWRITE TABLE odps_test PARTITION (pt)
SELECT uid, name, age, sex, created_at, updated_at, TO_CHAR(FROM_UNIXTIME(created_at), 'yyyymmdd') AS pt
FROM odps_test_history;
4. 簡單驗證數據
執行ODPS SQL,如兩個數一樣則說明導入成功,如不一樣,請重試。
-- 驗證數據總數
SELECT COUNT(*) FROM odps_test_history;
SELECT COUNT(*) FROM odps_test;
5. 刪除曆史臨時表
如確認數據無誤,則可以刪除臨時表。當然你也可以不刪除,以便出了錯隻需將此時間節點內的數據導入即可,避免從頭導入的過程。
--- 刪除曆史表
ALTER TABLE odps_test_history DROP IF EXISTS PARTITION ( pt='history' );
6. 更改同步腳本
更新同步腳本,並將任務進行提交,之前的第一步操作的暫停記得勾掉。
更改後的同步腳本如下,請注意藍色部分:
7. 完成
如果執行到這一步基本上就完成了,隔天就會在你在設定時間內運行,你也可以去運維中心中查看你的任務情況。
下麵附上”不會變化的數據“數據同步的全部流程圖:
3.5.2 同步會變化的數據
注意事項:
A. 為方便演示,繼續使用test表來作為演示表。
B. 同步會變化的數據與不會變化的數據,前麵5個步驟一樣是一樣,在此不進行複述。
1. 手動全量同步
參考同步不會變化的數據
2.創建最終目標表
參考同步不會變化的數據
3. 根據數據的時間進行動態分區
參考同步不會變化的數據
4. 簡單驗證數據
參考同步不會變化的數據
5. 刪除臨時表
參考同步不會變化的數據
6. 創建增量表
執行ODPS SQL,創建一張增量表
--- 創建增量表
CREATE TABLE odps_test_inc LIKE odps_test;
7. 更改同步腳本
因為太長不好截圖,所以腳本模式這裏的where
的參數值使用:
FROM_UNIXTIME ( created_at,'%Y%m%d')=${bdp.system.bizdate} OR FROM_UNIXTIME(updated_at, '%Y%m%d') = ${bdp.system.bizdate}
如果想效率更高一點可使用下方語句,當字段created_at有索引時可以命中:
(created_at >= UNIX_TIMESTAMP('${bdp.system.bizdate}000000') AND created_at <= UNIX_TIMESTAMP('${bdp.system.bizdate}235959')) OR (updated_at >= UNIX_TIMESTAMP('${bdp.system.bizdate}000000') AND updated_at <= UNIX_TIMESTAMP('${bdp.system.bizdate}235959'))
8. 創建合並數據(ODPS_SQL)任務
在數據開發->任務開發中創建一個節點任務,任務名稱使用_merge
結尾
並輸入以下內容:
--- 合並數據並根據數據的創建時間更新到指定分區中
INSERT OVERWRITE TABLE odps_test PARTITION (pt)
SELECT
CASE WHEN b.id IS NOT NULL THEN b.uid ELSE a.uid END AS uid,
CASE WHEN b.id IS NOT NULL THEN b.name ELSE a.name END AS name,
CASE WHEN b.id IS NOT NULL THEN b.age ELSE a.age END AS age,
CASE WHEN b.id IS NOT NULL THEN b.sex ELSE a.sex END AS sex,
CASE WHEN b.id IS NOT NULL THEN b.created_at ELSE a.created_at END AS created_at,
CASE WHEN b.id IS NOT NULL THEN b.updated_at ELSE a.updated_at END AS updated_at,
CASE WHEN b.id IS NOT NULL THEN TO_CHAR(FROM_UNIXTIME(b.created_at), 'yyyymmdd') ELSE TO_CHAR(FROM_UNIXTIME(a.created_at), 'yyyymmdd') END AS pt
FROM
odps_test a
FULL OUTER JOIN odps_test_inc b
ON a.id = b.id ;
--- 刪除無用數據(如想觀察每日數據變化不進行刪除,上麵一行的SQL需要加上條件: pt='${bdp.system.bizdate}')
ALTER TABLE odps_test_inc DROP IF EXISTS PARTITION ( pt='${bdp.system.bizdate}');
9. 設置調度依賴
將合並數據的任務的上遊任務設置為每日增量同步數據那個任務,並提交即可。
注: odps_test是就是同步數據的腳本模式的任務名稱。
10. 完成
如果執行到這一步基本上就完成了,隔天就會在你在設定時間內運行,你也可以去運維中心中查看你的任務情況。
下麵附上”會變化的數據“數據同步的全部流程圖:
四、數據遷移
4.1 腳本模式下mongodb同步MaxCompute,時間字段值存的unix時間戳如何篩選?
這個問題我之前也有提交過工單,工單內容如下:
我們在測試使用數據同步更新,使用query參數過濾數據,按文檔中所描述,語法參照MongoDB查詢語法
https://help.aliyun.com/knowledge_detail/50354.html?spm=a2c1i.8282367.0.0.OPStre因為我們mongodb表中表示數據"最後更新時間"的字段為updated_at,其存放的內容為unix時間戳。
mongodb中將日期格式化成unix時間戳的方法是:ISODate("2012-10-15T21:26:17+0800").valueOf() / 1000根據該寫法,那在腳本模式中的寫法將是:
"query": "{'updated_at':{'$gte':ISODate('${last_day}T00:00:00.424+0800').valueOf() / 1000}}",但是我們運行後,程序報錯無法運行了,報錯信息為說是格式的問題,解析不了。
報錯信息如下:
2017-06-23 20:46:32.647 [job-37098472] ERROR JobContainer - Exception when job run
org.bson.json.JsonParseException: Invalid JSON input. Position: 62. Character: '.'.當mongodb中字段為unix時間戳時,如果使用query參數進行過濾?
我們花了時間查看文檔依舊沒有得到答案,所以前來請教一下。
最終MaxCompute技術人員給的解決方案如下:
dataworks 裏麵配置shell + datax, 在shell 裏麵講dataworks的調度時間參數yyyy-mm-dd格式 轉換為 unix時間戳格式, 時間戳傳遞給datax,再由datax發給mongodb server 做數據過濾查詢。
在mongodb server 表裏麵添加時間列,時間列的值具體是你原來整數unix時間戳列生成的, 這個隻需要操作修改一次。
4.2 腳本模式下mongodb同步MaxCompute,同步速度如何優化?
因為各種原因我們一開始是拿MongoDB進行的測試,為此專門找DBA挑一張較小的數據表( 約3GB左右)進行。按照文檔中描述,使用腳本模式根據文檔進行了配置,確實很挺簡單的,所以進行任務測試,任務如期正常運行,但經過觀察發現數據同步的速度逐步下降,最讓人無法忍受的隻有10KB/S,該表同步了20多個小時都沒同步完,最後任務自行中止了。經過詢問相關技術人員,可能由於Mongodb同步數據的底層實現還存在一些問題導致同步速度過慢導致的。為此隻能選擇替代方案,主要集中在“基於Kettle的MaxCompute插件”及“基於Tunnel SDK開發”,經過比對後考慮到開發速度及同步速度,最終選擇暫時采用Kettle來進行mongodb的導入工作。
4.3 基於Kettle的MaxCompute插件同步mongodb遇到的問題
以下為我的同事在使用中的經驗總結,在此我一並提一下:
MaxCompute的kettle插件的問題(個人隻用過Aliyun MaxCompute Output,沒有用過Aliyun MaxCompute Input,因此隻對Output部分做評價):
1. 沒有提供更新的功能,隻能插入,做增量更新的時候會很棘手。
2. 沒有提供輸出流,無法記錄日誌。
3. 不支持從上一步獲取變量的功能,對於想要使用變量替換的操作很麻煩。
4. EndPoint、AccessId等配置不支持從配置文件讀取,十分不靈活。
5. 文檔太少,沒有提供對MongoDB等數據庫的幫助文檔。
4.4 Mysql全庫同步工具時間字段是unix時間戳沒辦法用。
因為我們有大量的Mysql數據表需要導入,發現有個整庫遷移的工具可以使用,剛發現時就怦然心動,然而經過實測後大失所望,因為該工具日期字段的值必須是yyyymmdd格式,像我們使用UNIX時間戳壓根沒法用。
另外該功能不支持表字段指定,如果某些數據表字段過於敏感想不進行同步,也不能設置。
4.4 大數據開發套件中“數據表管理”如果表數量過多就不便於查找,且數據表分類隻有一級。
如果左側能提供一個樹形菜單將多級分類列出,點擊就能查看對應分類下的表就方便多了。
五、感謝
接觸一件新事務時遇到了困難,並且自己無法解決時,難免會感到彷徨無助,有種挫敗感。
萬幸的是我在釘釘上找到了組織,非常感謝阿裏雲MaxCompute項目組的@數加·禕休(yī xiū)
、@一斅
、@彭敏
、@李珍珍
等工作人員的鼎力支持,在我們提交問題後,不厭其煩及時並且認真的回答解決我們的問題,祝你們工作順利,事事順心。
最後更新:2017-07-24 21:32:49