This documentation is for an out-of-date version of Apache Flink. We recommend you use the latest stable version.

TableEnvironment

This document is an introduction of PyFlink TableEnvironment. It includes detailed descriptions of every public interface of the TableEnvironment class.

Create a TableEnvironment

The recommended way to create a TableEnvironment is to create from an EnvironmentSettings object:

from pyflink.table import EnvironmentSettings, StreamTableEnvironment, BatchTableEnvironment

# create a blink streaming TableEnvironment
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
table_env = StreamTableEnvironment.create(environment_settings=env_settings)

# create a blink batch TableEnvironment
env_settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
table_env = BatchTableEnvironment.create(environment_settings=env_settings)

# create a flink streaming TableEnvironment
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_old_planner().build()
table_env = StreamTableEnvironment.create(environment_settings=env_settings)

# create a flink batch TableEnvironment
env_settings = EnvironmentSettings.new_instance().in_batch_mode().use_old_planner().build()
table_env = BatchTableEnvironment.create(environment_settings=env_settings)

If you need to access the ExecutionEnvironment/StreamExecutionEnvironment object which the TableEnvironment based on, e.g. mixing with DataStream API, configuring via the APIs of ExecutionEnvironment/StreamExecutionEnvironment, you can also create a TableEnvironment from an ExecutionEnvironment/StreamExecutionEnvironment with a optional TableConfig object:

from pyflink.dataset import ExecutionEnvironment
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, BatchTableEnvironment, TableConfig

# create a blink streaming TableEnvironment from a StreamExecutionEnvironment
env = StreamExecutionEnvironment.get_execution_environment()
table_env = StreamTableEnvironment.create(env)

# create a blink streaming TableEnvironment from a StreamExecutionEnvironment with a TableConfig
env = StreamExecutionEnvironment.get_execution_environment()
table_config = TableConfig()  # you can add configuration options in it
table_env = StreamTableEnvironment.create(env, table_config)

# create a flink batch TableEnvironment from an ExecutionEnvironment
env = ExecutionEnvironment.get_execution_environment()
table_env = BatchTableEnvironment.create(env)

# create a flink batch TableEnvironment from an ExecutionEnvironment with a TableConfig
env = ExecutionEnvironment.get_execution_environment()
table_config = TableConfig()  # you can add configuration options in it
table_env = BatchTableEnvironment.create(env, table_config)

# create a flink streaming TableEnvironment from a StreamExecutionEnvironment
env = StreamExecutionEnvironment.get_execution_environment()
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_old_planner().build()
table_env = StreamTableEnvironment.create(env, environment_settings=env_settings)

Note: Almost all the configurations in ExecutionEnvironment/StreamExecutionEnvironment can be configured via TableEnvironment.get_config() now, see Configuration for more details. Only a few rarely used or deprecated configurations still require direct access to ExecutionEnvironment /StreamExecutionEnvironment for configuring, e.g. the input dependency constraint.

TableEnvironment API

Table/SQL Operations

These APIs are used to create/remove Table API/SQL Tables and write queries:

APIs Description Docs
from_elements(elements, schema=None, verify_schema=True) Creates a table from a collection of elements. link
from_pandas(pdf, schema=None, split_num=1) Creates a table from a pandas DataFrame. link
from_path(path) Creates a table from a registered table under the specified path, e.g. tables registered via create_temporary_view. link
sql_query(query) Evaluates a SQL query and retrieves the result as a `Table` object. link
create_temporary_view(view_path, table) Registers a `Table` object as a temporary view similar to SQL temporary views. link
drop_temporary_view(view_path) Drops a temporary view registered under the given path. link
drop_temporary_table(table_path) Drops a temporary table registered under the given path. You can use this interface to drop the temporary source table and temporary sink table. link
execute_sql(stmt) Executes the given single statement and returns the execution result. The statement can be DDL/DML/DQL/SHOW/DESCRIBE/EXPLAIN/USE.

