TableEnvironment #

This document introduces the PyFlink TableEnvironment and describes every public interface of the TableEnvironment class in detail.

Create a TableEnvironment #

The recommended way to create a TableEnvironment is to create one from an EnvironmentSettings object:

from pyflink.common import Configuration
from pyflink.table import EnvironmentSettings, TableEnvironment

# create a streaming TableEnvironment
config = Configuration()
config.set_string('execution.buffer-timeout', '1 min')
env_settings = EnvironmentSettings \
    .new_instance() \
    .in_streaming_mode() \
    .with_configuration(config) \
    .build()

table_env = TableEnvironment.create(env_settings)

Alternatively, users can create a StreamTableEnvironment from an existing StreamExecutionEnvironment to interoperate with the DataStream API.

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

# create a streaming TableEnvironment from a StreamExecutionEnvironment
env = StreamExecutionEnvironment.get_execution_environment()
table_env = StreamTableEnvironment.create(env)

TableEnvironment API #

Table/SQL Operations #

These APIs are used to create/remove Table API/SQL Tables and write queries:

APIs Description Docs
from_elements(elements, schema=None, verify_schema=True) Creates a table from a collection of elements. link
from_pandas(pdf, schema=None, split_num=1) Creates a table from a pandas DataFrame. link
from_path(path) Creates a table from a registered table under the specified path, e.g. tables registered via create_temporary_view. link
create_temporary_view(view_path, table) Registers a `Table` object as a temporary view similar to SQL temporary views. link
drop_temporary_view(view_path) Drops a temporary view registered under the given path. link
drop_temporary_table(table_path) Drops a temporary table registered under the given path. You can use this interface to drop temporary source and sink tables. link
execute_sql(stmt) Executes the given single statement and returns the execution result. The statement can be DDL/DML/DQL/SHOW/DESCRIBE/EXPLAIN/USE. Note that for an "INSERT INTO" statement this is an asynchronous operation, which is usually expected when submitting a job to a remote cluster. However, when executing a job in a mini cluster or an IDE, you need to wait until the job execution finishes. Please refer to the SQL documentation for more details about SQL statements. link
sql_query(query) Evaluates a SQL query and retrieves the result as a `Table` object. link

Deprecated APIs

APIs Description Docs
from_table_source(table_source) Creates a table from a table source. link
scan(*table_path) Scans a registered table from the catalog and returns the resulting Table. It can be replaced by from_path. link
register_table(name, table) Registers a `Table` object under a unique name in the TableEnvironment's catalog. Registered tables can be referenced in SQL queries. It can be replaced by create_temporary_view. link
insert_into(target_path, table) Instructs the TableEnvironment to write the content of a `Table` object into a sink table. Note that this interface does not trigger job execution; you need to call the "execute" method to execute your job. link
sql_update(stmt) Evaluates a SQL statement such as INSERT, UPDATE or DELETE or a DDL statement. It can be replaced by execute_sql. link

Execute/Explain Jobs #

These APIs are used to explain/execute jobs. Note that the API execute_sql can also be used to execute jobs.

APIs Description Docs
explain_sql(stmt, *extra_details) Returns the AST and the execution plan of the specified statement. link
create_statement_set() Creates a StatementSet instance which accepts DML statements or Tables. It can be used to execute a multi-sink job. link

Deprecated APIs

APIs Description Docs
explain(table=None, extended=False) Returns the AST of the specified Table API and SQL queries and the execution plan to compute the result of the given `Table` object or multi-sinks plan. If you use the insert_into or sql_update method to emit data to multiple sinks, you can use this method to get the plan. It can be replaced by TableEnvironment.explain_sql, Table.explain or StatementSet.explain. link
execute(job_name) Triggers the program execution. The environment will execute all parts of the program. If you use the insert_into or sql_update method to emit data to sinks, you can use this method to trigger the program execution. This method will block the client program until the job is finished/canceled/failed. link

Create/Drop User Defined Functions #

These APIs are used to register UDFs or remove the registered UDFs. Note that the API execute_sql can also be used to register/remove UDFs. For more details about the different kinds of UDFs, please refer to User Defined Functions.

