

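Partitioned Table Input Example: Example Programs, MapReduce, Big Data Computing Service (Alibaba Cloud)

Code example 1, a simple example that uses a partition as both input and output, for reference only: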

    public static void main(String[] args) throws Exception {
      JobConf job = new JobConf();

      ...

      // Read only partition pt=123456 of input_table.
      LinkedHashMap<String, String> input = new LinkedHashMap<String, String>();
      input.put("pt", "123456");
      InputUtils.addTable(TableInfo.builder().tableName("input_table").partSpec(input).build(), job);

      // Write the result to partition ds=654321 of output_table.
      LinkedHashMap<String, String> output = new LinkedHashMap<String, String>();
      output.put("ds", "654321");
      OutputUtils.addTable(TableInfo.builder().tableName("output_table").partSpec(output).build(), job);

      JobClient.runJob(job);
    }
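
The partition specs above are built with LinkedHashMap, which iterates in insertion order. That matters once a table has more than one partition level. The following is a minimal, self-contained sketch of that ordering behavior only. It does not use the ODPS SDK; the class name PartSpecDemo, the render helper, the second-level column "region", and the comma-separated key=value rendering are all assumptions made for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

class PartSpecDemo {
    // Hypothetical helper: renders a partition spec as "key=value,key=value"
    // in the order the entries were inserted. The format is illustrative only.
    static String render(Map<String, String> partSpec) {
        StringJoiner joiner = new StringJoiner(",");
        for (Map.Entry<String, String> e : partSpec.entrySet()) {
            joiner.add(e.getKey() + "=" + e.getValue());
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        LinkedHashMap<String, String> input = new LinkedHashMap<String, String>();
        input.put("pt", "123456");
        input.put("region", "hangzhou"); // hypothetical second-level partition column
        // LinkedHashMap keeps "pt" ahead of "region" because it was inserted first.
        System.out.println(render(input)); // prints pt=123456,region=hangzhou
    }
}
```

A plain HashMap gives no such ordering guarantee, which is a plausible reason the examples here use LinkedHashMap.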

Code example 2, for reference only:

    package com.aliyun.odps.mapred.open.example;

    ...
      public static void main(String[] args) throws Exception {
        if (args.length != 2) {
          System.err.println("Usage: WordCount <in_table> <out_table>");
          System.exit(2);
        }

        JobConf job = new JobConf();

        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(SumCombiner.class);
        job.setReducerClass(SumReducer.class);

        job.setMapOutputKeySchema(SchemaUtils.fromString("word:string"));
        job.setMapOutputValueSchema(SchemaUtils.fromString("count:bigint"));


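        // Connect to ODPS so the partitions of the input table can be listed.
        // The access ID, access key, endpoint and project below are placeholders.
        Account account = new AliyunAccount("my_access_id", "my_access_key");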
        Odps odps = new Odps(account);
        odps.setEndpoint("odps_endpoint_url");
        odps.setDefaultProject("my_project");

        // The input table name is the first command-line argument.
        String tblname = args[0];
        Table table = odps.tables().get(tblname);
        TableInfoBuilder builder = TableInfo.builder().tableName(tblname);

        for (Partition p : table.getPartitions()) {
          if (applicable(p)) {
            LinkedHashMap<String, String> partSpec = new LinkedHashMap<String, String>();
            for (String key : p.getPartitionSpec().keys()) {
              partSpec.put(key, p.getPartitionSpec().get(key));
            }
            InputUtils.addTable(builder.partSpec(partSpec).build(), job);
          }
        }

        OutputUtils.addTable(TableInfo.builder().tableName(args[1]).build(), job);

        JobClient.runJob(job);
      }

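Notes:

  • This example combines the ODPS SDK and the MapReduce SDK so that a MapReduce job reads a range of partitions. The code cannot be compiled and run as is; only the main function is shown. The applicable function in the example is user logic that decides whether a given partition qualifies as input for the MapReduce job.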
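
The source does not show the applicable function, since it is left to the user. As a rough sketch of what such logic might look like, the block below accepts a partition when the value of one partition column falls inside a closed range. It is self-contained and works on a plain key/value map instead of the SDK's Partition object; the class name PartitionFilter, the extra from/to parameters, the column name "pt", and the yyyyMMdd value format are all assumptions, not part of the original example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class PartitionFilter {
    // Hypothetical user logic: accept the partition when its "pt" value lies in
    // the closed range [from, to]. Values are compared as strings, which is only
    // meaningful for fixed-width values such as yyyyMMdd dates.
    static boolean applicable(Map<String, String> partSpec, String from, String to) {
        String pt = partSpec.get("pt");
        if (pt == null) {
            return false; // not partitioned by "pt", so it never qualifies
        }
        return pt.compareTo(from) >= 0 && pt.compareTo(to) <= 0;
    }

    public static void main(String[] args) {
        Map<String, String> partSpec = new LinkedHashMap<String, String>();
        partSpec.put("pt", "20160115");
        System.out.println(applicable(partSpec, "20160101", "20160131")); // prints true
        System.out.println(applicable(partSpec, "20160201", "20160229")); // prints false
    }
}
```

In the main function of example 2, the equivalent check would first copy the keys and values of p.getPartitionSpec() into such a map, as the inner loop there already does, and then apply the predicate before calling InputUtils.addTable.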

Last updated: 2016-05-06 10:43:08
