Upload
UploadSession 作为 TableTunnel 的核心组件,是Batch Tunnel组件的一员部分。 专为用户提供了一种高效、灵活的手段来应对数据上传挑战,贯穿从初始化、数据写入到完成上传的整个生命周期管理。 本文档详尽解析了UploadSession的使用方法,包括如何初始化新会话或获取现有会话、运用RecordWriter与TunnelBufferedWriter实现数据写入操作、完成数据上传的提交流程, 以及探索RecordPack和Apache Arrow格式的进阶应用,旨在帮助开发者充分利用MaxCompute的高性能数据上传能力。 通过深入理解并实践这些指南,用户能够针对不同场景定制最合适的上传策略,确保数据传输既高效又可靠。
目录
初始化
前提条件
确保你已经有一个有效的 TableTunnel
实例。如果尚未创建,请参考 TableTunnel 文档
获取实例化方法。
创建新的上传会话
对于非分区表
-
基本创建
TableTunnel.UploadSession uploadSession = tableTunnel.createUploadSession(projectName, tableName);
-
带覆盖选项创建
如果希望新上传的数据覆盖表中已存在的数据,则需设置
overwrite
参数为true
。TableTunnel.UploadSession uploadSession = tableTunnel.createUploadSession(projectName, tableName, overwrite);
-
指定Namespace Schema
若表属于特定Namespace Schema下,还需提供
schemaName
。TableTunnel.UploadSession uploadSession = tableTunnel.createUploadSession(projectName, schemaName, tableName, overwrite);
对于分区表
-
基础用法
需要提供完整的分区规格 (
PartitionSpec
),且分区必须是最末级的。TableTunnel.UploadSession uploadSession = tableTunnel.createUploadSession(projectName, tableName, partitionSpec);
-
带覆盖与分区
同样可以指定是否覆盖已有数据。
TableTunnel.UploadSession uploadSession = tableTunnel.createUploadSession(projectName, tableName, partitionSpec, overwrite);
-
指定Namespace Schema
若表属于特定Namespace Schema下,还需提供
schemaName
。TableTunnel.UploadSession uploadSession = tableTunnel.createUploadSession(projectName, schemaName, tableName, partitionSpec, overwrite);
获取现有上传会话
已创建的上传会话可以通过其唯一的 sessionId
获取:
-
非分区表
TableTunnel.UploadSession existingSession = tableTunnel.getUploadSession(projectName, tableName, sessionId);
-
分区表
TableTunnel.UploadSession existingSession = tableTunnel.getUploadSession(projectName, tableName, partitionSpec, sessionId);
多线程/进程共享会话
为了提高并发效率,可以在多线程或多进程中共享一个上传会话。这需要指定会话的总共享数 (shares
)
和当前实例的唯一标识 (shareId
):
TableTunnel.UploadSession sharedSession = tableTunnel.getUploadSession(projectName,tableName,sessionId,shares,shareId);
对于分区表,同样适用:
TableTunnel.UploadSession sharedSession = tableTunnel.getUploadSession(projectName,tableName,partitionSpec,sessionId,shares,shareId);
注意事项
- 在调用上述方法前,务必确保
TableTunnel
实例是可用的,并且已正确配置了项目名等信息。 - 分区表的上传会话创建要求提供完整的分区路径。
- 多线程/进程共享同一会话时,合理分配
shares
和shareId
是必要的,以避免资源冲突和数据混乱。
以上步骤指导你完成了一个 UploadSession
的初始化及复用,无论是创建新的上传任务还是继续之前中断的任务,这些方法都将是基础且重要的操作。
使用 RecordWriter 写入数据
RecordWriter
是用于向 MaxCompute 表中写入数据的核心组件。
通过 UploadSession
提供的一系列方法,你可以根据需求选择不同的压缩策略、超时设定以及自定义版本控制来高效地上传数据。
每个Session在服务端的生命周期为24小时。
核心概念
blockId:会话级别,用于标识数据块ID,使用者应当保证同一个blockId只有一个写入者,否则会导致数据混乱。通过指定不同的blockId,可以实现多线程写入操作,提升写入效率。
使用 TunnelRecordWriter
TunnelRecordWriter 与服务端维护一个长链接,每次调用write(record)
方法,都会像服务端发送新的数据。
当写入数据结束时,一定需要调用close
方法,关闭 RecordWriter
断开链接。
不压缩数据
如果你不需要对上传数据进行压缩,可以使用以下方法打开 RecordWriter
:
RecordWriter writer = uploadSession.openRecordWriter(blockId);
其中,blockId
是用户自定义的一个0到19999之间的数字,用于标识本次上传的数据块。
启用压缩
若希望在数据传输过程中进行压缩,可以指定压缩参数:
RecordWriter writer = uploadSession.openRecordWriter(blockId,true);
或者更具体地指定压缩算法:
CompressOption compressOption = new CompressOption(CompressOption.CompressAlgorithm.GZIP,0,0);
RecordWriter writer = uploadSession.openRecordWriter(blockId,compressOption);
使用 TunnelBufferedWriter
TunnelBufferedWriter
维护了一个缓冲区,只有当调用TunnelBufferedWriter#flush()
方法时,才与服务端建立链接。
TunnelBufferedWriter
向用户屏蔽了 blockId 的概念,使写入操作更像传统文件写入。它通过内部维护一个缓冲区,当数据量超过缓冲区,或用户手动调用
TunnelBufferedWriter#flush()
操作时,将缓存数据作为一个新的数据块(blockId)发送到服务端。获取blockId的原理为根据shareId的递增值,因此当同一个Session开启了多个TunnelBufferedWriter时,需要参考
多线程/进程共享会话 正确的维护 shares
和 shareId
基础使用
RecordWriter bufferedWriter = uploadSession.openBufferedWriter();
启用压缩
RecordWriter bufferedWriter = uploadSession.openBufferedWriter(true);
或指定压缩选项:
CompressOption compressOption = new CompressOption(CompressOption.CompressAlgorithm.SNAPPY,0,0);
RecordWriter bufferedWriter = uploadSession.openBufferedWriter(compressOption);
设置超时
timeout
表示超时时间 单位 ms
, <=0
代表无超时.
推荐值: (BufferSizeInMB / UploadBandwidthInMB) * 1000 * 120%
long timeout = (1024/100)*1000*1.2; // 示例超时计算,假设缓冲区大小1MB,上传带宽100MB/s
public RecordWriter openBufferedWriter(CompressOption compressOption,long timeout)
设置超时与BlockVersionProvider
为了更好地控制上传过程,可以设置超时时间和自定义Block版本控制逻辑:
long timeout = (1024/100)*1000*1.2; // 示例超时计算,假设缓冲区大小1MB,上传带宽100MB/s
BlockVersionProvider versionProvider = new CustomBlockVersionProvider(); // 自定义版本提供逻辑
RecordWriter bufferedWriter = uploadSession.openBufferedWriter(compressOption,timeout,versionProvider);
注意事项
- 在使用
openRecordWriter
方法时,确保blockId
的唯一性和合理性,以免造成数据混乱。 - 一个Block大小上限100GB,建议大于64M的数据。
- 上传数据时,
RecordWriter
如果120秒内没有网络动作,服务端将主动关闭连接,此时Writer将不可用,请重新打开一个新的Writer写入。 - 如果不像手动维护
blockId
和缓冲操 作,可以使用TunnelBufferedWriter
。 - 自定义
BlockVersionProvider
可以为每个数据块提供特定的版本控制逻辑,但需谨慎使用,确保不会引发版本冲突。
通过上述方法,你可以灵活地根据数据特点和上传需求选择合适的 RecordWriter
初始化方式,进而高效地将数据上传至MaxCompute表中。
完成写入
完成数据上传后,必须通过调用 commit
方法来确认,以确保数据最终被提交到 MaxCompute
表中。此步骤是确保数据持久化和可见性的关键操作。
基础提交
对于无需块级验证的情况,可使用简化版的提交方法:
/**
* 简化提交流程,不执行数据块校验。
*
* @throws TunnelException 上传过程中出现的隧道异常。
* @throws IOException IO相关异常。
*/
public void commit()throws TunnelException,IOException;
带数据块校验的提交
若需要确保上传数据的完整性,可以提供已成功上传的数据块列表进行校验:
/**
* 提交上传任务,并进行数据块完整性校验。
*
* @param blocks 用户记录的已成功上传的数据块ID列表,用于与服务端核对。
* @throws TunnelException 如果提交的块列表与服务端记录不匹配,则抛出异常。
* @throws IOException IO相关异常。
*/
public void commit(Long[]blocks)throws TunnelException,IOException;
在实际操作中,根据是否需要进行块级完整性验证,选择合适的 commit
方法来安全且高效地完成上传任务。
其他实用功能
会话信息与管理
以下方法帮助您获取上传会话的详细信息及管理上传过程:
-
获取会话ID:
/**
* 返回当前上传会话的唯一标识ID。
*
* @return 上传会话ID。
*/
public String getId(); -
获取表结构:
/**
* 获取与上传会话关联的表结构信息。
*
* @return 表的结构定义。
*/
public TableSchema getSchema(); -
查询配额名称:
/**
* 获取用于本次上传的 quota 名称,注意并非是 quotaNickName。
*/
public String getQuotaName(); -
获取会话状态:
/**
* 刷新并获取当前上传会话的状态。
*/
public UploadStatus getStatus() throws TunnelException, IOException;
当前返回值包括以下状态码:
- UNKNOWN:Server端刚创建一个Session时设置的初始值。
- NORMAL:创建Upload对象成功。
- CLOSING:当调用complete方法(结束上传)时,服务端会先把状态置为CLOSING。
- CLOSED:完成结束上传(即把数据移动到结果表所在目录)后。
- EXPIRED:上传超时。
- CRITICAL:服务出错。
配置与记录创建
-
访问配置信息:
/**
* 提供上传会话使用的配置对象,以便于获取或修改配置项。
*/
public Configuration getConfig(); -
创建记录对象:
/**
* 创建一个新的Record对象,准备用于数据上传。
*
* @return 一个新的Record实例,根据表结构初始化。
*/
public Record newRecord();
这些辅助方法使得在上传数据前后或过程中,能够便捷地管理会话信息、检查状态、以及准备数据记录,从而提高开发效率和灵活性。