多任务示例__示例程序_MapReduce_大数据计算服务-阿里云
(1)准备好测试程序jar包,假设名字为mapreduce-examples.jar;
(2)准备好MultiJobs测试表和资源;
创建表
create table mr_empty (key string, value string); create table mr_multijobs_out (value bigint);
添加资源
add table mr_multijobs_out as multijobs_res_table -f; add jar mapreduce-examples.jar -f;
测试步骤
在odpscmd中执行MultiJobs
jar -resources mapreduce-examples.jar,multijobs_res_table -classpath mapreduce-examples.jar
com.aliyun.odps.mapred.open.example.MultiJobs mr_multijobs_out;
预期结果
作业成功结束。 输出表mr_multijobs_out中内容为:
+------------+
| value |
+------------+
| 0 |
+------------+
代码示例
package com.aliyun.odps.mapred.open.example;
import java.io.IOException;
import java.util.Iterator;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.mapred.JobClient;
import com.aliyun.odps.mapred.MapperBase;
import com.aliyun.odps.mapred.RunningJob;
import com.aliyun.odps.mapred.TaskContext;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.utils.InputUtils;
import com.aliyun.odps.mapred.utils.OutputUtils;
import com.aliyun.odps.mapred.utils.SchemaUtils;
/**
* MultiJobs
*
* Running multiple job
*
**/
public class MultiJobs {
public static class InitMapper extends MapperBase {
@Override
public void setup(TaskContext context) throws IOException {
Record record = context.createOutputRecord();
long v = context.getJobConf().getLong("multijobs.value", 2);
record.set(0, v);
context.write(record);
}
}
public static class DecreaseMapper extends MapperBase {
@Override
public void cleanup(TaskContext context) throws IOException {
//从JobConf中获取main函数中定义的变量值
long expect = context.getJobConf().getLong("multijobs.expect.value", -1);
long v = -1;
int count = 0;
Iterator<Record> iter = context.readResourceTable("multijobs_res_table");
while (iter.hasNext()) {
Record r = iter.next();
v = (Long) r.get(0);
if (expect != v) {
throw new IOException("expect: " + expect + ", but: " + v);
}
count++;
}
if (count != 1) {
throw new IOException("res_table should have 1 record, but: " + count);
}
Record record = context.createOutputRecord();
v--;
record.set(0, v);
context.write(record);
context.getCounter("multijobs", "value").setValue(v);
}
}
public static void main(String[] args) throws Exception {
if (args.length != 1) {
System.err.println("Usage: TestMultiJobs <table>");
System.exit(1);
}
String tbl = args[0];
long iterCount = 2;
System.err.println("Start to run init job.");
JobConf initJob = new JobConf();
initJob.setLong("multijobs.value", iterCount);
initJob.setMapperClass(InitMapper.class);
InputUtils.addTable(TableInfo.builder().tableName("mr_empty").build(), initJob);
OutputUtils.addTable(TableInfo.builder().tableName(tbl).build(), initJob);
initJob.setMapOutputKeySchema(SchemaUtils.fromString("key:string"));
initJob.setMapOutputValueSchema(SchemaUtils.fromString("value:string"));
initJob.setNumReduceTasks(0);
JobClient.runJob(initJob);
while (true) {
System.err.println("Start to run iter job, count: " + iterCount);
JobConf decJob = new JobConf();
decJob.setLong("multijobs.expect.value", iterCount);
decJob.setMapperClass(DecreaseMapper.class);
InputUtils.addTable(TableInfo.builder().tableName("mr_empty").build(), decJob);
OutputUtils.addTable(TableInfo.builder().tableName(tbl).build(), decJob);
decJob.setNumReduceTasks(0);
RunningJob rJob = JobClient.runJob(decJob);
iterCount--;
if (rJob.getCounters().findCounter("multijobs", "value").getValue() == 0) {
break;
}
}
if (iterCount != 0) {
throw new IOException("Job failed.");
}
}
}
最后更新:2016-11-24 11:23:47
上一篇:
多路输入输出示例__示例程序_MapReduce_大数据计算服务-阿里云
下一篇:
二次排序示例__示例程序_MapReduce_大数据计算服务-阿里云
GetBucketInfo__关于Bucket的操作_API 参考_对象存储 OSS-阿里云
处理数据__规则引擎_控制台使用手册_阿里云物联网套件-阿里云
云服务器 ECS Linux SSH 连接交互过程简介__远程登录 (SSH)_Linux操作运维问题_云服务器 ECS-阿里云
查询执行计划详情__执行计划_API参考_E-MapReduce-阿里云
查询网络带宽__资源监控接口_API 手册_CDN-阿里云
储值卡使用规则介绍___推荐码_代金券及推荐码_财务-阿里云
nginx日志__常见日志格式_用户指南_日志服务-阿里云
业务数据分析__快速开始_移动数据分析-阿里云
启动执行计划调度__执行计划_API参考_E-MapReduce-阿里云
计费案例__购买指南_对象存储 OSS-阿里云
相关内容
常见错误说明__附录_大数据计算服务-阿里云
发送短信接口__API使用手册_短信服务-阿里云
接口文档__Android_安全组件教程_移动安全-阿里云
运营商错误码(联通)__常见问题_短信服务-阿里云
设置短信模板__使用手册_短信服务-阿里云
OSS 权限问题及排查__常见错误及排除_最佳实践_对象存储 OSS-阿里云
消息通知__操作指南_批量计算-阿里云
设备端快速接入(MQTT)__快速开始_阿里云物联网套件-阿里云
查询API调用流量数据__API管理相关接口_API_API 网关-阿里云
使用STS访问__JavaScript-SDK_SDK 参考_对象存储 OSS-阿里云