阅读761 返回首页    go 阿里云


Java UDF__UDF_SQL_大数据计算服务-阿里云

实现UDF需要继承com.aliyun.odps.udf.UDF类,并实现evaluate方法。evaluate方法必须是非static的public方法。Evaluate方法的参数和返回值类型将作为SQL中UDF的函数签名。这意味着用户可以在UDF中实现多个evaluate方法,在调用UDF时,框架会依据UDF调用的参数类型匹配正确的evaluate方法。

下面是一个UDF的例子。

  1. package org.alidata.odps.udf.examples;
  2. import com.aliyun.odps.udf.UDF;
  3. public final class Lower extends UDF {
  4. public String evaluate(String s) {
  5. if (s == null) { return null; }
  6. return s.toLowerCase();
  7. }
  8. }

可以通过实现void setup(ExecutionContext ctx)void close()来分别实现UDF的初始化和结束代码。

UDF的使用方式于ODPS SQL中普通的内建函数相同,详情请参考内建函数

UDAF

实现Java UDAF类需要继承com.aliyun.odps.udf.Aggregator,并实现如下几个接口:

  1. public abstract class Aggregator implements ContextFunction {
  2. @Override
  3. public void setup(ExecutionContext ctx) throws UDFException {
  4. }
  5. @Override
  6. public void close() throws UDFException {
  7. }
  8. /**
  9. * 创建聚合Buffer
  10. * @return Writable 聚合buffer
  11. */
  12. abstract public Writable newBuffer();
  13. /**
  14. * @param buffer 聚合buffer
  15. * @param args SQL中调用UDAF时指定的参数
  16. * @throws UDFException
  17. */
  18. abstract public void iterate(Writable buffer, Writable[] args) throws UDFException;
  19. /**
  20. * 生成最终结果
  21. * @param buffer
  22. * @return Object UDAF的最终结果
  23. * @throws UDFException
  24. */
  25. abstract public Writable terminate(Writable buffer) throws UDFException;
  26. abstract public void merge(Writable buffer, Writable partial) throws UDFException;
  27. }

其中最重要的是iterate,merge和terminate三个接口,UDAF的主要逻辑依赖于这三个接口的实现。此外,还需要用户实现自定义的Writable buffer。以实现求平均值avg为例,下图简要说明了在ODPS UDAF中这一函数的实现逻辑及计算流程:

在上图中,输入数据被按照一定的大小进行分片(有关分片的描述可参考 MapReduce ),每片的大小适合一个worker在适当的时间内完成。这个分片大小的设置需要用户手动配置完成。UDAF的计算过程分为两阶段:

  • 在第一阶段,每个worker统计分片内数据的个数及汇总值,我们可以将每个分片内的数据个数及汇总值视为一个中间结果;
  • 在第二阶段,worker汇总上一个阶段中每个分片内的信息。在最终输出时,r.sum / r.count即是所有输入数据的平均值;

下面是一个计算平均值的UDAF的代码示例:

  1. import java.io.DataInput;
  2. import java.io.DataOutput;
  3. import java.io.IOException;
  4. import com.aliyun.odps.io.DoubleWritable;
  5. import com.aliyun.odps.io.Writable;
  6. import com.aliyun.odps.udf.Aggregator;
  7. import com.aliyun.odps.udf.UDFException;
  8. import com.aliyun.odps.udf.annotation.Resolve;
  9. @Resolve({"double->double"})
  10. public class AggrAvg extends Aggregator {
  11. private static class AvgBuffer implements Writable {
  12. private double sum = 0;
  13. private long count = 0;
  14. @Override
  15. public void write(DataOutput out) throws IOException {
  16. out.writeDouble(sum);
  17. out.writeLong(count);
  18. }
  19. @Override
  20. public void readFields(DataInput in) throws IOException {
  21. sum = in.readDouble();
  22. count = in.readLong();
  23. }
  24. }
  25. private DoubleWritable ret = new DoubleWritable();
  26. @Override
  27. public Writable newBuffer() {
  28. return new AvgBuffer();
  29. }
  30. @Override
  31. public void iterate(Writable buffer, Writable[] args) throws UDFException {
  32. DoubleWritable arg = (DoubleWritable) args[0];
  33. AvgBuffer buf = (AvgBuffer) buffer;
  34. if (arg != null) {
  35. buf.count += 1;
  36. buf.sum += arg.get();
  37. }
  38. }
  39. @Override
  40. public Writable terminate(Writable buffer) throws UDFException {
  41. AvgBuffer buf = (AvgBuffer) buffer;
  42. if (buf.count == 0) {
  43. ret.set(0);
  44. } else {
  45. ret.set(buf.sum / buf.count);
  46. }
  47. return ret;
  48. }
  49. @Override
  50. public void merge(Writable buffer, Writable partial) throws UDFException {
  51. AvgBuffer buf = (AvgBuffer) buffer;
  52. AvgBuffer p = (AvgBuffer) partial;
  53. buf.sum += p.sum;
  54. buf.count += p.count;
  55. }
  56. }

