Temporal Tables represent a concept of a (parameterized) view on a changing history table that returns the content of a table at a specific point in time.
Flink can keep track of the changes applied to an underlying append-only table and allows for accessing the table’s content at a certain point in time within a query.
Let’s assume that we have the following table, RatesHistory.
RatesHistory represents an ever-growing, append-only table of currency exchange rates with respect to Yen (which has a rate of 1).
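Each rate is valid from its row’s timestamp until the next row for the same currency arrives. For example, an update recorded at 11:15 ends the period during which the previous rate for that currency was valid.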
If we would like to output all rates that are current at time 10:58, we would need the following SQL query to compute a result table.
The correlated subquery determines, for each currency, the maximum timestamp that is less than or equal to the desired time. The outer query lists the rates whose timestamp equals that maximum.
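To make the semantics of that query concrete, here is a minimal plain-Java sketch of the same computation. It is not Flink code: the class AsOfQuery, the Rate record, and the sample rows are hypothetical, and the values are illustrative rather than the contents of the original table. The inner loop plays the role of the correlated subquery, and the final equality check plays the role of the outer query.

```java
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.List;

class AsOfQuery {
    // Hypothetical row type for an append-only RatesHistory table.
    record Rate(LocalTime rowtime, String currency, long rate) {}

    // Plain-Java rendering of the correlated-subquery approach.
    static List<Rate> ratesAsOf(List<Rate> history, LocalTime asOf) {
        List<Rate> result = new ArrayList<>();
        for (Rate r : history) {
            // Inner "subquery": the maximum rowtime <= asOf for r's currency.
            LocalTime maxTime = null;
            for (Rate r2 : history) {
                if (r2.currency().equals(r.currency())
                        && !r2.rowtime().isAfter(asOf)
                        && (maxTime == null || r2.rowtime().isAfter(maxTime))) {
                    maxTime = r2.rowtime();
                }
            }
            // Outer query: keep the row only if its rowtime is that maximum.
            if (r.rowtime().equals(maxTime)) {
                result.add(r);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Illustrative values, not the contents of the original table.
        List<Rate> history = List.of(
                new Rate(LocalTime.of(9, 0), "US Dollar", 100),
                new Rate(LocalTime.of(9, 0), "Euro", 110),
                new Rate(LocalTime.of(9, 0), "Yen", 1),
                new Rate(LocalTime.of(10, 30), "Euro", 112),
                new Rate(LocalTime.of(11, 30), "Euro", 115),
                new Rate(LocalTime.of(11, 45), "Pounds", 130));
        // Keeps the US Dollar row, the Yen row, and the 10:30 Euro row.
        ratesAsOf(history, LocalTime.of(10, 58)).forEach(System.out::println);
    }
}
```

Because every row re-scans the whole history, the cost of this formulation grows quadratically with the size of the table.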
The following table shows the result of such a computation. In our example, the update at 10:45 is taken into account; however, the update at 11:15 and the new entry of Pounds are not considered in the table’s version at time 10:58.
The concept of Temporal Tables aims to simplify such queries, speed up their execution, and reduce Flink’s state usage. A Temporal Table is a parameterized view on an append-only table that interprets the rows of the append-only table as the changelog of a table and provides the version of that table at a specific point in time. Interpreting the append-only table as a changelog requires the specification of a primary key attribute and a timestamp attribute. The primary key determines which rows are overwritten and the timestamp determines the time during which a row is valid.
In the above example, currency would be the primary key for the RatesHistory table and rowtime would be the timestamp attribute.
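The following sketch shows one way to read an append-only history as a changelog, with currency as the primary key and rowtime as the timestamp attribute: each new row for a key overwrites the previous row for that key and closes its validity period. It assumes the rows arrive sorted by rowtime; the names ValidityIntervals, Rate, and Version are hypothetical, and the sketch makes no claim about how Flink stores temporal tables internally.

```java
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ValidityIntervals {
    // Hypothetical RatesHistory row: currency is the primary key, rowtime the timestamp.
    record Rate(LocalTime rowtime, String currency, long rate) {}

    // A row plus the half-open period [from, to) during which it is the valid
    // version for its key; to == null means it is still the current version.
    record Version(Rate row, LocalTime from, LocalTime to) {}

    // Reads the append-only history (assumed sorted by rowtime) as a changelog:
    // a new row for an existing key overwrites that key's previous row, which
    // closes the previous row's validity period at the new row's timestamp.
    static List<Version> versions(List<Rate> history) {
        List<Version> out = new ArrayList<>();
        Map<String, Integer> openVersion = new HashMap<>(); // key -> index in out
        for (Rate r : history) {
            Integer prev = openVersion.get(r.currency());
            if (prev != null) {
                Version old = out.get(prev);
                out.set(prev, new Version(old.row(), old.from(), r.rowtime()));
            }
            openVersion.put(r.currency(), out.size());
            out.add(new Version(r, r.rowtime(), null));
        }
        return out;
    }

    public static void main(String[] args) {
        // Illustrative values, not the contents of the original table.
        List<Rate> history = List.of(
                new Rate(LocalTime.of(9, 0), "Euro", 110),
                new Rate(LocalTime.of(9, 0), "Yen", 1),
                new Rate(LocalTime.of(10, 30), "Euro", 112),
                new Rate(LocalTime.of(11, 30), "Euro", 115));
        // First line printed: "Euro 110 valid from 09:00 until 10:30"
        for (Version v : versions(history)) {
            String until = v.to() == null ? "now" : v.to().toString();
            System.out.println(v.row().currency() + " " + v.row().rate()
                    + " valid from " + v.from() + " until " + until);
        }
    }
}
```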
In Flink, a temporal table is represented by a Temporal Table Function.
In order to access the data in a temporal table, one must pass a time attribute that determines the version of the table that will be returned. Flink uses the SQL syntax of table functions to provide a way to express it.
Once defined, a Temporal Table Function takes a single time argument timeAttribute and returns a set of rows. This set contains the latest versions of the rows for all of the existing primary keys with respect to the given time attribute.
Assuming that we defined a temporal table function Rates(timeAttribute) based on the RatesHistory table, we could query such a function in the following way.
Each query to Rates(timeAttribute) would return the state of Rates for the given timeAttribute.
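As a rough analogue of Rates(timeAttribute), the sketch below keeps every version of each key in a sorted map and answers a lookup with TreeMap.floorEntry, which returns the latest entry whose timestamp is at or before the requested time. TemporalRates, append, and at are hypothetical names, the values are illustrative, and Flink’s actual implementation may differ.

```java
import java.time.LocalTime;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

class TemporalRates {
    // currency (primary key) -> (rowtime -> rate); every version is retained.
    private final Map<String, TreeMap<LocalTime, Long>> versions = new HashMap<>();

    // Adds one row of the append-only history.
    void append(LocalTime rowtime, String currency, long rate) {
        versions.computeIfAbsent(currency, k -> new TreeMap<>()).put(rowtime, rate);
    }

    // Analogue of Rates(timeAttribute): for every key that already has a row at
    // or before t, return the latest such version. Keys are sorted for printing.
    Map<String, Long> at(LocalTime t) {
        Map<String, Long> snapshot = new TreeMap<>();
        for (Map.Entry<String, TreeMap<LocalTime, Long>> e : versions.entrySet()) {
            Map.Entry<LocalTime, Long> latest = e.getValue().floorEntry(t);
            if (latest != null) {
                snapshot.put(e.getKey(), latest.getValue());
            }
        }
        return snapshot;
    }

    public static void main(String[] args) {
        TemporalRates rates = new TemporalRates();
        // Illustrative values, not the contents of the original table.
        rates.append(LocalTime.of(9, 0), "US Dollar", 100);
        rates.append(LocalTime.of(9, 0), "Euro", 110);
        rates.append(LocalTime.of(9, 0), "Yen", 1);
        rates.append(LocalTime.of(10, 30), "Euro", 112);
        rates.append(LocalTime.of(11, 45), "Pounds", 130);
        System.out.println(rates.at(LocalTime.of(10, 58))); // {Euro=112, US Dollar=100, Yen=1}
        System.out.println(rates.at(LocalTime.of(12, 0)));  // {Euro=112, Pounds=130, US Dollar=100, Yen=1}
    }
}
```

Compared with rescanning the full history, each lookup here is a logarithmic-time search per key, which is the kind of speed-up the temporal table concept aims for; the sketch says nothing about how Flink manages the corresponding state.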
Note: Currently, Flink doesn’t support directly querying temporal table functions with a constant time attribute parameter. At the moment, temporal table functions can only be used in joins. The example above was used to provide an intuition about what the function Rates(timeAttribute) returns.
See also the page about joins for continuous queries for more information about how to join with a temporal table.
The following code snippet illustrates how to create a temporal table function from an append-only table.
Line (2) registers this function under the name Rates in our table environment, which allows us to use the Rates function in SQL.