UpsertStream
概述
在当前版本,UpsertStreamImpl
类是 UpsertStream
接口唯一的实现类,因此本文主要介绍UpsertStreamImpl
的类接口。
通过该类,用户可以对Delta Table
进行数据的插入、更新、删除操作,并支持数据的缓冲和批量提交。
构造方法
UpsertStreamImpl(Builder builder)
public UpsertStreamImpl(Builder builder)throws IOException,TunnelException
通过 Builder
对象构建 UpsertStreamImpl
实例的构造方法,通常由 UpsertSession#buildUpsertStream()
方法调用。
方法
插入或更新记录
@Override
public void upsert(Record record)throws IOException,TunnelException
插入或更新一条记录。
参数列表:
record:注:参数record
应当是UpsertRecord
的实例
,这个实例可以通过 UpsertSession#newRecord()
方法创建。
UpsertRecord
内部维护了一些元数据隐藏列,因此用户不应当手动创建ArrayRecord
对象将其传入。
插入或更新部分列
@Override
public void upsert(Record record,List<String> upsertCols)throws IOException,TunnelException
插入或更新一条记录的部分列。
record : UpsertRecord
的实例,这个实例可以通过 UpsertSession#newRecord()
方法创建。
upsertCols : 一个包含要更新的列名的列表。
删除记录
@Override
public void delete(Record record)throws IOException,TunnelException
删除一条记录。
record : UpsertRecord
的实例,这个实例可以通过 UpsertSession#newRecord()
方法创建。
刷新缓冲区
@Override
public void flush()throws IOException,TunnelException
刷新缓冲区,将缓冲区中的数据提交到服务器。
关闭流
@Override
public void close()throws IOException,TunnelException
关闭流,提交所有未提交的数据。
重置缓冲区
@Override
public void reset()throws IOException
重置缓冲区,清空所有未提交的数据。
重试逻辑
UpsertStream 通过用户传入 Listener
监听器来处理重试逻辑。当用户不传入 Listener
时,UpsertStream
将使用一个默认的 Listener
实例(UpsertSessionImpl.DefaultUpsertSteamListener
)
这个默认的Listener
实例,遵循TableTunnel
默认的重试逻辑。
十分推荐当用户实现自定义Listener
时,继承UpsertSessionImpl.DefaultUpsertSteamListener
,以使TableTunnel
的重试逻辑生效。
Builder 类
UpsertStreamImpl.Builder
类是构建 UpsertStreamImpl
实例的基石,它提供了灵活的配置选项以便用户根据需求定制 UpsertStream
的行为。
此构造器主要 通过 UpsertSession#buildUpsertStream()
方法被调用,允许用户在创建 UpsertStream
时精确控制多个参数,
以优化数据插入或更新的性能和资源使用。
方法详情
设置会话对象
- 方法签名:
public Builder setSession(UpsertSessionImpl session)
- 功能说明:
为
UpsertStream
指定关联的UpsertSessionImpl
实例。这是UpsertSession#buildUpsertStream()的默认步骤,通常不需要用户手动调用。
获取会话对象
- 方法签名:
public UpsertSessionImpl getSession()
- 功能说明:
返回已设置的
UpsertSessionImpl
实例,供外部查询或验证使用。
设置最大缓冲区大小
- 方法签名:
public Builder setMaxBufferSize(long maxBufferSize)
- 功能说明: 允许用户设定数据缓冲的最大容量(单位:字节)。这直接影响到内存使用和批量写入的效率,较大的缓冲区可以减少I/O操作次数,但会占用更多内存。
获取最大缓冲区大小
- 方法签名:
public long getMaxBufferSize()
- 功能说明: 查询当前设置的最大缓冲区大小。
设置槽缓冲区大小
- 方法签名:
public Builder setSlotBufferSize(long slotBufferSize)
- 功能说明: 为每个桶设置缓冲区大小(单位:字节)。