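Tunnel Upload: Batch Writes
Tunnel Upload is the classic way to batch-write data into MaxCompute. An UploadSession manages the write lifecycle, and an atomic commit keeps the data consistent.
Prerequisites
- An initialized Odps client instance
- A created TableTunnel instance (see the TableTunnel documentation)
- An existing target table; for a partitioned table, the partition must already be created, or an existing partition must be specified
Complete example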
- Java
- Python
- Go
import com.aliyun.odps.Odps;
import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.RecordWriter;
import com.aliyun.odps.tunnel.TableTunnel;

public class TunnelUploadExample {
    public static void main(String[] args) throws Exception {
        // 1. Initialize the ODPS client
        AliyunAccount account = new AliyunAccount("<accessId>", "<accessKey>");
        Odps odps = new Odps(account);
        odps.setDefaultProject("<projectName>");
        odps.setEndpoint("<endpoint>");

        // 2. Get a TableTunnel instance
        TableTunnel tunnel = odps.tableTunnel();

        // 3. Create an UploadSession
        String projectName = "<projectName>";
        String tableName = "<tableName>";
        TableTunnel.UploadSession session = tunnel.createUploadSession(projectName, tableName);
        System.out.println("Session ID: " + session.getId());

        // 4. Open a RecordWriter for a specific block ID
        long blockId = 0;
        RecordWriter writer = session.openRecordWriter(blockId, true); // true enables compression

        // 5. Write the data
        for (int i = 0; i < 1000; i++) {
            Record record = session.newRecord();
            record.setString("name", "user_" + i);
            record.setBigint("age", (long) (20 + i % 50));
            writer.write(record);
        }

        // 6. Close the writer
        writer.close();

        // 7. Commit, passing the block IDs so the server can verify them
        session.commit(new Long[]{blockId});
        System.out.println("Upload committed successfully.");
    }
}
from odps import ODPS
import pandas as pd

o = ODPS('<accessId>', '<accessKey>', project='<projectName>', endpoint='<endpoint>')
table = o.get_table('my_table')

# Write records
records = [[1, 'Alice'], [2, 'Bob']]
with table.open_writer(partition='dt=20231001', create_partition=True) as writer:
    writer.write(records)

# Use write_table (supports DataFrame)
df = pd.DataFrame({'id': [1, 2], 'name': ['Alice', 'Bob']})
o.write_table('my_table', df, partition='dt=20231001', create_partition=True)
session, _ := tunnelIns.CreateUploadSession(
    project.Name(), "my_table",
    tunnel.SessionCfg.WithPartitionKey("dt=20231001"),
    tunnel.SessionCfg.WithDefaultDeflateCompressor(),
)
// Open a writer on block 0
writer, _ := session.OpenRecordWriter(0)
// Write the data
record := []data.Data{data.BigInt(1), data.String("Alice")}
writer.Write(record)
record2 := []data.Data{data.BigInt(2), data.String("Bob")}
writer.Write(record2)
// Close the writer and commit the block
writer.Close()
session.Commit([]int{0})
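Code walkthrough
Write flow
A Tunnel Upload write proceeds in the following steps:
- Create a session: call TableTunnel.createUploadSession() to create an upload session and obtain its session ID
- Open a writer: call session.openRecordWriter(blockId) to get a writer; a block ID is required
- Write records: build Record objects and write them one at a time
- Close the writer: call writer.close() to finish transferring the current block
- Commit the session: call session.commit() to make the data visible to readers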
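The lifecycle above can be mimicked with a small in-memory model. This is only a sketch for reasoning about the steps, not the MaxCompute SDK: ToyUploadSession and all of its methods are hypothetical names, and the rules it enforces (one active writer per block, nothing visible before commit, a single commit per session) are taken from this page.

```python
class ToyUploadSession:
    """In-memory stand-in for an upload session (hypothetical, not the SDK)."""

    def __init__(self):
        self._staged = {}       # block_id -> rows, invisible until commit
        self._open = set()      # block IDs that currently have an open writer
        self.committed = False
        self.visible_rows = []  # what a reader of the table would see

    def open_writer(self, block_id):
        if self.committed:
            raise RuntimeError("session already committed")
        if block_id in self._open:
            raise RuntimeError(f"block {block_id} already has an active writer")
        self._open.add(block_id)
        self._staged[block_id] = []  # reopening a block replaces its staged data
        return block_id

    def write(self, block_id, row):
        if block_id not in self._open:
            raise RuntimeError(f"block {block_id} has no open writer")
        self._staged[block_id].append(row)

    def close_writer(self, block_id):
        self._open.discard(block_id)

    def commit(self, block_ids):
        if self.committed:
            raise RuntimeError("a session can be committed only once")
        if self._open:
            raise RuntimeError("close all writers before commit")
        missing = set(block_ids) - set(self._staged)
        if missing:
            raise RuntimeError(f"blocks never uploaded: {sorted(missing)}")
        for block_id in sorted(block_ids):
            self.visible_rows.extend(self._staged[block_id])
        self.committed = True


session = ToyUploadSession()
block = session.open_writer(0)
for i in range(3):
    session.write(block, ("user_%d" % i, 20 + i))
session.close_writer(block)
rows_before_commit = list(session.visible_rows)  # still empty at this point
session.commit([0])
```

In this model the rows only appear in visible_rows after commit, and a second commit raises an error, which matches the single-commit rule described in the notes.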
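Block management
The block is the core concept of Tunnel Upload:
- Block ID range: 0 to 19999; each ID uniquely identifies one data block within a session
- Single writer: at any moment, each block ID can have only one active writer
- Overwrite semantics: writing to the same block ID again replaces the earlier data (before commit)
- Size guidance: each block should hold at least 64 MB; the upper limit is 100 GB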
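The limits above can be checked before any data is sent. The helper below is a minimal sketch: plan_blocks and its default target of 256 MB per block are illustrative choices, not part of any SDK, and it assumes MB and GB mean binary units (1024-based).

```python
MAX_BLOCKS = 20000                # block IDs 0..19999, as stated above
MIN_BLOCK_BYTES = 64 * 1024**2    # suggested lower bound per block
MAX_BLOCK_BYTES = 100 * 1024**3   # upper bound per block


def plan_blocks(total_bytes, target_block_bytes=256 * 1024**2):
    """Return the block IDs needed to hold total_bytes at the target block size."""
    if not MIN_BLOCK_BYTES <= target_block_bytes <= MAX_BLOCK_BYTES:
        raise ValueError("target block size is outside the 64 MB to 100 GB range")
    # Integer ceiling division; always plan at least one block.
    count = max(1, -(-total_bytes // target_block_bytes))
    if count > MAX_BLOCKS:
        raise ValueError(
            f"{count} blocks needed, but one session allows at most {MAX_BLOCKS}"
        )
    return list(range(count))


block_ids = plan_blocks(10 * 1024**3)  # 10 GB split into 256 MB blocks
```

If the plan exceeds the block ID range, raising the target block size (or splitting the load across several sessions) brings it back within limits.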
Parallel upload with multiple blocks
Using a different block ID per thread allows multi-threaded parallel writes:
- Java
- Python
- Go
// Additional imports needed by this fragment
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

TableTunnel.UploadSession session = tunnel.createUploadSession(projectName, tableName);
int threadCount = 4;
ExecutorService executor = Executors.newFixedThreadPool(threadCount);
List<Future<?>> futures = new ArrayList<>();
for (int t = 0; t < threadCount; t++) {
    final long blockId = t; // one block ID per thread
    futures.add(executor.submit(() -> {
        try {
            RecordWriter writer = session.openRecordWriter(blockId, true);
            for (int i = 0; i < 10000; i++) {
                Record record = session.newRecord();
                record.setString("name", "thread_" + blockId + "_row_" + i);
                record.setBigint("value", (long) i);
                writer.write(record);
            }
            writer.close();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }));
}
// Wait for all threads to finish; get() rethrows any worker failure
for (Future<?> f : futures) {
    f.get();
}
executor.shutdown();
// Commit all blocks (must match the block IDs written above)
Long[] blocks = new Long[]{0L, 1L, 2L, 3L};
session.commit(blocks);
import concurrent.futures

# PyODPS manages blocks automatically and supports multi-threaded writes
def write_chunk(thread_id, records):
    with table.open_writer(partition='dt=20250101', create_partition=True) as writer:
        writer.write(records)

# Prepare one chunk of data per thread
thread_count = 4
with concurrent.futures.ThreadPoolExecutor(max_workers=thread_count) as pool:
    futures = []
    for t in range(thread_count):
        records = [
            [f"thread_{t}_row_{i}", i] for i in range(10000)
        ]
        futures.append(pool.submit(write_chunk, t, records))
    # result() re-raises any exception from a worker thread
    for future in futures:
        future.result()
session, _ := tunnelIns.CreateUploadSession(
    project.Name(), "my_table",
    tunnel.SessionCfg.WithDefaultDeflateCompressor(),
)
threadCount := 4
var wg sync.WaitGroup
for t := 0; t < threadCount; t++ {
    wg.Add(1)
    go func(blockId int) {
        defer wg.Done()
        writer, _ := session.OpenRecordWriter(blockId)
        for i := 0; i < 10000; i++ {
            record := []data.Data{
                data.String(fmt.Sprintf("thread_%d_row_%d", blockId, i)),
                data.BigInt(int64(i)),
            }
            writer.Write(record)
        }
        writer.Close()
    }(t)
}
wg.Wait()
// Commit all blocks
session.Commit([]int{0, 1, 2, 3})
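The examples above generate rows per thread and hard-code the block list. When the rows already exist, one option is to split them into one chunk per block and build the commit list from the workers that actually finished. The sketch below uses only the Python standard library; split_evenly and upload_block are hypothetical helpers, and upload_block is a stub that stands in for "open writer, write chunk, close writer".

```python
from concurrent.futures import ThreadPoolExecutor


def split_evenly(rows, parts):
    """Split rows into `parts` contiguous chunks whose sizes differ by at most one."""
    base, extra = divmod(len(rows), parts)
    chunks, start = [], 0
    for p in range(parts):
        size = base + (1 if p < extra else 0)
        chunks.append(rows[start:start + size])
        start += size
    return chunks


def upload_block(block_id, chunk):
    """Stub for uploading one block; a real version would call the SDK here."""
    return block_id, len(chunk)


rows = [("row_%d" % i, i) for i in range(10)]
chunks = split_evenly(rows, 4)

with ThreadPoolExecutor(max_workers=4) as pool:
    futures = [
        pool.submit(upload_block, block_id, chunk)
        for block_id, chunk in enumerate(chunks)
    ]
    # result() re-raises a worker exception, so a failed block stops us before commit.
    results = [f.result() for f in futures]

blocks_to_commit = sorted(block_id for block_id, _ in results)
```

Deriving blocks_to_commit from the results keeps the commit list in step with the number of workers if that number changes later.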
Using TunnelBufferedWriter
If you prefer not to manage block IDs by hand, use TunnelBufferedWriter, which manages buffering and block allocation internally:
- Java
- Python
- Go
TableTunnel.UploadSession session = tunnel.createUploadSession(projectName, tableName);
RecordWriter writer = session.openBufferedWriter(true); // true enables compression
for (int i = 0; i < 100000; i++) {
    Record record = session.newRecord();
    record.setString("name", "user_" + i);
    record.setBigint("age", (long) (20 + i % 50));
    writer.write(record);
}
writer.close();
session.commit();
# PyODPS manages buffering and block allocation automatically by default
records = [[f"user_{i}", 20 + i % 50] for i in range(100000)]
with table.open_writer() as writer:
    writer.write(records)
session, _ := tunnelIns.CreateUploadSession(
    project.Name(), "my_table",
    tunnel.SessionCfg.WithDefaultDeflateCompressor(),
)
writer, _ := session.OpenRecordWriter(0)
for i := 0; i < 100000; i++ {
    record := []data.Data{
        data.String(fmt.Sprintf("user_%d", i)),
        data.BigInt(int64(20 + i%50)),
    }
    writer.Write(record)
}
writer.Close()
session.Commit([]int{0})
Configuration options
Compression algorithms
| Algorithm | Description |
|---|---|
| CompressOption.CompressAlgorithm.ODPS_RAW | No compression |
| CompressOption.CompressAlgorithm.ODPS_ZLIB | ZLIB compression (supports level and strategy) |
| CompressOption.CompressAlgorithm.ODPS_SNAPPY | Snappy compression |
| CompressOption.CompressAlgorithm.ODPS_LZ4_FRAME | LZ4 Frame compression |
- Java
- Python
- Go
CompressOption option = new CompressOption(
CompressOption.CompressAlgorithm.ODPS_LZ4_FRAME, 0, 0);
RecordWriter writer = session.openRecordWriter(blockId, option);
# PyODPS enables compression by default; a compression algorithm can also be specified
with table.open_writer(compress_algo='zlib') as writer:
writer.write(records)
session, _ := tunnelIns.CreateUploadSession(
project.Name(), "my_table",
tunnel.SessionCfg.WithDefaultDeflateCompressor(),
)
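To get a feel for what the ZLIB level and strategy settings mean, the sketch below compresses sample rows locally with Python's standard zlib module. It does not call MaxCompute, and how the service applies these settings is not covered here; the sample data and the deflate helper are illustrative only.

```python
import zlib

# Sample payload: 10,000 short CSV-like rows, similar in shape to the examples above.
raw = "".join("user_%d,%d\n" % (i, 20 + i % 50) for i in range(10000)).encode("utf-8")


def deflate(data, level, strategy=zlib.Z_DEFAULT_STRATEGY):
    """Compress data with an explicit zlib level (0-9) and strategy."""
    compressor = zlib.compressobj(
        level, zlib.DEFLATED, zlib.MAX_WBITS, zlib.DEF_MEM_LEVEL, strategy
    )
    return compressor.compress(data) + compressor.flush()


fast = deflate(raw, level=1)      # favors speed
small = deflate(raw, level=9)     # favors output size
filtered = deflate(raw, level=6, strategy=zlib.Z_FILTERED)

for name, blob in (("level 1", fast), ("level 9", small), ("filtered", filtered)):
    print(name, len(blob), "bytes from", len(raw))
```

Lower levels trade compression ratio for CPU time, which matters when the upload is bound by client CPU and not by the network.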
Writing to a partitioned table
- Java
- Python
- Go
// Write to a specific partition
PartitionSpec partitionSpec = new PartitionSpec("dt='20250101'");
TableTunnel.UploadSession session = tunnel.createUploadSession(
    projectName, tableName, partitionSpec);
with table.open_writer(partition='dt=20250101', create_partition=True) as writer:
writer.write(records)
session, _ := tunnelIns.CreateUploadSession(
project.Name(), "my_table",
tunnel.SessionCfg.WithPartitionKey("dt=20250101"),
)
Overwrite writes
- Java
- Python
- Go
// Overwrite the data already in the table
boolean overwrite = true;
TableTunnel.UploadSession session = tunnel.createUploadSession(
projectName, tableName, overwrite);
# Overwrite the data already in the table
with table.open_writer(overwrite=True) as writer:
writer.write(records)
// Overwrite the data already in the table
session, _ := tunnelIns.CreateUploadSession(
project.Name(), "my_table",
tunnel.SessionCfg.Overwrite(),
)
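Notes
- Session lifetime: each UploadSession lives for 24 hours on the server; after it expires, create a new one.
- Writer timeout: if a RecordWriter has no network activity for 120 seconds, the server closes the connection and the writer must be reopened.
- Block size: aim for at least 64 MB of data per block to avoid producing many small files. A single block is capped at 100 GB.
- Immutable after commit: once commit() succeeds, the data is visible immediately and cannot be rolled back.
- Block ID uniqueness: only one writer may write to a given block ID at a time; otherwise the data becomes corrupted.
- Session is not reusable: a session can be committed only once; later writes require a new session.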
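Because re-writing a block ID before commit replaces its earlier data, a block whose writer was dropped (for example by the 120-second idle timeout) can be retried under the same ID. The following is a minimal sketch of that retry loop: upload_with_retry and the write_block callback are hypothetical names, and the use of ConnectionError as the retryable error is an assumption; a real client would catch whatever exception its SDK raises.

```python
import time


def upload_with_retry(write_block, block_id, rows, attempts=3, backoff_seconds=0.0):
    """Call write_block(block_id, rows) until it succeeds or attempts run out.

    Returns the number of attempts used.
    """
    last_error = None
    for attempt in range(1, attempts + 1):
        try:
            write_block(block_id, rows)
            return attempt
        except ConnectionError as exc:  # assumed retryable error type
            last_error = exc
            time.sleep(backoff_seconds * attempt)
    raise RuntimeError(f"block {block_id} failed after {attempts} attempts") from last_error


# Demo with a stub that fails once, standing in for a dropped idle connection.
staged = {}
calls = {"count": 0}


def flaky_write_block(block_id, rows):
    calls["count"] += 1
    if calls["count"] == 1:
        raise ConnectionError("simulated timeout")
    staged[block_id] = list(rows)  # replaces any partial data for this block


attempts_used = upload_with_retry(flaky_write_block, 0, [("Alice", 30), ("Bob", 25)])
```

Retries like this only make sense before commit and within the session's 24-hour lifetime; after either point a new session is needed.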