
DownloadSession

DownloadSession downloads data from a MaxCompute table. It supports parallel chunked reads, column projection, and the Arrow format.

Getting an instance

Build a session with TableTunnel.buildDownloadSession (recommended):

DownloadSession session = tunnel.buildDownloadSession("my_project", "my_table")
        .setPartitionSpec(new PartitionSpec("dt=20231001"))
        .build();

DownloadSessionBuilder configuration

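| Method | Parameter type | Required | Default | Description |
|---|---|---|---|---|
| setProjectName(String) | String | Yes | - | Project name |
| setTableName(String) | String | Yes | - | Table name |
| setSchemaName(String) | String | No | null | Schema name (three-tier model) |
| setPartitionSpec(PartitionSpec) | PartitionSpec | No | null | Partition specification |
| setShardId(Long) | Long | No | null | ID of the shard to read |
| setAsyncMode(boolean) | boolean | No | false | Create the session asynchronously |
| setWaitAsyncBuild(boolean) | boolean | No | false | Block until the asynchronously created session is ready |
| setDownloadId(String) | String | No | null | ID of an existing session to reuse |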

Async mode example

DownloadSessionBuilder builder = tunnel.buildDownloadSession("my_project", "my_table")
        .setAsyncMode(true);
DownloadSession session = builder.build();
boolean ready = builder.wait(session, 5, 300); // check every 5 seconds, for at most 5 minutes
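The `builder.wait(session, 5, 300)` call above polls until the asynchronously created session is ready or the time budget runs out. As a minimal sketch of that poll-with-timeout pattern in plain Java, the hypothetical `AsyncPoll.pollUntil` helper below takes a `BooleanSupplier` readiness check plus an interval and a timeout in milliseconds (milliseconds only so the sketch runs quickly). It is not the SDK's implementation, and it assumes the SDK's interval and timeout are in seconds, as the comment suggests.

```java
import java.util.function.BooleanSupplier;

// Hypothetical helper illustrating poll-until-ready-or-timeout semantics.
// This is NOT the SDK's DownloadSessionBuilder.wait implementation.
class AsyncPoll {
    /**
     * Calls `ready` every intervalMillis until it returns true or
     * timeoutMillis has elapsed. Returns true only if ready in time.
     */
    static boolean pollUntil(BooleanSupplier ready, long intervalMillis, long timeoutMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (true) {
            if (ready.getAsBoolean()) {
                return true;
            }
            long remainingNanos = deadline - System.nanoTime();
            if (remainingNanos <= 0) {
                return false; // time budget exhausted
            }
            // Sleep one interval, but not (much) past the deadline
            long sleepMillis = Math.min(intervalMillis, remainingNanos / 1_000_000L + 1);
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                return false;
            }
        }
    }
}
```

Under the seconds assumption, `builder.wait(session, 5, 300)` would correspond roughly to `pollUntil(check, 5_000, 300_000)`, where `check` is whatever readiness test you use.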

Methods

getSchema

Returns the table schema.

public TableSchema getSchema()

Returns: a TableSchema object.


getRecordCount

Returns the total number of records available for download.

public long getRecordCount()

getId

Returns the session ID.

public String getId()

getStatus

Returns the session status.

public DownloadStatus getStatus()

getSplitCount

Returns the number of data splits, for use with openArrowRecordReader(long splitIndex).

public long getSplitCount()

getQuotaName

Returns the name of the quota used by this download.

public String getQuotaName()

getArrowSchema

Returns the table schema in Arrow format.

public Schema getArrowSchema()

Returns: an Arrow Schema object.


openRecordReader

Opens a reader over a range of records (rows).

// Basic form
public TunnelRecordReader openRecordReader(long start, long count) throws TunnelException, IOException

// Boolean compression switch
public TunnelRecordReader openRecordReader(long start, long count, boolean compress) throws TunnelException, IOException

// With compression options
public TunnelRecordReader openRecordReader(long start, long count, CompressOption option) throws TunnelException, IOException

// With column pruning
public TunnelRecordReader openRecordReader(long start, long count, CompressOption option, List<Column> columns) throws TunnelException, IOException

// Column projection + compression + modification check
public TunnelRecordReader openRecordReader(long start, long count, CompressOption option, List<Column> columns, boolean disableModifiedCheck) throws TunnelException, IOException
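
| Parameter | Type | Constraint | Description |
|---|---|---|---|
| start | long | >= 0 | Starting row offset |
| count | long | >= 1 | Number of records to read |
| compress | boolean | - | Whether to enable compression |
| option | CompressOption | - | Compression settings |
| columns | List<Column> | Non-empty | Columns to download |
| disableModifiedCheck | boolean | - | Disable the data-version check |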

Returns: a TunnelRecordReader object. Open it with try-with-resources so it is always closed.

Example

long total = session.getRecordCount();
if (total > 0) { // count must be >= 1, so skip empty tables
    try (TunnelRecordReader reader = session.openRecordReader(0, total)) {
        Record record;
        while ((record = reader.read()) != null) {
            System.out.println(record.get("user_id"));
        }
    }
}
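Because `count` must be at least 1, calling `openRecordReader(0, total)` on an empty table breaks the documented constraint, and a very large table is often easier to consume page by page. The sketch below uses a hypothetical `RangePager.pages` helper to compute `{start, count}` pairs that each satisfy `start >= 0` and `count >= 1` and together cover rows `[0, total)`; each pair could then be passed to `session.openRecordReader(start, count)`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: splits rows [0, total) into pages of at most pageSize rows.
class RangePager {
    /** Returns {start, count} pairs in order; the list is empty when total == 0. */
    static List<long[]> pages(long total, long pageSize) {
        if (total < 0 || pageSize < 1) {
            throw new IllegalArgumentException("total must be >= 0 and pageSize >= 1");
        }
        List<long[]> result = new ArrayList<>();
        for (long start = 0; start < total; start += pageSize) {
            long count = Math.min(pageSize, total - start); // last page may be shorter
            result.add(new long[] {start, count});
        }
        return result;
    }
}
```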

openBufferedRecordReader

Opens a record reader with an internal buffer.

public RecordReader openBufferedRecordReader(long start, long count, long batchSize, long bufferSize, CompressOption compress, List<Column> columns, boolean disableModifiedCheck) throws TunnelException, IOException
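
| Parameter | Type | Constraint | Description |
|---|---|---|---|
| start | long | >= 0 | Starting row offset |
| count | long | >= 1 | Number of records to read |
| batchSize | long | >= 1 | Number of records read per batch |
| bufferSize | long | >= 1 | Buffer size |
| compress | CompressOption | - | Compression settings |
| columns | List<Column> | Non-empty | Columns to download |
| disableModifiedCheck | boolean | - | Disable the data-version check |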

Returns: a RecordReader object. Open it with try-with-resources so it is always closed.


openArrowRecordReader

Opens a reader that returns data in Arrow format, suited to high-performance columnar processing.

// Basic form
public ArrowRecordReader openArrowRecordReader(long start, long count) throws TunnelException, IOException

// Read by split index
public ArrowRecordReader openArrowRecordReader(long splitIndex) throws TunnelException, IOException

// With compression
public ArrowRecordReader openArrowRecordReader(long start, long count, CompressOption compress) throws TunnelException, IOException

// With columns
public ArrowRecordReader openArrowRecordReader(long start, long count, List<Column> columns) throws TunnelException, IOException

// With columns and a memory allocator
public ArrowRecordReader openArrowRecordReader(long start, long count, List<Column> columns, BufferAllocator allocator) throws TunnelException, IOException

// Full parameter list
public ArrowRecordReader openArrowRecordReader(long start, long count, List<Column> columns, BufferAllocator allocator, CompressOption compress) throws TunnelException, IOException
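
| Parameter | Type | Description |
|---|---|---|
| start | long | Starting row offset |
| count | long | Number of records to read |
| splitIndex | long | Split index (use with getSplitCount()) |
| compress | CompressOption | Compression settings |
| columns | List<Column> | Columns to download |
| allocator | BufferAllocator | Arrow memory allocator |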
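When reading by `splitIndex`, the valid indices are presumably `0` through `getSplitCount() - 1` (an assumption; the table above does not state the range). The hypothetical `SplitAssigner.assign` helper below distributes those indices round-robin across a fixed number of workers; each worker would then call `openArrowRecordReader(splitIndex)` for every index in its list.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: round-robin assignment of split indices to workers.
class SplitAssigner {
    /** Worker w receives indices w, w + workers, w + 2 * workers, ... */
    static List<List<Long>> assign(long splitCount, int workers) {
        if (splitCount < 0 || workers < 1) {
            throw new IllegalArgumentException("splitCount must be >= 0 and workers >= 1");
        }
        List<List<Long>> plan = new ArrayList<>();
        for (int w = 0; w < workers; w++) {
            plan.add(new ArrayList<>()); // one (possibly empty) list per worker
        }
        for (long split = 0; split < splitCount; split++) {
            plan.get((int) (split % workers)).add(split);
        }
        return plan;
    }
}
```

Round-robin is only a simple default; if splits vary much in size, handing out indices from a shared queue balances the load better.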

Example

try (ArrowRecordReader reader = session.openArrowRecordReader(0, 50000)) {
    VectorSchemaRoot root;
    // read() returns one batch at a time, and null once the range is consumed
    while ((root = reader.read()) != null) {
        FieldVector idVector = root.getVector("user_id");
        for (int i = 0; i < root.getRowCount(); i++) {
            System.out.println(idVector.getObject(i));
        }
    }
}

Parallel chunked download example

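long totalRecords = session.getRecordCount();
int parallelism = 8;
long chunk = totalRecords / parallelism;
ExecutorService pool = Executors.newFixedThreadPool(parallelism);
List<Future<Void>> futures = new ArrayList<>();

for (int i = 0; i < parallelism; i++) {
    long start = i * chunk;
    // The last task also takes the remaining rows
    long count = (i == parallelism - 1) ? totalRecords - start : chunk;
    if (count <= 0) {
        continue; // count must be >= 1
    }
    futures.add(pool.submit(() -> {
        try (TunnelRecordReader reader = session.openRecordReader(start, count)) {
            Record record;
            while ((record = reader.read()) != null) {
                processRecord(record); // processRecord: your own handler
            }
        }
        return null; // returning a value makes this a Callable, which may throw checked exceptions
    }));
}

try {
    for (Future<Void> f : futures) {
        f.get(); // rethrows a task's TunnelException/IOException wrapped in ExecutionException
    }
} finally {
    pool.shutdown();
}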
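In the example above, the per-task share `totalRecords / parallelism` is 0 whenever the table has fewer rows than `parallelism`, which conflicts with the `count >= 1` constraint, and the last task absorbs the whole remainder. A common alternative, sketched below with the hypothetical `ChunkPlanner.plan` helper, spreads the remainder one row at a time over the first chunks so that chunk sizes differ by at most one and no empty chunk is ever produced. Each `{start, count}` pair would be handed to `session.openRecordReader(start, count)` in its own task.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: splits rows [0, total) into at most `parallelism`
// contiguous chunks whose sizes differ by at most one row.
class ChunkPlanner {
    static List<long[]> plan(long total, int parallelism) {
        if (total < 0 || parallelism < 1) {
            throw new IllegalArgumentException("total must be >= 0 and parallelism >= 1");
        }
        long base = total / parallelism;      // rows every chunk gets
        long remainder = total % parallelism; // the first `remainder` chunks get one extra row
        List<long[]> chunks = new ArrayList<>();
        long start = 0;
        for (int i = 0; i < parallelism; i++) {
            long count = base + (i < remainder ? 1 : 0);
            if (count == 0) {
                break; // only when total < parallelism; all later chunks would be empty too
            }
            chunks.add(new long[] {start, count});
            start += count;
        }
        return chunks;
    }
}
```

For example, 10 rows over 3 tasks yield chunks of 4, 3, and 3 rows starting at rows 0, 4, and 7.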