UploadSession 上传会话
UploadSession 作为 TableTunnel 的核心组件,是Batch Tunnel组件的一员部分。 专为用户提供了一种高效、灵活的手段来应对数据上传挑战,贯穿从初始化、数据写入到完成上传的整个生命周期管理。 本文档详尽解析了UploadSession的使用方法,包括如何初始化新会话或获取现有会话、运用RecordWriter与TunnelBufferedWriter实现数据写入操作、完成数据上传的提交流程, 以及探索RecordPack和Apache Arrow格式的进阶应用,旨在帮助开发者充分利用MaxCompute的高性能数据上传能力。 通过深入理解并实践这些指南,用户能够针对不同场景定制最合适的上传策略,确保数据传输既高效又可靠。
目录
初始化
前提条件
确保你已经有一个有效的 TableTunnel
实例。如果尚未创建,请参考 TableTunnel 文档
获取实例化方法。
创建新的上传会话
对于非分区表
-
基本创建
TableTunnel.UploadSession uploadSession = tableTunnel.createUploadSession(projectName, tableName);
-
带覆盖选项创建
如果希望新上传的数据覆盖表中已存在的数据,则需设置
overwrite
参数为true
。TableTunnel.UploadSession uploadSession = tableTunnel.createUploadSession(projectName, tableName, overwrite);
-
指定Namespace Schema
若表属于特定Namespace Schema下,还需提供
schemaName
。TableTunnel.UploadSession uploadSession = tableTunnel.createUploadSession(projectName, schemaName, tableName, overwrite);
对于分区表
-
基础用法
需要提供完整的分区规格 (
PartitionSpec
),且分区必须是最末级的。TableTunnel.UploadSession uploadSession = tableTunnel.createUploadSession(projectName, tableName, partitionSpec);
-
带覆盖与分区
同样可以指定是否覆盖已有数据。
TableTunnel.UploadSession uploadSession = tableTunnel.createUploadSession(projectName, tableName, partitionSpec, overwrite);
-
指定Namespace Schema
若表属于特定Namespace Schema下,还需提供
schemaName
。TableTunnel.UploadSession uploadSession = tableTunnel.createUploadSession(projectName, schemaName, tableName, partitionSpec, overwrite);
获取现有上传会话
已创建的上传会话可以通过其唯一的 sessionId
获取:
-
非分区表
TableTunnel.UploadSession existingSession = tableTunnel.getUploadSession(projectName, tableName, sessionId);
-
分区表
TableTunnel.UploadSession existingSession = tableTunnel.getUploadSession(projectName, tableName, partitionSpec, sessionId);
多线程/进程共享会话
为了提高并发效率,可以在多线程或多进程中共享一个上传会话。这需要指定会话的总共享数 (shares
)
和当前实例的唯一标识 (shareId
):
TableTunnel.UploadSession sharedSession = tableTunnel.getUploadSession(projectName,tableName,sessionId,shares,shareId);
对于分区表,同样适用:
TableTunnel.UploadSession sharedSession = tableTunnel.getUploadSession(projectName,tableName,partitionSpec,sessionId,shares,shareId);
注意事项
- 在调用上述方法前,务必确保
TableTunnel
实例是可用的,并且已正确配置了项目名等信息。 - 分区表的上传会话创建要求提供完整的分区路径。
- 多线程/进程共享同一会话时,合理分配
shares
和shareId
是必要的,以避免资源冲突和数据混乱。
以上步骤指导你完成了一个 UploadSession
的初始化及复用,无论是创建新的上传任务还是继续之前中断的任务,这些方法都将是基础且重要的操作。
使用 RecordWriter 写入数据
RecordWriter
是用于向 MaxCompute 表中写入数 据的核心组件。
通过 UploadSession
提供的一系列方法,你可以根据需求选择不同的压缩策略、超时设定以及自定义版本控制来高效地上传数据。
每个Session在服务端的生命周期为24小时。
核心概念
blockId:会话级别,用于标识数据块ID,使用者应当保证同一个blockId只有一个写入者,否则会导致数据混乱。通过指定不同的blockId,可以实现多线程写入操作,提升写入效率。
使用 TunnelRecordWriter
TunnelRecordWriter 与服务端维护一个长链接,每次调用write(record)
方法,都会像服务端发送新的数据。
当写入数据结束时,一定需要调用close
方法,关闭 RecordWriter
断开链接。
不压缩数据
如果你不需要对上传数据进行压缩,可以使用以下方法打开 RecordWriter
:
RecordWriter writer = uploadSession.openRecordWriter(blockId);
其中,blockId
是用户自定义的一个0到19999之间的数字,用于标识本次上传的数据块。
启用压缩
若希望在数据传输过程中进行压缩,可以指定压缩参数:
RecordWriter writer = uploadSession.openRecordWriter(blockId,true);
或者更具体地指定压缩算法:
CompressOption compressOption = new CompressOption(CompressOption.CompressAlgorithm.GZIP,0,0);
RecordWriter writer = uploadSession.openRecordWriter(blockId,compressOption);
使用 TunnelBufferedWriter
TunnelBufferedWriter
维护了一个缓冲区,只有当调用TunnelBufferedWriter#flush()
方法时 ,才与服务端建立链接。
TunnelBufferedWriter
向用户屏蔽了 blockId 的概念,使写入操作更像传统文件写入。它通过内部维护一个缓冲区,当数据量超过缓冲区,或用户手动调用
TunnelBufferedWriter#flush()
操作时,将缓存数据作为一个新的数据块(blockId)发送到服务端。获取blockId的原理为根据shareId的递增值,因此当同一个Session开启了多个TunnelBufferedWriter时,需要参考
多线程/进程共享会话 正确的维护 shares
和 shareId
基础使用
RecordWriter bufferedWriter = uploadSession.openBufferedWriter();
启用压缩
RecordWriter bufferedWriter = uploadSession.openBufferedWriter(true);
或指定压缩选项:
CompressOption compressOption = new CompressOption(CompressOption.CompressAlgorithm.SNAPPY,0,0);
RecordWriter bufferedWriter = uploadSession.openBufferedWriter(compressOption);