Upsert
对于 Transactional 表,用户可以使用 UpsertSession 来执行数据插入或更新操作。
目录
我们通过两个核心类,来实现完整的,可支撑高并发的 Upsert 操作,并提供了一个完整的使用示例。
UpsertSession
概述
UpsertSession 接口用于管理和操作数据插入或更新的会话。通过该接口,用户可以获取会话的相关信息、提交或中止会话、创建新的记录对象等。
初始化
通常由TableTunnel#buildUpsertSession方法创建 Builder, Builder接口的build方法来构建 UpsertSession。
方法
获取Session ID
String getId();
返回当前会话的ID,可以基于这个ID进行重建会话操作。
获取当前 Quota
public String getQuotaName();
返回当前使用的Quota名称,注意不是QuotaNickName。
获取Session状态
String getStatus() throws TunnelException;
返回当前会话的状态码,可能的状态包括:
- normal 正常
- committing 提交中
- committed 已提交
- expired 过期
- critical 错误
- aborted 已中止
获取表结构
TableSchema getSchema();
返回当前会话写入的表的表结构。
提交UpsertSession
void commit(boolean async) throws TunnelException;
提交当前会话。可以选择是否异步提交。 当选择异步提交时,提交操作像服务端发送提交请求后立即返回,不会等待数据提交完成,这也意味着数据不会立即可见。
中止UpsertSession
void abort() throws TunnelException;
中止当前会话。
清理客户端本地资源
void close();
清理客户端本地资源。
创建一个Record对象
Record newRecord();
创建并返回一个新的 Record 对象。
这个Record是UpsertRecord的实例,这个实例包含执行upsert操作必要的一些信息。
因此当使用Upsert操作时,永远记得使用这个方法,来获取Record的实例。
构建UpsertStream
UpsertStream.Builder buildUpsertStream();
返回一个用于构建 UpsertStream 的 Builder 对象。
UpsertStream 是用于执行数据插入或更新操作的核心接口,详见 UpsertStream。
Builder 接口
UpsertSession.Builder 接口用于构建 UpsertSession 对象。
通常由TableTunnel#buildUpsertSession 方法创建。
获取和设置Upsert ID
String getUpsertId();
UpsertSession.Builder setUpsertId(String upsertId);
获取和设置 Upsert ID。
获取和设置Schema名称
String getSchemaName();
UpsertSession.Builder setSchemaName(String schemaName);
获取和设置 Schema 名称。
获取和设置PartitionSpec
String getPartitionSpec();
UpsertSession.Builder setPartitionSpec(PartitionSpec spec);
UpsertSession.Builder setPartitionSpec(String spec);
获取和设置 PartitionSpec。
获取和设置SlotNum
long getSlotNum();
UpsertSession.Builder setSlotNum(long slotNum);
获取和设置 SlotNum。
获取和设置CommitTimeout
long getCommitTimeout();
UpsertSession.Builder setCommitTimeout(long commitTimeoutMs);
获取和设置提交超时 时间(毫秒)。
设置Netty进行网络IO的线程数
UpsertSession.Builder setNetworkThreadNum(int threadNum);
设置Netty进行网络IO的线程池(EventLoop)的线程数,默认为1。
设置最大并发数
UpsertSession.Builder setConcurrentNum(int concurrentNum);
设置最大并发数(允许同时存在的Channel数量),默认为20,设为小于0为无限制。
设置连接超时时间
UpsertSession.Builder setConnectTimeout(long timeout);
设置建立连接的超时时间(毫秒),默认为180 * 1000。
设置请求响应超时时间
UpsertSession.Builder setReadTimeout(long timeout);
设置请求响应的超时时间(毫秒),默认为300 * 1000。
获取和设置Session生命周期
long getLifecycle();
UpsertSession.Builder setLifecycle(long lifecycle);
获取和设置Session生命周期(小时),有效取值范围为1 - 24,指定有效取值范围以外的值该参数会被忽略,使用服务端默认值。
构建UpsertSession
UpsertSession build() throws TunnelException, IOException;
构建并返回一个 UpsertSession 对象。
总结
通过 UpsertSession 接口和其 Builder,用户可以方便地管理数据插入或更新的会话,获取会话信息,提交或中止会话,以及创建新的记录对象。
UpsertStream
概述
在当前版本,UpsertStreamImpl 类是 UpsertStream接口唯一的实现类,因此本文主要介绍UpsertStreamImpl
的类接口。
通过该类,用户可以对Delta Table进行数据的插入、更新、删除操作,并支持数据的缓冲和批量提交。
构造方法
UpsertStreamImpl(Builder builder)
public UpsertStreamImpl(Builder builder)throws IOException,TunnelException
通过 Builder 对象构建 UpsertStreamImpl
实例的构造方法,通常由 UpsertSession#buildUpsertStream() 方法调用。
方法
插入或更新记录
@Override
public void upsert(Record record)throws IOException,TunnelException
插入或更新一条记录。
参数列表:
record:注:参数record应当是UpsertRecord的实例
,这个实例可以通过 UpsertSession#newRecord() 方法创建。
UpsertRecord内部维护了一些元数据隐藏列,因此用户不应当手动创建ArrayRecord对象将其传入。
插入或更新部分列
@Override
public void upsert(Record record,List<String> upsertCols)throws IOException,TunnelException
插入或更新一条记录的部分列。
record : UpsertRecord
的实例,这个实例可以通过 UpsertSession#newRecord() 方法创建。
upsertCols : 一个包含要更新的列名的列表。
删除记录
@Override
public void delete(Record record)throws IOException,TunnelException
删除一条记录。
record : UpsertRecord
的实例,这个实例可以通过 UpsertSession#newRecord() 方法创建。