原生SDK介紹__Java SDK介紹_MapReduce_大數據計算服務-阿裏雲
在本小節,我們僅會對較為常用的MapReduce核心接口做簡短介紹。使用Maven的用戶可以從Maven庫中搜索”odps-sdk-mapred”獲取不同版本的Java SDK,相關配置信息:
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-sdk-mapred</artifactId>
<version>0.20.7-public</version>
</dependency>
主要接口 | 描述 |
---|---|
MapperBase | 用戶自定義的Map函數需要繼承自此類。處理輸入表的記錄對 象,加工處理成鍵值對集合輸出到Reduce階段,或者不經過 Reduce階段直接輸出結果記錄到結果表。不經過Reduce階段而 直接輸出計算結果的作業,也可稱之為Map-Only作業。 |
ReducerBase | 用戶自定義的Reduce函數需要繼承自此類。對與一個鍵(Key) 關聯的一組數值集(Values)進行歸約計算。 |
TaskContext | 是MapperBase及ReducerBase多個成員函數的輸入參數之一。 含有任務運行的上下文信息。 |
JobClient | 用於提交和管理作業,提交方式包括阻塞(同步)方式及非阻塞 (異步)方式。 |
RunningJob | 作業運行時對象,用於跟蹤運行中的MapReduce作業實例。 |
JobConf | 描述一個MapReduce任務的配置,通常在主程序(main函數)中 定義JobConf對象,然後通過JobClient提交作業給ODPS服務。 |
MapperBase
主要函數接口:
主要接口 | 描述 |
---|---|
void cleanup(TaskContext context) | 在Map階段結束時,map方法之後調用。 |
void map(long key, Record record, TaskContext context) | map方法,處理輸入表的記錄。 |
void setup(TaskContext context) | 在Map階段開始時,map方法之前調用。 |
ReducerBase
主要函數接口:
主要接口 | 描述 |
---|---|
void cleanup( TaskContext context) | 在Reduce階段結束時,reduce方法之後調用。 |
void reduce(Record key, Iterator<Record > values, TaskContext context) | reduce方法,處理輸入表的記錄。 |
void setup( TaskContext context) | 在Reduce階段開始時,reduce方法之前調用。 |
TaskContext
主要函數接口:
主要接口 | 描述 |
---|---|
TableInfo[] getOutputTableInfo() | 獲取輸出的表信息 |
Record createOutputRecord() | 創建默認輸出表的記錄對象 |
Record createOutputRecord(String label) | 創建給定label輸出表的記錄對象 |
Record createMapOutputKeyRecord() | 創建Map輸出Key的記錄對象 |
Record createMapOutputValueRecord() | 創建Map輸出Value的記錄對象 |
void write(Record record) | 寫記錄到默認輸出,用於Reduce端寫出數據, 可以在Reduce端多次調用。 |
void write(Record record, String label) | 寫記錄到給定標簽輸出,用於Reduce端寫出數據。可以在 Reduce端多次調用。 |
void write(Record key, Record value) | Map寫記錄到中間結果,可以在Map函數中多次調用。 可以在Map端多次調用。 |
BufferedInputStream readResourceFileAsStream(String resourceName) | 讀取文件類型資源 |
Iterator<Record > readResourceTable(String resourceName) | 讀取表類型資源 |
Counter getCounter(Enum<? > name) | 獲取給定名稱的Counter對象 |
Counter getCounter(String group, String name) | 獲取給定組名和名稱的Counter對象 |
void progress() | 向MapReduce框架報告心跳信息。 如果用戶方法處理時間 很長,且中間沒有調用框架,可以調用這個方法避免task 超時,框架默認600秒超時。 |
備注:ODPS的TaskContext接口中提供了progress功能,但此功能是防止Worker長時間運行未結束,被框架誤認為超時而被殺的情況出現。 這個接口更類似於向框架發送心跳信息,並不是用來匯報Worker進度。ODPS MapReduce默認Worker超時時間為10分鍾(係統默認配置,不受用戶控製), 如果超過10分鍾,Worker仍然沒有向框架發送心跳(調用progress接口),框架會強製停止該Worker,MapReduce任務失敗退出。因此, 建議用戶在Mapper/Reducer函數中,定期調用progress接口,防止框架認為Worker超時,誤殺任務。
JobConf
主要函數接口:
主要接口 | 描述 |
---|---|
void setResources(String resourceNames) | 聲明本作業使用的資源。隻有聲明的資源才能在運行 Mapper/Reducer時通過TaskContext對象讀取。 |
void setMapOutputKeySchema(Column[] schema) | 設置Mapper輸出到Reducer的Key屬性 |
void setMapOutputValueSchema(Column[] schema) | 設置Mapper輸出到Reducer的Value屬性 |
void setOutputKeySortColumns(String[] cols) | 設置Mapper輸出到Reducer的Key排序列 |
void setOutputGroupingColumns(String[] cols) | 設置Key分組列 |
void setMapperClass(Class<? extends Mapper > theClass) | 設置作業的Mapper函數 |
void setPartitionColumns(String[] cols) | 設置作業指定的分區列. 默認是Mapper輸出Key的所有列 |
void setReducerClass(Class<? extends Reducer > theClass) | 設置作業的Reducer |
void setCombinerClass(Class<? extends Reducer > theClass) | 設置作業的combiner。在Map端運行,作用類似於單個Map 對本地的相同Key值做Reduce |
void setSplitSize(long size) | 設置輸入分片大小,單位 MB,默認256 |
void setNumReduceTasks(int n) | 設置Reducer任務數,默認為Mapper任務數的1/4 |
void setMemoryForMapTask(int mem) | 設置Mapper任務中單個Worker的內存大小,單位:MB, 默認值2048。 |
void setMemoryForReduceTask(int mem) | 設置Reducer任務中單個Worker的內存大小,單位:MB, 默認值 2048。 |
備注:
- 通常情況下,GroupingColumns包含在KeySortColumns中,KeySortColumns和PartitionColumns要包含在Key中。
- 在Map端,Mapper輸出的Record會根據設置的PartitionColumns計算哈希值,決定分配到哪個Reducer,會根據KeySortColumns對Record進行排序。
- 在Reduce端,輸入Records在按照KeySortColumns排序好後,會根據GroupingColumns指定的列對輸入的Records進行分組,即會順序遍曆輸入的Records,把GroupingColumns所指定列相同的Records作為一次reduce函數調用的輸入。
JobClient
主要函數接口:
主要接口 | 描述 |
---|---|
static RunningJob runJob(JobConf job) | 阻塞(同步)方式提交MapReduce作業後立即返回 |
static RunningJob submitJob(JobConf job) | 非阻塞(異步)方式提交MapReduce作業後立即返回 |
RunningJob
主要函數接口:
主要接口 | 描述 |
---|---|
String getInstanceID() | 獲取作業運行實例ID,用於查看運行日誌和作業管理。 |
boolean isComplete() | 查詢作業是否結束。 |
boolean isSuccessful() | 查詢作業實例是否運行成功。 |
void waitForCompletion() | 等待直至作業實例結束。一般使用於異步方式提交的作業。 |
JobStatus getJobStatus() | 查詢作業實例運行狀態。 |
void killJob() | 結束此作業。 |
Counters getCounters() | 獲取Conter信息。 |
InputUtils
主要函數接口:
主要接口 | 描述 |
---|---|
static void addTable(TableInfo table, JobConf conf) | 添加表table到任務輸入,可以被調用多次 ,新加入的表以append方式添加到輸入隊列中。 |
static void setTables(TableInfo [] tables, JobConf conf) | 添加多張表到任務輸入中。 |
OutputUtils
主要函數接口:
主要接口 | 描述 |
---|---|
static void addTable(TableInfo table, JobConf conf) | 添加表table到任務輸出,可以被調用多次 ,新加入的表以append方式添加到輸出隊 列中。 |
static void setTables(TableInfo [] tables, JobConf conf) | 添加多張表到任務輸出中。 |
Pipeline
Pipeline是MR2的主體類。可以通過Pipeline.builder構建一個Pipeline。Pipeline的主要接口如下:
public Builder addMapper(Class<? extends Mapper> mapper)
public Builder addMapper(Class<? extends Mapper> mapper,
Column[] keySchema, Column[] valueSchema, String[] sortCols,
SortOrder[] order, String[] partCols,
Class<? extends Partitioner> theClass, String[] groupCols)
public Builder addReducer(Class<? extends Reducer> reducer)
public Builder addReducer(Class<? extends Reducer> reducer,
Column[] keySchema, Column[] valueSchema, String[] sortCols,
SortOrder[] order, String[] partCols,
Class<? extends Partitioner> theClass, String[] groupCols)
public Builder setOutputKeySchema(Column[] keySchema)
public Builder setOutputValueSchema(Column[] valueSchema)
public Builder setOutputKeySortColumns(String[] sortCols)
public Builder setOutputKeySortOrder(SortOrder[] order)
public Builder setPartitionColumns(String[] partCols)
public Builder setPartitionerClass(Class<? extends Partitioner> theClass)
public Builder setOutputGroupingColumns(String[] cols)
使用示例:
Job job = new Job();
Pipeline pipeline = Pipeline.builder()
.addMapper(TokenizerMapper.class)
.setOutputKeySchema(
new Column[] { new Column("word", OdpsType.STRING) })
.setOutputValueSchema(
new Column[] { new Column("count", OdpsType.BIGINT) })
.addReducer(SumReducer.class)
.setOutputKeySchema(
new Column[] { new Column("count", OdpsType.BIGINT) })
.setOutputValueSchema(
new Column[] { new Column("word", OdpsType.STRING),
new Column("count", OdpsType.BIGINT) })
.addReducer(IdentityReducer.class).createPipeline();
job.setPipeline(pipeline);
job.addInput(...)
job.addOutput(...)
job.submit();
如上所示,用戶可以在main函數中構建一個Map後連續接兩個Reduce的MapReduce任務。如果用戶比較熟悉MapReduce的基礎功能,可以輕鬆的使用MR2 。
我們也建議用戶在使用MR2功能之前,先了解MapReduce的基礎用法。
當然,JobConf 僅能夠配置Map後接單Reduce的MapReduce任務。
數據類型
MapReduce支持的數據類型有:bigint, string, double, boolean, datetime以及decimal類型。ODPS數據類型與Java類型的對應關係如下:
ODPS SQL Type | Bigint | String | Double | Boolean | Datetime | Decimal |
---|---|---|---|---|---|---|
Java Type | Long | String | Double | Boolean | Date | BigDecimal |
最後更新:2016-11-05 12:40:43
上一篇:
應用限製__MapReduce_大數據計算服務-阿裏雲
下一篇:
兼容版本SDK介紹__Java SDK介紹_MapReduce_大數據計算服務-阿裏雲
修改共享帶寬包屬性__NAT網關相關接口_API參考_專有網絡 VPC-阿裏雲
開通簡介__購買指導_訪問控製-阿裏雲
LogHub數據源__準備數據源_用戶指南_業務實時監控服務 ARMS-阿裏雲
ActionTrail支持查詢多久的操作記錄?__常見問題_常見問題_操作審計-阿裏雲
重磅 阿裏雲黑科技發布 馬雲再次改變世界
刪除對象__管理文件_開發人員指南_對象存儲 OSS-阿裏雲
重新創建集群實例__SDK接口說明_Java版SDK_批量計算-阿裏雲
查詢截圖作業__截圖接口_API使用手冊_媒體轉碼-阿裏雲
AnalyticDB數據源配置__數據源配置_數據同步手冊_用戶操作指南_大數據開發套件-阿裏雲
標簽管理__媒體庫管理_開發人員指南_視頻點播-阿裏雲
相關內容
常見錯誤說明__附錄_大數據計算服務-阿裏雲
發送短信接口__API使用手冊_短信服務-阿裏雲
接口文檔__Android_安全組件教程_移動安全-阿裏雲
運營商錯誤碼(聯通)__常見問題_短信服務-阿裏雲
設置短信模板__使用手冊_短信服務-阿裏雲
OSS 權限問題及排查__常見錯誤及排除_最佳實踐_對象存儲 OSS-阿裏雲
消息通知__操作指南_批量計算-阿裏雲
設備端快速接入(MQTT)__快速開始_阿裏雲物聯網套件-阿裏雲
查詢API調用流量數據__API管理相關接口_API_API 網關-阿裏雲
使用STS訪問__JavaScript-SDK_SDK 參考_對象存儲 OSS-阿裏雲