Java UDF__UDF_SQL_大數據計算服務-阿裏雲
實現UDF需要繼承com.aliyun.odps.udf.UDF類,並實現evaluate方法。evaluate方法必須是非static的public方法。Evaluate方法的參數和返回值類型將作為SQL中UDF的函數簽名。這意味著用戶可以在UDF中實現多個evaluate方法,在調用UDF時,框架會依據UDF調用的參數類型匹配正確的evaluate方法。
下麵是一個UDF的例子。
package org.alidata.odps.udf.examples;import com.aliyun.odps.udf.UDF;public final class Lower extends UDF {public String evaluate(String s) {if (s == null) { return null; }return s.toLowerCase();}}
可以通過實現void setup(ExecutionContext ctx)和void close()來分別實現UDF的初始化和結束代碼。
UDF的使用方式於ODPS SQL中普通的內建函數相同,詳情請參考內建函數。
UDAF
實現Java UDAF類需要繼承com.aliyun.odps.udf.Aggregator,並實現如下幾個接口:
public abstract class Aggregator implements ContextFunction {@Overridepublic void setup(ExecutionContext ctx) throws UDFException {}@Overridepublic void close() throws UDFException {}/*** 創建聚合Buffer* @return Writable 聚合buffer*/abstract public Writable newBuffer();/*** @param buffer 聚合buffer* @param args SQL中調用UDAF時指定的參數* @throws UDFException*/abstract public void iterate(Writable buffer, Writable[] args) throws UDFException;/*** 生成最終結果* @param buffer* @return Object UDAF的最終結果* @throws UDFException*/abstract public Writable terminate(Writable buffer) throws UDFException;abstract public void merge(Writable buffer, Writable partial) throws UDFException;}
其中最重要的是iterate,merge和terminate三個接口,UDAF的主要邏輯依賴於這三個接口的實現。此外,還需要用戶實現自定義的Writable buffer。以實現求平均值avg為例,下圖簡要說明了在ODPS UDAF中這一函數的實現邏輯及計算流程:

