This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.
Ogg Format #
Changelog-Data-Capture Format Format: Serialization Schema Format: Deserialization Schema
Oracle GoldenGate (a.k.a ogg) 是一个实现异构 IT 环境间数据实时数据集成和复制的综合软件包。 该产品集支持高可用性解决方案、实时数据集成、事务更改数据捕获、运营和分析企业系统之间的数据复制、转换和验证。Ogg 为变更日志提供了统一的格式结构,并支持使用 JSON 序列化消息。
Flink 支持将 Ogg JSON 消息解析为 INSERT/UPDATE/DELETE 消息到 Flink SQL 系统中。在很多情况下,利用这个特性非常有用,例如
- 将增量数据从数据库同步到其他系统
- 日志审计
- 数据库的实时物化视图
- 关联维度数据库的变更历史,等等
Flink 还支持将 Flink SQL 中的 INSERT/UPDATE/DELETE 消息编码为 Ogg JSON 格式的消息, 输出到 Kafka 等存储中。 但需要注意, 目前 Flink 还不支持将 UPDATE_BEFORE 和 UPDATE_AFTER 合并为一条 UPDATE 消息. 因此, Flink 将 UPDATE_BEFORE 和 UPDATE_AFTER 分别编码为 DELETE 和 INSERT 类型的 Ogg 消息。
Dependencies #
Ogg Json #
In order to use the Ogg the following dependencies are required for both projects using a build automation tool (such as Maven or SBT) and SQL Client with SQL JAR bundles.
Maven dependency | SQL Client |
---|---|
|
Built-in |
注意: 请参考 Ogg Kafka Handler documentation, 了解如何设置 Ogg Kafka handler 来将变更日志同步到 Kafka 的 Topic。
How to use Ogg format #
Ogg 为变更日志提供了统一的格式, 这是一个 JSON 格式的从 Oracle PRODUCTS
表捕获的更新操作的简单示例:
{
"before": {
"id": 111,
"name": "scooter",
"description": "Big 2-wheel scooter",
"weight": 5.18
},
"after": {
"id": 111,
"name": "scooter",
"description": "Big 2-wheel scooter",
"weight": 5.15
},
"op_type": "U",
"op_ts": "2020-05-13 15:40:06.000000",
"current_ts": "2020-05-13 15:40:07.000000",
"primary_keys": [
"id"
],
"pos": "00000000000000000000143",
"table": "PRODUCTS"
}
注意:请参考 Debezium documentation 了解每个字段的含义.
Oracle PRODUCTS
表 有 4 列 (id
, name
, description
and weight
). 上面的 JSON 消息是 PRODUCTS
表上的一条更新事件,其中 id = 111
的行的
weight
值从 5.18
更改为 5.15
. 假设此消息已同步到 Kafka 的 Topic products_ogg
, 则可以使用以下 DDL 来使用该 Topic 并解析更新事件。
CREATE TABLE topic_products (
-- schema is totally the same to the Oracle "products" table
id BIGINT,
name STRING,
description STRING,
weight DECIMAL(10, 2)
) WITH (
'connector' = 'kafka',
'topic' = 'products_ogg',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'format' = 'ogg-json'
)
再将 Kafka Topic 注册为 Flink 表之后, 可以将 OGG 消息变为变更日志源。
-- a real-time materialized view on the Oracle "PRODUCTS"
-- which calculate the latest average of weight for the same products
SELECT name, AVG(weight)
FROM topic_products
GROUP BY name;
-- synchronize all the data and incremental changes of Oracle "PRODUCTS" table to
-- Elasticsearch "products" index for future searching
INSERT INTO elasticsearch_products
SELECT *
FROM topic_products;
Available Metadata #
The following format metadata can be exposed as read-only (VIRTUAL
) columns in a table definition.
Attention Format metadata fields are only available if the corresponding connector forwards format metadata. Currently, only the Kafka connector is able to expose metadata fields for its value format.
Key | Data Type | Description |
---|---|---|
table |
STRING NULL |
Contains fully qualified table name. The format of the fully qualified table name is: CATALOG NAME.SCHEMA NAME.TABLE NAME |
primary-keys |
ARRAY<STRING> NULL |
An array variable holding the column names of the primary keys of the source table. The primary-keys field is only include in the JSON output if the includePrimaryKeys configuration property is set to true. |
ingestion-timestamp |
TIMESTAMP_LTZ(6) NULL |
The timestamp at which the connector processed the event. Corresponds to the current_ts field in the Ogg record. |
event-timestamp |
TIMESTAMP_LTZ(6) NULL |
The timestamp at which the source system created the event. Corresponds to the op_ts field in the Ogg record. |
The following example shows how to access Ogg metadata fields in Kafka:
CREATE TABLE KafkaTable (
origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
event_time TIMESTAMP(3) METADATA FROM 'value.event-timestamp' VIRTUAL,
origin_table STRING METADATA FROM 'value.table' VIRTUAL,
primary_keys ARRAY<STRING> METADATA FROM 'value.primary_keys' VIRTUAL,
user_id BIGINT,
item_id BIGINT,
behavior STRING
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'value.format' = 'ogg-json'
);
Format Options #
选项 | 要求 | 默认 | 类型 | 描述 |
---|---|---|---|---|
format |
必填 | (none) | String | 指定要使用的格式,此处应为 'ogg-json' . |
ogg-json.ignore-parse-errors |
选填 | false | Boolean | 当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null。 |
ogg-json.timestamp-format.standard |
可选 | 'SQL' |
String | 声明输入和输出的时间戳格式。当前支持的格式为'SQL' 以及 'ISO-8601' :
|
ogg-json.map-null-key.mode |
选填 | 'FAIL' |
String | 指定处理 Map 中 key 值为空的方法. 当前支持的值有 'FAIL' , 'DROP' 和 'LITERAL' :
|
ogg-json.map-null-key.literal |
选填 | 'null' | String | 当 'ogg-json.map-null-key.mode' 是 LITERAL 的时候,指定字符串常量替换 Map 中的空 key 值。 |
ogg-json.encode.ignore-null-fields |
选填 | false | Boolean | 仅序列化非 Null 的列,默认情况下,会序列化所有列无论是否为 Null。 |
Data Type Mapping #
目前, Ogg format 使用 JSON format 进行序列化和反序列化。有关数据类型映射的更多详细信息,请参考 JSON Format 文档。