您可以使用DTS提供的SDK示例代码来消费订阅数据,本文介绍SDK中主要的类的接口定义。
SDK示例代码下载
详情请参见下载SDK。
RegionContext接口定义
接口函数 | 说明 |
---|---|
setAccessKey(accessKey) |
配置待订阅实例所属的阿里云账号的AccessKey ID。 |
setSecret(AccessKeySecret) |
配置待订阅实例所属的阿里云账号的AccessKey Secret。 |
setUsePublicIp(usePublicIp) |
配置是否使用公网订阅数据。
说明 当前仅支持使用公网订阅数据,此处需配置为
true 。
|
context.setUseBinary(boolean useBinary) |
配置是否启用二进制组包方式,取值为True或False,建议开启(设置为True)以提升消费性能。 |
context.setUseDrcNet(boolean useDrcNet) |
配置是否开启网络优化功能,取值为True或False,建议开启(设置为True)以提升消费性能。 |
ClusterClient接口定义
接口函数 | 说明 |
---|---|
void addConcurrentListener(ClusterListener arg0) |
添加下游监听者以获取订阅通道中的增量数据。
说明 参数
ClusterListener arg0 为类
ClusterListener 的对象。
|
void askForGUID(String arg0) |
获取订阅通道的增量数据,参数为数据订阅实例ID。 |
List<ClusterListener> getConcurrentListeners() |
获取ClusterClient中的监听者列表,接口的返回类型为List <ClusterListener> 。 |
void start() |
启动SDK客户端,开始订阅增量数据。 |
void stop() |
停止SDK客户端,停止订阅增量数据。
说明 由于使用同一个线程执行拉取数据和回调notify,当notify的消费代码中有信号不可打断的功能时,stop函数可能无法正常关闭SDK客户端。
|
ClusterListener接口定义
void notify(List<ClusterMessage> arg0)
:定义订阅数据的消费方式。当SDK接收到数据时,会通过notify通知ClusterListner消费数据,然后将订阅数据打印到屏幕上。
ClusterMessage接口定义
说明 每个ClusterMessage保存一个事务的数据记录,事务中的每条记录通过Record保存。
接口函数 | 说明 |
---|---|
Record getRecord() |
从ClusterMessage中获取一条变更记录。该记录包含Binlog文件中的具体记录,例如begin、commit、update、insert等。 |
void ackAsConsumed |
当消费完成后,需要调用该接口向DTS汇报一个ACK(确认信息),通知服务端更新消费位点,保障SDK异常重启后消费数据的完整性。
说明 当下游SDK异常宕机并重启后,会自动从上次异常退出的最后一个消费位点继续订阅并消费数据。
|
Record接口定义
String getAttribute(String key)
:获取Record中的属性值。您可以在调用该函数时传入下表中的参数来获取对应的属性值。
参数 | 说明 |
---|---|
record_id |
该条记录的ID。
说明 该ID在订阅过程中不一定处于递增状态。
|
instance |
数据库实例的连接地址,格式为IP:Port。 |
source_type |
数据库实例的引擎类型,固定为MySQL。 |
source_category |
记录的类型,固定为full_recorded。 |
timestamp |
Binlog时间戳,即该条SQL语句在源库中执行的时间。 |
checkpoint |
Binlog的位点,格式为binlog_offset@binlog_file 。
说明
binlog_offset 为该条记录在binlog文件中的偏移量,
binlog_file 为binlog文件的数字后缀,例如binlog文件名为mysql-bin.0008,那么binlog_file即为8。
|
record_type |
操作类型,取值包括:insert、update、delete、replace、ddl、begin、commit、heartbeat。
说明 heartbeat是DTS定义的心跳表,每秒产生一条记录,用于检测订阅通道是否正常运行。
|
db |
数据库名称。 |
table_name |
数据表名称。 |
record_recording |
编码方式。 |
primary |
主键的列名。如果是联合主键,列名之间会用英文逗号(,)分隔。 |
fields_enc |
每个字段值的编码,各字段之间用英文逗号(,)分隔。
说明 如果字段值为非字符的类型,则取值为空。
|
SDK示例代码中预置了下表的接口函数,用于获取Record中的属性值。
接口函数 | 说明 |
---|---|
Type getOpt() |
获取操作类型。 |
String getCheckpoint() |
获取Binlog的位点。 |
String gettimestamp() |
获取Binlog中记录的时间戳。 |
String getDbname() |
获取数据库名。 |
String getTablename() |
获取数据表名。 |
String getPrimaryKeys() |
获取主键列名。 |
DBType getDbType() |
获取数据库类型。 |
String getServerId() |
获取数据库实例的连接地址。 |
int getFieldCount() |
获取字段Field的个数。 |
List<Field> getFieldList() |
获取所有字段的定义、变更前和变更后的镜像值。Field对象的定义详情请参见Field接口定义。 |
Boolean isFirstInLogevent() |
判断该记录是否为数据库批量变更中的第一条事务日志,返回值:True或False。 |
Field接口定义
接口函数 | 说明 |
---|---|
String getEncoding() |
获取该字段值的编码格式。 |
String getFieldname() |
获取该字段的名称。 |
Type getType() |
获取该字段的数据类型。 |
ByteString getValue() |
获取该字段的值,返回类型为ByteString,当值为空时,返回NULL 。 |
Boolean isPrimary() |
判断该字段是否为主键列,返回值:True或False。 |