您可以使用DTS提供的SDK示例代码来消费订阅数据,本文介绍SDK中主要的类的接口定义。

SDK示例代码下载

详情请参见下载SDK

RegionContext接口定义

接口函数 说明
setAccessKey(accessKey) 配置待订阅实例所属的阿里云账号的AccessKey ID。
setSecret(AccessKeySecret) 配置待订阅实例所属的阿里云账号的AccessKey Secret。
setUsePublicIp(usePublicIp) 配置是否使用公网订阅数据。
说明 当前仅支持使用公网订阅数据,此处需配置为 true
context.setUseBinary(boolean useBinary) 配置是否启用二进制组包方式,取值为True或False,建议开启(设置为True)以提升消费性能。
context.setUseDrcNet(boolean useDrcNet) 配置是否开启网络优化功能,取值为True或False,建议开启(设置为True)以提升消费性能。

ClusterClient接口定义

接口函数 说明
void addConcurrentListener(ClusterListener arg0) 添加下游监听者以获取订阅通道中的增量数据。
说明 参数 ClusterListener arg0为类 ClusterListener的对象。
void askForGUID(String arg0) 获取订阅通道的增量数据,参数为数据订阅实例ID。
List<ClusterListener> getConcurrentListeners() 获取ClusterClient中的监听者列表,接口的返回类型为List <ClusterListener>
void start() 启动SDK客户端,开始订阅增量数据。
void stop() 停止SDK客户端,停止订阅增量数据。
说明 由于使用同一个线程执行拉取数据和回调notify,当notify的消费代码中有信号不可打断的功能时,stop函数可能无法正常关闭SDK客户端。

ClusterListener接口定义

void notify(List<ClusterMessage> arg0):定义订阅数据的消费方式。当SDK接收到数据时,会通过notify通知ClusterListner消费数据,然后将订阅数据打印到屏幕上。

ClusterMessage接口定义

说明 每个ClusterMessage保存一个事务的数据记录,事务中的每条记录通过Record保存。
接口函数 说明
Record getRecord() 从ClusterMessage中获取一条变更记录。该记录包含Binlog文件中的具体记录,例如begin、commit、update、insert等。
void ackAsConsumed 当消费完成后,需要调用该接口向DTS汇报一个ACK(确认信息),通知服务端更新消费位点,保障SDK异常重启后消费数据的完整性。
说明 当下游SDK异常宕机并重启后,会自动从上次异常退出的最后一个消费位点继续订阅并消费数据。

Record接口定义

String getAttribute(String key):获取Record中的属性值。您可以在调用该函数时传入下表中的参数来获取对应的属性值。
参数 说明
record_id 该条记录的ID。
说明 该ID在订阅过程中不一定处于递增状态。
instance 数据库实例的连接地址,格式为IP:Port。
source_type 数据库实例的引擎类型,固定为MySQL。
source_category 记录的类型,固定为full_recorded。
timestamp Binlog时间戳,即该条SQL语句在源库中执行的时间。
checkpoint Binlog的位点,格式为binlog_offset@binlog_file
说明 binlog_offset为该条记录在binlog文件中的偏移量, binlog_file为binlog文件的数字后缀,例如binlog文件名为mysql-bin.0008,那么binlog_file即为8。
record_type 操作类型,取值包括:insert、update、delete、replace、ddl、begin、commit、heartbeat。
说明 heartbeat是DTS定义的心跳表,每秒产生一条记录,用于检测订阅通道是否正常运行。
db 数据库名称。
table_name 数据表名称。
record_recording 编码方式。
primary 主键的列名。如果是联合主键,列名之间会用英文逗号(,)分隔。
fields_enc 每个字段值的编码,各字段之间用英文逗号(,)分隔。
说明 如果字段值为非字符的类型,则取值为空。

SDK示例代码中预置了下表的接口函数,用于获取Record中的属性值。

接口函数 说明
Type getOpt() 获取操作类型。
String getCheckpoint() 获取Binlog的位点。
String gettimestamp() 获取Binlog中记录的时间戳。
String getDbname() 获取数据库名。
String getTablename() 获取数据表名。
String getPrimaryKeys() 获取主键列名。
DBType getDbType() 获取数据库类型。
String getServerId() 获取数据库实例的连接地址。
int getFieldCount() 获取字段Field的个数。
List<Field> getFieldList() 获取所有字段的定义、变更前和变更后的镜像值。Field对象的定义详情请参见Field接口定义
Boolean isFirstInLogevent() 判断该记录是否为数据库批量变更中的第一条事务日志,返回值:True或False。

Field接口定义

接口函数 说明
String getEncoding() 获取该字段值的编码格式。
String getFieldname() 获取该字段的名称。
Type getType() 获取该字段的数据类型。
ByteString getValue() 获取该字段的值,返回类型为ByteString,当值为空时,返回NULL
Boolean isPrimary() 判断该字段是否为主键列,返回值:True或False。