761
人物
Java UDF__UDF_SQL_大數據計算服務-阿裏雲
實現UDF需要繼承com.aliyun.odps.udf.UDF類,並實現evaluate方法。evaluate方法必須是非static的public方法。Evaluate方法的參數和返回值類型將作為SQL中UDF的函數簽名。這意味著用戶可以在UDF中實現多個evaluate方法,在調用UDF時,框架會依據UDF調用的參數類型匹配正確的evaluate方法。
下麵是一個UDF的例子。
package org.alidata.odps.udf.examples;
import com.aliyun.odps.udf.UDF;
public final class Lower extends UDF {
public String evaluate(String s) {
if (s == null) { return null; }
return s.toLowerCase();
}
}
可以通過實現void setup(ExecutionContext ctx)
和void close()
來分別實現UDF的初始化和結束代碼。
UDF的使用方式於ODPS SQL中普通的內建函數相同,詳情請參考內建函數。
UDAF
實現Java UDAF類需要繼承com.aliyun.odps.udf.Aggregator,並實現如下幾個接口:
public abstract class Aggregator implements ContextFunction {
@Override
public void setup(ExecutionContext ctx) throws UDFException {
}
@Override
public void close() throws UDFException {
}
/**
* 創建聚合Buffer
* @return Writable 聚合buffer
*/
abstract public Writable newBuffer();
/**
* @param buffer 聚合buffer
* @param args SQL中調用UDAF時指定的參數
* @throws UDFException
*/
abstract public void iterate(Writable buffer, Writable[] args) throws UDFException;
/**
* 生成最終結果
* @param buffer
* @return Object UDAF的最終結果
* @throws UDFException
*/
abstract public Writable terminate(Writable buffer) throws UDFException;
abstract public void merge(Writable buffer, Writable partial) throws UDFException;
}
其中最重要的是iterate,merge和terminate三個接口,UDAF的主要邏輯依賴於這三個接口的實現。此外,還需要用戶實現自定義的Writable buffer。以實現求平均值avg為例,下圖簡要說明了在ODPS UDAF中這一函數的實現邏輯及計算流程:
在上圖中,輸入數據被按照一定的大小進行分片(有關分片的描述可參考 MapReduce ),每片的大小適合一個worker在適當的時間內完成。這個分片大小的設置需要用戶手動配置完成。UDAF的計算過程分為兩階段:
- 在第一階段,每個worker統計分片內數據的個數及匯總值,我們可以將每個分片內的數據個數及匯總值視為一個中間結果;
- 在第二階段,worker匯總上一個階段中每個分片內的信息。在最終輸出時,r.sum / r.count即是所有輸入數據的平均值;
下麵是一個計算平均值的UDAF的代碼示例:
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import com.aliyun.odps.io.DoubleWritable;
import com.aliyun.odps.io.Writable;
import com.aliyun.odps.udf.Aggregator;
import com.aliyun.odps.udf.UDFException;
import com.aliyun.odps.udf.annotation.Resolve;
@Resolve({"double->double"})
public class AggrAvg extends Aggregator {
private static class AvgBuffer implements Writable {
private double sum = 0;
private long count = 0;
@Override
public void write(DataOutput out) throws IOException {
out.writeDouble(sum);
out.writeLong(count);
}
@Override
public void readFields(DataInput in) throws IOException {
sum = in.readDouble();
count = in.readLong();
}
}
private DoubleWritable ret = new DoubleWritable();
@Override
public Writable newBuffer() {
return new AvgBuffer();
}
@Override
public void iterate(Writable buffer, Writable[] args) throws UDFException {
DoubleWritable arg = (DoubleWritable) args[0];
AvgBuffer buf = (AvgBuffer) buffer;
if (arg != null) {
buf.count += 1;
buf.sum += arg.get();
}
}
@Override
public Writable terminate(Writable buffer) throws UDFException {
AvgBuffer buf = (AvgBuffer) buffer;
if (buf.count == 0) {
ret.set(0);
} else {
ret.set(buf.sum / buf.count);
}
return ret;
}
@Override
public void merge(Writable buffer, Writable partial) throws UDFException {
AvgBuffer buf = (AvgBuffer) buffer;
AvgBuffer p = (AvgBuffer) partial;
buf.sum += p.sum;
buf.count += p.count;
}
}
注意:
UDTF
Java UDTF需要繼承com.aliyun.odps.udf.UDTF類。這個類需要實現4個接口。
接口定義 | 描述 |
---|---|
public void setup(ExecutionContext ctx) throws UDFException | 初始化方法,在UDTF處理輸入數據前,調用用戶自定義的初始化行為。在每個Worker內setup會被先調用一次。 |
public void process(Object[] args) throws UDFException | 這個方法由框架調用,SQL中每一條記錄都會對應調用一次process,process的參數為SQL語句中指定的UDTF輸入參數。輸入參數以Object[]的形式傳入,輸出結果通過forward函數輸出。用戶需要在process函數內自行調用forward,以決定輸出數據。 |
public void close() throws UDFException | UDTF的結束方法,此方法由框架調用,並且隻會被調用一次,即在處理完最後一條記錄之後。 |
public void forward(Object …o) throws UDFException | 用戶調用forward方法輸出數據,每次forward代表輸出一條記錄。對應SQL語句UDTF的as子句指定的列。 |
下麵將給出一個UDTF程序示例:
package org.alidata.odps.udtf.examples;
import com.aliyun.odps.udf.UDTF;
import com.aliyun.odps.udf.UDTFCollector;
import com.aliyun.odps.udf.annotation.Resolve;
import com.aliyun.odps.udf.UDFException;
// TODO define input and output types, e.g., "string,string->string,bigint".
@Resolve({"string,bigint->string,bigint"})
public class MyUDTF extends UDTF {
@Override
public void process(Object[] args) throws UDFException {
String a = (String) args[0];
Long b = (Long) args[1];
for (String t: a.split("\s+")) {
forward(t, b);
}
}
}
注:
- 以上隻是程序示例,關於如何在ODPS中運行UDTF,方法與UDF類似,具體實現步驟請參考運行UDF。
在SQL中可以這樣使用這個UDTF,假設在ODPS上創建UDTF時注冊函數名為user_udtf:
select user_udtf(col0, col1) as (c0, c1) from my_table;
假設my_table的col0, col1的值為:
+------+------+
| col0 | col1 |
+------+------+
| A B | 1 |
| C D | 2 |
+------+------+
則select出的結果為:
+----+----+
| c0 | c1 |
+----+----+
| A | 1 |
| B | 1 |
| C | 2 |
| D | 2 |
+----+----+
使用說明
UDTF在SQL中的常用方式如下:
select user_udtf(col0, col1, col2) as (c0, c1) from my_table;
select user_udtf(col0, col1, col2) as (c0, c1) from
(select * from my_table distribute by key sort by key) t;
select reduce_udtf(col0, col1, col2) as (c0, c1) from
(select col0, col1, col2 from
(select map_udtf(a0, a1, a2, a3) as (col0, col1, col2) from my_table) t1
distribute by col0 sort by col0, col1) t2;
但使用UDTF有如下使用限製:
- 同一個SELECT子句中不允許有其他表達式
select value, user_udtf(key) as mycol ...
- UDTF不能嵌套使用
select user_udtf1(user_udtf2(key)) as mycol...
- 不支持在同一個select子句中與 group by / distribute by / sort by 聯用
select user_udtf(key) as mycol ... group by mycol
其他UDTF示例
在UDTF中,用戶可以讀取ODPS的 資源 。下麵將介紹利用udtf讀取ODPS資源的示例。
編寫udtf程序,編譯成功後導出jar包(udtfexample1.jar)。
package com.aliyun.odps.examples.udf;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Iterator;
import com.aliyun.odps.udf.ExecutionContext;
import com.aliyun.odps.udf.UDFException;
import com.aliyun.odps.udf.UDTF;
import com.aliyun.odps.udf.annotation.Resolve;
/**
* project: example_project
* table: wc_in2
* partitions: p2=1,p1=2
* columns: colc,colb
*/
@Resolve({ "string,string->string,bigint,string" })
public class UDTFResource extends UDTF {
ExecutionContext ctx;
long fileResourceLineCount;
long tableResource1RecordCount;
long tableResource2RecordCount;
@Override
public void setup(ExecutionContext ctx) throws UDFException {
this.ctx = ctx;
try {
InputStream in = ctx.readResourceFileAsStream("file_resource.txt");
BufferedReader br = new BufferedReader(new InputStreamReader(in));
String line;
fileResourceLineCount = 0;
while ((line = br.readLine()) != null) {
fileResourceLineCount++;
}
br.close();
Iterator<Object[]> iterator = ctx.readResourceTable("table_resource1").iterator();
tableResource1RecordCount = 0;
while (iterator.hasNext()) {
tableResource1RecordCount++;
iterator.next();
}
iterator = ctx.readResourceTable("table_resource2").iterator();
tableResource2RecordCount = 0;
while (iterator.hasNext()) {
tableResource2RecordCount++;
iterator.next();
}
} catch (IOException e) {
throw new UDFException(e);
}
}
@Override
public void process(Object[] args) throws UDFException {
String a = (String) args[0];
long b = args[1] == null ? 0 : ((String) args[1]).length();
forward(a, b, "fileResourceLineCount=" + fileResourceLineCount + "|tableResource1RecordCount="
+ tableResource1RecordCount + "|tableResource2RecordCount=" + tableResource2RecordCount);
}
}
添加資源到ODPS:
Add file file_resource.txt;
Add jar udtfexample1.jar;
Add table table_resource1 as table_resource1;
Add table table_resource2 as table_resource2;
在ODPS中創建UDTF函數(my_udtf):
create function mp_udtf as com.aliyun.odps.examples.udf.UDTFResource using 'udtfexample1.jar, file_resource.txt, table_resource1, table_resource2';
在odps創建資源表table_resource1、table_resource2, 物理表tmp1。並插入相應的數據。
運行該udtf:
select mp_udtf("10","20") as (a, b, fileResourceLineCount) from table_resource1;
返回:
+-------+------------+-------+
| a | b | fileResourceLineCount |
+-------+------------+-------+
| 10 | 2 | fileResourceLineCount=3|tableResource1RecordCount=0|tableResource2RecordCount=0 |
| 10 | 2 | fileResourceLineCount=3|tableResource1RecordCount=0|tableResource2RecordCount=0 |
+-------+------------+-------+
最後更新:2016-11-23 17:16:04
上一篇:
內建函數-下__SQL_大數據計算服務-阿裏雲
下一篇:
轉義字符__附錄_SQL_大數據計算服務-阿裏雲
修改物理專線屬性__高速通道相關接口_API 參考_雲服務器 ECS-阿裏雲
作業盒子、騰躍校長在線、音樂筆記等獲融資;新東方網與新東方擬出資參與設立霍東投資;阿裏雲發布互聯網大學
產品使用限製__用戶指南_高速通道-阿裏雲
開發者使用引導__開發人員指南_消息服務-阿裏雲
視頻_阿裏雲幫助中心-阿裏雲,領先的雲計算服務提供商
事務消息__最佳實踐_消息服務-阿裏雲
CDN服務如何開啟GZIP壓縮功能___產品使用問題_CDN-阿裏雲
使用入門__Java SDK_STS SDK使用手冊_訪問控製-阿裏雲
資源授權場景__場景示例_Open API_消息隊列 MQ-阿裏雲
子賬號訪問__授權管理_阿裏雲物聯網套件-阿裏雲
相關內容
常見錯誤說明__附錄_大數據計算服務-阿裏雲
發送短信接口__API使用手冊_短信服務-阿裏雲
接口文檔__Android_安全組件教程_移動安全-阿裏雲
運營商錯誤碼(聯通)__常見問題_短信服務-阿裏雲
設置短信模板__使用手冊_短信服務-阿裏雲
OSS 權限問題及排查__常見錯誤及排除_最佳實踐_對象存儲 OSS-阿裏雲
消息通知__操作指南_批量計算-阿裏雲
設備端快速接入(MQTT)__快速開始_阿裏雲物聯網套件-阿裏雲
查詢API調用流量數據__API管理相關接口_API_API 網關-阿裏雲
使用STS訪問__JavaScript-SDK_SDK 參考_對象存儲 OSS-阿裏雲