Blob 数据下载
备注
Blob 数据下载目前支持 Java SDK 和 Python SDK,Go SDK 暂不支持。
MaxCompute 的 BLOB 类型用于存储非结构化的二进制大对象数据(如图片、音频、文档等)。Blob 在表中以引用(Reference)的形式存储,实际的二进制数据需要通过 `BlobManager`(Java)或 `StorageApiArrowClient.read_blobs()`(Python)单独下载。
前置条件
- 添加 `odps-sdk-storage-api` 模块依赖(Java)
- 已初始化 `MaxStorageClient`(Java)
- 目标表包含 `BLOB` 类型列
- SDK 版本 >= 0.54.0
完整示例
- Java
- Python
import com.aliyun.odps.storage.api.MaxStorageClient;
import com.aliyun.odps.storage.api.TableIdentifier;
import com.aliyun.odps.storage.api.blob.Blob;
import com.aliyun.odps.storage.api.blob.BlobManager;
import com.aliyun.odps.storage.api.blob.BlobDataIterator;
import com.aliyun.odps.storage.api.read.TableReadSession;
import com.aliyun.odps.storage.api.read.InputSplit;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.RecordReader;
import org.apache.arrow.vector.ipc.ArrowReader;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
 * End-to-end example: reads a table that contains a BLOB column through the Storage API and
 * downloads every referenced Blob to a local file named after the row's {@code id}.
 */
public class BlobDownloadExample {

    /** Local directory the downloaded Blobs are written to. */
    private static final Path OUTPUT_DIR = Paths.get("/tmp/images");

    public static void main(String[] args) throws Exception {
        // 1. Create the client.
        // NOTE(review): credentialsProvider is not declared in this example; create a
        // credentials provider for your account before compiling/running.
        MaxStorageClient client = MaxStorageClient.builder()
                .endpoint("https://service.cn-hangzhou.maxcompute.aliyun.com/api")
                .credentialsProvider(credentialsProvider)
                .build();

        // 2. Open a read session over the table that contains the Blob column.
        TableIdentifier tableId = TableIdentifier.of("my_project", "image_table");
        TableReadSession session = client.createTableReadSessionBuilder(tableId)
                .withColumns(Arrays.asList("id", "image_data"))
                .build();

        // 3. Obtain the BlobManager.
        BlobManager blobManager = client.openBlobManager();

        // Files.write does not create missing parent directories, so create the target up front.
        Files.createDirectories(OUTPUT_DIR);

        // 4. Read each split and collect the Blob references together with their row ids.
        for (InputSplit split : session.getSplits()) {
            // try-with-resources / finally guarantee both readers are closed even if a
            // download below throws.
            try (ArrowReader arrowReader = session.createReaderBuilder(split).build()) {
                RecordReader recordReader = arrowReader.getAsRecordReader();
                try {
                    List<Blob> blobs = new ArrayList<>();
                    List<Long> ids = new ArrayList<>();
                    recordReader.forEach(record -> {
                        Blob blob = (Blob) record.get("image_data");
                        // NULL cells carry no reference and are skipped.
                        if (blob != null) {
                            blobs.add(blob);
                            ids.add((Long) record.get("id"));
                        }
                    });

                    // Nothing to download for this split (e.g. all values NULL).
                    if (blobs.isEmpty()) {
                        continue;
                    }

                    // 5. Batch-download the Blob data. Results come back in input order, so the
                    // i-th stream belongs to ids.get(i).
                    try (BlobDataIterator iterator = blobManager.batchDownload(blobs)) {
                        int index = 0;
                        while (iterator.hasNext()) {
                            try (InputStream data = iterator.next()) {
                                byte[] content = data.readAllBytes();
                                // Save to a local file.
                                Path outputPath = OUTPUT_DIR.resolve(ids.get(index) + ".jpg");
                                Files.write(outputPath, content);
                                System.out.println("已下载: " + outputPath + " (" + content.length + " bytes)");
                            }
                            index++;
                        }
                    }
                } finally {
                    recordReader.close();
                }
            }
        }
    }
}
from odps import ODPS
from odps.apis.storage_api_v2 import StorageApiArrowClient
import os

