阿里云流式数据服务DataHub是流式数据(Streaming Data)的处理平台,提供对流式数据的发布、订阅和分发功能,让您可以轻松构建基于流式数据的分析和应用。通过数据传输服务DTS(Data Transmission Service),您可以将PolarDB-X同步至DataHub,可用于流计算等大数据产品对数据进行实时分析等场景。
前提条件
- PolarDB-X中的数据库须基于RDS MySQL创建。
- 待同步的表具备主键或唯一约束。
- DataHub实例的地域为华东1、华东2、华北2或华南1。
- DataHub实例中,已创建用作接收同步数据的Project,详情请参见创建Project。
功能限制
- 仅支持表级别的数据同步。
- 不支持全量数据初始化,即DTS不会将源PolarDB-X中同步对象的存量数据同步至目标DataHub实例中。
- 数据同步期间,请勿对PolarDB-X执行扩容、缩容、迁移热点表、变更拆分键和变更DDL等操作,否则将导致数据同步失败。
- 如果需要在数据同步期间切换PolarDB-X的网络类型,在您执行完网络类型切换操作后,请提交工单调整同步链路的网络连接信息。
支持同步的SQL操作
INSERT、UPDATE、DELETE。
操作步骤
Topic结构定义说明
DTS在将数据变更同步至DataHub实例的Topic时,目标Topic中除了存储变更数据外,还会新增一些附加列用于存储元信息,示例如下。
说明 本案例中的业务字段为
id
、
name
、
address
,由于在配置数据同步时选用的是旧版附加列规则,DTS会为业务字段添加
dts_
的前缀。
结构定义说明:
旧版附加列名称 | 新版附加列名称 | 数据类型 | 说明 |
---|---|---|---|
dts_record_id |
new_dts_sync_dts_record_id |
String | 增量日志的记录ID,为该日志唯一标识。
说明
|
dts_operation_flag |
new_dts_sync_dts_operation_flag |
String | 操作类型,取值:
|
dts_instance_id |
new_dts_sync_dts_instance_id |
String | 数据库的server ID。暂不支持显示实际的值,目前固定为null 。 |
dts_db_name |
new_dts_sync_dts_db_name |
String | 数据库名称。 |
dts_table_name |
new_dts_sync_dts_table_name |
String | 表名。 |
dts_utc_timestamp |
new_dts_sync_dts_utc_timestamp |
String | 操作时间戳,即binlog的时间戳(UTC 时间)。 |
dts_before_flag |
new_dts_sync_dts_before_flag |
String | 所有列的值是否更新前的值,取值:Y或N。 |
dts_after_flag |
new_dts_sync_dts_after_flag |
String | 所有列的值是否更新后的值,取值:Y或N。 |
关于dts_before_flag和dts_after_flag的补充说明
对于不同的操作类型,增量日志中的dts_before_flag
和dts_after_flag
定义如下:
- INSERT
当操作类型为INSERT时,所有列的值为新插入的值,即为更新后的值,所以
dts_before_flag
取值为N,dts_after_flag
取值为Y,示例如下。 - UPDATE
当操作类型为UPDATE时,DTS会将UPDATE操作拆为两条增量日志。这两条增量日志的
dts_record_id
、dts_operation_flag
及dts_utc_timestamp
对应的值相同。第一条增量日志记录了更新前的值,所以
dts_before_flag
取值为Y,dts_after_flag
取值为N。第二条增量日志记录了更新后的值,所以dts_before_flag
取值为N,dts_after_flag
取值为Y,示例如下。 - DELETE
当操作类型为DELETE时,增量日志中所有的列值为被删除的值,即列值不变,所以
dts_before_flag
取值为Y,dts_after_flag
取值为N,示例如下。
后续操作
配置完数据同步作业后,您可以对同步到DataHub实例中的数据执行计算分析。更多详情,请参见阿里云实时计算。