函数管理
MaxCompute 支持用户自定义函数(UDF),包括 UDF(标量函数)、UDTF(表值函数)和 UDAF(聚合函数)。通过 Java SDK 的 Functions 接口,可以对函数进行创建、查询、更新和删除等操作。
前置条件
完整示例
以下示例演示如何从已上传的 JAR 资源创建一个 UDF 函数,并对其进行查询和删除操作:
- Java
- Python
- Go
import com.aliyun.odps.Function;
import com.aliyun.odps.Functions;
import com.aliyun.odps.Odps;
import com.aliyun.odps.OdpsException;
import com.aliyun.odps.account.AliyunAccount;
import java.util.Arrays;
import java.util.Iterator;
public class FunctionManagementExample {
public static void main(String[] args) throws OdpsException {
// 初始化 Odps 客户端
Odps odps = new Odps(new AliyunAccount("accessId", "accessKey"));
odps.setDefaultProject("my_project");
odps.setEndpoint("http://service.odps.aliyun.com/api");
Functions functions = odps.functions();
// 1. 创建函数
Function function = new Function();
function.setName("my_upper_udf");
function.setClassPath("com.example.udf.UpperCase");
function.setResources(Arrays.asList("my-udf-lib.jar"));
functions.create(function);
System.out.println("函数创建成功: my_upper_udf");
// 2. 检查函数是否存在
boolean exists = functions.exists("my_upper_udf");
System.out.println("函数是否存在: " + exists);
// 3. 获取函数详情
Function loaded = functions.get("my_upper_udf");
loaded.reload();
System.out.println("函数类路径: " + loaded.getClassPath());
System.out.println("关联资源: " + loaded.getResourceNames());
// 4. 遍历所有函数
System.out.println("项目中的所有函数:");
for (Function f : functions.iterable()) {
System.out.println(" - " + f.getName());
}
// 5. 更新函数(例如更换 JAR 资源版本)
Function updated = new Function();
updated.setName("my_upper_udf");
updated.setClassPath("com.example.udf.UpperCase");
updated.setResources(Arrays.asList("my-udf-lib-v2.jar"));
functions.update(updated);
System.out.println("函数更新成功");
// 6. 删除函数
functions.delete("my_upper_udf");
System.out.println("函数删除成功");
}
}
from odps import ODPS
# 初始化 ODPS 客户端
odps = ODPS('access_id', 'access_key', 'my_project', 'http://service.odps.aliyun.com/api')
# 1. 创建函数
odps.create_function(
'my_upper_udf',
class_type='com.example.udf.UpperCase',
resources=['my-udf-lib.jar']
)
print('函数创建成功: my_upper_udf')
# 2. 检查函数是否存在
exists = odps.exist_function('my_upper_udf')
print('函数是否存在:', exists)
# 3. 获取函数详情
func = odps.get_function('my_upper_udf')
func.reload()
print('函数类路径:', func.class_type)
print('关联资源:', [r.name for r in func.resources])
# 4. 遍历所有函数
print('项目中的所有函数:')
for f in odps.list_functions():
print(f' - {f.name}')
# 5. 更新函数(例如更换 JAR 资源版本)
func = odps.get_function('my_upper_udf')
func.resources = ['my-udf-lib-v2.jar']
func.update()
print('函数更新成功')
# 6. 删除函数
odps.delete_function('my_upper_udf')
print('函数删除成功')
package main
import (
"fmt"
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
)
func main() {
// 初始化 ODPS 客户端
acc := account.NewAliyunAccount("accessId", "accessKey")
odpsIns := odps.NewOdps(acc, "http://service.odps.aliyun.com/api")
odpsIns.SetDefaultProjectName("my_project")
functions := odps.NewFunctions(odpsIns)
// 1. 创建函数
fb := odps.NewFunctionBuilder()
function := fb.Name("my_upper_udf").
ClassPath("com.example.udf.UpperCase").
Resources([]string{"my-udf-lib.jar"}).
Build()
err := functions.Create("", "", function)
if err != nil {
panic(err)
}
fmt.Println("函数创建成功: my_upper_udf")
// 2. 检查函数是否存在
f, _ := functions.Get("my_upper_udf")
exists, err := f.Exist()
if err != nil {
panic(err)
}
fmt.Println("函数是否存在:", exists)
// 3. 获取函数详情
f, _ = functions.Get("my_upper_udf")
err = f.Load()
if err != nil {
panic(err)
}
fmt.Println("函数类路径:", f.ClassPath())
fmt.Println("关联资源:", f.Resources())
// 4. 更新函数(例如更换 JAR 资源版本)
f.SetClassPath("com.example.udf.UpperCase")
f.SetResources([]string{"my-udf-lib-v2.jar"})
err = functions.Update("", "", *f)
if err != nil {
panic(err)
}
fmt.Println("函数更新成功")
// 5. 删除函数
err = functions.Delete("my_upper_udf")
if err != nil {
panic(err)
}
fmt.Println("函数删除成功")
}
信息
Go SDK 暂不支持遍历函数列表,如需遍历请使用 Java 或 Python SDK。
代码说明
创建函数
创建函数时需要指定三个关键属性:
name:函数名称,在项目中唯一classPath:UDF 实现类的全限定类名resources:函数依赖的资源列表(通常是 JAR 包资源名)
- Java
- Python
- Go
Function function = new Function();
function.setName("my_upper_udf");
function.setClassPath("com.example.udf.UpperCase");
function.setResources(Arrays.asList("my-udf-lib.jar"));
functions.create(function);
odps.create_function(
'my_upper_udf',
class_type='com.example.udf.UpperCase',
resources=['my-udf-lib.jar']
)
fb := odps.NewFunctionBuilder()
function := fb.Name("my_upper_udf").
ClassPath("com.example.udf.UpperCase").
Resources([]string{"my-udf-lib.jar"}).
Build()
functions.Create("", "", function)
支持在指定项目或 schema 中创建:
- Java
- Python
- Go
// 在指定项目中创建
functions.create("target_project", function);
// 在指定项目和 schema 中创建
functions.create("target_project", "my_schema", function);
# 在指定项目中创建
odps.create_function('my_upper_udf', class_type='com.example.udf.UpperCase',
resources=['my-udf-lib.jar'], project='target_project')
# 在指定项目和 schema 中创建
odps.create_function('my_upper_udf', class_type='com.example.udf.UpperCase',
resources=['my-udf-lib.jar'], project='target_project', schema='my_schema')
// 在指定项目中创建
functions.Create("target_project", "", function)
// 在指定项目和 schema 中创建
functions.Create("target_project", "my_schema", function)
获取函数
- Java
- Python
- Go
// 从默认项目获取
Function f = functions.get("function_name");
// 从指定项目获取
Function f = functions.get("project_name", "function_name");
// 从指定项目和 schema 获取
Function f = functions.get("project_name", "schema_name", "function_name");
# 从默认项目获取
f = odps.get_function('function_name')
# 从指定项目获取
f = odps.get_function('function_name', project='project_name')
# 从指定项目和 schema 获取
f = odps.get_function('function_name', project='project_name', schema='schema_name')
// 从默认项目获取
f, err := functions.Get("function_name")
if err != nil {
panic(err)
}
// 加载函数详情
err = f.Load()
信息
Go SDK 的 Functions.Get 仅支持获取当前项目中的函数。如需操作其他项目,请创建对应项目的 Functions 实例。
获取到的 Function 对象是延迟加载的,访问详细属性前需要调用 reload()。
遍历函数
- Java
- Python
- Go
// 迭代器方式
Iterator<Function> it = functions.iterator();
while (it.hasNext()) {
Function f = it.next();
System.out.println(f.getName());
}
// for-each 方式
for (Function f : functions.iterable()) {
System.out.println(f.getName());
}
# 遍历所有函数
for f in odps.list_functions():
print(f.name)
# 按前缀过滤
for f in odps.list_functions(prefix='my_'):
print(f.name)
信息
Go SDK 暂不支持遍历函数列表,如需遍历请使用 Java 或 Python SDK。
删除函数
- Java
- Python
- Go
// 删除默认项目中的函数
functions.delete("function_name");
// 删除指定项目中的函数
functions.delete("project_name", "function_name");
// 删除指定项目和 schema 中的函数
functions.delete("project_name", "schema_name", "function_name");
# 删除默认项目中的函数
odps.delete_function('function_name')
# 删除指定项目中的函数
odps.delete_function('function_name', project='project_name')
# 删除指定项目和 schema 中的函数
odps.delete_function('function_name', project='project_name', schema='schema_name')
// 删除默认项目中的函数
err := functions.Delete("function_name")
信息
Go SDK 的 Functions.Delete 仅支持删除当前项目中的函数。如需操作其他项目,请创建 对应项目的 Functions 实例。
函数类型
MaxCompute 支持三种自定义函数类型:
| 类型 | 基类 | 说明 | 输入/输出 |
|---|---|---|---|
| UDF | com.aliyun.odps.udf.UDF | 标量函数 | 一行输入,一个值输出 |
| UDTF | com.aliyun.odps.udf.UDTF | 表值函数 | 一行输入,多行输出 |
| UDAF | com.aliyun.odps.udf.Aggregator | 聚合函数 | 多行输入,一个值输出 |
三种类型的函数在注册时使用相同的 API,区别仅在于 classPath 指向的实现类不同。
配置选项
Function 属性
| 属性 | 类型 | 必填 | 说明 |
|---|---|---|---|
| name | String | Y | 函数名称 |
| classPath | String | Y | 实现类的全限定名 |
| resources | List<String> | Y | 依赖的资源名称列表 |
| comment | String | N | 函数描述信息 |