# Local directory the downloaded Blobs are written to
OUTPUT_DIR = "/tmp/images"

# 1. Initialize the ODPS entry object
odps = ODPS("<accessId>", "<accessKey>", "<projectName>", "<endpoint>")
# 2. Get the table object and create a Storage API client bound to it
table = odps.get_table("image_table")
client = StorageApiArrowClient(odps, table)
# 3. Create a read session requesting the id column and the Blob column
read_resp = client.create_read_session(
    required_data_columns=["id", "image_data"]
)
session_id = read_resp.session_id
# 4. Read every split and collect Blob references together with their row ids
blob_refs = []
ids = []
for split_index in range(read_resp.splits_count):
    reader = client.read_rows_arrow(session_id, split_index=split_index)
    while True:
        batch = reader.read()
        if batch is None:
            break
        id_list = batch.column("id").to_pylist()
        blob_list = batch.column("image_data").to_pylist()
        for row_id, ref in zip(id_list, blob_list):
            # NULL cells carry no reference and are skipped
            if ref is not None:
                # References may arrive as UTF-8 bytes; normalize them to str
                blob_refs.append(
                    ref.decode("utf-8") if isinstance(ref, bytes) else ref
                )
                ids.append(row_id)
# 5. Batch-download the Blob data
os.makedirs(OUTPUT_DIR, exist_ok=True)
# Skip the request entirely when the table had no non-NULL Blobs
if blob_refs:
    # Indexing into ids assumes read_blobs yields results in the same order as blob_refs
    for idx, (data, mime_type) in enumerate(client.read_blobs(blob_references=blob_refs)):
        output_path = os.path.join(OUTPUT_DIR, f"{ids[idx]}.jpg")
        with open(output_path, "wb") as f:
            f.write(data)
        print(f"已下载: {output_path} ({len(data)} bytes)")
代码说明
- 创建客户端:与 Storage API 读取相同,通过 `MaxStorageClient.builder()` 构建。
- 读取表数据:使用 `TableReadSession` 读取包含 Blob 列的表,获取每行的 Blob 引用。
- 获取 BlobManager:通过 `client.openBlobManager()` 获取 Blob 管理器实例。
- 收集 Blob 引用:通过行接口(`RecordReader`)读取表数据,从 Record 中获取 `Blob` 对象。
- 批量下载:调用 `blobManager.batchDownload(blobs)` 批量下载,返回 `BlobDataIterator` 迭代器。
- 处理数据:从迭代器中逐个获取 `InputStream`,读取实际的二进制内容。
配置选项
BlobManager 方法
| 方法 | 参数 | 返回值 | 说明 |
|---|---|---|---|
| `download(Blob)` | Blob 引用对象 | `InputStream` | 下载单个 Blob,调用方必须关闭流 |
| `batchDownload(List<Blob>)` | Blob 引用列表 | `BlobDataIterator` | 批量下载,返回顺序与输入一致 |
Blob 构造方式
| 方法 | 说明 |
|---|---|
| `Blob.fromReference(String)` | 从引用字符串构造 Blob 对象 |
| Record 中直接获取 | 行接口自动返回 Blob 对象,无需手动构造 |
单条下载
适用于 Blob 数量较少或需要按需下载的场景:
- Java
- Python
BlobManager blobManager = client.openBlobManager();
// Get the Blob reference from the Record; record.get can return null for a NULL cell
// (the complete example checks for this before downloading).
Blob blobRef = (Blob) record.get("image_data");
if (blobRef != null) {
    // Download a single Blob; the caller must close the returned stream.
    try (InputStream data = blobManager.download(blobRef)) {
        byte[] content = data.readAllBytes();
        System.out.println("Blob 大小: " + content.length + " bytes");
    }
}
# Get the Blob reference from the Arrow data; NULL cells (None) carry no reference
if ref is not None:
    # References may arrive as UTF-8 bytes; normalize them to str
    blob_ref = ref.decode("utf-8") if isinstance(ref, bytes) else ref
    # Single download (pass a one-element list)
    for data, mime_type in client.read_blobs(blob_references=[blob_ref]):
        print(f"Blob 大小: {len(data)} bytes")