APIs Description Docs
create_temporary_function(path, function) Registers a Python user defined function class as a temporary catalog function. link
create_temporary_system_function(name, function) Registers a Python user defined function class as a temporary system function. If the name of a temporary system function is the same as a temporary catalog function, the temporary system function takes precedence. link
create_java_function(path, function_class_name, ignore_if_exists=None) Registers a Java user defined function class as a catalog function under the given path. If the catalog is persistent, the registered catalog function can be used across multiple Flink sessions and clusters. link
create_java_temporary_function(path, function_class_name) Registers a Java user defined function class as a temporary catalog function. link
create_java_temporary_system_function(name, function_class_name) Registers a Java user defined function class as a temporary system function. link
drop_function(path) Drops a catalog function registered under the given path. link
drop_temporary_function(path) Drops a temporary catalog function registered under the given path. link
drop_temporary_system_function(name) Drops a temporary system function registered under the given name. link

Dependency Management #

These APIs are used to manage the Python dependencies which are required by the Python UDFs. Please refer to the Dependency Management documentation for more details.

APIs Description Docs
add_python_file(file_path) Adds a Python dependency, which can be a Python file, a Python package or a local directory. It will be added to the PYTHONPATH of the Python UDF worker. link
set_python_requirements(requirements_file_path, requirements_cache_dir=None) Specifies a requirements.txt file which defines the third-party dependencies. These dependencies will be installed to a temporary directory and added to the PYTHONPATH of the Python UDF worker. link
add_python_archive(archive_path, target_dir=None) Adds a Python archive file. The file will be extracted to the working directory of Python UDF worker. link

Configuration #

APIs Description Docs
get_config() Returns the table config to define the runtime behavior of the Table API. You can find all the available configuration options in Configuration and Python Configuration.

The following code is an example showing how to set the configuration options through this API:
# set the parallelism to 8
table_env.get_config().set("parallelism.default", "8")
# set the job name
table_env.get_config().set("pipeline.name", "my_first_job")
link

Catalog APIs #

These APIs are used to access catalogs and modules. You can find more detailed introduction in Modules and Catalogs documentation.

APIs Description Docs
register_catalog(catalog_name, catalog) Registers a `Catalog` under a unique name. link
get_catalog(catalog_name) Gets a registered `Catalog` by name. link
use_catalog(catalog_name) Sets the current catalog to the given value. It also sets the default database to the catalog's default one. link
get_current_catalog() Gets the current default catalog name of the current session. link
get_current_database() Gets the current default database name of the running session. link
use_database(database_name) Sets the current default database. It has to exist in the current catalog. That path will be used as the default one when looking for unqualified object names. link
load_module(module_name, module) Loads a `Module` under a unique name. Modules will be kept in the loaded order. link
unload_module(module_name) Unloads a `Module` with given name. link
use_modules(*module_names) Enables and changes the resolution order of loaded modules. link
list_catalogs() Gets the names of all catalogs registered in this environment. link
list_modules() Gets the names of all enabled modules registered in this environment. link
list_full_modules() Gets the names of all loaded modules (including disabled modules) registered in this environment. link
list_databases() Gets the names of all databases in the current catalog. link
list_tables() Gets the names of all tables and views in the current database of the current catalog. It returns both temporary and permanent tables and views. link
list_views() Gets the names of all views in the current database of the current catalog. It returns both temporary and permanent views. link
list_user_defined_functions() Gets the names of all user defined functions registered in this environment. link
list_functions() Gets the names of all functions in this environment. link
list_temporary_tables() Gets the names of all temporary tables and views available in the current namespace (the current database of the current catalog). link
list_temporary_views() Gets the names of all temporary views available in the current namespace (the current database of the current catalog). link

Statebackend, Checkpoint and Restart Strategy #

Before Flink 1.10, the statebackend, checkpointing and restart strategy could only be configured via the StreamExecutionEnvironment. Now you can configure them by setting key-value options in TableConfig; see Fault Tolerance, State Backends and Checkpointing for more details.

The following code is an example showing how to configure the statebackend, checkpoint and restart strategy through the Table API:

# set the restart strategy to "fixed-delay"
table_env.get_config().set("restart-strategy.type", "fixed-delay")
table_env.get_config().set("restart-strategy.fixed-delay.attempts", "3")
table_env.get_config().set("restart-strategy.fixed-delay.delay", "30s")

# set the checkpoint mode to EXACTLY_ONCE
table_env.get_config().set("execution.checkpointing.mode", "EXACTLY_ONCE")
table_env.get_config().set("execution.checkpointing.interval", "3min")

# set the statebackend type to "rocksdb"; the other available option is "hashmap"
# you can also set the fully qualified Java class name of the StateBackendFactory
# to this option, e.g. org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackendFactory
table_env.get_config().set("state.backend.type", "rocksdb")

# set the checkpoint directory, which is required by the RocksDB statebackend
table_env.get_config().set("execution.checkpointing.dir", "file:///tmp/checkpoints/")