閱讀668 返回首頁    go 阿裏雲


MapReduce開發插件介紹__Eclipse開發插件_工具_大數據計算服務-阿裏雲

選擇ODPS項目中的WordCount示例:

右鍵”WordCount.java”,依次點擊”Run As”,”ODPS MapReduce”:

彈出對話框後,選擇”example_project”,點擊確認:

運行成功後,會出現以下結果提示:

運行自定義MapReduce程序

右鍵選擇src目錄,選擇新建(New) -> Mapper:

選擇Mapper後出現下麵的對話框。輸入Mapper類的名字,並確認:

會看到在左側包資源管理器(Package Explorer)中,src目錄下生成文件UserMapper.java。該文件的內容即是一個Mapper類的模板:

  1. package odps;
  2. import java.io.IOException;
  3. import com.aliyun.odps.data.Record;
  4. import com.aliyun.odps.mapred.MapperBase;
  5. public class UserMapper extends MapperBase {
  6. @Override
  7. public void setup(TaskContext context) throws IOException {
  8. }
  9. @Override
  10. public void map(long recordNum, Record record, TaskContext context)
  11. throws IOException {
  12. }
  13. @Override
  14. public void cleanup(TaskContext context) throws IOException {
  15. }
  16. }

模板中,將package名稱默認配置為”odps”,用戶可以根據自己的需求進行修改。編寫模板內容:

  1. package odps;
  2. import java.io.IOException;
  3. import com.aliyun.odps.counter.Counter;
  4. import com.aliyun.odps.data.Record;
  5. import com.aliyun.odps.mapred.MapperBase;
  6. public class UserMapper extends MapperBase {
  7. Record word;
  8. Record one;
  9. Counter gCnt;
  10. @Override
  11. public void setup(TaskContext context) throws IOException {
  12. word = context.createMapOutputKeyRecord();
  13. one = context.createMapOutputValueRecord();
  14. one.set(new Object[] { 1L });
  15. gCnt = context.getCounter("MyCounters", "global_counts");
  16. }
  17. @Override
  18. public void map(long recordNum, Record record, TaskContext context)
  19. throws IOException {
  20. for (int i = 0; i < record.getColumnCount(); i++) {
  21. String[] words = record.get(i).toString().split("\s+");
  22. for (String w : words) {
  23. word.set(new Object[] { w });
  24. Counter cnt = context.getCounter("MyCounters", "map_outputs");
  25. cnt.increment(1);
  26. gCnt.increment(1);
  27. context.write(word, one);
  28. }
  29. }
  30. }
  31. @Override
  32. public void cleanup(TaskContext context) throws IOException {
  33. }
  34. }

同理,右鍵選擇src目錄,選擇新建(New)->Reduce:

輸入Reduce類的名字(本示例使用UserReduce):

同樣在包資源管理器(Package Explorer)中,src目錄下生成文件UserReduce.java。該文件的內容即是一個Reduce類的模板。編輯模板:

  1. package odps;
  2. import java.io.IOException;
  3. import java.util.Iterator;
  4. import com.aliyun.odps.counter.Counter;
  5. import com.aliyun.odps.data.Record;
  6. import com.aliyun.odps.mapred.ReducerBase;
  7. public class UserReduce extends ReducerBase {
  8. private Record result;
  9. Counter gCnt;
  10. @Override
  11. public void setup(TaskContext context) throws IOException {
  12. result = context.createOutputRecord();
  13. gCnt = context.getCounter("MyCounters", "global_counts");
  14. }
  15. @Override
  16. public void reduce(Record key, Iterator<Record> values, TaskContext context)
  17. throws IOException {
  18. long count = 0;
  19. while (values.hasNext()) {
  20. Record val = values.next();
  21. count += (Long) val.get(0);
  22. }
  23. result.set(0, key.get(0));
  24. result.set(1, count);
  25. Counter cnt = context.getCounter("MyCounters", "reduce_outputs");
  26. cnt.increment(1);
  27. gCnt.increment(1);
  28. context.write(result);
  29. }
  30. @Override
  31. public void cleanup(TaskContext context) throws IOException {
  32. }
  33. }

創建main函數: 右鍵選擇src目錄,選擇新建(New) -> MapReduce Driver。填寫Driver Name(示例中是UserDriver),Mapper及Recduce類(示例中是UserMapper及UserReduce),並確認。同樣會在src目錄下看到MyDriver.java文件:

