跳到主要内容

SQLExecutor

概述

SQLExecutor 是 MaxCompute 为执行 SQL 提供的统一的接口,用户可以通过这一接口,提交离线作业和 MCQA 作业,并提供了丰富的配置和方法,来满足用户多样的需求。

目录

  1. 构建SQLExecutor实例
  2. 执行SQL查询
  3. 获取查询结果
  4. 其他接口

构建SQLExecutor实例

odps-sdk 使用SQLExecutorBuilder类来构建SQLExecutor实例,SQLExecutorBuilder 采用构造者模式,提供了一系列方法来对SQLExecutor进行配置。 通过这些方法,用户可以设置执行模式、任务名称、服务名称等参数,通常每个参数都有默认值,用户可以按需使用。

构造器方法

使用SQLExecutorBuilder.builder方法可以构建一个SQLExecutorBuilder实例。调用build 方法可以构建SQLExecutor实例。全部配置项参考配置选项

public SQLExecutor build()throws OdpsException

返回值

  • SQLExecutor实例。

示例代码

SQLExecutor executor = SQLExecutorBuilder.builder()
.odps(odps)
.taskName("my_task")
.build();

配置选项

SQLExecutorBuilder类提供了多种配置选项,以下是各配置选项的详细说明。

设置ODPS对象(必选)

使用odps 方法可以设置ODPS对象。构建ODPS对象的方式请参考构建 ODPS 客户端

public SQLExecutorBuilder odps(Odps odps)

参数

  • odpsOdps对象。

设置任务名称

使用taskName方法可以设置任务名称。对于 MCQA 1.0 作业,默认 taskName 为 console_sqlrt_task。 对于 MCQA 2.0 作业,默认 taskName 为 AnonymousMCQATask。 对于离线作业,无法配置 taskName,会使用 AnonymousSQLTask 作为 taskName。

public SQLExecutorBuilder taskName(String taskName)

参数

  • taskName:任务名称。

设置Tunnel Endpoint

使用tunnelEndpoint方法可以设置Tunnel Endpoint。

这个接口常用于当用户网络环境受限,需要进行网络打通时,由odps endpoint自动路由的tunnel endpoint无法直接访问,需要设置指定tunnel endpoint时使用。

public SQLExecutorBuilder tunnelEndpoint(String tunnelEndpoint)

参数

  • tunnelEndpoint:Tunnel Endpoint。

设置Quota名称

使用quotaName方法可以设置计算资源组的名称。 对于MCQA 2.0的作业,此项为必填项,需要指定相应的交互式资源组名称,否则在创建SQLExecutor时将抛出异常。

public SQLExecutorBuilder quotaName(String quotaName)

参数

  • quotaName:Quota名称(quota nick name)。

设置执行模式

使用executeMode方法可以设置执行模式。

public SQLExecutorBuilder executeMode(ExecuteMode executeMode)

参数

  • executeMode:执行模式。可选项为 1. INTERACTIVE:交互式执行。(MCQA);2. OFFLINE:离线执行。

启用 MCQA V2

public SQLExecutorBuilder enableMcqaV2(boolean mcqaV2)
  • 参数mcqaV2 - 布尔值,表示是否启用 MCQA V2。

启用命令API

使用enableCommandApi方法可以启用命令API。

CommandApiSQLExecutor 在标准SQL语法之外,额外提供的一组命令支持。用来执行像desc table,list table这样的非标准SQL命令。 目前 MaxCompute SQL 也在逐步支持类似扩展命令,因此当前版本部分扩展命令,开启这个选项后,行为会与不开启这个选项后不一致。

public SQLExecutorBuilder enableCommandApi(boolean useCommandApi)

参数

  • useCommandApi:是否启用命令API。

启用三层模型支持

使用enableOdpsNamespaceSchema方法可以启用ODPS命名空间三层模型(Schema)。

三层模型是 MaxCompute 推出的新模式,将原project.table扩展为为project.schema.table模式

public SQLExecutorBuilder enableOdpsNamespaceSchema(boolean odpsNamespaceSchema)

参数

  • odpsNamespaceSchema:是否开启三层模型支持。

是否启用 InstanceTunnel

在获取作业结果时,是否使用 instance tunnel 来取结果。默认值为true

当不开启instance tunnel时,获取结果时的Record的中每条数据类型均为String ,忽略各列在ODPS中的真实类型,且会有最大长度限制(通常为10000),且无法指定offsetlimit等信息。 但通常执行速度更快,用户可以根据使用场景选择是否关闭instance tunnel

public SQLExecutorBuilder useInstanceTunnel(boolean useInstanceTunnel)
  • 参数useInstanceTunnel - 布尔值,表示是否启用 InstanceTunnel。