批量下载
适用于大量 Blob 的高效下载,单次请求传输多个 Blob:
- Java
- Python
BlobManager blobManager = client.openBlobManager();
// Gather the Blob references that should be fetched together.
List<Blob> blobs = new ArrayList<>();
// ... populate from table data ...
// One batch request; each element is exposed as its own stream, closed after use.
try (BlobDataIterator it = blobManager.batchDownload(blobs)) {
    while (it.hasNext()) {
        try (InputStream in = it.next()) {
            byte[] bytes = in.readAllBytes();
            // use bytes...
        }
    }
} catch (BlobDownloadException ex) {
    // The exception identifies which Blob reference could not be downloaded.
    Blob failed = ex.getFailedBlobRef();
    System.err.println("下载失败: " + failed.getReference());
}
# Gather the Blob references that should be fetched together
blob_refs = []
# ... populate from table data ...
# Batch download; any failure is reported and ends the loop
try:
    downloads = client.read_blobs(blob_references=blob_refs)
    for data, _mime_type in downloads:
        # use data...
        pass
except Exception as e:
    print(f"下载失败: {e}")
Arrow 列接口读取 Blob
如果使用 Arrow 列接口而非行接口,需要手动从 VarBinaryVector 中解析 Blob 引用:
- Java
- Python
import org.apache.arrow.vector.VarBinaryVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import java.nio.charset.StandardCharsets;
// session and blobManager are created as in the complete example.
for (InputSplit split : session.getSplits()) {
    try (ArrowReader reader = session.createReaderBuilder(split).build()) {
        while (reader.loadNextBatch()) {
            VectorSchemaRoot root = reader.getVectorSchemaRoot();
            VarBinaryVector blobVector = (VarBinaryVector) root.getVector("image_data");
            List<Blob> blobRefs = new ArrayList<>();
            for (int row = 0; row < root.getRowCount(); row++) {
                if (!blobVector.isNull(row)) {
                    // On read, the VarBinaryVector holds the UTF-8 bytes of the reference string.
                    String reference = new String(blobVector.get(row), StandardCharsets.UTF_8);
                    blobRefs.add(Blob.fromReference(reference));
                }
            }
            // Skip the request when this batch has no non-NULL Blobs.
            if (!blobRefs.isEmpty()) {
                try (BlobDataIterator iterator = blobManager.batchDownload(blobRefs)) {
                    while (iterator.hasNext()) {
                        try (InputStream data = iterator.next()) {
                            byte[] content = data.readAllBytes();
                            // process content...
                        }
                    }
                }
            }
        }
    }
}
for split_index in range(read_resp.splits_count):
    reader = client.read_rows_arrow(session_id, split_index=split_index)
    while True:
        batch = reader.read()
        if batch is None:
            break
        # Convert the whole column in one call instead of boxing one Arrow scalar per row.
        # On read, the VarBinary column holds the UTF-8 bytes of each reference string,
        # so bytes are decoded to str; NULL cells (None) are skipped.
        blob_refs = [
            ref.decode("utf-8") if isinstance(ref, bytes) else ref
            for ref in batch.column("image_data").to_pylist()
            if ref is not None
        ]
        # Skip the request when this batch has no non-NULL Blobs
        if blob_refs:
            for data, mime_type in client.read_blobs(blob_references=blob_refs):
                # process data...
                pass