Stream Upload
StreamUploadSession 作为 TableTunnel 的核心组件,是Stream Tunnel组件的一员部分。 与 UploadSession 不同的是,StreamUploadSession 专为流式数据上传场景而设计,支持写入即可见(auto-commit)特性,不需要用户手动提交会话。
使用流程
初始化配置
基础构造器
// 最小化配置示例
StreamUploadSession session = tableTunnel.buildStreamUploadSession(projectName, tableName)
.setSchemaName("user_logs") // 三层模型
.setPartitionSpec("dt=20231001/city=shanghai") // 分区表达式
.build();
高级配置参数
构造器方法 | 类型 | 默认值 | 说明 |
---|---|---|---|
setSlotNum(long) | long | 0 | 并行槽位数量(0表示自动分配) |
setCreatePartition(boolean) | boolean | false | 自动创建不存在的分区 |
setSchemaVersion(String) | String | 最新版本 | 指定schema版本号 |
allowSchemaMismatch(boolean) | boolean | true | 是否允许字段类型不匹配 |
核心操作 API
数据包管理
// 创建数据包,使用默认压缩方式(tunnel config 中的压缩算法,默认压缩算法为 ODPS_ZLIB)
StreamRecordPack pack = session.newRecordPack();
// 指定压缩方式(Snappy 算法)
StreamRecordPack compressedPack = session.newRecordPack(
new CompressOption(CompressAlgorithm.ODPS_SNAPPY, 0, 0))
);
备注
在 0.51.2 版本之前,默认压缩算法为 ODPS_RAW