阅读491 返回首页    go 阿里云


编写UDF__快速开始_大数据计算服务-阿里云

MaxCompute 的 UDF 包括:UDF,UDAF,UDTF三种函数。通常情况下,此三种函数被统称为 UDF。使用Maven的用户可以从Maven库中搜索”odps-sdk-udf”获取不同版本的Java SDK,相关配置信息:

  1. <dependency>
  2. <groupId>com.aliyun.odps</groupId>
  3. <artifactId>odps-sdk-udf</artifactId>
  4. <version>0.20.7-public</version>
  5. </dependency>

备注:

  • UDF目前只支持Java语言接口,用户如果想编写UDF程序,可以通过 添加资源 的方式将UDF代码上传到项目间中,使用 注册函数 语句创建UDF。
  • 本章节中会分别给出UDF,UDAF,UDTF的代码示例,运行UDF的示例请参考:UDF开发插件介绍
  • 如果用户需要使用UDF功能,需要在工单系统上提交申请,提供odps project名称,简单描述使用场景。只有申请通过,开通好权限后才可以创建UDF。

UDF示例

下面我们将给出一个完整的开发UDF流程示例,例如,实现一个字符小写转换功能的UDF,需要经过如下几个步骤:

  • 代码编写:按照 MaxCompute UDF 框架的规定,实现函数功能,并进行编译。下面给出一个简单的代码实现:
  1. package org.alidata.odps.udf.examples;
  2. import com.aliyun.odps.udf.UDF;
  3. public final class Lower extends UDF {
  4. public String evaluate(String s) {
  5. if (s == null) { return null; }
  6. return s.toLowerCase();
  7. }
  8. }

将这个jar包命名为”my_lower.jar”。

备注:SDK的使用信息请参考UDF SDK

  • 添加资源:在运行UDF之前,必须指定引用的UDF代码。用户代码通过资源的形式添加到ODPS中。Java UDF必须被打成jar包,以jar资源添加到ODPS中,UDF框架会自动加载jar包,运行用户自定义的UDF。 ODPS MapReduce也用到了资源这一特有概念,MapReduce文档中对资源的使用也有阐述。

执行命令:

  1. add jar my_lower.jar;
  2. -- 如果存在同名的资源请将这个jar包重命名,
  3. -- 并注意修改下面示例命令中相关jar包的名字;
  4. -- 又或者直接使用-f选项覆盖原有的jar资源
  • 注册UDF函数:用户的jar包被上传后,使得 MaxCompute 有条件自动获取用户代码并运行。但此时仍然无法使用这个UDF,因为 MaxCompute 中并没有关于这个UDF的任何信息。因此需要用户在 MaxCompute 中注册一个唯一的函数名,并指定这个函数名与哪个jar资源的哪个函数对应。关于如何注册 UDF,请参考 注册函数 。运行命令:
  1. CREATE FUNCTION test_lower AS org.alidata.odps.udf.examples.Lower USING my_lower.jar;

在sql中使用此函数:

  1. select test_lower('A') from my_test_table;

UDAF示例

UDAF的注册方式与UDF基本相同,使用方式与内建函数中得 聚合函数 相同。下面是一个计算平均值的UDAF的代码示例:

  1. package org.alidata.odps.udf.examples;
  2. import com.aliyun.odps.io.LongWritable;
  3. import com.aliyun.odps.io.Text;
  4. import com.aliyun.odps.io.Writable;
  5. import com.aliyun.odps.udf.Aggregator;
  6. import com.aliyun.odps.udf.UDFException;
  7. /**
  8. * project: example_project
  9. * table: wc_in2
  10. * partitions: p2=1,p1=2
  11. * columns: colc,colb,cola
  12. */
  13. public class UDAFExample extends Aggregator {
  14. @Override
  15. public void iterate(Writable arg0, Writable[] arg1) throws UDFException {
  16. LongWritable result = (LongWritable) arg0;
  17. for (Writable item : arg1) {
  18. Text txt = (Text) item;
  19. result.set(result.get() + txt.getLength());
  20. }
  21. }
  22. @Override
  23. public void merge(Writable arg0, Writable arg1) throws UDFException {
  24. LongWritable result = (LongWritable) arg0;
  25. LongWritable partial = (LongWritable) arg1;
  26. result.set(result.get() + partial.get());
  27. }
  28. @Override
  29. public Writable newBuffer() {
  30. return new LongWritable(0L);
  31. }
  32. @Override
  33. public Writable terminate(Writable arg0) throws UDFException {
  34. return arg0;
  35. }
  36. }

UDTF示例

UDTF的注册方式与UDF基本相同,使用方式也与UDF相同。代码示例:

  1. package org.alidata.odps.udtf.examples;
  2. import com.aliyun.odps.udf.UDTF;
  3. import com.aliyun.odps.udf.UDTFCollector;
  4. import com.aliyun.odps.udf.annotation.Resolve;
  5. import com.aliyun.odps.udf.UDFException;
  6. // TODO define input and output types, e.g., "string,string->string,bigint".
  7. @Resolve({"string,bigint->string,bigint"})
  8. public class MyUDTF extends UDTF {
  9. @Override
  10. public void process(Object[] args) throws UDFException {
  11. String a = (String) args[0];
  12. Long b = (Long) args[1];
  13. for (String t: a.split("\s+")) {
  14. forward(t, b);
  15. }
  16. }
  17. }

最后更新:2016-11-23 17:16:04

  上一篇:go 运行SQL__快速开始_大数据计算服务-阿里云
  下一篇:go 编写MapReduce__快速开始_大数据计算服务-阿里云