《Hadoop與大數據挖掘》一2.4.2 MapReduce原理
本節書摘來華章計算機《Hadoop與大數據挖掘》一書中的第2章 ,第2.4.2節,張良均 樊 哲 位文超 劉名軍 許國傑 周 龍 焦正升 著 更多章節內容可以訪問雲棲社區“華章計算機”公眾號查看。
2.4.2 MapReduce原理
1.通俗理解MapReduce原理
現在你接到一個任務,給你10本長篇英文小說,讓你統計這10本書中每一個單詞出現的次數。這便是Hadoop編程中赫赫有名的HelloWorld程序:詞頻統計。這個任務的結果形式如表2-6所示。
即在這10本書中a共出現了12300次,ai共出現了63次……依次計算出每一個單詞出現多少次。天啊,這個工作必須由專業人士做呀,自己做的話還不累死呀。這時你可以把這個工作外包給一支職業分布式運算工程隊做。
分布式運算工程隊中按崗位有Mapper、Mapper助理Comb-iner、Mapper助理InputFormat、Mapper助理Patitioner、運輸負責Shuffle、Reducer、Reducer助理Sort、Reducer助理OutputFormat。除了Combiner是非必需人員外,其他崗位都是必需的。下麵描述一下這個工程隊是怎麼做這項工作的。
首先把這10本書分別分到10個Mapper手中。Mapper助理InputFormat負責從書中讀取記錄,Mapper負責記錄怎麼解析重新組織成新的格式。然後Mapper把自己的處理結果排好序後放到書旁邊,等待Shuffle取走結果。Shuffle把取到的結果送給Reducer助理Sort,由Sort負責把所有Mapper的結果排好序,然後送給Reducer來進行匯總,以得到最終的結果;最後,由Reducer助理Outputformat記錄到規定位置並存檔。
下麵說明什麼時候需要Combiner。Maper助理InputForormat從書中一行行讀取記錄,給到Mapper,Mapper從Inputformat的記錄中解析出一個個單詞,並進行記錄。Mapper處理的結果形如“a出現了一次,a出現了一次,ai出現了一次……zhe出現了一次”。工作一段時間後發現負責搬運工作的Suffle有點吃不消,這時就用到Mapper助理Combiner了。由Combiner對的輸出結果進行短暫的匯總,把Mapper的結果處理成形如“書本一中單詞a共出現1500次,ai出現了14次,are出現了80次……”這樣Shuffle的壓力頓時減輕了許多。
對於每個崗位工程隊都是有默認時限的。但如果默認時限不能滿足需求,也可以對工作量進行自定義。
上麵的過程描述了一個MapReduce工程隊是如何進行配合工作的。這個過程與MapReduce分布式運算是基本對應的。理解了上麵的過程也就大概理解了Hadoop的Map-Reduce過程了。
2. MapReduce過程解析
MapReduce過程可以解析為如下所示:
1)文件在HDFS上被分塊存儲,DataNode存儲實際的塊。
2)在Map階段,針對每個文件塊建立一個map任務,map任務直接運行在DataNode上,即移動計算,而非數據,如圖2-30所示。
3)每個map任務處理自己的文件塊,然後輸出新的鍵值對,如圖2-31所示。
4)Map輸出的鍵值對經過shuffle/sort階段後,相同key的記錄會被輸送到同一個reducer中,同時鍵是排序的,值被放入一個列表中,如圖2-32所示。
5)每個reducer處理從map輸送過來的鍵值對,然後輸出新的鍵值對,一般輸出到HDFS上。
3.單詞計數源碼解析
上麵的分析都是建立在理論基礎上的,這樣的分析有利於編寫MapReduce程序。但是如果要實際編寫一個MapReduce的簡單程序,還是不夠的,需要具體看示例代碼。這裏直接以官網提供的example代碼中的WordCount程序作為示例,進行代碼級別分析和說明。
首先,在Hadoop的發行版中找到對應的代碼。在解壓下載的Hadoop2.6.0的發行版目錄中,找到hadoop-2.6.0\share\hadoop\mapreduce\sources目錄,該目錄下麵有一個hadoop-mapreduce-examples-2.6.0-sources.jar文件,使用壓縮文件解壓縮該文件,在目錄org/apache/Hadoop/examples中即可找到WordCount.java文件,如圖2-33所示。
找到該文件後,使用文本軟件打開,或拷貝到Eclipse工程中查看,如代碼清單2-23所示。
代碼清單2-23 WordCount.java代碼
package org.apache.hadoop.examples;
/**
省略代碼
*/
public class WordCount {
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length < 2){
System.err.println("Usage: wordcount <in> [<in>...] <out>");
System.exit(2);
}
Job job = new Job(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
for (int i = 0; i < otherArgs.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
}
FileOutputFormat.setOutputPath(job,
new Path(otherArgs[otherArgs.length - 1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
下麵對該代碼進行分析。
(1)應用程序Driver分析
這裏的Driver程序主要指的是main函數,在main函數裏麵設置MapReduce程序的一些初始化設置,並提交任務等待程序運行完成,如代碼清單2-24所示。
代碼清單2-24 WordCount main 函數代碼
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length < 2){
System.err.println("Usage: wordcount <in> [<in>...] <out>");
System.exit(2);
}
Job job = new Job(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
for (int i = 0; i < otherArgs.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
}
FileOutputFormat.setOutputPath(job,
new Path(otherArgs[otherArgs.length - 1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
下麵,針對WordCount main函數代碼進行分析說明。
1)第1部分Configuration代碼,初始化相關Hadoop配置。在2.4.1節中也看到過,這裏直接新建一個實例即可。如果是在實際的應用程序中,可以通過conf.set()函數添加必要參數,即可直接運行。
2)第2部分代碼新建Job,並設置主類。這裏的Job實例需要把Configuration的實例傳入,後麵的“word count”是該MapReduce任務的任務名(注意這裏的方式使用的還是不推薦的MRV1的版本,推薦使用MRV2的版本)。
3)第3部分代碼設置Mapper、Reducer、Combiner,這裏的設置代碼都是固定寫法,裏麵的類名可以改變,一般情況下裏麵的類名為實際任務Mapper、Reducer、Combiner。
4)第4部分代碼設置輸出鍵值對格式。在MapReduce任務中涉及三個鍵值對格式:Mapper輸入鍵值對格式,Mapper輸出鍵值對格式,Reducer輸入鍵值對格式,Reducer輸出鍵值對格式。當Mapper輸出鍵值對格式和Reducer輸出鍵值對格式一樣的時候,可以隻設置輸出鍵值對的格式(這個其實就是Reducer輸出的鍵值對格式),否則需要設置“job.setMapOutputKeyClass(Text.class); job.setMapOutputKeyClass(IntWritable.class);”。
5)第5、第6部分代碼設置輸入、輸出路徑,其實還有輸入、輸出文件格式的設置,隻是這裏沒有設置,如果不是默認格式,那麼還是需要設置的。
6)最後部分代碼是提交MapReduce任務運行(是固定寫法),並等待任務運行結束。
綜合上麵的描述,這裏給出MapReduce任務初始化以及提交運行的一般代碼,如代碼清單2-25所示。
代碼清單2-25 MapReduce通用Driver代碼
Configuration conf = new Configuration();
Job job =Job.getInstance(conf);
job.setMapperClass(AverageMapper.class);
job.setReducerClass(AverageReducer.class);
job.setCombinerClass(Reducer.class);
job.setMapOutputKeyClass(Writable.class);
job.setMapOutputValueClass(Writable.class);
job.setOutputKeyClass(Writable.class);
job.setOutputValueClass(Writable.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.waitForCompletion(true);
在實際應用程序中,一般是直接從應用程序提交任務到Hadoop集群的,而非使用yarn jar的方式提交jar包來運行算法的。這裏給出通用的提交應用程序到Hadoop集群的代碼作為參考,不過在此之前需要簡要分析下Configuration這個類。
Configuration是Hadoop係統的基礎公共類,可以通過這個類的API加載配置信息,同時在初始化這個類的實例的時候也可以設置Hadoop集群的配置,從而直接針對某個Hadoop集群提交任務,其API如圖2-34所示。
Configuration各種set API中用得比較多的還是第1個,通用的提交應用程序到Hadoop集群的代碼也是使用的第1個,見代碼清單2-26。
代碼清單2-26 通用提交應用程序到Hadoop集群代碼
Configuration configuration = new Configuration();
configuration.setBoolean("mapreduce.app-submission.cross-platform", true);
// 配置使用跨平台提交任務
configuration.set("fs.defaultFS", "hdfs://node1:8020"); // 指定namenode
configuration.set("mapreduce.framework.name", "yarn"); // 指定使用yarn框架
configuration.set("yarn.resourcemanager.address", "node1:8032");
// 指定resourcemanager
configuration.set("yarn.resourcemanager.scheduler.address", "node1:8030");
// 指定資源分配器
configuration.set("mapreduce.jobhistory.address", "node2:10020");
// 指定historyserver
configuration.set("mapreduce.job.jar","C:\\Users\\fansy\\Desktop\\jars\\
import2hbase.jar");//設置包含Mapper、Reducer的jar包路徑
上麵的值需要根據實際的Hadoop集群對應配置進行修改。
同時,通過Configuration的set方法也可以實現在Mapper和Reducer任務之間信息共享。比如在Driver中設置一個參數number,在Mapper或Reducer中取出該參數,如代碼清單2-27所示(注意,在MapReduce程序中是不能通過全局static變量獲取值的,這點需要特別注意)。
代碼清單2-27 通過Configuration在Driver和Mapper/Reducer傳遞參數
// 在Driver中設置參數值
Configuration conf = new Configuration();
conf.setInt(“number”,10);
// 在Reducer中取出參數值
public class MyReducer extends Reducer<K2,V2,K3,V3>{
public void setup(Context context){
int number = context.getConfiguration().getInt(“number”);
}
}
(2)Mapper分析
對於用戶來說,其實比較關心的是Mapper的map函數以及Reducer的reduce函數,這裏先分析Mapper的map函數,如代碼清單2-28所示。
代碼清單2-28 WordCount Mapper代碼
public static class TokenizerMapper extends Mapper<Object, Text, Text, Int-Writable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
@Override
protected void setup(Context context)
throws IOException, InterruptedException {
super.setup(context);
}
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
@Override
protected void cleanup(Context context)
throws IOException, InterruptedException {
super.cleanup(context);
}
}
1)自定義Mapper需要繼承Mapper,同時需要設置輸入輸出鍵值對格式,其中輸入鍵值對格式是與輸入格式設置的類讀取生成的鍵值對格式匹配,而輸出鍵值對格式需要與Driver中設置的Mapper輸出的鍵值對格式匹配。
2)Mapper有3個函數,分別是setup、map、cleanup,其中實現setup、cleanup函數不是必須要求,Mapper任務啟動後首先執行setup函數,該函數主要用於初始化工作;針對每個鍵值對會執行一次map函數,所有鍵值對處理完成後會調用cleanup函數,主要用於關閉資源等操作。
3)實現的map函數就是與實際業務邏輯掛鉤的代碼,主要由用戶編寫,這裏是單詞計數程序,所以這裏的邏輯是把每個鍵值對(鍵值對組成為:<行的偏移量,行字符串>)的值(也就是行字符串)按照空格進行分割,得到每個單詞,然後輸出每個單詞和1這樣的鍵值對。
(3)Reducer分析
Reducer針對Mapper的輸出進行整合,同時輸入給Reducer的是鍵值對組,所以其實Reducer中的reduce函數就是針對每個鍵的所有匯總值的處理。Reducer代碼如代碼清單2-29所示。
代碼清單2-29 WordCount Reducer代碼
public class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
@Override
protected void setup(Reducer<Text, IntWritable, Text, IntWritable>.Context context)
throws IOException, InterruptedException {
super.setup(context);
}
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
@Override
protected void cleanup(Reducer<Text, IntWritable, Text, IntWritable>.Context context)
throws IOException, InterruptedException {
super.cleanup(context);
}
}
1)自定義Reducer同樣需要繼承Reducer,與Mapper相同,需要設置輸入輸出鍵值對格式,這裏的輸入鍵值對格式需要與Mapper的輸出鍵值對格式保持一致,輸出鍵值對格式需要與Driver中設置的輸出鍵值對格式保持一致。
2)Reducer也有3個函數:setup、cleanup、reduce,其中setup、cleanup函數其實和Mapper的同名函數功能一致,並且也是setup函數在最開始執行一次,而cleanup函數在最後執行一次。
3)用戶一般比較關心reduce函數的實現,這個函數裏麵寫的就是與業務相關的處理邏輯了,比如,這裏單詞計數,就針對相同鍵,把其值的列表全部加起來進行輸出。
最後更新:2017-06-26 10:03:07