閱讀513 返回首頁    go 阿裏雲 go 技術社區[雲棲]


原生SDK介紹__Java SDK介紹_MapReduce_大數據計算服務-阿裏雲

在本小節,我們僅會對較為常用的MapReduce核心接口做簡短介紹。使用Maven的用戶可以從Maven庫中搜索”odps-sdk-mapred”獲取不同版本的Java SDK,相關配置信息:

  1. <dependency>
  2. <groupId>com.aliyun.odps</groupId>
  3. <artifactId>odps-sdk-mapred</artifactId>
  4. <version>0.20.7-public</version>
  5. </dependency>
主要接口 描述
MapperBase 用戶自定義的Map函數需要繼承自此類。處理輸入表的記錄對 象,加工處理成鍵值對集合輸出到Reduce階段,或者不經過 Reduce階段直接輸出結果記錄到結果表。不經過Reduce階段而 直接輸出計算結果的作業,也可稱之為Map-Only作業。
ReducerBase 用戶自定義的Reduce函數需要繼承自此類。對與一個鍵(Key) 關聯的一組數值集(Values)進行歸約計算。
TaskContext 是MapperBase及ReducerBase多個成員函數的輸入參數之一。 含有任務運行的上下文信息。
JobClient 用於提交和管理作業,提交方式包括阻塞(同步)方式及非阻塞 (異步)方式。
RunningJob 作業運行時對象,用於跟蹤運行中的MapReduce作業實例。
JobConf 描述一個MapReduce任務的配置,通常在主程序(main函數)中 定義JobConf對象,然後通過JobClient提交作業給ODPS服務。

MapperBase

主要函數接口:

主要接口 描述
void cleanup(TaskContext context) 在Map階段結束時,map方法之後調用。
void map(long key, Record record, TaskContext context) map方法,處理輸入表的記錄。
void setup(TaskContext context) 在Map階段開始時,map方法之前調用。

ReducerBase

主要函數接口:

主要接口 描述
void cleanup( TaskContext context) 在Reduce階段結束時,reduce方法之後調用。
void reduce(Record key, Iterator<Record > values, TaskContext context) reduce方法,處理輸入表的記錄。
void setup( TaskContext context) 在Reduce階段開始時,reduce方法之前調用。

TaskContext

主要函數接口:

主要接口 描述
TableInfo[] getOutputTableInfo() 獲取輸出的表信息
Record createOutputRecord() 創建默認輸出表的記錄對象
Record createOutputRecord(String label) 創建給定label輸出表的記錄對象
Record createMapOutputKeyRecord() 創建Map輸出Key的記錄對象
Record createMapOutputValueRecord() 創建Map輸出Value的記錄對象
void write(Record record) 寫記錄到默認輸出,用於Reduce端寫出數據, 可以在Reduce端多次調用。
void write(Record record, String label) 寫記錄到給定標簽輸出,用於Reduce端寫出數據。可以在 Reduce端多次調用。
void write(Record key, Record value) Map寫記錄到中間結果,可以在Map函數中多次調用。 可以在Map端多次調用。
BufferedInputStream readResourceFileAsStream(String resourceName) 讀取文件類型資源
Iterator<Record > readResourceTable(String resourceName) 讀取表類型資源
Counter getCounter(Enum<? > name) 獲取給定名稱的Counter對象
Counter getCounter(String group, String name) 獲取給定組名和名稱的Counter對象
void progress() 向MapReduce框架報告心跳信息。 如果用戶方法處理時間 很長,且中間沒有調用框架,可以調用這個方法避免task 超時,框架默認600秒超時。

備注:ODPS的TaskContext接口中提供了progress功能,但此功能是防止Worker長時間運行未結束,被框架誤認為超時而被殺的情況出現。 這個接口更類似於向框架發送心跳信息,並不是用來匯報Worker進度。ODPS MapReduce默認Worker超時時間為10分鍾(係統默認配置,不受用戶控製), 如果超過10分鍾,Worker仍然沒有向框架發送心跳(調用progress接口),框架會強製停止該Worker,MapReduce任務失敗退出。因此, 建議用戶在Mapper/Reducer函數中,定期調用progress接口,防止框架認為Worker超時,誤殺任務。

JobConf

主要函數接口:

主要接口 描述
void setResources(String resourceNames) 聲明本作業使用的資源。隻有聲明的資源才能在運行 Mapper/Reducer時通過TaskContext對象讀取。
void setMapOutputKeySchema(Column[] schema) 設置Mapper輸出到Reducer的Key屬性
void setMapOutputValueSchema(Column[] schema) 設置Mapper輸出到Reducer的Value屬性
void setOutputKeySortColumns(String[] cols) 設置Mapper輸出到Reducer的Key排序列
void setOutputGroupingColumns(String[] cols) 設置Key分組列
void setMapperClass(Class<? extends Mapper > theClass) 設置作業的Mapper函數
void setPartitionColumns(String[] cols) 設置作業指定的分區列. 默認是Mapper輸出Key的所有列
void setReducerClass(Class<? extends Reducer > theClass) 設置作業的Reducer
void setCombinerClass(Class<? extends Reducer > theClass) 設置作業的combiner。在Map端運行,作用類似於單個Map 對本地的相同Key值做Reduce
void setSplitSize(long size) 設置輸入分片大小,單位 MB,默認256
void setNumReduceTasks(int n) 設置Reducer任務數,默認為Mapper任務數的1/4
void setMemoryForMapTask(int mem) 設置Mapper任務中單個Worker的內存大小,單位:MB, 默認值2048。
void setMemoryForReduceTask(int mem) 設置Reducer任務中單個Worker的內存大小,單位:MB, 默認值 2048。

