

MapReduce Development Guide - Hadoop Developer Guide - E-MapReduce - Alibaba Cloud

Using OSS in MapReduce

To read from and write to OSS in a MapReduce job, configure the following parameters:

  1. conf.set("fs.oss.accessKeyId", "${accessKeyId}");
  2. conf.set("fs.oss.accessKeySecret", "${accessKeySecret}");
  3. conf.set("fs.oss.endpoint","${endpoint}");

Parameter descriptions:

${accessKeyId}: the AccessKey ID of your account.

${accessKeySecret}: the secret corresponding to that AccessKey ID.

${endpoint}: the endpoint used to access OSS over the network, which is determined by the region your cluster is in; the OSS bucket must also be in the same region as the cluster.

For the specific values, see OSS Endpoint.
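
For example, assuming the cluster is in the China (Hangzhou) region and accesses OSS over the intranet, the settings might look like the minimal sketch below; the AccessKey values are placeholders and the endpoint should be replaced with the one for your own region:

    // Placeholder credentials; substitute your own AccessKey pair.
    conf.set("fs.oss.accessKeyId", "yourAccessKeyId");
    conf.set("fs.oss.accessKeySecret", "yourAccessKeySecret");
    // Assumed intranet endpoint for a cluster and bucket both in cn-hangzhou.
    conf.set("fs.oss.endpoint", "oss-cn-hangzhou-internal.aliyuncs.com");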

Word Count

The following example shows how to read text from OSS and count the words in it. The steps are as follows:

  1. Write the program. Taking Java as an example, modify the WordCount example from the Hadoop website as shown below. The only change is adding the AccessKey ID and AccessKey Secret settings to the code so that the job has permission to access files on OSS.

    package org.apache.hadoop.examples;

    import java.io.IOException;
    import java.util.StringTokenizer;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;

    public class EmrWordCount {

      public static class TokenizerMapper
          extends Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context
                        ) throws IOException, InterruptedException {
          StringTokenizer itr = new StringTokenizer(value.toString());
          while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, one);
          }
        }
      }

      public static class IntSumReducer
          extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values,
                           Context context
                           ) throws IOException, InterruptedException {
          int sum = 0;
          for (IntWritable val : values) {
            sum += val.get();
          }
          result.set(sum);
          context.write(key, result);
        }
      }

      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
          System.err.println("Usage: wordcount <in> [<in>...] <out>");
          System.exit(2);
        }
        conf.set("fs.oss.accessKeyId", "${accessKeyId}");
        conf.set("fs.oss.accessKeySecret", "${accessKeySecret}");
        conf.set("fs.oss.endpoint", "${endpoint}");
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(EmrWordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        for (int i = 0; i < otherArgs.length - 1; ++i) {
          FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job,
            new Path(otherArgs[otherArgs.length - 1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }
  2. Compile the program. First make sure the JDK and Hadoop environments are set up, then run the following commands:

    mkdir wordcount_classes
    javac -classpath ${HADOOP_HOME}/share/hadoop/common/hadoop-common-2.6.0.jar:${HADOOP_HOME}/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.6.0.jar:${HADOOP_HOME}/share/hadoop/common/lib/commons-cli-1.2.jar -d wordcount_classes EmrWordCount.java
    jar cvf wordcount.jar -C wordcount_classes .
  3. Create the job.

    • Upload the jar built in the previous step to OSS; you can do this from the OSS console. Assume the jar's path on OSS is oss://emr/jars/wordcount.jar and that the input and output paths are oss://emr/data/WordCount/Input and oss://emr/data/WordCount/Output, respectively.

    • In E-MapReduce Jobs, create the corresponding job (an example of the job parameters is shown after this list).

  4. Create the execution plan. In E-MapReduce Execution Plans, create an execution plan, add the job created in the previous step to it, and select the "Run now" policy. The wordcount job will then run on the selected cluster.
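
For reference in step 3, the job parameters might look like the following sketch, which follows the same parameter format as the Maven-based example in the next section; the jar path, main class, and input/output paths are the values assumed above and should be adjusted to your actual bucket and paths:

    jar ossref://emr/jars/wordcount.jar org.apache.hadoop.examples.EmrWordCount oss://emr/data/WordCount/Input oss://emr/data/WordCount/Output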

Using a Maven Project to Manage MR Jobs

As your project grows, it becomes complex and hard to manage. We recommend using a software project management tool such as Maven. The steps are as follows:

  1. Install Maven. First, make sure Maven is installed.

  2. Generate the project skeleton. From your workspace root (assume it is D:/workspace), run the following command:

    mvn archetype:generate -DgroupId=com.aliyun.emr.hadoop.examples -DartifactId=wordcountv2 -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false

    mvn automatically generates an empty sample project at D:/workspace/wordcountv2 (matching the artifactId you specified), containing a simple pom.xml and an App class (whose package path matches the groupId you specified). A sketch of the generated layout is shown below.
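
    Assuming the standard maven-archetype-quickstart layout, the generated project should look roughly like this (exact file names may vary with the archetype version):

        wordcountv2/
            pom.xml
            src/main/java/com/aliyun/emr/hadoop/examples/App.java
            src/test/java/com/aliyun/emr/hadoop/examples/AppTest.java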

  3. Add the Hadoop dependencies. Open the project in any IDE and edit the pom.xml file. Add the following inside dependencies:

    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-common</artifactId>
        <version>2.6.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.6.0</version>
    </dependency>
  4. Write the code. In the com.aliyun.emr.hadoop.examples package, next to the App class, add a new class WordCount2.java with the following content:

    package com.aliyun.emr.hadoop.examples;

    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.io.IOException;
    import java.net.URI;
    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;
    import java.util.StringTokenizer;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Counter;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    import org.apache.hadoop.util.StringUtils;

    public class WordCount2 {

      public static class TokenizerMapper
          extends Mapper<Object, Text, Text, IntWritable> {

        static enum CountersEnum { INPUT_WORDS }

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        private boolean caseSensitive;
        private Set<String> patternsToSkip = new HashSet<String>();
        private Configuration conf;
        private BufferedReader fis;

        @Override
        public void setup(Context context) throws IOException,
            InterruptedException {
          conf = context.getConfiguration();
          caseSensitive = conf.getBoolean("wordcount.case.sensitive", true);
          // Only read the skip-pattern files when -skip was passed on the command
          // line (default false, so getCacheFiles() is not dereferenced when no
          // cache file has been added).
          if (conf.getBoolean("wordcount.skip.patterns", false)) {
            URI[] patternsURIs = Job.getInstance(conf).getCacheFiles();
            for (URI patternsURI : patternsURIs) {
              Path patternsPath = new Path(patternsURI.getPath());
              String patternsFileName = patternsPath.getName().toString();
              parseSkipFile(patternsFileName);
            }
          }
        }

        private void parseSkipFile(String fileName) {
          try {
            fis = new BufferedReader(new FileReader(fileName));
            String pattern = null;
            while ((pattern = fis.readLine()) != null) {
              patternsToSkip.add(pattern);
            }
          } catch (IOException ioe) {
            System.err.println("Caught exception while parsing the cached file '"
                + StringUtils.stringifyException(ioe));
          }
        }

        @Override
        public void map(Object key, Text value, Context context
                        ) throws IOException, InterruptedException {
          String line = (caseSensitive) ?
              value.toString() : value.toString().toLowerCase();
          for (String pattern : patternsToSkip) {
            line = line.replaceAll(pattern, "");
          }
          StringTokenizer itr = new StringTokenizer(line);
          while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, one);
            Counter counter = context.getCounter(CountersEnum.class.getName(),
                CountersEnum.INPUT_WORDS.toString());
            counter.increment(1);
          }
        }
      }

      public static class IntSumReducer
          extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values,
                           Context context
                           ) throws IOException, InterruptedException {
          int sum = 0;
          for (IntWritable val : values) {
            sum += val.get();
          }
          result.set(sum);
          context.write(key, result);
        }
      }

      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.oss.accessKeyId", "${accessKeyId}");
        conf.set("fs.oss.accessKeySecret", "${accessKeySecret}");
        conf.set("fs.oss.endpoint", "${endpoint}");
        GenericOptionsParser optionParser = new GenericOptionsParser(conf, args);
        String[] remainingArgs = optionParser.getRemainingArgs();
        // Accept either <in> <out> or <in> <out> -skip <skipPatternFile>.
        if (remainingArgs.length != 2 && remainingArgs.length != 4) {
          System.err.println("Usage: wordcount <in> <out> [-skip skipPatternFile]");
          System.exit(2);
        }
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount2.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        List<String> otherArgs = new ArrayList<String>();
        for (int i = 0; i < remainingArgs.length; ++i) {
          if ("-skip".equals(remainingArgs[i])) {
            job.addCacheFile(new Path(EMapReduceOSSUtil.buildOSSCompleteUri(remainingArgs[++i], conf)).toUri());
            job.getConfiguration().setBoolean("wordcount.skip.patterns", true);
          } else {
            otherArgs.add(remainingArgs[i]);
          }
        }
        FileInputFormat.addInputPath(job, new Path(EMapReduceOSSUtil.buildOSSCompleteUri(otherArgs.get(0), conf)));
        FileOutputFormat.setOutputPath(job, new Path(EMapReduceOSSUtil.buildOSSCompleteUri(otherArgs.get(1), conf)));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }

    The EMapReduceOSSUtil class referenced above is shown below; place it in the same directory as WordCount2. A short usage sketch follows the class.

    package com.aliyun.emr.hadoop.examples;

    import org.apache.hadoop.conf.Configuration;

    public class EMapReduceOSSUtil {

      private static String SCHEMA = "oss://";
      private static String AKSEP = ":";
      private static String BKTSEP = "@";
      private static String EPSEP = ".";
      private static String HTTP_HEADER = "https://";

      /**
       * complete OSS uri
       * convert uri like: oss://bucket/path to oss://accessKeyId:accessKeySecret@bucket.endpoint/path
       * ossref do not need this
       *
       * @param oriUri original OSS uri
       */
      public static String buildOSSCompleteUri(String oriUri, String akId, String akSecret, String endpoint) {
        if (akId == null) {
          System.err.println("miss accessKeyId");
          return oriUri;
        }
        if (akSecret == null) {
          System.err.println("miss accessKeySecret");
          return oriUri;
        }
        if (endpoint == null) {
          System.err.println("miss endpoint");
          return oriUri;
        }
        int index = oriUri.indexOf(SCHEMA);
        if (index == -1 || index != 0) {
          return oriUri;
        }
        int bucketIndex = index + SCHEMA.length();
        int pathIndex = oriUri.indexOf("/", bucketIndex);
        String bucket = null;
        if (pathIndex == -1) {
          bucket = oriUri.substring(bucketIndex);
        } else {
          bucket = oriUri.substring(bucketIndex, pathIndex);
        }
        StringBuilder retUri = new StringBuilder();
        retUri.append(SCHEMA)
            .append(akId)
            .append(AKSEP)
            .append(akSecret)
            .append(BKTSEP)
            .append(bucket)
            .append(EPSEP)
            .append(stripHttp(endpoint));
        if (pathIndex > 0) {
          retUri.append(oriUri.substring(pathIndex));
        }
        return retUri.toString();
      }

      public static String buildOSSCompleteUri(String oriUri, Configuration conf) {
        return buildOSSCompleteUri(oriUri, conf.get("fs.oss.accessKeyId"), conf.get("fs.oss.accessKeySecret"), conf.get("fs.oss.endpoint"));
      }

      private static String stripHttp(String endpoint) {
        if (endpoint.startsWith(HTTP_HEADER)) {
          return endpoint.substring(HTTP_HEADER.length());
        }
        return endpoint;
      }
    }
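
    As an illustration of what buildOSSCompleteUri does, the following minimal sketch (with made-up credential and endpoint values) shows the URI rewriting performed before paths are handed to the job:

        // Hypothetical values purely for illustration.
        Configuration conf = new Configuration();
        conf.set("fs.oss.accessKeyId", "myAccessKeyId");
        conf.set("fs.oss.accessKeySecret", "myAccessKeySecret");
        conf.set("fs.oss.endpoint", "oss-cn-hangzhou.aliyuncs.com");

        // full becomes:
        // oss://myAccessKeyId:myAccessKeySecret@emr.oss-cn-hangzhou.aliyuncs.com/data/WordCount/Input
        String full = EMapReduceOSSUtil.buildOSSCompleteUri("oss://emr/data/WordCount/Input", conf);
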
  5. Compile, package, and upload. In the project directory, run the following command:

    mvn clean package -DskipTests

    You will then find wordcountv2-1.0-SNAPSHOT.jar in the project's target directory; this is the job's jar package. Upload this jar to OSS.

  6. Create the job. Create a new job in E-MapReduce with parameters similar to the following:

    jar ossref://yourBucket/yourPath/wordcountv2-1.0-SNAPSHOT.jar com.aliyun.emr.hadoop.examples.WordCount2 -Dwordcount.case.sensitive=true oss://yourBucket/yourPath/The_Sorrows_of_Young_Werther.txt oss://yourBucket/yourPath/output -skip oss://yourBucket/yourPath/patterns.txt

    Here, yourBucket is one of your OSS buckets and yourPath is a path within that bucket; fill them in according to your actual setup. Download the two resource files the job processes and place them on your OSS at oss://yourBucket/yourPath/The_Sorrows_of_Young_Werther.txt and oss://yourBucket/yourPath/patterns.txt. The resources the job needs can be downloaded below and then placed in the corresponding OSS directory. A sample of what a skip-patterns file might contain is shown after the download links.

    Resource downloads: The_Sorrows_of_Young_Werther.txt, patterns.txt
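
    For reference, a skip-patterns file is a plain-text file containing one regular expression per line. The patterns.txt used by the upstream Hadoop WordCount v2.0 tutorial, which this example follows, looks like the lines below; the file provided here may differ:

        \.
        \,
        \!
        to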

  7. Create an execution plan and run it. Create an execution plan in E-MapReduce, associate it with this job, and run it.

Last updated: 2016-11-23 16:03:59
