閱讀74 返回首頁    go 微軟 go windows


MaxCompute MapReduce

前言

MapReduce已經有文檔,用戶可以參考文檔使用。本文是在文檔的基礎上做一些類似注解及細節解釋上的工作。

功能介紹

MapReduce

說起MapReduce就少不了WordCount,我特別喜歡文檔裏的這個圖片。
alt
比如有一張很大的表。表裏有個String字段記錄的是用空格分割開單詞。最後需要統計所有記錄中,每個單詞出現的次數是多少。那整體的計算流程是
1. 輸入階段:根據工作量,生成幾個Mapper,把這些表的數據分配給這些Mapper。每個Mapper分配到表裏的一部分記錄。
2. Map階段:每個Mapper針對每條數據,解析裏麵的字符串,用空格切開字符串,得到一組單詞。針對其中每個單詞,寫一條記錄<Word:單詞名,Count:1>
3. Shuffle階段-合並排序:也是發生在Mapper上。會先對數據進行排序。比如WordCount的例子,會根據單詞進行排序。排序後的合並,又稱Combiner階段,因為前麵已經根據單詞排序過了,相同的單詞都是連在一起的。那可以把2個相鄰的<Word:單詞名,Count:1>合並成1個<Word:單詞名,Count:2>。Combiner可以減少在後續Reduce端的計算量,也可以減少Mapper往Reducer的數據傳輸的工作量。
4. Shuffle階段-分配Reducer:把Mapper輸出的單詞分發給Reducer。Reducer拿到數據後,再做一次排序。因為Reducer拿到的數據已經在Mapper裏已經是排序過的了,所以這裏的排序隻是針對排序過的數據做合並排序。
5. Reduce階段:Reducer拿前麵已經排序好的輸入,相同的單詞的所有輸入進入同一個Redue循環,在循環裏,做個數的累加。
6. 輸出階段:輸出Reduce的計算結果,寫入到表裏或者返回給客戶端。

拓展MapReduce

如果Reduce後麵還需要做進一步的Reduce計算,可以用拓展MapReduce模型(簡稱MRR)。MRR其實就是Reduce階段結束後,不直接輸出結果,而是再次經過Shuffle後接另外一個Reduce。

Q:如何實現M->R->M->R這種邏輯呢
A:在Reduce代碼裏直接嵌套上Map的邏輯就可以了,把第二個M的工作在前一個R裏完成,而不是作為計算引擎調度層麵上的一個單獨步驟,比如

reduce(){
    ...
    map();
}

快速開始

運行環境

工欲善其事,必先利其器。MR的開發提供了基於IDEA和Eclipse的插件。其中比較推薦用IDEA的插件,因為IDEA我們還在持續做迭代,而Eclipse已經停止做更新了。而且IDEA的功能也比較豐富。

  • 具體的插件的安裝方法步驟可以參考文檔,本文不在贅言。
  • 另外後續還需要用到客戶端,可以參考文檔安裝。
  • 後續為了更加清楚地說明問題,我會盡可能地在客戶端上操作,而不用IDEA裏已經集成的方法。

線上運行

以WrodCount為例,文檔可以參考這裏
步驟為
1. 做數據準備,包括創建表和使用Tunnel命令行工具導入數據
2. 將代碼拷貝到IDE裏,編譯打包成mapreduce-examples.jar
3. 在odpscmd裏執行add jar命令:
add jar /JarPath/mapreduce-examples.jar -f;
這裏的 /JarPath/mapreduce-examples.jar的路徑要替換成本地實際的文件路徑。這個命令能把本地的jar包傳到服務器上,-f是如果已經有同名的jar包就覆蓋,實際使用中對於是報錯還是覆蓋需要謹慎考慮。
4. 在odpscmd裏執行
jar -resources mapreduce-examples.jar -classpath mapreduce-examples.jar
com.aliyun.odps.mapred.open.example.WordCount wc_in wc_out

5. 等待作業執行成功後,可以在SQL通過查詢wc_out表的數據,看到執行的結果

功能解讀

任務提交

任務的是在MaxComput(ODPS)上運行的,客戶端通過jar命令發起請求。
對比前麵的快速開始,可以看到除去數據準備階段,和MR相關的,有資源的上傳(add jar步驟)和jar命令啟動MR作業兩步。
1. 客戶端發起add jar/add file等資源操作,把在客戶端的機器(比如我測試的時候是從我的筆記本)上,運行任務涉及的資源文件傳到服務器上。這樣後麵運行任務的時候,服務器上才能有對應的代碼和文件可以用。如果以前已經傳過了,這一步可以省略。
2. jar -resources mapreduce-examples.jar -classpath mapreduce-examples.jar com.aliyun.odps.mapred.open.example.WordCount wc_in wc_out
這個命令發起作業。MapReduce的任務是運行在MaxCompute集群上的,客戶端需要通過這個命令把任務運行相關的信息告訴集群。

  1. 客戶端先解析-classpath參數,找到main方法相關的jar包的位置
  2. 根據com.aliyun.odps.mapred.open.example.WordCount,找到main方法所在類的路徑和名字
  3. wc_in wc_out是傳給main方法的參數,通過解析main方法傳入參數String[] args獲得這個參數
  4. -resources告訴服務器,在運行任務的時候,需要用到的資源有哪些。

JobConfig

