@Internal public class TableEnvironmentImpl extends Object implements TableEnvironmentInternal
Implementation of TableEnvironment that works exclusively with Table API interfaces. Only TableSource is supported as an input and TableSink as an output. It also does not bind to any particular StreamExecutionEnvironment.

Modifier and Type | Field and Description |
---|---|
protected Executor | execEnv |
protected FunctionCatalog | functionCatalog |
protected Parser | parser |
protected Planner | planner |
protected TableConfig | tableConfig |

Modifier | Constructor and Description |
---|---|
protected | TableEnvironmentImpl(CatalogManager catalogManager, ModuleManager moduleManager, TableConfig tableConfig, Executor executor, FunctionCatalog functionCatalog, Planner planner, boolean isStreamingMode, ClassLoader userClassLoader) |

Modifier and Type | Method and Description |
---|---|
ConnectTableDescriptor | connect(ConnectorDescriptor connectorDescriptor) - Creates a temporary table from a descriptor. |
static TableEnvironmentImpl | create(EnvironmentSettings settings) - Creates a table environment that is the entry point and central context for creating Table and SQL API programs. |
void | createFunction(String path, Class<? extends UserDefinedFunction> functionClass) - Registers a UserDefinedFunction class as a catalog function in the given path. |
void | createFunction(String path, Class<? extends UserDefinedFunction> functionClass, boolean ignoreIfExists) - Registers a UserDefinedFunction class as a catalog function in the given path. |
StatementSet | createStatementSet() - Creates a StatementSet instance that accepts DML statements or Tables; the planner can optimize all added statements and Tables together and then submit them as one job. |
protected TableImpl | createTable(QueryOperation tableOperation) |
void | createTemporaryFunction(String path, Class<? extends UserDefinedFunction> functionClass) - Registers a UserDefinedFunction class as a temporary catalog function. |
void | createTemporaryFunction(String path, UserDefinedFunction functionInstance) - Registers a UserDefinedFunction instance as a temporary catalog function. |
void | createTemporarySystemFunction(String name, Class<? extends UserDefinedFunction> functionClass) - Registers a UserDefinedFunction class as a temporary system function. |
void | createTemporarySystemFunction(String name, UserDefinedFunction functionInstance) - Registers a UserDefinedFunction instance as a temporary system function. |
void | createTemporaryView(String path, Table view) - Registers a Table API object as a temporary view similar to SQL temporary views. |
boolean | dropFunction(String path) - Drops a catalog function registered in the given path. |
boolean | dropTemporaryFunction(String path) - Drops a temporary catalog function registered in the given path. |
boolean | dropTemporarySystemFunction(String name) - Drops a temporary system function registered under the given name. |
boolean | dropTemporaryTable(String path) - Drops a temporary table registered in the given path. |
boolean | dropTemporaryView(String path) - Drops a temporary view registered in the given path. |
JobExecutionResult | execute(String jobName) - Triggers the program execution. |
TableResult | executeInternal(List<ModifyOperation> operations) - Executes the given modify operations and returns the execution result. |
TableResult | executeInternal(QueryOperation operation) - Executes the given query operation and returns the execution result. |
TableResult | executeSql(String statement) - Executes the given single statement and returns the execution result. |
String | explain(boolean extended) - Returns the AST of the specified Table API and SQL queries and the execution plan to compute the result of the multiple-sinks plan. |
String | explain(Table table) - Returns the AST of the specified Table API and SQL queries and the execution plan to compute the result of the given Table. |
String | explain(Table table, boolean extended) - Returns the AST of the specified Table API and SQL queries and the execution plan to compute the result of the given Table. |
String | explainInternal(List<Operation> operations, ExplainDetail... extraDetails) - Returns the AST of this table and the execution plan to compute the result of this table. |
String | explainSql(String statement, ExplainDetail... extraDetails) - Returns the AST of the specified statement and the execution plan to compute the result of the given statement. |
Table | from(String path) - Reads a registered table and returns the resulting Table. |
Table | fromTableSource(TableSource<?> source) - Creates a table from a table source. |
Table | fromValues(AbstractDataType<?> rowType, Expression... values) - Creates a Table from a given collection of objects with a given row type. |
Table | fromValues(AbstractDataType<?> rowType, Iterable<?> values) - Creates a Table from a given collection of objects with a given row type. |
Table | fromValues(AbstractDataType<?> rowType, Object... values) - Creates a Table from a given collection of objects with a given row type. |
Table | fromValues(Expression... values) - Creates a Table from given values. |
Table | fromValues(Iterable<?> values) - Creates a Table from a given collection of objects. |
Table | fromValues(Object... values) - Creates a Table from given values. |
Optional<Catalog> | getCatalog(String catalogName) - Gets a registered Catalog by name. |
CatalogManager | getCatalogManager() - Returns a CatalogManager that deals with all catalog objects. |
String[] | getCompletionHints(String statement, int position) - Returns completion hints for the given statement at the given cursor position. |
TableConfig | getConfig() - Returns the table config that defines the runtime behavior of the Table API. |
String | getCurrentCatalog() - Gets the current default catalog name of the current session. |
String | getCurrentDatabase() - Gets the current default database name of the running session. |
protected ExplainDetail[] | getExplainDetails(boolean extended) |
Parser | getParser() - Returns a Parser that provides methods for parsing a SQL string. |
Planner | getPlanner() |
void | insertInto(String targetPath, Table table) - Instructs to write the content of a Table API object into a table. |
void | insertInto(Table table, String sinkPath, String... sinkPathContinued) |
String[] | listCatalogs() - Gets the names of all catalogs registered in this environment. |
String[] | listDatabases() - Gets the names of all databases registered in the current catalog. |
String[] | listFunctions() - Gets the names of all functions in this environment. |
String[] | listModules() - Gets an array of names of all modules in this environment in the loaded order. |
String[] | listTables() - Gets the names of all tables available in the current namespace (the current database of the current catalog). |
String[] | listTemporaryTables() - Gets the names of all temporary tables and views available in the current namespace (the current database of the current catalog). |
String[] | listTemporaryViews() - Gets the names of all temporary views available in the current namespace (the current database of the current catalog). |
String[] | listUserDefinedFunctions() - Gets the names of all user-defined functions registered in this environment. |
String[] | listViews() - Gets the names of all views available in the current namespace (the current database of the current catalog). |
void | loadModule(String moduleName, Module module) - Loads a Module under a unique name. |
protected QueryOperation | qualifyQueryOperation(ObjectIdentifier identifier, QueryOperation queryOperation) - Subclasses can override this method to transform the given QueryOperation to a new one with the qualified object identifier. |
void | registerCatalog(String catalogName, Catalog catalog) - Registers a Catalog under a unique name. |
void | registerFunction(String name, ScalarFunction function) - Registers a ScalarFunction under a unique name. |
void | registerTable(String name, Table table) - Registers a Table under a unique name in the TableEnvironment's catalog. |
void | registerTableSink(String name, String[] fieldNames, TypeInformation<?>[] fieldTypes, TableSink<?> tableSink) - Registers an external TableSink with given field names and types in this TableEnvironment's catalog. |
void | registerTableSink(String name, TableSink<?> configuredSink) - Registers an external TableSink with already configured field names and field types in this TableEnvironment's catalog. |
void | registerTableSinkInternal(String name, TableSink<?> tableSink) - Registers an external TableSink with already configured field names and field types in this TableEnvironment's catalog. |
void | registerTableSource(String name, TableSource<?> tableSource) - Registers an external TableSource in this TableEnvironment's catalog. |
void | registerTableSourceInternal(String name, TableSource<?> tableSource) - Registers an external TableSource in this TableEnvironment's catalog. |
Table | scan(String... tablePath) - Scans a registered table and returns the resulting Table. |
Table | sqlQuery(String query) - Evaluates a SQL query on registered tables and retrieves the result as a Table. |
void | sqlUpdate(String stmt) - Evaluates a SQL statement such as INSERT, UPDATE or DELETE, or a DDL statement. NOTE: Currently only SQL INSERT statements and CREATE TABLE statements are supported. |
protected List<Transformation<?>> | translateAndClearBuffer() - Translates the buffered operations to Transformations and clears the buffer. |
void | unloadModule(String moduleName) - Unloads a Module with the given name. |
void | useCatalog(String catalogName) - Sets the current catalog to the given value. |
void | useDatabase(String databaseName) - Sets the current default database. |
protected void | validateTableSource(TableSource<?> tableSource) - Subclasses can override this method to add additional checks. |
protected final TableConfig tableConfig
protected final Executor execEnv
protected final FunctionCatalog functionCatalog
protected final Planner planner
protected final Parser parser
protected TableEnvironmentImpl(CatalogManager catalogManager, ModuleManager moduleManager, TableConfig tableConfig, Executor executor, FunctionCatalog functionCatalog, Planner planner, boolean isStreamingMode, ClassLoader userClassLoader)
public static TableEnvironmentImpl create(EnvironmentSettings settings)
Description copied from interface: TableEnvironment
Creates a table environment that is the entry point and central context for creating Table and SQL API programs.
It is unified both on a language level for all JVM-based languages (i.e. there is no distinction between the Scala and Java APIs) and for bounded and unbounded data processing.
A table environment is responsible for:
- Registering and retrieving Tables and other meta objects from a catalog.
Note: This environment is meant for pure table programs. If you would like to convert from or to other Flink APIs, it might be necessary to use one of the available language-specific table environments in the corresponding bridging modules.
Specified by: create in interface TableEnvironment
Parameters:
settings - The environment settings used to instantiate the TableEnvironment.
public Table fromValues(Object... values)
Description copied from interface: TableEnvironment
Creates a Table from given values.
Examples:
You can use a row(...) expression to create composite rows:
tEnv.fromValues(
    row(1, "ABC"),
    row(2L, "ABCDE")
)
will produce a Table with a schema as follows:
root
 |-- f0: BIGINT NOT NULL     // original types INT and BIGINT are generalized to BIGINT
 |-- f1: VARCHAR(5) NOT NULL // original types CHAR(3) and CHAR(5) are generalized to VARCHAR(5)
                             // it uses VARCHAR instead of CHAR so that no padding is applied
The method will derive the types automatically from the input expressions. If types at a certain position differ, the method will try to find a common super type for all types. If a common super type does not exist, an exception will be thrown. If you want to specify the requested type explicitly, see TableEnvironment.fromValues(AbstractDataType, Object...).
It is also possible to use Row objects instead of row expressions.
ROWs that are a result of e.g. a function call are not flattened:
public class RowFunction extends ScalarFunction {
    @DataTypeHint("ROW<f0 BIGINT, f1 VARCHAR(5)>")
    public Row eval() {
        return Row.of(1L, "ABC"); // illustrative value
    }
}
tEnv.fromValues(
    call(new RowFunction()),
    call(new RowFunction())
)
will produce a Table with a schema as follows:
root
 |-- f0: ROW<`f0` BIGINT, `f1` VARCHAR(5)>
The row constructor can be dropped to create a table with a single column:
tEnv.fromValues(
    1,
    2L,
    3
)
will produce a Table with a schema as follows:
root
 |-- f0: BIGINT NOT NULL
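The type generalization described above can be illustrated with a small, self-contained model. It is a sketch under simplifying assumptions, not Flink's type inference: only INT, BIGINT, CHAR(n) and VARCHAR(n) are handled, nullability is ignored, and the class name CommonTypeSketch is hypothetical.

```java
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical toy model of choosing one column type from the types found at
// the same position in every row. Only INT, BIGINT, CHAR(n) and VARCHAR(n)
// are modeled and nullability is ignored. This is not Flink's type inference.
final class CommonTypeSketch {
    private static final Pattern STRING_TYPE =
            Pattern.compile("(CHAR|VARCHAR)\\((\\d+)\\)");

    static String commonType(List<String> types) {
        if (types.isEmpty()) {
            throw new IllegalArgumentException("no types given");
        }
        // Identical types need no generalization.
        if (types.stream().distinct().count() == 1) {
            return types.get(0);
        }
        // Integer types widen to the larger one: INT + BIGINT -> BIGINT.
        if (types.stream().allMatch(t -> t.equals("INT") || t.equals("BIGINT"))) {
            return "BIGINT";
        }
        // Character strings of different lengths become VARCHAR of the
        // longest length, so that no padding is applied.
        int maxLength = 0;
        for (String type : types) {
            Matcher m = STRING_TYPE.matcher(type);
            if (!m.matches()) {
                // e.g. INT mixed with CHAR(3): no common super type in this model
                throw new IllegalArgumentException("no common super type for " + types);
            }
            maxLength = Math.max(maxLength, Integer.parseInt(m.group(2)));
        }
        return "VARCHAR(" + maxLength + ")";
    }
}
```

With these rules, commonType(List.of("INT", "BIGINT")) yields BIGINT and commonType(List.of("CHAR(3)", "CHAR(5)")) yields VARCHAR(5), matching the first example's schema; mixing INT with CHAR(3) throws, mirroring the case where no common super type exists.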
Specified by: fromValues in interface TableEnvironment
Parameters:
values - Expressions for constructing rows of the VALUES table.
public Table fromValues(AbstractDataType<?> rowType, Object... values)
Description copied from interface: TableEnvironment
Creates a Table from a given collection of objects with a given row type.
The difference between this method and TableEnvironment.fromValues(Object...) is that the schema can be manually adjusted. It might be helpful for assigning more generic types, e.g. DECIMAL, or for naming the columns.
Examples:
tEnv.fromValues(
    DataTypes.ROW(
        DataTypes.FIELD("id", DataTypes.DECIMAL(10, 2)),
        DataTypes.FIELD("name", DataTypes.STRING())
    ),
    row(1, "ABC"),
    row(2L, "ABCDE")
)
will produce a Table with a schema as follows:
root
 |-- id: DECIMAL(10, 2)
 |-- name: STRING
For more examples see TableEnvironment.fromValues(Object...).
Specified by: fromValues in interface TableEnvironment
Parameters:
rowType - Expected row type for the values.
values - Expressions for constructing rows of the VALUES table.
See Also: TableEnvironment.fromValues(Object...)
public Table fromValues(Expression... values)
Description copied from interface: TableEnvironment
Creates a Table from given values.
Examples:
You can use a row(...) expression to create composite rows:
tEnv.fromValues(
    row(1, "ABC"),
    row(2L, "ABCDE")
)
will produce a Table with a schema as follows:
root
 |-- f0: BIGINT NOT NULL     // original types INT and BIGINT are generalized to BIGINT
 |-- f1: VARCHAR(5) NOT NULL // original types CHAR(3) and CHAR(5) are generalized to VARCHAR(5)
                             // it uses VARCHAR instead of CHAR so that no padding is applied
The method will derive the types automatically from the input expressions. If types at a certain position differ, the method will try to find a common super type for all types. If a common super type does not exist, an exception will be thrown. If you want to specify the requested type explicitly, see TableEnvironment.fromValues(AbstractDataType, Expression...).
It is also possible to use Row objects instead of row expressions.
ROWs that are a result of e.g. a function call are not flattened:
public class RowFunction extends ScalarFunction {
    @DataTypeHint("ROW<f0 BIGINT, f1 VARCHAR(5)>")
    public Row eval() {
        return Row.of(1L, "ABC"); // illustrative value
    }
}
tEnv.fromValues(
    call(new RowFunction()),
    call(new RowFunction())
)
will produce a Table with a schema as follows:
root
 |-- f0: ROW<`f0` BIGINT, `f1` VARCHAR(5)>
The row constructor can be dropped to create a table with a single column:
tEnv.fromValues(
    lit(1).plus(2),
    lit(2L),
    lit(3)
)
will produce a Table with a schema as follows:
root
 |-- f0: BIGINT NOT NULL
Specified by: fromValues in interface TableEnvironment
Parameters:
values - Expressions for constructing rows of the VALUES table.
public Table fromValues(AbstractDataType<?> rowType, Expression... values)
Description copied from interface: TableEnvironment
Creates a Table from a given collection of objects with a given row type.
The difference between this method and TableEnvironment.fromValues(Expression...) is that the schema can be manually adjusted. It might be helpful for assigning more generic types, e.g. DECIMAL, or for naming the columns.
Examples:
tEnv.fromValues(
    DataTypes.ROW(
        DataTypes.FIELD("id", DataTypes.DECIMAL(10, 2)),
        DataTypes.FIELD("name", DataTypes.STRING())
    ),
    row(1, "ABC"),
    row(2L, "ABCDE")
)
will produce a Table with a schema as follows:
root
 |-- id: DECIMAL(10, 2)
 |-- name: STRING
For more examples see TableEnvironment.fromValues(Expression...).
Specified by: fromValues in interface TableEnvironment
Parameters:
rowType - Expected row type for the values.
values - Expressions for constructing rows of the VALUES table.
See Also: TableEnvironment.fromValues(Expression...)
public Table fromValues(Iterable<?> values)
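Description copied from interface: TableEnvironment
Creates a Table from a given collection of objects. See TableEnvironment.fromValues(Object...) for more explanation.
Specified by: fromValues in interface TableEnvironment
Parameters:
values - Expressions for constructing rows of the VALUES table.
See Also: TableEnvironment.fromValues(Object...)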
public Table fromValues(AbstractDataType<?> rowType, Iterable<?> values)
Description copied from interface: TableEnvironment
Creates a Table from a given collection of objects with a given row type. See TableEnvironment.fromValues(AbstractDataType, Object...) for more explanation.
Specified by: fromValues in interface TableEnvironment
Parameters:
rowType - Expected row type for the values.
values - Expressions for constructing rows of the VALUES table.
See Also: TableEnvironment.fromValues(AbstractDataType, Object...)
@VisibleForTesting public Planner getPlanner()
public Table fromTableSource(TableSource<?> source)
Description copied from interface: TableEnvironment
Creates a table from a table source.
Specified by: fromTableSource in interface TableEnvironment
Parameters:
source - table source used as table
public void registerCatalog(String catalogName, Catalog catalog)
Description copied from interface: TableEnvironment
Registers a Catalog under a unique name.
Specified by: registerCatalog in interface TableEnvironment
Parameters:
catalogName - The name under which the catalog will be registered.
catalog - The catalog to register.
public Optional<Catalog> getCatalog(String catalogName)
Description copied from interface: TableEnvironment
Gets a registered Catalog by name.
Specified by: getCatalog in interface TableEnvironment
Parameters:
catalogName - The name to look up the Catalog.
public void loadModule(String moduleName, Module module)
Description copied from interface: TableEnvironment
Loads a Module under a unique name. Modules will be kept in the loaded order. A ValidationException is thrown when there is already a module with the same name.
Specified by: loadModule in interface TableEnvironment
Parameters:
moduleName - name of the Module
module - the module instance
public void unloadModule(String moduleName)
Description copied from interface: TableEnvironment
Unloads a Module with the given name. A ValidationException is thrown when there is no module with the given name.
Specified by: unloadModule in interface TableEnvironment
Parameters:
moduleName - name of the Module
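The ordering and uniqueness rules of loadModule and unloadModule can be modeled with an insertion-ordered map. This is a hypothetical sketch, not Flink's ModuleManager: the class ModuleOrderSketch is invented, modules are plain Objects, and IllegalStateException stands in for ValidationException.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of loadModule/unloadModule/listModules: modules keep
// their load order and names must be unique. IllegalStateException stands in
// for Flink's ValidationException; this is not Flink's ModuleManager.
final class ModuleOrderSketch {
    // LinkedHashMap iterates in insertion order, i.e. the loaded order.
    private final Map<String, Object> modules = new LinkedHashMap<>();

    void loadModule(String name, Object module) {
        if (modules.containsKey(name)) {
            throw new IllegalStateException("A module named '" + name + "' is already loaded");
        }
        modules.put(name, module);
    }

    void unloadModule(String name) {
        if (!modules.containsKey(name)) {
            throw new IllegalStateException("No module named '" + name + "' is loaded");
        }
        modules.remove(name);
    }

    // Names in loaded order, as listModules() is documented to return them.
    String[] listModules() {
        return modules.keySet().toArray(new String[0]);
    }
}
```

For example, loading "core" and then "hive" lists them in that order; loading "core" a second time fails, and unloading "core" leaves only "hive". The module names are illustrative inputs only.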
public void registerFunction(String name, ScalarFunction function)
Description copied from interface: TableEnvironment
Registers a ScalarFunction under a unique name. Replaces already existing user-defined functions under this name.
Specified by: registerFunction in interface TableEnvironment
public void createTemporarySystemFunction(String name, Class<? extends UserDefinedFunction> functionClass)
Description copied from interface: TableEnvironment
Registers a UserDefinedFunction class as a temporary system function.
Compared to TableEnvironment.createTemporaryFunction(String, Class), system functions are identified by a global name that is independent of the current catalog and current database. Thus, this method allows extending the set of built-in system functions like TRIM, ABS, etc.
Temporary functions can shadow permanent ones. If a permanent function under a given name exists, it will be inaccessible in the current session. To make the permanent function available again, one can drop the corresponding temporary system function.
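The shadowing rule can be pictured as a lookup that consults temporary registrations before permanent ones. The following is a minimal model under that assumption; ShadowingRegistrySketch and its methods are hypothetical and stand in for the real function catalog, which this sketch does not reproduce.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical registry illustrating the documented shadowing rule: a
// temporary entry hides a permanent entry with the same name until the
// temporary entry is dropped. Not Flink API.
final class ShadowingRegistrySketch {
    private final Map<String, String> permanent = new HashMap<>();
    private final Map<String, String> temporary = new HashMap<>();

    void createPermanent(String name, String impl) {
        permanent.put(name, impl);
    }

    void createTemporary(String name, String impl) {
        temporary.put(name, impl);
    }

    // In this sketch, returns true only if a temporary entry was removed.
    boolean dropTemporary(String name) {
        return temporary.remove(name) != null;
    }

    // Temporary entries are consulted first, so they shadow permanent ones.
    Optional<String> lookup(String name) {
        String impl = temporary.get(name);
        return Optional.ofNullable(impl != null ? impl : permanent.get(name));
    }
}
```

After createPermanent("myFunc", "permanent") and createTemporary("myFunc", "temporary"), lookup("myFunc") returns the temporary entry; once dropTemporary("myFunc") succeeds, the permanent entry becomes visible again.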
Specified by: createTemporarySystemFunction in interface TableEnvironment
Parameters:
name - The name under which the function will be registered globally.
functionClass - The function class containing the implementation.
public void createTemporarySystemFunction(String name, UserDefinedFunction functionInstance)
Description copied from interface: TableEnvironment
Registers a UserDefinedFunction instance as a temporary system function.
Compared to TableEnvironment.createTemporarySystemFunction(String, Class), this method takes a function instance that might have been parameterized before (e.g. through its constructor). This might be useful for more interactive sessions. Make sure that the instance is Serializable.
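The Serializable requirement matters because a pre-configured instance, including the state set through its constructor, has to survive serialization when it is shipped for execution. The sketch below checks this with plain Java serialization; MultiplyBy, BrokenFunction and SerializationCheck are hypothetical stand-ins (not UserDefinedFunction subclasses), and the round trip only approximates what the runtime does.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical pre-configured "function": its parameter is set through the
// constructor and must survive serialization.
class MultiplyBy implements Serializable {
    private static final long serialVersionUID = 1L;
    private final int factor;

    MultiplyBy(int factor) {
        this.factor = factor;
    }

    int eval(int x) {
        return x * factor;
    }
}

// Hypothetical instance that cannot be shipped: java.lang.Object is not Serializable.
class BrokenFunction implements Serializable {
    private static final long serialVersionUID = 1L;
    private final Object lock = new Object();
}

final class SerializationCheck {
    // Serializes and deserializes the value, roughly like shipping it to a worker.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value); // throws NotSerializableException for BrokenFunction
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }
}
```

A round-tripped MultiplyBy(3) still multiplies by 3, whereas BrokenFunction fails with java.io.NotSerializableException, which is the kind of problem the note above warns about.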
Compared to TableEnvironment.createTemporaryFunction(String, UserDefinedFunction), system functions are identified by a global name that is independent of the current catalog and current database. Thus, this method allows extending the set of built-in system functions like TRIM, ABS, etc.
Temporary functions can shadow permanent ones. If a permanent function under a given name exists, it will be inaccessible in the current session. To make the permanent function available again, one can drop the corresponding temporary system function.
Specified by: createTemporarySystemFunction in interface TableEnvironment
Parameters:
name - The name under which the function will be registered globally.
functionInstance - The (possibly pre-configured) function instance containing the implementation.
public boolean dropTemporarySystemFunction(String name)
Description copied from interface: TableEnvironment
Drops a temporary system function registered under the given name.
If a permanent function with the given name exists, it will be used from now on for any queries that reference this name.
Specified by: dropTemporarySystemFunction in interface TableEnvironment
Parameters:
name - The name under which the function has been registered globally.
public void createFunction(String path, Class<? extends UserDefinedFunction> functionClass)
Description copied from interface: TableEnvironment
Registers a UserDefinedFunction class as a catalog function in the given path.
Compared to system functions with a globally defined name, catalog functions are always (implicitly or explicitly) identified by a catalog and database.
There must not be another function (temporary or permanent) registered under the same path.
Specified by: createFunction in interface TableEnvironment
Parameters:
path - The path under which the function will be registered. See also the TableEnvironment class description for the format of the path.
functionClass - The function class containing the implementation.
public void createFunction(String path, Class<? extends UserDefinedFunction> functionClass, boolean ignoreIfExists)
Description copied from interface: TableEnvironment
Registers a UserDefinedFunction class as a catalog function in the given path.
Compared to system functions with a globally defined name, catalog functions are always (implicitly or explicitly) identified by a catalog and database.
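The ignoreIfExists flag of this overload decides what happens when the path is already taken. A hypothetical sketch of that contract, using a map keyed by path: CatalogFunctionSketch is not Flink API, paths are treated as plain strings without resolution, temporary functions are not modeled, and IllegalStateException stands in for whatever exception the real catalog throws.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical catalog illustrating the documented createFunction contract:
// a second registration under the same path fails unless ignoreIfExists is
// set, in which case nothing happens. Not Flink API.
final class CatalogFunctionSketch {
    private final Map<String, Class<?>> functions = new HashMap<>();

    void createFunction(String path, Class<?> functionClass, boolean ignoreIfExists) {
        if (functions.containsKey(path)) {
            if (ignoreIfExists) {
                return; // keep the existing registration untouched
            }
            throw new IllegalStateException("Function " + path + " already exists");
        }
        functions.put(path, functionClass);
    }

    // In this sketch, the two-argument variant behaves like ignoreIfExists = false.
    void createFunction(String path, Class<?> functionClass) {
        createFunction(path, functionClass, false);
    }

    Class<?> lookup(String path) {
        return functions.get(path);
    }
}
```

Registering "cat.db.f" twice with ignoreIfExists = true keeps the first class; doing so without the flag raises an exception.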
Specified by: createFunction in interface TableEnvironment
Parameters:
path - The path under which the function will be registered. See also the TableEnvironment class description for the format of the path.
functionClass - The function class containing the implementation.
ignoreIfExists - If a function exists under the given path and this flag is set, no operation is executed. An exception is thrown otherwise.
public boolean dropFunction(String path)
Description copied from interface: TableEnvironment
Drops a catalog function registered in the given path.
Specified by: dropFunction in interface TableEnvironment
Parameters:
path - The path under which the function has been registered. See also the TableEnvironment class description for the format of the path.
public void createTemporaryFunction(String path, Class<? extends UserDefinedFunction> functionClass)
Description copied from interface: TableEnvironment
Registers a UserDefinedFunction class as a temporary catalog function.
Compared to TableEnvironment.createTemporarySystemFunction(String, Class) with a globally defined name, catalog functions are always (implicitly or explicitly) identified by a catalog and database.
Temporary functions can shadow permanent ones. If a permanent function under a given name exists, it will be inaccessible in the current session. To make the permanent function available again, one can drop the corresponding temporary function.
Specified by: createTemporaryFunction in interface TableEnvironment
Parameters:
path - The path under which the function will be registered. See also the TableEnvironment class description for the format of the path.
functionClass - The function class containing the implementation.
public void createTemporaryFunction(String path, UserDefinedFunction functionInstance)
Description copied from interface: TableEnvironment
Registers a UserDefinedFunction instance as a temporary catalog function.
Compared to TableEnvironment.createTemporaryFunction(String, Class), this method takes a function instance that might have been parameterized before (e.g. through its constructor). This might be useful for more interactive sessions. Make sure that the instance is Serializable.
Compared to TableEnvironment.createTemporarySystemFunction(String, UserDefinedFunction) with a globally defined name, catalog functions are always (implicitly or explicitly) identified by a catalog and database.
Temporary functions can shadow permanent ones. If a permanent function under a given name exists, it will be inaccessible in the current session. To make the permanent function available again, one can drop the corresponding temporary function.
Specified by: createTemporaryFunction in interface TableEnvironment
Parameters:
path - The path under which the function will be registered. See also the TableEnvironment class description for the format of the path.
functionInstance - The (possibly pre-configured) function instance containing the implementation.
public boolean dropTemporaryFunction(String path)
Description copied from interface: TableEnvironment
Drops a temporary catalog function registered in the given path.
If a permanent function with the given path exists, it will be used from now on for any queries that reference this path.
Specified by: dropTemporaryFunction in interface TableEnvironment
Parameters:
path - The path under which the function has been registered. See also the TableEnvironment class description for the format of the path.
public void registerTable(String name, Table table)
Description copied from interface: TableEnvironment
Registers a Table under a unique name in the TableEnvironment's catalog. Registered tables can be referenced in SQL queries.
Temporary objects can shadow permanent ones. If a permanent object in a given path exists, it will be inaccessible in the current session. To make the permanent object available again, one can drop the corresponding temporary object.
Specified by: registerTable in interface TableEnvironment
Parameters:
name - The name under which the table will be registered.
table - The table to register.
public void createTemporaryView(String path, Table view)
Description copied from interface: TableEnvironment
Registers a Table API object as a temporary view similar to SQL temporary views.
Temporary objects can shadow permanent ones. If a permanent object in a given path exists, it will be inaccessible in the current session. To make the permanent object available again, one can drop the corresponding temporary object.
Specified by: createTemporaryView in interface TableEnvironment
Parameters:
path - The path under which the view will be registered. See also the TableEnvironment class description for the format of the path.
view - The view to register.
public void registerTableSource(String name, TableSource<?> tableSource)
Description copied from interface: TableEnvironment
Registers an external TableSource in this TableEnvironment's catalog. Registered tables can be referenced in SQL queries.
Temporary objects can shadow permanent ones. If a permanent object in a given path exists, it will be inaccessible in the current session. To make the permanent object available again, one can drop the corresponding temporary object.
Specified by: registerTableSource in interface TableEnvironment
Parameters:
name - The name under which the TableSource is registered.
tableSource - The TableSource to register.
public void registerTableSink(String name, String[] fieldNames, TypeInformation<?>[] fieldTypes, TableSink<?> tableSink)
Description copied from interface: TableEnvironment
Registers an external TableSink with given field names and types in this TableEnvironment's catalog. Registered sink tables can be referenced in SQL DML statements.
Temporary objects can shadow permanent ones. If a permanent object in a given path exists, it will be inaccessible in the current session. To make the permanent object available again, one can drop the corresponding temporary object.
Specified by: registerTableSink in interface TableEnvironment
Parameters:
name - The name under which the TableSink is registered.
fieldNames - The field names to register with the TableSink.
fieldTypes - The field types to register with the TableSink.
tableSink - The TableSink to register.
public void registerTableSink(String name, TableSink<?> configuredSink)
Description copied from interface: TableEnvironment
Registers an external TableSink with already configured field names and field types in this TableEnvironment's catalog. Registered sink tables can be referenced in SQL DML statements.
Temporary objects can shadow permanent ones. If a permanent object in a given path exists, it will be inaccessible in the current session. To make the permanent object available again, one can drop the corresponding temporary object.
Specified by: registerTableSink in interface TableEnvironment
Parameters:
name - The name under which the TableSink is registered.
configuredSink - The configured TableSink to register.
public Table scan(String... tablePath)
Description copied from interface: TableEnvironment
Scans a registered table and returns the resulting Table.
A table to scan must be registered in the TableEnvironment. It can be either directly registered or be an external member of a Catalog.
See the documentation of TableEnvironment.useDatabase(String) or TableEnvironment.useCatalog(String) for the rules on the path resolution.
Examples:
Scanning a directly registered table.
Table tab = tableEnv.scan("tableName");
Scanning a table from a registered catalog.
Table tab = tableEnv.scan("catalogName", "dbName", "tableName");
Specified by: scan in interface TableEnvironment
Parameters:
tablePath - The path of the table to scan.
Returns: The resulting Table.
See Also: TableEnvironment.useCatalog(String), TableEnvironment.useDatabase(String)
public Table from(String path)
Description copied from interface: TableEnvironment
Reads a registered table and returns the resulting Table.
A table to scan must be registered in the TableEnvironment.
See the documentation of TableEnvironment.useDatabase(String) or TableEnvironment.useCatalog(String) for the rules on the path resolution.
Examples:
Reading a table from the default catalog and database.
Table tab = tableEnv.from("tableName");
Reading a table from a registered catalog.
Table tab = tableEnv.from("catalogName.dbName.tableName");
Reading a table from a registered catalog with escaping. Dots in e.g. a database name must be escaped.
Table tab = tableEnv.from("catalogName.`db.Name`.Table");
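Path resolution with escaping can be sketched as two steps: split the string on dots that are outside backticks, then fill in missing leading parts from the current catalog and database. The helper PathResolverSketch is hypothetical, not Flink's parser; it assumes that a one-part path is resolved against the current catalog and database and a two-part path against the current catalog, and it does not handle backticks escaped inside a quoted part.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not Flink's parser: splits a dotted path, honoring
// backtick quoting, and expands it to catalog.database.object.
final class PathResolverSketch {

    static List<String> split(String path) {
        List<String> parts = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean quoted = false;
        for (char c : path.toCharArray()) {
            if (c == '`') {
                quoted = !quoted; // toggle quoting; the backtick itself is dropped
            } else if (c == '.' && !quoted) {
                parts.add(current.toString()); // unquoted dot separates parts
                current.setLength(0);
            } else {
                current.append(c); // includes dots inside backticks
            }
        }
        if (quoted) {
            throw new IllegalArgumentException("unbalanced backtick in " + path);
        }
        parts.add(current.toString());
        return parts;
    }

    // Missing leading parts come from the current catalog and database.
    static List<String> qualify(String path, String currentCatalog, String currentDatabase) {
        List<String> parts = split(path);
        switch (parts.size()) {
            case 1:
                return List.of(currentCatalog, currentDatabase, parts.get(0));
            case 2:
                return List.of(currentCatalog, parts.get(0), parts.get(1));
            case 3:
                return parts;
            default:
                throw new IllegalArgumentException("too many parts in " + path);
        }
    }
}
```

Under these assumptions, qualify("catalogName.`db.Name`.Table", "default_catalog", "default_database") yields [catalogName, db.Name, Table], while qualify("tableName", "default_catalog", "default_database") yields [default_catalog, default_database, tableName]; the catalog and database names are example inputs.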
Specified by: from in interface TableEnvironment
Parameters:
path - The path of a table API object to scan.
See Also: TableEnvironment.useCatalog(String), TableEnvironment.useDatabase(String)
public void insertInto(String targetPath, Table table)
Description copied from interface: TableEnvironment
Instructs to write the content of a Table API object into a table.
See the documentation of TableEnvironment.useDatabase(String) or TableEnvironment.useCatalog(String) for the rules on the path resolution.
Specified by: insertInto in interface TableEnvironment
Parameters:
targetPath - The path of the registered TableSink to which the Table is written.
table - The Table to write to the sink.
public void insertInto(Table table, String sinkPath, String... sinkPathContinued)
Description copied from interface: TableEnvironment
Writes the Table to a TableSink that was registered under the specified name.
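Splitting the sink path into a required sinkPath and a varargs sinkPathContinued is a common Java idiom: because the first part is an ordinary parameter, a call without any path does not compile. The helper below is a hypothetical illustration of that idiom, not Flink code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of the (String first, String... rest) idiom:
// at least one path element is guaranteed by the compiler.
final class SinkPathSketch {
    static List<String> sinkPath(String sinkPath, String... sinkPathContinued) {
        List<String> parts = new ArrayList<>();
        parts.add(sinkPath);                               // always present
        parts.addAll(Arrays.asList(sinkPathContinued));    // may be empty
        return parts;
    }
}
```

sinkPath("mySink") yields [mySink] and sinkPath("cat", "db", "mySink") yields [cat, db, mySink], whereas sinkPath() with no arguments is rejected at compile time. By contrast, a pure varargs signature such as scan(String... tablePath) also accepts an empty call.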
See the documentation of TableEnvironment.useDatabase(String)
or TableEnvironment.useCatalog(String)
for the rules on the path resolution.
insertInto
in interface TableEnvironment
table
- The Table to write to the sink.sinkPath
- The first part of the path of the registered TableSink
to which the
Table
is written. This is to ensure at least the name of the TableSink
is
provided.sinkPathContinued
- The remaining part of the path of the registered TableSink
to which the Table
is written.public ConnectTableDescriptor connect(ConnectorDescriptor connectorDescriptor)
TableEnvironment
Descriptors allow for declaring the communication to external systems in an implementation-agnostic way. The classpath is scanned for suitable table factories that match the desired configuration.
The following example shows how to read from a connector using a JSON format and register a temporary table as "MyTable":
tableEnv
.connect(
new ExternalSystemXYZ()
.version("0.11"))
.withFormat(
new Json()
.jsonSchema("{...}")
.failOnMissingField(false))
.withSchema(
new Schema()
.field("user-name", "VARCHAR").from("u_name")
.field("count", "DECIMAL")
.createTemporaryTable("MyTable");
connect
in interface TableEnvironment
connectorDescriptor
- connector descriptor describing the external systempublic String[] listCatalogs()
TableEnvironment
listCatalogs
in interface TableEnvironment
public String[] listModules()
TableEnvironment
listModules
in interface TableEnvironment
public String[] listDatabases()
TableEnvironment
listDatabases
in interface TableEnvironment
public String[] listTables()
TableEnvironment
listTables
in interface TableEnvironment
TableEnvironment.listTemporaryTables()
,
TableEnvironment.listTemporaryViews()
public String[] listViews()
TableEnvironment
listViews
in interface TableEnvironment
TableEnvironment.listTemporaryViews()
public String[] listTemporaryTables()
TableEnvironment
listTemporaryTables
in interface TableEnvironment
TableEnvironment.listTables()
public String[] listTemporaryViews()
TableEnvironment
listTemporaryViews
in interface TableEnvironment
TableEnvironment.listTables()

public boolean dropTemporaryTable(String path)
Description copied from interface: TableEnvironment
Drops a temporary table registered in the given path.
If a permanent table with the given path exists, it will be used from now on for any queries that reference this path.
Specified by: dropTemporaryTable in interface TableEnvironment

public boolean dropTemporaryView(String path)
Description copied from interface: TableEnvironment
Drops a temporary view registered in the given path.
If a permanent table or view with the given path exists, it will be used from now on for any queries that reference this path.
Specified by: dropTemporaryView in interface TableEnvironment
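The shadowing behavior described for dropTemporaryTable and dropTemporaryView can be modeled with two maps keyed by path, where a lookup checks the temporary map first. This is a hypothetical in-memory sketch: `ToyCatalog` is not a Flink class, and it assumes the returned boolean means "a temporary object existed and was removed", which the text above does not state.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical model of temporary objects shadowing permanent ones.
final class ToyCatalog {
    private final Map<String, String> permanent = new HashMap<>();
    private final Map<String, String> temporary = new HashMap<>();

    void createTable(String path, String definition) {
        permanent.put(path, definition);
    }

    void createTemporaryTable(String path, String definition) {
        temporary.put(path, definition);
    }

    // A temporary object at the same path hides the permanent one.
    Optional<String> lookup(String path) {
        String temp = temporary.get(path);
        return temp != null ? Optional.of(temp) : Optional.ofNullable(permanent.get(path));
    }

    // Assumed contract: true only if a temporary object was actually removed.
    boolean dropTemporaryTable(String path) {
        return temporary.remove(path) != null;
    }
}
```

After the temporary entry is dropped, the same path resolves to the permanent table again. This matches the documented behavior.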

public String[] listUserDefinedFunctions()
Description copied from interface: TableEnvironment
Specified by: listUserDefinedFunctions in interface TableEnvironment

public String[] listFunctions()
Description copied from interface: TableEnvironment
Specified by: listFunctions in interface TableEnvironment
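
public String explain(Table table)
Description copied from interface: TableEnvironment
Returns the AST of the specified Table API and SQL queries and the execution plan to compute the result of the given Table.
Specified by: explain in interface TableEnvironment
Parameters:
table - The table for which the AST and execution plan will be returned.

public String explain(Table table, boolean extended)
Description copied from interface: TableEnvironment
Returns the AST of the specified Table API and SQL queries and the execution plan to compute the result of the given Table.
Specified by: explain in interface TableEnvironment
Parameters:
table - The table for which the AST and execution plan will be returned.
extended - if the plan should contain additional properties, e.g. estimated cost, traits.

public String explain(boolean extended)
Description copied from interface: TableEnvironment
Specified by: explain in interface TableEnvironment
Parameters:
extended - if the plan should contain additional properties, e.g. estimated cost, traits.

public String explainSql(String statement, ExplainDetail... extraDetails)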
Description copied from interface: TableEnvironment
Specified by: explainSql in interface TableEnvironment
Parameters:
statement - The statement for which the AST and execution plan will be returned.
extraDetails - The extra explain details which the explain result should include, e.g. estimated cost, changelog mode for streaming.

public String explainInternal(List<Operation> operations, ExplainDetail... extraDetails)
Description copied from interface: TableEnvironmentInternal
Specified by: explainInternal in interface TableEnvironmentInternal
Parameters:
operations - The operations to be explained.
extraDetails - The extra explain details which the explain result should include, e.g. estimated cost, changelog mode for streaming.

public String[] getCompletionHints(String statement, int position)
Description copied from interface: TableEnvironment
Specified by: getCompletionHints in interface TableEnvironment
Parameters:
statement - Partial or slightly incorrect SQL statement
position - cursor position

public Table sqlQuery(String query)
Description copied from interface: TableEnvironment
Evaluates a SQL query on registered tables and retrieves the result as a Table.
All tables referenced by the query must be registered in the TableEnvironment. A Table is automatically registered when its Table#toString() method is called, for example when it is embedded into a String. Hence, SQL queries can directly reference a Table as follows:
Table table = ...;
String tableName = table.toString();
// no explicit registration needed: toString() registered the table under tableName
tEnv.sqlQuery("SELECT * FROM " + tableName);
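Implicit registration through Table#toString() can be sketched as a unique name that is assigned lazily and registered on first use. `ToyEnv`, `ToyTable`, and the `UnnamedTable$` prefix are illustrative assumptions, not the actual table implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical registry standing in for the TableEnvironment's catalog.
final class ToyEnv {
    final Map<String, ToyTable> registered = new HashMap<>();
}

final class ToyTable {
    private static final AtomicInteger UNIQUE_ID = new AtomicInteger();
    private final ToyEnv env;
    private String name; // assigned on the first toString() call

    ToyTable(ToyEnv env) {
        this.env = env;
    }

    // The first call generates a unique name and registers this table under it.
    // Later calls return the same name without registering again.
    @Override
    public String toString() {
        if (name == null) {
            name = "UnnamedTable$" + UNIQUE_ID.getAndIncrement();
            env.registered.put(name, this);
        }
        return name;
    }
}
```

String concatenation such as `"SELECT * FROM " + table` calls toString() implicitly, so the table is registered before the query string ever reaches the SQL parser.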
Specified by: sqlQuery in interface TableEnvironment
Parameters:
query - The SQL query to evaluate.

public TableResult executeSql(String statement)
Description copied from interface: TableEnvironment
The statement can be DDL/DML/DQL/SHOW/DESCRIBE/EXPLAIN/USE. For DML and DQL, this method returns a TableResult once the job has been submitted. For DDL and DCL statements, the TableResult is returned once the operation has finished.
Specified by: executeSql in interface TableEnvironment
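The return-timing rule of executeSql can be summarized with a small classifier. This is a hypothetical sketch: `ReturnTiming` classifies only by the leading keyword, whereas the real planner parses statements into typed operations. For example, `WITH ... SELECT` is misclassified here. The timing for SHOW/DESCRIBE/EXPLAIN/USE is not stated above, so treating them as "waits for completion" is an assumption.

```java
import java.util.Locale;

// Hypothetical keyword-based classification, for illustration only.
final class ReturnTiming {
    enum Kind { DML, DQL, DDL, OTHER }

    static Kind classify(String statement) {
        String first = statement.trim().split("\\s+", 2)[0].toUpperCase(Locale.ROOT);
        switch (first) {
            case "INSERT":
                return Kind.DML;
            case "SELECT":
                return Kind.DQL;
            case "CREATE":
            case "DROP":
            case "ALTER":
                return Kind.DDL;
            default:
                return Kind.OTHER;
        }
    }

    // True if executeSql hands back its TableResult as soon as the job is submitted
    // (DML, DQL). False if it waits for the operation to finish (DDL, and, by
    // assumption, everything else).
    static boolean returnsOnSubmission(String statement) {
        Kind kind = classify(statement);
        return kind == Kind.DML || kind == Kind.DQL;
    }
}
```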

public StatementSet createStatementSet()
Description copied from interface: TableEnvironment
Creates a StatementSet instance which accepts DML statements or Tables. The planner can optimize all added statements and Tables together and then submit them as one job.
Specified by: createStatementSet in interface TableEnvironment

public TableResult executeInternal(List<ModifyOperation> operations)
Description copied from interface: TableEnvironmentInternal
Specified by: executeInternal in interface TableEnvironmentInternal
Parameters:
operations - The operations to be executed.

public TableResult executeInternal(QueryOperation operation)
Description copied from interface: TableEnvironmentInternal
Specified by: executeInternal in interface TableEnvironmentInternal
Parameters:
operation - The QueryOperation to be executed.

public void sqlUpdate(String stmt)
Description copied from interface: TableEnvironment
All tables referenced by the query must be registered in the TableEnvironment. A Table is automatically registered when its Table#toString() method is called, for example when it is embedded into a String. Hence, SQL queries can directly reference a Table as follows:
// register the configured table sink into which the result is inserted.
tEnv.registerTableSinkInternal("sinkTable", configuredSink);
Table sourceTable = ...;
String tableName = sourceTable.toString();
// no explicit registration needed: toString() registered sourceTable under tableName
tEnv.sqlUpdate("INSERT INTO sinkTable SELECT * FROM " + tableName);
A DDL statement can also be executed to create a table. For example, the following DDL statement creates a CSV table named `tbl1` in the current catalog:
create table tbl1(
  a int,
  b bigint,
  c varchar
) with (
  'connector.type' = 'filesystem',
  'format.type' = 'csv',
  'connector.path' = 'xxx'
)
SQL statements can be executed directly as follows:
String sinkDDL = "create table sinkTable( a int, b varchar ) with ( "
    + "'connector.type' = 'filesystem', 'format.type' = 'csv', 'connector.path' = 'xxx' )";
String sourceDDL = "create table sourceTable( a int, b varchar ) with ( "
    + "'connector.type' = 'kafka', 'update-mode' = 'append', 'connector.topic' = 'xxx', "
    + "'connector.properties.bootstrap.servers' = 'localhost:9092', ... )";
String query = "INSERT INTO sinkTable SELECT * FROM sourceTable";
tEnv.sqlUpdate(sourceDDL);
tEnv.sqlUpdate(sinkDDL);
tEnv.sqlUpdate(query);
tEnv.execute("MyJob");
This code snippet creates a job that reads data from a Kafka source and writes it into a CSV sink.
Specified by: sqlUpdate in interface TableEnvironment
Parameters:
stmt - The SQL statement to evaluate.

public String getCurrentCatalog()
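Description copied from interface: TableEnvironment
Specified by: getCurrentCatalog in interface TableEnvironment
See Also: TableEnvironment.useCatalog(String)

public void useCatalog(String catalogName)
Description copied from interface: TableEnvironment
Sets the current catalog to the given value. It also sets the default database to the catalog's default one. See also TableEnvironment.useDatabase(String).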
This is used during the resolution of object paths. Both the catalog and the database are optional when referencing catalog objects such as tables, views, etc. The algorithm looks for requested objects in the following paths, in this order:
[current-catalog].[current-database].[requested-path]
[current-catalog].[requested-path]
[requested-path]
Example:
Given the following structure, with the default catalog set to default_catalog and the default database set to default_database:
root:
  |- default_catalog
      |- default_database
          |- tab1
      |- db1
          |- tab1
  |- cat1
      |- db1
          |- tab1
The following table describes resolved paths:
Requested path | Resolved path |
---|---|
tab1 | default_catalog.default_database.tab1 |
db1.tab1 | default_catalog.db1.tab1 |
cat1.db1.tab1 | cat1.db1.tab1 |
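The lookup order above can be sketched as trying the three candidate paths in turn against a set of known, fully qualified paths. `PathResolver` is a hypothetical in-memory model built from the example structure. It is not Flink's catalog manager.

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Hypothetical resolver that models only the documented lookup order.
final class PathResolver {
    private final Set<String> existing; // fully qualified paths, e.g. "cat1.db1.tab1"
    private final String currentCatalog;
    private final String currentDatabase;

    PathResolver(Set<String> existing, String currentCatalog, String currentDatabase) {
        this.existing = existing;
        this.currentCatalog = currentCatalog;
        this.currentDatabase = currentDatabase;
    }

    // Tries [catalog].[database].[path], then [catalog].[path], then [path].
    Optional<String> resolve(String requestedPath) {
        List<String> candidates = List.of(
                currentCatalog + "." + currentDatabase + "." + requestedPath,
                currentCatalog + "." + requestedPath,
                requestedPath);
        for (String candidate : candidates) {
            if (existing.contains(candidate)) {
                return Optional.of(candidate);
            }
        }
        return Optional.empty();
    }

    // The example structure from the text.
    static PathResolver example() {
        return new PathResolver(
                Set.of("default_catalog.default_database.tab1",
                        "default_catalog.db1.tab1",
                        "cat1.db1.tab1"),
                "default_catalog", "default_database");
    }
}
```

In this model, calling useCatalog or useDatabase corresponds to constructing the resolver with a different current catalog or database.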
Specified by: useCatalog in interface TableEnvironment
Parameters:
catalogName - The name of the catalog to set as the current default catalog.
See Also: TableEnvironment.useDatabase(String)

public String getCurrentDatabase()
Description copied from interface: TableEnvironment
Specified by: getCurrentDatabase in interface TableEnvironment
See Also: TableEnvironment.useDatabase(String)

public void useDatabase(String databaseName)
Description copied from interface: TableEnvironment
This is used during the resolution of object paths. Both the catalog and the database are optional when referencing catalog objects such as tables, views, etc. The algorithm looks for requested objects in the following paths, in this order:
[current-catalog].[current-database].[requested-path]
[current-catalog].[requested-path]
[requested-path]
Example:
Given the following structure, with the default catalog set to default_catalog and the default database set to default_database:
root:
  |- default_catalog
      |- default_database
          |- tab1
      |- db1
          |- tab1
  |- cat1
      |- db1
          |- tab1
The following table describes resolved paths:
Requested path | Resolved path |
---|---|
tab1 | default_catalog.default_database.tab1 |
db1.tab1 | default_catalog.db1.tab1 |
cat1.db1.tab1 | cat1.db1.tab1 |
Specified by: useDatabase in interface TableEnvironment
Parameters:
databaseName - The name of the database to set as the current database.
See Also: TableEnvironment.useCatalog(String)

public TableConfig getConfig()
Description copied from interface: TableEnvironment
Specified by: getConfig in interface TableEnvironment

public JobExecutionResult execute(String jobName) throws Exception
Description copied from interface: TableEnvironment
Triggers the program execution. The program execution will be logged and displayed with the provided name.
NOTE: It is highly advised to set all parameters in the TableConfig at the very beginning of the program. It is undefined which configuration values will be used for the execution if queries are mixed with config changes; this depends on the characteristics of the particular parameter. For some parameters, the value at the time of query construction (e.g. the currentCatalog) is used. Other values might be evaluated according to the state at the time this method is called (e.g. timeZone).
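The difference between a value captured when a query is built and a value read when it is executed can be illustrated with a toy query that copies the catalog eagerly but reads the time zone lazily. `ToyConfig` and `ToyQuery` are hypothetical. Which real parameters behave which way is exactly what the note leaves undefined.

```java
// Hypothetical mutable configuration with two settings.
final class ToyConfig {
    String currentCatalog = "default_catalog";
    String timeZone = "UTC";
}

// Captures the catalog at construction time; reads the time zone at execution time.
final class ToyQuery {
    private final ToyConfig config;
    private final String catalogAtConstruction;

    ToyQuery(ToyConfig config) {
        this.config = config;
        this.catalogAtConstruction = config.currentCatalog; // eager capture
    }

    String execute() {
        return catalogAtConstruction + "@" + config.timeZone; // lazy read
    }
}
```

If both settings change between construction and execution, the result mixes the old catalog with the new time zone. Setting all parameters before building any query avoids that mix.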
Once the execution finishes, any previously defined DMLs will be cleared, regardless of whether the execution succeeds. Therefore, if you want to retry in case of failures, you have to re-define the DMLs, e.g. by calling TableEnvironment.sqlUpdate(String), before you call this method again.
Specified by: execute in interface TableEnvironment
Parameters:
jobName - Desired name of the job
Throws:
Exception - which occurs during job execution.

public Parser getParser()
Description copied from interface: TableEnvironmentInternal
Returns a Parser that provides methods for parsing a SQL string.
Specified by: getParser in interface TableEnvironmentInternal
Returns: the Parser.

public CatalogManager getCatalogManager()
Description copied from interface: TableEnvironmentInternal
Returns a CatalogManager that deals with all catalog objects.
Specified by: getCatalogManager in interface TableEnvironmentInternal

protected QueryOperation qualifyQueryOperation(ObjectIdentifier identifier, QueryOperation queryOperation)
fromDataStream(DataStream). But the identifier is required when converting this QueryOperation to RelNode.

protected void validateTableSource(TableSource<?> tableSource)
Parameters:
tableSource - tableSource to validate

protected List<Transformation<?>> translateAndClearBuffer()
Translates the buffered operations and clears the buffer. The buffer is cleared even if `translate` fails: in most cases the failure is not retryable (e.g. a type mismatch, or no physical plan can be generated), and if the buffer were not cleared after a failure, every following `translate` would fail as well.
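The "clear even on failure" rule amounts to emptying the buffer in a `finally` block. `DmlBuffer` is a hypothetical sketch in which a caller-supplied function stands in for the planner's translation step.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical buffer of pending DML statements.
final class DmlBuffer {
    private final List<String> pending = new ArrayList<>();

    void add(String dml) {
        pending.add(dml);
    }

    int size() {
        return pending.size();
    }

    // Translates every buffered statement, then clears the buffer whether or not
    // translation succeeded, so a failing statement cannot poison later calls.
    <T> List<T> translateAndClear(Function<String, T> translator) {
        try {
            List<T> translated = new ArrayList<>();
            for (String dml : pending) {
                translated.add(translator.apply(dml));
            }
            return translated;
        } finally {
            pending.clear();
        }
    }
}
```

Because the buffer is emptied after a failure, a retry must add the statements again. This matches the advice given for execute(String).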
@VisibleForTesting protected ExplainDetail[] getExplainDetails(boolean extended)
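getExplainDetails(boolean extended) appears to bridge the boolean flag of explain(boolean) and the ExplainDetail varargs of explainSql. Below is one plausible mapping, which assumes that changelog mode is only meaningful when the environment runs in streaming mode (the constructor's isStreamingMode flag). The sketch uses a local `Detail` enum in place of ExplainDetail. The exact mapping used by this class is not documented here.

```java
// Local stand-in for ExplainDetail; only the two details named in the
// explainSql parameter description (estimated cost, changelog mode) are modeled.
enum Detail { ESTIMATED_COST, CHANGELOG_MODE }

final class ExplainDetails {
    // Hypothetical mapping from the "extended" flag to explicit explain details.
    static Detail[] forExtended(boolean extended, boolean isStreamingMode) {
        if (!extended) {
            return new Detail[0];
        }
        return isStreamingMode
                ? new Detail[] {Detail.ESTIMATED_COST, Detail.CHANGELOG_MODE}
                : new Detail[] {Detail.ESTIMATED_COST};
    }
}
```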

public void registerTableSourceInternal(String name, TableSource<?> tableSource)
Description copied from interface: TableEnvironmentInternal
Registers an external TableSource in this TableEnvironment's catalog. Registered tables can be referenced in SQL queries.
Temporary objects can shadow permanent ones. If a permanent object exists in the given path, it will be inaccessible in the current session. To make the permanent object available again, drop the corresponding temporary object.
Specified by: registerTableSourceInternal in interface TableEnvironmentInternal
Parameters:
name - The name under which the TableSource is registered.
tableSource - The TableSource to register.

public void registerTableSinkInternal(String name, TableSink<?> tableSink)
Description copied from interface: TableEnvironmentInternal
Registers an external TableSink with already configured field names and field types in this TableEnvironment's catalog. Registered sink tables can be referenced in SQL DML statements.
Temporary objects can shadow permanent ones. If a permanent object exists in the given path, it will be inaccessible in the current session. To make the permanent object available again, drop the corresponding temporary object.
Specified by: registerTableSinkInternal in interface TableEnvironmentInternal
Parameters:
name - The name under which the TableSink is registered.
tableSink - The configured TableSink to register.

protected TableImpl createTable(QueryOperation tableOperation)
Copyright © 2014–2021 The Apache Software Foundation. All rights reserved.