Multi-job example (Sample programs, MapReduce, MaxCompute, Alibaba Cloud)

Test preparation

(1) Prepare the test program JAR package. This example assumes it is named mapreduce-examples.jar.

(2) Prepare the test tables and resources for MultiJobs:

  • Create the tables:

    create table mr_empty (key string, value string);
    create table mr_multijobs_out (value bigint);
    
  • Add the resources:

    add table mr_multijobs_out as multijobs_res_table -f;
    add jar mapreduce-examples.jar -f;
    

Test steps

Run MultiJobs in odpscmd:

    jar -resources mapreduce-examples.jar,multijobs_res_table -classpath mapreduce-examples.jar com.aliyun.odps.mapred.open.example.MultiJobs mr_multijobs_out;

Expected result

The job completes successfully, and the output table mr_multijobs_out contains:

+------------+
| value      |
+------------+
| 0          |
+------------+

Code example

    package com.aliyun.odps.mapred.open.example;

    import java.io.IOException;
    import java.util.Iterator;


    import com.aliyun.odps.data.Record;
    import com.aliyun.odps.data.TableInfo;
    import com.aliyun.odps.mapred.JobClient;
    import com.aliyun.odps.mapred.MapperBase;
    import com.aliyun.odps.mapred.RunningJob;
    import com.aliyun.odps.mapred.TaskContext;
    import com.aliyun.odps.mapred.conf.JobConf;
    import com.aliyun.odps.mapred.utils.InputUtils;
    import com.aliyun.odps.mapred.utils.OutputUtils;
    import com.aliyun.odps.mapred.utils.SchemaUtils;

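    /**
     * MultiJobs
     *
     * Runs multiple jobs in sequence: an init job writes a start value to the
     * output table, then each iteration job reads the previous value through the
     * table resource, checks it, and writes the value minus one until it reaches 0.
     **/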
    public class MultiJobs {

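      // Writes the start value (multijobs.value, default 2) as the only output record.
      // The write happens in setup(), so no input records are needed.
      public static class InitMapper extends MapperBase {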

        @Override
        public void setup(TaskContext context) throws IOException {
          Record record = context.createOutputRecord();
          long v = context.getJobConf().getLong("multijobs.value", 2);
          record.set(0, v);
          context.write(record);
        }
      }

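      // Reads the single record of the table resource multijobs_res_table, checks it
      // against multijobs.expect.value, writes the value minus one, and reports the
      // new value through the "multijobs"/"value" counter.
      public static class DecreaseMapper extends MapperBase {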

        @Override
        public void cleanup(TaskContext context) throws IOException {
          // Read the expected value that main() stored in the JobConf
          long expect = context.getJobConf().getLong("multijobs.expect.value", -1);
          long v = -1;
          int count = 0;

          Iterator<Record> iter = context.readResourceTable("multijobs_res_table");
          while (iter.hasNext()) {
            Record r = iter.next();
            v = (Long) r.get(0);
            if (expect != v) {
              throw new IOException("expect: " + expect + ", but: " + v);
            }
            count++;
          }

          if (count != 1) {
            throw new IOException("res_table should have 1 record, but: " + count);
          }

          Record record = context.createOutputRecord();
          v--;
          record.set(0, v);
          context.write(record);
          context.getCounter("multijobs", "value").setValue(v);
        }
      }

      public static void main(String[] args) throws Exception {
        if (args.length != 1) {
          System.err.println("Usage: MultiJobs <table>");
          System.exit(1);
        }
        String tbl = args[0];
        long iterCount = 2;

        System.err.println("Start to run init job.");

        JobConf initJob = new JobConf();

        initJob.setLong("multijobs.value", iterCount);
        initJob.setMapperClass(InitMapper.class);

        InputUtils.addTable(TableInfo.builder().tableName("mr_empty").build(), initJob);
        OutputUtils.addTable(TableInfo.builder().tableName(tbl).build(), initJob);

        initJob.setMapOutputKeySchema(SchemaUtils.fromString("key:string"));
        initJob.setMapOutputValueSchema(SchemaUtils.fromString("value:string"));

        initJob.setNumReduceTasks(0);

        // runJob() waits for the init job to finish before the loop starts
        JobClient.runJob(initJob);

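        // Each iteration job checks the previous value and writes value - 1;
        // stop once the counter set by DecreaseMapper reaches 0.
        while (true) {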
          System.err.println("Start to run iter job, count: " + iterCount);

          JobConf decJob = new JobConf();

          decJob.setLong("multijobs.expect.value", iterCount);
          decJob.setMapperClass(DecreaseMapper.class);

          InputUtils.addTable(TableInfo.builder().tableName("mr_empty").build(), decJob);
          OutputUtils.addTable(TableInfo.builder().tableName(tbl).build(), decJob);

          decJob.setNumReduceTasks(0);

          RunningJob rJob = JobClient.runJob(decJob);

          iterCount--;

          if (rJob.getCounters().findCounter("multijobs", "value").getValue() == 0) {
            break;
          }
        }

        if (iterCount != 0) {
          throw new IOException("Job failed.");
        }
      }
    }
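The control flow of main() can be traced without a MaxCompute project. The following is a hypothetical plain-Java simulation, not part of the ODPS SDK: the class MultiJobsSimulation and its methods are illustrative names. A List<Long> stands in for mr_multijobs_out, which the jobs also read back through the table resource multijobs_res_table. Each job is replaced by an in-memory step that overwrites the list. This assumes each job overwrites the output table, which the single-row expected result implies, since appended rows would make DecreaseMapper's record-count check fail. The check is simplified (row count is tested before the value) and uses unchecked exceptions instead of IOException.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java simulation of MultiJobs.main(); it makes no ODPS calls.
// A List<Long> stands in for mr_multijobs_out, which the jobs also read back
// through the table resource multijobs_res_table.
public class MultiJobsSimulation {

  // Mimics InitMapper: overwrite the table with the start value.
  public static void initJob(List<Long> table, long startValue) {
    table.clear();
    table.add(startValue);
  }

  // Mimics DecreaseMapper (simplified): check the single row, then overwrite it
  // with value - 1. Returns what the real job reports via the multijobs/value counter.
  public static long decreaseJob(List<Long> table, long expect) {
    if (table.size() != 1) {
      throw new IllegalStateException("res_table should have 1 record, but: " + table.size());
    }
    long v = table.get(0);
    if (v != expect) {
      throw new IllegalStateException("expect: " + expect + ", but: " + v);
    }
    v--;
    table.clear();
    table.add(v);
    return v;
  }

  // Same control flow as MultiJobs.main(); returns the final table contents.
  public static List<Long> run(long iterCount) {
    // Added guard (not in the original): for a start value below 1 the counter
    // never reaches 0, so the loop below would not terminate.
    if (iterCount < 1) {
      throw new IllegalArgumentException("start value must be >= 1");
    }
    List<Long> table = new ArrayList<>();
    initJob(table, iterCount);
    while (true) {
      long counter = decreaseJob(table, iterCount);
      iterCount--;
      if (counter == 0) {
        break;
      }
    }
    if (iterCount != 0) {
      throw new IllegalStateException("Job failed.");
    }
    return table;
  }

  public static void main(String[] args) {
    System.out.println(run(2)); // prints [0]
  }
}
```

With the start value 2 used in the example, run(2) returns [0], which matches the expected output table: the first iteration job turns 2 into 1, and the second turns 1 into 0 and stops the loop.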

Last updated: 2016-11-24 11:23:47
