Temporal Tables are (parameterized) views on a changing table that return the content of that table at a specific point in time.
The changing table can be either a changing history table, which tracks the changes (e.g. database changelogs), or a changing dimension table, which materializes the changes (e.g. database tables).
For the changing history table, Flink keeps track of the changes and allows accessing the content of the table at a certain point in time within a query. In Flink, this kind of table is represented by a Temporal Table Function.
For the changing dimension table, Flink allows accessing the content of the table at processing time within a query. In Flink, this kind of table is represented by a Temporal Table.
Let’s assume that we have the following table, RatesHistory.
RatesHistory represents an ever-growing, append-only table of currency exchange rates with respect to Yen (which has a rate of 1). For example, the exchange rate of Euro to Yen was 114 for the period from 09:00 to 10:45, and 116 from 10:45 to 11:15.
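The periods quoted above follow from reading each row as valid from its rowtime until the next row for the same currency arrives. A minimal Python sketch of that reading is shown below; validity_periods is a hypothetical helper, and the Euro rate set at 11:15 (119) and the Pounds row (108 at 11:49) are illustrative values assumed here, because the text only says that such updates exist.

```python
# Illustrative RatesHistory rows: (rowtime, currency, rate).
# Assumed values: the Euro rate at 11:15 (119) and the Pounds row (108 at 11:49).
RATES_HISTORY = [
    ("09:00", "Yen", 1),
    ("09:00", "Euro", 114),
    ("10:45", "Euro", 116),
    ("11:15", "Euro", 119),
    ("11:49", "Pounds", 108),
]


def validity_periods(history, currency):
    """Return (start, end, rate) per row of `currency`; end is None while the rate is still current."""
    # Zero-padded "HH:MM" strings sort chronologically.
    rows = sorted((t, r) for t, c, r in history if c == currency)
    periods = []
    for i, (start, rate) in enumerate(rows):
        # A rate stays valid until the next row for the same currency.
        end = rows[i + 1][0] if i + 1 < len(rows) else None
        periods.append((start, end, rate))
    return periods


print(validity_periods(RATES_HISTORY, "Euro"))
```

For Euro this yields the periods 09:00 to 10:45 (114) and 10:45 to 11:15 (116), followed by the assumed open-ended 11:15 rate.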
To output all rates that are current at 10:58, we would need the following SQL query to compute a result table:
The correlated subquery determines, for the corresponding currency, the maximum time that is less than or equal to the desired time. The outer query lists the rates that have this maximum timestamp.
The following table shows the result of such a computation. In our example, the update to Euro at 10:45 is taken into account; however, the update to Euro at 11:15 and the new entry of Pounds are not considered in the table’s version at time 10:58.
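To make the semantics of the correlated subquery concrete, here is a naive Python sketch that evaluates it row by row over an in-memory copy of RatesHistory. It is an illustration, not how Flink executes the query; rates_at is a hypothetical helper, and the Euro rate at 11:15 (119) and the Pounds row (108 at 11:49) are assumed values.

```python
# Illustrative RatesHistory rows: (rowtime, currency, rate).
# Assumed values: the Euro rate at 11:15 (119) and the Pounds row (108 at 11:49).
RATES_HISTORY = [
    ("09:00", "Yen", 1),
    ("09:00", "Euro", 114),
    ("10:45", "Euro", 116),
    ("11:15", "Euro", 119),
    ("11:49", "Pounds", 108),
]


def rates_at(history, as_of):
    """Keep a row if its rowtime is the latest rowtime <= as_of for its currency."""
    result = []
    for rowtime, currency, rate in history:
        # Inner query: MAX(rowtime) over the same currency, restricted to rowtime <= as_of.
        candidates = [t for t, c, _ in history if c == currency and t <= as_of]
        max_time = max(candidates) if candidates else None  # like SQL MAX over no rows: NULL
        # Outer query: keep only the row that carries this maximum timestamp.
        if rowtime == max_time:
            result.append((rowtime, currency, rate))
    return result


print(rates_at(RATES_HISTORY, "10:58"))
```

At 10:58 only the Yen row and the 10:45 Euro row survive, matching the description above. Each row triggers a scan of the whole history, so the work of this sketch grows quadratically with the table size.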
The concept of Temporal Tables aims to simplify such queries, speed up their execution, and reduce Flink’s state usage. A Temporal Table is a parameterized view on an append-only table that interprets the rows of the append-only table as the changelog of a table and provides the version of that table at a specific point in time. Interpreting the append-only table as a changelog requires the specification of a primary key attribute and a timestamp attribute. The primary key determines which rows are overwritten and the timestamp determines the time during which a row is valid.
In the above example, currency would be the primary key of the RatesHistory table and rowtime would be the timestamp attribute.
In Flink, this is represented by a Temporal Table Function.
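The interpretation described above (a primary key decides which rows overwrite each other, a timestamp decides when each row is valid) can be sketched in Python as a callable object. TemporalTableFunctionSketch is a hypothetical class invented for illustration, not Flink’s implementation; it keeps every version per key sorted by time so that one lookup is a binary search per key. The Euro rate at 11:15 (119) and the Pounds row (108 at 11:49) are assumed values.

```python
import bisect
from collections import defaultdict

# Illustrative rows; the Euro rate at 11:15 and the Pounds row are assumed values.
RATES_HISTORY = [
    {"rowtime": "09:00", "currency": "Yen", "rate": 1},
    {"rowtime": "09:00", "currency": "Euro", "rate": 114},
    {"rowtime": "10:45", "currency": "Euro", "rate": 116},
    {"rowtime": "11:15", "currency": "Euro", "rate": 119},
    {"rowtime": "11:49", "currency": "Pounds", "rate": 108},
]


class TemporalTableFunctionSketch:
    """Hypothetical helper: view an append-only table as a versioned table."""

    def __init__(self, rows, time_attr, primary_key):
        grouped = defaultdict(list)
        for row in rows:
            grouped[row[primary_key]].append(row)
        # Per key: versions sorted by the timestamp attribute, plus their bare timestamps for bisect.
        self._rows = {}
        self._times = {}
        for key, versions in grouped.items():
            versions.sort(key=lambda r: r[time_attr])
            self._rows[key] = versions
            self._times[key] = [r[time_attr] for r in versions]

    def __call__(self, time):
        """Return, for every key, the latest version whose timestamp is <= time."""
        result = []
        for key, times in self._times.items():
            i = bisect.bisect_right(times, time)
            if i > 0:  # keys without a version yet are absent from this version of the table
                result.append(self._rows[key][i - 1])
        return result


rates = TemporalTableFunctionSketch(RATES_HISTORY, time_attr="rowtime", primary_key="currency")
print(rates("10:58"))
```

Calling rates("10:58") returns the Yen row and the 10:45 Euro row without rescanning the history for every row.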
On the other hand, some use cases require joining a changing dimension table, which is an external database table.
Let’s assume that LatestRates is a table, stored for example in an external database, that is materialized with the latest rates. LatestRates is the materialized history of RatesHistory. Then the content of the LatestRates table at time 10:58 will be:
The content of the LatestRates table at time 12:00 will be:
In Flink, this is represented by a Temporal Table.
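Unlike RatesHistory, a materialized table such as LatestRates keeps only the current rate for each currency and overwrites it in place. The Python sketch below replays updates up to a given time to show the content at 10:58 and at 12:00. LatestRatesSketch and content_at are hypothetical names, the updates are assumed to arrive in time order, and the Euro rate at 11:15 (119) and the Pounds row (108 at 11:49) are illustrative values not given in the text.

```python
# Illustrative updates in time order: (time, currency, rate).
# Assumed values: the Euro rate at 11:15 (119) and the Pounds row (108 at 11:49).
UPDATES = [
    ("09:00", "Yen", 1),
    ("09:00", "Euro", 114),
    ("10:45", "Euro", 116),
    ("11:15", "Euro", 119),
    ("11:49", "Pounds", 108),
]


class LatestRatesSketch:
    """Hypothetical materialized table: one current rate per currency, no history."""

    def __init__(self):
        self._rates = {}

    def upsert(self, currency, rate):
        # Overwrites the previous rate; the old value is lost.
        self._rates[currency] = rate

    def snapshot(self):
        return dict(self._rates)


def content_at(updates, now):
    """Apply every update that happened up to `now` and return the table content."""
    table = LatestRatesSketch()
    for time, currency, rate in updates:
        if time <= now:
            table.upsert(currency, rate)
    return table.snapshot()


print(content_at(UPDATES, "10:58"))
print(content_at(UPDATES, "12:00"))
```

Because only the current value per key survives, such a table can answer "what is the rate now?" but not "what was the rate at 10:15?", which is consistent with accessing it at processing time.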
In order to access the data in a temporal table, one must pass a time attribute that determines which version of the table is returned. Flink uses the SQL syntax of table functions to express this.
Once defined, a Temporal Table Function takes a single time argument, timeAttribute, and returns a set of rows.
This set contains the latest versions of the rows for all of the existing primary keys with respect to the given time attribute.
Assuming that we defined a temporal table function Rates(timeAttribute) based on the RatesHistory table, we could query such a function in the following way:
Each query to Rates(timeAttribute) would return the state of Rates for the given timeAttribute.
Note: Currently, Flink doesn’t support directly querying the temporal table functions with a constant time attribute parameter. At the moment, temporal table functions can only be used in joins.
The example above was used to provide an intuition about what the function Rates(timeAttribute) returns.
See also the page about joins for continuous queries for more information about how to join with a temporal table.
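Since temporal table functions are used in joins, it may help to see what such a join computes. The Python sketch below joins each row of a hypothetical Orders table with the rate version valid at that order’s rowtime and converts the amount to Yen. ORDERS, rate_at and temporal_join are invented for illustration, the Euro rate at 11:15 (119) is an assumed value, and orders without a matching rate version are dropped, as in an inner join.

```python
import bisect

# Illustrative rate versions per currency, sorted by time: (rowtime, rate).
# Assumed value: the Euro rate at 11:15 (119).
RATE_VERSIONS = {
    "Yen": [("09:00", 1)],
    "Euro": [("09:00", 114), ("10:45", 116), ("11:15", 119)],
}

# Hypothetical Orders rows: (rowtime, currency, amount).
ORDERS = [
    ("10:15", "Euro", 2),
    ("10:58", "Euro", 1),
    ("11:30", "Euro", 3),
    ("10:00", "Yen", 50),
]


def rate_at(currency, time):
    """Rate of `currency` valid at `time`, or None if no version exists yet."""
    versions = RATE_VERSIONS.get(currency, [])
    i = bisect.bisect_right([t for t, _ in versions], time)
    return versions[i - 1][1] if i > 0 else None


def temporal_join(orders):
    """Join every order with the rate version valid at the order's own rowtime."""
    joined = []
    for rowtime, currency, amount in orders:
        rate = rate_at(currency, rowtime)
        if rate is not None:  # inner-join behaviour: unmatched orders are dropped
            joined.append((rowtime, currency, amount * rate))
    return joined


print(temporal_join(ORDERS))
```

Each order uses the version of the rates table that was valid at its own time, so the 10:15 Euro order is converted at 114 and the 10:58 order at 116.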
The following code snippet illustrates how to create a temporal table function from an append-only table.
Line (1) creates a rates temporal table function, which allows us to use the function rates in the Table API.
Line (2) registers this function under the name Rates in our table environment, which allows us to use the Rates function in SQL.
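The two steps, creating the function from an append-only table and registering it under a name, can be mimicked in plain Python to show what each one contributes. This is a hypothetical sketch: create_temporal_table_function and the functions dictionary are stand-ins invented here, not Flink APIs, and the Euro rate at 11:15 (119) is an assumed value.

```python
# Hypothetical stand-ins; none of these names are Flink APIs.


def create_temporal_table_function(rows, time_attr, primary_key):
    """Return a function: time -> latest row per primary key with timestamp <= time."""

    def versions_at(time):
        latest = {}
        for row in rows:
            if row[time_attr] <= time:
                key = row[primary_key]
                # Keep the newest version seen so far for this key.
                if key not in latest or row[time_attr] >= latest[key][time_attr]:
                    latest[key] = row
        return list(latest.values())

    return versions_at


# Illustrative rows; the Euro rate at 11:15 (119) is an assumed value.
rates_history = [
    {"rowtime": "09:00", "currency": "Euro", "rate": 114},
    {"rowtime": "10:45", "currency": "Euro", "rate": 116},
    {"rowtime": "11:15", "currency": "Euro", "rate": 119},
]

rates = create_temporal_table_function(rates_history, "rowtime", "currency")  # (1) create
functions = {"Rates": rates}  # (2) register under a name that queries can refer to

print(functions["Rates"]("10:58"))
```

Step (1) fixes the time attribute and primary key once; step (2) only makes the same function reachable by name.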
Attention: This is only supported in the Blink planner.
In order to access data in a temporal table, one currently must define a TableSource that implements LookupableTableSource. Flink uses the SQL syntax FOR SYSTEM_TIME AS OF, which is proposed in SQL:2011, to query a temporal table.
Assuming that we defined a temporal table called LatestRates, we can query such a table in the following way:
Note: Currently, Flink doesn’t support directly querying the temporal table with a constant time. At the moment, a temporal table can only be used in joins. The example above is used to provide an intuition about what the temporal table LatestRates returns.
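A join against a temporal table such as LatestRates performs a point lookup into the table’s current content when each row is processed. The Python sketch below models that with a hypothetical LatestRatesLookup class and an interleaved stream of rate updates and orders; it is not Flink’s LookupableTableSource interface. The Euro rates 114 and 116 come from the text, 119 is an assumed value, and orders whose key is missing are dropped, as in an inner join.

```python
# Hypothetical sketch of a processing-time lookup join; not Flink's API.


class LatestRatesLookup:
    """Dimension table holding only the current rate per currency."""

    def __init__(self):
        self._rates = {}

    def upsert(self, currency, rate):
        self._rates[currency] = rate

    def lookup(self, currency):
        """Return the rate stored right now, or None if the key is unknown."""
        return self._rates.get(currency)


def process(events):
    """events: ("update", currency, rate) or ("order", currency, amount), in arrival order."""
    table = LatestRatesLookup()
    joined = []
    for kind, currency, value in events:
        if kind == "update":
            table.upsert(currency, value)
        else:
            rate = table.lookup(currency)  # point lookup at processing time
            if rate is not None:
                joined.append((currency, value * rate))
    return joined


# Euro rates 114 and 116 come from the text; 119 is an assumed value.
EVENTS = [
    ("update", "Euro", 114),
    ("order", "Euro", 2),
    ("update", "Euro", 116),
    ("order", "Euro", 2),
    ("update", "Euro", 119),
    ("order", "Pounds", 5),
]
print(process(EVENTS))
```

The two identical Euro orders produce different results (228 and 232) only because they were processed before and after the 10:45 update, which is what joining at processing time means.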
See also the page about joins for continuous queries for more information about how to join with a temporal table.
See also the page about how to define LookupableTableSource.