閱讀169 返回首頁    go 人物


開源兼容MapReduce__概要_MapReduce_大數據計算服務-阿裏雲

MaxCompute(原ODPS)有一套原生的MapReduce編程模型和接口,簡單說來,這套接口的輸入輸出都是MaxCompute中的Table,處理的數據是以Record為組織形式的,它可以很好地描述Table中的數據處理過程,然而與社區的Hadoop相比,編程接口差異較大。Hadoop用戶如果要將原來的Hadoop MR作業遷移到MaxCompute的MR執行,需要重寫MR的代碼,使用MaxCompute的接口進行編譯和調試,運行正常後再打成一個Jar包才能放到MaxCompute的平台來運行。這個過程十分繁瑣,需要耗費很多的開發和測試人力。如果能夠完全不改或者少量地修改原來的Hadoop MapReduce代碼就能在MaxCompute平台上跑起來,將是一個比較理想的方式。

現在MaxCompute平台提供了一個Hadoop MapReduce到MaxCompute MR的適配工具,已經在一定程度上實現了Hadoop MapReduce作業的二進製級別的兼容,即用戶可以在不改代碼的情況下通過指定一些配置,就能將原來在Hadoop上運行的MapReduce jar包拿過來直接跑在MaxCompute上。用戶可通過這裏下載開發插件。目前該插件處於測試階段,暫時還不能支持用戶自定義comparator和自定義key類型,下麵將以WordCount程序為例,介紹一下這個插件的基本使用方式。

提示:

下載HadoopMR的插件

通過這裏下載插件,包名為hadoop2openmr-1.0.jar。

注意,這個jar裏麵已經包含hadoop-2.7.2版本的相關依賴,在作業的jar包中請不要攜帶hadoop的依賴,避免版本衝突。

準備好HadoopMR的程序jar包

編譯導出WordCount的jar包:wordcount_test.jar ,wordcount程序的源碼如下:

  1. package com.aliyun.odps.mapred.example.hadoop;
  2. import org.apache.hadoop.conf.Configuration;
  3. import org.apache.hadoop.fs.Path;
  4. import org.apache.hadoop.io.IntWritable;
  5. import org.apache.hadoop.io.Text;
  6. import org.apache.hadoop.mapreduce.Job;
  7. import org.apache.hadoop.mapreduce.Mapper;
  8. import org.apache.hadoop.mapreduce.Reducer;
  9. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  10. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  11. import java.io.IOException;
  12. import java.util.StringTokenizer;
  13. public class WordCount {
  14. public static class TokenizerMapper
  15. extends Mapper<Object, Text, Text, IntWritable>{
  16. private final static IntWritable one = new IntWritable(1);
  17. private Text word = new Text();
  18. public void map(Object key, Text value, Context context
  19. ) throws IOException, InterruptedException {
  20. StringTokenizer itr = new StringTokenizer(value.toString());
  21. while (itr.hasMoreTokens()) {
  22. word.set(itr.nextToken());
  23. context.write(word, one);
  24. }
  25. }
  26. }
  27. public static class IntSumReducer
  28. extends Reducer<Text,IntWritable,Text,IntWritable> {
  29. private IntWritable result = new IntWritable();
  30. public void reduce(Text key, Iterable<IntWritable> values,
  31. Context context
  32. ) throws IOException, InterruptedException {
  33. int sum = 0;
  34. for (IntWritable val : values) {
  35. sum += val.get();
  36. }
  37. result.set(sum);
  38. context.write(key, result);
  39. }
  40. }
  41. public static void main(String[] args) throws Exception {
  42. Configuration conf = new Configuration();
  43. Job job = Job.getInstance(conf, "word count");
  44. job.setJarByClass(WordCount.class);
  45. job.setMapperClass(TokenizerMapper.class);
  46. job.setCombinerClass(IntSumReducer.class);
  47. job.setReducerClass(IntSumReducer.class);
  48. job.setOutputKeyClass(Text.class);
  49. job.setOutputValueClass(IntWritable.class);
  50. FileInputFormat.addInputPath(job, new Path(args[0]));
  51. FileOutputFormat.setOutputPath(job, new Path(args[1]));
  52. System.exit(job.waitForCompletion(true) ? 0 : 1);
  53. }
  54. }

測試數據準備

  • 創建輸入表和輸出表
    1. create table if not exists wc_in(line string);
    2. create table if not exists wc_out(key string, cnt bigint);
  • 通過tunnel將數據導入輸入表中

