阿里云流式数据服务DataHub是流式数据(Streaming Data)的处理平台,提供对流式数据的发布、订阅和分发功能,让您可以轻松构建基于流式数据的分析和应用。通过数据传输服务DTS(Data Transmission Service),您可以将RDS MySQL同步至DataHub,帮助您快速实现使用流计算等大数据产品对数据实时分析。
前提条件
- DataHub实例的地域为华东1、华东2、华北2或华南1。
- DataHub实例中,已创建用作接收同步数据的项目(Project),详情请参见创建项目。
- RDS MySQL中待同步的表需具备主键或唯一约束。
功能限制
- 不支持全量数据初始化,即DTS不会将源RDS实例中同步对象的存量数据同步至目标DataHub实例。
- 仅支持表级别的数据同步。
- 不支持新增列的数据同步,即源数据表新增了某个列,该列的数据不会同步至目标DataHub实例。
- 数据同步的过程中,请勿对源库中待同步的表执行DDL变更,否则会导致同步失败。
支持同步的SQL操作
INSERT、UPDATE、DELETE。
操作步骤
Topic结构定义说明
DTS在将数据变更同步至DataHub实例的Topic时,目标Topic中除了存储变更数据外,还会新增一些附加列用于存储元信息,示例如下。
说明 本案例中的业务字段为
id
、
name
、
address
,由于在配置数据同步时选用的是旧版附加列规则,DTS会为业务字段添加
dts_
的前缀。
结构定义说明:
旧版附加列名称 | 新版附加列名称 | 数据类型 | 说明 |
---|---|---|---|
dts_record_id |
new_dts_sync_dts_record_id |
String | 增量日志的记录ID,为该日志唯一标识。
说明
|
dts_operation_flag |
new_dts_sync_dts_operation_flag |
String | 操作类型,取值:
|
dts_instance_id |
new_dts_sync_dts_instance_id |
String | 数据库的server ID。暂不支持显示实际的值,目前固定为null 。 |
dts_db_name |
new_dts_sync_dts_db_name |
String | 数据库名称。 |
dts_table_name |
new_dts_sync_dts_table_name |
String | 表名。 |
dts_utc_timestamp |
new_dts_sync_dts_utc_timestamp |
String | 操作时间戳,即binlog的时间戳(UTC 时间)。 |
dts_before_flag |
new_dts_sync_dts_before_flag |
String | 所有列的值是否更新前的值,取值:Y或N。 |
dts_after_flag |
new_dts_sync_dts_after_flag |
String | 所有列的值是否更新后的值,取值:Y或N。 |
关于dts_before_flag和dts_after_flag的补充说明
对于不同的操作类型,增量日志中的dts_before_flag
和dts_after_flag
定义如下:
- INSERT
当操作类型为INSERT时,所有列的值为新插入的值,即为更新后的值,所以
dts_before_flag
取值为N,dts_after_flag
取值为Y,示例如下。 - UPDATE
当操作类型为UPDATE时,DTS会将UPDATE操作拆为两条增量日志。这两条增量日志的
dts_record_id
、dts_operation_flag
及dts_utc_timestamp
对应的值相同。第一条增量日志记录了更新前的值,所以
dts_before_flag
取值为Y,dts_after_flag
取值为N。第二条增量日志记录了更新后的值,所以dts_before_flag
取值为N,dts_after_flag
取值为Y,示例如下。 - DELETE
当操作类型为DELETE时,增量日志中所有的列值为被删除的值,即列值不变,所以
dts_before_flag
取值为Y,dts_after_flag
取值为N,示例如下。
后续操作
配置完数据同步作业后,您可以对同步到DataHub实例中的数据执行计算分析。更多详情,请参见阿里云实时计算。