閱讀247 返回首頁    go 阿裏雲 go 技術社區[雲棲]


Hadoop 使用Combiner提高Map/Reduce程序效率

      眾所周知,Hadoop框架使用Mapper將數據處理成一個<key,value>鍵值對,再網絡節點間對其進行整理(shuffle),然後使用Reducer處理數據並進行最終輸出。


      在上述過程中,我們看到至少兩個性能瓶頸:

  1. 如果我們有10億個數據,Mapper會生成10億個鍵值對在網絡間進行傳輸,但如果我們隻是對數據求最大值,那麼很明顯的Mapper隻需要輸出它所知道的最大值即可。這樣做不僅可以減輕網絡壓力,同樣也可以大幅度提高程序效率。
  2. 使用專利中的國家一項來闡述數據傾斜這個定義。這樣的數據遠遠不是一致性的或者說平衡分布的,由於大多數專利的國家都屬於美國,這樣不僅Mapper中的鍵值對、中間階段(shuffle)的鍵值對等,大多數的鍵值對最終會聚集於一個單一的Reducer之上,壓倒這個Reducer,從而大大降低程序的性能。

        

        Hadoop通過使用一個介於Mapper和Reducer之間的Combiner步驟來解決上述瓶頸。你可以將Combiner視為Reducer的一個幫手,它主要是為了削減Mapper的輸出從而減少網絡帶寬和Reducer之上的負載。如果我們定義一個Combiner,MapReducer框架會對中間數據多次地使用它進行處理。


        如果Reducer隻運行簡單的分布式方法,例如最大值、最小值、或者計數,那麼我們可以讓Reducer自己作為Combiner。但許多有用的方法不是分布式的。以下我們使用求平均值作為例子進行講解:


        Mapper輸出它所處理的鍵值對,為了使單個DataNode計算平均值Reducer會對它收到的<key,value>鍵值對進行排序,求和。


        由於Reducer將它所收到的<key,value>鍵值的數目視為輸入數據中的<key,value>鍵值對的數目,此時使用Combiner的主要障礙就是計數操作。我們可以重寫MapReduce程序來明確的跟蹤計數過程。

    

代碼如下:

[java] view plaincopy
  1. package com;  
  2.   
  3. import java.io.IOException;  
  4.   
  5. import org.apache.hadoop.conf.Configuration;  
  6. import org.apache.hadoop.conf.Configured;  
  7. import org.apache.hadoop.fs.Path;  
  8. import org.apache.hadoop.io.DoubleWritable;  
  9. import org.apache.hadoop.io.LongWritable;  
  10. import org.apache.hadoop.io.Text;  
  11. import org.apache.hadoop.mapreduce.Job;  
  12. import org.apache.hadoop.mapreduce.Mapper;  
  13. import org.apache.hadoop.mapreduce.Reducer;  
  14. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
  15. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;  
  16. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
  17. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;  
  18. import org.apache.hadoop.util.Tool;  
  19. import org.apache.hadoop.util.ToolRunner;  
  20.   
  21. public class AveragingWithCombiner extends Configured implements Tool {  
  22.   
  23.     public static class MapClass extends Mapper<LongWritable,Text,Text,Text> {  
  24.           
  25.         static enum ClaimsCounters { MISSING, QUOTED };  
  26.         // Map Method  
  27.         public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {  
  28.             String fields[] = value.toString().split(",", -20);  
  29.             String country = fields[4];  
  30.             String numClaims = fields[8];  
  31.               
  32.             if (numClaims.length() > 0 && !numClaims.startsWith("\"")) {  
  33.                 context.write(new Text(country), new Text(numClaims + ",1"));  
  34.             }  
  35.         }  
  36.     }  
  37.       
  38.     public static class Reduce extends Reducer<Text,Text,Text,DoubleWritable> {  
  39.           
  40.         // Reduce Method  
  41.         public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {  
  42.             double sum = 0;  
  43.             int count = 0;  
  44.             for (Text value : values) {  
  45.                 String fields[] = value.toString().split(",");  
  46.                 sum += Double.parseDouble(fields[0]);  
  47.                 count += Integer.parseInt(fields[1]);  
  48.             }  
  49.             context.write(key, new DoubleWritable(sum/count));  
  50.         }  
  51.     }  
  52.       
  53.     public static class Combine extends Reducer<Text,Text,Text,Text> {  
  54.           
  55.         // Reduce Method  
  56.         public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {  
  57.             double sum = 0;  
  58.             int count = 0;  
  59.             for (Text value : values) {  
  60.                 String fields[] = value.toString().split(",");  
  61.                 sum += Double.parseDouble(fields[0]);  
  62.                 count += Integer.parseInt(fields[1]);  
  63.             }  
  64.             context.write(key, new Text(sum+","+count));  
  65.         }  
  66.     }  
  67.       
  68.     // run Method  
  69.     public int run(String[] args) throws Exception {  
  70.         // Create and Run the Job  
  71.         Job job = new Job();  
  72.         job.setJarByClass(AveragingWithCombiner.class);  
  73.           
  74.         FileInputFormat.addInputPath(job, new Path(args[0]));  
  75.         FileOutputFormat.setOutputPath(job, new Path(args[1]));  
  76.           
  77.         job.setJobName("AveragingWithCombiner");  
  78.         job.setMapperClass(MapClass.class);  
  79.         job.setCombinerClass(Combine.class);  
  80.         job.setReducerClass(Reduce.class);  
  81.         job.setInputFormatClass(TextInputFormat.class);  
  82.         job.setOutputFormatClass(TextOutputFormat.class);  
  83.           
  84.         job.setOutputKeyClass(Text.class);  
  85.         job.setOutputValueClass(Text.class);  
  86.           
  87.         System.exit(job.waitForCompletion(true) ? 0 : 1);  
  88.         return 0;  
  89.     }  
  90.       
  91.     public static void main(String[] args) throws Exception {  
  92.         int res = ToolRunner.run(new Configuration(), new AveragingWithCombiner(), args);  
  93.         System.exit(res);  
  94.     }  
  95.   
  96. }  

最後更新:2017-04-03 16:48:31

  上一篇:go 期待,那將來
  下一篇:go 第三章 AOP 通過Java API創建增強