Reading Data with the Storage API
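The Storage API is a high-performance data read interface provided by MaxCompute. It is based on the Apache Arrow columnar in-memory format and supports column pruning, predicate pushdown, partition filtering, and parallel reads by split, which makes it suitable for large-scale data processing.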
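To make partition filtering, predicate pushdown, and column pruning concrete, here is a toy in-memory model in plain Python. It assumes a table is a dict that maps partition specs to row dicts; the `scan` function and the sample data are hypothetical and are not part of any MaxCompute SDK. The intent of pushing these steps into the Storage API is that the work happens before data is sent to the client, whereas this sketch only shows the order of the steps.

```python
from typing import Callable, Dict, Iterable, List

# Hypothetical row representation: column name -> value.
Row = Dict[str, object]


def scan(
    partitions: Dict[str, List[Row]],
    required_partitions: Iterable[str],
    required_columns: List[str],
    predicate: Callable[[Row], bool],
) -> List[Row]:
    """Toy model of a filtered, pruned scan (illustration only)."""
    result: List[Row] = []
    # Partition filtering: visit only the partitions that were requested.
    for spec in required_partitions:
        for row in partitions.get(spec, []):
            # Predicate pushdown: drop non-matching rows as early as possible.
            if not predicate(row):
                continue
            # Column pruning: keep only the requested columns.
            result.append({col: row[col] for col in required_columns})
    return result


table = {
    "dt=20231001": [
        {"id": 1, "name": "a", "action": "purchase", "extra": "x"},
        {"id": 2, "name": "b", "action": "view", "extra": "y"},
    ],
    "dt=20231002": [
        {"id": 3, "name": "c", "action": "purchase", "extra": "z"},
    ],
}

rows = scan(
    table,
    required_partitions=["dt=20231001"],
    required_columns=["id", "name"],
    predicate=lambda r: r["action"] == "purchase",
)
print(rows)  # [{'id': 1, 'name': 'a'}]
```

Note that the predicate may reference a column (`action`) that is pruned from the output, because filtering happens before pruning.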
Note
The Go SDK does not support the Storage API yet.
Prerequisites
- The odps-sdk-storage-api module dependency has been added.
- You have read permission on the target table.
- The SDK version is 0.52.0 or later.
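The version requirement compares numeric components, so a plain string comparison is misleading: `"0.9.0" >= "0.52.0"` is true as strings. A minimal helper, assuming version strings of the form `MAJOR.MINOR.PATCH`, optionally followed by a suffix such as `-public`; how you obtain the installed version string depends on your build tool and is not shown here.

```python
def parse_version(version: str, width: int = 3) -> tuple:
    """Convert '0.52.0' or '0.51.2-public' into a tuple of ints such as (0, 52, 0)."""
    core = version.split("-", 1)[0]          # drop a suffix such as "-public"
    parts = [int(p) for p in core.split(".")]
    parts += [0] * (width - len(parts))      # pad "0.52" to (0, 52, 0)
    return tuple(parts)


def meets_minimum(installed: str, minimum: str = "0.52.0") -> bool:
    """True if the installed SDK version is at least `minimum`."""
    return parse_version(installed) >= parse_version(minimum)


print(meets_minimum("0.9.0"))  # False
print("0.9.0" >= "0.52.0")     # True: plain string comparison gets this wrong
```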
Complete examples
Java example
import com.aliyun.odps.storage.api.MaxStorageClient;
import com.aliyun.odps.storage.api.TableIdentifier;
import com.aliyun.odps.storage.api.read.InputSplit;
import com.aliyun.odps.storage.api.read.SplitOptions;
import com.aliyun.odps.storage.api.read.TableReadSession;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowReader;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

public class StorageApiReadExample {
    public static void main(String[] args) throws Exception {
        // 1. Create the MaxStorageClient.
        //    credentialsProvider is not defined in this snippet: create it beforehand
        //    from your AccessKey or another credential source.
        MaxStorageClient client = MaxStorageClient.builder()
                .endpoint("https://service.cn-hangzhou.maxcompute.aliyun.com/api")
                .credentialsProvider(credentialsProvider)
                .build();

        // 2. Identify the target table (project, table).
        TableIdentifier tableId = TableIdentifier.of("my_project", "user_behavior");

        // 3. Create a TableReadSession with column pruning and a filter condition.
        TableReadSession session = client.createTableReadSessionBuilder(tableId)
                .withColumns(Arrays.asList("user_id", "action", "timestamp"))
                .withFilter("action = 'purchase' AND timestamp > '2025-01-01'")
                .withSplitOptions(SplitOptions.newBuilder()
                        .withSplitSize(256 * 1024 * 1024L) // about 256 MB per split
                        .build())
                .build();

        // 4. Get the list of splits.
        List<InputSplit> splits = session.getSplits();
        System.out.println("Number of splits: " + splits.size());

        // 5. Read the data split by split.
        long totalRows = 0;
        for (InputSplit split : splits) {
            try (ArrowReader reader = session.createReaderBuilder(split).build()) {
                while (reader.loadNextBatch()) {
                    VectorSchemaRoot root = reader.getVectorSchemaRoot();
                    int rowCount = root.getRowCount();
                    totalRows += rowCount;
                    // Fetch the column vectors by name for processing.
                    BigIntVector userIdVector = (BigIntVector) root.getVector("user_id");
                    VarCharVector actionVector = (VarCharVector) root.getVector("action");
                    for (int i = 0; i < rowCount; i++) {
                        // get(i) fails on null slots, so check isNull(i) first.
                        Long userId = userIdVector.isNull(i) ? null : userIdVector.get(i);
                        // VarCharVector holds raw bytes; decode them explicitly as UTF-8.
                        String action = actionVector.isNull(i)
                                ? null
                                : new String(actionVector.get(i), StandardCharsets.UTF_8);
                        // Business logic goes here...
                    }
                }
            }
        }
        System.out.println("Total rows read: " + totalRows);
    }
}
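The Java example asks for splits of roughly 256 MB through `withSplitSize(...)`. How the service actually divides a table into splits is not described here; the sketch below is a hypothetical greedy planner whose only purpose is to show the trade-off. A smaller split size yields more, smaller splits (more parallelism, more per-split overhead), and a single block larger than the target still becomes one split on its own. `plan_splits` and the block sizes are illustrative assumptions.

```python
from typing import List

MB = 1024 * 1024


def plan_splits(block_sizes: List[int], split_size: int = 256 * MB) -> List[List[int]]:
    """Greedily group consecutive blocks into splits of at most ~split_size bytes.

    Hypothetical: illustrates the idea behind a split-size option,
    not the server's actual splitting algorithm.
    """
    if split_size <= 0:
        raise ValueError("split_size must be positive")
    splits: List[List[int]] = []
    current: List[int] = []
    current_bytes = 0
    for size in block_sizes:
        # Close the current split when adding this block would exceed the target.
        if current and current_bytes + size > split_size:
            splits.append(current)
            current, current_bytes = [], 0
        current.append(size)
        current_bytes += size
    if current:
        splits.append(current)
    return splits


blocks = [100 * MB, 100 * MB, 100 * MB, 300 * MB, 50 * MB]
print(len(plan_splits(blocks)))  # 4
```

Keeping the blocks in order, the planner returns 4 splits for these 650 MB: `[100, 100]`, `[100]`, `[300]`, and `[50]` (in MB).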
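Python example
import os

from odps import ODPS
from odps.apis.storage_api_v2 import StorageApiArrowClient

# Read credentials from environment variables instead of hard-coding them.
# The variable names are an example; use the ones your environment defines.
o = ODPS(
    os.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
    os.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"),
    project="my_project",
    endpoint="https://service.cn-hangzhou.maxcompute.aliyun.com/api",
)
table = o.get_table("my_table")
client = StorageApiArrowClient(o, table)

# Create a read session with column pruning and partition filtering.
resp = client.create_read_session(
    required_data_columns=["id", "name"],
    required_partitions=["dt=20231001"],
)

# Read the splits one by one (each split is independent, so they can also be read in parallel).
for split_index in range(resp.splits_count):
    reader = client.read_rows_arrow(resp.session_id, split_index=split_index)
    while True:
        batch = reader.read()  # returns None once the split is exhausted
        if batch is None:
            break
        df = batch.to_pandas()
        print(df)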
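Both examples read the splits one after another. Because each split is addressed independently by its index, the splits can be distributed across workers. The sketch below uses `concurrent.futures`; `read_split` is a hypothetical callable standing in for the per-split loop of the Python example (calling `client.read_rows_arrow(...)` and draining `reader.read()`). It assumes the client can be shared across threads; if your SDK version does not guarantee that, create one client per worker.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Sequence


def read_all_splits(
    split_count: int,
    read_split: Callable[[int], Sequence],
    max_workers: int = 4,
) -> List[Sequence]:
    """Read every split concurrently and return the results in split order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # map() preserves input order, so results[i] belongs to split i.
        return list(pool.map(read_split, range(split_count)))


def fake_read_split(split_index: int) -> List[int]:
    # Hypothetical stand-in for a real split reader: each split "holds" three rows.
    return [split_index * 10 + i for i in range(3)]


results = read_all_splits(4, fake_read_split)
total_rows = sum(len(rows) for rows in results)
print(total_rows)  # 12
```

Threads fit this workload because reading a split is dominated by network I/O; for CPU-heavy processing of each batch, a process pool or separate machines (one per group of splits) is the more usual choice.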
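Code walkthrough (Java example)
- Create the client: configure the endpoint and credentials with MaxStorageClient.builder(), then build the client instance.
- Identify the table: use TableIdentifier.of(project, table) to identify the target table.
- Build the ReadSession: set the read parameters (column pruning, filter condition, split strategy, and so on) on the builder, then call build() to create the session.
- Get the split list: session.getSplits() returns all data splits, which can be read in parallel.
- Read the Arrow data: for each split, create an ArrowReader, call loadNextBatch() in a loop to load record batches, and access the columnar data through VectorSchemaRoot.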
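Columnar access through VectorSchemaRoot explains why the Java example decodes `VarCharVector.get(i)` from bytes and checks for nulls first. The sketch below is a simplified pure-Python model of the variable-size binary layout from the Apache Arrow columnar format specification (a validity bitmap, an offsets buffer with n + 1 entries, and a contiguous data buffer). It is not the Arrow Java implementation; `encode_varchar` and `varchar_value` are hypothetical helpers.

```python
from typing import List, Optional, Tuple


def encode_varchar(values: List[Optional[str]]) -> Tuple[bytes, List[int], bytes]:
    """Build (validity bitmap, offsets, data) for a list of optional strings."""
    validity = bytearray((len(values) + 7) // 8)
    offsets = [0]
    data = bytearray()
    for i, value in enumerate(values):
        if value is not None:
            # Set bit i (least-significant-bit order) to mark slot i as non-null.
            validity[i // 8] |= 1 << (i % 8)
            data += value.encode("utf-8")
        # Null slots still get an offset entry; they simply span zero bytes.
        offsets.append(len(data))
    return bytes(validity), offsets, bytes(data)


def varchar_value(validity: bytes, offsets: List[int], data: bytes, i: int) -> Optional[str]:
    """Read slot i: check the validity bit first, then slice and decode the bytes."""
    if not (validity[i // 8] >> (i % 8)) & 1:
        return None
    return data[offsets[i]:offsets[i + 1]].decode("utf-8")


values = ["purchase", None, "view", "购买"]
validity, offsets, data = encode_varchar(values)
decoded = [varchar_value(validity, offsets, data, i) for i in range(len(values))]
print(offsets)  # [0, 8, 8, 12, 18]
print(decoded)  # ['purchase', None, 'view', '购买']
```

The non-ASCII value takes 6 bytes in UTF-8, which is why decoding with an explicit charset matters when turning the raw bytes back into strings.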