跳到主要内容

Tunnel 下载

TableTunnel 是 MaxCompute 的标准数据通道,适用于大批量数据的导出场景。支持压缩传输、多线程并行下载,无行数限制。

前置条件

  • 已初始化 Odps 客户端对象
  • 添加 odps-sdk-tunnel 模块依赖
  • 对目标表具有读取权限
  • SDK 版本 >= 0.48.0

完整示例

import com.aliyun.odps.Odps;
import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.TableTunnel.DownloadSession;
import com.aliyun.odps.tunnel.io.CompressOption;
import com.aliyun.odps.tunnel.io.TunnelRecordReader;

public class TunnelDownloadExample {

public static void main(String[] args) throws Exception {
// 假设 odps 客户端已初始化
Odps odps = getOdpsClient();

// 1. 创建 TableTunnel 对象
TableTunnel tunnel = new TableTunnel(odps);

// 2. 创建 DownloadSession
DownloadSession session = tunnel.buildDownloadSession()
.setProjectName("my_project")
.setTableName("user_behavior")
.setPartitionSpec(new PartitionSpec("dt=20250101"))
.build();

// 3. 获取总行数
long totalRecords = session.getRecordCount();
System.out.println("总记录数: " + totalRecords);

// 4. 打开 Reader 并读取数据
try (TunnelRecordReader reader = session.openRecordReader(0, totalRecords)) {
while (reader.hasNext()) {
Record record = reader.next();
System.out.println("user_id=" + record.get("user_id")
+ ", action=" + record.get("action"));
}
}
}
}

代码说明

  1. 创建 TableTunnel:基于已初始化的 Odps 对象创建 TableTunnel 实例,作为 Tunnel 操作的入口。
  2. 构建 DownloadSession:通过 Builder 模式设置项目名、表名、分区等参数,调用 build() 创建会话。会话创建时服务端会准备数据快照。
  3. 获取记录总数session.getRecordCount() 返回可下载的记录总行数,用于规划分块下载。
  4. 打开 ReaderopenRecordReader(start, count) 指定起始行号和读取行数,返回 TunnelRecordReader
  5. 迭代读取:通过 hasNext() / next() 逐条读取 Record 对象。

配置选项

DownloadSession 构建参数

方法类型必需默认值说明
setProjectName(String)String-目标项目名称
setTableName(String)String-目标表名称
setPartitionSpec(PartitionSpec)PartitionSpecnull分区规格,分区表必须指定
setShardId(Long)Longnull指定分片 ID
setAsyncMode(boolean)booleanfalse异步初始化模式(大表推荐)
setWaitAsyncBuild(boolean)booleanfalse阻塞等待异步会话就绪

openRecordReader 参数

参数类型约束说明
startlong>= 0读取起始行号
countlong>= 1读取记录数量
optionCompressOption-压缩配置(默认启用 zlib 压缩)
columnsList<Column>非空指定下载的列集合(列裁剪)
disableModifiedCheckboolean-禁用数据版本校验

CompressOption 压缩选项

压缩算法说明
CompressOption.CompressAlgorithm.ODPS_RAW不压缩
CompressOption.CompressAlgorithm.ODPS_ZLIBzlib 压缩(默认)
CompressOption.CompressAlgorithm.ODPS_SNAPPYSnappy 压缩(更快,压缩比略低)

多线程并行下载

对于大数据量场景,可将数据按行号范围拆分为多个块,使用线程池并行下载:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.ArrayList;
import java.util.List;

public class ParallelDownloadExample {

public static void main(String[] args) throws Exception {
Odps odps = getOdpsClient();
TableTunnel tunnel = new TableTunnel(odps);

DownloadSession session = tunnel.buildDownloadSession()
.setProjectName("my_project")
.setTableName("large_table")
.build();

long totalRecords = session.getRecordCount();
int parallelism = 8; // 根据 CPU 核心数和网络带宽设置

ExecutorService pool = Executors.newFixedThreadPool(parallelism);
List<Future<?>> futures = new ArrayList<>();

for (int i = 0; i < parallelism; i++) {
long start = i * (totalRecords / parallelism);
long count = (i == parallelism - 1)
? totalRecords - start
: totalRecords / parallelism;

futures.add(pool.submit(() -> {
try (TunnelRecordReader reader = session.openRecordReader(start, count,
new CompressOption(CompressOption.CompressAlgorithm.ODPS_ZLIB, 1, 0))) {
while (reader.hasNext()) {
Record record = reader.next();
processRecord(record);
}
} catch (Exception e) {
throw new RuntimeException("下载失败, start=" + start, e);
}
}));
}

// 等待所有下载任务完成
for (Future<?> future : futures) {
future.get();
}
pool.shutdown();
System.out.println("下载完成,共 " + totalRecords + " 条记录");
}
}

异步创建模式(大表)

对于数据量特别大的表,Session 创建可能需要较长时间。可以使用异步模式避免阻塞:

TableTunnel.DownloadSessionBuilder builder = tunnel.buildDownloadSession()
.setProjectName("big_data_project")
.setTableName("huge_table")
.setAsyncMode(true);

DownloadSession session = builder.build();

// 轮询等待会话就绪,每 5 秒检查一次,最长等待 300 秒
boolean ready = builder.wait(session, 5, 300);
if (ready) {
System.out.println("会话就绪,记录数: " + session.getRecordCount());
// 开始下载...
} else {
System.err.println("会话创建超时");
}

注意事项

  • 不支持 Transactional 表:Tunnel Download 目前不支持 Transactional 表的下载,请使用 Storage API
  • 超时机制:默认 300 秒无数据传输时,服务端会断开连接。对于处理逻辑较重的场景,建议缩小每次的 count 值。
  • 数据一致性:DownloadSession 创建时会生成数据快照,读取期间数据不会变化。
  • 分区表必须指定分区:对于分区表,setPartitionSpec() 是必需的,否则会抛出异常。
  • 资源释放:务必使用 try-with-resources 语法关闭 TunnelRecordReader,避免连接泄漏。
  • 并行度设置:建议根据 CPU 核心数和网络带宽合理设置,一般 4-16 个线程较为合适。

相关文档