跳到主要内容

StreamUploadSession

StreamUploadSession 专为流式数据上传设计,支持写入即可见(auto-commit)特性,无需手动提交会话。

获取实例

通过 TableTunnel.buildStreamUploadSession 构建:

StreamUploadSession session = tunnel.buildStreamUploadSession("my_project", "my_table")
.setPartitionSpec("dt=20231001/city=shanghai")
.build();

Builder 配置

方法参数类型默认值说明
setProjectName(String)String-项目名称(必填)
setTableName(String)String-表名称(必填)
setSchemaName(String)StringnullSchema 名称(三层模型)
setPartitionSpec(String)Stringnull分区表达式
setSlotNum(long)long0并行槽位数(0 自动分配)
setCreatePartition(boolean)booleanfalse自动创建不存在的分区
setSchemaVersion(String)String最新版本指定 Schema 版本号
allowSchemaMismatch(boolean)booleantrue是否允许字段类型不匹配
setDynamicPartition(boolean)booleanfalse设置是否启用动态分区写入

方法列表

getId

获取会话 ID。

public String getId()

getSchema

获取表结构信息。

public TableSchema getSchema()

getSchemaVersion

获取表 Schema 版本号。

public String getSchemaVersion()

getQuotaName

获取本次流式上传使用的 Quota 名称。

public String getQuotaName()

getLastBatchId

获取最后一个已提交的 batch ID。

public long getLastBatchId() throws TunnelException

getLastBatchCommitTime

获取最后一次 batch 提交时间(毫秒时间戳)。

public long getLastBatchCommitTime() throws TunnelException

newRecordPack

创建流式数据包。

public StreamRecordPack newRecordPack() throws IOException
public StreamRecordPack newRecordPack(CompressOption option) throws IOException
参数类型说明
optionCompressOption压缩选项,默认 ODPS_ZLIB

压缩算法可选值ODPS_RAWODPS_ZLIBODPS_SNAPPYODPS_LZ4_FRAME


newRecord

创建空记录对象。

public Record newRecord()

StreamRecordPack 接口

append

向数据包中追加一条记录。

public void append(Record record) throws IOException

getRecordCount

获取当前包中的记录数。

public long getRecordCount()

getDataSize

获取当前数据大小(由于多层缓冲区,值变化可能不连续)。

public long getDataSize()

flush

将数据发送到服务端。flush 成功后 pack 可复用。

// 基础提交
public String flush() throws IOException

// 带选项提交
public FlushResult flush(FlushOption flushOption) throws IOException

FlushOption 配置

FlushOption option = new FlushOption().timeout(3000); // 3秒超时

FlushResult 结构

字段类型说明
traceIdString请求追踪 ID
recordCountlong本次 flush 的记录数
flushSizelong本次 flush 的数据大小
batchIdlong批次 ID

reset

重置缓冲区(flush 成功后自动调用)。

public void reset() throws IOException

使用示例

StreamUploadSession session = tunnel.buildStreamUploadSession("my_project", "my_table")
.setCreatePartition(true)
.setPartitionSpec("dt=20231001")
.build();

StreamRecordPack pack = session.newRecordPack();

while (hasMoreData()) {
Record record = session.newRecord();
record.set("id", 1001);
record.set("action", "purchase");
pack.append(record);

if (pack.getDataSize() > 64 * 1024 * 1024) { // 64MB 触发提交
pack.flush();
}
}
// 最后一批
if (pack.getRecordCount() > 0) {
pack.flush();
}

注意事项

  • 单个 StreamUploadSession 支持创建多个 RecordPack,但需避免跨线程并发操作同一个 pack
  • 长时间未 flush 的数据包会持续占用内存
  • 修改表结构后需重新创建会话以获取最新 Schema