注意:

  • UDAF在SQL中的使用语法与普通的内建聚合函数相同,详情请参考 聚合函数
  • 关于如何运行UDTF,方法与UDF类似,具体步骤请参考运行UDF

UDTF

Java UDTF需要继承com.aliyun.odps.udf.UDTF类。这个类需要实现4个接口。

接口定义 描述
public void setup(ExecutionContext ctx) throws UDFException 初始化方法,在UDTF处理输入数据前,调用用户自定义的初始化行为。在每个Worker内setup会被先调用一次。
public void process(Object[] args) throws UDFException 这个方法由框架调用,SQL中每一条记录都会对应调用一次process,process的参数为SQL语句中指定的UDTF输入参数。输入参数以Object[]的形式传入,输出结果通过forward函数输出。用户需要在process函数内自行调用forward,以决定输出数据。
public void close() throws UDFException UDTF的结束方法,此方法由框架调用,并且只会被调用一次,即在处理完最后一条记录之后。
public void forward(Object …o) throws UDFException 用户调用forward方法输出数据,每次forward代表输出一条记录。对应SQL语句UDTF的as子句指定的列。

下面将给出一个UDTF程序示例:

  1. package org.alidata.odps.udtf.examples;
  2. import com.aliyun.odps.udf.UDTF;
  3. import com.aliyun.odps.udf.UDTFCollector;
  4. import com.aliyun.odps.udf.annotation.Resolve;
  5. import com.aliyun.odps.udf.UDFException;
  6. // TODO define input and output types, e.g., "string,string->string,bigint".
  7. @Resolve({"string,bigint->string,bigint"})
  8. public class MyUDTF extends UDTF {
  9. @Override
  10. public void process(Object[] args) throws UDFException {
  11. String a = (String) args[0];
  12. Long b = (Long) args[1];
  13. for (String t: a.split("\s+")) {
  14. forward(t, b);
  15. }
  16. }
  17. }

注:

  • 以上只是程序示例,关于如何在ODPS中运行UDTF,方法与UDF类似,具体实现步骤请参考运行UDF

在SQL中可以这样使用这个UDTF,假设在ODPS上创建UDTF时注册函数名为user_udtf:

  1. select user_udtf(col0, col1) as (c0, c1) from my_table;

假设my_table的col0, col1的值为:

  1. +------+------+
  2. | col0 | col1 |
  3. +------+------+
  4. | A B | 1 |
  5. | C D | 2 |
  6. +------+------+

则select出的结果为:

  1. +----+----+
  2. | c0 | c1 |
  3. +----+----+
  4. | A | 1 |
  5. | B | 1 |
  6. | C | 2 |
  7. | D | 2 |
  8. +----+----+

使用说明

UDTF在SQL中的常用方式如下:

  1. select user_udtf(col0, col1, col2) as (c0, c1) from my_table;
  2. select user_udtf(col0, col1, col2) as (c0, c1) from
  3. (select * from my_table distribute by key sort by key) t;
  4. select reduce_udtf(col0, col1, col2) as (c0, c1) from
  5. (select col0, col1, col2 from
  6. (select map_udtf(a0, a1, a2, a3) as (col0, col1, col2) from my_table) t1
  7. distribute by col0 sort by col0, col1) t2;