備注:

  • 通常情況下,GroupingColumns包含在KeySortColumns中,KeySortColumns和PartitionColumns要包含在Key中。
  • 在Map端,Mapper輸出的Record會根據設置的PartitionColumns計算哈希值,決定分配到哪個Reducer,會根據KeySortColumns對Record進行排序。
  • 在Reduce端,輸入Records在按照KeySortColumns排序好後,會根據GroupingColumns指定的列對輸入的Records進行分組,即會順序遍曆輸入的Records,把GroupingColumns所指定列相同的Records作為一次reduce函數調用的輸入。

JobClient

主要函數接口:

主要接口 描述
static RunningJob runJob(JobConf job) 阻塞(同步)方式提交MapReduce作業後立即返回
static RunningJob submitJob(JobConf job) 非阻塞(異步)方式提交MapReduce作業後立即返回

RunningJob

主要函數接口:

主要接口 描述
String getInstanceID() 獲取作業運行實例ID,用於查看運行日誌和作業管理。
boolean isComplete() 查詢作業是否結束。
boolean isSuccessful() 查詢作業實例是否運行成功。
void waitForCompletion() 等待直至作業實例結束。一般使用於異步方式提交的作業。
JobStatus getJobStatus() 查詢作業實例運行狀態。
void killJob() 結束此作業。
Counters getCounters() 獲取Conter信息。

InputUtils

主要函數接口:

主要接口 描述
static void addTable(TableInfo table, JobConf conf) 添加表table到任務輸入,可以被調用多次 ,新加入的表以append方式添加到輸入隊列中。
static void setTables(TableInfo [] tables, JobConf conf) 添加多張表到任務輸入中。

OutputUtils

主要函數接口:

主要接口 描述
static void addTable(TableInfo table, JobConf conf) 添加表table到任務輸出,可以被調用多次 ,新加入的表以append方式添加到輸出隊 列中。
static void setTables(TableInfo [] tables, JobConf conf) 添加多張表到任務輸出中。

Pipeline

Pipeline是MR2的主體類。可以通過Pipeline.builder構建一個Pipeline。Pipeline的主要接口如下:

  1. public Builder addMapper(Class<? extends Mapper> mapper)
  2. public Builder addMapper(Class<? extends Mapper> mapper,
  3. Column[] keySchema, Column[] valueSchema, String[] sortCols,
  4. SortOrder[] order, String[] partCols,
  5. Class<? extends Partitioner> theClass, String[] groupCols)
  6. public Builder addReducer(Class<? extends Reducer> reducer)
  7. public Builder addReducer(Class<? extends Reducer> reducer,
  8. Column[] keySchema, Column[] valueSchema, String[] sortCols,
  9. SortOrder[] order, String[] partCols,
  10. Class<? extends Partitioner> theClass, String[] groupCols)
  11. public Builder setOutputKeySchema(Column[] keySchema)
  12. public Builder setOutputValueSchema(Column[] valueSchema)
  13. public Builder setOutputKeySortColumns(String[] sortCols)
  14. public Builder setOutputKeySortOrder(SortOrder[] order)
  15. public Builder setPartitionColumns(String[] partCols)
  16. public Builder setPartitionerClass(Class<? extends Partitioner> theClass)
  17. public Builder setOutputGroupingColumns(String[] cols)

使用示例:

  1. Job job = new Job();
  2. Pipeline pipeline = Pipeline.builder()
  3. .addMapper(TokenizerMapper.class)
  4. .setOutputKeySchema(
  5. new Column[] { new Column("word", OdpsType.STRING) })
  6. .setOutputValueSchema(
  7. new Column[] { new Column("count", OdpsType.BIGINT) })
  8. .addReducer(SumReducer.class)
  9. .setOutputKeySchema(
  10. new Column[] { new Column("count", OdpsType.BIGINT) })
  11. .setOutputValueSchema(
  12. new Column[] { new Column("word", OdpsType.STRING),
  13. new Column("count", OdpsType.BIGINT) })
  14. .addReducer(IdentityReducer.class).createPipeline();
  15. job.setPipeline(pipeline);
  16. job.addInput(...)
  17. job.addOutput(...)
  18. job.submit();

如上所示,用戶可以在main函數中構建一個Map後連續接兩個Reduce的MapReduce任務。如果用戶比較熟悉MapReduce的基礎功能,可以輕鬆的使用MR2

我們也建議用戶在使用MR2功能之前,先了解MapReduce的基礎用法。

當然,JobConf 僅能夠配置Map後接單Reduce的MapReduce任務。

數據類型

MapReduce支持的數據類型有:bigint, string, double, boolean, datetime以及decimal類型。ODPS數據類型與Java類型的對應關係如下:

ODPS SQL Type Bigint String Double Boolean Datetime Decimal
Java Type Long String Double Boolean Date BigDecimal

最後更新:2016-11-05 12:40:43

  上一篇:go 應用限製__MapReduce_大數據計算服務-阿裏雲
  下一篇:go 兼容版本SDK介紹__Java SDK介紹_MapReduce_大數據計算服務-阿裏雲