閱讀491 返回首頁    go iPhone_iPad_Mac_手機_平板_蘋果apple


編寫UDF__快速開始_大數據計算服務-阿裏雲

MaxCompute 的 UDF 包括:UDF,UDAF,UDTF三種函數。通常情況下,此三種函數被統稱為 UDF。使用Maven的用戶可以從Maven庫中搜索”odps-sdk-udf”獲取不同版本的Java SDK,相關配置信息:

  1. <dependency>
  2. <groupId>com.aliyun.odps</groupId>
  3. <artifactId>odps-sdk-udf</artifactId>
  4. <version>0.20.7-public</version>
  5. </dependency>

備注:

  • UDF目前隻支持Java語言接口,用戶如果想編寫UDF程序,可以通過 添加資源 的方式將UDF代碼上傳到項目間中,使用 注冊函數 語句創建UDF。
  • 本章節中會分別給出UDF,UDAF,UDTF的代碼示例,運行UDF的示例請參考:UDF開發插件介紹
  • 如果用戶需要使用UDF功能,需要在工單係統上提交申請,提供odps project名稱,簡單描述使用場景。隻有申請通過,開通好權限後才可以創建UDF。

UDF示例

下麵我們將給出一個完整的開發UDF流程示例,例如,實現一個字符小寫轉換功能的UDF,需要經過如下幾個步驟:

  • 代碼編寫:按照 MaxCompute UDF 框架的規定,實現函數功能,並進行編譯。下麵給出一個簡單的代碼實現:
  1. package org.alidata.odps.udf.examples;
  2. import com.aliyun.odps.udf.UDF;
  3. public final class Lower extends UDF {
  4. public String evaluate(String s) {
  5. if (s == null) { return null; }
  6. return s.toLowerCase();
  7. }
  8. }

將這個jar包命名為”my_lower.jar”。

備注:SDK的使用信息請參考UDF SDK

  • 添加資源:在運行UDF之前,必須指定引用的UDF代碼。用戶代碼通過資源的形式添加到ODPS中。Java UDF必須被打成jar包,以jar資源添加到ODPS中,UDF框架會自動加載jar包,運行用戶自定義的UDF。 ODPS MapReduce也用到了資源這一特有概念,MapReduce文檔中對資源的使用也有闡述。

執行命令:

  1. add jar my_lower.jar;
  2. -- 如果存在同名的資源請將這個jar包重命名,
  3. -- 並注意修改下麵示例命令中相關jar包的名字;
  4. -- 又或者直接使用-f選項覆蓋原有的jar資源
  • 注冊UDF函數:用戶的jar包被上傳後,使得 MaxCompute 有條件自動獲取用戶代碼並運行。但此時仍然無法使用這個UDF,因為 MaxCompute 中並沒有關於這個UDF的任何信息。因此需要用戶在 MaxCompute 中注冊一個唯一的函數名,並指定這個函數名與哪個jar資源的哪個函數對應。關於如何注冊 UDF,請參考 注冊函數 。運行命令:
  1. CREATE FUNCTION test_lower AS org.alidata.odps.udf.examples.Lower USING my_lower.jar;

在sql中使用此函數:

  1. select test_lower('A') from my_test_table;

UDAF示例

UDAF的注冊方式與UDF基本相同,使用方式與內建函數中得 聚合函數 相同。下麵是一個計算平均值的UDAF的代碼示例:

  1. package org.alidata.odps.udf.examples;
  2. import com.aliyun.odps.io.LongWritable;
  3. import com.aliyun.odps.io.Text;
  4. import com.aliyun.odps.io.Writable;
  5. import com.aliyun.odps.udf.Aggregator;
  6. import com.aliyun.odps.udf.UDFException;
  7. /**
  8. * project: example_project
  9. * table: wc_in2
  10. * partitions: p2=1,p1=2
  11. * columns: colc,colb,cola
  12. */
  13. public class UDAFExample extends Aggregator {
  14. @Override
  15. public void iterate(Writable arg0, Writable[] arg1) throws UDFException {
  16. LongWritable result = (LongWritable) arg0;
  17. for (Writable item : arg1) {
  18. Text txt = (Text) item;
  19. result.set(result.get() + txt.getLength());
  20. }
  21. }
  22. @Override
  23. public void merge(Writable arg0, Writable arg1) throws UDFException {
  24. LongWritable result = (LongWritable) arg0;
  25. LongWritable partial = (LongWritable) arg1;
  26. result.set(result.get() + partial.get());
  27. }
  28. @Override
  29. public Writable newBuffer() {
  30. return new LongWritable(0L);
  31. }
  32. @Override
  33. public Writable terminate(Writable arg0) throws UDFException {
  34. return arg0;
  35. }
  36. }

UDTF示例

UDTF的注冊方式與UDF基本相同,使用方式也與UDF相同。代碼示例:

  1. package org.alidata.odps.udtf.examples;
  2. import com.aliyun.odps.udf.UDTF;
  3. import com.aliyun.odps.udf.UDTFCollector;
  4. import com.aliyun.odps.udf.annotation.Resolve;
  5. import com.aliyun.odps.udf.UDFException;
  6. // TODO define input and output types, e.g., "string,string->string,bigint".
  7. @Resolve({"string,bigint->string,bigint"})
  8. public class MyUDTF extends UDTF {
  9. @Override
  10. public void process(Object[] args) throws UDFException {
  11. String a = (String) args[0];
  12. Long b = (Long) args[1];
  13. for (String t: a.split("\s+")) {
  14. forward(t, b);
  15. }
  16. }
  17. }

最後更新:2016-11-23 17:16:04

  上一篇:go 運行SQL__快速開始_大數據計算服務-阿裏雲
  下一篇:go 編寫MapReduce__快速開始_大數據計算服務-阿裏雲