Flink SQL operates over dynamic tables that evolve over time and may be either append-only or updating. Versioned tables are a special type of updating table that remembers the past values for each key.
Attention: The legacy planner does not support versioned tables.
Dynamic tables define relations over time. Often, particularly when working with metadata, a key’s old value does not become irrelevant when it changes.
Flink SQL can define versioned tables over any dynamic table with a PRIMARY KEY constraint and a time attribute.
A primary key constraint in Flink means that a column or set of columns of a table or view are unique and non-null.
The primary key semantics on an upserting table mean that the materialized changes for a particular key (INSERT/UPDATE/DELETE) represent the changes to a single row over time. The time attribute on an upserting table defines when each change occurred.
Taken together, these allow Flink to track the changes to a row over time and maintain the period for which each value was valid for that key.
Suppose a table tracks the prices of different products in a store.
Given this set of changes, we track how the price of a scooter changes over time.
It is initially $11.11 at 00:01:00, when the scooter is added to the catalog.
The price then rises to $12.99 at 12:00:00, before the product is later deleted from the catalog.
If we queried the table for various products’ prices at different times, we would retrieve different results. At 10:00:00, the table would show one set of prices.
At 13:00:00, we would find another set of prices.
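As an illustration, the scooter's history described above could be written out as the following changelog. This is a sketch rather than actual Flink output: the product_id value p_001 and the column layout are assumptions, and the deletion time is left open because it is not specified here.

```sql
-- Hypothetical changelog for a single product; p_001 is an assumed key.
-- (changelog kind)     update_time   product_id   product_name   price
-- +I (INSERT)          00:01:00      p_001        scooter        11.11
-- -U (UPDATE_BEFORE)   12:00:00      p_001        scooter        11.11
-- +U (UPDATE_AFTER)    12:00:00      p_001        scooter        12.99
-- -D (DELETE)          (later)       p_001        scooter        12.99
```

Read this way, a lookup at 10:00:00 falls in the period during which 11.11 was valid, while a lookup at 13:00:00 falls in the period during which 12.99 was valid.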
Versioned tables are defined implicitly for any tables whose underlying sources or formats directly define changelogs.
Examples include the upsert Kafka source as well as database changelog formats such as Debezium and Canal.
As discussed above, the only additional requirement is that the CREATE TABLE statement must contain a PRIMARY KEY and an event-time attribute.
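A minimal sketch of such a definition, assuming a Kafka topic that carries Debezium JSON records and a payload column update_time that holds the change time. The table name, column names, topic, and broker address are all hypothetical.

```sql
-- Sketch: a changelog-backed table that Flink can treat as versioned.
-- All names and connection settings below are illustrative assumptions.
CREATE TABLE products (
  product_id   STRING,
  product_name STRING,
  price        DECIMAL(32, 2),
  update_time  TIMESTAMP(3),                  -- assumed to be part of the record payload
  PRIMARY KEY (product_id) NOT ENFORCED,      -- (1) primary key
  WATERMARK FOR update_time AS update_time    -- (2) event-time attribute
) WITH (
  'connector' = 'kafka',
  'topic' = 'products',
  'properties.bootstrap.servers' = 'localhost:9092',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'debezium-json'
);
```

Flink does not validate primary keys itself, which is why the constraint is declared NOT ENFORCED.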
Flink also supports defining versioned views if the underlying query contains a unique key constraint and event-time attribute. Imagine an append-only table of currency rates.
The currency_rates table contains a row for each currency, with respect to USD, and receives a new row each time the rate changes.
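Such a table might be declared as in the sketch below. The currency and update_time columns come from the description here. The rate column, the topic name, and the connection settings are assumptions made for illustration.

```sql
-- Sketch: an append-only table of currency rates read from JSON records.
-- The rate column, topic, and broker address are illustrative assumptions.
CREATE TABLE currency_rates (
  currency    STRING,
  rate        DECIMAL(32, 10),
  update_time TIMESTAMP(3),
  WATERMARK FOR update_time AS update_time    -- event-time attribute
) WITH (
  'connector' = 'kafka',
  'topic' = 'rates',
  'properties.bootstrap.servers' = 'localhost:9092',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);
```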
The JSON format does not support native changelog semantics, so Flink can only read this table as append-only.
Flink interprets each row as an INSERT to the table, meaning we cannot define a PRIMARY KEY over currency.
However, it is clear to us (the query developer) that this table has all the necessary information to define a versioned table.
Flink can reinterpret this table as a versioned table if we define a deduplication query that produces an ordered changelog stream with an inferred primary key (currency) and event time (update_time).
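A sketch of such a deduplication query, wrapped in a view so that later queries can refer to it. The view name versioned_rates and the rate column are assumptions. currency_rates, currency, and update_time are the names used in the text above.

```sql
-- Sketch: keep only the latest row per currency, ordered by event time.
-- versioned_rates is a hypothetical view name.
CREATE VIEW versioned_rates AS
SELECT currency, rate, update_time             -- (1) update_time keeps the event time
FROM (
  SELECT *,
    ROW_NUMBER() OVER (
      PARTITION BY currency                    -- (2) inferred unique key
      ORDER BY update_time DESC                -- (3) ordered by a time attribute
    ) AS rownum
  FROM currency_rates
)
WHERE rownum = 1;
```

Partitioning by currency is what gives the view its inferred primary key, and ordering by update_time in descending order keeps the most recent rate for each currency.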
Flink has a special optimization step that efficiently transforms this query into a versioned table usable in subsequent queries. In general, a query that combines the following elements produces a versioned table:
ROW_NUMBER(): Assigns a unique, sequential number to each row, starting with one.
PARTITION BY col1[, col2...]: Specifies the partition columns, i.e. the deduplication key. These columns form the primary key of the subsequent versioned table.
ORDER BY time_attr DESC: Specifies the ordering column, which must be a time attribute.
WHERE rownum = 1: The rownum = 1 filter is required for Flink to recognize that this query generates a versioned table.
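Put together, the elements above take roughly the following shape. This is a syntax template rather than a runnable statement: column_list, table_name, col1, col2, and time_attr are placeholders, and square brackets mark optional parts.

```sql
-- Syntax template only; replace the placeholders before running.
SELECT [column_list]
FROM (
  SELECT *,
    ROW_NUMBER() OVER (
      PARTITION BY col1[, col2...]
      ORDER BY time_attr DESC
    ) AS rownum
  FROM table_name
)
WHERE rownum = 1
```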