This document is an introduction to the PyFlink TableEnvironment.
It includes detailed descriptions of every public interface of the TableEnvironment class.
The recommended way to create a
TableEnvironment is to create it from an EnvironmentSettings object.
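A minimal sketch of this creation path follows. It assumes the builder-style `EnvironmentSettings.new_instance()` API is available in your PyFlink release; the helper name `create_table_environment` and its `streaming` flag are illustrative, not part of PyFlink.

```python
def create_table_environment(streaming=True):
    """Create a TableEnvironment from an EnvironmentSettings object.

    Hypothetical helper for illustration; only the pyflink.table calls
    inside it are PyFlink APIs.
    """
    # Imported lazily so this module can be loaded without PyFlink installed.
    from pyflink.table import EnvironmentSettings, TableEnvironment

    builder = EnvironmentSettings.new_instance()
    # Choose between streaming and batch execution mode.
    if streaming:
        builder = builder.in_streaming_mode()
    else:
        builder = builder.in_batch_mode()
    return TableEnvironment.create(builder.build())
```

Calling `create_table_environment()` would return a streaming TableEnvironment, and `create_table_environment(streaming=False)` a batch one.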
If you need access to the
StreamExecutionEnvironment object on which the
TableEnvironment is based,
e.g. to mix with the DataStream API or to configure the job via the StreamExecutionEnvironment APIs,
you can also create a
TableEnvironment from a
StreamExecutionEnvironment with an optional EnvironmentSettings object.
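As a hedged sketch, the StreamExecutionEnvironment-based path could look like the following. The helper name and its `parallelism` parameter are hypothetical; `set_parallelism` stands in for any setting you prefer to apply on the DataStream side.

```python
def create_stream_table_environment(parallelism=1):
    """Create a StreamTableEnvironment on top of a StreamExecutionEnvironment.

    Hypothetical helper; returns both objects so the caller can keep
    using the DataStream API alongside the Table API.
    """
    # Imported lazily so this module can be loaded without PyFlink installed.
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.table import StreamTableEnvironment

    env = StreamExecutionEnvironment.get_execution_environment()
    # Example of configuring via the StreamExecutionEnvironment APIs.
    env.set_parallelism(parallelism)
    table_env = StreamTableEnvironment.create(env)
    return env, table_env
```

Keeping a reference to `env` is the point of this variant: it stays available for DataStream programs and for the few settings that are not exposed through the table config.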
Note: Almost all configurations of
StreamExecutionEnvironment can now be set via
TableEnvironment.get_config(); see Configuration for more details.
Only a few rarely used or deprecated configurations still require direct access to
StreamExecutionEnvironment, e.g. the input dependency constraint.
These APIs are used to create/remove Table API/SQL Tables and write queries:

| API | Description |
|-----|-------------|
| `from_elements(elements, schema=None, verify_schema=True)` | Creates a table from a collection of elements. |
| `from_pandas(pdf, schema=None, split_num=1)` | Creates a table from a pandas DataFrame. |
| `from_path(path)` | Creates a table from a registered table under the specified path, e.g. tables registered via create_temporary_view. |
| `sql_query(query)` | Evaluates a SQL query and retrieves the result as a `Table` object. |
| `create_temporary_view(view_path, table)` | Registers a `Table` object as a temporary view similar to SQL temporary views. |
| `drop_temporary_view(view_path)` | Drops a temporary view registered under the given path. |
| `drop_temporary_table(table_path)` | Drops a temporary table registered under the given path. You can use this interface to drop temporary source tables and temporary sink tables. |
| `execute_sql(stmt)` | Executes the given single statement and returns the execution result. The statement can be DDL/DML/DQL/SHOW/DESCRIBE/EXPLAIN/USE. Note that for an "INSERT INTO" statement this is an asynchronous operation, which is usually what you want when submitting a job to a remote cluster. However, when executing a job in a mini cluster or IDE, you need to wait until the job execution has finished. Please refer to the SQL documentation for more details about SQL statements. |

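The sketch below strings several of these calls together and shows one way to wait for an asynchronous "INSERT INTO" job. It assumes `table_env` is an existing TableEnvironment and that the result of execute_sql offers a `wait()` method (as `TableResult` does in recent PyFlink releases); the function names, the view name `words` and the sample rows are illustrative.

```python
def query_word_lengths(table_env):
    """Build a small table, expose it as a view and query it with SQL."""
    # from_elements creates an in-memory table; the schema is given as field names.
    words = table_env.from_elements([("flink",), ("pyflink",)], ["word"])
    # Register the Table object so that SQL can refer to it by name.
    table_env.create_temporary_view("words", words)
    # sql_query returns a Table object; nothing is executed yet.
    return table_env.sql_query(
        "SELECT word, CHAR_LENGTH(word) AS len FROM words")


def insert_and_wait(table_env, insert_statement):
    """Submit an INSERT INTO statement and block until the job finishes."""
    # execute_sql submits the job asynchronously and returns immediately.
    result = table_env.execute_sql(insert_statement)
    # Waiting is needed in a mini cluster or IDE so the process
    # does not exit before the job has completed.
    result.wait()
    return result
```

When submitting to a remote cluster you would typically skip the `wait()` call and let the job run detached.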

| API | Description |
|-----|-------------|
| `from_table_source(table_source)` | Creates a table from a table source. |
| `scan(*table_path)` | Scans a registered table from the catalog and returns the resulting Table. It can be replaced by from_path. |
| `register_table(name, table)` | Registers a `Table` object under a unique name in the TableEnvironment's catalog. Registered tables can be referenced in SQL queries. It can be replaced by create_temporary_view. |
| `register_table_source(name, table_source)` | Registers an external `TableSource` in the TableEnvironment's catalog. |
| `register_table_sink(name, table_sink)` | Registers an external `TableSink` in the TableEnvironment's catalog. |
| `insert_into(target_path, table)` | Writes the content of a `Table` object into a sink table. Note that this interface does not trigger job execution; you need to call the execute method to run your job. |
| `sql_update(stmt)` | Evaluates a SQL statement such as INSERT, UPDATE or DELETE, or a DDL statement. It can be replaced by execute_sql. |
| `connect(connector_descriptor)` | Creates a temporary table from a descriptor. The currently recommended way to register temporary tables is execute_sql. |

These APIs are used to explain/execute jobs. Note that the API
execute_sql can also be used to execute jobs.

| API | Description |
|-----|-------------|
| `explain_sql(stmt, *extra_details)` | Returns the AST and the execution plan of the specified statement. |
| `create_statement_set()` | Creates a StatementSet instance which accepts DML statements or Tables. It can be used to execute a multi-sink job. |
| `explain(table=None, extended=False)` | Returns the AST of the specified Table API and SQL queries and the execution plan to compute the result of the given `Table` object or multi-sink plan. If you use the insert_into or sql_update method to emit data to multiple sinks, you can use this method to get the plan. It can be replaced by TableEnvironment.explain_sql, Table.explain or StatementSet.explain. |
| `execute(job_name)` | Triggers the program execution. The environment will execute all parts of the program. If you use the insert_into or sql_update method to emit data to sinks, you can use this method to trigger the program execution. This method blocks the client program until the job is finished, canceled or failed. |

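A multi-sink job built with create_statement_set could be sketched as follows. It assumes `table_env` is an existing TableEnvironment, that the sink tables named in the statements are already registered, and that the returned StatementSet offers `add_insert_sql`, `explain` and `execute`; the helper name is hypothetical.

```python
def run_multi_sink_job(table_env, insert_statements):
    """Bundle several INSERT statements into one job and execute it."""
    statement_set = table_env.create_statement_set()
    for statement in insert_statements:
        # Each statement is only collected here, not executed.
        statement_set.add_insert_sql(statement)
    # The plan covers all collected statements as a single job.
    plan = statement_set.explain()
    # execute() submits the whole set at once.
    result = statement_set.execute()
    return plan, result
```

Collecting the statements first lets the planner optimize them together, which is why this replaces repeated insert_into or sql_update calls followed by execute.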
These APIs are used to register UDFs or remove the registered UDFs.
Note that the API
execute_sql can also be used to register/remove UDFs.
For more details about the different kinds of UDFs, please refer to User Defined Functions.

| API | Description |
|-----|-------------|
| `create_temporary_function(path, function)` | Registers a Python user defined function class as a temporary catalog function. |
| `create_temporary_system_function(name, function)` | Registers a Python user defined function class as a temporary system function. If the name of a temporary system function is the same as a temporary catalog function, the temporary system function takes precedence. |
| `create_java_function(path, function_class_name, ignore_if_exists=None)` | Registers a Java user defined function class as a catalog function under the given path. If the catalog is persistent, the registered catalog function can be used across multiple Flink sessions and clusters. |
| `create_java_temporary_function(path, function_class_name)` | Registers a Java user defined function class as a temporary catalog function. |
| `create_java_temporary_system_function(name, function_class_name)` | Registers a Java user defined function class as a temporary system function. |
| `drop_function(path)` | Drops a catalog function registered under the given path. |
| `drop_temporary_function(path)` | Drops a temporary catalog function registered under the given path. |
| `drop_temporary_system_function(name)` | Drops a temporary system function registered under the given name. |
| `register_function(name, function)` | Registers a Python user defined function under a unique name. Replaces any existing user defined function under this name. It can be replaced by create_temporary_system_function. |
| `register_java_function(name, function_class_name)` | Registers a Java user defined function under a unique name. Replaces any existing user defined function under this name. It can be replaced by create_java_temporary_system_function. |

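As a rough sketch, registering and removing a Python UDF as a temporary system function might look like this. The function `add_one` and the helper names are illustrative; `udf` and `DataTypes` are assumed to be importable from `pyflink.table.udf` and `pyflink.table` respectively.

```python
def add_one(i):
    """Plain Python logic of the UDF; can be unit-tested without Flink."""
    return i + 1


def register_add_one(table_env, name="add_one"):
    """Wrap add_one as a scalar UDF and register it as a temporary system function."""
    # Imported lazily so this module can be loaded without PyFlink installed.
    from pyflink.table import DataTypes
    from pyflink.table.udf import udf

    table_env.create_temporary_system_function(
        name, udf(add_one, result_type=DataTypes.BIGINT()))


def unregister(table_env, name="add_one"):
    """Remove the temporary system function again."""
    table_env.drop_temporary_system_function(name)
```

After `register_add_one(table_env)`, the function could be referenced by name in SQL, e.g. `SELECT add_one(a) FROM my_table`, where `my_table` and column `a` are placeholders.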
These APIs are used to manage the Python dependencies which are required by the Python UDFs. Please refer to the Dependency Management documentation for more details.

| API | Description |
|-----|-------------|
| `add_python_file(file_path)` | Adds a Python dependency, which can be Python files, Python packages or local directories. They will be added to the PYTHONPATH of the Python UDF worker. |
| `set_python_requirements(requirements_file_path, requirements_cache_dir=None)` | Specifies a requirements.txt file which defines the third-party dependencies. These dependencies will be installed to a temporary directory and added to the PYTHONPATH of the Python UDF worker. |
| `add_python_archive(archive_path, target_dir=None)` | Adds a Python archive file. The file will be extracted to the working directory of the Python UDF worker. |

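The three dependency calls can be combined as in the sketch below. It assumes `table_env` is an existing TableEnvironment; the helper name, its parameters and the default target directory `data` are illustrative choices, not PyFlink defaults.

```python
def configure_python_dependencies(table_env, module_path, requirements_path,
                                  archive_path, archive_target_dir="data"):
    """Ship a module, third-party requirements and an archive to the UDF workers."""
    # Added to the PYTHONPATH of the Python UDF worker.
    table_env.add_python_file(module_path)
    # Installed to a temporary directory on the worker side.
    table_env.set_python_requirements(requirements_path)
    # Extracted into the given directory under the worker's working directory.
    table_env.add_python_archive(archive_path, archive_target_dir)
```

A call such as `configure_python_dependencies(table_env, "my_udfs.py", "requirements.txt", "resources.zip")` uses placeholder file names; inside a UDF the archive contents would then be reachable under the relative path `data/`.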
get_config() returns the table config, which defines the runtime behavior of the Table API.
You can find all the available configuration options in Configuration and Python Configuration.
Configuration options are set through this API as key-value pairs.
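A small sketch of setting options through get_config() is shown below. It assumes the table config exposes `get_configuration().set_string(key, value)`, as in the PyFlink releases this page describes; newer releases may offer a more direct setter. The helper name `set_options` is hypothetical.

```python
def set_options(table_env, options):
    """Apply key-value configuration options to a TableEnvironment."""
    configuration = table_env.get_config().get_configuration()
    for key, value in options.items():
        # set_string expects string values, so convert defensively.
        configuration.set_string(key, str(value))
```

For example, `set_options(table_env, {"parallelism.default": 8})` would set the default parallelism to 8.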

| API | Description |
|-----|-------------|
| `register_catalog(catalog_name, catalog)` | Registers a `Catalog` under a unique name. |
| `get_catalog(catalog_name)` | Gets a registered `Catalog` by name. |
| `use_catalog(catalog_name)` | Sets the current catalog to the given value. It also sets the default database to the catalog's default one. |
| `get_current_catalog()` | Gets the current default catalog name of the current session. |
| `get_current_database()` | Gets the current default database name of the running session. |
| `use_database(database_name)` | Sets the current default database. It has to exist in the current catalog. That path will be used as the default one when looking for unqualified object names. |
| `load_module(module_name, module)` | Loads a `Module` under a unique name. Modules will be kept in the loaded order. |
| `unload_module(module_name)` | Unloads a `Module` with the given name. |
| `list_catalogs()` | Gets the names of all catalogs registered in this environment. |
| `list_modules()` | Gets the names of all modules registered in this environment. |
| `list_databases()` | Gets the names of all databases in the current catalog. |
| `list_tables()` | Gets the names of all tables and views in the current database of the current catalog. It returns both temporary and permanent tables and views. |
| `list_views()` | Gets the names of all views in the current database of the current catalog. It returns both temporary and permanent views. |
| `list_user_defined_functions()` | Gets the names of all user defined functions registered in this environment. |
| `list_functions()` | Gets the names of all functions in this environment. |
| `list_temporary_tables()` | Gets the names of all temporary tables and views available in the current namespace (the current database of the current catalog). |
| `list_temporary_views()` | Gets the names of all temporary views available in the current namespace (the current database of the current catalog). |

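The catalog calls above can be combined to switch and inspect the current namespace, as in this sketch. It assumes `table_env` is an existing TableEnvironment and that the named catalog and database already exist; both helper names are hypothetical.

```python
def switch_namespace(table_env, catalog_name, database_name):
    """Make the given catalog and database the current ones."""
    # use_catalog also resets the database to the catalog's default,
    # so the database has to be selected afterwards.
    table_env.use_catalog(catalog_name)
    table_env.use_database(database_name)


def describe_current_namespace(table_env):
    """Collect the current catalog, database and the objects visible in them."""
    return {
        "catalog": table_env.get_current_catalog(),
        "database": table_env.get_current_database(),
        "tables": list(table_env.list_tables()),
        "temporary_views": list(table_env.list_temporary_views()),
    }
```

The order inside `switch_namespace` matters: calling use_database first would be undone by the subsequent use_catalog.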
Before Flink 1.10 you could configure the state backend, checkpointing and restart strategy via the StreamExecutionEnvironment.
Now you can configure them by setting key-value options in
TableConfig; see Fault Tolerance, State Backends and Checkpointing for more details.
The state backend, checkpointing and restart strategy can all be configured through the Table API in this way.
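The sketch below applies such options through the table config. The option keys and values are assumptions based on the Flink configuration pages of the releases this text describes; several keys have been renamed in later releases, so check them against the Configuration page for your version. The helper name and the checkpoint directory are illustrative.

```python
# Assumed option keys; verify against the Configuration page of your Flink version.
FAULT_TOLERANCE_OPTIONS = {
    # Restart up to 3 times, waiting 30 seconds between attempts.
    "restart-strategy": "fixed-delay",
    "restart-strategy.fixed-delay.attempts": "3",
    "restart-strategy.fixed-delay.delay": "30s",
    # Take exactly-once checkpoints every 3 minutes.
    "execution.checkpointing.mode": "EXACTLY_ONCE",
    "execution.checkpointing.interval": "3min",
    # Use the RocksDB state backend with a (placeholder) checkpoint directory.
    "state.backend": "rocksdb",
    "state.checkpoints.dir": "file:///tmp/checkpoints/",
}


def configure_fault_tolerance(table_env, options=None):
    """Set state backend, checkpointing and restart strategy options."""
    if options is None:
        options = FAULT_TOLERANCE_OPTIONS
    configuration = table_env.get_config().get_configuration()
    for key, value in options.items():
        configuration.set_string(key, value)
```

Keeping the options in one dictionary makes it easy to review them in one place or to override a single entry per deployment.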