但使用UDTF有如下使用限制:

  • 同一个SELECT子句中不允许有其他表达式
  1. select value, user_udtf(key) as mycol ...
  • UDTF不能嵌套使用
  1. select user_udtf1(user_udtf2(key)) as mycol...
  • 不支持在同一个select子句中与 group by / distribute by / sort by 联用
  1. select user_udtf(key) as mycol ... group by mycol

其他UDTF示例

在UDTF中,用户可以读取ODPS的 资源 。下面将介绍利用udtf读取ODPS资源的示例。

  1. 编写udtf程序,编译成功后导出jar包(udtfexample1.jar)。

    1. package com.aliyun.odps.examples.udf;
    2. import java.io.BufferedReader;
    3. import java.io.IOException;
    4. import java.io.InputStream;
    5. import java.io.InputStreamReader;
    6. import java.util.Iterator;
    7. import com.aliyun.odps.udf.ExecutionContext;
    8. import com.aliyun.odps.udf.UDFException;
    9. import com.aliyun.odps.udf.UDTF;
    10. import com.aliyun.odps.udf.annotation.Resolve;
    11. /**
    12. * project: example_project
    13. * table: wc_in2
    14. * partitions: p2=1,p1=2
    15. * columns: colc,colb
    16. */
    17. @Resolve({ "string,string->string,bigint,string" })
    18. public class UDTFResource extends UDTF {
    19. ExecutionContext ctx;
    20. long fileResourceLineCount;
    21. long tableResource1RecordCount;
    22. long tableResource2RecordCount;
    23. @Override
    24. public void setup(ExecutionContext ctx) throws UDFException {
    25. this.ctx = ctx;
    26. try {
    27. InputStream in = ctx.readResourceFileAsStream("file_resource.txt");
    28. BufferedReader br = new BufferedReader(new InputStreamReader(in));
    29. String line;
    30. fileResourceLineCount = 0;
    31. while ((line = br.readLine()) != null) {
    32. fileResourceLineCount++;
    33. }
    34. br.close();
    35. Iterator<Object[]> iterator = ctx.readResourceTable("table_resource1").iterator();
    36. tableResource1RecordCount = 0;
    37. while (iterator.hasNext()) {
    38. tableResource1RecordCount++;
    39. iterator.next();
    40. }
    41. iterator = ctx.readResourceTable("table_resource2").iterator();
    42. tableResource2RecordCount = 0;
    43. while (iterator.hasNext()) {
    44. tableResource2RecordCount++;
    45. iterator.next();
    46. }
    47. } catch (IOException e) {
    48. throw new UDFException(e);
    49. }
    50. }
    51. @Override
    52. public void process(Object[] args) throws UDFException {
    53. String a = (String) args[0];
    54. long b = args[1] == null ? 0 : ((String) args[1]).length();
    55. forward(a, b, "fileResourceLineCount=" + fileResourceLineCount + "|tableResource1RecordCount="
    56. + tableResource1RecordCount + "|tableResource2RecordCount=" + tableResource2RecordCount);
    57. }
    58. }
  2. 添加资源到ODPS:

    1. Add file file_resource.txt;
    2. Add jar udtfexample1.jar;
    3. Add table table_resource1 as table_resource1;
    4. Add table table_resource2 as table_resource2;
  3. 在ODPS中创建UDTF函数(my_udtf):

    1. create function mp_udtf as com.aliyun.odps.examples.udf.UDTFResource using 'udtfexample1.jar, file_resource.txt, table_resource1, table_resource2';
  4. 在odps创建资源表table_resource1、table_resource2, 物理表tmp1。并插入相应的数据。

  5. 运行该udtf:

  1. select mp_udtf("10","20") as (a, b, fileResourceLineCount) from table_resource1;
  2. 返回:
  3. +-------+------------+-------+
  4. | a | b | fileResourceLineCount |
  5. +-------+------------+-------+
  6. | 10 | 2 | fileResourceLineCount=3|tableResource1RecordCount=0|tableResource2RecordCount=0 |
  7. | 10 | 2 | fileResourceLineCount=3|tableResource1RecordCount=0|tableResource2RecordCount=0 |
  8. +-------+------------+-------+

最后更新:2016-11-23 17:16:04

  上一篇:go 内建函数-下__SQL_大数据计算服务-阿里云
  下一篇:go 转义字符__附录_SQL_大数据计算服务-阿里云