待導入文本文件data.txt的數據內容如下:

  1. hello maxcompute
  2. hello mapreduce

例如可以通過MaxCompute客戶端Tunnel命令將data.txt的數據導入wc_in中,

  1. tunnel upload data.txt wc_in;

準備好表與HDFS文件路徑的映射關係配置

配置文件命名為:wordcount-table-res.conf

  1. {
  2. "file:/foo": {
  3. "resolver": {
  4. "resolver": "com.aliyun.odps.mapred.hadoop2openmr.resolver.TextFileResolver",
  5. "properties": {
  6. "text.resolver.columns.combine.enable": "true",
  7. "text.resolver.seperator": "t"
  8. }
  9. },
  10. "tableInfos": [
  11. {
  12. "tblName": "wc_in",
  13. "partSpec": {},
  14. "label": "__default__"
  15. }
  16. ],
  17. "matchMode": "exact"
  18. },
  19. "file:/bar": {
  20. "resolver": {
  21. "resolver": "com.aliyun.odps.mapred.hadoop2openmr.resolver.BinaryFileResolver",
  22. "properties": {
  23. "binary.resolver.input.key.class" : "org.apache.hadoop.io.Text",
  24. "binary.resolver.input.value.class" : "org.apache.hadoop.io.LongWritable"
  25. }
  26. },
  27. "tableInfos": [
  28. {
  29. "tblName": "wc_out",
  30. "partSpec": {},
  31. "label": "__default__"
  32. }
  33. ],
  34. "matchMode": "fuzzy"
  35. }
  36. }

配置項說明:

整個配置是一個json文件,描述HDFS上文件與maxcompute上表之間的映射關係,一般要配置輸入和輸出兩部分,一個HDFS路徑對應一個resolver配置,tableInfos配置以及matchMode配置。

  • resolver: 用於配置如何對待文件中的數據,目前有com.aliyun.odps.mapred.hadoop2openmr.resolver.TextFileResolver和com.aliyun.odps.mapred.hadoop2openmr.resolver.BinaryFileResolver兩個內置的resolver可以選用。除了指定好resolver的名字,還需要為相應的resolver配置一些properties指導它正確的進行數據解析。
    • TextFileResolver: 對於純文本的數據,輸入輸出都會當成純文本對待。當作為輸入resolver配置時,需要配置的properties有text.resolver.columns.combine.enable和text.resolver.seperator,當text.resolver.columns.combine.enable配置為true時,會把輸入表的所有列按找text.resolver.seperator指定的分隔符組合成一個字符串作為輸入。否則,會把輸入表的前兩列分別作為key,value。
    • BinaryFileResolver: 可以處理二進製的數據,自動將數據轉換為maxcompute可以支持的數據類型,如bigint, bool, double等。當作為輸出resolver配置時,需要配置的properties有binary.resolver.input.key.class和binary.resolver.input.value.class,分別代表中間結果的key和value類型。
  • tableInfos:用戶配置HDFS對應的maxcompute的表,目前隻支持配置表的名字tblName,而partSpec和label請保持和示例一致。
  • matchMode:路徑的匹配模式,可選項為exact和fuzzy,分別代表精確匹配和模煳匹配,如果設置為fuzzy,則可以通過正則來匹配HDFS的輸入路徑。

作業提交

使用MaxCompute命令行工具odpscmd提交作業。MaxCompute命令行工具的安裝和配置方法參考客戶端用戶手冊。在odpscmd下運行如下命令:

  1. jar -DODPS_HADOOPMR_TABLE_RES_CONF=./wordcount-table-res.conf -classpath hadoop2openmr-1.0.jar,wordcount_test.jar com.aliyun.odps.mapred.example.hadoop.WordCount /foo /bar;

這裏假設我們已經將hadoop2openmr-1.0.jar和wordcount_test.jar以及wordcount-table-res.conf已經放置到odpscmd的當前目錄,否則在指定配置和-classpath的路徑時需要做相應的修改。

運行過程如下圖所示:

open source job run

當作業運行完成後,可以查看結果表wc_out裏麵的內容,驗證作業成功結束,結果符合預期。

open source result

最後更新:2016-10-18 11:46:39

  上一篇:go 擴展MapReduce__概要_MapReduce_大數據計算服務-阿裏雲
  下一篇:go 作業提交__功能介紹_MapReduce_大數據計算服務-阿裏雲