设置InstanceTunnel取结果时的链接超时时间

public SQLExecutorBuilder tunnelSocketTimeout(int tunnelSocketTimeout)
  • 参数tunnelSocketTimeout - 整数,表示超时时间(毫秒)。

设置InstanceTunnel取结果时的读取超时时间

public SQLExecutorBuilder tunnelReadTimeout(int tunnelReadTimeout)
  • 参数tunnelReadTimeout - 整数,表示超时时间(毫秒)。

设置使用 InstanceTunnel 获取结果的最大重试次数

public SQLExecutorBuilder tunnelGetResultMaxRetryTime(int tunnelGetResultMaxRetryTime)
  • 参数tunnelGetResultMaxRetryTime - 整数,表示最大重试次数。

配置当前使用的执行器池

通常由 SQLExecutorPool 自动调用

SQLExecutorBuilder setPool(SQLExecutorPool pool)
  • 参数pool - SQLExecutorPool 对象,表示执行器池。

设置恢复实例

用来从指定实例恢复SQLExecutor

对于MCQA 1.0作业,instance为Session实例,这项操作表示重新链接到给定的Session。 对于其他类型作业,instance为SQLTask实例,这项操作表示重新链接到给定的SQLTask

public SQLExecutorBuilder recoverFrom(Instance instance)
  • 参数instance - Instance 对象,表示恢复的实例。

设置离线作业的优先级

public SQLExecutorBuilder offlineJobPriority(Integer offlineJobPriority)
  • 参数offlineJobPriority - 整数,表示离线作业的优先级。

加速失败回退策略(仅MCQA1.0)

当执行失败时,是否回退到离线查询,采用什么策略回退。

public SQLExecutorBuilder fallbackPolicy(FallbackPolicy fallbackPolicy)
  • 参数fallbackPolicy - FallbackPolicy 对象,表示回退策略。

启用或禁用重新连接Session功能(仅MCQA1.0)

在MCQA1.0模式下,当发生当前链接找不到Session时(可能是Session被停止或超时),是否启用重新连接功能。 Session 是MCQA1.0独有的概念,详细信息可以参考MCQA 1.0

public SQLExecutorBuilder enableReattach(boolean enableReattach)
  • 描述:启用或禁用重新连接功能。
  • 参数enableReattach - 布尔值,表示是否启用重新连接。
  • 返回值SQLExecutorBuilder 实例。

设置属性(仅MCQA1.0)

配置创建MCQA1.0 Session时,使用的的properties

public SQLExecutorBuilder properties(Map<String, String> properties)

参数

  • properties:属性的键值对。

设置服务名称(仅MCQA1.0)

使用serviceName方法可以设置使用MCQA1.0时,链接的Session名称。 Session 是MCQA1.0独有的概念,详细信息可以参考MCQA 1.0

public SQLExecutorBuilder serviceName(String serviceName)

参数

  • serviceNameSession名称。

允许MCQA1.0执行非Select操作(仅MCQA1.0)

当关闭这项选项时,MCQA1.0作业在遇到非选择操作,会回退为离线作业。

public SQLExecutorBuilder sessionSupportNonSelect(boolean sessionSupportNonSelect)
  • 参数sessionSupportNonSelect - 布尔值,表示是否支持非选择操作。

设置附加超时时间(仅MCQA1.0)

使用attachTimeout方法可以设置当使用MCQA1.0时,链接Session时的超时时间。Session 是MCQA1.0独有的概念,详细信息可以参考MCQA 1.0

public SQLExecutorBuilder attachTimeout(Long timeout)

参数

  • timeout:超时时间。单位为毫秒。

设置运行集群的名称(仅MCQA1.0)

实际上,这个接口大部分情况下无用,因为用户没有手段得知运行集群的名称。通常仅作为内部排查问题使用。

public SQLExecutorBuilder runningCluster(String runningCluster)
  • 参数runningCluster - 字符串,表示运行集群的名称。

设置Quota(仅MCQA2.0)

设置Quota名称相比,这个方法可以配置一个已经获取好的Quota示例,这可以避免通过quotaName从服务端获取Quota的过程,使通过缓存Quota来提高性能成为可能。

public SQLExecutorBuilder quota(Quota quota)
  • 参数quota - Quota 实例,通过 Quotas#getWlmQuota 获得,或通过 Quota#setMcqaConnHeader(String) 方法加载。

设置RegionId(仅MCQA2.0)

MCQA 2.0 作业会通过 设置Quota名称 获取的 quotaName 来从服务端获取 Quota,这使用的是 Project 所在的 RegionId, 然而,我们允许用户通过这个方法,指定 Quota 所在的 RegionId,尽管像其他 region 的 Quota 提及作业有可能失败。

