@Internal public class TableEnvironmentImpl extends Object implements TableEnvironment
Implementation of TableEnvironment that works exclusively with Table API interfaces. Only TableSource is supported as an input and TableSink as an output. It also does not bind to any particular StreamExecutionEnvironment.
Modifier and Type | Field and Description |
---|---|
protected Executor | execEnv |
protected FunctionCatalog | functionCatalog |
protected Planner | planner |
protected TableConfig | tableConfig |
Modifier | Constructor and Description |
---|---|
protected | TableEnvironmentImpl(CatalogManager catalogManager, TableConfig tableConfig, Executor executor, FunctionCatalog functionCatalog, Planner planner, boolean isStreamingMode) |
Modifier and Type | Method | Description |
---|---|---|
ConnectTableDescriptor | connect(ConnectorDescriptor connectorDescriptor) | Creates a table source and/or table sink from a descriptor. |
static TableEnvironmentImpl | create(EnvironmentSettings settings) | Creates a table environment that is the entry point and central context for creating Table & SQL API programs. |
protected TableImpl | createTable(QueryOperation tableOperation) | |
JobExecutionResult | execute(String jobName) | Triggers the program execution. |
String | explain(boolean extended) | Returns the AST of the specified Table API and SQL queries and the execution plan to compute the result of a multiple-sinks plan. |
String | explain(Table table) | Returns the AST of the specified Table API and SQL queries and the execution plan to compute the result of the given Table. |
String | explain(Table table, boolean extended) | Returns the AST of the specified Table API and SQL queries and the execution plan to compute the result of the given Table. |
Table | fromTableSource(TableSource<?> source) | Creates a table from a table source. |
Optional<Catalog> | getCatalog(String catalogName) | Gets a registered Catalog by name. |
String[] | getCompletionHints(String statement, int position) | Returns completion hints for the given statement at the given cursor position. |
TableConfig | getConfig() | Returns the table config that defines the runtime behavior of the Table API. |
String | getCurrentCatalog() | Gets the current default catalog name of the current session. |
String | getCurrentDatabase() | Gets the current default database name of the running session. |
Planner | getPlanner() | |
ExternalCatalog | getRegisteredExternalCatalog(String name) | Gets a registered ExternalCatalog by name. |
void | insertInto(Table table, String path, String... pathContinued) | Writes the Table to a TableSink that was registered under the specified name. |
protected boolean | isEagerOperationTranslation() | Defines the behavior of this TableEnvironment. |
String[] | listCatalogs() | Gets the names of all catalogs registered in this environment. |
String[] | listDatabases() | Gets the names of all databases registered in the current catalog. |
String[] | listTables() | Gets the names of all tables registered in the current database of the current catalog. |
String[] | listUserDefinedFunctions() | Gets the names of all user-defined functions registered in this environment. |
void | registerCatalog(String catalogName, Catalog catalog) | Registers a Catalog under a unique name. |
void | registerExternalCatalog(String name, ExternalCatalog externalCatalog) | Registers an ExternalCatalog under a unique name in the TableEnvironment's schema. |
void | registerFunction(String name, ScalarFunction function) | Registers a ScalarFunction under a unique name. |
void | registerTable(String name, Table table) | Registers a Table under a unique name in the TableEnvironment's catalog. |
protected void | registerTableInternal(String name, CatalogBaseTable table) | |
void | registerTableSink(String name, String[] fieldNames, TypeInformation<?>[] fieldTypes, TableSink<?> tableSink) | Registers an external TableSink with given field names and types in this TableEnvironment's catalog. |
void | registerTableSink(String name, TableSink<?> configuredSink) | Registers an external TableSink with already configured field names and field types in this TableEnvironment's catalog. |
void | registerTableSource(String name, TableSource<?> tableSource) | Registers an external TableSource in this TableEnvironment's catalog. |
Table | scan(String... tablePath) | Scans a registered table and returns the resulting Table. |
Table | sqlQuery(String query) | Evaluates a SQL query on registered tables and retrieves the result as a Table. |
void | sqlUpdate(String stmt) | Evaluates a SQL statement such as INSERT, UPDATE or DELETE, or a DDL statement. NOTE: Currently only SQL INSERT statements and CREATE TABLE statements are supported. |
void | useCatalog(String catalogName) | Sets the current catalog to the given value. |
void | useDatabase(String databaseName) | Sets the current default database. |
protected void | validateTableSource(TableSource<?> tableSource) | Subclasses can override this method to add additional checks. |
protected final TableConfig tableConfig
protected final Executor execEnv
protected final FunctionCatalog functionCatalog
protected final Planner planner
protected TableEnvironmentImpl(CatalogManager catalogManager, TableConfig tableConfig, Executor executor, FunctionCatalog functionCatalog, Planner planner, boolean isStreamingMode)
public static TableEnvironmentImpl create(EnvironmentSettings settings)
Description copied from interface: TableEnvironment
Creates a table environment that is the entry point and central context for creating Table & SQL API programs.
It is unified both on a language level for all JVM-based languages (i.e. there is no distinction between the Scala and Java APIs) and for bounded and unbounded data processing.
A table environment is responsible for:
- Registering and retrieving Tables and other meta objects from a catalog.
Note: This environment is meant for pure table programs. If you would like to convert from or to other Flink APIs, it might be necessary to use one of the available language-specific table environments in the corresponding bridging modules.
Specified by: create in interface TableEnvironment
Parameters:
settings - The environment settings used to instantiate the TableEnvironment.
@VisibleForTesting public Planner getPlanner()
public Table fromTableSource(TableSource<?> source)
Description copied from interface: TableEnvironment
Creates a table from a table source.
Specified by: fromTableSource in interface TableEnvironment
Parameters:
source - table source used as table
public void registerExternalCatalog(String name, ExternalCatalog externalCatalog)
Description copied from interface: TableEnvironment
Registers an ExternalCatalog under a unique name in the TableEnvironment's schema. All tables registered in the ExternalCatalog can be accessed.
Specified by: registerExternalCatalog in interface TableEnvironment
Parameters:
name - The name under which the externalCatalog will be registered.
externalCatalog - The externalCatalog to register.
See Also:
TableEnvironment.getCatalog(String), TableEnvironment.registerCatalog(String, Catalog)
public ExternalCatalog getRegisteredExternalCatalog(String name)
Description copied from interface: TableEnvironment
Gets a registered ExternalCatalog by name.
Specified by: getRegisteredExternalCatalog in interface TableEnvironment
Parameters:
name - The name to look up the ExternalCatalog.
Returns:
The ExternalCatalog.
See Also:
TableEnvironment.getCatalog(String), TableEnvironment.registerCatalog(String, Catalog)
public void registerCatalog(String catalogName, Catalog catalog)
Description copied from interface: TableEnvironment
Registers a Catalog under a unique name.
Specified by: registerCatalog in interface TableEnvironment
Parameters:
catalogName - The name under which the catalog will be registered.
catalog - The catalog to register.
public Optional<Catalog> getCatalog(String catalogName)
Description copied from interface: TableEnvironment
Gets a registered Catalog by name.
Specified by: getCatalog in interface TableEnvironment
Parameters:
catalogName - The name to look up the Catalog.
public void registerFunction(String name, ScalarFunction function)
Description copied from interface: TableEnvironment
Registers a ScalarFunction under a unique name. Replaces already existing user-defined functions under this name.
Specified by: registerFunction in interface TableEnvironment
public void registerTable(String name, Table table)
Description copied from interface: TableEnvironment
Registers a Table under a unique name in the TableEnvironment's catalog. Registered tables can be referenced in SQL queries.
Specified by: registerTable in interface TableEnvironment
Parameters:
name - The name under which the table will be registered.
table - The table to register.
public void registerTableSource(String name, TableSource<?> tableSource)
Description copied from interface: TableEnvironment
Registers an external TableSource in this TableEnvironment's catalog. Registered tables can be referenced in SQL queries.
Specified by: registerTableSource in interface TableEnvironment
Parameters:
name - The name under which the TableSource is registered.
tableSource - The TableSource to register.
public void registerTableSink(String name, String[] fieldNames, TypeInformation<?>[] fieldTypes, TableSink<?> tableSink)
Description copied from interface: TableEnvironment
Registers an external TableSink with given field names and types in this TableEnvironment's catalog. Registered sink tables can be referenced in SQL DML statements.
Specified by: registerTableSink in interface TableEnvironment
Parameters:
name - The name under which the TableSink is registered.
fieldNames - The field names to register with the TableSink.
fieldTypes - The field types to register with the TableSink.
tableSink - The TableSink to register.
public void registerTableSink(String name, TableSink<?> configuredSink)
Description copied from interface: TableEnvironment
Registers an external TableSink with already configured field names and field types in this TableEnvironment's catalog. Registered sink tables can be referenced in SQL DML statements.
Specified by: registerTableSink in interface TableEnvironment
Parameters:
name - The name under which the TableSink is registered.
configuredSink - The configured TableSink to register.
public Table scan(String... tablePath)
Description copied from interface: TableEnvironment
Scans a registered table and returns the resulting Table.
A table to scan must be registered in the TableEnvironment. It can be either directly registered or be an external member of a Catalog.
See the documentation of TableEnvironment.useDatabase(String) or TableEnvironment.useCatalog(String) for the rules on the path resolution.
Examples:
Scanning a directly registered table.
Table tab = tableEnv.scan("tableName");
Scanning a table from a registered catalog.
Table tab = tableEnv.scan("catalogName", "dbName", "tableName");
Specified by: scan in interface TableEnvironment
Parameters:
tablePath - The path of the table to scan.
Returns:
The resulting Table.
See Also:
TableEnvironment.useCatalog(String), TableEnvironment.useDatabase(String)
public ConnectTableDescriptor connect(ConnectorDescriptor connectorDescriptor)
Description copied from interface: TableEnvironment
Creates a table source and/or table sink from a descriptor.
Descriptors allow for declaring the communication to external systems in an implementation-agnostic way. The classpath is scanned for suitable table factories that match the desired configuration.
The following example shows how to read from a connector using a JSON format and register a table source as "MyTable":
tableEnv
    .connect(
        new ExternalSystemXYZ()
            .version("0.11"))
    .withFormat(
        new Json()
            .jsonSchema("{...}")
            .failOnMissingField(false))
    .withSchema(
        new Schema()
            .field("user-name", "VARCHAR").from("u_name")
            .field("count", "DECIMAL"))
    .registerSource("MyTable");
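The factory lookup mentioned above can be pictured as a property match: a discovered factory is a candidate only if every property it requires appears in the descriptor with the same value. The sketch below models only that matching step, under that assumption. `ToyTableFactory` and `ToyFactoryDiscovery` are hypothetical names, not Flink classes, and the classpath scanning itself (for example via `java.util.ServiceLoader`) is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical factory abstraction for illustration; not Flink's TableFactory interface.
interface ToyTableFactory {
    String name();

    // Properties that must be present with exactly these values for this factory to match.
    Map<String, String> requiredContext();
}

final class ToyFactoryDiscovery {
    private ToyFactoryDiscovery() {}

    // Creates a factory with a fixed name and required context.
    static ToyTableFactory of(String name, Map<String, String> requiredContext) {
        return new ToyTableFactory() {
            @Override
            public String name() {
                return name;
            }

            @Override
            public Map<String, String> requiredContext() {
                return requiredContext;
            }
        };
    }

    // Keeps every candidate whose required context is fully contained in the requested properties.
    static List<ToyTableFactory> findMatches(List<ToyTableFactory> candidates,
                                             Map<String, String> requested) {
        List<ToyTableFactory> matches = new ArrayList<>();
        for (ToyTableFactory factory : candidates) {
            boolean matchesAll = factory.requiredContext().entrySet().stream()
                    .allMatch(e -> e.getValue().equals(requested.get(e.getKey())));
            if (matchesAll) {
                matches.add(factory);
            }
        }
        return matches;
    }
}
```

For instance, a request containing 'connector.type' = 'filesystem' and 'format.type' = 'csv' selects a factory that requires both properties and skips one that requires 'connector.type' = 'kafka'.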
Specified by: connect in interface TableEnvironment
Parameters:
connectorDescriptor - connector descriptor describing the external system
public String[] listCatalogs()
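Description copied from interface: TableEnvironment
Gets the names of all catalogs registered in this environment.
Specified by: listCatalogs in interface TableEnvironment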
public String[] listDatabases()
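Description copied from interface: TableEnvironment
Gets the names of all databases registered in the current catalog.
Specified by: listDatabases in interface TableEnvironment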
public String[] listTables()
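Description copied from interface: TableEnvironment
Gets the names of all tables registered in the current database of the current catalog.
Specified by: listTables in interface TableEnvironment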
public String[] listUserDefinedFunctions()
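Description copied from interface: TableEnvironment
Gets the names of all user-defined functions registered in this environment.
Specified by: listUserDefinedFunctions in interface TableEnvironment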
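The listing methods above are scoped differently: catalogs are listed globally, databases within the current catalog, and tables within the current database of the current catalog. The toy model below makes that scoping concrete. `ToyCatalogTree` and its methods are hypothetical and only mimic the names of the real methods; they are not Flink's implementation.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Toy model (hypothetical) of how the list* methods are scoped:
// listCatalogs -> all catalogs, listDatabases -> current catalog,
// listTables -> current database of the current catalog.
final class ToyCatalogTree {
    // catalog name -> database name -> table names, all kept sorted
    private final Map<String, Map<String, TreeSet<String>>> catalogs = new TreeMap<>();
    private String currentCatalog;
    private String currentDatabase;

    ToyCatalogTree(String defaultCatalog, String defaultDatabase) {
        catalogs.computeIfAbsent(defaultCatalog, c -> new TreeMap<>())
                .computeIfAbsent(defaultDatabase, d -> new TreeSet<>());
        this.currentCatalog = defaultCatalog;
        this.currentDatabase = defaultDatabase;
    }

    // Creates the catalog and database on demand, then adds the table.
    void addTable(String catalog, String database, String table) {
        catalogs.computeIfAbsent(catalog, c -> new TreeMap<>())
                .computeIfAbsent(database, d -> new TreeSet<>())
                .add(table);
    }

    // In this sketch, switching the catalog leaves the current database name unchanged.
    void useCatalog(String catalog) {
        if (!catalogs.containsKey(catalog)) {
            throw new IllegalArgumentException("Unknown catalog: " + catalog);
        }
        currentCatalog = catalog;
    }

    void useDatabase(String database) {
        if (!catalogs.get(currentCatalog).containsKey(database)) {
            throw new IllegalArgumentException("Unknown database: " + database);
        }
        currentDatabase = database;
    }

    String[] listCatalogs() {
        return catalogs.keySet().toArray(new String[0]);
    }

    String[] listDatabases() {
        return catalogs.get(currentCatalog).keySet().toArray(new String[0]);
    }

    String[] listTables() {
        // The current database may not exist in a newly selected catalog.
        return catalogs.get(currentCatalog)
                .getOrDefault(currentDatabase, new TreeSet<>())
                .toArray(new String[0]);
    }
}
```

Because `useCatalog` in this sketch does not change the current database, `listTables` can return an empty array right after a catalog switch; how the real environment picks a database after `useCatalog` is not modeled here.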
public String explain(Table table)
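Description copied from interface: TableEnvironment
Returns the AST of the specified Table API and SQL queries and the execution plan to compute the result of the given Table.
Specified by: explain in interface TableEnvironment
Parameters:
table - The table for which the AST and execution plan will be returned.
public String explain(Table table, boolean extended)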
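Description copied from interface: TableEnvironment
Returns the AST of the specified Table API and SQL queries and the execution plan to compute the result of the given Table.
Specified by: explain in interface TableEnvironment
Parameters:
table - The table for which the AST and execution plan will be returned.
extended - if the plan should contain additional properties, e.g. estimated cost and traits
public String explain(boolean extended)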
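Description copied from interface: TableEnvironment
Returns the AST of the specified Table API and SQL queries and the execution plan to compute the result of a multiple-sinks plan.
Specified by: explain in interface TableEnvironment
Parameters:
extended - if the plan should contain additional properties, e.g. estimated cost and traits
public String[] getCompletionHints(String statement, int position)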
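Description copied from interface: TableEnvironment
Returns completion hints for the given statement at the given cursor position.
Specified by: getCompletionHints in interface TableEnvironment
Parameters:
statement - Partial or slightly incorrect SQL statement
position - cursor position
public Table sqlQuery(String query)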
Description copied from interface: TableEnvironment
Evaluates a SQL query on registered tables and retrieves the result as a Table.
All tables referenced by the query must be registered in the TableEnvironment. A Table is automatically registered when its Table#toString() method is called, for example when it is embedded into a String. Hence, SQL queries can directly reference a Table as follows:
Table table = ...;
String tableName = table.toString();
// the table was not registered explicitly; calling toString() registered it automatically
tEnv.sqlQuery("SELECT * FROM " + tableName);
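The automatic registration described above can be pictured with a toy model in which `toString()` registers the table under a generated name the first time it is called, so concatenating the table into a query string is enough to make it resolvable. `ToyTableEnv`, `ToyTable`, and the `unnamed_N` naming scheme are hypothetical; the name the real environment generates is not specified here.

```java
import java.util.HashMap;
import java.util.Map;

// Toy environment that hands out generated table names (hypothetical, not Flink's TableEnvironment).
final class ToyTableEnv {
    private final Map<String, ToyTable> registered = new HashMap<>();
    private int counter = 0;

    // Registers the table under a fresh generated name and returns that name.
    String registerUnnamed(ToyTable table) {
        String name = "unnamed_" + counter++;
        registered.put(name, table);
        return name;
    }

    boolean isRegistered(String name) {
        return registered.containsKey(name);
    }
}

// Toy table whose toString() registers it lazily, exactly once.
final class ToyTable {
    private final ToyTableEnv env;
    private String generatedName;

    ToyTable(ToyTableEnv env) {
        this.env = env;
    }

    @Override
    public String toString() {
        if (generatedName == null) {
            generatedName = env.registerUnnamed(this);
        }
        return generatedName;
    }
}
```

Registering only once and caching the name keeps repeated `toString()` calls stable, so the same table embedded in several queries refers to one registered name.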
Specified by: sqlQuery in interface TableEnvironment
Parameters:
query - The SQL query to evaluate.
public void insertInto(Table table, String path, String... pathContinued)
Description copied from interface: TableEnvironment
Writes the Table to a TableSink that was registered under the specified name.
See the documentation of TableEnvironment.useDatabase(String) or TableEnvironment.useCatalog(String) for the rules on the path resolution.
Specified by: insertInto in interface TableEnvironment
Parameters:
table - The Table to write to the sink.
path - The first part of the path of the registered TableSink to which the Table is written. This ensures that at least the name of the TableSink is provided.
pathContinued - The remaining part of the path of the registered TableSink to which the Table is written.
public void sqlUpdate(String stmt)
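Description copied from interface: TableEnvironment
Evaluates a SQL statement such as INSERT, UPDATE or DELETE, or a DDL statement. NOTE: Currently only SQL INSERT statements and CREATE TABLE statements are supported.
All tables referenced by the statement must be registered in the TableEnvironment. A Table is automatically registered when its Table#toString() method is called, for example when it is embedded into a String. Hence, SQL statements can directly reference a Table as follows: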
// register the configured table sink into which the result is inserted
tEnv.registerTableSink("sinkTable", configuredSink);
Table sourceTable = ...;
String tableName = sourceTable.toString();
// sourceTable was not registered explicitly; calling toString() registered it automatically
tEnv.sqlUpdate("INSERT INTO sinkTable SELECT * FROM " + tableName);
A DDL statement can also be executed to create a table. For example, the following DDL statement creates a CSV table named `tbl1` in the current catalog:
create table tbl1(
  a int,
  b bigint,
  c varchar
) with (
  'connector.type' = 'filesystem',
  'format.type' = 'csv',
  'connector.path' = 'xxx'
)
DDL and DML statements can be executed directly as follows. This code snippet creates a job that reads data from a Kafka source and writes it into a CSV sink:
String sinkDDL = "create table sinkTable( a int, b varchar ) with ( "
    + "'connector.type' = 'filesystem', 'format.type' = 'csv', 'connector.path' = 'xxx' )";
String sourceDDL = "create table sourceTable( a int, b varchar ) with ( "
    + "'connector.type' = 'kafka', 'update-mode' = 'append', 'connector.topic' = 'xxx', "
    + "'connector.properties.0.key' = 'k0', 'connector.properties.0.value' = 'v0', ... )";
String query = "INSERT INTO sinkTable SELECT * FROM sourceTable";
tEnv.sqlUpdate(sourceDDL);
tEnv.sqlUpdate(sinkDDL);
tEnv.sqlUpdate(query);
tEnv.execute("MyJob");
Specified by: sqlUpdate in interface TableEnvironment
Parameters:
stmt - The SQL statement to evaluate.
public String getCurrentCatalog()
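Description copied from interface: TableEnvironment
Gets the current default catalog name of the current session.
Specified by: getCurrentCatalog in interface TableEnvironment
See Also:
TableEnvironment.useCatalog(String)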
public void useCatalog(String catalogName)
Description copied from interface: TableEnvironment
Sets the current catalog to the given value. See also TableEnvironment.useDatabase(String).
This is used during the resolution of object paths. Both the catalog and database are optional when referencing catalog objects such as tables, views, etc. The algorithm looks for requested objects in the following paths, in this order:
[current-catalog].[current-database].[requested-path]
[current-catalog].[requested-path]
[requested-path]
Example:
Given the following structure, with the default catalog set to default_catalog and the default database set to default_database:
root:
  |- default_catalog
      |- default_database
          |- tab1
      |- db1
          |- tab1
  |- cat1
      |- db1
          |- tab1
The following table describes resolved paths:
Requested path | Resolved path |
---|---|
tab1 | default_catalog.default_database.tab1 |
db1.tab1 | default_catalog.db1.tab1 |
cat1.db1.tab1 | cat1.db1.tab1 |
Specified by: useCatalog in interface TableEnvironment
Parameters:
catalogName - The name of the catalog to set as the current default catalog.
See Also:
TableEnvironment.useDatabase(String)
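The lookup order described for `useCatalog` can be expressed as a short loop over three candidate paths. The sketch below is a toy model: `ToyPathResolver` is a hypothetical class that represents catalog objects as dot-joined strings in a set, and it ignores identifier quoting and the other details a real catalog manager handles.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Toy resolver for the documented lookup order (hypothetical, not Flink's CatalogManager).
final class ToyPathResolver {
    private final Set<String> existingObjects; // fully qualified names, e.g. "cat1.db1.tab1"
    private final String currentCatalog;
    private final String currentDatabase;

    ToyPathResolver(Set<String> existingObjects, String currentCatalog, String currentDatabase) {
        this.existingObjects = existingObjects;
        this.currentCatalog = currentCatalog;
        this.currentDatabase = currentDatabase;
    }

    // Tries [current-catalog].[current-database].[path], then [current-catalog].[path], then [path].
    Optional<String> resolve(String requestedPath) {
        List<String> candidates = Arrays.asList(
                currentCatalog + "." + currentDatabase + "." + requestedPath,
                currentCatalog + "." + requestedPath,
                requestedPath);
        for (String candidate : candidates) {
            if (existingObjects.contains(candidate)) {
                return Optional.of(candidate);
            }
        }
        return Optional.empty();
    }
}
```

With the example structure (default catalog default_catalog, default database default_database), `resolve("db1.tab1")` first tries default_catalog.default_database.db1.tab1, which does not exist, and then finds default_catalog.db1.tab1, matching the resolved paths in the table above.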
public String getCurrentDatabase()
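Description copied from interface: TableEnvironment
Gets the current default database name of the running session.
Specified by: getCurrentDatabase in interface TableEnvironment
See Also:
TableEnvironment.useDatabase(String)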
public void useDatabase(String databaseName)
Description copied from interface: TableEnvironment
Sets the current default database.
This is used during the resolution of object paths. Both the catalog and database are optional when referencing catalog objects such as tables, views, etc. The algorithm looks for requested objects in the following paths, in this order:
[current-catalog].[current-database].[requested-path]
[current-catalog].[requested-path]
[requested-path]
Example:
Given the following structure, with the default catalog set to default_catalog and the default database set to default_database:
root:
  |- default_catalog
      |- default_database
          |- tab1
      |- db1
          |- tab1
  |- cat1
      |- db1
          |- tab1
The following table describes resolved paths:
Requested path | Resolved path |
---|---|
tab1 | default_catalog.default_database.tab1 |
db1.tab1 | default_catalog.db1.tab1 |
cat1.db1.tab1 | cat1.db1.tab1 |
Specified by: useDatabase in interface TableEnvironment
Parameters:
databaseName - The name of the database to set as the current database.
See Also:
TableEnvironment.useCatalog(String)
public TableConfig getConfig()
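Description copied from interface: TableEnvironment
Returns the table config that defines the runtime behavior of the Table API.
Specified by: getConfig in interface TableEnvironment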
public JobExecutionResult execute(String jobName) throws Exception
Description copied from interface: TableEnvironment
Triggers the program execution. The program execution will be logged and displayed with the provided name.
NOTE: It is highly advised to set all parameters in the TableConfig at the very beginning of the program. It is undefined which configuration values will be used for the execution if queries are mixed with config changes. It depends on the characteristics of the particular parameter. For some of them, the value from the point in time of query construction (e.g. the currentCatalog) will be used. On the other hand, some values might be evaluated according to the state at the time when this method is called (e.g. timeZone).
Specified by: execute in interface TableEnvironment
Parameters:
jobName - Desired name of the job
Throws:
Exception - which occurs during job execution.
protected boolean isEagerOperationTranslation()
Defines the behavior of this TableEnvironment. If true, the queries will be translated immediately. If false, the ModifyOperations will be buffered and translated only when execute(String) is called.
If the TableEnvironment works in a lazy manner, it is undefined which configuration values will be used. It depends on the characteristics of the particular parameter. Some might use values current at the time of query construction (e.g. the currentCatalog), and some use values from the time when execute(String) is called (e.g. timeZone).
protected void validateTableSource(TableSource<?> tableSource)
Subclasses can override this method to add additional checks.
Parameters:
tableSource - tableSource to validate
protected void registerTableInternal(String name, CatalogBaseTable table)
protected TableImpl createTable(QueryOperation tableOperation)
Copyright © 2014–2020 The Apache Software Foundation. All rights reserved.