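Storage API High-Performance Writes
Built on the Apache Arrow columnar format, the Storage API offers the highest-performance write path. It supports two modes, Batch and Streaming, and suits large-scale data imports and high-frequency real-time writes.
Note
The Go SDK does not currently support the Storage API.
Prerequisites
- The odps-sdk-storage-api dependency has been added.
- A MaxStorageClient instance has been initialized.
- The target table already exists.
- The Apache Arrow dependencies have been added to the project.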
<dependency>
  <groupId>com.aliyun.odps</groupId>
  <artifactId>odps-sdk-storage-api</artifactId>
  <version>0.52.0</version>
</dependency>
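Complete Examples
Batch Mode (Bulk Writes)
In Batch mode, data becomes visible only after an explicit commit() call. The mode supports transactional rollback and suits scenarios that require atomicity.
Java: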
import com.aliyun.odps.storage.api.MaxStorageClient;
import com.aliyun.odps.storage.api.TableIdentifier;
import com.aliyun.odps.storage.api.write.ArrowWriter;
import com.aliyun.odps.storage.api.write.TableArrowWriter;
import com.aliyun.odps.storage.api.write.TableWriteSession;
import java.nio.charset.StandardCharsets;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;

public class StorageApiBatchWriteExample {
    public static void main(String[] args) throws Exception {
        // 1. Create the MaxStorageClient
        //    (credentialsProvider must be initialized beforehand)
        MaxStorageClient client = MaxStorageClient.builder()
                .endpoint("https://service.cn-hangzhou.maxcompute.aliyun.com/api")
                .credentialsProvider(credentialsProvider)
                .build();
        // 2. Specify the target table
        TableIdentifier tableId = TableIdentifier.of("my_project", "my_table");
        // 3. Create a TableWriteSession (Batch mode is the default)
        try (TableWriteSession session = client.createTableWriteSessionBuilder(tableId)
                .build()) {
            // 4. Create an ArrowWriter
            try (ArrowWriter writer = session.createWriterBuilder("stream-1", 1).build()) {
                TableArrowWriter arrowWriter = (TableArrowWriter) writer;
                // 5. Create a VectorSchemaRoot and populate it
                try (VectorSchemaRoot root = arrowWriter.createVectorSchemaRoot()) {
                    root.allocateNew();
                    VarCharVector nameVector = (VarCharVector) root.getVector("name");
                    BigIntVector ageVector = (BigIntVector) root.getVector("age");
                    for (int i = 0; i < 10000; i++) {
                        // Encode explicitly as UTF-8 instead of the platform default charset
                        nameVector.setSafe(i, ("user_" + i).getBytes(StandardCharsets.UTF_8));
                        ageVector.setSafe(i, 20 + i % 50);
                    }
                    root.setRowCount(10000);
                    // 6. Write the batch
                    writer.writeBatch(root);
                }
                // 7. Flush the data to the server
                writer.flush();
            }
            // 8. Commit the transaction; the data becomes visible
            session.commit();
            System.out.println("Batch write committed successfully.");
        }
    }
}
Python:
import pyarrow as pa
from odps.apis.storage_api_v2 import StorageApiArrowClient

# o is an initialized ODPS entry object
table = o.get_table("my_table")
client = StorageApiArrowClient(o, table)
# Create a write session
resp = client.create_write_session()
session_id = resp.session_id
# Write data
batch = pa.record_batch(
    [pa.array([1, 2, 3]), pa.array(["a", "b", "c"])],
    schema=pa.schema([pa.field("id", pa.int64()), pa.field("name", pa.string())])
)
writer = client.write_rows_arrow(session_id, stream_id=0, record_count=3)
writer.write(batch)
commit_msg, success = writer.finish()
# Commit the session
client.commit_write_session(session_id)
Streaming Mode (Streaming Writes)
In Streaming mode, data becomes visible immediately after flush() and no commit() call is needed, which suits real-time write scenarios.
Java:
import com.aliyun.odps.storage.api.MaxStorageClient;
import com.aliyun.odps.storage.api.TableIdentifier;
import com.aliyun.odps.storage.api.write.ArrowWriter;
import com.aliyun.odps.storage.api.write.TableArrowWriter;
import com.aliyun.odps.storage.api.write.TableWriteSession;
import com.aliyun.odps.storage.api.write.WriteMode;
import java.nio.charset.StandardCharsets;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;

public class StorageApiStreamingWriteExample {
    public static void main(String[] args) throws Exception {
        // 1. Create the MaxStorageClient
        //    (credentialsProvider must be initialized beforehand)
        MaxStorageClient client = MaxStorageClient.builder()
                .endpoint("https://service.cn-hangzhou.maxcompute.aliyun.com/api")
                .credentialsProvider(credentialsProvider)
                .build();
        // 2. Specify the target table
        TableIdentifier tableId = TableIdentifier.of("my_project", "my_table");
        // 3. Create a TableWriteSession in Streaming mode
        try (TableWriteSession session = client.createTableWriteSessionBuilder(tableId)
                .withWriteMode(WriteMode.STREAMING)
                .build()) {
            // 4. Create an ArrowWriter
            try (ArrowWriter writer = session.createWriterBuilder("stream-1", 1).build()) {
                TableArrowWriter arrowWriter = (TableArrowWriter) writer;
                // 5. Write in a loop; each batch is visible as soon as it is flushed
                for (int batch = 0; batch < 10; batch++) {
                    try (VectorSchemaRoot root = arrowWriter.createVectorSchemaRoot()) {
                        root.allocateNew();
                        VarCharVector nameVector = (VarCharVector) root.getVector("name");
                        BigIntVector ageVector = (BigIntVector) root.getVector("age");
                        for (int i = 0; i < 1000; i++) {
                            nameVector.setSafe(i,
                                    ("batch_" + batch + "_user_" + i).getBytes(StandardCharsets.UTF_8));
                            ageVector.setSafe(i, 20 + i % 50);
                        }
                        root.setRowCount(1000);
                        writer.writeBatch(root);
                    }
                    // The data is visible immediately after flush
                    writer.flush();
                    System.out.println("Batch " + batch + " flushed and visible.");
                }
            }
            // Streaming mode does not require a commit
        }
    }
}
Python:
import pyarrow as pa
from odps.apis.storage_api_v2 import StorageApiArrowClient

# o is an initialized ODPS entry object
table = o.get_table("my_table")
client = StorageApiArrowClient(o, table)
# Create a write session
resp = client.create_write_session()
session_id = resp.session_id
# Write data
batch = pa.record_batch(
    [pa.array([1, 2, 3]), pa.array(["a", "b", "c"])],
    schema=pa.schema([pa.field("id", pa.int64()), pa.field("name", pa.string())])
)
writer = client.write_rows_arrow(session_id, stream_id=0, record_count=3)
writer.write(batch)
commit_msg, success = writer.finish()
# Commit the session
client.commit_write_session(session_id)
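Code Walkthrough
The Two Write Modes
| Feature | Batch mode | Streaming mode |
|---|---|---|
| When data becomes visible | After commit() | After flush() |
| Transaction support | Supported (abort rolls back) | Not supported (no rollback after flush) |
| Is commit required | Required | Not required (no-op) |
| Typical use cases | ETL, bulk imports | Real-time writes, stream processing |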
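The visibility rules in the table can be illustrated with a small in-memory model. This is only a conceptual sketch: ToyWriteSession and its methods are hypothetical names invented for this illustration and are not part of the SDK.

```python
class ToyWriteSession:
    """In-memory stand-in for a write session (hypothetical, not the real SDK)."""

    def __init__(self, streaming=False):
        self.streaming = streaming
        self._pending = []  # flushed rows that readers cannot see yet (Batch mode only)
        self.visible = []   # rows a reader would see

    def flush(self, rows):
        if self.streaming:
            # Streaming mode: flushed rows are visible right away
            self.visible.extend(rows)
        else:
            # Batch mode: flushed rows wait for commit()
            self._pending.extend(rows)

    def commit(self):
        # In Streaming mode nothing is pending, so this is a no-op
        self.visible.extend(self._pending)
        self._pending.clear()

    def abort(self):
        # Only rows that were never made visible can be discarded
        self._pending.clear()


batch_session = ToyWriteSession(streaming=False)
batch_session.flush([1, 2, 3])
visible_before_commit = list(batch_session.visible)
batch_session.commit()
visible_after_commit = list(batch_session.visible)

stream_session = ToyWriteSession(streaming=True)
stream_session.flush([1, 2, 3])
visible_after_flush = list(stream_session.visible)
```

In this model, calling abort() on the Streaming session after flush() changes nothing, which mirrors the "no rollback after flush" row of the table.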
Creating and Populating an Arrow VectorSchemaRoot
VectorSchemaRoot is the core data container in the Arrow format and represents one batch of columnar data:
Java:
TableArrowWriter arrowWriter = (TableArrowWriter) writer;
// Create a VectorSchemaRoot that matches the table schema
try (VectorSchemaRoot root = arrowWriter.createVectorSchemaRoot()) {
    // Allocate memory
    root.allocateNew();
    // Look up the column vectors by column name
    VarCharVector nameCol = (VarCharVector) root.getVector("name");
    BigIntVector ageCol = (BigIntVector) root.getVector("age");
    // Fill the data row by row
    int rowCount = 5000;
    for (int i = 0; i < rowCount; i++) {
        // Requires java.nio.charset.StandardCharsets
        nameCol.setSafe(i, ("value_" + i).getBytes(StandardCharsets.UTF_8));
        ageCol.setSafe(i, (long) i);
    }
    // Set the actual row count
    root.setRowCount(rowCount);
    // Write the batch
    writer.writeBatch(root);
}
Python:
import pyarrow as pa

# Build a RecordBatch with PyArrow
batch = pa.record_batch(
    [
        pa.array([f"value_{i}" for i in range(5000)]),
        pa.array(list(range(5000)), type=pa.int64()),
    ],
    schema=pa.schema([
        pa.field("name", pa.string()),
        pa.field("age", pa.int64()),
    ])
)
# Write the batch
writer = client.write_rows_arrow(session_id, stream_id=0, record_count=5000)
writer.write(batch)
commit_msg, success = writer.finish()
Common vector type mappings:
| MaxCompute type | Arrow vector type |
|---|---|
| STRING/VARCHAR | VarCharVector |
| BIGINT | BigIntVector |
| INT | IntVector |
| DOUBLE | Float8Vector |
| FLOAT | Float4Vector |
| BOOLEAN | BitVector |
| DATETIME | TimeStampMilliVector |
| DECIMAL | DecimalVector |
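When generating writer code for many tables, the mapping above can be kept as a lookup table. The sketch below only restates the rows of the table; the helper name arrow_vector_for and the handling of parameterized types such as DECIMAL(38,18) are assumptions made for this illustration.

```python
import re

# Restates the mapping table: MaxCompute type name -> Arrow Java vector class name
ARROW_VECTOR_FOR_TYPE = {
    "STRING": "VarCharVector",
    "VARCHAR": "VarCharVector",
    "BIGINT": "BigIntVector",
    "INT": "IntVector",
    "DOUBLE": "Float8Vector",
    "FLOAT": "Float4Vector",
    "BOOLEAN": "BitVector",
    "DATETIME": "TimeStampMilliVector",
    "DECIMAL": "DecimalVector",
}


def arrow_vector_for(column_type: str) -> str:
    """Return the vector class name listed for a MaxCompute column type."""
    # Drop type parameters, e.g. the "(38,18)" in DECIMAL(38,18)
    base = re.sub(r"\(.*\)$", "", column_type.strip()).upper()
    try:
        return ARROW_VECTOR_FOR_TYPE[base]
    except KeyError:
        raise ValueError(f"no mapping listed for type {column_type!r}") from None
```

Types that do not appear in the table raise ValueError rather than falling back to a guess.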
Parallel Writes with Multiple Writers
Multiple independent Writers can be created under the same Session, each identified by a different streamId:
Java:
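// Requires java.util.{ArrayList, List} and
// java.util.concurrent.{ExecutorService, Executors, Future}
try (TableWriteSession session = client.createTableWriteSessionBuilder(tableId).build()) {
    int numWriters = 4;
    ExecutorService executor = Executors.newFixedThreadPool(numWriters);
    List<Future<?>> futures = new ArrayList<>();
    for (int i = 0; i < numWriters; i++) {
        final int writerIndex = i;
        futures.add(executor.submit(() -> {
            try (ArrowWriter writer = session.createWriterBuilder(
                    "stream-" + writerIndex, 1).build()) {
                TableArrowWriter arrowWriter = (TableArrowWriter) writer;
                try (VectorSchemaRoot root = arrowWriter.createVectorSchemaRoot()) {
                    root.allocateNew();
                    // Fill the data shard assigned to this Writer...
                    // batchSize is the number of rows filled above (defined by the caller)
                    root.setRowCount(batchSize);
                    writer.writeBatch(root);
                }
                writer.flush();
            }
            // Returning a value makes the lambda a Callable, which may throw checked exceptions
            return null;
        }));
    }
    // Wait for every Writer; get() rethrows any failure from a worker
    for (Future<?> f : futures) {
        f.get();
    }
    executor.shutdown();
    // Commit once after all Writers have finished
    session.commit();
}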
Python:
import concurrent.futures
import pyarrow as pa
from odps.apis.storage_api_v2 import StorageApiArrowClient

# o is an initialized ODPS entry object
table = o.get_table("my_table")
client = StorageApiArrowClient(o, table)
resp = client.create_write_session()
session_id = resp.session_id

def write_stream(stream_id, data_chunk):
    batch = pa.record_batch(
        data_chunk,
        schema=pa.schema([pa.field("name", pa.string()), pa.field("age", pa.int64())])
    )
    writer = client.write_rows_arrow(session_id, stream_id=stream_id, record_count=len(data_chunk[0]))
    writer.write(batch)
    writer.finish()

# Write in parallel from multiple threads
num_writers = 4
with concurrent.futures.ThreadPoolExecutor(max_workers=num_writers) as pool:
    futures = []
    for i in range(num_writers):
        chunk = [pa.array([f"stream_{i}_row_{j}" for j in range(1000)]),
                 pa.array(list(range(1000)), type=pa.int64())]
        futures.append(pool.submit(write_stream, i, chunk))
    # Surface any exception raised in a worker before committing
    for f in futures:
        f.result()
# Commit once after all Writers have finished
client.commit_write_session(session_id)
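Before handing work to parallel Writers, the input usually has to be divided so that each stream ID receives its own shard. One possible approach, sketched here with a hypothetical helper split_for_writers, is to cut the rows into contiguous chunks whose sizes differ by at most one.

```python
def split_for_writers(rows, num_writers):
    """Map each stream id (0..num_writers-1) to a contiguous slice of rows."""
    if num_writers <= 0:
        raise ValueError("num_writers must be positive")
    base, extra = divmod(len(rows), num_writers)
    chunks = {}
    start = 0
    for stream_id in range(num_writers):
        # The first `extra` writers each take one additional row
        size = base + (1 if stream_id < extra else 0)
        chunks[stream_id] = rows[start:start + size]
        start += size
    return chunks
```

Each value in the returned dictionary can then be converted to Arrow arrays and passed to the writer that owns that stream ID.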
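Idempotent Retries
The combination of streamId and streamVersion uniquely identifies a write operation. If the same batch of data is rewritten with the same streamId and streamVersion, the server guarantees that only one copy of the result is kept:
Java: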
// First attempt
try (ArrowWriter writer = session.createWriterBuilder("stream-1", 1).build()) {
    writer.writeBatch(root);
    writer.flush();
}
// If the write fails, retry with the same streamId and streamVersion
try (ArrowWriter writer = session.createWriterBuilder("stream-1", 1).build()) {
    writer.writeBatch(root); // the server guarantees idempotency
    writer.flush();
}
Python:
# First attempt
writer = client.write_rows_arrow(session_id, stream_id=0, record_count=n)
writer.write(batch)
commit_msg, success = writer.finish()
# If the write fails, retry with the same stream_id
if not success:
    writer = client.write_rows_arrow(session_id, stream_id=0, record_count=n)
    writer.write(batch)  # the server guarantees idempotency
    writer.finish()
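The retry pattern above can be wrapped in a small helper that always reuses the same identifiers. This is a minimal sketch: write_with_retry and the write_once callback are hypothetical names, and treating ConnectionError and TimeoutError as the retryable failures is an assumption, since the exceptions the SDK actually raises are not listed here.

```python
import time


def write_with_retry(write_once, stream_id, stream_version,
                     max_attempts=3, base_delay=0.5):
    """Call write_once(stream_id, stream_version) until it succeeds.

    Every attempt reuses the same stream_id and stream_version, so a
    server that deduplicates on that pair keeps a single copy of the data.
    """
    last_error = None
    for attempt in range(1, max_attempts + 1):
        try:
            return write_once(stream_id, stream_version)
        except (ConnectionError, TimeoutError) as exc:  # assumed retryable errors
            last_error = exc
            if attempt < max_attempts:
                # Exponential backoff: base_delay, 2 * base_delay, 4 * base_delay, ...
                time.sleep(base_delay * 2 ** (attempt - 1))
    raise RuntimeError(f"write failed after {max_attempts} attempts") from last_error
```

The callback would contain the create-writer, write, and flush calls shown in the samples above; the helper never changes the identifiers between attempts.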
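Configuration Options
Session builder parameters
| Method | Description |
|---|---|
| withPartition(PartitionSpec) | Write to the specified partition |
| withOverwrite(boolean) | Whether to overwrite existing data (default: false) |
| withWriteMode(WriteMode) | Write mode: BATCH (default) or STREAMING |
| withSessionId(String) | Reuse an existing Session |
Writer builder parameters
| Method | Default | Description |
|---|---|---|
| withBufferSize(long) | 64 MB | Write buffer size, in bytes |
| withAutoFlushEnabled(boolean) | true | Whether to flush automatically when the buffer is full |
| withExecutorService(ExecutorService) | - | Thread pool for asynchronous flushes |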
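The interaction between the buffer size and automatic flushing can be pictured with a toy model. ToyBufferedWriter is a hypothetical class written for this illustration; it only counts bytes and does not reflect how the SDK manages its buffer internally.

```python
class ToyBufferedWriter:
    """Counts buffered bytes and flushes when the buffer is full (toy model)."""

    def __init__(self, buffer_size=64 * 1024 * 1024, auto_flush=True):
        self.buffer_size = buffer_size  # mirrors the 64 MB default in the table
        self.auto_flush = auto_flush
        self.buffered = 0               # bytes waiting to be sent
        self.flush_count = 0

    def write(self, num_bytes):
        self.buffered += num_bytes
        if self.auto_flush and self.buffered >= self.buffer_size:
            self.flush()

    def flush(self):
        # Sending an empty buffer is skipped
        if self.buffered:
            self.flush_count += 1
            self.buffered = 0


w = ToyBufferedWriter(buffer_size=100)
for _ in range(5):
    w.write(40)
```

After the loop, the third write has triggered one automatic flush and 80 bytes remain buffered, so an explicit flush() is still needed before the data counts as sent.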
Asynchronous Flush
Supplying a thread pool lets data writing and network transmission run in parallel, which improves throughput:
Java:
ExecutorService flushExecutor = Executors.newSingleThreadExecutor();
ArrowWriter writer = session.createWriterBuilder("stream-1", 1)
        .withExecutorService(flushExecutor)
        .build();
Python:
# The PyODPS Storage API handles asynchronous flushing internally
# and optimizes network transmission automatically for large writes
writer = client.write_rows_arrow(session_id, stream_id=0, record_count=n)
writer.write(batch)
writer.finish()
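The idea behind an asynchronous flush, preparing the next batch while the previous one is still being sent, can be sketched with the standard library alone. The function write_batches and the send callback are hypothetical, and repr(...).encode() merely stands in for real Arrow serialization.

```python
from concurrent.futures import ThreadPoolExecutor


def write_batches(batches, send, flush_executor):
    """Serialize batches on the calling thread and send them on flush_executor."""
    in_flight = None
    sent = 0
    for batch in batches:
        # Stand-in for serialization; runs while the previous send is in progress
        payload = repr(batch).encode()
        if in_flight is not None:
            # Keep at most one send in flight; result() re-raises send errors
            in_flight.result()
        in_flight = flush_executor.submit(send, payload)
        sent += 1
    if in_flight is not None:
        in_flight.result()
    return sent
```

A possible usage, collecting the payloads in a list instead of sending them over the network:

```python
received = []
with ThreadPoolExecutor(max_workers=1) as pool:
    write_batches([[1, 2], [3]], received.append, pool)
```

Limiting the pipeline to one in-flight send keeps the batches in order and bounds memory use, at the cost of some parallelism.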
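Notes
- VectorSchemaRoot lifecycle: the caller is responsible for closing the VectorSchemaRoot. Use try-with-resources or call close() manually to prevent memory leaks.
- Reusable after writeBatch: writeBatch() serializes the data immediately, so the VectorSchemaRoot can be safely modified or reused once the call returns.
- Batch mode requires commit: in Batch mode, if the Session is closed without commit() having been called, abort() runs automatically and all data is discarded.
- Streaming mode cannot be rolled back: in Streaming mode, data that has been flushed cannot be withdrawn.
- streamId uniqueness: different Writers under the same Session should use different streamId values.
- Partition columns are not written: when writing to a partitioned table, the partition columns are not part of the written data; specify the partition through withPartition().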
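For the last note, rows that still carry their partition values have to be separated into data columns and a partition specification before writing. The sketch below uses a hypothetical helper, split_partition_columns, and assumes the common MaxCompute partition spec text form col='value',col2='value2'; how that string is turned into a PartitionSpec object depends on the SDK in use.

```python
def split_partition_columns(row, partition_columns):
    """Return (data_columns, partition_spec_text) for a single row.

    row is a dict of column name -> value; partition_columns is an
    ordered list of the partition column names.
    """
    # Data columns: everything that is not a partition column
    data = {name: value for name, value in row.items()
            if name not in partition_columns}
    # Partition spec text in the assumed col='value' form, in partition order
    spec = ",".join(f"{col}='{row[col]}'" for col in partition_columns)
    return data, spec
```

Only the first element would be written through the Arrow writer; the second describes the partition that the session targets.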