This documentation is for an out-of-date version of Apache Flink. We recommend you use the latest stable version.
Legacy Features #
As Flink SQL has matured there are some features that have been replaced with more modern and better functioning substitutes. These legacy features remain documented here for those users that have not yet or are unable to, upgrade to the more modern variant.
Temporal Table Function #
The temporal table function is the legacy way of defining something akin to a versioned table that can be used in a temporal table join. Please define temporal joins using versioned tables in new queries.
Unlike a versioned table, temporal table functions can only be defined on top of append-only streams — it does not support changelog inputs. Additionally, a temporal table function cannot be defined in pure SQL DDL.
Defining a Temporal Table Function #
Temporal table functions can be defined on top of append-only streams using the Table API. The table is registered with one or more key columns, and a time attribute used for versioning.
Suppose we have an append-only table of currency rates that we would like to register as a temporal table function.
SELECT * FROM currency_rates; update_time currency rate ============= ========= ==== 09:00:00 Yen 102 09:00:00 Euro 114 09:00:00 USD 1 11:15:00 Euro 119 11:49:00 Pounds 108
Using the Table API, we can register this stream using
currency for the key and
the versioning time attribute.
TemporalTableFunction rates = tEnv .from("currency_rates"). .createTemporalTableFunction("update_time", "currency"); tEnv.registerFunction("rates", rates);
rates = tEnv .from("currency_rates"). .createTemporalTableFunction("update_time", "currency") tEnv.registerFunction("rates", rates)
Temporal Table Function Join #
Once defined, a temporal table function is used as a standard table function. Append-only tables (left input/probe side) can join with a temporal table (right input/build side), i.e., a table that changes over time and tracks its changes, to retrieve the value for a key as it was at a particular point in time.
Consider an append-only table
orders that tracks customers' orders in different currencies.
SELECT * FROM orders; order_time amount currency ========== ====== ========= 10:15 2 Euro 10:30 1 USD 10:32 50 Yen 10:52 3 Euro 11:04 5 USD
Given these tables, we would like to convert orders to a common currency — USD.
SELECT SUM(amount * rate) AS amount FROM orders, LATERAL TABLE (rates(order_time)) WHERE rates.currency = orders.currency
Table result = orders .joinLateral($("rates(order_time)"), $("orders.currency = rates.currency")) .select($("(o_amount * r_rate).sum as amount"));
val result = orders .joinLateral($"rates(order_time)", $"orders.currency = rates.currency") .select($"(o_amount * r_rate).sum as amount"))