写入数据
概述
Storage API 支持两种写入模式:
- Batch(批量)模式(默认):数据写入后需要显式调用
commit()才对外可见,支持事务回滚,适合需要原子性保证的场景 - Streaming(流式)模式:数据调用
flush()后立即可见,无需commit(),适合实时写入场景
写入流程分为三个阶段:
- 通过
TableWriteSessionBuilder配置写入参数,调用build()创建TableWriteSession - 通过
session.createWriterBuilder(streamId, streamVersion)创建TableWriterBuilder,调用build()获取ArrowWriter - 使用
ArrowWriter写入 Arrow 格式的数据批次,完成后调用flush();批量模式还需调用session.commit()
客户端通过 MaxStorageClient.createTableWriteSessionBuilder() 进入写入流程。如果表包含 Blob 列,请参考 Blob 数据读写。
MaxStorageClient
└─ createTableWriteSessionBuilder(tableId)
└─ TableWriteSessionBuilder.build()
└─ TableWriteSession
└─ createWriterBuilder(streamId, streamVersion)
└─ TableWriterBuilder.build()
└─ TableArrowWriter
├─ writeBatch(VectorSchemaRoot)
└─ flush()
└─ commit() // Batch 模式必须
TableWriteSessionBuilder
TableWriteSessionBuilder 用于配置写入参数,通过 client.createTableWriteSessionBuilder(tableId) 获取。
配置选项
withPartition — 写入指定分区
设置写入目标分区。对于分区表,必须指定分区才能写入。
public TableWriteSessionBuilder withPartition(PartitionSpec partitionSpec)
参数:
partitionSpec:分区规格,例如new PartitionSpec("dt='20250101'")
示例:
TableWriteSession session = client.createTableWriteSessionBuilder(tableId)
.withPartition(new PartitionSpec("dt='20250101'"))
.build();
withOverwrite — 覆盖写入
是否覆盖目标表或分区中的现有数据。
public TableWriteSessionBuilder withOverwrite(boolean overwrite)
参数:
overwrite:true表示覆盖写入;false(默认)表示追加写入
示例:
// 覆盖写入指定分区
TableWriteSession session = client.createTableWriteSessionBuilder(tableId)
.withPartition(new PartitionSpec("dt='20250101'"))
.withOverwrite(true)
.build();
withWriteMode — 写入模式
设置写入模式(批量或流式)。
public TableWriteSessionBuilder withWriteMode(WriteMode writeMode)
参数:
writeMode:写入模式枚举值:WriteMode.BATCH(默认):批量模式,提交后数据可见WriteMode.STREAMING:流式模式,flush 后立即可见
示例:
// 流式写入模式
TableWriteSession session = client.createTableWriteSessionBuilder(tableId)
.withWriteMode(WriteMode.STREAMING)
.build();
withSessionId — 复用已有 Session
通过 Session ID 复用之前已创建的写入 Session。
public TableWriteSessionBuilder withSessionId(String sessionId)
build — 构建 Session
public TableWriteSession build()
返回值:TableWriteSession 实例
TableWriteSession
TableWriteSession 代表一个写入会话,管理写入的完整生命周期(创建、写入、提交/回滚)。
createWriterBuilder
创建写入 Builder,用于构建实际的 Arrow Writer。
public TableWriterBuilder createWriterBuilder(String streamId, long streamVersion)
参数:
| 参数 | 类型 | 说明 |
|---|---|---|
streamId | String | Stream 唯一标识符,同一 Session 下不同 Writer 应使用不同的 ID |
streamVersion | long | Stream 版本号,必须 ≥ 1,用于幂等重试 |
返回值:TableWriterBuilder 实例
streamId + streamVersion 的组合用于标识一次写入操作的唯一性,可利用此机制实现写入的幂等重试:对同一批数据,使用相同的 streamId 和 streamVersion 重新创建 Writer 并写入,服务 端会保证只保留一份结果。
commit
提交写入事务,使所有已写入的数据对外可见。
public void commit()
- Batch 模式:必须调用
commit()才能使数据可见 - Streaming 模式:
commit()是空操作,数据已在flush()时可见
异常:
ClientException:Session 已关闭时抛出
abort
中止写入事务,丢弃所有已写入的数据。
public void abort()
- Batch 模式:调用后放弃所有未提交的数据
- Streaming 模式:无效操作,数据已对外可见无法回滚
close
关闭 Session,若未显式 commit(),则自动执行 abort() 丢弃数据。
public void close()
推荐使用 try-with-resources 保证资源释放:
try (TableWriteSession session = client.createTableWriteSessionBuilder(tableId).build()) {
// 写 入数据 ...
session.commit(); // 必须在 close() 前调用
}
// close() 自动调用,若未 commit() 则 abort()
getId
获取 Session 的唯一标识符。
public String getId()
getWriteMode
获取当前 Session 的写入模式。
public WriteMode getWriteMode()
TableWriterBuilder
TableWriterBuilder 用于配置 Arrow Writer 的参数,通过 session.createWriterBuilder(streamId, version) 获取。
配置选项
withBufferSize — 缓冲区大小
设置写入缓冲区大小(字节),缓冲区满时触发自动 flush。
public TableWriterBuilder withBufferSize(long bufferSize)
参数:
bufferSize:缓冲区大小,默认为 64MB(64 * 1024 * 1024)
withAutoFlushEnabled — 自动 Flush
是否启用自动 Flush(缓冲区满时自动发送数据)。
public TableWriterBuilder withAutoFlushEnabled(boolean enabled)
参数:
enabled:默认为true
withExecutorService — 异步 Flush 线程池
设置用于异步 Flush 的线程池,实现写入与网络发送的并行化。
public TableWriterBuilder withExecutorService(ExecutorService executorService)
参数:
executorService:线程池,建议使用Executors.newSingleThreadExecutor()
示例:
ExecutorService flushExecutor = Executors.newSingleThreadExecutor();
ArrowWriter writer = session.createWriterBuilder("stream-1", 1)
.withExecutorService(flushExecutor)
.build();
withBatchBlobUploadEnabled — 批量 Blob 上传
启用批量 Blob 上传模式(即 Blob 写入模式 A),适用于包含 Blob 字段的表。开启后,writeBatch() 时自动批量上传 Blob 并回填引用。
public TableWriterBuilder withBatchBlobUploadEnabled(boolean enabled)
build — 构建 ArrowWriter
public ArrowWriter build()
返回值:ArrowWriter(实际类型为 TableArrowWriter)
TableArrowWriter
TableArrowWriter 是 Arrow 格式的写入器实现,继承 ArrowWriter 接口。
createVectorSchemaRoot
创建与该 Writer 匹配的 VectorSchemaRoot 对象,用于填充数据。
public VectorSchemaRoot createVectorSchemaRoot()
返回值:空的 VectorSchemaRoot,Schema 与目标表一致
调用方负责管理 VectorSchemaRoot 的生命周期,必须使用 try-with-resources 或手动调用 close() 以防止内存泄漏。
writeBatch
写入一个 Arrow 数据批次到缓冲区。
public void writeBatch(VectorSchemaRoot root)
参数:
root:包含一批数据的VectorSchemaRoot,方法返回后可安全复用
writeBatch() 内部会立即将 VectorSchemaRoot 序列化为字节数组缓存到内存,因此调用返回后 root 对象可以安全地修改或重用,不会影响已缓存的数据。
flush
将缓冲区中的所有数据发送到服务端。
public void flush()
- Batch 模式:数据发送到服务端,但需要
session.commit()后才对外可见 - Streaming 模式:数据发送后立即对外可见
close
关闭写入器,释放资源。关闭前会自动执行最后一次 flush()。
bytesWritten
返回已写入的总字节数(序列化后)。
public long bytesWritten()