跳到主要内容

UDF 开发模块

odps-sdk-udf 是 MaxCompute 用户自定义函数的开发支持模块,提供 UDF(标量函数)、UDTF(表值函数)、UDAF(聚合函数)的基类和运行时接口。该模块还支持向量化(Vectorized)批处理模式,通过基于 Apache Arrow 的批量数据处理显著提升执行性能。

Maven 依赖

<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-sdk-udf</artifactId>
<version>${odps.sdk.version}</version>
<scope>provided</scope>
</dependency>

注意:UDF 开发时使用 provided scope,因为运行时环境由 MaxCompute 计算集群提供。

核心功能

功能说明
标量函数 (UDF)一对一映射,输入一行输出一个值
表值函数 (UDTF)一对多映射,输入一行可输出多行
聚合函数 (UDAF)多对一映射,对多行数据进行聚合计算
向量化处理基于 Arrow 的批处理模式,减少函数调用开销
自定义 Extractor自定义外部数据源读取逻辑
自定义 Outputer自定义外部数据源写入逻辑
数据分片通过 InputSplitter 自定义数据分片策略

核心类

说明
UDF标量函数基类,实现 evaluate 方法定义计算逻辑
UDTF表值函数基类,通过 process 方法处理输入并通过 forward 输出多行
Aggregator聚合函数基类,实现 iterate/merge/terminate 定义聚合逻辑
VectorizedExtractor向量化数据读取器,批量处理 Arrow 格式数据
VectorizedOutputer向量化数据写入器,批量输出 Arrow 格式数据
VectorizedStorageHandler向量化存储处理器,组合 Extractor 和 Outputer
InputSplitter数据分片接口,自定义外部数据的分片策略
ExecutionContext运行时上下文,提供任务参数和资源访问能力

配置

与 Core 模块的关系

UDF 模块专注于函数开发,不依赖 odps-sdk-core。函数的注册和管理(如 CREATE FUNCTION)通过 Core 模块的 Function 类完成:

// 使用 Core 模块注册 UDF(非 UDF 开发代码)
odps.functions().create(functionInfo);

开发环境配置

UDF 开发项目的典型依赖配置:

<dependencies>
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-sdk-udf</artifactId>
<version>${odps.sdk.version}</version>
<scope>provided</scope>
</dependency>
<!-- 向量化 UDF 需要 Arrow 依赖 -->
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-vector</artifactId>
<version>1.0.0</version>
<scope>provided</scope>
</dependency>
</dependencies>

使用示例

标量函数 (UDF)

import com.aliyun.odps.udf.UDF;

public class MyPlus extends UDF {
public Long evaluate(Long a, Long b) {
if (a == null || b == null) {
return null;
}
return a + b;
}
}

注册并使用:

CREATE FUNCTION my_plus AS 'com.example.MyPlus' USING 'my_udf.jar';
SELECT my_plus(col1, col2) FROM my_table;

表值函数 (UDTF)

import com.aliyun.odps.udf.UDTF;
import com.aliyun.odps.udf.UDTFCollector;
import com.aliyun.odps.udf.annotation.Resolve;

@Resolve({"string->string,string"})
public class SplitKV extends UDTF {
@Override
public void process(Object[] args) throws UDFException {
String input = (String) args[0];
if (input == null) return;
String[] parts = input.split("=", 2);
if (parts.length == 2) {
forward(parts[0], parts[1]);
}
}
}

聚合函数 (UDAF)

import com.aliyun.odps.udf.Aggregator;
import com.aliyun.odps.udf.annotation.Resolve;
import com.aliyun.odps.io.LongWritable;
import com.aliyun.odps.io.Writable;

@Resolve({"bigint->bigint"})
public class MySum extends Aggregator {
@Override
public Writable newBuffer() {
return new LongWritable(0L);
}

@Override
public void iterate(Writable buffer, Writable[] args) {
LongWritable sum = (LongWritable) buffer;
LongWritable value = (LongWritable) args[0];
if (value != null) {
sum.set(sum.get() + value.get());
}
}

@Override
public void merge(Writable buffer, Writable partial) {
LongWritable sum = (LongWritable) buffer;
LongWritable partialSum = (LongWritable) partial;
sum.set(sum.get() + partialSum.get());
}

@Override
public Writable terminate(Writable buffer) {
return buffer;
}
}

向量化处理

向量化模式通过批量处理 Arrow 格式数据来提升性能,适用于大数据量场景:

import com.aliyun.odps.udf.VectorizedExtractor;
import org.apache.arrow.vector.VectorSchemaRoot;

public class MyVectorizedExtractor extends VectorizedExtractor {
@Override
public VectorSchemaRoot extract() throws IOException {
// 批量读取并返回 Arrow 格式数据
// 返回 null 表示数据读取完毕
return root;
}
}

相关文档