Note that for "INSERT INTO" statement this is an asynchronous operation, which is usually expected when submitting a job to a remote cluster. However, when executing a job in a mini cluster or IDE, you need to wait until the job execution finished, then you can refer to here for more details.
Please refer the SQL documentation for more details about SQL statement.
link

Deprecated APIs

APIs Description Docs
from_table_source(table_source) Creates a table from a table source. link
scan(*table_path) Scans a registered table from catalog and returns the resulting Table. It can be replaced by from_path. link
register_table(name, table) Registers a `Table` object under a unique name in the TableEnvironment's catalog. Registered tables can be referenced in SQL queries. It can be replaced by create_temporary_view. link
register_table_source(name, table_source) Registers an external `TableSource` in the TableEnvironment's catalog. link
register_table_sink(name, table_sink) Registers an external `TableSink` in the TableEnvironment's catalog. link
insert_into(target_path, table) Instructs to write the content of a `Table` object into a sink table. Note that this interface would not trigger the execution of jobs. You need to call the "execute" method to execute your job. link
sql_update(stmt) Evaluates a SQL statement such as INSERT, UPDATE or DELETE or a DDL statement. It can be replaced by execute_sql. link
connect(connector_descriptor) Creates a temporary table from a descriptor. Currently the recommended way is using execute_sql to register temporary tables. link

Execute/Explain Jobs

These APIs are used to explain/execute jobs. Note that the API execute_sql can also be used to execute jobs.

APIs Description Docs
explain_sql(stmt, *extra_details) Returns the AST and the execution plan of the specified statement. link
create_statement_set() Creates a StatementSet instance which accepts DML statements or Tables. It can be used to execute a multi-sink job. link

Deprecated APIs

APIs Description Docs
explain(table=None, extended=False) Returns the AST of the specified Table API and SQL queries and the execution plan to compute the result of the given `Table` object or multi-sinks plan. If you use the insert_into or sql_update method to emit data to multiple sinks, you can use this method to get the plan. It can be replaced by TableEnvironment.explain_sql, Table.explain or StatementSet.explain. link
execute(job_name) Triggers the program execution. The environment will execute all parts of the program. If you use the insert_into or sql_update method to emit data to sinks, you can use this method trigger the program execution. This method will block the client program until the job is finished/canceled/failed. link

Create/Drop User Defined Functions

These APIs are used to register UDFs or remove the registered UDFs. Note that the API execute_sql can also be used to register/remove UDFs. For more details about the different kinds of UDFs, please refer to User Defined Functions.

APIs Description Docs
create_temporary_function(path, function) Registers a Python user defined function class as a temporary catalog function. link
create_temporary_system_function(name, function) Registers a Python user defined function class as a temporary system function. If the name of a temporary system function is the same as a temporary catalog function, the temporary system function takes precedence. link
create_java_function(path, function_class_name, ignore_if_exists=None) Registers a Java user defined function class as a catalog function under the given path. If the catalog is persistent, the registered catalog function can be used across multiple Flink sessions and clusters. link
create_java_temporary_function(path, function_class_name) Registers a Java user defined function class as a temporary catalog function. link
create_java_temporary_system_function(name, function_class_name) Registers a Java user defined function class as a temporary system function. link
drop_function(path) Drops a catalog function registered under the given path. link
drop_temporary_function(path) Drops a temporary system function registered under the given name. link
drop_temporary_system_function(name) Drops a temporary system function registered under the given name. link

Deprecated APIs

APIs Description Docs
register_function(name, function) Registers a Python user-defined function under a unique name. Replaces already existing user-defined function under this name. It can be replaced by create_temporary_system_function. link
register_java_function(name, function_class_name) Registers a Java user defined function under a unique name. Replaces already existing user-defined functions under this name. It can be replaced by create_java_temporary_system_function. link

Dependency Management

