跳到主要内容

TableWriteSession

TableWriteSession 是 Storage API 的写入会话,支持以 Arrow 格式向 MaxCompute 表写入数据。提供批量(Batch)和流式(Streaming)两种写入模式。

获取实例

通过 MaxStorageClient.createTableWriteSessionBuilder() 创建 Builder 并构建:

TableIdentifier tableId = TableIdentifier.of("my_project", "my_table");
TableWriteSession session = client.createTableWriteSessionBuilder(tableId)
.withWriteMode(WriteMode.BATCH)
.build();

WriteMode 枚举

枚举值说明
WriteMode.BATCH批量模式(默认),commit() 后数据可见,支持事务回滚
WriteMode.STREAMING流式模式,flush() 后数据立即可见,不支持回滚

TableWriteSessionBuilder

通过 client.createTableWriteSessionBuilder(tableId) 获取。

withWriteMode

设置写入模式。

public TableWriteSessionBuilder withWriteMode(WriteMode writeMode)

参数

参数类型说明
writeModeWriteModeBATCH(默认)或 STREAMING

withPartition

设置写入目标分区。分区表必须指定。

public TableWriteSessionBuilder withPartition(PartitionSpec partitionSpec)

参数

参数类型说明
partitionSpecPartitionSpec分区规格,如 new PartitionSpec("dt='20250101'")

withOverwrite

是否覆盖目标表或分区中的现有数据。

public TableWriteSessionBuilder withOverwrite(boolean overwrite)

参数

参数类型说明
overwritebooleantrue 覆盖写入;false(默认)追加写入

withSessionId

复用已有 Session。

public TableWriteSessionBuilder withSessionId(String sessionId)

build

public TableWriteSession build()

返回值TableWriteSession 实例


TableWriteSession 方法

createWriterBuilder

创建写入 Builder。

public TableWriterBuilder createWriterBuilder(String streamId, long streamVersion)

参数

参数类型说明
streamIdStringStream 唯一标识,同一 Session 下不同 Writer 使用不同 ID
streamVersionlongStream 版本号,必须 >= 1,用于幂等重试

返回值TableWriterBuilder 实例

streamId + streamVersion 组合用于幂等重试:相同组合重新写入时服务端只保留一份结果。


commit

提交写入事务,使所有已写入数据对外可见。

public void commit()
  • Batch 模式:必须调用才能使数据可见
  • Streaming 模式:空操作,数据已在 flush() 时可见

abort

中止写入事务,丢弃所有已写入数据。

public void abort()
  • Batch 模式:放弃所有未提交数据
  • Streaming 模式:无效操作

close

关闭 Session。若未显式 commit(),自动执行 abort()

public void close()

getId

public String getId()

返回值:Session 唯一标识符


getWriteMode

public WriteMode getWriteMode()

返回值:当前 Session 的写入模式


TableWriterBuilder

通过 session.createWriterBuilder(streamId, version) 获取。

方法说明
withBufferSize(long)缓冲区大小(字节),默认 64MB
withAutoFlushEnabled(boolean)是否启用自动 Flush,默认 true
withExecutorService(ExecutorService)异步 Flush 线程池
withBatchBlobUploadEnabled(boolean)启用批量 Blob 上传
build()构建 ArrowWriter(实际为 TableArrowWriter

TableArrowWriter

createVectorSchemaRoot

创建与表 Schema 匹配的空 VectorSchemaRoot

public VectorSchemaRoot createVectorSchemaRoot()

writeBatch

写入一个 Arrow 数据批次到缓冲区。调用返回后 root 可安全复用。

public void writeBatch(VectorSchemaRoot root)

flush

将缓冲区数据发送到服务端。

public void flush()

bytesWritten

返回已写入的总字节数。

public long bytesWritten()

使用示例

TableIdentifier tableId = TableIdentifier.of("my_project", "my_table");

try (TableWriteSession session = client.createTableWriteSessionBuilder(tableId)
.withPartition(new PartitionSpec("dt='20250101'"))
.build()) {

try (ArrowWriter writer = session.createWriterBuilder("stream-1", 1).build()) {
TableArrowWriter arrowWriter = (TableArrowWriter) writer;
try (VectorSchemaRoot root = arrowWriter.createVectorSchemaRoot()) {
root.allocateNew();
VarCharVector nameVec = (VarCharVector) root.getVector("name");
for (int i = 0; i < 1000; i++) {
nameVec.setSafe(i, ("user_" + i).getBytes());
}
root.setRowCount(1000);
writer.writeBatch(root);
}
writer.flush();
}
session.commit();
}