513
阿里云
原生SDK介绍__Java SDK介绍_MapReduce_大数据计算服务-阿里云
在本小节,我们仅会对较为常用的MapReduce核心接口做简短介绍。使用Maven的用户可以从Maven库中搜索”odps-sdk-mapred”获取不同版本的Java SDK,相关配置信息:
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-sdk-mapred</artifactId>
<version>0.20.7-public</version>
</dependency>
主要接口 | 描述 |
---|---|
MapperBase | 用户自定义的Map函数需要继承自此类。处理输入表的记录对 象,加工处理成键值对集合输出到Reduce阶段,或者不经过 Reduce阶段直接输出结果记录到结果表。不经过Reduce阶段而 直接输出计算结果的作业,也可称之为Map-Only作业。 |
ReducerBase | 用户自定义的Reduce函数需要继承自此类。对与一个键(Key) 关联的一组数值集(Values)进行归约计算。 |
TaskContext | 是MapperBase及ReducerBase多个成员函数的输入参数之一。 含有任务运行的上下文信息。 |
JobClient | 用于提交和管理作业,提交方式包括阻塞(同步)方式及非阻塞 (异步)方式。 |
RunningJob | 作业运行时对象,用于跟踪运行中的MapReduce作业实例。 |
JobConf | 描述一个MapReduce任务的配置,通常在主程序(main函数)中 定义JobConf对象,然后通过JobClient提交作业给ODPS服务。 |
MapperBase
主要函数接口:
主要接口 | 描述 |
---|---|
void cleanup(TaskContext context) | 在Map阶段结束时,map方法之后调用。 |
void map(long key, Record record, TaskContext context) | map方法,处理输入表的记录。 |
void setup(TaskContext context) | 在Map阶段开始时,map方法之前调用。 |
ReducerBase
主要函数接口:
主要接口 | 描述 |
---|---|
void cleanup( TaskContext context) | 在Reduce阶段结束时,reduce方法之后调用。 |
void reduce(Record key, Iterator<Record > values, TaskContext context) | reduce方法,处理输入表的记录。 |
void setup( TaskContext context) | 在Reduce阶段开始时,reduce方法之前调用。 |
TaskContext
主要函数接口:
主要接口 | 描述 |
---|---|
TableInfo[] getOutputTableInfo() | 获取输出的表信息 |
Record createOutputRecord() | 创建默认输出表的记录对象 |
Record createOutputRecord(String label) | 创建给定label输出表的记录对象 |
Record createMapOutputKeyRecord() | 创建Map输出Key的记录对象 |
Record createMapOutputValueRecord() | 创建Map输出Value的记录对象 |
void write(Record record) | 写记录到默认输出,用于Reduce端写出数据, 可以在Reduce端多次调用。 |
void write(Record record, String label) | 写记录到给定标签输出,用于Reduce端写出数据。可以在 Reduce端多次调用。 |
void write(Record key, Record value) | Map写记录到中间结果,可以在Map函数中多次调用。 可以在Map端多次调用。 |
BufferedInputStream readResourceFileAsStream(String resourceName) | 读取文件类型资源 |
Iterator<Record > readResourceTable(String resourceName) | 读取表类型资源 |
Counter getCounter(Enum<? > name) | 获取给定名称的Counter对象 |
Counter getCounter(String group, String name) | 获取给定组名和名称的Counter对象 |
void progress() | 向MapReduce框架报告心跳信息。 如果用户方法处理时间 很长,且中间没有调用框架,可以调用这个方法避免task 超时,框架默认600秒超时。 |
备注:ODPS的TaskContext接口中提供了progress功能,但此功能是防止Worker长时间运行未结束,被框架误认为超时而被杀的情况出现。 这个接口更类似于向框架发送心跳信息,并不是用来汇报Worker进度。ODPS MapReduce默认Worker超时时间为10分钟(系统默认配置,不受用户控制), 如果超过10分钟,Worker仍然没有向框架发送心跳(调用progress接口),框架会强制停止该Worker,MapReduce任务失败退出。因此, 建议用户在Mapper/Reducer函数中,定期调用progress接口,防止框架认为Worker超时,误杀任务。
JobConf
主要函数接口:
主要接口 | 描述 |
---|---|
void setResources(String resourceNames) | 声明本作业使用的资源。只有声明的资源才能在运行 Mapper/Reducer时通过TaskContext对象读取。 |
void setMapOutputKeySchema(Column[] schema) | 设置Mapper输出到Reducer的Key属性 |
void setMapOutputValueSchema(Column[] schema) | 设置Mapper输出到Reducer的Value属性 |
void setOutputKeySortColumns(String[] cols) | 设置Mapper输出到Reducer的Key排序列 |
void setOutputGroupingColumns(String[] cols) | 设置Key分组列 |
void setMapperClass(Class<? extends Mapper > theClass) | 设置作业的Mapper函数 |
void setPartitionColumns(String[] cols) | 设置作业指定的分区列. 默认是Mapper输出Key的所有列 |
void setReducerClass(Class<? extends Reducer > theClass) | 设置作业的Reducer |
void setCombinerClass(Class<? extends Reducer > theClass) | 设置作业的combiner。在Map端运行,作用类似于单个Map 对本地的相同Key值做Reduce |
void setSplitSize(long size) | 设置输入分片大小,单位 MB,默认256 |
void setNumReduceTasks(int n) | 设置Reducer任务数,默认为Mapper任务数的1/4 |
void setMemoryForMapTask(int mem) | 设置Mapper任务中单个Worker的内存大小,单位:MB, 默认值2048。 |
void setMemoryForReduceTask(int mem) | 设置Reducer任务中单个Worker的内存大小,单位:MB, 默认值 2048。 |
备注:
- 通常情况下,GroupingColumns包含在KeySortColumns中,KeySortColumns和PartitionColumns要包含在Key中。
- 在Map端,Mapper输出的Record会根据设置的PartitionColumns计算哈希值,决定分配到哪个Reducer,会根据KeySortColumns对Record进行排序。
- 在Reduce端,输入Records在按照KeySortColumns排序好后,会根据GroupingColumns指定的列对输入的Records进行分组,即会顺序遍历输入的Records,把GroupingColumns所指定列相同的Records作为一次reduce函数调用的输入。
JobClient
主要函数接口:
主要接口 | 描述 |
---|---|
static RunningJob runJob(JobConf job) | 阻塞(同步)方式提交MapReduce作业后立即返回 |
static RunningJob submitJob(JobConf job) | 非阻塞(异步)方式提交MapReduce作业后立即返回 |
RunningJob
主要函数接口:
主要接口 | 描述 |
---|---|
String getInstanceID() | 获取作业运行实例ID,用于查看运行日志和作业管理。 |
boolean isComplete() | 查询作业是否结束。 |
boolean isSuccessful() | 查询作业实例是否运行成功。 |
void waitForCompletion() | 等待直至作业实例结束。一般使用于异步方式提交的作业。 |
JobStatus getJobStatus() | 查询作业实例运行状态。 |
void killJob() | 结束此作业。 |
Counters getCounters() | 获取Conter信息。 |
InputUtils
主要函数接口:
主要接口 | 描述 |
---|---|
static void addTable(TableInfo table, JobConf conf) | 添加表table到任务输入,可以被调用多次 ,新加入的表以append方式添加到输入队列中。 |
static void setTables(TableInfo [] tables, JobConf conf) | 添加多张表到任务输入中。 |
OutputUtils
主要函数接口:
主要接口 | 描述 |
---|---|
static void addTable(TableInfo table, JobConf conf) | 添加表table到任务输出,可以被调用多次 ,新加入的表以append方式添加到输出队 列中。 |
static void setTables(TableInfo [] tables, JobConf conf) | 添加多张表到任务输出中。 |
Pipeline
Pipeline是MR2的主体类。可以通过Pipeline.builder构建一个Pipeline。Pipeline的主要接口如下:
public Builder addMapper(Class<? extends Mapper> mapper)
public Builder addMapper(Class<? extends Mapper> mapper,
Column[] keySchema, Column[] valueSchema, String[] sortCols,
SortOrder[] order, String[] partCols,
Class<? extends Partitioner> theClass, String[] groupCols)
public Builder addReducer(Class<? extends Reducer> reducer)
public Builder addReducer(Class<? extends Reducer> reducer,
Column[] keySchema, Column[] valueSchema, String[] sortCols,
SortOrder[] order, String[] partCols,
Class<? extends Partitioner> theClass, String[] groupCols)
public Builder setOutputKeySchema(Column[] keySchema)
public Builder setOutputValueSchema(Column[] valueSchema)
public Builder setOutputKeySortColumns(String[] sortCols)
public Builder setOutputKeySortOrder(SortOrder[] order)
public Builder setPartitionColumns(String[] partCols)
public Builder setPartitionerClass(Class<? extends Partitioner> theClass)
public Builder setOutputGroupingColumns(String[] cols)
使用示例:
Job job = new Job();
Pipeline pipeline = Pipeline.builder()
.addMapper(TokenizerMapper.class)
.setOutputKeySchema(
new Column[] { new Column("word", OdpsType.STRING) })
.setOutputValueSchema(
new Column[] { new Column("count", OdpsType.BIGINT) })
.addReducer(SumReducer.class)
.setOutputKeySchema(
new Column[] { new Column("count", OdpsType.BIGINT) })
.setOutputValueSchema(
new Column[] { new Column("word", OdpsType.STRING),
new Column("count", OdpsType.BIGINT) })
.addReducer(IdentityReducer.class).createPipeline();
job.setPipeline(pipeline);
job.addInput(...)
job.addOutput(...)
job.submit();
如上所示,用户可以在main函数中构建一个Map后连续接两个Reduce的MapReduce任务。如果用户比较熟悉MapReduce的基础功能,可以轻松的使用MR2 。
我们也建议用户在使用MR2功能之前,先了解MapReduce的基础用法。
当然,JobConf 仅能够配置Map后接单Reduce的MapReduce任务。
数据类型
MapReduce支持的数据类型有:bigint, string, double, boolean, datetime以及decimal类型。ODPS数据类型与Java类型的对应关系如下:
ODPS SQL Type | Bigint | String | Double | Boolean | Datetime | Decimal |
---|---|---|---|---|---|---|
Java Type | Long | String | Double | Boolean | Date | BigDecimal |
最后更新:2016-11-05 12:40:43
上一篇:
应用限制__MapReduce_大数据计算服务-阿里云
下一篇:
兼容版本SDK介绍__Java SDK介绍_MapReduce_大数据计算服务-阿里云
修改共享带宽包属性__NAT网关相关接口_API参考_专有网络 VPC-阿里云
开通简介__购买指导_访问控制-阿里云
LogHub数据源__准备数据源_用户指南_业务实时监控服务 ARMS-阿里云
ActionTrail支持查询多久的操作记录?__常见问题_常见问题_操作审计-阿里云
重磅 阿里云黑科技发布 马云再次改变世界
删除对象__管理文件_开发人员指南_对象存储 OSS-阿里云
重新创建集群实例__SDK接口说明_Java版SDK_批量计算-阿里云
查询截图作业__截图接口_API使用手册_媒体转码-阿里云
AnalyticDB数据源配置__数据源配置_数据同步手册_用户操作指南_大数据开发套件-阿里云
标签管理__媒体库管理_开发人员指南_视频点播-阿里云
相关内容
常见错误说明__附录_大数据计算服务-阿里云
发送短信接口__API使用手册_短信服务-阿里云
接口文档__Android_安全组件教程_移动安全-阿里云
运营商错误码(联通)__常见问题_短信服务-阿里云
设置短信模板__使用手册_短信服务-阿里云
OSS 权限问题及排查__常见错误及排除_最佳实践_对象存储 OSS-阿里云
消息通知__操作指南_批量计算-阿里云
设备端快速接入(MQTT)__快速开始_阿里云物联网套件-阿里云
查询API调用流量数据__API管理相关接口_API_API 网关-阿里云
使用STS访问__JavaScript-SDK_SDK 参考_对象存储 OSS-阿里云