These APIs are used to manage the Python dependencies which are required by the Python UDFs. Please refer to the Dependency Management documentation for more details.

APIs Description Docs
add_python_file(file_path) Adds a Python dependency which could be Python files, Python packages or local directories. They will be added to the PYTHONPATH of the Python UDF worker. link
set_python_requirements(requirements_file_path, requirements_cache_dir=None) Specifies a requirements.txt file which defines the third-party dependencies. These dependencies will be installed to a temporary directory and added to the PYTHONPATH of the Python UDF worker. link
add_python_archive(archive_path, target_dir=None) Adds a Python archive file. The file will be extracted to the working directory of Python UDF worker. link

Configuration

APIs Description Docs
get_config() Returns the table config to define the runtime behavior of the Table API. You can find all the available configuration options in Configuration and Python Configuation.

The following code is an example showing how to set the configuration options through this API:
# set the parallelism to 8
table_env.get_config().get_configuration().set_string(
    "parallelism.default", "8")
link

Catalog APIs

These APIs are used to access catalogs and modules. You can find more detailed introduction in Modules and Catalogs documentation.

APIs Description Docs
register_catalog(catalog_name, catalog) Registers a `Catalog` under a unique name. link
get_catalog(catalog_name) Gets a registered `Catalog` by name. link
use_catalog(catalog_name) Sets the current catalog to the given value. It also sets the default database to the catalog's default one. link
get_current_catalog() Gets the current default catalog name of the current session. link
get_current_database() Gets the current default database name of the running session. link
use_database(database_name) Sets the current default database. It has to exist in the current catalog. That path will be used as the default one when looking for unqualified object names. link
load_module(module_name, module) Loads a `Module` under a unique name. Modules will be kept in the loaded order. link
unload_module(module_name) Unloads a `Module` with given name. link
list_catalogs() Gets the names of all catalogs registered in this environment. link
list_modules() Gets the names of all modules registered in this environment. link
list_databases() Gets the names of all databases in the current catalog. link
list_tables() Gets the names of all tables and views in the current database of the current catalog. It returns both temporary and permanent tables and views. link
list_views() Gets the names of all views in the current database of the current catalog. It returns both temporary and permanent views. link
list_user_defined_functions() Gets the names of all user defined functions registered in this environment. link
list_functions() Gets the names of all functions in this environment. link
list_temporary_tables() Gets the names of all temporary tables and views available in the current namespace (the current database of the current catalog). link
list_temporary_views() Gets the names of all temporary views available in the current namespace (the current database of the current catalog). link

Statebackend, Checkpoint and Restart Strategy

Before Flink 1.10 you can configure the statebackend, checkpointing and restart strategy via the StreamExecutionEnvironment. And now you can configure them by setting key-value options in TableConfig, see Fault Tolerance, State Backends and Checkpointing for more details.

The following code is an example showing how to configure the statebackend, checkpoint and restart strategy through the Table API:

# set the restart strategy to "fixed-delay"
table_env.get_config().get_configuration().set_string("restart-strategy", "fixed-delay")
table_env.get_config().get_configuration().set_string("restart-strategy.fixed-delay.attempts", "3")
table_env.get_config().get_configuration().set_string("restart-strategy.fixed-delay.delay", "30s")

# set the checkpoint mode to EXACTLY_ONCE
table_env.get_config().get_configuration().set_string("execution.checkpointing.mode", "EXACTLY_ONCE")
table_env.get_config().get_configuration().set_string("execution.checkpointing.interval", "3min")

# set the statebackend type to "rocksdb", other available options are "filesystem" and "jobmanager"
# you can also set the full qualified Java class name of the StateBackendFactory to this option
# e.g. org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory
table_env.get_config().get_configuration().set_string("state.backend", "rocksdb")

# set the checkpoint directory, which is required by the RocksDB statebackend
table_env.get_config().get_configuration().set_string("state.checkpoints.dir", "file:///tmp/checkpoints/")