The Table API and SQL are integrated in a joint API. The central concept of this API is a Table which serves as input and output of queries. This document shows the common structure of programs with Table API and SQL queries, how to register a Table, how to query a Table, and how to emit a Table.
All Table API and SQL programs for batch and streaming follow the same pattern. The following code example shows the common structure of Table API and SQL programs.
// for batch programs use ExecutionEnvironment instead of StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// create a TableEnvironment
// for batch programs use BatchTableEnvironment instead of StreamTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// register a Table
tableEnv.registerTable("table1", ...) // or
tableEnv.registerTableSource("table2", ...); // or
tableEnv.registerExternalCatalog("extCat", ...);
// create a Table from a Table API query
Table tapiResult = tableEnv.scan("table1").select(...);
// create a Table from a SQL query
Table sqlResult = tableEnv.sql("SELECT ... FROM table2 ... ");
// emit a Table API result Table to a TableSink, same for SQL result
tapiResult.writeToSink(...);
// execute
env.execute();
// for batch programs use ExecutionEnvironment instead of StreamExecutionEnvironment
val env = StreamExecutionEnvironment.getExecutionEnvironment
// create a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// register a Table
tableEnv.registerTable("table1", ...) // or
tableEnv.registerTableSource("table2", ...) // or
tableEnv.registerExternalCatalog("extCat", ...)
// create a Table from a Table API query
val tapiResult = tableEnv.scan("table1").select(...)
// Create a Table from a SQL query
val sqlResult = tableEnv.sql("SELECT ... FROM table2 ...")
// emit a Table API result Table to a TableSink, same for SQL result
tapiResult.writeToSink(...)
// execute
env.execute()
Note: Table API and SQL queries can be easily integrated with and embedded into DataStream or DataSet programs. Have a look at the Integration with DataStream and DataSet API section to learn how DataStreams and DataSets can be converted into Tables and vice versa.
The TableEnvironment is a central concept of the Table API and SQL integration. It is responsible for:
- registering a Table in the internal catalog
- converting a DataStream or DataSet into a Table
- holding a reference to an ExecutionEnvironment or StreamExecutionEnvironment
A Table is always bound to a specific TableEnvironment. It is not possible to combine tables of different TableEnvironments in the same query, e.g., to join or union them.
A TableEnvironment is created by calling the static TableEnvironment.getTableEnvironment() method with a StreamExecutionEnvironment or an ExecutionEnvironment and an optional TableConfig. The TableConfig can be used to configure the TableEnvironment or to customize the query optimization and translation process (see Query Optimization).
// ***************
// STREAMING QUERY
// ***************
StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
// create a TableEnvironment for streaming queries
StreamTableEnvironment sTableEnv = TableEnvironment.getTableEnvironment(sEnv);
// ***********
// BATCH QUERY
// ***********
ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
// create a TableEnvironment for batch queries
BatchTableEnvironment bTableEnv = TableEnvironment.getTableEnvironment(bEnv);
// ***************
// STREAMING QUERY
// ***************
val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
// create a TableEnvironment for streaming queries
val sTableEnv = TableEnvironment.getTableEnvironment(sEnv)
// ***********
// BATCH QUERY
// ***********
val bEnv = ExecutionEnvironment.getExecutionEnvironment
// create a TableEnvironment for batch queries
val bTableEnv = TableEnvironment.getTableEnvironment(bEnv)
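If defaults need to be changed, a TableConfig can be passed as a second argument when the TableEnvironment is created. A minimal Scala sketch (the TableConfig is left at its defaults here; the two-argument getTableEnvironment overload is assumed to be available):

// create a TableConfig; adjust its settings as needed
val config = new TableConfig()
// pass the TableConfig when creating the TableEnvironment
val sTableEnvWithConfig = TableEnvironment.getTableEnvironment(sEnv, config)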
A TableEnvironment has an internal catalog of tables, organized by table name. Table API or SQL queries can access tables that are registered in the catalog by referencing them by name.
A TableEnvironment allows you to register a table from various sources:
- an existing Table object, usually the result of a Table API or SQL query.
- a TableSource, which accesses external data, such as a file, database, or messaging system.
- a DataStream or DataSet from a DataStream or DataSet program.

Registering a DataStream or DataSet as a table is discussed in the Integration with DataStream and DataSet API section.
A Table is registered in a TableEnvironment as follows:
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// Table is the result of a simple projection query
Table projTable = tableEnv.scan("X").select(...);
// register the Table projTable as table "projectedTable"
tableEnv.registerTable("projectedTable", projTable);
// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// Table is the result of a simple projection query
val projTable: Table = tableEnv.scan("X").select(...)
// register the Table projTable as table "projectedTable"
tableEnv.registerTable("projectedTable", projTable)
Note: A registered Table is treated similarly to a VIEW as known from relational database systems, i.e., the query that defines the Table is not optimized but will be inlined when another query references the registered Table. If multiple queries reference the same registered Table, it will be inlined for each referencing query and executed multiple times, i.e., the result of the registered Table will not be shared.
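To illustrate, a small Scala sketch based on the snippet above (the field a of projectedTable is hypothetical): the defining projection of projectedTable is inlined into, and executed for, each of the two queries.

// both queries inline the definition of "projectedTable";
// the projection is executed once per referencing query
val q1 = tableEnv.sql("SELECT * FROM projectedTable WHERE a > 10")
val q2 = tableEnv.sql("SELECT * FROM projectedTable WHERE a <= 10")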
A TableSource provides access to external data which is stored in a storage system such as a database (MySQL, HBase, …), a file with a specific encoding (CSV, Apache [Parquet, Avro, ORC], …), or a messaging system (Apache Kafka, RabbitMQ, …).
Flink aims to provide TableSources for common data formats and storage systems. Please have a look at the Table Sources and Sinks page for a list of supported TableSources and instructions for how to build a custom TableSource.
A TableSource is registered in a TableEnvironment as follows:
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// create a TableSource
TableSource csvSource = new CsvTableSource("/path/to/file", ...);
// register the TableSource as table "CsvTable"
tableEnv.registerTableSource("CsvTable", csvSource);
// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// create a TableSource
val csvSource: TableSource = new CsvTableSource("/path/to/file", ...)
// register the TableSource as table "CsvTable"
tableEnv.registerTableSource("CsvTable", csvSource)
An external catalog can provide information about external databases and tables such as their name, schema, statistics, and information for how to access data stored in an external database, table, or file.
An external catalog can be created by implementing the ExternalCatalog interface and is registered in a TableEnvironment as follows:
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// create an external catalog
ExternalCatalog catalog = new InMemoryExternalCatalog();
// register the ExternalCatalog catalog
tableEnv.registerExternalCatalog("InMemCatalog", catalog);
// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// create an external catalog
val catalog: ExternalCatalog = new InMemoryExternalCatalog
// register the ExternalCatalog catalog
tableEnv.registerExternalCatalog("InMemCatalog", catalog)
Once registered in a TableEnvironment, all tables defined in an ExternalCatalog can be accessed from Table API or SQL queries by specifying their full path, such as catalog.database.table.
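For example, assuming the "InMemCatalog" catalog registered above contains a database db1 with a table myTable (both hypothetical names), the table can be referenced like this in Scala:

// reference the table by its full path in a Table API query
val catTable = tableEnv.scan("InMemCatalog", "db1", "myTable")
// reference the table by its full path in a SQL query
val catResult = tableEnv.sql("SELECT * FROM InMemCatalog.db1.myTable")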
Currently, Flink provides an InMemoryExternalCatalog for demo and testing purposes. However, the ExternalCatalog interface can also be used to connect catalogs like HCatalog or Metastore to the Table API.
The Table API is a language-integrated query API for Scala and Java. In contrast to SQL, queries are not specified as Strings but are composed step-by-step in the host language.
The API is based on the Table class which represents a table (streaming or batch) and offers methods to apply relational operations. These methods return a new Table object, which represents the result of applying the relational operation on the input Table. Some relational operations are composed of multiple method calls such as table.groupBy(...).select(), where groupBy(...) specifies a grouping of table, and select(...) the projection on the grouping of table.
The Table API document describes all Table API operations that are supported on streaming and batch tables.
The following example shows a simple Table API aggregation query:
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// register Orders table
// scan registered Orders table
Table orders = tableEnv.scan("Orders");
// compute revenue for all customers from France
Table revenue = orders
.filter("cCountry === 'FRANCE'")
.groupBy("cID, cName")
.select("cID, cName, revenue.sum AS revSum");
// emit or convert Table
// execute query
// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// register Orders table
// scan registered Orders table
val orders: Table = tableEnv.scan("Orders")
// compute revenue for all customers from France
val revenue: Table = orders
  .filter('cCountry === "FRANCE")
  .groupBy('cID, 'cName)
  .select('cID, 'cName, 'revenue.sum as 'revSum)
// emit or convert Table
// execute query
Note: The Scala Table API uses Scala Symbols, which start with a single tick (') to reference the attributes of a Table. The Table API uses Scala implicits. Make sure to import org.apache.flink.api.scala._ and org.apache.flink.table.api.scala._ in order to use Scala implicit conversions.
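For reference, these are the imports a Scala Table API program typically starts with:

// Scala DataStream API
import org.apache.flink.api.scala._
// implicit conversions for the Scala Table API
import org.apache.flink.table.api.scala._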
Flink’s SQL integration is based on Apache Calcite, which implements the SQL standard. SQL queries are specified as regular Strings.
The SQL document describes Flink’s SQL support for streaming and batch tables.
The following example shows how to specify a query and return the result as a Table.
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// register Orders table
// compute revenue for all customers from France
Table revenue = tableEnv.sql(
"SELECT cID, cName, SUM(revenue) AS revSum " +
"FROM Orders " +
"WHERE cCountry = 'FRANCE' " +
"GROUP BY cID, cName"
);
// emit or convert Table
// execute query
// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// register Orders table
// compute revenue for all customers from France
val revenue: Table = tableEnv.sql("""
|SELECT cID, cName, SUM(revenue) AS revSum
|FROM Orders
|WHERE cCountry = 'FRANCE'
|GROUP BY cID, cName
""".stripMargin)
// emit or convert Table
// execute query
Table API and SQL queries can be easily mixed because both return Table objects:
- A Table API query can be defined on the Table object returned by a SQL query.
- A SQL query can be defined on the result of a Table API query by registering the resulting Table in the TableEnvironment and referencing it in the FROM clause of the SQL query.
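For example, a Scala sketch of both directions, reusing the Orders table (with fields cID and cName) from the examples above:

// a Table API query applied on the Table returned by a SQL query
val bigOrders = tableEnv.sql("SELECT cID, cName FROM Orders").filter('cID > 100)
// a SQL query applied on a Table API result:
// register the Table and reference it in the FROM clause
tableEnv.registerTable("BigOrders", bigOrders)
val names = tableEnv.sql("SELECT cName FROM BigOrders")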
In order to emit a Table, it can be written to a TableSink. A TableSink is a generic interface to support a wide variety of file formats (e.g. CSV, Apache Parquet, Apache Avro), storage systems (e.g., JDBC, Apache HBase, Apache Cassandra, Elasticsearch), or messaging systems (e.g., Apache Kafka, RabbitMQ).
A batch Table can only be written to a BatchTableSink, while a streaming Table requires either an AppendStreamTableSink, a RetractStreamTableSink, or an UpsertStreamTableSink.
Please see the documentation about Table Sources & Sinks for details about available sinks and instructions for how to implement a custom TableSink.
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// compute a result Table using Table API operators and/or SQL queries
Table result = ...
// create a TableSink
TableSink sink = new CsvTableSink("/path/to/file", "|"); // "|" is the field delimiter
// write the result Table to the TableSink
result.writeToSink(sink);
// execute the program
// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// compute a result Table using Table API operators and/or SQL queries
val result: Table = ...
// create a TableSink
val sink = new CsvTableSink("/path/to/file", fieldDelim = "|")
// write the result Table to the TableSink
result.writeToSink(sink)
// execute the program
Table API and SQL queries are translated into DataStream or DataSet programs depending on whether their input is a streaming or batch input. A query is internally represented as a logical query plan and is translated in two phases:
1. optimization of the logical plan,
2. translation into a DataStream or DataSet program.
A Table API or SQL query is translated when:
- a Table is emitted to a TableSink, i.e., when Table.writeToSink() is called.
- a Table is converted into a DataStream or DataSet (see Integration with DataStream and DataSet API).

Once translated, a Table API or SQL query is handled like a regular DataStream or DataSet program and is executed when StreamExecutionEnvironment.execute() or ExecutionEnvironment.execute() is called.
Table API and SQL queries can be easily integrated with and embedded into DataStream and DataSet programs. For instance, it is possible to query an external table (for example from an RDBMS), do some pre-processing, such as filtering, projecting, aggregating, or joining with metadata, and then further process the data with either the DataStream or DataSet API (and any of the libraries built on top of these APIs, such as CEP or Gelly). Inversely, a Table API or SQL query can also be applied on the result of a DataStream or DataSet program.
This interaction can be achieved by converting a DataStream or DataSet into a Table and vice versa. In this section, we describe how these conversions are done.
The Scala Table API features implicit conversions for the DataSet, DataStream, and Table classes. These conversions are enabled by importing the package org.apache.flink.table.api.scala._ in addition to org.apache.flink.api.scala._ for the Scala DataStream API.
A DataStream or DataSet can be registered in a TableEnvironment as a Table. The schema of the resulting table depends on the data type of the registered DataStream or DataSet. Please check the section about mapping of data types to table schema for details.
// get StreamTableEnvironment
// registration of a DataSet in a BatchTableEnvironment is equivalent
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
DataStream<Tuple2<Long, String>> stream = ...
// register the DataStream as Table "myTable" with fields "f0", "f1"
tableEnv.registerDataStream("myTable", stream);
// register the DataStream as table "myTable2" with fields "myLong", "myString"
tableEnv.registerDataStream("myTable2", stream, "myLong, myString");
// get TableEnvironment
// registration of a DataSet is equivalent
val tableEnv = TableEnvironment.getTableEnvironment(env)
val stream: DataStream[(Long, String)] = ...
// register the DataStream as Table "myTable" with fields "f0", "f1"
tableEnv.registerDataStream("myTable", stream)
// register the DataStream as table "myTable2" with fields "myLong", "myString"
tableEnv.registerDataStream("myTable2", stream, 'myLong, 'myString)
Note: The name of a DataStream Table must not match the ^_DataStreamTable_[0-9]+ pattern and the name of a DataSet Table must not match the ^_DataSetTable_[0-9]+ pattern. These patterns are reserved for internal use only.
Instead of registering a DataStream or DataSet in a TableEnvironment, it can also be directly converted into a Table. This is convenient if you want to use the Table in a Table API query.
// get StreamTableEnvironment
// registration of a DataSet in a BatchTableEnvironment is equivalent
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
DataStream<Tuple2<Long, String>> stream = ...
// Convert the DataStream into a Table with default fields "f0", "f1"
Table table1 = tableEnv.fromDataStream(stream);
// Convert the DataStream into a Table with fields "myLong", "myString"
Table table2 = tableEnv.fromDataStream(stream, "myLong, myString");
// get TableEnvironment
// registration of a DataSet is equivalent
val tableEnv = TableEnvironment.getTableEnvironment(env)
val stream: DataStream[(Long, String)] = ...
// convert the DataStream into a Table with default fields '_1, '_2
val table1: Table = tableEnv.fromDataStream(stream)
// convert the DataStream into a Table with fields 'myLong, 'myString
val table2: Table = tableEnv.fromDataStream(stream, 'myLong, 'myString)
A Table can be converted into a DataStream or DataSet. In this way, custom DataStream or DataSet programs can be run on the result of a Table API or SQL query.
When converting a Table into a DataStream or DataSet, you need to specify the data type of the resulting DataStream or DataSet, i.e., the data type into which the rows of the Table are to be converted. Often the most convenient conversion type is Row. The following list gives an overview of the features of the different options:
- Row: fields are mapped by position, arbitrary number of fields, support for null values, no type-safe access.
- POJO: fields are mapped by name (POJO fields must be named as Table fields), arbitrary number of fields, support for null values, type-safe access.
- Case Class: fields are mapped by position, no support for null values, type-safe access.
- Tuple: fields are mapped by position, limited to 22 (Scala) or 25 (Java) fields, no support for null values, type-safe access.
- Atomic Type: the Table must have a single field, no support for null values, type-safe access.

A Table that is the result of a streaming query will be updated dynamically, i.e., it is changing as new records arrive on the query’s input streams. Hence, the DataStream into which such a dynamic query is converted needs to encode the updates of the table.
There are two modes to convert a Table into a DataStream:
- Append Mode: This mode can only be used if the dynamic Table is only modified by INSERT changes, i.e., it is append-only and previously emitted results are never updated.
- Retract Mode: This mode can always be used. It encodes INSERT and DELETE changes with a boolean flag.

// get StreamTableEnvironment.
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// Table with two fields (String name, Integer age)
Table table = ...
// convert the Table into an append DataStream of Row by specifying the class
DataStream<Row> dsRow = tableEnv.toAppendStream(table, Row.class);
// convert the Table into an append DataStream of Tuple2<String, Integer>
// via a TypeInformation
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(
Types.STRING(),
Types.INT());
DataStream<Tuple2<String, Integer>> dsTuple =
tableEnv.toAppendStream(table, tupleType);
// convert the Table into a retract DataStream of Row.
// A retract stream of type X is a DataStream<Tuple2<Boolean, X>>.
// The boolean field indicates the type of the change.
// True is INSERT, false is DELETE.
DataStream<Tuple2<Boolean, Row>> retractStream =
tableEnv.toRetractStream(table, Row.class);
// get TableEnvironment.
val tableEnv = TableEnvironment.getTableEnvironment(env)
// Table with two fields (String name, Integer age)
val table: Table = ...
// convert the Table into an append DataStream of Row
val dsRow: DataStream[Row] = tableEnv.toAppendStream[Row](table)
// convert the Table into an append DataStream of Tuple2[String, Int]
val dsTuple: DataStream[(String, Int)] =
  tableEnv.toAppendStream[(String, Int)](table)
// convert the Table into a retract DataStream of Row.
// A retract stream of type X is a DataStream[(Boolean, X)].
// The boolean field indicates the type of the change.
// True is INSERT, false is DELETE.
val retractStream: DataStream[(Boolean, Row)] = tableEnv.toRetractStream[Row](table)
Note: A detailed discussion about dynamic tables and their properties is given in the Streaming Queries document.
A Table is converted into a DataSet as follows:
// get BatchTableEnvironment
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// Table with two fields (String name, Integer age)
Table table = ...
// convert the Table into a DataSet of Row by specifying a class
DataSet<Row> dsRow = tableEnv.toDataSet(table, Row.class);
// convert the Table into a DataSet of Tuple2<String, Integer> via a TypeInformation
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(
  Types.STRING(),
  Types.INT());
DataSet<Tuple2<String, Integer>> dsTuple =
  tableEnv.toDataSet(table, tupleType);
// get TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// Table with two fields (String name, Integer age)
val table: Table = ...
// convert the Table into a DataSet of Row
val dsRow: DataSet[Row] = tableEnv.toDataSet[Row](table)
// convert the Table into a DataSet of Tuple2[String, Int]
val dsTuple: DataSet[(String, Int)] = tableEnv.toDataSet[(String, Int)](table)
Flink’s DataStream and DataSet APIs support very diverse types, such as Tuples (built-in Scala and Flink Java tuples), POJOs, case classes, and atomic types. In the following, we describe how the Table API converts these types into an internal row representation and show examples of converting a DataStream into a Table.
Flink treats primitives (Integer, Double, String) or generic types (types that cannot be analyzed and decomposed) as atomic types. A DataStream or DataSet of an atomic type is converted into a Table with a single attribute. The type of the attribute is inferred from the atomic type, and the name of the attribute must be specified.
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
DataStream<Long> stream = ...
// convert DataStream into Table with field "myLong"
Table table = tableEnv.fromDataStream(stream, "myLong");
// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
val stream: DataStream[Long] = ...
// convert DataStream into Table with field 'myLong
val table: Table = tableEnv.fromDataStream(stream, 'myLong)
Flink supports Scala’s built-in tuples and provides its own tuple classes for Java. DataStreams and DataSets of both kinds of tuples can be converted into tables. Fields can be renamed by providing names for all fields (mapping based on position). If no field names are specified, the default field names are used.
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
DataStream<Tuple2<Long, String>> stream = ...
// convert DataStream into Table with field names "myLong", "myString"
Table table1 = tableEnv.fromDataStream(stream, "myLong, myString");
// convert DataStream into Table with default field names "f0", "f1"
Table table2 = tableEnv.fromDataStream(stream);
// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
val stream: DataStream[(Long, String)] = ...
// convert DataStream into Table with field names 'myLong, 'myString
val table1: Table = tableEnv.fromDataStream(stream, 'myLong, 'myString)
// convert DataStream into Table with default field names '_1, '_2
val table2: Table = tableEnv.fromDataStream(stream)
// define case class
case class Person(name: String, age: Int)
val streamCC: DataStream[Person] = ...
// convert DataStream into Table with default field names 'name, 'age
val tableCC1 = tableEnv.fromDataStream(streamCC)
// convert DataStream into Table with field names 'myName, 'myAge
val tableCC2 = tableEnv.fromDataStream(streamCC, 'myName, 'myAge)
Flink supports POJOs as composite types. The rules for what determines a POJO are documented here.
When converting a POJO DataStream or DataSet into a Table without specifying field names, the names of the original POJO fields are used. Renaming the original POJO fields requires the keyword AS because POJO fields have no inherent order. The name mapping requires the original names and cannot be done by position.
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// Person is a POJO with fields "name" and "age"
DataStream<Person> stream = ...
// convert DataStream into Table with field names "name", "age"
Table table1 = tableEnv.fromDataStream(stream);
// convert DataStream into Table with field names "myName", "myAge"
Table table2 = tableEnv.fromDataStream(stream, "name as myName, age as myAge");
// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// Person is a POJO with field names "name" and "age"
val stream: DataStream[Person] = ...
// convert DataStream into Table with field names 'name, 'age
val table1: Table = tableEnv.fromDataStream(stream)
// convert DataStream into Table with field names 'myName, 'myAge
val table2: Table = tableEnv.fromDataStream(stream, 'name as 'myName, 'age as 'myAge)
The Row data type supports an arbitrary number of fields and fields with null values. Field names can be specified via a RowTypeInfo or when converting a Row DataStream or DataSet into a Table (based on position).
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// DataStream of Row with two fields "name" and "age" specified in `RowTypeInfo`
DataStream<Row> stream = ...
// convert DataStream into Table with field names "name", "age"
Table table1 = tableEnv.fromDataStream(stream);
// convert DataStream into Table with field names "myName", "myAge"
Table table2 = tableEnv.fromDataStream(stream, "myName, myAge");
// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// DataStream of Row with two fields "name" and "age" specified in `RowTypeInfo`
val stream: DataStream[Row] = ...
// convert DataStream into Table with field names 'name, 'age
val table1: Table = tableEnv.fromDataStream(stream)
// convert DataStream into Table with field names 'myName, 'myAge
val table2: Table = tableEnv.fromDataStream(stream, 'myName, 'myAge)
Apache Flink leverages Apache Calcite to optimize and translate queries. The optimizations currently performed include projection and filter push-down, subquery decorrelation, and other kinds of query rewriting. Flink does not yet optimize the order of joins, but executes them in the same order as defined in the query (order of Tables in the FROM clause and/or order of join predicates in the WHERE clause).
It is possible to tweak the set of optimization rules which are applied in different phases by providing a CalciteConfig object. This can be created via a builder by calling CalciteConfig.createBuilder() and is provided to the TableEnvironment by calling tableEnv.getConfig.setCalciteConfig(calciteConfig).
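A minimal Scala sketch of this wiring (the builder is not customized here, so the resulting CalciteConfig keeps the default rule sets; the build() call on the builder is an assumption):

// build a CalciteConfig via its builder
val calciteConfig = CalciteConfig.createBuilder().build()
// install it on the TableEnvironment's TableConfig
tableEnv.getConfig.setCalciteConfig(calciteConfig)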
The Table API provides a mechanism to explain the logical and optimized query plans to compute a Table. This is done through the TableEnvironment.explain(table) method. It returns a String describing three plans:
1. the Abstract Syntax Tree of the relational query, i.e., the unoptimized logical query plan,
2. the optimized logical query plan, and
3. the physical execution plan.
The following code shows an example and the corresponding output:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
DataStream<Tuple2<Integer, String>> stream1 = env.fromElements(new Tuple2<>(1, "hello"));
DataStream<Tuple2<Integer, String>> stream2 = env.fromElements(new Tuple2<>(1, "hello"));
Table table1 = tEnv.fromDataStream(stream1, "count, word");
Table table2 = tEnv.fromDataStream(stream2, "count, word");
Table table = table1
.where("LIKE(word, 'F%')")
.unionAll(table2);
String explanation = tEnv.explain(table);
System.out.println(explanation);
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
val table = table1
.where('word.like("F%"))
.unionAll(table2)
val explanation: String = tEnv.explain(table)
println(explanation)
== Abstract Syntax Tree ==
LogicalUnion(all=[true])
  LogicalFilter(condition=[LIKE($1, 'F%')])
    LogicalTableScan(table=[[_DataStreamTable_0]])
  LogicalTableScan(table=[[_DataStreamTable_1]])

== Optimized Logical Plan ==
DataStreamUnion(union=[count, word])
  DataStreamCalc(select=[count, word], where=[LIKE(word, 'F%')])
    DataStreamScan(table=[[_DataStreamTable_0]])
  DataStreamScan(table=[[_DataStreamTable_1]])
== Physical Execution Plan ==
Stage 1 : Data Source
  content : collect elements with CollectionInputFormat

Stage 2 : Data Source
  content : collect elements with CollectionInputFormat

Stage 3 : Operator
  content : from: (count, word)
  ship_strategy : REBALANCE

Stage 4 : Operator
  content : where: (LIKE(word, 'F%')), select: (count, word)
  ship_strategy : FORWARD

Stage 5 : Operator
  content : from: (count, word)
  ship_strategy : REBALANCE