編輯driver內容:

  1. package odps;
  2. import com.aliyun.odps.OdpsException;
  3. import com.aliyun.odps.data.TableInfo;
  4. import com.aliyun.odps.examples.mr.WordCount.SumCombiner;
  5. import com.aliyun.odps.examples.mr.WordCount.SumReducer;
  6. import com.aliyun.odps.examples.mr.WordCount.TokenizerMapper;
  7. import com.aliyun.odps.mapred.JobClient;
  8. import com.aliyun.odps.mapred.RunningJob;
  9. import com.aliyun.odps.mapred.conf.JobConf;
  10. import com.aliyun.odps.mapred.utils.InputUtils;
  11. import com.aliyun.odps.mapred.utils.OutputUtils;
  12. import com.aliyun.odps.mapred.utils.SchemaUtils;
  13. public class UserDriver {
  14. public static void main(String[] args) throws OdpsException {
  15. JobConf job = new JobConf();
  16. job.setMapperClass(TokenizerMapper.class);
  17. job.setCombinerClass(SumCombiner.class);
  18. job.setReducerClass(SumReducer.class);
  19. job.setMapOutputKeySchema(SchemaUtils.fromString("word:string"));
  20. job.setMapOutputValueSchema(SchemaUtils.fromString("count:bigint"));
  21. InputUtils.addTable(
  22. TableInfo.builder().tableName("wc_in1").cols(new String[] { "col2", "col3" }).build(), job);
  23. InputUtils.addTable(TableInfo.builder().tableName("wc_in2").partSpec("p1=2/p2=1").build(), job);
  24. OutputUtils.addTable(TableInfo.builder().tableName("wc_out").build(), job);
  25. RunningJob rj = JobClient.runJob(job);
  26. rj.waitForCompletion();
  27. }
  28. }

運行MapReduce程序,選中UserDriver.java,右鍵選擇Run As -> ODPS MapReduce,點擊確認。出現如下對話框:

選擇ODPS Project為:example_project,點擊Finish按鈕開始本地運行MapReduce程序:

有如上輸出信息,說明本地運行成功。運行的輸出結果在warehouse目錄下。關於warehouse的說明請參考 本地運行 。刷新ODPS工程:

wc_out即是輸出目錄,R_000000即是結果文件。通過本地調試,確定輸出結果正確後,可以通過Eclipse導出(Export)功能將MapReduce打包。打包後將jar包上傳到ODPS中。在分布式環境下執行MapReduce,詳情請參考 快速入門

本地調試通過後,用戶可以通過Eclipse的Export功能將代碼打成jar包,供後續分布式環境使用。在本示例中,我們將程序包命名為mr-examples.jar。選擇src目錄,點擊Export:

選擇導出模式為Jar File:

僅需要導出src目錄下package(com.aliyun.odps.mapred.open.example),Jar File名稱指定為”mr-examples.jar”:

確認後,導出成功。

如果用戶想在本地模擬新建Project,可以在warehouse下麵,創建一個新的子目錄(與example_project平級的目錄),目錄層次結構為:

  1. <warehouse>
  2. |____example_project(項目空間目錄)
  3. |____ <__tables__>
  4. | |__table_name1(非分區表)
  5. | | |____ data(文件)
  6. | | |
  7. | | |____ <__schema__> (文件)
  8. | |
  9. | |__table_name2(分區表)
  10. | |____ partition_name=partition_value(分區目錄)
  11. | | |____ data(文件)
  12. | |
  13. | |____ <__schema__> (文件)
  14. |
  15. |____ <__resources__>
  16. |
  17. |___table_resource_name (表資源)
  18. | |____<__ref__>
  19. |
  20. |___ file_resource_name(文件資源)

schema文件示例:

  1. 非分區表:
  2. project=project_name
  3. table=table_name
  4. columns=col1:BIGINT,col2:DOUBLE,col3:BOOLEAN,col4:DATETIME,col5:STRING
  5. 分區表:
  6. project=project_name
  7. table=table_name
  8. columns=col1:BIGINT,col2:DOUBLE,col3:BOOLEAN,col4:DATETIME,col5:STRING
  9. partitions=col1:BIGINT,col2:DOUBLE,col3:BOOLEAN,col4:DATETIME,col5:STRING
  10. 注:當前支持5種數據格式:bigint,double,boolean,datetime,string, 對應到java中的數據類型-long,double,boolean,java.util.Date,java.lang.String。

data文件示例:

  1. 1,1.1,true,2015-06-04 11:22:42 896,hello world
  2. N,N,N,N,N
  3. 注:時間格式精確到毫秒級別,所有類型用N表示null。

注解:

  • 本地模式運行MapReduce程序,默認情況下先到warehouse下查找相應的數據表或資源,如果表或資源不存在會到服務器上下載相應的數據存入warehouse目錄下,再以本地模式運行。
  • 運行完MapReduce後,請刷新warehouse目錄,才能看到生成的結果

最後更新:2016-11-23 17:16:04

  上一篇:go 創建ODPS工程__Eclipse開發插件_工具_大數據計算服務-阿裏雲
  下一篇:go UDF開發插件介紹__Eclipse開發插件_工具_大數據計算服務-阿裏雲