MaxCompute MapReduce
前言
MapReduce已經有文檔,用戶可以參考文檔使用。本文是在文檔的基礎上做一些類似注解及細節解釋上的工作。
功能介紹
MapReduce
說起MapReduce就少不了WordCount,我特別喜歡文檔裏的這個圖片。
比如有一張很大的表。表裏有個String字段記錄的是用空格分割開單詞。最後需要統計所有記錄中,每個單詞出現的次數是多少。那整體的計算流程是
1. 輸入階段:根據工作量,生成幾個Mapper,把這些表的數據分配給這些Mapper。每個Mapper分配到表裏的一部分記錄。
2. Map階段:每個Mapper針對每條數據,解析裏麵的字符串,用空格切開字符串,得到一組單詞。針對其中每個單詞,寫一條記錄<Word:單詞名,Count:1>
3. Shuffle階段-合並排序:也是發生在Mapper上。會先對數據進行排序。比如WordCount的例子,會根據單詞進行排序。排序後的合並,又稱Combiner階段,因為前麵已經根據單詞排序過了,相同的單詞都是連在一起的。那可以把2個相鄰的<Word:單詞名,Count:1>
合並成1個<Word:單詞名,Count:2>
。Combiner可以減少在後續Reduce端的計算量,也可以減少Mapper往Reducer的數據傳輸的工作量。
4. Shuffle階段-分配Reducer:把Mapper輸出的單詞分發給Reducer。Reducer拿到數據後,再做一次排序。因為Reducer拿到的數據已經在Mapper裏已經是排序過的了,所以這裏的排序隻是針對排序過的數據做合並排序。
5. Reduce階段:Reducer拿前麵已經排序好的輸入,相同的單詞的所有輸入進入同一個Redue循環,在循環裏,做個數的累加。
6. 輸出階段:輸出Reduce的計算結果,寫入到表裏或者返回給客戶端。
拓展MapReduce
如果Reduce後麵還需要做進一步的Reduce計算,可以用拓展MapReduce模型(簡稱MRR)。MRR其實就是Reduce階段結束後,不直接輸出結果,而是再次經過Shuffle後接另外一個Reduce。
Q:如何實現M->R->M->R這種邏輯呢
A:在Reduce代碼裏直接嵌套上Map的邏輯就可以了,把第二個M的工作在前一個R裏完成,而不是作為計算引擎調度層麵上的一個單獨步驟,比如reduce(){ ... map(); }
快速開始
運行環境
工欲善其事,必先利其器。MR的開發提供了基於IDEA和Eclipse的插件。其中比較推薦用IDEA的插件,因為IDEA我們還在持續做迭代,而Eclipse已經停止做更新了。而且IDEA的功能也比較豐富。
線上運行
以WrodCount為例,文檔可以參考這裏
步驟為
1. 做數據準備,包括創建表和使用Tunnel命令行工具導入數據
2. 將代碼拷貝到IDE裏,編譯打包成mapreduce-examples.jar
3. 在odpscmd裏執行add jar命令:add jar /JarPath/mapreduce-examples.jar -f;
這裏的 /JarPath/mapreduce-examples.jar
的路徑要替換成本地實際的文件路徑。這個命令能把本地的jar包傳到服務器上,-f是如果已經有同名的jar包就覆蓋,實際使用中對於是報錯還是覆蓋需要謹慎考慮。
4. 在odpscmd裏執行jar -resources mapreduce-examples.jar -classpath mapreduce-examples.jar
com.aliyun.odps.mapred.open.example.WordCount wc_in wc_out
5. 等待作業執行成功後,可以在SQL通過查詢wc_out表的數據,看到執行的結果
功能解讀
任務提交
任務的是在MaxComput(ODPS)上運行的,客戶端通過jar命令發起請求。
對比前麵的快速開始,可以看到除去數據準備階段,和MR相關的,有資源的上傳(add jar步驟)和jar命令啟動MR作業兩步。
1. 客戶端發起add jar/add file等資源操作,把在客戶端的機器(比如我測試的時候是從我的筆記本)上,運行任務涉及的資源文件傳到服務器上。這樣後麵運行任務的時候,服務器上才能有對應的代碼和文件可以用。如果以前已經傳過了,這一步可以省略。
2. jar -resources mapreduce-examples.jar -classpath mapreduce-examples.jar com.aliyun.odps.mapred.open.example.WordCount wc_in wc_out
這個命令發起作業。MapReduce的任務是運行在MaxCompute集群上的,客戶端需要通過這個命令把任務運行相關的信息告訴集群。
- 客戶端先解析-classpath參數,找到main方法相關的jar包的位置
- 根據com.aliyun.odps.mapred.open.example.WordCount,找到main方法所在類的路徑和名字
- wc_in wc_out是傳給main方法的參數,通過解析main方法傳入參數String[] args獲得這個參數
- -resources告訴服務器,在運行任務的時候,需要用到的資源有哪些。
JobConfig
JobConf定義了這個任務的細節,還是這個圖,解釋一下JobConf的其他設置項的用法。
1. 輸入數據InputUtils.addTable(TableInfo table, JobConf conf)
設置了輸入的表。setSplitSize(long size)
通過調整分片大小來調整Mapper個數,單位 MB,默認640。Mapper個數不通過void setNumMapTasks(int n)
設置。setMemoryForJVM(int mem)
設置 JVM虛擬機的內存資源,單位:MB,默認值 1024.
2. Map階段setMapperClass(Class<? extends Mapper> theClass)
設置Mapper使用的Java類。setMapOutputKeySchema(Column[] schema)
設置 Mapper 輸出到 Reducer 的 Key 行屬性。setMapOutputValueSchema(Column[] schema)
設置 Mapper 輸出到 Reducer 的 Value 行屬性。和上個設置一起定義了Mapper到Reducer的數據格式。
3. Shuffle-合並排序setOutputKeySortColumns(String[] cols)
設置 Mapper 輸出到 Reducer 的 Key 排序列。setOutputKeySortOrder(JobConf.SortOrder[] order)
設置 Key 排序列的順序。setCombinerOptimizeEnable(boolean isCombineOpt)
設置是否對Combiner進行優化。setCombinerClass(Class<? extends Reducer> theClass)
設置作業的 combiner。
4. Shuffle-分配ReducesetNumReduceTasks(int n)
設置 Reducer 任務數,默認為 Mapper 任務數的 1/4。如果是Map only的任務,需要設置成0。可以參考這裏。setPartitionColumns(String[] cols)
設置作業的分區列,定義了數據分配到Reducer的分配策略。
5. Reduce階段setOutputGroupingColumns(String[] cols)
數據在Reducer裏排序好了後,是哪些數據進入到同一個reduce方法的,就是看這裏的設置。一般來說,設置的和setPartitionColumns(String[] cols)
一樣。可以看到二次排序的用法。setReducerClass(Class<? extends Reducer> theClass)
設置Reducer使用的Java類。
6. 數據輸出setOutputOverwrite(boolean isOverwrite)
設置對輸出表是否進行覆蓋。類似SQL裏的Insert into/overwrite Talbe的區別。OutputUtils.addTable(TableInfo table, JobConf conf)
設置了輸出的表。多路輸入輸出可以參考這裏。
7. 其他void setResources(String resourceNames)
有和jar命令的-resources一樣的功能,但是優先級高於-resources(也就是說代碼裏的設置優先級比較高)
最後通過JobClient.runJob(job);
客戶端往服務器發起了這個MapReduce作業。
詳細的SDK的文檔,可以在Maven裏下載。這是下載地址。
Map/Reduce
讀表
在一個Mapper裏,隻會讀一張表,不同的表的數據會在不同的Mapper worker上運行,所以可以用示例裏的這個方法先獲得這個Mapper讀的是什麼表。
資源表/文件
資源表和文件可以讓一些小表/小文件可以方便被讀取。鑒於讀取數據的限製需要小於64次,一般是在setup裏讀取後緩存起來,具體的例子可以參考這裏。
生產及周期調度
任務提交
客戶端做的就是給服務器發起任務的調度的指令。之前提到的jar命令就是一種方法。鑒於實際上運行場景的多樣性,這裏介紹其他的幾種常見方法:
-
odpscmd -e/-f:odpscmd的-e命令可以在shell腳本裏直接運行一個odpscmd裏的命令,所以可以在shell腳本裏運行
odpscmd -e 'jar -resources xxxxxx'
這樣的命令,在shell腳本裏調用MapReduce作業。一個完整的例子是odpscmd -u accessId -p accessKey --project=testproject --endpoint=https://service.odps.aliyun.com/api -e "jar -resources aaa.jar -classpath ./aaa.jar com.XXX.A"
如果在odpscmd的配置文件裏已經配置好了,那隻需要寫-e的部分。
-f和-e一樣,隻是把命令寫到文件裏,然後用odpscmd -f xxx.sql引用這個文件,那這個文件裏的多個指令都會被執行。 大數據開發套件可以配置MapReduce作業。
大數據開發套件可以配置Shell作業。可以在Shell作業裏參考上麵的方法用odpscmd -e/-f來調度MapReduce作業。
在JAVA代碼裏直接調用MapReduce作業,可以通過設置
SessionState.setLocalRun(false);
實現,具體可以參考這裏。
定時調度
大數據開發套件的定時任務/工作流可以配置調度周期和任務依賴,配合前麵提到的方法裏的MapReduce作業/Shell作業,實現任務的調度。
產品限製
安全沙箱
沙箱是MaxCompute的一套安全體係,使得在MaxCompute上運行的作業無法獲得其他用戶的信息,也無法獲得係統的一些信息。主要包括以下幾點,完整的列表可以參考文檔
- 無法訪問外部數據源(不能當爬蟲,不能讀RDS等)
- 無法起多線程/多進程
- 不支持反射/自定義類加載器(所以不支持一些第三方包)
- 不允許讀本地文件(比如JSON裏就用到了,就需要改用GSON)
- 不允許JNI調用
其他限製
最後更新:2017-05-21 19:31:20