TableEnvironment

A table environment is the base class, entry point, and central context for creating Table and SQL API programs.

EnvironmentSettings

Defines all parameters that initialize a table environment. Those parameters are used only during instantiation of a TableEnvironment and cannot be changed afterwards.

Example:

>>> from pyflink.table import EnvironmentSettings
>>> EnvironmentSettings.new_instance() \
...     .in_streaming_mode() \
...     .with_built_in_catalog_name("my_catalog") \
...     .with_built_in_database_name("my_database") \
...     .build()

in_streaming_mode() or in_batch_mode() might be convenient as shortcuts.

EnvironmentSettings.new_instance()

Creates a builder for creating an instance of EnvironmentSettings.

EnvironmentSettings.from_configuration(config)

Creates the EnvironmentSettings with the specified Configuration.

EnvironmentSettings.in_streaming_mode()

Creates a default instance of EnvironmentSettings in streaming execution mode.

EnvironmentSettings.in_batch_mode()

Creates a default instance of EnvironmentSettings in batch execution mode.

EnvironmentSettings.get_built_in_catalog_name()

Gets the specified name of the initial catalog to be created when instantiating a TableEnvironment.

EnvironmentSettings.get_built_in_database_name()

Gets the specified name of the default database in the initial catalog to be created when instantiating a TableEnvironment.

EnvironmentSettings.is_streaming_mode()

Tells whether the TableEnvironment should work in streaming mode (True) or batch mode (False).

EnvironmentSettings.to_configuration()

Converts to a pyflink.common.Configuration.

EnvironmentSettings.get_configuration()

Gets the underlying pyflink.common.Configuration.

EnvironmentSettings.Builder.with_configuration(config)

Creates the EnvironmentSettings with the specified Configuration.

EnvironmentSettings.Builder.in_batch_mode()

Configures the components to work in batch mode.

EnvironmentSettings.Builder.in_streaming_mode()

Configures the components to work in streaming mode.

EnvironmentSettings.Builder.with_built_in_catalog_name(...)

Specifies the name of the initial catalog to be created when instantiating a TableEnvironment.

EnvironmentSettings.Builder.with_built_in_database_name(...)

Specifies the name of the default database in the initial catalog to be created when instantiating a TableEnvironment.

EnvironmentSettings.Builder.build()

Returns an immutable instance of EnvironmentSettings.

TableConfig

Configuration for the current TableEnvironment session to adjust Table & SQL API programs.

This class is a pure API class that abstracts configuration from various sources. Currently, configuration can be set in any of the following layers (in the given order):

  • flink-conf.yaml

  • CLI parameters

  • StreamExecutionEnvironment when bridging to DataStream API

  • with_configuration()

  • set()

The latter two represent the application-specific part of the configuration. They initialize and directly modify get_configuration(). Other layers represent the configuration of the execution context and are immutable.

The getter get() gives read-only access to the full configuration. However, application-specific configuration has precedence. Configuration of outer layers is used for defaults and fallbacks. The setter set() will only affect application-specific configuration.
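The precedence rule described above can be pictured with plain Python mappings. The following is a minimal sketch and not PyFlink code: the names execution_context, application_config, and effective, as well as the option values, are hypothetical and chosen only for illustration. It assumes that all outer layers have already been merged into a single read-only mapping.

```python
from collections import ChainMap
from types import MappingProxyType

# Outer layers (execution context), modeled here as one immutable mapping.
# The option values are made up for illustration.
execution_context = MappingProxyType({
    "parallelism.default": "4",
    "pipeline.name": "nightly-report",
})

# Application-specific layer: the only layer that a setter modifies.
application_config = {}

# Reads consult the application layer first and fall back to the outer layer.
# Writes through a ChainMap always go to its first mapping.
effective = ChainMap(application_config, execution_context)

before = effective["parallelism.default"]   # falls back to the outer layer
effective["parallelism.default"] = "128"    # stored in application_config only
after = effective["parallelism.default"]    # application layer takes precedence
```

In this sketch the outer mapping is never modified, which mirrors the statement that the execution-context layers are immutable while the application-specific layer overrides them on lookup.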

For common or important configuration options, this class provides getter and setter methods with detailed inline documentation.

For more advanced configuration, users can directly access the underlying key-value map via get_configuration().

Example:

>>> from pyflink.common import Configuration
>>> # t_env is an existing TableEnvironment
>>> table_config = t_env.get_config()
>>> config = Configuration()
>>> config.set_string("parallelism.default", "128") \
...       .set_string("pipeline.auto-watermark-interval", "800ms") \
...       .set_string("execution.checkpointing.interval", "30s")
>>> table_config.add_configuration(config)

Note

Because options are read at different points in time when performing operations, it is recommended to set configuration options early after instantiating a table environment.

TableConfig([j_table_config])

Configuration for the current TableEnvironment session to adjust Table & SQL API programs.

TableEnvironment

A table environment is the base class, entry point, and central context for creating Table and SQL API programs.

It is unified for bounded and unbounded data processing.

A table environment is responsible for:

  • Connecting to external systems.

  • Registering and retrieving Table and other meta objects from a catalog.

  • Executing SQL statements.

  • Offering further configuration options.

The path in methods such as create_temporary_view() should be a proper SQL identifier. The syntax follows [[catalog-name.]database-name.]object-name, where the catalog name and database name are optional. For path resolution see use_catalog() and use_database(). All keywords or other special characters need to be escaped.

Example: `cat.1`.`db`.`Table` resolves to an object named ‘Table’ (table is a reserved keyword and thus must be escaped) in a catalog named ‘cat.1’ and a database named ‘db’.
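To make the path syntax concrete, the following is a rough sketch of how such a path could be split and completed with the current catalog and database. It is not Flink's parser. The helpers split_identifier and resolve are hypothetical, the sketch ignores escaped backticks inside a segment, and it assumes that the catalog segment is backtick-escaped as well because it contains a dot.

```python
from typing import List, Tuple


def split_identifier(path: str) -> List[str]:
    """Split a dotted path into segments, honoring backtick-escaped segments."""
    parts: List[str] = []
    current: List[str] = []
    in_backticks = False
    for ch in path:
        if ch == "`":
            # A backtick opens or closes an escaped segment.
            in_backticks = not in_backticks
        elif ch == "." and not in_backticks:
            # An unescaped dot separates two segments.
            parts.append("".join(current))
            current = []
        else:
            current.append(ch)
    if in_backticks:
        raise ValueError("unterminated backtick in identifier")
    parts.append("".join(current))
    return parts


def resolve(path: str, current_catalog: str, current_database: str) -> Tuple[str, ...]:
    """Fill in the omitted catalog and database from the current session values."""
    parts = split_identifier(path)
    if not 1 <= len(parts) <= 3:
        raise ValueError("expected [[catalog.]database.]object")
    defaults = [current_catalog, current_database]
    return tuple(defaults[: 3 - len(parts)] + parts)


full = resolve("`cat.1`.`db`.`Table`", "default_catalog", "default_database")
short = resolve("my_table", "default_catalog", "default_database")
```

Here full keeps all three segments as written, while short is completed with the current catalog and database, which is the role that use_catalog() and use_database() play for path resolution.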

Note

This environment is meant for pure table programs. If you would like to convert from or to other Flink APIs, it might be necessary to use one of the available language-specific table environments in the corresponding bridging modules.

TableEnvironment.add_python_archive(archive_path)

Adds a Python archive file.

TableEnvironment.add_python_file(file_path)

Adds a Python dependency, which can be Python files, Python packages, or local directories.

TableEnvironment.create(environment_settings)

Creates a table environment that is the entry point and central context for creating Table and SQL API programs.

TableEnvironment.create_java_function(path, ...)

Registers a Java user-defined function class as a catalog function in the given path.

TableEnvironment.create_java_temporary_function(...)

Registers a Java user-defined function class as a temporary catalog function.

TableEnvironment.create_java_temporary_system_function(...)

Registers a Java user-defined function class as a temporary system function.

TableEnvironment.create_statement_set()

Creates a StatementSet instance that accepts DML statements or Tables. The planner can optimize all added statements and Tables together and then submit them as one job.

TableEnvironment.create_table(path, descriptor)

Registers the given TableDescriptor as a catalog table.

TableEnvironment.create_temporary_function(...)

Registers a Python user-defined function class as a temporary catalog function.

TableEnvironment.create_temporary_system_function(...)

Registers a Python user-defined function class as a temporary system function.

TableEnvironment.create_temporary_table(...)

Registers the given TableDescriptor as a temporary catalog table.

TableEnvironment.create_temporary_view(...)

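Registers the given Table or DataStream as a temporary view. The exact behavior depends on whether table_or_data_stream is a Table or a DataStream.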

TableEnvironment.drop_function(path)

Drops a catalog function registered in the given path.

TableEnvironment.drop_temporary_function(path)

Drops a temporary catalog function registered in the given path.

TableEnvironment.drop_temporary_system_function(name)

Drops a temporary system function registered under the given name.

TableEnvironment.drop_temporary_table(table_path)

Drops a temporary table registered in the given path.

TableEnvironment.drop_temporary_view(view_path)

Drops a temporary view registered in the given path.

TableEnvironment.execute_sql(stmt)

Executes the given single statement and returns the execution result.

TableEnvironment.explain_sql(stmt, ...)

Returns the AST of the specified statement and the execution plan.

TableEnvironment.from_descriptor(descriptor)

Returns a Table backed by the given TableDescriptor.

TableEnvironment.from_elements(elements[, ...])

Creates a table from a collection of elements.

TableEnvironment.from_pandas(pdf[, schema, ...])

Creates a table from a pandas DataFrame.

TableEnvironment.from_path(path)

Reads a registered table and returns the resulting Table.

TableEnvironment.from_table_source(table_source)

Creates a table from a table source.

TableEnvironment.get_catalog(catalog_name)

Gets a registered Catalog by name.

TableEnvironment.get_config()

Returns the table config to define the runtime behavior of the Table API.

TableEnvironment.get_current_catalog()

Gets the current default catalog name of the current session.

TableEnvironment.get_current_database()

Gets the current default database name of the running session.

TableEnvironment.list_catalogs()

Gets the names of all catalogs registered in this environment.

TableEnvironment.list_databases()

Gets the names of all databases in the current catalog.

TableEnvironment.list_full_modules()

Gets the names and statuses of all modules loaded in this environment.

TableEnvironment.list_functions()

Gets the names of all functions in this environment.

TableEnvironment.list_modules()

Gets the names of all modules used in this environment.

TableEnvironment.list_tables()

Gets the names of all tables and views in the current database of the current catalog.

TableEnvironment.list_temporary_tables()

Gets the names of all temporary tables and views available in the current namespace (the current database of the current catalog).

TableEnvironment.list_temporary_views()

Gets the names of all temporary views available in the current namespace (the current database of the current catalog).

TableEnvironment.list_user_defined_functions()

Gets the names of all user-defined functions registered in this environment.

TableEnvironment.list_views()

Gets the names of all views in the current database of the current catalog.

TableEnvironment.load_module(module_name, module)

Loads a Module under a unique name.

TableEnvironment.register_catalog(...)

Registers a Catalog under a unique name.

TableEnvironment.register_function(name, ...)

Registers a Python user-defined function under a unique name.

TableEnvironment.register_java_function(...)

Registers a Java user-defined function under a unique name.

TableEnvironment.register_table(name, table)

Registers a Table under a unique name in the TableEnvironment's catalog.

TableEnvironment.register_table_sink(name, ...)

Registers an external TableSink with the given field names and types in this TableEnvironment's catalog.

TableEnvironment.register_table_source(name, ...)

Registers an external TableSource in this TableEnvironment's catalog.

TableEnvironment.scan(*table_path)

Scans a registered table and returns the resulting Table.

TableEnvironment.set_python_requirements(...)

Specifies a requirements.txt file which defines the third-party dependencies.

TableEnvironment.sql_query(query)

Evaluates a SQL query on registered tables and retrieves the result as a Table.

TableEnvironment.unload_module(module_name)

Unloads the Module with the given name.

TableEnvironment.use_catalog(catalog_name)

Sets the current catalog to the given value.

TableEnvironment.use_database(database_name)

Sets the current default database.

TableEnvironment.use_modules(*module_names)

Uses the Modules with the given names.

StreamTableEnvironment

StreamTableEnvironment.add_python_archive(...)

Adds a Python archive file.

StreamTableEnvironment.add_python_file(file_path)

Adds a Python dependency, which can be Python files, Python packages, or local directories.

StreamTableEnvironment.create([...])

Creates a StreamTableEnvironment.

StreamTableEnvironment.create_java_function(...)

Registers a Java user-defined function class as a catalog function in the given path.

StreamTableEnvironment.create_java_temporary_function(...)

Registers a Java user-defined function class as a temporary catalog function.

StreamTableEnvironment.create_java_temporary_system_function(...)

Registers a Java user-defined function class as a temporary system function.

StreamTableEnvironment.create_statement_set()

Creates a StatementSet instance that accepts DML statements or Tables. The planner can optimize all added statements and Tables together and then submit them as one job.

StreamTableEnvironment.create_table(path, ...)

Registers the given TableDescriptor as a catalog table.

StreamTableEnvironment.create_temporary_function(...)

Registers a Python user-defined function class as a temporary catalog function.

StreamTableEnvironment.create_temporary_system_function(...)

Registers a Python user-defined function class as a temporary system function.

StreamTableEnvironment.create_temporary_table(...)

Registers the given TableDescriptor as a temporary catalog table.

StreamTableEnvironment.create_temporary_view(...)

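Registers the given Table or DataStream as a temporary view. The exact behavior depends on whether table_or_data_stream is a Table or a DataStream.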

StreamTableEnvironment.drop_function(path)

Drops a catalog function registered in the given path.

StreamTableEnvironment.drop_temporary_function(path)

Drops a temporary catalog function registered in the given path.

StreamTableEnvironment.drop_temporary_system_function(name)

Drops a temporary system function registered under the given name.

StreamTableEnvironment.drop_temporary_table(...)

Drops a temporary table registered in the given path.

StreamTableEnvironment.drop_temporary_view(...)

Drops a temporary view registered in the given path.

StreamTableEnvironment.execute_sql(stmt)

Executes the given single statement and returns the execution result.

StreamTableEnvironment.explain_sql(stmt, ...)

Returns the AST of the specified statement and the execution plan.

StreamTableEnvironment.from_descriptor(...)

Returns a Table backed by the given TableDescriptor.

StreamTableEnvironment.from_elements(elements)

Creates a table from a collection of elements.

StreamTableEnvironment.from_pandas(pdf[, ...])

Creates a table from a pandas DataFrame.

StreamTableEnvironment.from_path(path)

Reads a registered table and returns the resulting Table.

StreamTableEnvironment.from_table_source(...)

Creates a table from a table source.

StreamTableEnvironment.from_data_stream(...)

Converts the given DataStream into a Table. The exact behavior depends on the type of fields_or_schema, for example a str or a sequence of Expression.

StreamTableEnvironment.from_changelog_stream(...)

Converts the given DataStream of changelog entries into a Table.

StreamTableEnvironment.get_catalog(catalog_name)

Gets a registered Catalog by name.

StreamTableEnvironment.get_config()

Returns the table config to define the runtime behavior of the Table API.

StreamTableEnvironment.get_current_catalog()

Gets the current default catalog name of the current session.

StreamTableEnvironment.get_current_database()

Gets the current default database name of the running session.

StreamTableEnvironment.list_catalogs()

Gets the names of all catalogs registered in this environment.

StreamTableEnvironment.list_databases()

Gets the names of all databases in the current catalog.

StreamTableEnvironment.list_full_modules()

Gets the names and statuses of all modules loaded in this environment.

StreamTableEnvironment.list_functions()

Gets the names of all functions in this environment.

StreamTableEnvironment.list_modules()

Gets the names of all modules used in this environment.

StreamTableEnvironment.list_tables()

Gets the names of all tables and views in the current database of the current catalog.

StreamTableEnvironment.list_temporary_tables()

Gets the names of all temporary tables and views available in the current namespace (the current database of the current catalog).

StreamTableEnvironment.list_temporary_views()

Gets the names of all temporary views available in the current namespace (the current database of the current catalog).

StreamTableEnvironment.list_user_defined_functions()

Gets the names of all user-defined functions registered in this environment.

StreamTableEnvironment.list_views()

Gets the names of all views in the current database of the current catalog.

StreamTableEnvironment.load_module(...)

Loads a Module under a unique name.

StreamTableEnvironment.register_catalog(...)

Registers a Catalog under a unique name.

StreamTableEnvironment.register_function(...)

Registers a Python user-defined function under a unique name.

StreamTableEnvironment.register_java_function(...)

Registers a Java user-defined function under a unique name.

StreamTableEnvironment.register_table(name, ...)

Registers a Table under a unique name in the TableEnvironment's catalog.

StreamTableEnvironment.register_table_sink(...)

Registers an external TableSink with the given field names and types in this TableEnvironment's catalog.

StreamTableEnvironment.register_table_source(...)

Registers an external TableSource in this TableEnvironment's catalog.

StreamTableEnvironment.scan(*table_path)

Scans a registered table and returns the resulting Table.

StreamTableEnvironment.set_python_requirements(...)

Specifies a requirements.txt file which defines the third-party dependencies.

StreamTableEnvironment.sql_query(query)

Evaluates a SQL query on registered tables and retrieves the result as a Table.

StreamTableEnvironment.to_data_stream(table)

Converts the given Table into a DataStream.

StreamTableEnvironment.to_changelog_stream(table)

Converts the given Table into a DataStream of changelog entries.

StreamTableEnvironment.to_append_stream(...)

Converts the given Table into a DataStream of a specified type.

StreamTableEnvironment.to_retract_stream(...)

Converts the given Table into a DataStream of add and retract messages.

StreamTableEnvironment.unload_module(module_name)

Unloads the Module with the given name.

StreamTableEnvironment.use_catalog(catalog_name)

Sets the current catalog to the given value.

StreamTableEnvironment.use_database(...)

Sets the current default database.

StreamTableEnvironment.use_modules(*module_names)

Uses the Modules with the given names.