This documentation is for an out-of-date version of Apache Flink. We recommend you use the latest stable version.
Legacy Features #
As Flink SQL has matured there are some features that have been replaced with more modern and better functioning substitutes. These legacy features remain documented here for those users that have not yet or are unable to, upgrade to the more modern variant.
Temporal Table Function #
The temporal table function is the legacy way of defining something akin to a versioned table that can be used in a temporal table join. Please define temporal joins using versioned tables in new queries.
Unlike a versioned table, temporal table functions can only be defined on top of append-only streams — it does not support changelog inputs. Additionally, a temporal table function cannot be defined in pure SQL DDL.
Defining a Temporal Table Function #
Temporal table functions can be defined on top of append-only streams using the Table API. The table is registered with one or more key columns, and a time attribute used for versioning.
Suppose we have an append-only table of currency rates that we would like to register as a temporal table function.
SELECT * FROM currency_rates;
update_time currency rate
============= ========= ====
09:00:00 Yen 102
09:00:00 Euro 114
09:00:00 USD 1
11:15:00 Euro 119
11:49:00 Pounds 108
Using the Table API, we can register this stream using currency
for the key and update_time
as
the versioning time attribute.
TemporalTableFunction rates = tEnv
.from("currency_rates").
.createTemporalTableFunction("update_time", "currency");
tEnv.registerFunction("rates", rates);
rates = tEnv
.from("currency_rates").
.createTemporalTableFunction("update_time", "currency")
tEnv.registerFunction("rates", rates)
Temporal Table Function Join #
Once defined, a temporal table function is used as a standard table function. Append-only tables (left input/probe side) can join with a temporal table (right input/build side), i.e., a table that changes over time and tracks its changes, to retrieve the value for a key as it was at a particular point in time.
Consider an append-only table orders
that tracks customers' orders in different currencies.
SELECT * FROM orders;
order_time amount currency
========== ====== =========
10:15 2 Euro
10:30 1 USD
10:32 50 Yen
10:52 3 Euro
11:04 5 USD
Given these tables, we would like to convert orders to a common currency — USD.
SELECT
SUM(amount * rate) AS amount
FROM
orders,
LATERAL TABLE (rates(order_time))
WHERE
rates.currency = orders.currency
Table result = orders
.joinLateral($("rates(order_time)"), $("orders.currency = rates.currency"))
.select($("(o_amount * r_rate).sum as amount"));
val result = orders
.joinLateral($"rates(order_time)", $"orders.currency = rates.currency")
.select($"(o_amount * r_rate).sum as amount"))