Upsert
对于 Transactional 表,用户可以使用 UpsertSession
来执行数据插入或更新操作。
目录
我们通过两个核心类,来实现完整的,可支撑高并发的 Upsert 操作,并提供了一个完整的使用示例。
UpsertSession
概述
UpsertSession
接口用于管理和操作数据插入或更新的会话。通过该接口,用户可以 获取会话的相关信息、提交或中止会话、创建新的记录对象等。
初始化
通常由TableTunnel#buildUpsertSession方法创建 Builder, Builder接口的build方法来构建 UpsertSession。
方法
获取Session ID
String getId();
返回当前会话的ID,可以基于这个ID进行重建会话操作。
获取当前 Quota
public String getQuotaName();
返回当 前使用的Quota名称,注意不是QuotaNickName。
获取Session状态
String getStatus() throws TunnelException;
返回当前会话的状态码,可能的状态包括:
- normal 正常
- committing 提交中
- committed 已提交
- expired 过期
- critical 错误
- aborted 已中止
获取表结构
TableSchema getSchema();
返回当前会话写入的表的表结构。
提交UpsertSession
void commit(boolean async) throws TunnelException;
提交当前会话。可以选择是否异步提交。 当选择异步提交时,提交操作像服务端发送提交请求后立即返回,不会等待数据提交完成,这也意味着数据不会立即可见。
中止UpsertSession
void abort() throws TunnelException;
中止当前会话。
清理客户端本地资源
void close();
清理客户端本地资源。
创建一个Record对象
Record newRecord();
创建并返回一个新的 Record
对象。
这个Record
是UpsertRecord
的实例,这个实例包含执行upsert操作必要的一些信息。
因此当使用Upsert操作时,永远记得使用这个方法,来获取Record
的实例。
构建UpsertStream
UpsertStream.Builder buildUpsertStream();
返回一个用于构建 UpsertStream
的 Builder
对象。
UpsertStream
是用于执行数据插入或更新操作的核心接口,详见 UpsertStream。
Builder 接口
UpsertSession.Builder
接口用于构建 UpsertSession
对象。
通常由TableTunnel#buildUpsertSession 方法创建。
获取和设置Upsert ID
String getUpsertId();
UpsertSession.Builder setUpsertId(String upsertId);
获取和设置 Upsert ID
。
获取和设置Schema名称
String getSchemaName();
UpsertSession.Builder setSchemaName(String schemaName);
获取和设置 Schema
名称。
获取和设置PartitionSpec
String getPartitionSpec();
UpsertSession.Builder setPartitionSpec(PartitionSpec spec);
UpsertSession.Builder setPartitionSpec(String spec);
获取和设置 PartitionSpec
。
获取和设置SlotNum
long getSlotNum();
UpsertSession.Builder setSlotNum(long slotNum);
获取和设置 SlotNum
。
获取和设置CommitTimeout
long getCommitTimeout();
UpsertSession.Builder setCommitTimeout(long commitTimeoutMs);
获取和设置提交超时时间(毫秒)。
设置Netty进行网络IO的线程数
UpsertSession.Builder setNetworkThreadNum(int threadNum);
设置Netty进行网络IO的线程池(EventLoop)的线程数,默认为1。
设置最大并发数
UpsertSession.Builder setConcurrentNum(int concurrentNum);
设置最大并发数(允许同时存在的Channel数量),默认为20,设为小于0为无限制。
设置连接超时时间
UpsertSession.Builder setConnectTimeout(long timeout);
设置建立连接的超时时间(毫秒),默认为180 * 1000。
设置请求响应超时时间
UpsertSession.Builder setReadTimeout(long timeout);
设置请求响应的超时时间(毫秒),默认为300 * 1000。
获取和设置Session生命周期
long getLifecycle();
UpsertSession.Builder setLifecycle(long lifecycle);
获取和设置Session生命周期(小时),有效取值范围为1 - 24,指定有效取值范围以外的值该参数会被忽略,使用服务端默认值。
构建UpsertSession
UpsertSession build() throws TunnelException, IOException;
构建并返回一个 UpsertSession
对象。
总结
通过 UpsertSession
接口和其 Builder
,用户可以方便地管理数据插入或更新的会话,获取会话信息,提交或中止会话,以及创建新的记录对象。
UpsertStream
概述
在当前版本,UpsertStreamImpl
类是 UpsertStream
接口唯一的实现类,因此本文主要介绍UpsertStreamImpl
的类接口。
通过该类,用户可以对Delta Table
进行数据的插入、更新、删除操作,并支持数据的缓冲和批量提交。
构造方法
UpsertStreamImpl(Builder builder)
public UpsertStreamImpl(Builder builder)throws IOException,TunnelException
通过 Builder
对象构建 UpsertStreamImpl
实例的构造方法,通常由 UpsertSession#buildUpsertStream()
方法调用。
方法
插入或更新记录
@Override
public void upsert(Record record)throws IOException,TunnelException
插入或更新一条记录。
参数列表:
record:注:参数record
应当是UpsertRecord
的实例
,这个实例可以通过 UpsertSession#newRecord()
方法创建。
UpsertRecord
内部维护了一些元数据隐藏列,因此用户不应当手动创建ArrayRecord
对象将其传入。