public SQLExecutorBuilder regionId(String regionId)
  • 参数regionId - 区域名,表示 Quota 所在的区域,通常不需要设置。

执行SQL查询

SQLExecutor 的 run 方法可以用来运行 SQL 查询。

方法

void run(String sql, Map<String, String> hint) throws OdpsException;

参数:

  • sql: SQL语句。
  • hint: 查询所需的提示参数。比如:odps.sql.type.system.odps2 = true等。

示例

SQLExecutor executor = ...; // 实例化 SQLExecutor
String sql = "SELECT * FROM table_name";
Map<String, String> hint = new HashMap<>();
executor.run(sql, hint);

获取查询结果

信息

当关闭instance tunnel时,得到的ArrayRecord结果均为String 类型,忽略各列在ODPS中的真实类型,且会有最大长度限制(通常为10000),且指定offsetlimit 等信息不为null时,会报错。

获取结果列表

这组接口,返回的结果为一个ArrayRecord列表。程序会将结果全部读到内存里,所以如果结果量太大的话,可能会导致内存溢出。

List<Record> getResult() throws OdpsException, IOException;

List<Record> getResult(Long countLimit) throws OdpsException, IOException;

List<Record> getResult(Long offset, Long countLimit, Long sizeLimit) throws OdpsException, IOException;

List<Record> getResult(Long offset, Long countLimit, Long sizeLimit, boolean limitEnabled) throws OdpsException, IOException;

参数

  • countLimit:结果数量限制。默认无限制,但仍然会受限于limitEnabled参数。
  • offset:结果偏移量。默认值为0
  • countLimit:结果数量限制。默认无限制
  • limitEnabled :是否拉取全部结果,默认为false,即结果会被限制为1w条。如果开启,则会拉取全部结果。拉取全部结果需要用户具有更高权限(具有sql所访问的所有源表的查询、下载权限)。

获取结果迭代器

这组接口,返回的结果为一个ResultSet迭代器,ResultSet实现了Iterator<Record>Iterable<Record>。 程序会将结果分批读到内存里,减少了内存溢出的可能。

ResultSet getResultSet() throws OdpsException, IOException;

ResultSet getResultSet(Long countLimit) throws OdpsException, IOException;

ResultSet getResultSet(Long offset, Long countLimit, Long sizeLimit) throws OdpsException, IOException;

ResultSet getResultSet(Long offset, Long countLimit, Long sizeLimit, boolean limitEnabled) throws OdpsException, IOException;

参数

  • countLimit:结果数量限制。默认无限制,但仍然会受限于limitEnabled参数。
  • offset:结果偏移量。默认值为0
  • countLimit:结果数量限制。默认无限制
  • limitEnabled :是否拉取全部结果,默认为false,即结果会被限制为1w条。如果开启,则会拉取全部结果。拉取全部结果需要用户具有更高权限(具有sql所访问的所有源表的查询、下载权限)。

其他接口

关闭SQLExecutor

默认行为关闭该executor,如果为连接池模式,则归还Executor到连接池中。

void close();

获取Executor的ID

每个SQLExecutor实例都有一个ID,用于标识该实例。值为创建该实例时生成的UUID。

String getId();

获取当前查询的任务名称

String getTaskName();

获取当前查询ID

返回 null 表示 Executor 尚未初始化,或未执行作业。 对于 MCQA 1.0 作业,返回值为instanceId + '_' + subqueryId,对于其他作业,返回值为 instanceId。

String getQueryId();

获取子查询ID

返回 -1 表示 Executor 尚未初始化,或未执行作业。 对于 MCQA 1.0 作业,返回值为子查询的id 对于其他作业,返回值始终为 -1

int getSubQueryId();

获取当前查询的Logview

默认Logview有效期为7天,当生成logview出错时,该接口会返回null,不会抛出异常。

String getLogView();

获取当前查询的实例

当使用MCQA1.0时,该接口返回的是Session实例,否则返回上一个查询的SQLTask实例。返回值可能为null。

Instance getInstance();

检查该 Executor 是否活跃

离线查询模式,该接口永远返回false。 MCQA1.0查询模式,该接口返回的是Session状态。 MCQA2.0查询模式,该接口永远返回false

boolean isActive();

取消当前查询

void cancel() throws OdpsException;

获取当前查询的进度信息

List<Instance.StageProgress> getProgress() throws OdpsException;

获取当前查询的执行日志

List<String> getExecutionLog();

获取当前查询的概要信息

String getSummary() throws OdpsException;

判断当前执行的SQL在语法结构上是否具备结果集

boolean hasResultSet();

判断当前 SQL 是否在交互模式中

boolean isRunningInInteractiveMode();