Ogg
This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.

Ogg Format #

Changelog-Data-Capture Format Format: Serialization Schema Format: Deserialization Schema

Oracle GoldenGate (a.k.a ogg) is a managed service providing a real-time data mesh platform, which uses replication to keep data highly available, and enabling real-time analysis. Customers can design, execute, and monitor their data replication and stream data processing solutions without the need to allocate or manage compute environments. Ogg provides a format schema for changelog and supports to serialize messages using JSON.

Flink supports to interpret Ogg JSON as INSERT/UPDATE/DELETE messages into Flink SQL system. This is useful in many cases to leverage this feature, such as

  • synchronizing incremental data from databases to other systems
  • auditing logs
  • real-time materialized views on databases
  • temporal join changing history of a database table and so on.

Flink also supports to encode the INSERT/UPDATE/DELETE messages in Flink SQL as Ogg JSON, and emit to external systems like Kafka. However, currently Flink can’t combine UPDATE_BEFORE and UPDATE_AFTER into a single UPDATE message. Therefore, Flink encodes UPDATE_BEFORE and UPDATE_AFTER as DELETE and INSERT Ogg messages.

Dependencies #

Ogg Json #

In order to use the Ogg the following dependencies are required for both projects using a build automation tool (such as Maven or SBT) and SQL Client with SQL JAR bundles.

Maven dependency SQL Client
Built-in

Note: please refer to Ogg Kafka Handler documentation about how to set up an Ogg Kafka handler to synchronize changelog to Kafka topics.

How to use Ogg format #

Ogg provides a unified format for changelog, here is a simple example for an update operation captured from an Oracle PRODUCTS table in JSON format:

{
  "before": {
    "id": 111,
    "name": "scooter",
    "description": "Big 2-wheel scooter",
    "weight": 5.18
  },
  "after": {
    "id": 111,
    "name": "scooter",
    "description": "Big 2-wheel scooter",
    "weight": 5.15
  },
  "op_type": "U",
  "op_ts": "2020-05-13 15:40:06.000000",
  "current_ts": "2020-05-13 15:40:07.000000",
  "primary_keys": [
    "id"
  ],
  "pos": "00000000000000000000143",
  "table": "PRODUCTS"
}

Note: please refer to Debezium documentation about the meaning of each field.

The Oracle PRODUCTS table has 4 columns (id, name, description and weight). The above JSON message is an update change event on the PRODUCTS table where the weight value of the row with id = 111 is changed from 5.18 to 5.15. Assuming this messages is synchronized to Kafka topic products_ogg, then we can use the following DDL to consume this topic and interpret the change events.

CREATE TABLE topic_products (
  -- schema is totally the same to the Oracle "products" table
  id BIGINT,
  name STRING,
  description STRING,
  weight DECIMAL(10, 2)
) WITH (
  'connector' = 'kafka',
  'topic' = 'products_ogg',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'ogg-json'
)

After registering the topic as a Flink table, then you can consume the Ogg messages as a changelog source.

-- a real-time materialized view on the Oracle "PRODUCTS"
-- which calculate the latest average of weight for the same products
SELECT name, AVG(weight)
FROM topic_products
GROUP BY name;

-- synchronize all the data and incremental changes of Oracle "PRODUCTS" table to
-- Elasticsearch "products" index for future searching
INSERT INTO elasticsearch_products
SELECT *
FROM topic_products;

Available Metadata #

The following format metadata can be exposed as read-only (VIRTUAL) columns in a table definition.

Attention Format metadata fields are only available if the corresponding connector forwards format metadata. Currently, only the Kafka connector is able to expose metadata fields for its value format.

Key Data Type Description
table STRING NULL Contains fully qualified table name. The format of the fully qualified table name is: CATALOG NAME.SCHEMA NAME.TABLE NAME
primary-keys ARRAY<STRING> NULL An array variable holding the column names of the primary keys of the source table. The primary-keys field is only include in the JSON output if the includePrimaryKeys configuration property is set to true.
ingestion-timestamp TIMESTAMP_LTZ(6) NULL The timestamp at which the connector processed the event. Corresponds to the current_ts field in the Ogg record.
event-timestamp TIMESTAMP_LTZ(6) NULL The timestamp at which the source system created the event. Corresponds to the op_ts field in the Ogg record.

The following example shows how to access Ogg metadata fields in Kafka:

CREATE TABLE KafkaTable (
  origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
  event_time TIMESTAMP(3) METADATA FROM 'value.event-timestamp' VIRTUAL,
  origin_table STRING METADATA FROM 'value.table' VIRTUAL,
  primary_keys ARRAY<STRING> METADATA FROM 'value.primary-keys' VIRTUAL,
  user_id BIGINT,
  item_id BIGINT,
  behavior STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'ogg-json'
);

Format Options #

Option Required Default Type Description
format
required (none) String Specify what format to use, here should be 'ogg-json'.
ogg-json.ignore-parse-errors
optional false Boolean Skip fields and rows with parse errors instead of failing. Fields are set to null in case of errors.
ogg-json.timestamp-format.standard
optional 'SQL' String Specify the input and output timestamp format. Currently supported values are 'SQL' and 'ISO-8601':
  • Option 'SQL' will parse input timestamp in "yyyy-MM-dd HH:mm:ss.s{precision}" format, e.g '2020-12-30 12:13:14.123' and output timestamp in the same format.
  • Option 'ISO-8601'will parse input timestamp in "yyyy-MM-ddTHH:mm:ss.s{precision}" format, e.g '2020-12-30T12:13:14.123' and output timestamp in the same format.
ogg-json.map-null-key.mode
optional 'FAIL' String Specify the handling mode when serializing null keys for map data. Currently supported values are 'FAIL', 'DROP' and 'LITERAL':
  • Option 'FAIL' will throw exception when encountering map with null key.
  • Option 'DROP' will drop null key entries for map data.
  • Option 'LITERAL' will replace null key with string literal. The string literal is defined by ogg-json.map-null-key.literal option.
ogg-json.map-null-key.literal
optional 'null' String Specify string literal to replace null key when 'ogg-json.map-null-key.mode' is LITERAL.
ogg-json.encode.ignore-null-fields
optional false Boolean Encode only non-null fields. By default, all fields will be included.

Data Type Mapping #

Currently, the Ogg format uses JSON format for serialization and deserialization. Please refer to JSON Format documentation for more details about the data type mapping.