SQL and the relational algebra were not designed with streaming data in mind. As a consequence, there are a few conceptual gaps between relational algebra (and SQL) and stream processing.
This page discusses these differences and explains how Flink can achieve the same semantics on unbounded data as a regular database engine on bounded data.
The following table compares traditional relational algebra and stream processing with respect to input data, execution, and output results.
|Relational Algebra / SQL|Stream Processing|
|Relations (or tables) are bounded (multi-)sets of tuples.|A stream is an infinite sequence of tuples.|
|A query that is executed on batch data (e.g., a table in a relational database) has access to the complete input data.|A streaming query cannot access all data when it is started and has to "wait" for data to be streamed in.|
|A batch query terminates after it has produced a fixed-size result.|A streaming query continuously updates its result based on the received records and never completes.|
Despite these differences, processing streams with relational queries and SQL is not impossible. Advanced relational database systems offer a feature called Materialized Views. A materialized view is defined as a SQL query, just like a regular virtual view. In contrast to a virtual view, a materialized view caches the result of the query such that the query does not need to be evaluated when the view is accessed. A common challenge for caching is to prevent a cache from serving outdated results. A materialized view becomes outdated when the base tables of its definition query are modified. Eager View Maintenance is a technique to update a materialized view as soon as its base tables are updated.
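Eager view maintenance can be illustrated with a minimal Python sketch. It is not how any particular database implements materialized views; the class name `MaterializedCountView`, the `(user, url)` row layout, and the view definition (a per-user row count) are assumptions made for illustration.

```python
from collections import Counter


class MaterializedCountView:
    """Hypothetical view caching: SELECT user, COUNT(*) FROM base GROUP BY user."""

    def __init__(self):
        self.base = []          # base table rows, each a (user, url) tuple
        self.cache = Counter()  # cached view result: user -> count

    def insert(self, row):
        self.base.append(row)
        self.cache[row[0]] += 1  # eager maintenance: update the cache immediately

    def delete(self, row):
        self.base.remove(row)    # raises ValueError if the row is not present
        self.cache[row[0]] -= 1
        if self.cache[row[0]] == 0:
            del self.cache[row[0]]  # drop groups that became empty

    def read(self):
        # Reading the view does not re-evaluate the defining query.
        return dict(self.cache)

    def recompute(self):
        # Full evaluation of the defining query, used only to check the cache.
        return dict(Counter(user for user, _url in self.base))


view = MaterializedCountView()
view.insert(("Mary", "./home"))
view.insert(("Bob", "./cart"))
view.insert(("Mary", "./prod?id=1"))
view.delete(("Bob", "./cart"))
```

After every modification of the base table, `view.read()` matches `view.recompute()`, so the cache never serves an outdated result.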
The connection between eager view maintenance and SQL queries on streams becomes obvious if we consider the following:
A database table is the result of a stream of INSERT, UPDATE, and DELETE DML statements, often called a changelog stream.
With these points in mind, we introduce the concept of dynamic tables in the next section.
Dynamic tables are the core concept of Flink’s Table API and SQL support for streaming data. In contrast to the static tables that represent batch data, dynamic tables are changing over time. They can be queried like static batch tables. Querying dynamic tables yields a Continuous Query. A continuous query never terminates and produces a dynamic table as result. The query continuously updates its (dynamic) result table to reflect the changes on its (dynamic) input tables. Essentially, a continuous query on a dynamic table is very similar to a query that defines a materialized view.
It is important to note that the result of a continuous query is always semantically equivalent to the result of the same query being executed in batch mode on a snapshot of the input tables.
The following figure visualizes the relationship of streams, dynamic tables, and continuous queries:
Note: Dynamic tables are foremost a logical concept. Dynamic tables are not necessarily (fully) materialized during query execution.
In the following, we will explain the concepts of dynamic tables and continuous queries with a stream of click events that have the following schema:
In order to process a stream with a relational query, it has to be converted into a Table. Conceptually, each record of the stream is interpreted as an INSERT modification on the resulting table. Essentially, we are building a table from an INSERT-only changelog stream.
The following figure visualizes how the stream of click events (left-hand side) is converted into a table (right-hand side). The resulting table is continuously growing as more records of the click stream are inserted.
Note: A table which is defined on a stream is internally not materialized.
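The conversion can be sketched in plain Python. This is a conceptual illustration only, not Flink's API: the function name `table_snapshots` and the `(user, url)` record layout are assumptions, and the sketch materializes the table purely to make the growth visible, whereas the table in Flink is not materialized internally.

```python
from typing import Iterable, Iterator, List, Tuple

Click = Tuple[str, str]  # assumed record layout: (user, url)


def table_snapshots(stream: Iterable[Click]) -> Iterator[List[Click]]:
    """Interpret each stream record as an INSERT and yield the table after it."""
    table: List[Click] = []
    for record in stream:
        table.append(record)  # INSERT-only changelog: rows are never changed or removed
        yield list(table)     # copy, so earlier snapshots stay unchanged


click_stream = [("Mary", "./home"), ("Bob", "./cart"), ("Mary", "./prod?id=1")]
snapshots = list(table_snapshots(click_stream))
```

Each snapshot contains exactly one more row than the previous one, which mirrors the continuously growing table described above.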
A continuous query is evaluated on a dynamic table and produces a new dynamic table as result. In contrast to a batch query, a continuous query never terminates and updates its result table according to the updates on its input tables. At any point in time, the result of a continuous query is semantically equivalent to the result of the same query being executed in batch mode on a snapshot of the input tables.
In the following, we show two example queries on a clicks table that is defined on the stream of click events.
The first query is a simple GROUP-BY COUNT aggregation query. It groups the clicks table on the user field and counts the number of visited URLs. The following figure shows how the query is evaluated over time as the clicks table is updated with additional rows.
When the query is started, the clicks table (left-hand side) is empty. The query starts to compute the result table when the first row is inserted into the clicks table. After the first row [Mary, ./home] is inserted, the result table (right-hand side, top) consists of a single row [Mary, 1]. When the second row [Bob, ./cart] is inserted into the clicks table, the query updates the result table and inserts a new row [Bob, 1]. The third row [Mary, ./prod?id=1] yields an update of an already computed result row such that [Mary, 1] is updated to [Mary, 2]. Finally, the query inserts a third row [Liz, 1] into the result table when the fourth row is appended to the clicks table.
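The walkthrough above can be replayed with a small Python simulation. It is a sketch of the semantics, not of Flink's execution: the function name `continuous_count` and the change labels are assumptions, and the URL of Liz's click is a placeholder because only the resulting row [Liz, 1] is given in the text.

```python
from typing import Dict, Iterable, Iterator, Tuple


def continuous_count(clicks: Iterable[Tuple[str, str]]) -> Iterator[Tuple[str, str, int]]:
    """Emit one change to the result table per inserted click.

    Corresponds roughly to: SELECT user, COUNT(url) FROM clicks GROUP BY user
    """
    counts: Dict[str, int] = {}
    for user, _url in clicks:
        if user in counts:
            counts[user] += 1
            yield ("UPDATE", user, counts[user])  # existing result row changes
        else:
            counts[user] = 1
            yield ("INSERT", user, 1)             # new result row


clicks = [
    ("Mary", "./home"),
    ("Bob", "./cart"),
    ("Mary", "./prod?id=1"),
    ("Liz", "./home"),  # placeholder URL (assumption)
]
changes = list(continuous_count(clicks))
# changes == [("INSERT", "Mary", 1), ("INSERT", "Bob", 1),
#             ("UPDATE", "Mary", 2), ("INSERT", "Liz", 1)]
```

The third click produces an UPDATE rather than an INSERT, which is the behavior that makes the result of this query an updated table.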
The second query is similar to the first one, but in addition to the user attribute it also groups the clicks table on an hourly tumbling window before it counts the number of URLs (time-based computations such as windows are based on special time attributes, which are discussed later). Again, the figure shows the input and output at different points in time to visualize the changing nature of dynamic tables.
As before, the input table clicks is shown on the left. The query continuously computes results every hour and updates the result table. The clicks table contains four rows with timestamps between 12:00:00 and 12:59:59. The query computes two result rows from this input (one for each user) and appends them to the result table. For the next window, between 13:00:00 and 13:59:59, the clicks table contains three rows, which results in another two rows being appended to the result table. The result table is updated as more rows are appended to clicks over time.
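A rough Python sketch of the windowed query follows. The sample rows, the `(user, timestamp, url)` layout, and the function names are hypothetical, chosen only to match the row counts described above. The sketch also assumes that rows arrive in timestamp order and closes a window when the first row of a later window arrives, which is a simplification.

```python
from collections import Counter
from datetime import datetime, timedelta


def window_end(ts: datetime) -> datetime:
    """End of the hourly tumbling window that contains ts."""
    return ts.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)


def tumbling_counts(clicks):
    """Yield (user, window_end, count) rows; rows are only ever appended."""
    current = None
    counts = Counter()
    for user, ts, _url in clicks:
        end = window_end(ts)
        if current is not None and end != current:
            for u, c in counts.items():  # window is complete: append its rows
                yield (u, current, c)
            counts.clear()
        current = end
        counts[user] += 1
    if current is not None:              # flush the last window of the sample
        for u, c in counts.items():
            yield (u, current, c)


def t(hour, minute):
    return datetime(2017, 1, 1, hour, minute)  # arbitrary sample date


sample = [
    ("Mary", t(12, 0), "./home"), ("Bob", t(12, 0), "./cart"),
    ("Mary", t(12, 2), "./prod?id=1"), ("Mary", t(12, 55), "./prod?id=4"),
    ("Bob", t(13, 1), "./prod?id=5"), ("Liz", t(13, 30), "./home"),
    ("Liz", t(13, 59), "./prod?id=7"),
]
result = list(tumbling_counts(sample))
```

Each window contributes two new rows to `result`, and no previously emitted row is touched again.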
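Although the two example queries appear to be quite similar (both compute a grouped count aggregate), they differ in one important aspect: the first query updates previously emitted results, whereas the second query only appends rows to its result table.
Whether a query produces an append-only table or an updated table has implications, for example for how the table is encoded when it is converted into a stream: a table that is only modified by INSERT changes can be emitted as an append-only stream, whereas an updated table requires a retract or upsert encoding.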
Many, but not all, semantically valid queries can be evaluated as continuous queries on streams. Some queries are too expensive to compute, either due to the size of state that they need to maintain or because computing updates is too expensive.
For example, consider a query that computes for each user a RANK based on the time of the last click. As soon as the clicks table receives a new row, the lastAction of the user is updated and a new rank must be computed. However, since two rows cannot have the same rank, all lower-ranked rows need to be updated as well.
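The cost of such an update can be made concrete with a small Python sketch. The helper names `ranks` and `changed_rows`, the integer timestamps, and the convention that the most recent click gets rank 1 are assumptions for illustration; timestamps are assumed to be unique.

```python
def ranks(last_action):
    """Rank users by their last click time, most recent first (rank 1)."""
    ordered = sorted(last_action, key=lambda user: last_action[user], reverse=True)
    return {user: position + 1 for position, user in enumerate(ordered)}


def changed_rows(last_action, user, ts):
    """Apply one new click and return the users whose result row changed."""
    before = ranks(last_action)
    last_action[user] = ts  # update lastAction for the clicking user
    after = ranks(last_action)
    return sorted(u for u in after if before.get(u) != after[u])


last_action = {"Mary": 3, "Bob": 2, "Liz": 1}  # hypothetical state: user -> last click time
touched = changed_rows(last_action, "Liz", 4)  # the lowest-ranked user clicks again
```

In this example a single input row changes the rank of all three users, so the number of result updates grows with the number of users rather than with the number of new input rows.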
The Query Configuration page discusses parameters to control the execution of continuous queries. Some parameters can be used to trade the size of maintained state for result accuracy.
A dynamic table can be continuously modified by INSERT, UPDATE, and DELETE changes just like a regular database table. It might be a table with a single row, which is constantly updated, an insert-only table without UPDATE and DELETE modifications, or anything in between.
When converting a dynamic table into a stream or writing it to an external system, these changes need to be encoded. Flink’s Table API and SQL support three ways to encode the changes of a dynamic table:
Append-only stream: A dynamic table that is only modified by INSERT changes can be converted into a stream by emitting the inserted rows.
Retract stream: A retract stream is a stream with two types of messages, add messages and retract messages. A dynamic table is converted into a retract stream by encoding an INSERT change as an add message, a DELETE change as a retract message, and an UPDATE change as a retract message for the updated (previous) row and an add message for the updating (new) row. The following figure visualizes the conversion of a dynamic table into a retract stream.
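Upsert stream: An upsert stream is a stream with two types of messages, upsert messages and delete messages. A dynamic table with a unique key is converted into an upsert stream by encoding INSERT and UPDATE changes as upsert messages and DELETE changes as delete messages. The stream-consuming operator needs to be aware of the unique key attribute in order to apply messages correctly. The main difference to a retract stream is that UPDATE changes are encoded with a single message and are hence more efficient. The following figure visualizes the conversion of a dynamic table into an upsert stream.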
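The retract and upsert encodings can be compared with a short Python sketch. It models the encodings conceptually and is not Flink's API: the change tuples, the `"+"`/`"-"` and `"upsert"`/`"delete"` message tags, and the function names are assumptions, and the first field of each row is assumed to be the unique key.

```python
def to_retract_stream(changes):
    """Encode table changes as add ("+") and retract ("-") messages."""
    for change in changes:
        kind = change[0]
        if kind == "INSERT":
            yield ("+", change[1])
        elif kind == "DELETE":
            yield ("-", change[1])
        elif kind == "UPDATE":
            _, old_row, new_row = change
            yield ("-", old_row)  # retract the previous row
            yield ("+", new_row)  # add the new row
        else:
            raise ValueError(f"unknown change type: {kind}")


def to_upsert_stream(changes, key=lambda row: row[0]):
    """Encode table changes as upsert and delete messages on a unique key."""
    for change in changes:
        kind = change[0]
        if kind == "INSERT":
            yield ("upsert", change[1])
        elif kind == "UPDATE":
            yield ("upsert", change[2])  # a single message carries the new row
        elif kind == "DELETE":
            yield ("delete", key(change[1]))
        else:
            raise ValueError(f"unknown change type: {kind}")


def apply_upserts(messages, key=lambda row: row[0]):
    """Consumer side: rebuild the table, which requires knowing the key."""
    table = {}
    for tag, payload in messages:
        if tag == "upsert":
            table[key(payload)] = payload
        else:
            table.pop(payload, None)
    return table


changes = [
    ("INSERT", ("Mary", 1)),
    ("INSERT", ("Bob", 1)),
    ("UPDATE", ("Mary", 1), ("Mary", 2)),
    ("INSERT", ("Liz", 1)),
]
retract_messages = list(to_retract_stream(changes))  # 5 messages
upsert_messages = list(to_upsert_stream(changes))    # 4 messages
```

For the same changes, the retract stream needs two messages for the UPDATE while the upsert stream needs one, at the price of the consumer having to know the key.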
The API to convert a dynamic table into a DataStream is discussed on the Common Concepts page. Please note that only append and retract streams are supported when converting a dynamic table into a DataStream. The TableSink interface for emitting a dynamic table to an external system is discussed on the TableSources and TableSinks page.