Hadoop 使用Combiner提高Map/Reduce程序效率
眾所周知,Hadoop框架使用Mapper將數據處理成一個<key,value>鍵值對,再網絡節點間對其進行整理(shuffle),然後使用Reducer處理數據並進行最終輸出。
在上述過程中,我們看到至少兩個性能瓶頸:
- 如果我們有10億個數據,Mapper會生成10億個鍵值對在網絡間進行傳輸,但如果我們隻是對數據求最大值,那麼很明顯的Mapper隻需要輸出它所知道的最大值即可。這樣做不僅可以減輕網絡壓力,同樣也可以大幅度提高程序效率。
- 使用專利中的國家一項來闡述數據傾斜這個定義。這樣的數據遠遠不是一致性的或者說平衡分布的,由於大多數專利的國家都屬於美國,這樣不僅Mapper中的鍵值對、中間階段(shuffle)的鍵值對等,大多數的鍵值對最終會聚集於一個單一的Reducer之上,壓倒這個Reducer,從而大大降低程序的性能。
Hadoop通過使用一個介於Mapper和Reducer之間的Combiner步驟來解決上述瓶頸。你可以將Combiner視為Reducer的一個幫手,它主要是為了削減Mapper的輸出從而減少網絡帶寬和Reducer之上的負載。如果我們定義一個Combiner,MapReducer框架會對中間數據多次地使用它進行處理。
如果Reducer隻運行簡單的分布式方法,例如最大值、最小值、或者計數,那麼我們可以讓Reducer自己作為Combiner。但許多有用的方法不是分布式的。以下我們使用求平均值作為例子進行講解:
Mapper輸出它所處理的鍵值對,為了使單個DataNode計算平均值Reducer會對它收到的<key,value>鍵值對進行排序,求和。
由於Reducer將它所收到的<key,value>鍵值的數目視為輸入數據中的<key,value>鍵值對的數目,此時使用Combiner的主要障礙就是計數操作。我們可以重寫MapReduce程序來明確的跟蹤計數過程。
代碼如下:
- package com;
- import java.io.IOException;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.conf.Configured;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.DoubleWritable;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.Reducer;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
- import org.apache.hadoop.util.Tool;
- import org.apache.hadoop.util.ToolRunner;
- public class AveragingWithCombiner extends Configured implements Tool {
- public static class MapClass extends Mapper<LongWritable,Text,Text,Text> {
- static enum ClaimsCounters { MISSING, QUOTED };
- // Map Method
- public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
- String fields[] = value.toString().split(",", -20);
- String country = fields[4];
- String numClaims = fields[8];
- if (numClaims.length() > 0 && !numClaims.startsWith("\"")) {
- context.write(new Text(country), new Text(numClaims + ",1"));
- }
- }
- }
- public static class Reduce extends Reducer<Text,Text,Text,DoubleWritable> {
- // Reduce Method
- public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
- double sum = 0;
- int count = 0;
- for (Text value : values) {
- String fields[] = value.toString().split(",");
- sum += Double.parseDouble(fields[0]);
- count += Integer.parseInt(fields[1]);
- }
- context.write(key, new DoubleWritable(sum/count));
- }
- }
- public static class Combine extends Reducer<Text,Text,Text,Text> {
- // Reduce Method
- public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
- double sum = 0;
- int count = 0;
- for (Text value : values) {
- String fields[] = value.toString().split(",");
- sum += Double.parseDouble(fields[0]);
- count += Integer.parseInt(fields[1]);
- }
- context.write(key, new Text(sum+","+count));
- }
- }
- // run Method
- public int run(String[] args) throws Exception {
- // Create and Run the Job
- Job job = new Job();
- job.setJarByClass(AveragingWithCombiner.class);
- FileInputFormat.addInputPath(job, new Path(args[0]));
- FileOutputFormat.setOutputPath(job, new Path(args[1]));
- job.setJobName("AveragingWithCombiner");
- job.setMapperClass(MapClass.class);
- job.setCombinerClass(Combine.class);
- job.setReducerClass(Reduce.class);
- job.setInputFormatClass(TextInputFormat.class);
- job.setOutputFormatClass(TextOutputFormat.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(Text.class);
- System.exit(job.waitForCompletion(true) ? 0 : 1);
- return 0;
- }
- public static void main(String[] args) throws Exception {
- int res = ToolRunner.run(new Configuration(), new AveragingWithCombiner(), args);
- System.exit(res);
- }
- }
最後更新:2017-04-03 16:48:31
上一篇:
期待,那將來
下一篇:
第三章 AOP 通過Java API創建增強
Android SDK 更新不成功的解決方法
Sphinx 2.2.3 安裝和配置,英文數字中文搜索
Ensemble learning: Boosting
poj 3062 Celebrity jeopardy
The Differences between AI, Machine Learning, and Deep Learning
Spring tool suite編譯不通過:Access restriction: The type XXX is not accessible
HybridDB · 最佳實踐 · 阿裏雲數據庫PetaData
《Netty官方文檔》本地傳輸接口
yield在WCF中的錯誤使用——99%的開發人員都有可能犯的錯誤[下篇]
文件監控(教學版)