Changelog-Data-Capture Format Format: Deserialization Schema
Debezium 是一个 CDC(Changelog Data Capture,变更数据捕获)的工具,可以把来自 MySQL、PostgreSQL、Oracle、Microsoft SQL Server 和许多其他数据库的更改实时流式传输到 Kafka 中。 Debezium 为变更日志提供了统一的格式结构,并支持使用 JSON 和 Apache Avro 序列化消息。
Flink 支持将 Debezium JSON 消息解析为 INSERT / UPDATE / DELETE 消息到 Flink SQL 系统中。在很多情况下,利用这个特性非常的有用,例如
注意: 支持解析 Debezium Avro 消息和输出 Debezium 消息已经规划在路线图上了。
为了设置 Debezium Format,下表提供了使用构建自动化工具(例如 Maven 或 SBT)和带有 SQL JAR 包的 SQL Client 的两个项目的依赖项信息。
Maven 依赖 | SQL Client JAR |
---|---|
flink-json |
内置 |
注意: 请参考 Debezium 文档,了解如何设置 Debezium Kafka Connect 用来将变更日志同步到 Kafka 主题。
Debezium 为变更日志提供了统一的格式,这是一个从 MySQL product 表捕获的更新操作的简单示例:
{
"before": {
"id": 111,
"name": "scooter",
"description": "Big 2-wheel scooter",
"weight": 5.18
},
"after": {
"id": 111,
"name": "scooter",
"description": "Big 2-wheel scooter",
"weight": 5.15
},
"source": {...},
"op": "u",
"ts_ms": 1589362330904,
"transaction": null
}
注意: 请参考 Debezium 文档,了解每个字段的含义。
MySQL 产品表有4列(id
、name
、description
、weight
)。上面的 JSON 消息是 products
表上的一条更新事件,其中 id = 111
的行的 weight
值从 5.18
更改为 5.15
。假设此消息已同步到 Kafka 主题 products_binlog
,则可以使用以下 DDL 来使用此主题并解析更改事件。
在某些情况下,用户在设置 Debezium Kafka Connect 时,可能会开启 Kafka 的配置 'value.converter.schemas.enable'
,用来在消息体中包含 schema 信息。然后,Debezium JSON 消息可能如下所示:
{
"schema": {...},
"payload": {
"before": {
"id": 111,
"name": "scooter",
"description": "Big 2-wheel scooter",
"weight": 5.18
},
"after": {
"id": 111,
"name": "scooter",
"description": "Big 2-wheel scooter",
"weight": 5.15
},
"source": {...},
"op": "u",
"ts_ms": 1589362330904,
"transaction": null
}
}
为了解析这一类信息,你需要在上述 DDL WITH 子句中添加选项 'debezium-json.schema-include' = 'true'
(默认为 false)。通常情况下,建议不要包含 schema 的描述,因为这样会使消息变得非常冗长,并降低解析性能。
在将主题注册为 Flink 表之后,可以将 Debezium 消息用作变更日志源。
参数 | 是否必选 | 默认值 | 类型 | 描述 |
---|---|---|---|---|
format |
必选 | (none) | String | 指定要使用的格式,此处应为 'debezium-json' 。 |
debezium-json.schema-include |
可选 | false | Boolean | 设置 Debezium Kafka Connect 时,用户可以启用 Kafka 配置 'value.converter.schemas.enable' 以在消息中包含 schema。此选项表明 Debezium JSON 消息是否包含 schema。 |
debezium-json.ignore-parse-errors |
可选 | false | Boolean | 当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null 。 |
debezium-json.timestamp-format.standard |
可选 | 'SQL' |
String | 声明输入和输出的时间戳格式。当前支持的格式为'SQL' 以及 'ISO-8601' :
|
如果你正在使用 Debezium PostgreSQL Connector 捕获变更到 Kafka,请确保被监控表的 REPLICA IDENTITY 已经被配置成 FULL
了,默认值是 DEFAULT
。
否则,Flink SQL 将无法正确解析 Debezium 数据。
当配置为 FULL
时,更新和删除事件将完整包含所有列的之前的值。当为其他配置时,更新和删除事件的 “before” 字段将只包含 primary key 字段的值,或者为 null(没有 primary key)。
你可以通过运行 ALTER TABLE <your-table-name> REPLICA IDENTITY FULL
来更改 REPLICA IDENTITY
的配置。
请阅读 Debezium 关于 PostgreSQL REPLICA IDENTITY 的文档 了解更多。
目前,Debezium Format 使用 JSON Format 进行反序列化。有关数据类型映射的更多详细信息,请参考 JSON Format 文档。