Ogg

Ogg Format #

Changelog-Data-Capture Format Format: Serialization Schema Format: Deserialization Schema

Oracle GoldenGate (a.k.a ogg) 是一个实现异构 IT 环境间数据实时数据集成和复制的综合软件包。 该产品集支持高可用性解决方案、实时数据集成、事务更改数据捕获、运营和分析企业系统之间的数据复制、转换和验证。Ogg 为变更日志提供了统一的格式结构,并支持使用 JSON 序列化消息。

Flink 支持将 Ogg JSON 消息解析为 INSERT/UPDATE/DELETE 消息到 Flink SQL 系统中。在很多情况下,利用这个特性非常有用,例如

  • 将增量数据从数据库同步到其他系统
  • 日志审计
  • 数据库的实时物化视图
  • 关联维度数据库的变更历史,等等

Flink 还支持将 Flink SQL 中的 INSERT/UPDATE/DELETE 消息编码为 Ogg JSON 格式的消息, 输出到 Kafka 等存储中。 但需要注意, 目前 Flink 还不支持将 UPDATE_BEFORE 和 UPDATE_AFTER 合并为一条 UPDATE 消息. 因此, Flink 将 UPDATE_BEFORE 和 UPDATE_AFTER 分别编码为 DELETE 和 INSERT 类型的 Ogg 消息。

Dependencies #

Ogg Json #

In order to use the Ogg the following dependencies are required for both projects using a build automation tool (such as Maven or SBT) and SQL Client with SQL JAR bundles.

Maven dependency SQL Client
Built-in

注意: 请参考 Ogg Kafka Handler documentation, 了解如何设置 Ogg Kafka handler 来将变更日志同步到 Kafka 的 Topic。

How to use Ogg format #

Ogg 为变更日志提供了统一的格式, 这是一个 JSON 格式的从 Oracle PRODUCTS 表捕获的更新操作的简单示例:

{
  "before": {
    "id": 111,
    "name": "scooter",
    "description": "Big 2-wheel scooter",
    "weight": 5.18
  },
  "after": {
    "id": 111,
    "name": "scooter",
    "description": "Big 2-wheel scooter",
    "weight": 5.15
  },
  "op_type": "U",
  "op_ts": "2020-05-13 15:40:06.000000",
  "current_ts": "2020-05-13 15:40:07.000000",
  "primary_keys": [
    "id"
  ],
  "pos": "00000000000000000000143",
  "table": "PRODUCTS"
}

注意:请参考 Debezium documentation 了解每个字段的含义.

Oracle PRODUCTS 表 有 4 列 (id, name, description and weight). 上面的 JSON 消息是 PRODUCTS 表上的一条更新事件,其中 id = 111 的行的 weight 值从 5.18 更改为 5.15. 假设此消息已同步到 Kafka 的 Topic products_ogg, 则可以使用以下 DDL 来使用该 Topic 并解析更新事件。

CREATE TABLE topic_products (
  -- schema is totally the same to the Oracle "products" table
  id BIGINT,
  name STRING,
  description STRING,
  weight DECIMAL(10, 2)
) WITH (
  'connector' = 'kafka',
  'topic' = 'products_ogg',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'ogg-json'
)

再将 Kafka Topic 注册为 Flink 表之后, 可以将 OGG 消息变为变更日志源。

-- a real-time materialized view on the Oracle "PRODUCTS"
-- which calculate the latest average of weight for the same products
SELECT name, AVG(weight)
FROM topic_products
GROUP BY name;

-- synchronize all the data and incremental changes of Oracle "PRODUCTS" table to
-- Elasticsearch "products" index for future searching
INSERT INTO elasticsearch_products
SELECT *
FROM topic_products;

Available Metadata #

The following format metadata can be exposed as read-only (VIRTUAL) columns in a table definition.

Attention Format metadata fields are only available if the corresponding connector forwards format metadata. Currently, only the Kafka connector is able to expose metadata fields for its value format.

Key Data Type Description
table STRING NULL Contains fully qualified table name. The format of the fully qualified table name is: CATALOG NAME.SCHEMA NAME.TABLE NAME
primary-keys ARRAY<STRING> NULL An array variable holding the column names of the primary keys of the source table. The primary-keys field is only include in the JSON output if the includePrimaryKeys configuration property is set to true.
ingestion-timestamp TIMESTAMP_LTZ(6) NULL The timestamp at which the connector processed the event. Corresponds to the current_ts field in the Ogg record.
event-timestamp TIMESTAMP_LTZ(6) NULL The timestamp at which the source system created the event. Corresponds to the op_ts field in the Ogg record.

The following example shows how to access Ogg metadata fields in Kafka:

CREATE TABLE KafkaTable (
  origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
  event_time TIMESTAMP(3) METADATA FROM 'value.event-timestamp' VIRTUAL,
  origin_table STRING METADATA FROM 'value.table' VIRTUAL,
  primary_keys ARRAY<STRING> METADATA FROM 'value.primary_keys' VIRTUAL,
  user_id BIGINT,
  item_id BIGINT,
  behavior STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'ogg-json'
);

Format Options #

选项 要求 默认 类型 描述
format
必填 (none) String 指定要使用的格式,此处应为 'ogg-json'.
ogg-json.ignore-parse-errors
选填 false Boolean 当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null。
ogg-json.timestamp-format.standard
可选 'SQL' String 声明输入和输出的时间戳格式。当前支持的格式为'SQL' 以及 'ISO-8601'
  • 可选参数 'SQL' 将会以 "yyyy-MM-dd HH:mm:ss.s{precision}" 的格式解析时间戳, 例如 '2020-12-30 12:13:14.123',且会以相同的格式输出。
  • 可选参数 'ISO-8601' 将会以 "yyyy-MM-ddTHH:mm:ss.s{precision}" 的格式解析输入时间戳, 例如 '2020-12-30T12:13:14.123' ,且会以相同的格式输出。
ogg-json.map-null-key.mode
选填 'FAIL' String 指定处理 Map 中 key 值为空的方法. 当前支持的值有 'FAIL', 'DROP''LITERAL':
  • Option 'FAIL' 将抛出异常。
  • Option 'DROP' 将丢弃 Map 中 key 值为空的数据项。
  • Option 'LITERAL' 将使用字符串常量来替换 Map 中的空 key 值。字符串常量的值由 ogg-json.map-null-key.literal 定义。
ogg-json.map-null-key.literal
选填 'null' String 'ogg-json.map-null-key.mode' 是 LITERAL 的时候,指定字符串常量替换 Map 中的空 key 值。
ogg-json.encode.ignore-null-fields
选填 false Boolean 仅序列化非 Null 的列,默认情况下,会序列化所有列无论是否为 Null。

Data Type Mapping #

目前, Ogg format 使用 JSON format 进行序列化和反序列化。有关数据类型映射的更多详细信息,请参考 JSON Format 文档