在上圖中,輸入數據被按照一定的大小進行分片(有關分片的描述可參考 MapReduce ),每片的大小適合一個worker在適當的時間內完成。這個分片大小的設置需要用戶手動配置完成。UDAF的計算過程分為兩階段:
- 在第一階段,每個worker統計分片內數據的個數及匯總值,我們可以將每個分片內的數據個數及匯總值視為一個中間結果;
- 在第二階段,worker匯總上一個階段中每個分片內的信息。在最終輸出時,r.sum / r.count即是所有輸入數據的平均值;
下麵是一個計算平均值的UDAF的代碼示例:
import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;import com.aliyun.odps.io.DoubleWritable;import com.aliyun.odps.io.Writable;import com.aliyun.odps.udf.Aggregator;import com.aliyun.odps.udf.UDFException;import com.aliyun.odps.udf.annotation.Resolve;@Resolve({"double->double"})public class AggrAvg extends Aggregator {private static class AvgBuffer implements Writable {private double sum = 0;private long count = 0;@Overridepublic void write(DataOutput out) throws IOException {out.writeDouble(sum);out.writeLong(count);}@Overridepublic void readFields(DataInput in) throws IOException {sum = in.readDouble();count = in.readLong();}}private DoubleWritable ret = new DoubleWritable();@Overridepublic Writable newBuffer() {return new AvgBuffer();}@Overridepublic void iterate(Writable buffer, Writable[] args) throws UDFException {DoubleWritable arg = (DoubleWritable) args[0];AvgBuffer buf = (AvgBuffer) buffer;if (arg != null) {buf.count += 1;buf.sum += arg.get();}}@Overridepublic Writable terminate(Writable buffer) throws UDFException {AvgBuffer buf = (AvgBuffer) buffer;if (buf.count == 0) {ret.set(0);} else {ret.set(buf.sum / buf.count);}return ret;}@Overridepublic void merge(Writable buffer, Writable partial) throws UDFException {AvgBuffer buf = (AvgBuffer) buffer;AvgBuffer p = (AvgBuffer) partial;buf.sum += p.sum;buf.count += p.count;}}
注意:
UDTF
Java UDTF需要繼承com.aliyun.odps.udf.UDTF類。這個類需要實現4個接口。
| 接口定義 | 描述 |
|---|---|
| public void setup(ExecutionContext ctx) throws UDFException | 初始化方法,在UDTF處理輸入數據前,調用用戶自定義的初始化行為。在每個Worker內setup會被先調用一次。 |
| public void process(Object[] args) throws UDFException | 這個方法由框架調用,SQL中每一條記錄都會對應調用一次process,process的參數為SQL語句中指定的UDTF輸入參數。輸入參數以Object[]的形式傳入,輸出結果通過forward函數輸出。用戶需要在process函數內自行調用forward,以決定輸出數據。 |
| public void close() throws UDFException | UDTF的結束方法,此方法由框架調用,並且隻會被調用一次,即在處理完最後一條記錄之後。 |
| public void forward(Object …o) throws UDFException | 用戶調用forward方法輸出數據,每次forward代表輸出一條記錄。對應SQL語句UDTF的as子句指定的列。 |
下麵將給出一個UDTF程序示例:
package org.alidata.odps.udtf.examples;import com.aliyun.odps.udf.UDTF;import com.aliyun.odps.udf.UDTFCollector;import com.aliyun.odps.udf.annotation.Resolve;import com.aliyun.odps.udf.UDFException;// TODO define input and output types, e.g., "string,string->string,bigint".@Resolve({"string,bigint->string,bigint"})public class MyUDTF extends UDTF {@Overridepublic void process(Object[] args) throws UDFException {String a = (String) args[0];Long b = (Long) args[1];for (String t: a.split("\s+")) {forward(t, b);}}}
注:
- 以上隻是程序示例,關於如何在ODPS中運行UDTF,方法與UDF類似,具體實現步驟請參考運行UDF。
在SQL中可以這樣使用這個UDTF,假設在ODPS上創建UDTF時注冊函數名為user_udtf:
select user_udtf(col0, col1) as (c0, c1) from my_table;
假設my_table的col0, col1的值為:
+------+------+| col0 | col1 |+------+------+| A B | 1 || C D | 2 |+------+------+
則select出的結果為:
+----+----+| c0 | c1 |+----+----+| A | 1 || B | 1 || C | 2 || D | 2 |+----+----+
使用說明
UDTF在SQL中的常用方式如下:
select user_udtf(col0, col1, col2) as (c0, c1) from my_table;select user_udtf(col0, col1, col2) as (c0, c1) from(select * from my_table distribute by key sort by key) t;select reduce_udtf(col0, col1, col2) as (c0, c1) from(select col0, col1, col2 from(select map_udtf(a0, a1, a2, a3) as (col0, col1, col2) from my_table) t1distribute by col0 sort by col0, col1) t2;
但使用UDTF有如下使用限製:
- 同一個SELECT子句中不允許有其他表達式
select value, user_udtf(key) as mycol ...
- UDTF不能嵌套使用
select user_udtf1(user_udtf2(key)) as mycol...
- 不支持在同一個select子句中與 group by / distribute by / sort by 聯用
select user_udtf(key) as mycol ... group by mycol
其他UDTF示例
在UDTF中,用戶可以讀取ODPS的 資源 。下麵將介紹利用udtf讀取ODPS資源的示例。
編寫udtf程序,編譯成功後導出jar包(udtfexample1.jar)。
package com.aliyun.odps.examples.udf;import java.io.BufferedReader;import java.io.IOException;import java.io.InputStream;import java.io.InputStreamReader;import java.util.Iterator;import com.aliyun.odps.udf.ExecutionContext;import com.aliyun.odps.udf.UDFException;import com.aliyun.odps.udf.UDTF;import com.aliyun.odps.udf.annotation.Resolve;/*** project: example_project* table: wc_in2* partitions: p2=1,p1=2* columns: colc,colb*/@Resolve({ "string,string->string,bigint,string" })public class UDTFResource extends UDTF {ExecutionContext ctx;long fileResourceLineCount;long tableResource1RecordCount;long tableResource2RecordCount;@Overridepublic void setup(ExecutionContext ctx) throws UDFException {this.ctx = ctx;try {InputStream in = ctx.readResourceFileAsStream("file_resource.txt");BufferedReader br = new BufferedReader(new InputStreamReader(in));String line;fileResourceLineCount = 0;while ((line = br.readLine()) != null) {fileResourceLineCount++;}br.close();Iterator<Object[]> iterator = ctx.readResourceTable("table_resource1").iterator();tableResource1RecordCount = 0;while (iterator.hasNext()) {tableResource1RecordCount++;iterator.next();}iterator = ctx.readResourceTable("table_resource2").iterator();tableResource2RecordCount = 0;while (iterator.hasNext()) {tableResource2RecordCount++;iterator.next();}} catch (IOException e) {throw new UDFException(e);}}@Overridepublic void process(Object[] args) throws UDFException {String a = (String) args[0];long b = args[1] == null ? 0 : ((String) args[1]).length();forward(a, b, "fileResourceLineCount=" + fileResourceLineCount + "|tableResource1RecordCount="+ tableResource1RecordCount + "|tableResource2RecordCount=" + tableResource2RecordCount);}}
添加資源到ODPS:
Add file file_resource.txt;Add jar udtfexample1.jar;Add table table_resource1 as table_resource1;Add table table_resource2 as table_resource2;
在ODPS中創建UDTF函數(my_udtf):
create function mp_udtf as com.aliyun.odps.examples.udf.UDTFResource using 'udtfexample1.jar, file_resource.txt, table_resource1, table_resource2';
在odps創建資源表table_resource1、table_resource2, 物理表tmp1。並插入相應的數據。
運行該udtf:
select mp_udtf("10","20") as (a, b, fileResourceLineCount) from table_resource1;返回:+-------+------------+-------+| a | b | fileResourceLineCount |+-------+------------+-------+| 10 | 2 | fileResourceLineCount=3|tableResource1RecordCount=0|tableResource2RecordCount=0 || 10 | 2 | fileResourceLineCount=3|tableResource1RecordCount=0|tableResource2RecordCount=0 |+-------+------------+-------+
最後更新:2016-11-23 17:16:04
上一篇:
內建函數-下__SQL_大數據計算服務-阿裏雲
下一篇:
轉義字符__附錄_SQL_大數據計算服務-阿裏雲
修改物理專線屬性__高速通道相關接口_API 參考_雲服務器 ECS-阿裏雲
作業盒子、騰躍校長在線、音樂筆記等獲融資;新東方網與新東方擬出資參與設立霍東投資;阿裏雲發布互聯網大學
產品使用限製__用戶指南_高速通道-阿裏雲
開發者使用引導__開發人員指南_消息服務-阿裏雲
視頻_阿裏雲幫助中心-阿裏雲,領先的雲計算服務提供商
事務消息__最佳實踐_消息服務-阿裏雲
CDN服務如何開啟GZIP壓縮功能___產品使用問題_CDN-阿裏雲
使用入門__Java SDK_STS SDK使用手冊_訪問控製-阿裏雲
資源授權場景__場景示例_Open API_消息隊列 MQ-阿裏雲
子賬號訪問__授權管理_阿裏雲物聯網套件-阿裏雲
相關內容
常見錯誤說明__附錄_大數據計算服務-阿裏雲
發送短信接口__API使用手冊_短信服務-阿裏雲
接口文檔__Android_安全組件教程_移動安全-阿裏雲
運營商錯誤碼(聯通)__常見問題_短信服務-阿裏雲
設置短信模板__使用手冊_短信服務-阿裏雲
OSS 權限問題及排查__常見錯誤及排除_最佳實踐_對象存儲 OSS-阿裏雲
消息通知__操作指南_批量計算-阿裏雲
設備端快速接入(MQTT)__快速開始_阿裏雲物聯網套件-阿裏雲
查詢API調用流量數據__API管理相關接口_API_API 網關-阿裏雲
使用STS訪問__JavaScript-SDK_SDK 參考_對象存儲 OSS-阿裏雲