MapReduce Development Manual (E-MapReduce Hadoop Developer Guide)
Using OSS in MapReduce
To read from and write to OSS in a MapReduce job, configure the following parameters:
conf.set("fs.oss.accessKeyId", "${accessKeyId}");
conf.set("fs.oss.accessKeySecret", "${accessKeySecret}");
conf.set("fs.oss.endpoint","${endpoint}");
Parameter description:
${accessKeyId}: the AccessKeyId of your account.
${accessKeySecret}: the secret key corresponding to that AccessKeyId.
${endpoint}: the network endpoint used to access OSS, which is determined by the region your cluster is in; the OSS bucket must also be in the same region as the cluster.
For the specific values, see OSS Endpoint.
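For instance, for a cluster and bucket both in the Hangzhou region the configuration might look like the following minimal sketch (the key values are placeholders and the endpoint is an assumption; check the OSS Endpoint documentation for your own region):
// A minimal sketch, assuming cluster and bucket both live in the Hangzhou region.
// The intranet endpoint keeps ECS-to-OSS traffic off the public network.
conf.set("fs.oss.accessKeyId", "yourAccessKeyId");                    // placeholder
conf.set("fs.oss.accessKeySecret", "yourAccessKeySecret");            // placeholder
conf.set("fs.oss.endpoint", "oss-cn-hangzhou-internal.aliyuncs.com"); // assumed endpoint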
Word Count
The following example shows how to read text from OSS and count the words in it. The steps are:
Write the program. Using Java as an example, modify the WordCount example from the official Hadoop website as follows. The only modification is adding the Access Key ID and Access Key Secret configuration to the code, so that the job has permission to access the OSS files.
package org.apache.hadoop.examples;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class EmrWordCount {

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer
       extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length < 2) {
      System.err.println("Usage: wordcount <in> [<in>...] <out>");
      System.exit(2);
    }
    conf.set("fs.oss.accessKeyId", "${accessKeyId}");
    conf.set("fs.oss.accessKeySecret", "${accessKeySecret}");
    conf.set("fs.oss.endpoint", "${endpoint}");
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(EmrWordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    for (int i = 0; i < otherArgs.length - 1; ++i) {
      FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
    }
    FileOutputFormat.setOutputPath(job,
        new Path(otherArgs[otherArgs.length - 1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
Compile the program. First make sure the JDK and Hadoop environments are configured, then run:
mkdir wordcount_classes
javac -classpath ${HADOOP_HOME}/share/hadoop/common/hadoop-common-2.6.0.jar:${HADOOP_HOME}/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.6.0.jar:${HADOOP_HOME}/share/hadoop/common/lib/commons-cli-1.2.jar -d wordcount_classes EmrWordCount.java
jar cvf wordcount.jar -C wordcount_classes .
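If you have shell access to a node with a configured Hadoop client, you can also sanity-check the jar directly before creating an E-MapReduce job. This invocation is only a sketch, reusing the OSS paths assumed in the next step:
hadoop jar wordcount.jar org.apache.hadoop.examples.EmrWordCount oss://emr/data/WordCount/Input oss://emr/data/WordCount/Output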
Create a job.
Upload the jar built in the previous step to OSS; you can log on to the OSS console to do this. Assume the jar's path on OSS is oss://emr/jars/wordcount.jar, and that the input and output paths are oss://emr/data/WordCount/Input and oss://emr/data/WordCount/Output respectively.
In the E-MapReduce job panel, create a job for this jar; a sketch of the arguments is given below.
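A hedged sketch of the job's parameter line, modeled on the ossref invocation used in the Maven section later in this document (the exact console fields may differ from this guess):
jar ossref://emr/jars/wordcount.jar org.apache.hadoop.examples.EmrWordCount oss://emr/data/WordCount/Input oss://emr/data/WordCount/Output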
Create an execution plan. Create an execution plan in E-MapReduce, add the job created in the previous step to it, and choose the "Run now" strategy; the wordcount job will then run on the selected cluster.
Using a Maven project to manage MapReduce jobs
As your project grows it becomes complex and hard to manage, so we recommend a software project management tool such as Maven. The steps are:
Install Maven. First make sure Maven is installed.
Generate the project skeleton. In your project root directory (assume it is D:/workspace), run:
mvn archetype:generate -DgroupId=com.aliyun.emr.hadoop.examples -DartifactId=wordcountv2 -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false
mvn automatically generates an empty sample project at D:/workspace/wordcountv2 (matching the artifactId you specified), containing a simple pom.xml and an App class (whose package path matches the groupId you specified).
Add the Hadoop dependencies. Open the project in any IDE and edit pom.xml. Add the following inside the dependencies element:
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-common</artifactId>
    <version>2.6.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.6.0</version>
</dependency>
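One optional refinement, a common Maven convention rather than something this guide requires: since the Hadoop classes are already on the E-MapReduce cluster's classpath at runtime, you may mark these dependencies as provided, so they are compiled against but not bundled into your jar:
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.6.0</version>
    <scope>provided</scope>
</dependency>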
Write the code. In the com.aliyun.emr.hadoop.examples package, alongside the App class, add a new class WordCount2.java with the following content:
package com.aliyun.emr.hadoop.examples;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.StringUtils;
public class WordCount2 {

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable> {

    static enum CountersEnum { INPUT_WORDS }

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
    private boolean caseSensitive;
    private Set<String> patternsToSkip = new HashSet<String>();
    private Configuration conf;
    private BufferedReader fis;

    @Override
    public void setup(Context context) throws IOException,
        InterruptedException {
      conf = context.getConfiguration();
      caseSensitive = conf.getBoolean("wordcount.case.sensitive", true);
      // This flag defaults to false so that jobs run without -skip do not
      // touch the (absent) distributed-cache files; main() sets it to true
      // only when a skip-pattern file was actually added.
      if (conf.getBoolean("wordcount.skip.patterns", false)) {
        URI[] patternsURIs = Job.getInstance(conf).getCacheFiles();
        for (URI patternsURI : patternsURIs) {
          Path patternsPath = new Path(patternsURI.getPath());
          String patternsFileName = patternsPath.getName().toString();
          parseSkipFile(patternsFileName);
        }
      }
    }

    private void parseSkipFile(String fileName) {
      try {
        fis = new BufferedReader(new FileReader(fileName));
        String pattern = null;
        while ((pattern = fis.readLine()) != null) {
          patternsToSkip.add(pattern);
        }
      } catch (IOException ioe) {
        System.err.println("Caught exception while parsing the cached file '"
            + StringUtils.stringifyException(ioe));
      }
    }

    @Override
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      String line = (caseSensitive) ?
          value.toString() : value.toString().toLowerCase();
      // Strip every skip pattern from the line before tokenizing.
      for (String pattern : patternsToSkip) {
        line = line.replaceAll(pattern, "");
      }
      StringTokenizer itr = new StringTokenizer(line);
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
        Counter counter = context.getCounter(CountersEnum.class.getName(),
            CountersEnum.INPUT_WORDS.toString());
        counter.increment(1);
      }
    }
  }
  public static class IntSumReducer
       extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("fs.oss.accessKeyId", "${accessKeyId}");
    conf.set("fs.oss.accessKeySecret", "${accessKeySecret}");
    conf.set("fs.oss.endpoint", "${endpoint}");
    GenericOptionsParser optionParser = new GenericOptionsParser(conf, args);
    String[] remainingArgs = optionParser.getRemainingArgs();
    // Valid invocations have either 2 arguments (<in> <out>)
    // or 4 arguments (<in> <out> -skip skipPatternFile).
    if (remainingArgs.length != 2 && remainingArgs.length != 4) {
      System.err.println("Usage: wordcount <in> <out> [-skip skipPatternFile]");
      System.exit(2);
    }
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount2.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    List<String> otherArgs = new ArrayList<String>();
    for (int i = 0; i < remainingArgs.length; ++i) {
      if ("-skip".equals(remainingArgs[i])) {
        // Distribute the skip-pattern file to all tasks via the distributed cache.
        job.addCacheFile(new Path(EMapReduceOSSUtil.buildOSSCompleteUri(remainingArgs[++i], conf)).toUri());
        job.getConfiguration().setBoolean("wordcount.skip.patterns", true);
      } else {
        otherArgs.add(remainingArgs[i]);
      }
    }
    FileInputFormat.addInputPath(job, new Path(EMapReduceOSSUtil.buildOSSCompleteUri(otherArgs.get(0), conf)));
    FileOutputFormat.setOutputPath(job, new Path(EMapReduceOSSUtil.buildOSSCompleteUri(otherArgs.get(1), conf)));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
The EMapReduceOSSUtil class it references is shown below; put it in the same package, next to WordCount2:
package com.aliyun.emr.hadoop.examples;
import org.apache.hadoop.conf.Configuration;
public class EMapReduceOSSUtil {

  private static String SCHEMA = "oss://";
  private static String AKSEP = ":";
  private static String BKTSEP = "@";
  private static String EPSEP = ".";
  private static String HTTP_HEADER = "https://";

  /**
   * Complete an OSS URI:
   * convert a URI like oss://bucket/path to
   * oss://accessKeyId:accessKeySecret@bucket.endpoint/path.
   * An ossref URI does not need this.
   *
   * @param oriUri original OSS URI
   * @param akId accessKeyId
   * @param akSecret accessKeySecret
   * @param endpoint OSS endpoint
   */
  public static String buildOSSCompleteUri(String oriUri, String akId, String akSecret, String endpoint) {
    if (akId == null) {
      System.err.println("miss accessKeyId");
      return oriUri;
    }
    if (akSecret == null) {
      System.err.println("miss accessKeySecret");
      return oriUri;
    }
    if (endpoint == null) {
      System.err.println("miss endpoint");
      return oriUri;
    }
    int index = oriUri.indexOf(SCHEMA);
    if (index == -1 || index != 0) {
      return oriUri;
    }
    int bucketIndex = index + SCHEMA.length();
    int pathIndex = oriUri.indexOf("/", bucketIndex);
    String bucket = null;
    if (pathIndex == -1) {
      bucket = oriUri.substring(bucketIndex);
    } else {
      bucket = oriUri.substring(bucketIndex, pathIndex);
    }
    StringBuilder retUri = new StringBuilder();
    retUri.append(SCHEMA)
        .append(akId)
        .append(AKSEP)
        .append(akSecret)
        .append(BKTSEP)
        .append(bucket)
        .append(EPSEP)
        .append(stripHttp(endpoint));
    if (pathIndex > 0) {
      retUri.append(oriUri.substring(pathIndex));
    }
    return retUri.toString();
  }

  public static String buildOSSCompleteUri(String oriUri, Configuration conf) {
    return buildOSSCompleteUri(oriUri, conf.get("fs.oss.accessKeyId"), conf.get("fs.oss.accessKeySecret"), conf.get("fs.oss.endpoint"));
  }

  private static String stripHttp(String endpoint) {
    if (endpoint.startsWith(HTTP_HEADER)) {
      return endpoint.substring(HTTP_HEADER.length());
    }
    return endpoint;
  }
}
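To make the rewriting concrete, here is a minimal usage sketch of buildOSSCompleteUri (the key, secret, and endpoint values are placeholders, and the endpoint is an assumption):
// A minimal usage sketch; the credential values and endpoint are placeholders.
Configuration conf = new Configuration();
conf.set("fs.oss.accessKeyId", "myAkId");
conf.set("fs.oss.accessKeySecret", "myAkSecret");
conf.set("fs.oss.endpoint", "oss-cn-hangzhou.aliyuncs.com"); // assumed endpoint
String uri = EMapReduceOSSUtil.buildOSSCompleteUri("oss://emr/data/WordCount/Input", conf);
// uri == "oss://myAkId:myAkSecret@emr.oss-cn-hangzhou.aliyuncs.com/data/WordCount/Input"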
Compile, package, and upload. In the project directory, run:
mvn clean package -DskipTests
You will then find wordcountv2-1.0-SNAPSHOT.jar under the project's target directory; this is the job jar. Upload this jar to OSS.
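Besides the OSS console, a command-line upload also works, for example with the ossutil tool (assuming you have it installed and configured; the bucket and path are placeholders):
ossutil cp target/wordcountv2-1.0-SNAPSHOT.jar oss://yourBucket/yourPath/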
Create the job. Create a new job in E-MapReduce with a parameter configuration similar to the following:
jar ossref://yourBucket/yourPath/wordcountv2-1.0-SNAPSHOT.jar com.aliyun.emr.hadoop.examples.WordCount2 -Dwordcount.case.sensitive=true oss://yourBucket/yourPath/The_Sorrows_of_Young_Werther.txt oss://yourBucket/yourPath/output -skip oss://yourBucket/yourPath/patterns.txt
Here yourBucket is one of your OSS buckets and yourPath is a path within it; fill them in according to your actual setup. Prepare the two resource files the job uses,
oss://yourBucket/yourPath/The_Sorrows_of_Young_Werther.txt (the input text) and
oss://yourBucket/yourPath/patterns.txt (the skip patterns),
and upload them to your OSS at the corresponding paths. A sketch of a plausible patterns file follows below.
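The patterns file contains one regular expression per line; each matching pattern is deleted from the input before tokenizing. A plausible minimal patterns.txt, borrowed from the Hadoop WordCount v2.0 tutorial this example is based on (an assumption about the actual file shipped with this article):
\.
\,
\!
to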
Create an execution plan and run it. Create an execution plan in E-MapReduce, associate this job with it, and run.
Last updated: 2016-11-23 16:03:59