JindoDistCp API 使用说明§

下载 JindoSDK 包§

下载最新的 tar.gz 包 jindosdk-x.x.x.tar.gz (下载页面)。

安装依赖包§

目前 JindoSDK/JindoDistCP 支持主流 Intel X86 的 Linux 和 Mac(不支持 Windows系统,Mac M1 系列也暂不支持)

以6.6.3版本为例,在 maven pom.xml 中添加 JindoDistCP 的依赖


<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.aliyun.jindodata</groupId>
    <artifactId>jindo-distcp-example</artifactId>
    <version>1.0</version>

    <properties>
        <jindodata.version>6.6.3</jindodata.version>
        <hadoop.version>2.8.5</hadoop.version>
    </properties>

    <repositories>
        <!-- Add JindoData Maven Repository -->
        <repository>
            <id>jindodata</id>
            <url>https://jindodata-binary.oss-cn-shanghai.aliyuncs.com/mvn-repo/</url>
        </repository>
    </repositories>

    <dependencies>
        <!-- add jindo-distcp -->
        <dependency>
            <groupId>com.aliyun.jindodata</groupId>
            <artifactId>jindo-distcp</artifactId>
            <version>${jindodata.version}</version>
        </dependency>

        <!-- add hadoop with provided scope, your runtime should install hadoop dependency. -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>${hadoop.version}</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
</project>

示例代码§

以拷贝 HDFS 文件夹到阿里云 OSS 为例。

import com.aliyun.jindodata.distcp.util.DistCpCounter;
import com.aliyun.jindodata.distjob.request.DistJobResponse;
import org.apache.hadoop.conf.Configuration;

import java.io.IOException;


public class DistcpExample {
  static public void main(String[] args) throws IOException {

    Configuration conf = new Configuration();

    conf.set("fs.oss.impl", "com.aliyun.jindodata.oss.JindoOssFileSystem");
    conf.set("fs.AbstractFileSystem.oss.impl", "com.aliyun.jindodata.oss.OSS");
    // set accessKey, secret, endpoint and so on.
    conf.set("fs.oss.credentials.provider", "com.aliyun.jindodata.oss.auth.SimpleCredentialsProvider");
    conf.set("fs.oss.accessKeyId", "XXX");
    conf.set("fs.oss.accessKeySecret", "XXX");

    DistCpRequest request = new DistCpRequest();
    request.setSrcDir("hdfs:///user/root/random-data");
    request.setDestDir("oss://dls-bucket.cn-xxx.oss-dls.aliyuncs.com/dest");
    request.setWorkTempDir("hdfs:///user/root/tmp");
    request.setConf(conf);

    // Jindo DistCp Service
    DistCpService service = new DistCpService();
    service.buildAllJobs(request);
    service.runAllJobs();

    DistJobResponse response = service.collectAllJobResult();
    printResponse(response);
  }

  static public void printResponse(DistJobResponse response) {
    System.out.println("ErrorCode: " + response.getErrorCode());
    System.out.println("ErrorMsg: " + response.getErrorMsg());
    System.out.println("FileCopied: " + response.getCount(DistCpCounter.FILES_COPIED.getName()));
    System.out.println("BytesCopied: " + response.getCount(DistCpCounter.BYTES_COPIED.getName()));
  }
}

请求参数§

相关方法的具体描述可参考《使用Jindo DistCp》

方法 参数类型 说明 默认值 兼容
void setSrc(String src) 必选 设置源目录,支持的前缀有
hdfs://
oss://
s3://
cos://
obs://
- 4.3.0+
void setDest(String dest) 必选 设置目标目录,支持的前缀有
hdfs://
oss://
s3://
cos://
obs://
- 4.3.0+
void setConf(Configuration conf) 必选 设置 Configuration - 4.3.0+
void setMode(DistCpMode mode) 可选 设置 DistCp 模式,支持的模式有
COPY(拷贝)
DIFF(差异比较)
UPADTE(增量拷贝)
DistCpMode.COPY 4.3.0+
void setBandWidthLimit(int bandWidthLimit) 可选 设置单个节点的带宽限制,单位M -1 4.3.0+
void setOutputCodec(String outputCodec) 可选 设置压缩类型,支持编解码器有 gzip、gz、lzo、lzop、lzop、snappy keep(不更改压缩类型) 4.3.0+
void setStorageClass(String storageClass) 可选 设置目标存储策略,支持Standard、IA、Archive、ColdArchive Standard 4.3.0+
void setIncludeMatcherFile(String includeMatcherFile) 可选 设置包含符合规则的文件 - 4.3.0+
void setExcludeMatcherFile(String excludeMatcherFile) 可选 设置包含过滤规则的文件 - 4.3.0+
void setNumWorkers(int numWorkers) 可选 设置DistCp任务的并发度,对应MR任务中的 mapreduce.job.maps 10 4.3.0+
void setBatchPerJob(int batchPerJob) 可选 设置每个 distcp job 处理的文件数量 10 4.3.0+
void setBatchPerTask(int batchPerTask) 可选 设置每个 distcp task 处理的文件数量 10 4.3.0+
void setWorkTempDir(String workTempDir) 可选 设置临时目录 /tmp 4.3.0+
void setDisableChecksum(boolean disableChecksum) 可选 设置是否关闭 checksum 检查 false 4.3.0+
void setDeleteOnSuccess(boolean deleteOnSuccess) 可选 设置是否删除源文件,用于移动数据 false 4.3.0+
void setEnableTransaction(boolean enableTransaction) 可选 设置是否开启事务,以保证Job级别的原子性 false 4.3.0+
void setIgnoreFailures(boolean ignoreFailures) 可选 设置是否忽略拷贝任务中抛出的异常,避免中断任务 False 4.3.0+
void setEnableCMS(boolean enableCMS) 可选 是否开启监控告警 false 4.3.0+
void setEnablePreserveMetaStatus(boolean enablePreserveMetaStatus) 可选 设置是否开启保存元数据信息 false 4.4.0+

响应参数§

方法 说明 默认值
int getErrorCode() 获取错误码 0
String getErrorMsg() 获取错误原因 -
long getStartTime() 获取任务的开始时间,单位秒 -
long getEndTime() 获取任务的结束时间,单位秒 -
getCount(String key) 获取计数器值,key为计数器名称, -

计数器说明§

任务计数器 说明
COPY_FAILED copy失败的文件数
CHECKSUM_DIFF checksum校验失败的文件数,并计入COPY_FAILED
FILES_EXPECTED 预期的copy文件数量
BYTES_EXPECTED 预期的copy字节数
FILES_COPIED copy成功的文件数
BYTES_COPIED copy成功的字节数
FILES_SKIPPED update增量更新时跳过的文件数
BYTES_SKIPPED update增量更新时跳过的字节数
DIFF_FILES 不相同的文件数
SAME_FILES 经校验完全相同的文件数
DST_MISS 目标路径不存在的文件数,并计入DIFF_FILES
LENGTH_DIFF 源文件和目标文件大小不一致的数量,并计入DIFF_FILES
CHECKSUM_DIFF checksum校验失败的文件数,并计入DIFF_FILES
DIFF_FAILED diff操作异常的文件数,具体报错参见job日志