JobConf定義了這個任務的細節,還是這個圖,解釋一下JobConf的其他設置項的用法。
alt
1. 輸入數據
InputUtils.addTable(TableInfo table, JobConf conf)設置了輸入的表。
setSplitSize(long size)通過調整分片大小來調整Mapper個數,單位 MB,默認640。Mapper個數不通過void setNumMapTasks(int n)設置。
setMemoryForJVM(int mem)設置 JVM虛擬機的內存資源,單位:MB,默認值 1024.
2. Map階段
setMapperClass(Class<? extends Mapper> theClass)設置Mapper使用的Java類。
setMapOutputKeySchema(Column[] schema)設置 Mapper 輸出到 Reducer 的 Key 行屬性。
setMapOutputValueSchema(Column[] schema)設置 Mapper 輸出到 Reducer 的 Value 行屬性。和上個設置一起定義了Mapper到Reducer的數據格式。
3. Shuffle-合並排序
setOutputKeySortColumns(String[] cols)設置 Mapper 輸出到 Reducer 的 Key 排序列。
setOutputKeySortOrder(JobConf.SortOrder[] order)設置 Key 排序列的順序。
setCombinerOptimizeEnable(boolean isCombineOpt)設置是否對Combiner進行優化。
setCombinerClass(Class<? extends Reducer> theClass)設置作業的 combiner。
4. Shuffle-分配Reduce
setNumReduceTasks(int n)設置 Reducer 任務數,默認為 Mapper 任務數的 1/4。如果是Map only的任務,需要設置成0。可以參考這裏
setPartitionColumns(String[] cols)設置作業的分區列,定義了數據分配到Reducer的分配策略。
5. Reduce階段
setOutputGroupingColumns(String[] cols)數據在Reducer裏排序好了後,是哪些數據進入到同一個reduce方法的,就是看這裏的設置。一般來說,設置的和setPartitionColumns(String[] cols)一樣。可以看到二次排序的用法。
setReducerClass(Class<? extends Reducer> theClass)設置Reducer使用的Java類。
6. 數據輸出
setOutputOverwrite(boolean isOverwrite)設置對輸出表是否進行覆蓋。類似SQL裏的Insert into/overwrite Talbe的區別。
OutputUtils.addTable(TableInfo table, JobConf conf)設置了輸出的表。多路輸入輸出可以參考這裏
7. 其他
void setResources(String resourceNames)有和jar命令的-resources一樣的功能,但是優先級高於-resources(也就是說代碼裏的設置優先級比較高)

最後通過JobClient.runJob(job);客戶端往服務器發起了這個MapReduce作業。
詳細的SDK的文檔,可以在Maven裏下載。這是下載地址

Map/Reduce

讀表

在一個Mapper裏,隻會讀一張表,不同的表的數據會在不同的Mapper worker上運行,所以可以用示例裏的這個方法先獲得這個Mapper讀的是什麼表。

資源表/文件

資源表和文件可以讓一些小表/小文件可以方便被讀取。鑒於讀取數據的限製需要小於64次,一般是在setup裏讀取後緩存起來,具體的例子可以參考這裏

生產及周期調度

任務提交

客戶端做的就是給服務器發起任務的調度的指令。之前提到的jar命令就是一種方法。鑒於實際上運行場景的多樣性,這裏介紹其他的幾種常見方法:

  • odpscmd -e/-f:odpscmd的-e命令可以在shell腳本裏直接運行一個odpscmd裏的命令,所以可以在shell腳本裏運行odpscmd -e 'jar -resources xxxxxx'這樣的命令,在shell腳本裏調用MapReduce作業。一個完整的例子是

    odpscmd  -u accessId  -p  accessKey  --project=testproject --endpoint=https://service.odps.aliyun.com/api  -e "jar -resources aaa.jar -classpath ./aaa.jar com.XXX.A"
    

    如果在odpscmd的配置文件裏已經配置好了,那隻需要寫-e的部分。
    -f和-e一樣,隻是把命令寫到文件裏,然後用odpscmd -f xxx.sql引用這個文件,那這個文件裏的多個指令都會被執行。

  • 大數據開發套件可以配置MapReduce作業。

  • 大數據開發套件可以配置Shell作業。可以在Shell作業裏參考上麵的方法用odpscmd -e/-f來調度MapReduce作業。

  • 在JAVA代碼裏直接調用MapReduce作業,可以通過設置SessionState.setLocalRun(false);實現,具體可以參考這裏

定時調度

大數據開發套件的定時任務/工作流可以配置調度周期和任務依賴,配合前麵提到的方法裏的MapReduce作業/Shell作業,實現任務的調度。

產品限製

安全沙箱

沙箱是MaxCompute的一套安全體係,使得在MaxCompute上運行的作業無法獲得其他用戶的信息,也無法獲得係統的一些信息。主要包括以下幾點,完整的列表可以參考文檔

  • 無法訪問外部數據源(不能當爬蟲,不能讀RDS等)
  • 無法起多線程/多進程
  • 不支持反射/自定義類加載器(所以不支持一些第三方包)
  • 不允許讀本地文件(比如JSON裏就用到了,就需要改用GSON)
  • 不允許JNI調用

其他限製

詳見MaxCompute MR 限製項匯總

最後更新:2017-05-21 19:31:20

  上一篇:go  基於MYSQL、R語言、SHINY的中型企業電子商務中心業務分析周報係統
  下一篇:go  docker 配置外網訪問