pyflink.table package
Module contents
Entry point classes of Flink Table API:
TableEnvironment and StreamTableEnvironment
Main entry point for Flink Table API & SQL functionality. TableEnvironment is used in pure Table API & SQL jobs. StreamTableEnvironment needs to be used when mixing the Table API and the DataStream API.
Table
The core component of the Table API. Use the methods of Table to transform data.
StatementSet
The core component of the Table API. It’s used to create jobs with multiple sinks.
EnvironmentSettings
Defines all the parameters used to initialize a TableEnvironment.
TableConfig
A config to define the runtime behavior of the Table API. It is used together with pyflink.datastream.StreamExecutionEnvironment to create a StreamTableEnvironment.
Classes to define user-defined functions:
ScalarFunction
Base interface for user-defined scalar functions.
TableFunction
Base interface for user-defined table functions.
AggregateFunction
Base interface for user-defined aggregate functions.
TableAggregateFunction
Base interface for user-defined table aggregate functions.
FunctionContext
Used to obtain global runtime information about the context in which the user-defined function is executed, such as the metric group and global job parameters.
Classes to define windows:
window.GroupWindow
Group windows group rows based on time or row-count intervals. See window.Tumble, window.Session and window.Slide for details on how to create a tumble window, a session window and a hop (slide) window, respectively.
window.OverWindow
Over window aggregates compute an aggregate for each input row over a range of its neighboring rows. See window.Over for details on how to create an over window.
Classes for catalog:
catalog.Catalog
Responsible for reading and writing metadata such as databases, tables, views and UDFs from and to a catalog.
catalog.HiveCatalog
Responsible for reading and writing metadata stored in Hive.
Classes to define source & sink:
TableDescriptor
TableDescriptor is a template for creating a CatalogTable instance. It closely resembles the “CREATE TABLE” SQL DDL statement, containing schema, connector options, and other characteristics. Since tables in Flink are typically backed by external systems, the descriptor describes how a connector (and possibly its format) is configured.
FormatDescriptor
Describes a format and its options for use with TableDescriptor.
Schema
Describes the schema for use with TableDescriptor. It represents the schema part of a CREATE TABLE (schema) WITH (options) DDL statement in SQL. It defines columns of different kinds, constraints, time attributes, and watermark strategies. It is possible to reference objects (such as functions or types) across different catalogs.
Classes for modules:
Module
Defines a set of metadata, including functions, user-defined types, operators, rules, etc. Metadata from modules is regarded as built-in or system metadata that users can take advantage of.
module.HiveModule
Implementation of Module to provide Hive built-in metadata.
Other important classes:
DataTypes
Defines a list of data types available in the Table API.
Expression
Represents a logical tree for producing a computation result for a column in a Table. It might be a literal value, a function call, or a field reference.
TableSchema
Represents a table’s structure with field names and data types.
SqlDialect
Enumeration of valid SQL compatibility modes.
ChangelogMode
The set of changes contained in a changelog.
ExplainDetail
Defines the types of details for the explain result.
-
class pyflink.table.TableEnvironment(j_tenv, serializer=PickleSerializer())
Bases: object
A table environment is the base class, entry point, and central context for creating Table and SQL API programs.
It is unified for bounded and unbounded data processing.
A table environment is responsible for:
Connecting to external systems.
Registering and retrieving Table and other meta objects from a catalog.
Executing SQL statements.
Offering further configuration options.
The path in methods such as create_temporary_view() should be a proper SQL identifier. The syntax is [[catalog-name.]database-name.]object-name, where the catalog name and the database name are optional. For path resolution see use_catalog() and use_database(). All keywords and other special characters need to be escaped with backticks. Example: `cat.1`.`db`.`Table` resolves to an object named ‘Table’ (TABLE is a reserved keyword, thus it must be escaped) in a catalog named ‘cat.1’ and a database named ‘db’.
Note
This environment is meant for pure table programs. If you would like to convert from or to other Flink APIs, it might be necessary to use one of the available language-specific table environments in the corresponding bridging modules.
-
add_python_archive(archive_path: str, target_dir: str = None)
Adds a Python archive file. The file will be extracted to the working directory of the Python UDF worker.
If the parameter “target_dir” is specified, the archive file will be extracted to a directory named ${target_dir}. Otherwise, the archive file will be extracted to a directory with the same name as the archive file.
If the Python UDF depends on a specific Python version that does not exist in the cluster, this method can be used to upload a virtual environment. Note that the path of the Python interpreter contained in the uploaded environment should be specified via pyflink.table.TableConfig.set_python_executable(). The files uploaded via this method are also accessible in UDFs via relative paths.
Example:
# command executed in shell
# assume the relative path of the python interpreter is py_env/bin/python
$ zip -r py_env.zip py_env

# python code
>>> table_env.add_python_archive("py_env.zip")
>>> table_env.get_config().set_python_executable("py_env.zip/py_env/bin/python")

# or
>>> table_env.add_python_archive("py_env.zip", "myenv")
>>> table_env.get_config().set_python_executable("myenv/py_env/bin/python")

# the files contained in the archive file can be accessed in UDF
>>> def my_udf():
...     with open("myenv/py_env/data/data.txt") as f:
...         ...
Note
Please make sure the uploaded Python environment matches the platform that the cluster is running on and that the Python version is 3.5 or higher.
Note
Currently only zip-format archives are supported, i.e. zip, jar, whl, egg, etc. Other archive formats such as tar, tar.gz, 7z and rar are not supported.
- Parameters
archive_path – The archive file path.
target_dir – Optional, the name of the target directory that the archive file is extracted to.
New in version 1.10.0.
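The directory naming rule and the zip-only restriction can be sketched in plain Python. The helper names below are hypothetical: they only compute the relative interpreter path you would pass to set_python_executable(), and they use the standard zipfile module to reject files that are not zip-format (jar, whl and egg files are zip archives, so they pass this check).

```python
import os
import zipfile
from typing import Optional


def extraction_dir(archive_path: str, target_dir: Optional[str] = None) -> str:
    """Directory (relative to the UDF worker's working directory) that the
    archive is extracted into: target_dir if given, else the archive file name."""
    return target_dir if target_dir is not None else os.path.basename(archive_path)


def python_executable_path(archive_path: str, interpreter_in_archive: str,
                           target_dir: Optional[str] = None) -> str:
    """Relative interpreter path to hand to TableConfig.set_python_executable()."""
    # Joined with "/" to match the relative paths used in the example above.
    return extraction_dir(archive_path, target_dir) + "/" + interpreter_in_archive


def check_zip_archive(archive_path: str) -> None:
    """Reject files that are not zip-format (e.g. tar, tar.gz, 7z, rar)."""
    if not zipfile.is_zipfile(archive_path):
        raise ValueError(f"{archive_path} is not a zip-format archive")
```

With these helpers, python_executable_path("py_env.zip", "py_env/bin/python", "myenv") gives "myenv/py_env/bin/python", the same value as in the second variant of the example.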
-
add_python_file(file_path: str)
Adds a Python dependency, which can be a Python file, a Python package or a local directory. It will be added to the PYTHONPATH of the Python UDF worker. Please make sure that the dependency can be imported.
- Parameters
file_path – The path of the python dependency.
New in version 1.10.0.
-
static create(environment_settings: pyflink.table.environment_settings.EnvironmentSettings) → pyflink.table.table_environment.TableEnvironment
Creates a table environment that is the entry point and central context for creating Table and SQL API programs.
- Parameters
environment_settings – The environment settings used to instantiate the TableEnvironment.
- Returns
The TableEnvironment.
-
create_java_function(path: str, function_class_name: str, ignore_if_exists: bool = None)
Registers a Java user-defined function class as a catalog function in the given path.
Compared to system functions with a globally defined name, catalog functions are always (implicitly or explicitly) identified by a catalog and database.
There must not be another function (temporary or permanent) registered under the same path.
Example:
>>> table_env.create_java_function("func", "java.user.defined.function.class.name")
- Parameters
path – The path under which the function will be registered. See also the TableEnvironment class description for the format of the path.
function_class_name – The fully qualified Java class name of the function class containing the implementation. The function must have a public no-argument constructor and can be found in the current Java classloader.
ignore_if_exists – If a function exists under the given path and this flag is set, no operation is executed. An exception is thrown otherwise.
New in version 1.12.0.
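A toy model of the ignore_if_exists rule follows. CatalogFunctionRegistry and FunctionAlreadyExistsError are hypothetical names, not PyFlink classes; the model keeps a single map of paths and does not distinguish temporary from permanent functions.

```python
from typing import Dict


class FunctionAlreadyExistsError(Exception):
    """Hypothetical stand-in for the error Flink raises on a duplicate path."""


class CatalogFunctionRegistry:
    """Toy model of create_java_function's duplicate handling (not PyFlink API)."""

    def __init__(self) -> None:
        self._functions: Dict[str, str] = {}  # path -> Java class name

    def create_function(self, path: str, function_class_name: str,
                        ignore_if_exists: bool = False) -> None:
        if path in self._functions:
            if ignore_if_exists:
                return  # no operation: the existing registration is kept
            raise FunctionAlreadyExistsError(f"function {path!r} already exists")
        self._functions[path] = function_class_name

    def lookup(self, path: str) -> str:
        return self._functions[path]
```

Note that ignore_if_exists does not replace the existing function; the first registration stays in effect.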
-
create_java_temporary_function(path: str, function_class_name: str)
Registers a Java user-defined function class as a temporary catalog function.
Compared to system functions with a globally defined name (see create_java_temporary_system_function()), catalog functions are always (implicitly or explicitly) identified by a catalog and database.
Temporary functions can shadow permanent ones. If a permanent function under a given name exists, it will be inaccessible in the current session. To make the permanent function available again one can drop the corresponding temporary function.
Example:
>>> table_env.create_java_temporary_function("func",
...     "java.user.defined.function.class.name")
- Parameters
path – The path under which the function will be registered. See also the TableEnvironment class description for the format of the path.
function_class_name – The fully qualified Java class name of the function class containing the implementation. The function must have a public no-argument constructor and can be found in the current Java classloader.
New in version 1.12.0.
-
create_java_temporary_system_function(name: str, function_class_name: str)
Registers a Java user-defined function class as a temporary system function.
Compared to catalog functions (see create_java_temporary_function()), system functions are identified by a global name that is independent of the current catalog and current database. Thus, this method allows extending the set of built-in system functions like TRIM, ABS, etc.
Temporary functions can shadow permanent ones. If a permanent function under a given name exists, it will be inaccessible in the current session. To make the permanent function available again one can drop the corresponding temporary system function.
Example:
>>> table_env.create_java_temporary_system_function("func",
...     "java.user.defined.function.class.name")
- Parameters
name – The name under which the function will be registered globally.
function_class_name – The fully qualified Java class name of the function class containing the implementation. The function must have a public no-argument constructor and can be found in the current Java classloader.
New in version 1.12.0.
-
create_statement_set() → pyflink.table.statement_set.StatementSet
Creates a StatementSet instance which accepts DML statements or Tables. The planner can optimize all added statements and Tables together and then submit them as one job.
- Returns
The StatementSet instance.
New in version 1.11.0.
-
create_table(path: str, descriptor: pyflink.table.table_descriptor.TableDescriptor)
Registers the given TableDescriptor as a catalog table.
The TableDescriptor is converted into a CatalogTable and stored in the catalog.
If the table should not be permanently stored in a catalog, use create_temporary_table() instead.
Examples:
>>> table_env.create_table("MyTable", TableDescriptor.for_connector("datagen")
...     .schema(Schema.new_builder()
...             .column("f0", DataTypes.STRING())
...             .build())
...     .option("rows-per-second", 10)
...     .option("fields.f0.kind", "random")
...     .build())
- Parameters
path – The path under which the table will be registered.
descriptor – Template for creating a CatalogTable instance.
New in version 1.14.0.
-
create_temporary_function(path: str, function: Union[pyflink.table.udf.UserDefinedFunctionWrapper, pyflink.table.udf.AggregateFunction])
Registers a Python user-defined function class as a temporary catalog function.
Compared to system functions with a globally defined name (see create_temporary_system_function()), catalog functions are always (implicitly or explicitly) identified by a catalog and database.
Temporary functions can shadow permanent ones. If a permanent function under a given name exists, it will be inaccessible in the current session. To make the permanent function available again one can drop the corresponding temporary function.
Example:
>>> table_env.create_temporary_function(
...     "add_one", udf(lambda i: i + 1, result_type=DataTypes.BIGINT()))

>>> @udf(result_type=DataTypes.BIGINT())
... def add(i, j):
...     return i + j
>>> table_env.create_temporary_function("add", add)

>>> class SubtractOne(ScalarFunction):
...     def eval(self, i):
...         return i - 1
>>> table_env.create_temporary_function(
...     "subtract_one", udf(SubtractOne(), result_type=DataTypes.BIGINT()))
- Parameters
path – The path under which the function will be registered. See also the TableEnvironment class description for the format of the path.
function – The Python user-defined function (for example, one created with udf()) containing the implementation.
New in version 1.12.0.
-
create_temporary_system_function(name: str, function: Union[pyflink.table.udf.UserDefinedFunctionWrapper, pyflink.table.udf.AggregateFunction])
Registers a Python user-defined function class as a temporary system function.
Compared to catalog functions (see create_temporary_function()), system functions are identified by a global name that is independent of the current catalog and current database. Thus, this method allows extending the set of built-in system functions like TRIM, ABS, etc.
Temporary functions can shadow permanent ones. If a permanent function under a given name exists, it will be inaccessible in the current session. To make the permanent function available again one can drop the corresponding temporary system function.
Example:
>>> table_env.create_temporary_system_function(
...     "add_one", udf(lambda i: i + 1, result_type=DataTypes.BIGINT()))

>>> @udf(result_type=DataTypes.BIGINT())
... def add(i, j):
...     return i + j
>>> table_env.create_temporary_system_function("add", add)

>>> class SubtractOne(ScalarFunction):
...     def eval(self, i):
...         return i - 1
>>> table_env.create_temporary_system_function(
...     "subtract_one", udf(SubtractOne(), result_type=DataTypes.BIGINT()))
- Parameters
name – The name under which the function will be registered globally.
function – The Python user-defined function (for example, one created with udf()) containing the implementation.
New in version 1.12.0.
-
create_temporary_table(path: str, descriptor: pyflink.table.table_descriptor.TableDescriptor)
Registers the given TableDescriptor as a temporary catalog table.
The TableDescriptor is converted into a CatalogTable and stored in the catalog.
Temporary objects can shadow permanent ones. If a permanent object in a given path exists, it will be inaccessible in the current session. To make the permanent object available again one can drop the corresponding temporary object.
Examples:
>>> table_env.create_temporary_table("MyTable", TableDescriptor.for_connector("datagen")
...     .schema(Schema.new_builder()
...             .column("f0", DataTypes.STRING())
...             .build())
...     .option("rows-per-second", 10)
...     .option("fields.f0.kind", "random")
...     .build())
- Parameters
path – The path under which the table will be registered.
descriptor – Template for creating a CatalogTable instance.
New in version 1.14.0.
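The shadowing behavior described for temporary tables (and, analogously, for temporary views and functions) can be modeled with two dictionaries. ShadowingNamespace is a hypothetical class whose method names echo the TableEnvironment methods for readability only; table definitions are plain strings rather than TableDescriptor objects.

```python
from typing import Dict, Optional


class ShadowingNamespace:
    """Toy model: a temporary object hides a permanent one at the same path."""

    def __init__(self) -> None:
        self.permanent: Dict[str, str] = {}
        self.temporary: Dict[str, str] = {}

    def create_table(self, path: str, definition: str) -> None:
        self.permanent[path] = definition

    def create_temporary_table(self, path: str, definition: str) -> None:
        self.temporary[path] = definition

    def drop_temporary_table(self, path: str) -> bool:
        """Return True only if a temporary object existed and was removed."""
        if path in self.temporary:
            del self.temporary[path]
            return True
        return False

    def resolve(self, path: str) -> Optional[str]:
        """Temporary objects win; the permanent one reappears once dropped."""
        if path in self.temporary:
            return self.temporary[path]
        return self.permanent.get(path)
```

Dropping the temporary entry is the only step needed to make the permanent definition visible again. The permanent definition itself is never modified.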
-
create_temporary_view(view_path: str, table_or_data_stream: Union[pyflink.table.table.Table, pyflink.datastream.data_stream.DataStream], *fields_or_schema: Union[str, pyflink.table.expression.Expression, pyflink.table.schema.Schema])
1. When table_or_data_stream is a Table:
Registers a Table API object as a temporary view similar to SQL temporary views.
Temporary objects can shadow permanent ones. If a permanent object in a given path exists, it will be inaccessible in the current session. To make the permanent object available again you can drop the corresponding temporary object.
2. When table_or_data_stream is a DataStream:
2.1 When fields_or_schema is a str or a sequence of Expression:
Creates a view from the given DataStream in a given path with specified field names. Registered views can be referenced in SQL queries.
1. Reference input fields by name: All fields in the schema definition are referenced by name (and possibly renamed using an alias (as)). Moreover, proctime and rowtime attributes can be defined at arbitrary positions using arbitrary names (except those that exist in the result schema). In this mode, fields can be reordered and projected out. This mode can be used for any input type, including POJOs.
Example:
>>> stream = ...
# reorder the fields, rename the original 'f0' field to 'name' and add
# event-time attribute named 'rowtime'

# use str
>>> table_env.create_temporary_view(
...     "cat.db.myTable",
...     stream,
...     "f1, rowtime.rowtime, f0 as 'name'")

# or use a sequence of expressions
>>> table_env.create_temporary_view(
...     "cat.db.myTable",
...     stream,
...     col("f1"),
...     col("rowtime").rowtime,
...     col("f0").alias('name'))
2. Reference input fields by position: In this mode, fields are simply renamed. Event-time attributes can replace the field on their position in the input data (if it is of correct type) or be appended at the end. Proctime attributes must be appended at the end. This mode can only be used if the input type has a defined field order (tuple, case class, Row) and none of the fields references a field of the input type.
Example:
>>> stream = ...
# rename the original fields to 'a' and 'b' and extract the internally attached
# timestamp into an event-time attribute named 'rowtime'

# use str
>>> table_env.create_temporary_view(
...     "cat.db.myTable", stream, "a, b, rowtime.rowtime")

# or use a sequence of expressions
>>> table_env.create_temporary_view(
...     "cat.db.myTable",
...     stream,
...     col("a"),
...     col("b"),
...     col("rowtime").rowtime)
Temporary objects can shadow permanent ones. If a permanent object in a given path exists, it will be inaccessible in the current session. To make the permanent object available again you can drop the corresponding temporary object.
2.2 When fields_or_schema is a Schema:
Creates a view from the given DataStream in a given path. Registered views can be referenced in SQL queries.
See from_data_stream() for more information on how a DataStream is translated into a table.
Temporary objects can shadow permanent ones. If a permanent object in a given path exists, it will be inaccessible in the current session. To make the permanent object available again you can drop the corresponding temporary object.
Note
Support for calling create_temporary_view with a Schema (case 2.2) was added in Flink 1.14.0.
- Parameters
view_path – The path under which the view will be registered. See also the TableEnvironment class description for the format of the path.
table_or_data_stream – The Table or DataStream out of which to create the view.
fields_or_schema – The field expressions (str or Expression) that map the original fields of the DataStream to the fields of the view, or the customized schema for the final table.
New in version 1.10.0.
-
drop_function(path: str) → bool
Drops a catalog function registered in the given path.
- Parameters
path – The path under which the function has been registered. See also the TableEnvironment class description for the format of the path.
- Returns
True if a function existed in the given path and was removed.
New in version 1.12.0.
-
drop_temporary_function(path: str) → bool
Drops a temporary catalog function registered in the given path.
If a permanent function with the given path exists, it will be used from now on for any queries that reference this path.
- Parameters
path – The path under which the function has been registered. See also the TableEnvironment class description for the format of the path.
- Returns
True if a function existed in the given path and was removed.
New in version 1.12.0.
-
drop_temporary_system_function(name: str) → bool
Drops a temporary system function registered under the given name.
If a permanent function with the given name exists, it will be used from now on for any queries that reference this name.
- Parameters
name – The name under which the function has been registered globally.
- Returns
True if a function existed under the given name and was removed.
New in version 1.12.0.
-
drop_temporary_table(table_path: str) → bool
Drops a temporary table registered in the given path.
If a permanent table with a given path exists, it will be used from now on for any queries that reference this path.
- Parameters
table_path – The path of the registered temporary table.
- Returns
True if a table existed in the given path and was removed.
New in version 1.10.0.
-
drop_temporary_view(view_path: str) → bool
Drops a temporary view registered in the given path.
If a permanent table or view with a given path exists, it will be used from now on for any queries that reference this path.
- Parameters
view_path – The path of the registered temporary view.
- Returns
True if a view existed in the given path and was removed.
New in version 1.10.0.
-
execute(job_name: str) → pyflink.common.job_execution_result.JobExecutionResult
Triggers the program execution. The environment will execute all parts of the program.
The program execution will be logged and displayed with the provided name.
Note
It is highly advised to set all parameters in the TableConfig at the very beginning of the program. It is undefined which configuration values will be used for the execution if queries are mixed with config changes; it depends on the characteristics of the particular parameter. For some parameters, the value at the point in time of query construction (e.g. the current catalog) will be used. Other values might be evaluated according to the state at the time when this method is called (e.g. timezone).
- Parameters
job_name – Desired name of the job.
- Returns
The result of the job execution, containing elapsed time and accumulators.
Note
Deprecated in 1.11. Use execute_sql() for a single sink and create_statement_set() for multiple sinks.
-
execute_sql(stmt: str) → pyflink.table.table_result.TableResult
Executes the given single statement and returns the execution result.
The statement can be DDL/DML/DQL/SHOW/DESCRIBE/EXPLAIN/USE. For DML and DQL, this method returns the TableResult once the job has been submitted. For DDL and DCL statements, the TableResult is returned once the operation has finished.
- Returns
The content for DQL/SHOW/DESCRIBE/EXPLAIN, the affected row count for DML (-1 means unknown), or a string message (“OK”) for other statements.
New in version 1.11.0.
-
explain(table: pyflink.table.table.Table = None, extended: bool = False) → str
Returns the AST of the specified Table API and SQL queries and the execution plan to compute the result of the given Table or multi-sinks plan.
- Parameters
table – The table to be explained. If table is None, the multi-sinks plan is explained, otherwise the given table.
extended – If the plan should contain additional properties, e.g. estimated cost, traits.
- Returns
The AST and the execution plan as a string.
Note
Deprecated in 1.11. Use Table.explain() instead.
-
explain_sql(stmt: str, *extra_details: pyflink.table.explain_detail.ExplainDetail) → str
Returns the AST of the specified statement and the execution plan.
- Parameters
stmt – The statement for which the AST and execution plan will be returned.
extra_details – The extra explain details which the explain result should include, e.g. estimated cost, changelog mode for streaming.
- Returns
The AST and the execution plan of the given statement.
New in version 1.11.0.
-
from_descriptor(descriptor: pyflink.table.table_descriptor.TableDescriptor) → pyflink.table.table.Table
Returns a Table backed by the given TableDescriptor.
The TableDescriptor is registered as an inline (i.e. anonymous) temporary table (see create_temporary_table()) using a unique identifier and then read. Note that calling this method multiple times, even with the same descriptor, results in multiple temporary tables. In such cases, it is recommended to register it under a name using create_temporary_table() and reference it via from_path().
Examples:
>>> table_env.from_descriptor(TableDescriptor.for_connector("datagen")
...     .schema(Schema.new_builder()
...             .column("f0", DataTypes.STRING())
...             .build())
...     .build())
Note that the returned Table is an API object and only contains a pipeline description. It actually corresponds to a view in SQL terms. Call execute() on the Table to trigger an execution.
- Returns
The Table object describing the pipeline for further transformations.
New in version 1.14.0.
-
from_elements(elements: Iterable, schema: Union[pyflink.table.types.DataType, List[str]] = None, verify_schema: bool = True) → pyflink.table.table.Table
Creates a table from a collection of elements. The element types must be acceptable atomic types or acceptable composite types. All elements must be of the same type. If the element types are composite types, the composite types must be strictly equal, and their subtypes must also be acceptable types. For example, if the elements are tuples, all tuples must have the same length and the same element types in the same order.
The built-in acceptable atomic element types are:
int, long, str, unicode, bool, float, bytearray, datetime.date, datetime.time, datetime.datetime, datetime.timedelta, decimal.Decimal
The built-in acceptable composite element types are:
list, tuple, dict, array, Row
If the element type is a composite type, it will be unboxed. For example, table_env.from_elements([(1, 'Hi'), (2, 'Hello')]) will return a table like:
+----+-------+
| _1 | _2    |
+----+-------+
| 1  | Hi    |
| 2  | Hello |
+----+-------+
“_1” and “_2” are generated field names.
Example:
# use the second parameter to specify custom field names
>>> table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['a', 'b'])

# use the second parameter to specify a custom table schema
>>> table_env.from_elements([(1, 'Hi'), (2, 'Hello')],
...     DataTypes.ROW([DataTypes.FIELD("a", DataTypes.INT()),
...                    DataTypes.FIELD("b", DataTypes.STRING())]))

# use the third parameter to switch whether to verify the elements against the schema
>>> table_env.from_elements([(1, 'Hi'), (2, 'Hello')],
...     DataTypes.ROW([DataTypes.FIELD("a", DataTypes.INT()),
...                    DataTypes.FIELD("b", DataTypes.STRING())]),
...     False)

# create a Table from expressions
>>> table_env.from_elements([row(1, 'abc', 2.0), row(2, 'def', 3.0)],
...     DataTypes.ROW([DataTypes.FIELD("a", DataTypes.INT()),
...                    DataTypes.FIELD("b", DataTypes.STRING()),
...                    DataTypes.FIELD("c", DataTypes.FLOAT())]))
- Parameters
elements – The elements to create a table from.
schema – The schema of the table.
verify_schema – Whether to verify the elements against the schema.
- Returns
The result table.
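The following plain-Python sketch reproduces the generated “_1”, “_2” naming for unboxed tuples and the rule that all tuples must have the same length and element types in the same order. default_field_names is a hypothetical helper; PyFlink's real type inference (for example for None values or nested types) is more involved than this exact type comparison.

```python
from typing import Any, List, Sequence


def default_field_names(elements: Sequence[Any]) -> List[str]:
    """Return the generated names "_1", "_2", ... for unboxed tuple elements.

    Also enforces the documented rule that all tuples have the same length
    and the same element types in the same order.
    """
    if not elements:
        raise ValueError("cannot derive field names from an empty collection")
    first = elements[0]
    if not isinstance(first, tuple):
        raise TypeError("this sketch only handles tuple elements")
    expected = [type(value) for value in first]
    for element in elements:
        if not isinstance(element, tuple) or [type(v) for v in element] != expected:
            raise TypeError(f"{element!r} does not match the types {expected}")
    return [f"_{i}" for i in range(1, len(first) + 1)]
```

Passing a list of names such as ['a', 'b'] as the schema argument replaces these generated names.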
-
from_pandas(pdf, schema: Union[pyflink.table.types.RowType, List[str], Tuple[str], List[pyflink.table.types.DataType], Tuple[pyflink.table.types.DataType]] = None, splits_num: int = 1) → pyflink.table.table.Table
Creates a table from a pandas DataFrame.
Example:
>>> pdf = pd.DataFrame(np.random.rand(1000, 2))

# use the second parameter to specify custom field names
>>> table_env.from_pandas(pdf, ["a", "b"])

# use the second parameter to specify custom field types
>>> table_env.from_pandas(pdf, [DataTypes.DOUBLE(), DataTypes.DOUBLE()])

# use the second parameter to specify a custom table schema
>>> table_env.from_pandas(pdf,
...     DataTypes.ROW([DataTypes.FIELD("a", DataTypes.DOUBLE()),
...                    DataTypes.FIELD("b", DataTypes.DOUBLE())]))
- Parameters
pdf – The pandas DataFrame.
schema – The schema of the converted table.
splits_num – The number of splits the given Pandas DataFrame will be split into. It determines the number of parallel source tasks. If not specified, the default parallelism will be used.
- Returns
The result table.
New in version 1.11.0.
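To make the effect of splits_num concrete, the sketch below divides a sequence of rows into splits_num contiguous, near-equal chunks, one per parallel source task. split_rows is hypothetical and works on a plain list. It illustrates the idea of splitting and does not describe how PyFlink slices the DataFrame internally.

```python
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def split_rows(rows: Sequence[T], splits_num: int) -> List[List[T]]:
    """Divide rows into splits_num contiguous chunks whose sizes differ by at most 1."""
    if splits_num < 1:
        raise ValueError("splits_num must be >= 1")
    size, remainder = divmod(len(rows), splits_num)
    chunks: List[List[T]] = []
    start = 0
    for i in range(splits_num):
        # The first `remainder` chunks take one extra row each.
        end = start + size + (1 if i < remainder else 0)
        chunks.append(list(rows[start:end]))
        start = end
    return chunks
```

With more splits than rows, some chunks are empty. In this model, that corresponds to source tasks with no data to read.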
-
from_path(path: str) → pyflink.table.table.Table
Reads a registered table and returns the resulting Table.
A table to scan must be registered in the TableEnvironment.
See the documentation of use_database() or use_catalog() for the rules on the path resolution.
Examples:
Reading a table from default catalog and database.
>>> tab = table_env.from_path("tableName")
Reading a table from a registered catalog.
>>> tab = table_env.from_path("catalogName.dbName.tableName")
Reading a table from a registered catalog with escaping (TABLE is a reserved keyword). Dots in, e.g., a database name must also be escaped.
>>> tab = table_env.from_path("catalogName.`db.Name`.`Table`")
- Parameters
path – The path of a table API object to scan.
- Returns
Either a table or virtual table (=view).
New in version 1.10.0.
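When names come from user input, it can be simpler to quote every part than to decide which ones are reserved words or contain dots. quote_identifier and build_path are hypothetical helpers. Doubling an embedded backtick is an assumption about the SQL identifier escaping rules. Quoting a part that does not need it should be harmless.

```python
def quote_identifier(name: str) -> str:
    """Wrap one path part in backticks, doubling any embedded backtick."""
    return "`" + name.replace("`", "``") + "`"


def build_path(*parts: str) -> str:
    """Join catalog, database and object names into an escaped path string."""
    if not 1 <= len(parts) <= 3:
        raise ValueError("a path has one to three parts")
    return ".".join(quote_identifier(part) for part in parts)
```

For example, build_path("catalogName", "db.Name", "Table") produces the same escaped path as the last from_path example.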
-
from_table_source(table_source: pyflink.table.sources.TableSource) → pyflink.table.table.Table
Creates a table from a table source.
Example:
>>> csv_table_source = CsvTableSource(
...     csv_file_path, ['a', 'b'], [DataTypes.STRING(), DataTypes.BIGINT()])
>>> table_env.from_table_source(csv_table_source)
- Parameters
table_source – The table source used as table.
- Returns
The result table.
-
get_catalog(catalog_name: str) → pyflink.table.catalog.Catalog
Gets a registered Catalog by name.
- Parameters
catalog_name – The name to look up the Catalog.
- Returns
The requested catalog, or None if there is no registered catalog with the given name.
-
get_config() → pyflink.table.table_config.TableConfig
Returns the table config to define the runtime behavior of the Table API.
- Returns
Current table config.
-
get_current_catalog() → str
Gets the current default catalog name of the current session.
- Returns
The current default catalog name that is used for the path resolution.
-
get_current_database() → str
Gets the current default database name of the running session.
- Returns
The name of the current database of the current catalog.
-
insert_into(target_path: str, table: pyflink.table.table.Table)
Instructs to write the content of a Table API object into a table.
See the documentation of use_database() or use_catalog() for the rules on the path resolution.
Example:
>>> tab = table_env.from_path("tableName")
>>> table_env.insert_into("sink", tab)
- Parameters
target_path – The path of the registered TableSink to which the Table is written.
table – The Table to write to the sink.
Changed in version 1.10.0: The signature has changed, e.g. the parameter table_path_continued was removed and the parameter target_path was moved before the parameter table.
Note
Deprecated in 1.11. Use Table.execute_insert() for a single sink and create_statement_set() for multiple sinks.
-
list_catalogs() → List[str]
Gets the names of all catalogs registered in this environment.
- Returns
List of catalog names.
-
list_databases() → List[str]
Gets the names of all databases in the current catalog.
- Returns
List of database names in the current catalog.
-
list_full_modules() → List[pyflink.table.module.ModuleEntry]
Gets the names and statuses of all modules loaded in this environment.
- Returns
List of module names and use statuses.
New in version 1.13.0.
-
list_functions() → List[str]
Gets the names of all functions in this environment.
- Returns
List of the names of all functions in this environment.
New in version 1.10.0.
-
list_modules() → List[str]
Gets the names of all modules used in this environment.
- Returns
List of module names.
New in version 1.10.0.
-
list_tables() → List[str]
Gets the names of all tables and views in the current database of the current catalog. It returns both temporary and permanent tables and views.
- Returns
List of table and view names in the current database of the current catalog.
-
list_temporary_tables() → List[str]
Gets the names of all temporary tables and views available in the current namespace (the current database of the current catalog).
- Returns
A list of the names of all registered temporary tables and views in the current database of the current catalog.
See also list_temporary_views().
New in version 1.10.0.
-
list_temporary_views
() → List[str] Gets the names of all temporary views available in the current namespace (the current database of the current catalog).
- Returns
A list of the names of all registered temporary views in the current database of the current catalog.
See also list_temporary_tables().
New in version 1.10.0.
-
list_user_defined_functions
() → List[str] Gets the names of all user-defined functions registered in this environment.
- Returns
List of the names of all user-defined functions registered in this environment.
List of the names of all user defined functions registered in this environment.
-
list_views
() → List[str] Gets the names of all views in the current database of the current catalog. It returns both temporary and permanent views.
- Returns
List of view names in the current database of the current catalog.
New in version 1.11.0.
-
load_module
(module_name: str, module: pyflink.table.module.Module) Loads a
Module
under a unique name. Modules will be kept in the loaded order. A ValidationException is thrown when a module with the same name already exists.
- Parameters
module_name – Name of the Module.
module – The module instance.
New in version 1.12.0.
-
register_catalog
(catalog_name: str, catalog: pyflink.table.catalog.Catalog) Registers a
Catalog
under a unique name. All tables registered in the Catalog
can be accessed.
- Parameters
catalog_name – The name under which the catalog will be registered.
catalog – The catalog to register.
-
register_function
(name: str, function: pyflink.table.udf.UserDefinedFunctionWrapper) Registers a Python user-defined function under a unique name, replacing any existing user-defined function registered under that name.
Example:
>>> table_env.register_function(
...     "add_one", udf(lambda i: i + 1, result_type=DataTypes.BIGINT()))
>>> @udf(result_type=DataTypes.BIGINT())
... def add(i, j):
...     return i + j
>>> table_env.register_function("add", add)
>>> class SubtractOne(ScalarFunction):
...     def eval(self, i):
...         return i - 1
>>> table_env.register_function(
...     "subtract_one", udf(SubtractOne(), result_type=DataTypes.BIGINT()))
- Parameters
name – The name under which the function is registered.
function – The Python user-defined function to register.
New in version 1.10.0.
Note
Deprecated in 1.12. Use
create_temporary_system_function()
instead.
-
register_java_function
(name: str, function_class_name: str) Registers a Java user-defined function under a unique name, replacing any existing user-defined function registered under that name. Supported function types are ScalarFunction, TableFunction and AggregateFunction.
Example:
>>> table_env.register_java_function("func1", "java.user.defined.function.class.name")
- Parameters
name – The name under which the function is registered.
function_class_name – The fully qualified Java class name of the function to register. The function must have a public no-argument constructor and must be loadable by the current Java classloader.
Note
Deprecated in 1.12. Use
create_java_temporary_system_function()
instead.
-
register_table
(name: str, table: pyflink.table.table.Table) Registers a
Table
under a unique name in the TableEnvironment’s catalog. Registered tables can be referenced in SQL queries. Example:
>>> tab = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['a', 'b'])
>>> table_env.register_table("source", tab)
- Parameters
name – The name under which the table will be registered.
table – The table to register.
Note
Deprecated in 1.10. Use
create_temporary_view()
instead.
-
register_table_sink
(name: str, table_sink: pyflink.table.sinks.TableSink) Registers an external
TableSink
with the given field names and types in this TableEnvironment’s catalog. Registered sink tables can be referenced in SQL DML statements. Example:
>>> table_env.register_table_sink("sink",
...                               CsvTableSink(["a", "b"],
...                                            [DataTypes.INT(),
...                                             DataTypes.STRING()],
...                                            "./2.csv"))
- Parameters
name – The name under which the table sink is registered.
table_sink – The table sink to register.
Note
Deprecated in 1.10. Use
execute_sql()
instead.
-
register_table_source
(name: str, table_source: pyflink.table.sources.TableSource) Registers an external
TableSource
in this TableEnvironment’s catalog. Registered tables can be referenced in SQL queries. Example:
>>> table_env.register_table_source("source",
...                                 CsvTableSource("./1.csv",
...                                                ["a", "b"],
...                                                [DataTypes.INT(),
...                                                 DataTypes.STRING()]))
- Parameters
name – The name under which the table source is registered.
table_source – The table source to register.
Note
Deprecated in 1.10. Use
execute_sql()
instead.
-
scan
(*table_path: str) → pyflink.table.table.Table Scans a registered table and returns the resulting
Table
. A table to scan must be registered in the TableEnvironment. It can be either directly registered or be an external member of a Catalog.
See the documentation of
use_database()
or use_catalog()
for the rules on the path resolution. Examples:
Scanning a directly registered table
>>> tab = table_env.scan("tableName")
Scanning a table from a registered catalog
>>> tab = table_env.scan("catalogName", "dbName", "tableName")
- Parameters
table_path – The path of the table to scan.
- Throws
Exception if no table is found using the given table path.
- Returns
The resulting table.
Note
Deprecated in 1.10. Use
from_path()
instead.
-
set_python_requirements
(requirements_file_path: str, requirements_cache_dir: str = None) Specifies a requirements.txt file which defines the third-party dependencies. These dependencies will be installed to a temporary directory and added to the PYTHONPATH of the Python UDF worker.
For dependencies that cannot be accessed from within the cluster, a directory containing the installation packages of these dependencies can be specified using the parameter “requirements_cache_dir”. It will be uploaded to the cluster to support offline installation.
Example:
# commands executed in shell
$ echo numpy==1.16.5 > requirements.txt
$ pip download -d cached_dir -r requirements.txt --no-binary :all:
# python code
>>> table_env.set_python_requirements("requirements.txt", "cached_dir")
Note
Please make sure the installation packages match the platform of the cluster and the Python version used. These packages will be installed using pip, so also make sure that pip is version 7.1.0 or later and setuptools is version 37.0.0 or later.
- Parameters
requirements_file_path – The path of “requirements.txt” file.
requirements_cache_dir – The path of the local directory which contains the installation packages.
New in version 1.10.0.
-
sql_query
(query: str) → pyflink.table.table.Table Evaluates a SQL query on registered tables and retrieves the result as a
Table
. All tables referenced by the query must be registered in the TableEnvironment.
A
Table
is automatically registered when its __str__()
method is called, for example when it is embedded into a string. Hence, SQL queries can directly reference a
Table
as follows:
>>> table = ...  # the table is not registered to the table environment
>>> table_env.sql_query("SELECT * FROM %s" % table)
- Parameters
query – The sql query string.
- Returns
The result table.
-
sql_update
(stmt: str) Evaluates a SQL statement such as INSERT, UPDATE or DELETE, or a DDL statement.
Note
Currently only SQL INSERT statements and CREATE TABLE statements are supported.
All tables referenced by the query must be registered in the TableEnvironment. A
Table
is automatically registered when its __str__()
method is called, for example when it is embedded into a string. Hence, SQL queries can directly reference a Table
as follows:
>>> # register the table sink into which the result is inserted.
>>> table_env.register_table_sink("sink_table", table_sink)
>>> source_table = ...  # source_table is not registered to the table environment
>>> table_env.sql_update("INSERT INTO sink_table SELECT * FROM %s" % source_table)
A DDL statement can also be executed to create or drop a table. For example, the DDL statement below creates a CSV table named tbl1 in the current catalog:
create table tbl1(
  a int,
  b bigint,
  c varchar
) with (
  'connector.type' = 'filesystem',
  'format.type' = 'csv',
  'connector.path' = 'xxx'
)
SQL statements can be executed directly as follows:
>>> source_ddl = \
... '''
... create table sourceTable(
...     a int,
...     b varchar
... ) with (
...     'connector.type' = 'kafka',
...     'update-mode' = 'append',
...     'connector.topic' = 'xxx',
...     'connector.properties.bootstrap.servers' = 'localhost:9092'
... )
... '''
>>> sink_ddl = \
... '''
... create table sinkTable(
...     a int,
...     b varchar
... ) with (
...     'connector.type' = 'filesystem',
...     'format.type' = 'csv',
...     'connector.path' = 'xxx'
... )
... '''
>>> query = "INSERT INTO sinkTable SELECT * FROM sourceTable"
>>> table_env.sql_update(source_ddl)
>>> table_env.sql_update(sink_ddl)
>>> table_env.sql_update(query)
>>> table_env.execute("MyJob")
- Parameters
stmt – The SQL statement to evaluate.
Note
Deprecated in 1.11. Use
execute_sql()
for a single statement, and use create_statement_set()
for multiple DML statements.
-
unload_module
(module_name: str) Unloads a
Module
with the given name. A ValidationException is thrown when there is no module with the given name.
- Parameters
module_name – Name of the Module.
New in version 1.12.0.
-
use_catalog
(catalog_name: str) Sets the current catalog to the given value. It also sets the default database to the catalog’s default one. See also
use_database()
. This is used during the resolution of object paths. Both the catalog and database are optional when referencing catalog objects such as tables, views etc. The algorithm looks for requested objects in the following paths, in this order:
[current-catalog].[current-database].[requested-path]
[current-catalog].[requested-path]
[requested-path]
Example:
Given a structure with the default catalog set to
default_catalog
and the default database set to default_database:
root:
  |- default_catalog
      |- default_database
          |- tab1
      |- db1
          |- tab1
  |- cat1
      |- db1
          |- tab1
The following table describes resolved paths:
Requested path   | Resolved path
tab1             | default_catalog.default_database.tab1
db1.tab1         | default_catalog.db1.tab1
cat1.db1.tab1    | cat1.db1.tab1
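The lookup order above can be illustrated in plain Python. This is a minimal sketch, not PyFlink code: the `resolve` helper and the `EXISTING` set are hypothetical and simply mirror the example tree. It tries each candidate path in the documented order and returns the first one that exists.

```python
# Hypothetical model of the documented lookup order; not part of PyFlink.
# The set mirrors the example tree: fully qualified paths that exist.
EXISTING = {
    "default_catalog.default_database.tab1",
    "default_catalog.db1.tab1",
    "cat1.db1.tab1",
}


def resolve(requested_path, current_catalog="default_catalog",
            current_database="default_database", existing=EXISTING):
    """Return the first candidate path that exists, in the documented order."""
    candidates = [
        f"{current_catalog}.{current_database}.{requested_path}",
        f"{current_catalog}.{requested_path}",
        requested_path,
    ]
    for candidate in candidates:
        if candidate in existing:
            return candidate
    raise LookupError(f"Object '{requested_path}' not found")


for path in ["tab1", "db1.tab1", "cat1.db1.tab1"]:
    print(path, "->", resolve(path))
```

The sketch probes for existence only to reproduce the table above; it should not be read as Flink's actual resolution code.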
- Parameters
catalog_name – The name of the catalog to set as the current default catalog.
- Throws
CatalogException
thrown if a catalog with given name could not be set as the default one.
See also use_database().
-
use_database
(database_name: str) Sets the current default database. It has to exist in the current catalog. That path will be used as the default one when looking for unqualified object names.
This is used during the resolution of object paths. Both the catalog and database are optional when referencing catalog objects such as tables, views etc. The algorithm looks for requested objects in the following paths, in this order:
[current-catalog].[current-database].[requested-path]
[current-catalog].[requested-path]
[requested-path]
Example:
Given a structure with the default catalog set to
default_catalog
and the default database set to default_database:
root:
  |- default_catalog
      |- default_database
          |- tab1
      |- db1
          |- tab1
  |- cat1
      |- db1
          |- tab1
The following table describes resolved paths:
Requested path   | Resolved path
tab1             | default_catalog.default_database.tab1
db1.tab1         | default_catalog.db1.tab1
cat1.db1.tab1    | cat1.db1.tab1
- Throws
CatalogException
thrown if the given catalog and database could not be set as the default ones.
See also use_catalog().
- Parameters
database_name – The name of the database to set as the current database.
-
class
pyflink.table.
StreamTableEnvironment
(j_tenv) Bases:
pyflink.table.table_environment.TableEnvironment
-
static
create
(stream_execution_environment: pyflink.datastream.stream_execution_environment.StreamExecutionEnvironment = None, table_config: pyflink.table.table_config.TableConfig = None, environment_settings: pyflink.table.environment_settings.EnvironmentSettings = None) → pyflink.table.table_environment.StreamTableEnvironment Creates a
StreamTableEnvironment
. Example:
# create with StreamExecutionEnvironment.
>>> env = StreamExecutionEnvironment.get_execution_environment()
>>> table_env = StreamTableEnvironment.create(env)
# create with StreamExecutionEnvironment and TableConfig.
>>> table_config = TableConfig()
>>> table_config.set_null_check(False)
>>> table_env = StreamTableEnvironment.create(env, table_config)
# create with StreamExecutionEnvironment and EnvironmentSettings.
>>> environment_settings = EnvironmentSettings.in_streaming_mode()
>>> table_env = StreamTableEnvironment.create(
...     env, environment_settings=environment_settings)
# create with EnvironmentSettings.
>>> table_env = StreamTableEnvironment.create(environment_settings=environment_settings)
- Parameters
stream_execution_environment – The
StreamExecutionEnvironment
of the TableEnvironment.
table_config – The configuration of the TableEnvironment, optional.
environment_settings – The environment settings used to instantiate the TableEnvironment.
- Returns
The StreamTableEnvironment created from given StreamExecutionEnvironment and configuration.
-
from_changelog_stream
(data_stream: pyflink.datastream.data_stream.DataStream, schema: pyflink.table.schema.Schema = None, changelog_mode: pyflink.table.changelog_mode.ChangelogMode = None) → pyflink.table.table.Table Converts the given DataStream of changelog entries into a Table.
Compared to
from_data_stream()
, this method consumes instances of Row and evaluates the RowKind flag that is contained in every record during runtime. The runtime behavior is similar to that of a DynamicTableSource.
If changelog_mode is not specified, a changelog containing all kinds of changes (enumerated in RowKind) is used as the default ChangelogMode.
Column names and types of the Table are automatically derived from the TypeInformation of the DataStream. If the outermost record’s TypeInformation is a CompositeType, it will be flattened in the first level. Composite nested fields will not be accessible.
By default, the stream record’s timestamp and watermarks are not propagated unless explicitly declared.
This method allows declaring a Schema for the resulting table. The declaration is similar to a CREATE TABLE DDL in SQL and allows you to:
enrich or overwrite automatically derived columns with a custom DataType
reorder columns
add computed or metadata columns next to the physical columns
access a stream record’s timestamp
declare a watermark strategy or propagate the DataStream watermarks
declare a primary key
See
from_data_stream()
for more information and examples of how to declare a Schema.
- Parameters
data_stream – The changelog stream of Row.
schema – The customized schema for the final table.
changelog_mode – The expected kinds of changes in the incoming changelog.
- Returns
The converted Table.
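As a rough illustration of what evaluating the RowKind flag means for a consumer, the following plain-Python sketch applies a changelog to an in-memory multiset of rows. It does not use PyFlink; the `materialize` helper and the sample rows are hypothetical, and row kinds are written in Flink's short-string notation (+I INSERT, -U UPDATE_BEFORE, +U UPDATE_AFTER, -D DELETE).

```python
from collections import Counter

# Row kinds that add a row to, or remove a row from, the materialized table.
ADDING_KINDS = {"+I", "+U"}      # INSERT, UPDATE_AFTER
RETRACTING_KINDS = {"-U", "-D"}  # UPDATE_BEFORE, DELETE


def materialize(changelog):
    """Apply (row_kind, row) pairs to a multiset of rows and return it."""
    table = Counter()
    for row_kind, row in changelog:
        if row_kind in ADDING_KINDS:
            table[row] += 1
        elif row_kind in RETRACTING_KINDS:
            table[row] -= 1
            if table[row] == 0:
                del table[row]
        else:
            raise ValueError(f"Unknown row kind: {row_kind}")
    return table


changelog = [
    ("+I", ("Alice", 12)),
    ("+I", ("Bob", 5)),
    ("-U", ("Alice", 12)),
    ("+U", ("Alice", 100)),
    ("-D", ("Bob", 5)),
]
print(dict(materialize(changelog)))  # {('Alice', 100): 1}
```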
-
from_data_stream
(data_stream: pyflink.datastream.data_stream.DataStream, *fields_or_schema: Union[str, pyflink.table.expression.Expression, pyflink.table.schema.Schema]) → pyflink.table.table.Table When fields_or_schema is a str or a sequence of Expression:
Converts the given DataStream into a Table with specified field names.
There are two modes for mapping original fields to the fields of the Table:
Reference input fields by name:
All fields in the schema definition are referenced by name (and possibly renamed using an alias (as)). Moreover, we can define proctime and rowtime attributes at arbitrary positions using arbitrary names (except those that exist in the result schema). In this mode, fields can be reordered and projected out. This mode can be used for any input type.
Reference input fields by position:
In this mode, fields are simply renamed. Event-time attributes can replace the field on their position in the input data (if it is of correct type) or be appended at the end. Proctime attributes must be appended at the end. This mode can only be used if the input type has a defined field order (tuple, case class, Row) and none of the fields references a field of the input type.
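The difference between the two mapping modes can be sketched with plain Python dictionaries. This is a hypothetical model, not PyFlink code: `map_by_position` and `map_by_name` are illustrative helpers, the input record is made up, and time attributes are not modelled.

```python
# Hypothetical plain-Python model of the two mapping modes; not PyFlink code.
# Time attributes (proctime/rowtime) are not modelled.
input_fields = ("f0", "f1", "f2")
record = (1, "hello", 2.5)


def map_by_position(values, new_names):
    """Position mode: the i-th input field simply receives the i-th new name."""
    return dict(zip(new_names, values))


def map_by_name(fields, values, selection):
    """Name mode: select, reorder and rename fields via (input_name, alias) pairs."""
    by_name = dict(zip(fields, values))
    return {alias: by_name[name] for name, alias in selection}


print(map_by_position(record, ["id", "msg", "score"]))
# Reorders the fields and projects out f1.
print(map_by_name(input_fields, record, [("f2", "score"), ("f0", "id")]))
```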
When fields_or_schema is a Schema:
Converts the given DataStream into a Table.
Column names and types of the Table are automatically derived from the TypeInformation of the DataStream. If the outermost record’s TypeInformation is a CompositeType, it will be flattened in the first level. Composite nested fields will not be accessible.
Since the DataStream API does not support changelog processing natively, this method assumes append-only/insert-only semantics during the stream-to-table conversion. Records of class Row must describe RowKind.INSERT changes.
By default, the stream record’s timestamp and watermarks are not propagated unless explicitly declared.
This method allows declaring a Schema for the resulting table. The declaration is similar to a CREATE TABLE DDL in SQL and allows you to:
enrich or overwrite automatically derived columns with a custom DataType
reorder columns
add computed or metadata columns next to the physical columns
access a stream record’s timestamp
declare a watermark strategy or propagate the DataStream watermarks
It is possible to declare a schema without physical/regular columns. In this case, those columns will be automatically derived and implicitly put at the beginning of the schema declaration.
The following examples illustrate common schema declarations and their semantics:
Example:
=== EXAMPLE 1 ===
no physical columns defined, they will be derived automatically,
e.g. BigDecimal becomes DECIMAL(38, 18)
>>> Schema.new_builder() \
...     .column_by_expression("c1", "f1 + 42") \
...     .column_by_expression("c2", "f1 - 1") \
...     .build()
equal to: CREATE TABLE (f0 STRING, f1 DECIMAL(38, 18), c1 AS f1 + 42, c2 AS f1 - 1)
=== EXAMPLE 2 ===
physical columns defined, input fields and columns will be mapped by name,
columns are reordered and their data type overwritten,
all columns must be defined to show up in the final table's schema
>>> Schema.new_builder() \
...     .column("f1", "DECIMAL(10, 2)") \
...     .column_by_expression("c", "f1 - 1") \
...     .column("f0", "STRING") \
...     .build()
equal to: CREATE TABLE (f1 DECIMAL(10, 2), c AS f1 - 1, f0 STRING)
=== EXAMPLE 3 ===
timestamp and watermarks can be added from the DataStream API,
physical columns will be derived automatically
>>> Schema.new_builder() \
...     .column_by_metadata("rowtime", "TIMESTAMP_LTZ(3)") \
...     .watermark("rowtime", "SOURCE_WATERMARK()") \
...     .build()
equal to:
CREATE TABLE (
    f0 STRING,
    f1 DECIMAL(38, 18),
    rowtime TIMESTAMP_LTZ(3) METADATA,
    WATERMARK FOR rowtime AS SOURCE_WATERMARK()
)
Note
from_data_stream by providing a Schema (case 2.) was added in Flink 1.14.0.
- Parameters
data_stream – The datastream to be converted.
fields_or_schema – The fields expressions to map original fields of the DataStream to the fields of the Table or the customized schema for the final table.
- Returns
The converted Table.
New in version 1.12.0.
-
to_append_stream
(table: pyflink.table.table.Table, type_info: pyflink.common.typeinfo.TypeInformation) → pyflink.datastream.data_stream.DataStream Converts the given Table into a DataStream of a specified type. The Table must only have insert (append) changes. If the Table is also modified by update or delete changes, the conversion will fail.
The fields of the Table are mapped to DataStream as follows: Row and Tuple types: Fields are mapped by position, field types must match.
- Parameters
table – The Table to convert.
type_info – The TypeInformation that specifies the type of the DataStream.
- Returns
The converted DataStream.
New in version 1.12.0.
-
to_changelog_stream
(table: pyflink.table.table.Table, target_schema: pyflink.table.schema.Schema = None, changelog_mode: pyflink.table.changelog_mode.ChangelogMode = None) → pyflink.datastream.data_stream.DataStream Converts the given Table into a DataStream of changelog entries.
Compared to
to_data_stream()
, this method produces instances of Row and sets the RowKind flag that is contained in every record during runtime. The runtime behavior is similar to that of a DynamicTableSink.
If changelog_mode is not specified, a changelog containing all kinds of changes (enumerated in RowKind) is used as the default ChangelogMode.
The given Schema is used to configure the table runtime to convert columns and internal data structures to the desired representation. The following example shows how to convert a table column into a Row type.
Example:
>>> table_env.to_changelog_stream(
...     table,
...     Schema.new_builder()
...         .column("id", DataTypes.BIGINT())
...         .column("payload", DataTypes.ROW(
...             [DataTypes.FIELD("name", DataTypes.STRING()),
...              DataTypes.FIELD("age", DataTypes.INT())]))
...         .build())
Note that the type system of the table ecosystem is richer than the one of the DataStream API. The table runtime will make sure to properly serialize the output records to the first operator of the DataStream API. Afterwards, the Types semantics of the DataStream API need to be considered.
If the input table contains a single rowtime column, it will be propagated into a stream record’s timestamp. Watermarks will be propagated as well.
If the rowtime should not be a concrete field in the final Row anymore, or the schema should be symmetrical for both
from_changelog_stream()
and to_changelog_stream()
, the rowtime can also be declared as a metadata column that will be propagated into a stream record’s timestamp. It is possible to declare a schema without physical/regular columns. In this case, those columns will be automatically derived and implicitly put at the beginning of the schema declaration.
The following examples illustrate common schema declarations and their semantics:
Example:
given a Table of (id INT, name STRING, my_rowtime TIMESTAMP_LTZ(3))
=== EXAMPLE 1 ===
no physical columns defined, they will be derived automatically,
the last derived physical column will be skipped in favor of the metadata column
>>> Schema.new_builder() \
...     .column_by_metadata("rowtime", "TIMESTAMP_LTZ(3)") \
...     .build()
equal to: CREATE TABLE (id INT, name STRING, rowtime TIMESTAMP_LTZ(3) METADATA)
=== EXAMPLE 2 ===
physical columns defined, all columns must be defined
>>> Schema.new_builder() \
...     .column("id", "INT") \
...     .column("name", "STRING") \
...     .column_by_metadata("rowtime", "TIMESTAMP_LTZ(3)") \
...     .build()
equal to: CREATE TABLE (id INT, name STRING, rowtime TIMESTAMP_LTZ(3) METADATA)
- Parameters
table – The Table to convert. It can be updating or insert-only.
target_schema – The Schema that decides about the final external representation in DataStream records.
changelog_mode – The required kinds of changes in the result changelog. An exception will be thrown if the given updating table cannot be represented in this changelog mode.
- Returns
The converted changelog stream of Row.
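The changelog_mode check can be illustrated with a small plain-Python sketch: a table whose changes include row kinds outside the requested mode cannot be emitted in that mode. This is not PyFlink code; `check_changelog` and the three mode constants are hypothetical, and the sets are assumed to mirror insert-only, upsert and all-changes modes. The insert-only set corresponds to what to_data_stream() accepts.

```python
# Hypothetical model: which row kinds each changelog mode permits.
# Row kinds use Flink's short strings: +I INSERT, -U UPDATE_BEFORE,
# +U UPDATE_AFTER, -D DELETE.
INSERT_ONLY = frozenset({"+I"})
UPSERT = frozenset({"+I", "+U", "-D"})
ALL_KINDS = frozenset({"+I", "-U", "+U", "-D"})


def check_changelog(changelog, required_mode):
    """Return the changelog as a list, or raise if a change is not allowed."""
    result = []
    for row_kind, row in changelog:
        if row_kind not in required_mode:
            raise ValueError(
                f"{row_kind} {row} cannot be represented in mode {sorted(required_mode)}")
        result.append((row_kind, row))
    return result


# A running count per key emits an insert followed by an update pair.
updating = [("+I", ("a", 1)), ("-U", ("a", 1)), ("+U", ("a", 2))]
print(check_changelog(updating, ALL_KINDS))
```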
-
to_data_stream
(table: pyflink.table.table.Table) → pyflink.datastream.data_stream.DataStream Converts the given Table into a DataStream.
Since the DataStream API does not support changelog processing natively, this method assumes append-only/insert-only semantics during the table-to-stream conversion. The records of class Row will always describe RowKind.INSERT changes. Updating tables are not supported by this method and will produce an exception.
Note that the type system of the table ecosystem is richer than the one of the DataStream API. The table runtime will make sure to properly serialize the output records to the first operator of the DataStream API. Afterwards, the Types semantics of the DataStream API need to be considered.
If the input table contains a single rowtime column, it will be propagated into a stream record’s timestamp. Watermarks will be propagated as well.
- Parameters
table – The Table to convert.
- Returns
The converted DataStream.
-
to_retract_stream
(table: pyflink.table.table.Table, type_info: pyflink.common.typeinfo.TypeInformation) → pyflink.datastream.data_stream.DataStream Converts the given Table into a DataStream of add and retract messages. The message will be encoded as Tuple. The first field is a boolean flag, the second field holds the record of the specified type.
A true flag indicates an add message, a false flag indicates a retract message.
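As a plain-Python illustration (no PyFlink; the helper name is hypothetical), a running word count that is updated over time would be encoded with these flags as follows: each update first retracts the previously emitted result for the key and then adds the new one.

```python
from collections import Counter


def count_words_as_retract_stream(words):
    """Yield (flag, (word, count)) pairs for a running COUNT grouped by word."""
    counts = Counter()
    for word in words:
        if counts[word] > 0:
            # Retract the result previously emitted for this key.
            yield False, (word, counts[word])
        counts[word] += 1
        # Add the updated result for this key.
        yield True, (word, counts[word])


print(list(count_words_as_retract_stream(["a", "b", "a"])))
# [(True, ('a', 1)), (True, ('b', 1)), (False, ('a', 1)), (True, ('a', 2))]
```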
The fields of the Table are mapped to DataStream as follows: Row and Tuple types: Fields are mapped by position, field types must match.
- Parameters
table – The Table to convert.
type_info – The TypeInformation of the requested record type.
- Returns
The converted DataStream.
New in version 1.12.0.
-
-
class
pyflink.table.
Table
(j_table, t_env) Bases:
object
A
Table
is the core component of the Table API. Similar to how the DataStream API has DataStream, the Table API is built around Table.
Use the methods of
Table
to transform data. Example:
>>> env = StreamExecutionEnvironment.get_execution_environment()
>>> env.set_parallelism(1)
>>> t_env = StreamTableEnvironment.create(env)
>>> ...
>>> t_env.register_table_source("source", ...)
>>> t = t_env.from_path("source")
>>> t.select(...)
>>> ...
>>> t_env.register_table_sink("result", ...)
>>> t.execute_insert("result")
Operations such as
join()
, select()
, where()
and group_by()
take arguments in an expression string. Please refer to the documentation for the expression syntax.
-
add_columns
(*fields: Union[str, pyflink.table.expression.Expression]) → pyflink.table.table.Table Adds additional columns. Similar to a SQL SELECT statement. The field expressions can contain complex expressions, but cannot contain aggregations. It will throw an exception if the added fields already exist.
Example:
>>> from pyflink.table import expressions as expr
>>> tab.add_columns((tab.a + 1).alias('a1'), expr.concat(tab.b, 'sunny').alias('b1'))
>>> tab.add_columns("a + 1 as a1, concat(b, 'sunny') as b1")
- Parameters
fields – Column list string.
- Returns
The result table.
-
add_or_replace_columns
(*fields: Union[str, pyflink.table.expression.Expression]) → pyflink.table.table.Table Adds additional columns. Similar to a SQL SELECT statement. The field expressions can contain complex expressions, but cannot contain aggregations. An existing field is replaced if an added column has the same name. If the added fields contain duplicate field names, the last one is used.
Example:
>>> from pyflink.table import expressions as expr
>>> tab.add_or_replace_columns((tab.a + 1).alias('a1'),
...                            expr.concat(tab.b, 'sunny').alias('b1'))
>>> tab.add_or_replace_columns("a + 1 as a1, concat(b, 'sunny') as b1")
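The naming rules of add_columns() and add_or_replace_columns() can be contrasted with a plain-Python model of a single row stored as a dict. This is a hypothetical sketch, not PyFlink code: the helper functions only mimic the documented behavior (add_columns rejects existing names; add_or_replace_columns replaces them and lets the last duplicate win).

```python
# Hypothetical plain-Python model of the column-naming rules; not PyFlink code.
def add_columns(row, new_columns):
    """Append (name, value) columns; fail if a name already exists in the row."""
    for name, _ in new_columns:
        if name in row:
            raise ValueError(f"Column '{name}' already exists")
    result = dict(row)
    result.update(new_columns)
    return result


def add_or_replace_columns(row, new_columns):
    """Append (name, value) columns, replacing existing ones; last duplicate wins."""
    result = dict(row)
    for name, value in new_columns:
        result[name] = value
    return result


row = {"a": 1, "b": "x"}
print(add_columns(row, [("a1", 2)]))
print(add_or_replace_columns(row, [("a", 2), ("b1", "xsunny"), ("b1", "last")]))
```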
- Parameters
fields – Column list string.
- Returns
The result table.
-
aggregate
(func: Union[str, pyflink.table.expression.Expression, pyflink.table.udf.UserDefinedAggregateFunctionWrapper]) → pyflink.table.table.AggregatedTable Performs a global aggregate operation with an aggregate function. You have to close the aggregate with a select statement.
Example:
>>> agg = udaf(lambda a: (a.mean(), a.max()),
...            result_type=DataTypes.ROW(
...                [DataTypes.FIELD("a", DataTypes.FLOAT()),
...                 DataTypes.FIELD("b", DataTypes.INT())]),
...            func_type="pandas")
>>> tab.aggregate(agg(tab.a).alias("a", "b")).select("a, b")
>>> # take all the columns as inputs
>>> # pd is a pandas.DataFrame
>>> agg_row = udaf(lambda pd: (pd.a.mean(), pd.a.max()),
...                result_type=DataTypes.ROW(
...                    [DataTypes.FIELD("a", DataTypes.FLOAT()),
...                     DataTypes.FIELD("b", DataTypes.INT())]),
...                func_type="pandas")
>>> tab.aggregate(agg_row.alias("a", "b")).select("a, b")
- Parameters
func – user-defined aggregate function.
- Returns
The result table.
New in version 1.13.0.
-
alias
(field: str, *fields: str) → pyflink.table.table.Table Renames the fields of the expression result. Use this to disambiguate fields before joining two tables.
Example:
>>> tab.alias("a", "b", "c")
>>> tab.alias("a, b, c")
- Parameters
field – Field alias.
fields – Additional field aliases.
- Returns
The result table.
-
distinct
() → pyflink.table.table.Table Removes duplicate values and returns only distinct (different) values.
Example:
>>> tab.select(tab.key, tab.value).distinct()
- Returns
The result table.
-
drop_columns
(*fields: Union[str, pyflink.table.expression.Expression]) → pyflink.table.table.Table Drops existing columns. The field expressions should be field reference expressions.
Example:
>>> tab.drop_columns(tab.a, tab.b)
>>> tab.drop_columns("a, b")
- Parameters
fields – Column list string.
- Returns
The result table.
-
execute
() → pyflink.table.table_result.TableResult Collects the contents of the current table to the local client.
Example:
>>> tab.execute()
- Returns
The content of the table.
New in version 1.11.0.
-
execute_insert
(table_path_or_descriptor: Union[str, pyflink.table.table_descriptor.TableDescriptor], overwrite: bool = False) → pyflink.table.table_result.TableResult When table_path_or_descriptor is a table path:
Writes the
Table
to a TableSink
that was registered under the specified name, and then executes the insert operation. For the path resolution algorithm, see use_database().
Example:
>>> tab.execute_insert("sink")
When table_path_or_descriptor is a table descriptor:
Declares that the pipeline defined by the given Table object should be written to a table (backed by a DynamicTableSink) expressed via the given TableDescriptor. It executes the insert operation.
TableDescriptor is registered as an inline (i.e. anonymous) temporary catalog table (see
create_temporary_table()
) using a unique identifier. Note that calling this method multiple times, even with the same descriptor, results in multiple sink tables being registered.
This method allows declaring a
Schema
for the sink descriptor. The declaration is similar to a CREATE TABLE DDL in SQL and allows you to:
overwrite automatically derived columns with a custom DataType
add metadata columns next to the physical columns
declare a primary key
It is possible to declare a schema without physical/regular columns. In this case, those columns will be automatically derived and implicitly put at the beginning of the schema declaration.
Examples:
>>> schema = Schema.new_builder() \
...     .column("f0", DataTypes.STRING()) \
...     .build()
>>> table = table_env.from_descriptor(TableDescriptor.for_connector("datagen")
...     .schema(schema)
...     .build())
>>> table.execute_insert(TableDescriptor.for_connector("blackhole")
...     .schema(schema)
...     .build())
If multiple pipelines should insert data into one or more sink tables as part of a single execution, use a
StatementSet
(see create_statement_set()
). By default, all insertion operations are executed asynchronously. Use
wait()
or get_job_client()
to monitor the execution.
Note
execute_insert for a table descriptor (case 2.) was added in Flink 1.14.0.
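- Parameters
table_path_or_descriptor – The path of the registered TableSink, or the TableDescriptor describing the sink table into which data should be inserted.
overwrite – Indicates whether the insert should overwrite existing data.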
- Returns
The table result.
New in version 1.11.0.
-
explain
(*extra_details: pyflink.table.explain_detail.ExplainDetail) → str Returns the AST of this table and the execution plan.
- Parameters
extra_details – The extra explain details which the explain result should include, e.g. estimated cost, changelog mode for streaming.
- Returns
The AST and the execution plan of this table, as a string.
New in version 1.11.0.
-
fetch
(fetch: int) → pyflink.table.table.Table Limits a (possibly sorted) result to the first n rows.
This method can be combined with a preceding
order_by()
call for a deterministic order and an offset()
call to return n rows after skipping the first o rows. Example:
Returns the first 3 records.
>>> tab.order_by(tab.name.desc).fetch(3)
>>> tab.order_by("name.desc").fetch(3)
Skips the first 10 rows and returns the next 5 rows.
>>> tab.order_by(tab.name.desc).offset(10).fetch(5)
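On a finite list that has already been collected, this combination behaves like sorting followed by slicing. The following plain-Python analogue (not PyFlink code; the rows are made up) reproduces order_by(tab.name.desc).offset(10).fetch(5):

```python
# Plain-Python analogue of order_by(tab.name.desc).offset(10).fetch(5)
# on a finite list of made-up rows; not PyFlink code.
rows = [{"name": f"user{i:02d}"} for i in range(20)]

# order_by(tab.name.desc): sort by name, descending.
ordered = sorted(rows, key=lambda r: r["name"], reverse=True)

# offset(10).fetch(5): skip the first 10 rows, then keep the next 5.
offset, fetch = 10, 5
page = ordered[offset:offset + fetch]

print([r["name"] for r in page])
# ['user09', 'user08', 'user07', 'user06', 'user05']
```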
- Parameters
fetch – The number of records to return. Fetch must be >= 0.
- Returns
The result table.
-
filter
(predicate: Union[str, pyflink.table.expression.Expression[bool]]) → pyflink.table.table.Table Filters out elements that don’t pass the filter predicate. Similar to a SQL WHERE clause.
Example:
>>> tab.filter(tab.name == 'Fred')
>>> tab.filter("name = 'Fred'")
- Parameters
predicate – Predicate, given as an Expression or an expression string.
- Returns
The result table.
-
flat_aggregate
(func: Union[str, pyflink.table.expression.Expression, pyflink.table.udf.UserDefinedAggregateFunctionWrapper]) → pyflink.table.table.FlatAggregateTable[source]¶ Performs a global flat_aggregate without group_by. flat_aggregate takes a TableAggregateFunction, which returns multiple rows. Use a selection after the flat_aggregate.
Example:
>>> table_agg = udtaf(MyTableAggregateFunction())
>>> tab.flat_aggregate(table_agg(tab.a).alias("a", "b")).select("a, b")
>>> # take all the columns as inputs
>>> class Top2(TableAggregateFunction):
...     def emit_value(self, accumulator):
...         yield Row(accumulator[0])
...         yield Row(accumulator[1])
...
...     def create_accumulator(self):
...         return [None, None]
...
...     def accumulate(self, accumulator, *args):
...         # args[0] is the whole input row (a Row)
...         if args[0][0] is not None:
...             if accumulator[0] is None or args[0][0] > accumulator[0]:
...                 accumulator[1] = accumulator[0]
...                 accumulator[0] = args[0][0]
...             elif accumulator[1] is None or args[0][0] > accumulator[1]:
...                 accumulator[1] = args[0][0]
...
...     def get_accumulator_type(self):
...         return DataTypes.ARRAY(DataTypes.BIGINT())
...
...     def get_result_type(self):
...         return DataTypes.ROW(
...             [DataTypes.FIELD("a", DataTypes.BIGINT())])
>>> top2 = udtaf(Top2())
>>> tab.flat_aggregate(top2.alias("a")).select("a")
- Parameters
func – user-defined table aggregate function.
- Returns
The result table.
New in version 1.13.0.
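The accumulator logic of Top2 can be checked without a cluster. The following is a plain-Python simulation, not PyFlink code: it copies the create_accumulator, accumulate and emit_value bodies from the example, uses tuples in place of Row, and adds a hypothetical run_top2 driver that feeds one group's rows and collects what emit_value yields.

```python
from typing import Iterable, Iterator, List, Optional, Tuple


def create_accumulator() -> List[Optional[int]]:
    # Slot 0 holds the largest value seen so far, slot 1 the second largest.
    return [None, None]


def accumulate(accumulator: List[Optional[int]], row: Tuple[Optional[int], ...]) -> None:
    # Same update rule as Top2.accumulate, with a tuple standing in for Row.
    value = row[0]
    if value is not None:
        if accumulator[0] is None or value > accumulator[0]:
            accumulator[1] = accumulator[0]
            accumulator[0] = value
        elif accumulator[1] is None or value > accumulator[1]:
            accumulator[1] = value


def emit_value(accumulator: List[Optional[int]]) -> Iterator[Tuple[Optional[int]]]:
    # Top2 emits one single-column row per accumulator slot.
    yield (accumulator[0],)
    yield (accumulator[1],)


def run_top2(rows: Iterable[Tuple[Optional[int], ...]]) -> List[Tuple[Optional[int]]]:
    """Hypothetical driver: accumulate every row of one group, then emit."""
    acc = create_accumulator()
    for row in rows:
        accumulate(acc, row)
    return list(emit_value(acc))


print(run_top2([(3,), (9,), (5,), (7,)]))  # [(9,), (7,)]
```

A group with a single row still emits two rows, the second carrying None, because emit_value always yields both slots.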
-
flat_map
(func: Union[str, pyflink.table.expression.Expression, pyflink.table.udf.UserDefinedTableFunctionWrapper]) → pyflink.table.table.Table[source]¶ Performs a flatMap operation with a user-defined table function.
Example:
>>> @udtf(result_types=[DataTypes.INT(), DataTypes.STRING()])
... def split(x, string):
...     for s in string.split(","):
...         yield x, s
>>> tab.flat_map(split(tab.a, tab.b))
>>> # take all the columns as inputs
>>> @udtf(result_types=[DataTypes.INT(), DataTypes.STRING()])
... def split_row(row: Row):
...     for s in row[1].split(","):
...         yield row[0], s
>>> tab.flat_map(split_row)
- Parameters
func – user-defined table function.
- Returns
The result table.
New in version 1.13.0.
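flat_map with a table function behaves like a nested generator: each input row can expand into any number of output rows. Below is a local model of the first example above; flat_map here is a hypothetical helper working on tuples, not the Table method.

```python
from typing import Iterator, List, Tuple


def split(x: int, string: str) -> Iterator[Tuple[int, str]]:
    # Same body as the `split` UDTF above: one output row per comma-separated token.
    for s in string.split(","):
        yield x, s


def flat_map(rows: List[Tuple[int, str]]) -> List[Tuple[int, str]]:
    """Hypothetical local model of tab.flat_map(split(tab.a, tab.b))."""
    out: List[Tuple[int, str]] = []
    for a, b in rows:
        out.extend(split(a, b))
    return out


print(flat_map([(1, "x,y"), (2, "z")]))  # [(1, 'x'), (1, 'y'), (2, 'z')]
```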
-
full_outer_join
(right: pyflink.table.table.Table, join_predicate: Union[str, pyflink.table.expression.Expression[bool]]) → pyflink.table.table.Table[source]¶ Joins two Tables. Similar to a SQL full outer join. The fields of the two joined operations must not overlap; use alias() to rename fields if necessary.
Note
Both tables must be bound to the same TableEnvironment and its TableConfig must have null check enabled (default).
Example:
>>> left.full_outer_join(right, left.a == right.b)
>>> left.full_outer_join(right, "a = b")
- Parameters
right – Right table.
join_predicate – The join predicate, given as an Expression or an expression string.
- Returns
The result table.
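The null padding of an outer join can be made concrete with a plain-Python nested-loop model on small in-memory lists. This does not reflect how Flink executes joins. The function name, the tuple rows and the key-index parameters are illustrative assumptions, and None stands in for SQL NULL. Left and right outer joins follow the same model with only one of the two padding steps.

```python
from typing import Any, List, Optional, Set, Tuple

Row = Tuple[Any, ...]


def full_outer_join(left: List[Row], right: List[Row], left_key: int,
                    right_key: int) -> List[Tuple[Optional[Row], Optional[Row]]]:
    """Nested-loop model of left.full_outer_join(right, left.a == right.b).

    Matching pairs are emitted together. Rows without a partner on the other
    side are emitted once, paired with None (standing in for SQL NULL padding).
    """
    result: List[Tuple[Optional[Row], Optional[Row]]] = []
    matched_right: Set[int] = set()
    for l_row in left:
        found = False
        for j, r_row in enumerate(right):
            # As in SQL, a NULL (None) key never matches anything.
            if l_row[left_key] is not None and l_row[left_key] == r_row[right_key]:
                result.append((l_row, r_row))
                matched_right.add(j)
                found = True
        if not found:
            result.append((l_row, None))
    for j, r_row in enumerate(right):
        if j not in matched_right:
            result.append((None, r_row))
    return result


print(full_outer_join([(1, "l1"), (2, "l2")], [(2, "r2"), (3, "r3")], 0, 0))
# [((1, 'l1'), None), ((2, 'l2'), (2, 'r2')), (None, (3, 'r3'))]
```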
-
get_schema
() → pyflink.table.table_schema.TableSchema[source]¶ Returns the TableSchema of this table.
- Returns
The schema of this table.
-
group_by
(*fields: Union[str, pyflink.table.expression.Expression]) → pyflink.table.table.GroupedTable[source]¶ Groups the elements on some grouping keys. Use this before a selection with aggregations to perform the aggregation on a per-group basis. Similar to a SQL GROUP BY statement.
Example:
>>> tab.group_by(tab.key).select(tab.key, tab.value.avg)
>>> tab.group_by("key").select("key, value.avg")
- Parameters
fields – Group keys.
- Returns
The grouped table.
-
insert_into
(table_path: str)[source]¶ Writes the Table to a TableSink that was registered under the specified name. For the path resolution algorithm see use_database().
Example:
>>> tab.insert_into("sink")
Note
Deprecated in 1.11. Use execute_insert() for a single sink, and use TableEnvironment.create_statement_set() for multiple sinks.
-
intersect
(right: pyflink.table.table.Table) → pyflink.table.table.Table[source]¶ Intersects two Tables with duplicate records removed. Intersect returns records that exist in both tables. If a record is present in one or both tables more than once, it is returned just once, i.e., the resulting table has no duplicate records. Similar to a SQL INTERSECT. The fields of the two intersect operations must fully overlap.
Note
Both tables must be bound to the same TableEnvironment.
Example:
>>> left.intersect(right)
- Parameters
right – Right table.
- Returns
The result table.
-
intersect_all
(right: pyflink.table.table.Table) → pyflink.table.table.Table[source]¶ Intersects two Tables. IntersectAll returns records that exist in both tables. If a record is present in both tables more than once, it is returned as many times as it is present in both tables (the smaller of its two counts), i.e., the resulting table might have duplicate records. Similar to a SQL INTERSECT ALL. The fields of the two intersect operations must fully overlap.
Note
Both tables must be bound to the same TableEnvironment.
Example:
>>> left.intersect_all(right)
- Parameters
right – Right table.
- Returns
The result table.
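The difference between intersect and intersect_all is easiest to see on multisets. The sketch below is not PyFlink code: it models a table as a collections.Counter mapping each record to its number of occurrences, and the function names are illustrative.

```python
from collections import Counter
from typing import Hashable, Iterable


def intersect(left: Iterable[Hashable], right: Iterable[Hashable]) -> Counter:
    # INTERSECT: every record present in both inputs is returned exactly once.
    return Counter(set(left) & set(right))


def intersect_all(left: Iterable[Hashable], right: Iterable[Hashable]) -> Counter:
    # INTERSECT ALL: a record present n times on the left and m times on the
    # right is returned min(n, m) times; Counter's & keeps the minimum count.
    return Counter(left) & Counter(right)


left = ["a", "a", "a", "b", "c"]
right = ["a", "a", "b", "b", "d"]
print(intersect(left, right))      # 'a' and 'b' once each
print(intersect_all(left, right))  # 'a' twice, 'b' once
```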
-
join
(right: pyflink.table.table.Table, join_predicate: Union[str, pyflink.table.expression.Expression[bool]] = None)[source]¶ Joins two Tables. Similar to a SQL join. The fields of the two joined operations must not overlap; use alias() to rename fields if necessary. You can use where and select clauses after a join to further specify the behaviour of the join.
Note
Both tables must be bound to the same TableEnvironment.
Example:
>>> left.join(right).where((left.a == right.b) & (left.c > 3))
>>> left.join(right).where("a = b && c > 3")
>>> left.join(right, left.a == right.b)
- Parameters
right – Right table.
join_predicate – Optional. The join predicate, given as an Expression or an expression string.
- Returns
The result table.
-
join_lateral
(table_function_call: Union[str, pyflink.table.expression.Expression, pyflink.table.udf.UserDefinedTableFunctionWrapper], join_predicate: Union[str, pyflink.table.expression.Expression[bool]] = None) → pyflink.table.table.Table[source]¶ Joins this Table with a user-defined TableFunction. This join is similar to a SQL inner join but works with a table function. Each row of the table is joined with the rows produced by the table function.
Example:
>>> t_env.create_java_temporary_system_function("split",
...     "java.table.function.class.name")
>>> tab.join_lateral("split(text, ' ') as (b)", "a = b")
>>> from pyflink.table import expressions as expr
>>> tab.join_lateral(expr.call('split', expr.col('text'), ' ').alias('b'),
...                  expr.col('a') == expr.col('b'))
>>> # take all the columns as inputs
>>> @udtf(result_types=[DataTypes.INT(), DataTypes.STRING()])
... def split_row(row: Row):
...     for s in row[1].split(","):
...         yield row[0], s
>>> tab.join_lateral(split_row.alias("a", "b"))
- Parameters
table_function_call – An expression representing a table function call.
join_predicate – Optional. The join predicate; if it is not given, the join condition is ON TRUE.
- Returns
The result Table.
-
left_outer_join
(right: pyflink.table.table.Table, join_predicate: Union[str, pyflink.table.expression.Expression[bool]] = None) → pyflink.table.table.Table[source]¶ Joins two Tables. Similar to a SQL left outer join. The fields of the two joined operations must not overlap; use alias() to rename fields if necessary.
Note
Both tables must be bound to the same TableEnvironment and its TableConfig must have null check enabled (default).
Example:
>>> left.left_outer_join(right)
>>> left.left_outer_join(right, left.a == right.b)
>>> left.left_outer_join(right, "a = b")
- Parameters
right – Right table.
join_predicate – Optional. The join predicate, given as an Expression or an expression string.
- Returns
The result table.
-
left_outer_join_lateral
(table_function_call: Union[str, pyflink.table.expression.Expression, pyflink.table.udf.UserDefinedTableFunctionWrapper], join_predicate: Union[str, pyflink.table.expression.Expression[bool]] = None) → pyflink.table.table.Table[source]¶ Joins this Table with a user-defined TableFunction. This join is similar to a SQL left outer join but works with a table function. Each row of the table is joined with all rows produced by the table function. If the join does not produce any row, the outer row is padded with nulls.
Example:
>>> t_env.create_java_temporary_system_function("split",
...     "java.table.function.class.name")
>>> tab.left_outer_join_lateral("split(text, ' ') as (b)")
>>> from pyflink.table import expressions as expr
>>> tab.left_outer_join_lateral(expr.call('split', expr.col('text'), ' ').alias('b'))
>>> # take all the columns as inputs
>>> @udtf(result_types=[DataTypes.INT(), DataTypes.STRING()])
... def split_row(row: Row):
...     for s in row[1].split(","):
...         yield row[0], s
>>> tab.left_outer_join_lateral(split_row.alias("a", "b"))
- Parameters
table_function_call – An expression representing a table function call.
join_predicate – Optional. The join predicate; if it is not given, the join condition is ON TRUE.
- Returns
The result Table.
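join_lateral and left_outer_join_lateral differ only in what happens to an input row for which the table function produces nothing. The plain-Python model below shows this. split_words is a hypothetical stand-in for the Java split function used in the examples, and lateral_join is an illustrative helper, not a PyFlink API.

```python
from typing import Callable, Iterable, List, Tuple


def split_words(text: str) -> List[str]:
    # Hypothetical stand-in for the "split" table function: non-empty tokens split on ' '.
    return [token for token in text.split(" ") if token]


def lateral_join(rows: List[Tuple], fn: Callable[[str], Iterable[str]],
                 outer: bool = False) -> List[Tuple]:
    """Model of join_lateral (outer=False) and left_outer_join_lateral (outer=True).

    fn is applied to the first column of each row, and every produced value is
    appended to a copy of that row. With outer=True, a row for which fn produces
    nothing is kept once and padded with None (standing in for SQL NULL).
    """
    result: List[Tuple] = []
    for row in rows:
        produced = list(fn(row[0]))
        if produced:
            result.extend(row + (value,) for value in produced)
        elif outer:
            result.append(row + (None,))
    return result


rows = [("hello world",), ("",)]
print(lateral_join(rows, split_words))              # the empty-text row is dropped
print(lateral_join(rows, split_words, outer=True))  # the empty-text row is padded with None
```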
-
limit
(fetch: int, offset: int = 0) → pyflink.table.table.Table[source]¶ Limits a (possibly sorted) result to the first n rows.
This method is a synonym for offset() followed by fetch().
Example:
Returns the first 3 records.
>>> tab.limit(3)
Skips the first 10 rows and returns the next 5 rows.
>>> tab.limit(5, 10)
- Parameters
fetch – The number of rows to return.
offset – The number of records to skip; defaults to 0.
- Returns
The result table.
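On input that is already sorted, offset, fetch and limit reduce to list slicing. The sketch below is a plain-Python model with hypothetical helper names. It rejects a negative fetch, as stated above; applying the same check to offset is an assumption.

```python
from typing import List, Optional, Sequence, TypeVar

T = TypeVar("T")


def offset_fetch(rows: Sequence[T], offset: int = 0, fetch: Optional[int] = None) -> List[T]:
    """Model of order_by(...).offset(offset).fetch(fetch) on rows that are already sorted."""
    if fetch is not None and fetch < 0:
        raise ValueError("fetch must be >= 0")
    if offset < 0:
        raise ValueError("offset must be >= 0")  # assumed to mirror the fetch check
    end = None if fetch is None else offset + fetch
    return list(rows[offset:end])


def limit(rows: Sequence[T], fetch: int, offset: int = 0) -> List[T]:
    # limit(fetch, offset) is documented as offset(offset) followed by fetch(fetch).
    return offset_fetch(rows, offset, fetch)


data = list(range(20))
print(limit(data, 3))      # [0, 1, 2]
print(limit(data, 5, 10))  # [10, 11, 12, 13, 14]
```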
-
map
(func: Union[str, pyflink.table.expression.Expression, pyflink.table.udf.UserDefinedScalarFunctionWrapper]) → pyflink.table.table.Table[source]¶ Performs a map operation with a user-defined scalar function.
Example:
>>> add = udf(lambda x: Row(x + 1, x * x), result_type=DataTypes.ROW(
...     [DataTypes.FIELD("a", DataTypes.INT()), DataTypes.FIELD("b", DataTypes.INT())]))
>>> tab.map(add(tab.a)).alias("a", "b")
>>> # take all the columns as inputs
>>> identity = udf(lambda row: row, result_type=DataTypes.ROW(
...     [DataTypes.FIELD("a", DataTypes.INT()), DataTypes.FIELD("b", DataTypes.INT())]))
>>> tab.map(identity)
- Parameters
func – user-defined scalar function.
- Returns
The result table.
New in version 1.13.0.
-
minus
(right: pyflink.table.table.Table) → pyflink.table.table.Table[source]¶ Minus of two Tables with duplicate records removed. Similar to a SQL EXCEPT clause. Minus returns records from the left table that do not exist in the right table. Duplicate records in the left table are returned exactly once, i.e., duplicates are removed. Both tables must have identical field types.
Note
Both tables must be bound to the same TableEnvironment.
Example:
>>> left.minus(right)
- Parameters
right – Right table.
- Returns
The result table.
-
minus_all
(right: pyflink.table.table.Table) → pyflink.table.table.Table[source]¶ Minus of two Tables. Similar to a SQL EXCEPT ALL clause. MinusAll returns the records that do not exist in the right table. A record that is present n times in the left table and m times in the right table is returned (n - m) times, i.e., as many duplicates as are present in the right table are removed. Both tables must have identical field types.
Note
Both tables must be bound to the same TableEnvironment.
Example:
>>> left.minus_all(right)
- Parameters
right – Right table.
- Returns
The result table.
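The same Counter-based multiset model illustrates minus and minus_all. It is not PyFlink code and the function names are illustrative. Counter subtraction discards non-positive counts, which matches returning a record (n - m) times and not at all when m >= n.

```python
from collections import Counter
from typing import Hashable, Iterable


def minus(left: Iterable[Hashable], right: Iterable[Hashable]) -> Counter:
    # EXCEPT: left records that never occur in the right input, once each.
    return Counter(set(left) - set(right))


def minus_all(left: Iterable[Hashable], right: Iterable[Hashable]) -> Counter:
    # EXCEPT ALL: a record present n times on the left and m times on the right
    # is returned max(n - m, 0) times.
    return Counter(left) - Counter(right)


left = ["a", "a", "a", "b", "c"]
right = ["a", "b", "b"]
print(minus(left, right))      # only 'c'
print(minus_all(left, right))  # 'a' twice, 'c' once
```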
-
offset
(offset: int) → pyflink.table.table.Table[source]¶ Limits a (possibly sorted) result from an offset position.
This method can be combined with a preceding order_by() call for a deterministic order and a subsequent fetch() call to return n rows after skipping the first o rows.
Example:
>>> # skips the first 3 rows and returns all following rows.
>>> tab.order_by(tab.name.desc).offset(3)
>>> tab.order_by("name.desc").offset(3)
>>> # skips the first 10 rows and returns the next 5 rows.
>>> tab.order_by(tab.name.desc).offset(10).fetch(5)
For unbounded tables, this operation requires a subsequent fetch operation.
- Parameters
offset – Number of records to skip.
- Returns
The result table.
-
order_by
(*fields: Union[str, pyflink.table.expression.Expression]) → pyflink.table.table.Table[source]¶ Sorts the given Table. Similar to SQL ORDER BY. The resulting Table is globally sorted across all parallel partitions.
Example:
>>> tab.order_by(tab.name.desc)
>>> tab.order_by("name.desc")
For unbounded tables, this operation requires sorting on a time attribute or a subsequent fetch operation.
- Parameters
fields – Order fields, given as Expressions or an expression string.
- Returns
The result table.
-
over_window
(*over_windows: pyflink.table.window.OverWindow) → pyflink.table.table.OverWindowedTable[source]¶ Defines over-windows on the records of a table.
An over-window defines for each record an interval of records over which aggregation functions can be computed.
Example:
>>> from pyflink.table import expressions as expr
>>> from pyflink.table.window import Over
>>> tab.over_window(
...     Over.partition_by(tab.c)
...         .order_by(tab.rowtime)
...         .preceding(expr.lit(10).seconds)
...         .alias("ow")) \
...     .select(tab.c,
...             tab.b.count.over(expr.col('ow')),
...             tab.e.sum.over(expr.col('ow')))
Note
Computing over window aggregates on a streaming table is only a parallel operation if the window is partitioned. Otherwise, the whole stream will be processed by a single task, i.e., with parallelism 1.
Note
Over-windows for batch tables are currently not supported.
- Parameters
over_windows – over windows created from Over.
- Returns
An over-windowed table.
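The over-window example computes, for every row, an aggregate over the preceding 10 seconds of its own partition. The plain-Python model below shows this. The input layout (c, rowtime in seconds, b, e) and the function name are hypothetical. Treating both range bounds as inclusive is an assumption of the sketch.

```python
from collections import defaultdict
from typing import Dict, List, Optional, Tuple

# Hypothetical input rows: (c, rowtime in seconds, b, e).
Row = Tuple[str, int, Optional[int], int]


def over_preceding_range(rows: List[Row], preceding_s: int) -> List[Tuple[str, int, int, int]]:
    """Model of Over.partition_by(c).order_by(rowtime).preceding(lit(N).seconds).

    For every input row, b.count and e.sum are computed over the rows of the
    same partition whose rowtime lies in [rowtime - N, rowtime].
    Returns (c, rowtime, count_b, sum_e) in input order.
    """
    partitions: Dict[str, List[Row]] = defaultdict(list)
    for row in rows:
        partitions[row[0]].append(row)

    result = []
    for c, ts, _b, _e in rows:
        window = [r for r in partitions[c] if ts - preceding_s <= r[1] <= ts]
        count_b = sum(1 for r in window if r[2] is not None)  # COUNT ignores NULLs
        sum_e = sum(r[3] for r in window)
        result.append((c, ts, count_b, sum_e))
    return result


rows = [("x", 0, 1, 10), ("x", 5, 1, 20), ("x", 12, None, 30), ("y", 5, 1, 100)]
print(over_preceding_range(rows, 10))
```

Each partition (here "x" and "y") is evaluated independently, which is why the note below says a partitioned over window can run in parallel.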
-
rename_columns
(*fields: Union[str, pyflink.table.expression.Expression]) → pyflink.table.table.Table[source]¶ Renames existing columns. Similar to a field alias statement. The field expressions should be alias expressions, and only the existing fields can be renamed.
Example:
>>> tab.rename_columns(tab.a.alias('a1'), tab.b.alias('b1'))
>>> tab.rename_columns("a as a1, b as b1")
- Parameters
fields – Alias expressions, or a column list string.
- Returns
The result table.
-
right_outer_join
(right: pyflink.table.table.Table, join_predicate: Union[str, pyflink.table.expression.Expression[bool]]) → pyflink.table.table.Table[source]¶ Joins two Tables. Similar to a SQL right outer join. The fields of the two joined operations must not overlap; use alias() to rename fields if necessary.
Note
Both tables must be bound to the same TableEnvironment and its TableConfig must have null check enabled (default).
Example:
>>> left.right_outer_join(right, left.a == right.b)
>>> left.right_outer_join(right, "a = b")
- Parameters
right – Right table.
join_predicate – The join predicate, given as an Expression or an expression string.
- Returns
The result table.
-
select
(*fields: Union[str, pyflink.table.expression.Expression]) → pyflink.table.table.Table[source]¶ Performs a selection operation. Similar to a SQL SELECT statement. The field expressions can contain complex expressions.
Example:
>>> from pyflink.table import expressions as expr
>>> tab.select(tab.key, expr.concat(tab.value, 'hello'))
>>> tab.select(expr.col('key'), expr.concat(expr.col('value'), 'hello'))
>>> tab.select("key, value + 'hello'")
- Returns
The result table.
-
to_pandas
()[source]¶ Converts the table to a pandas DataFrame. This collects the content of the table to the client side, so make sure the content of the table fits in memory before calling this method.
Example:
>>> import numpy as np
>>> import pandas as pd
>>> pdf = pd.DataFrame(np.random.rand(1000, 2))
>>> table = table_env.from_pandas(pdf, ["a", "b"])
>>> table.filter(table.a > 0.5).to_pandas()
- Returns
the result pandas DataFrame.
New in version 1.11.0.
-
union
(right: pyflink.table.table.Table) → pyflink.table.table.Table[source]¶ Unions two Tables with duplicate records removed. Similar to a SQL UNION. The fields of the two union operations must fully overlap.
Note
Both tables must be bound to the same TableEnvironment.
Example:
>>> left.union(right)
- Parameters
right – Right table.
- Returns
The result table.
-
union_all
(right: pyflink.table.table.Table) → pyflink.table.table.Table[source]¶ Unions two Tables. Similar to a SQL UNION ALL. The fields of the two union operations must fully overlap.
Note
Both tables must be bound to the same TableEnvironment.
Example:
>>> left.union_all(right)
- Parameters
right – Right table.
- Returns
The result table.
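In the same Counter-based multiset model (not PyFlink code; illustrative names), union keeps each distinct record once, while union_all adds the counts of both inputs.

```python
from collections import Counter
from typing import Hashable, Iterable


def union(left: Iterable[Hashable], right: Iterable[Hashable]) -> Counter:
    # UNION: every distinct record from either input, once.
    return Counter(set(left) | set(right))


def union_all(left: Iterable[Hashable], right: Iterable[Hashable]) -> Counter:
    # UNION ALL: counts add up, so duplicates from both inputs are kept.
    return Counter(left) + Counter(right)


print(union(["a", "a", "b"], ["b", "c"]))      # 'a', 'b', 'c' once each
print(union_all(["a", "a", "b"], ["b", "c"]))  # 'a' twice, 'b' twice, 'c' once
```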
-
where
(predicate: Union[str, pyflink.table.expression.Expression[bool]]) → pyflink.table.table.Table[source]¶ Filters out elements that don’t pass the filter predicate. Similar to a SQL WHERE clause.
Example:
>>> tab.where(tab.name == 'Fred')
>>> tab.where("name = 'Fred'")
- Parameters
predicate – Predicate, given as an Expression or an expression string.
- Returns
The result table.
-
window
(window: pyflink.table.window.GroupWindow) → pyflink.table.table.GroupWindowedTable[source]¶ Defines a group window on the records of a table.
A group window groups the records of a table by assigning them to windows defined by a time or row interval.
For streaming tables of infinite size, grouping into windows is required to define finite groups on which group-based aggregates can be computed.
For batch tables of finite size, windowing essentially provides shortcuts for time-based groupBy.
Note
Computing windowed aggregates on a streaming table is only a parallel operation if additional grouping attributes are added to the group_by() clause. If the group_by() only references a GroupWindow alias, the streamed table will be processed by a single task, i.e., with parallelism 1.
Example:
>>> from pyflink.table import expressions as expr
>>> from pyflink.table.window import Tumble
>>> tab.window(Tumble.over(expr.lit(10).minutes).on(tab.rowtime).alias('w')) \
...     .group_by(expr.col('w')) \
...     .select(tab.a.sum.alias('a'),
...             expr.col('w').start.alias('b'),
...             expr.col('w').end.alias('c'),
...             expr.col('w').rowtime.alias('d'))
- Parameters
window – A GroupWindow created from Tumble, Session or Slide.
- Returns
A group windowed table.
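A tumbling window assigns every row to exactly one fixed-size, non-overlapping interval. The plain-Python model below shows how the a.sum, w.start and w.end columns of the example relate. The (rowtime in epoch milliseconds, a) input layout and the function name are hypothetical. The sketch assumes non-negative timestamps and windows aligned to the epoch (start = rowtime - rowtime % size).

```python
from collections import defaultdict
from typing import Dict, List, Tuple

WINDOW_MS = 10 * 60 * 1000  # Tumble.over(lit(10).minutes)


def tumble_sum(rows: List[Tuple[int, int]], size_ms: int = WINDOW_MS) -> List[Tuple[int, int, int]]:
    """Model of a tumbling-window sum, returning (sum_a, window_start, window_end).

    Each (rowtime_ms, a) row belongs to the window [start, start + size_ms)
    with start = rowtime_ms - rowtime_ms % size_ms.
    """
    sums: Dict[int, int] = defaultdict(int)
    for rowtime, a in rows:
        start = rowtime - rowtime % size_ms
        sums[start] += a
    return [(total, start, start + size_ms) for start, total in sorted(sums.items())]


print(tumble_sum([(0, 1), (599_999, 2), (600_000, 3)]))
# [(3, 0, 600000), (3, 600000, 1200000)]
```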
-
-
class
pyflink.table.
StatementSet
(_j_statement_set, t_env)[source]¶ Bases:
object
A StatementSet accepts DML statements or Tables. The planner can optimize all added statements and Tables together and then submit them as one job.
Note
The added statements and Tables will be cleared when calling the execute method.
New in version 1.11.0.
-
add_insert
(target_path_or_descriptor: Union[str, pyflink.table.table_descriptor.TableDescriptor], table, overwrite: bool = False) → pyflink.table.statement_set.StatementSet[source]¶ Adds a statement that the pipeline defined by the given Table object should be written to a table (backed by a DynamicTableSink) that was registered under the specified path or expressed via the given TableDescriptor.
When target_path_or_descriptor is a table path:
See the documentation of use_database() or use_catalog() for the rules on path resolution.
When target_path_or_descriptor is a table descriptor:
The given TableDescriptor is registered as an inline (i.e. anonymous) temporary catalog table (see create_temporary_table()). Then a statement is added to the statement set that inserts the Table object’s pipeline into that temporary table.
This method allows you to declare a Schema for the sink descriptor. The declaration is similar to a CREATE TABLE DDL statement in SQL and allows you to:
overwrite automatically derived columns with a custom DataType
add metadata columns next to the physical columns
declare a primary key
It is possible to declare a schema without physical/regular columns. In this case, those columns will be automatically derived and implicitly put at the beginning of the schema declaration.
Examples:
>>> stmt_set = table_env.create_statement_set()
>>> source_table = table_env.from_path("SourceTable")
>>> sink_descriptor = (TableDescriptor.for_connector("blackhole")
...                    .schema(Schema.new_builder().build())
...                    .build())
>>> stmt_set.add_insert(sink_descriptor, source_table)
Note
Support for passing a table descriptor to add_insert (case 2) was added in Flink 1.14.0.
- Parameters
target_path_or_descriptor – The path of the registered TableSink, or the descriptor describing the sink table, to which the Table is written.
table (pyflink.table.Table) – The Table to add.
overwrite – Indicates whether the insert should overwrite existing data or not.
- Returns
current StatementSet instance.
New in version 1.11.0.
-
add_insert_sql
(stmt: str) → pyflink.table.statement_set.StatementSet[source]¶ Adds an INSERT statement to the set.
- Parameters
stmt – The statement to be added.
- Returns
current StatementSet instance.
New in version 1.11.0.
-
execute
() → pyflink.table.table_result.TableResult[source]¶ Executes all statements and Tables as a batch, i.e., they are submitted together as one job.
Note
The added statements and Tables will be cleared when executing this method.
- Returns
execution result.
New in version 1.11.0.
-
explain
(*extra_details: pyflink.table.explain_detail.ExplainDetail) → str[source]¶ Returns the AST and the execution plan of all statements and Tables.
- Parameters
extra_details – The extra explain details which the explain result should include, e.g. estimated cost, changelog mode for streaming
- Returns
The AST and the execution plan of all statements and Tables, as a string.
New in version 1.11.0.
-
-
class
pyflink.table.
EnvironmentSettings
(j_environment_settings)[source]¶ Bases:
object
Defines all parameters that initialize a table environment. Those parameters are used only during instantiation of a TableEnvironment and cannot be changed afterwards.
Example:
>>> EnvironmentSettings.new_instance() \
...     .in_streaming_mode() \
...     .with_built_in_catalog_name("my_catalog") \
...     .with_built_in_database_name("my_database") \
...     .build()
EnvironmentSettings.in_streaming_mode() or EnvironmentSettings.in_batch_mode() might be convenient as shortcuts.
-
class
Builder
[source]¶ Bases:
object
A builder for EnvironmentSettings.
-
build
() → pyflink.table.environment_settings.EnvironmentSettings[source]¶ Returns an immutable instance of EnvironmentSettings.
- Returns
an immutable instance of EnvironmentSettings.
-
in_batch_mode
() → pyflink.table.environment_settings.EnvironmentSettings.Builder[source]¶ Sets that the components should work in batch mode. Streaming mode is the default.
- Returns
This object.
-
in_streaming_mode
() → pyflink.table.environment_settings.EnvironmentSettings.Builder[source]¶ Sets that the components should work in streaming mode. This is the default.
- Returns
This object.
-
use_any_planner
() → pyflink.table.environment_settings.EnvironmentSettings.Builder[source]¶ Does not set a planner requirement explicitly.
A planner will be discovered automatically if there is only one planner available.
By default, use_blink_planner() is enabled.
Note
The old planner has been removed in Flink 1.14. Since there is only one planner left (previously called the ‘blink’ planner), this setting is obsolete and will be removed in future versions.
- Returns
This object.
-
use_blink_planner
() → pyflink.table.environment_settings.EnvironmentSettings.Builder[source]¶ Sets the Blink planner as the required module.
This is the default behavior.
Note
The old planner has been removed in Flink 1.14. Since there is only one planner left (previously called the ‘blink’ planner), this setting is obsolete and will be removed in future versions.
- Returns
This object.
-
use_old_planner
() → pyflink.table.environment_settings.EnvironmentSettings.Builder[source]¶ Note
The old planner has been removed in Flink 1.14. Since there is only one planner left (previously called the ‘blink’ planner), this setting will throw an exception.
-
with_built_in_catalog_name
(built_in_catalog_name: str) → pyflink.table.environment_settings.EnvironmentSettings.Builder[source]¶ Specifies the name of the initial catalog to be created when instantiating a TableEnvironment.
This catalog is an in-memory catalog that will be used to store all temporary objects (e.g. from create_temporary_view() or create_temporary_system_function()) that cannot be persisted because they have no serializable representation.
It will also be the initial value for the current catalog, which can be altered via use_catalog().
Default: “default_catalog”.
- Parameters
built_in_catalog_name – The specified built-in catalog name.
- Returns
This object.
-
with_built_in_database_name
(built_in_database_name: str) → pyflink.table.environment_settings.EnvironmentSettings.Builder[source]¶ Specifies the name of the default database in the initial catalog to be created when instantiating a TableEnvironment.
This database is an in-memory database that will be used to store all temporary objects (e.g. from create_temporary_view() or create_temporary_system_function()) that cannot be persisted because they have no serializable representation.
It will also be the initial value for the current database, which can be altered via use_database().
Default: “default_database”.
- Parameters
built_in_database_name – The specified built-in database name.
- Returns
This object.
-
-
static
from_configuration
(config: pyflink.common.configuration.Configuration) → pyflink.table.environment_settings.EnvironmentSettings[source]¶ Creates an EnvironmentSettings instance from the specified Configuration.
- Returns
EnvironmentSettings.
-
get_built_in_catalog_name
() → str[source]¶ Gets the specified name of the initial catalog to be created when instantiating a TableEnvironment.
- Returns
The specified name of the initial catalog to be created.
-
get_built_in_database_name
() → str[source]¶ Gets the specified name of the default database in the initial catalog to be created when instantiating a TableEnvironment.
- Returns
The specified name of the default database in the initial catalog to be created.
-
static
in_batch_mode
() → pyflink.table.environment_settings.EnvironmentSettings[source]¶ Creates a default instance of EnvironmentSettings in batch execution mode.
This mode is highly optimized for batch scenarios. Only bounded data streams can be processed in this mode.
This method is a shortcut for creating a TableEnvironment with little code. Use the builder provided in EnvironmentSettings.new_instance() for advanced settings.
- Returns
EnvironmentSettings.
-
static
in_streaming_mode
() → pyflink.table.environment_settings.EnvironmentSettings[source]¶ Creates a default instance of EnvironmentSettings in streaming execution mode.
In this mode, both bounded and unbounded data streams can be processed.
This method is a shortcut for creating a TableEnvironment with little code. Use the builder provided in EnvironmentSettings.new_instance() for advanced settings.
- Returns
EnvironmentSettings.
-
is_blink_planner
() → bool[source]¶ Tells if the TableEnvironment should work in a blink or old planner.
Note
The old planner has been removed in Flink 1.14. Since there is only one planner left (previously called the ‘blink’ planner), this method is obsolete and will be removed in future versions.
- Returns
True if the TableEnvironment should work in a blink planner, false otherwise.
-
is_streaming_mode
() → bool[source]¶ Tells if the TableEnvironment should work in a batch or streaming mode.
- Returns
True if the TableEnvironment should work in a streaming mode, false otherwise.
-
static
new_instance
() → pyflink.table.environment_settings.EnvironmentSettings.Builder[source]¶ Creates a builder for creating an instance of EnvironmentSettings.
By default, it does not specify a required planner and will use the one that is available on the classpath via discovery.
- Returns
A builder of EnvironmentSettings.
-
-
class
pyflink.table.
TableConfig
(j_table_config=None)[source]¶ Bases:
object
Configuration for the current TableEnvironment session to adjust Table & SQL API programs.
For common or important configuration options, this class provides getter and setter methods with detailed inline documentation.
For more advanced configuration, users can directly access the underlying key-value map via get_configuration().
Note
Because options are read at different points in time when performing operations, it is recommended to set configuration options early after instantiating a table environment.
-
add_configuration
(configuration: pyflink.common.configuration.Configuration)[source]¶ Adds the given key-value configuration to the underlying configuration. It overwrites existing keys.
- Parameters
configuration – Key-value configuration to be added.
-
get_configuration
() → pyflink.common.configuration.Configuration[source]¶ Gives direct access to the underlying key-value map for advanced configuration.
- Returns
Entire key-value configuration.
-
get_decimal_context
() → Tuple[int, str][source]¶ Returns current context for decimal division calculation, (precision=34, rounding_mode=HALF_EVEN) by default.
See also
- Returns
the current context for decimal division calculation.
-
static
get_default
() → pyflink.table.table_config.TableConfig[source]¶ - Returns
A TableConfig object with default settings.
-
get_idle_state_retention
() → datetime.timedelta[source]¶ - Returns
The duration until state which was not updated will be retained.
-
get_local_timezone
() → str[source]¶ Returns the local timezone id for timestamp with local time zone, either an abbreviation such as “PST”, a full name such as “America/Los_Angeles”, or a custom timezone_id such as “GMT-08:00”.
-
get_max_generated_code_length
() → int[source]¶ The current threshold where generated code will be split into sub-function calls. Java has a maximum method length of 64 KB. This setting allows for finer granularity if necessary. Default is 64000.
-
get_max_idle_state_retention_time
() → int[source]¶ State will be cleared and removed if it was not updated for the defined period of time.
Note
Currently the concept of min/max idle state retention has been deprecated and only idle state retention time is supported. The min idle state retention is regarded as idle state retention and the max idle state retention is derived from idle state retention as 1.5 x idle state retention.
- Returns
The maximum time until state which was not updated will be retained.
-
get_min_idle_state_retention_time
() → int[source]¶ State might be cleared and removed if it was not updated for the defined period of time.
Note
Currently the concept of min/max idle state retention has been deprecated and only idle state retention time is supported. The min idle state retention is regarded as idle state retention and the max idle state retention is derived from idle state retention as 1.5 x idle state retention.
- Returns
The minimum time until state which was not updated will be retained.
-
get_null_check
() → bool[source]¶ A boolean value, “True” enables NULL check and “False” disables NULL check.
-
get_python_executable
() → str[source]¶ Gets the path of the Python interpreter that is used to execute the Python UDF workers. If no path has been specified, returns None.
- Returns
The path of the python interpreter which is used to execute the python udf workers.
New in version 1.10.0.
-
set_decimal_context
(precision: int, rounding_mode: str)[source]¶ Sets the default context for decimal division calculation. (precision=34, rounding_mode=HALF_EVEN) by default.
The precision is the number of digits to be used for an operation. A value of 0 indicates that unlimited precision (as many digits as are required) will be used. Note that leading zeros (in the coefficient of a number) are never significant.
The rounding mode is the rounding algorithm to be used for an operation. It could be:
UP, DOWN, CEILING, FLOOR, HALF_UP, HALF_DOWN, HALF_EVEN, UNNECESSARY
The table below shows the results of rounding input to one digit with the given rounding mode:
Input  UP   DOWN  CEILING  FLOOR  HALF_UP  HALF_DOWN  HALF_EVEN  UNNECESSARY
5.5    6    5     6        5      6        5          6          Exception
2.5    3    2     3        2      3        2          2          Exception
1.6    2    1     2        1      2        2          2          Exception
1.1    2    1     2        1      1        1          1          Exception
1.0    1    1     1        1      1        1          1          1
-1.0   -1   -1    -1       -1     -1       -1         -1         -1
-1.1   -2   -1    -1       -2     -1       -1         -1         Exception
-1.6   -2   -1    -1       -2     -2       -2         -2         Exception
-2.5   -3   -2    -2       -3     -3       -2         -2         Exception
-5.5   -6   -5    -5       -6     -6       -5         -6         Exception
- Parameters
precision – The precision of the decimal context.
rounding_mode – The rounding mode of the decimal context.
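The mode names in the table above are the Java java.math.RoundingMode names. As a local sketch that does not call PyFlink, Python's decimal module offers an equivalent for every mode except UNNECESSARY, so the table can be reproduced as follows. round_to_integer is a hypothetical helper. The UNNECESSARY emulation (raise unless the value is already an integer) is an assumption based on the Exception entries in the table.

```python
from decimal import (Decimal, ROUND_CEILING, ROUND_DOWN, ROUND_FLOOR,
                     ROUND_HALF_DOWN, ROUND_HALF_EVEN, ROUND_HALF_UP, ROUND_UP)

# Python equivalents of the Java-style mode names used in the table.
MODES = {
    "UP": ROUND_UP,            # away from zero
    "DOWN": ROUND_DOWN,        # towards zero
    "CEILING": ROUND_CEILING,  # towards +infinity
    "FLOOR": ROUND_FLOOR,      # towards -infinity
    "HALF_UP": ROUND_HALF_UP,
    "HALF_DOWN": ROUND_HALF_DOWN,
    "HALF_EVEN": ROUND_HALF_EVEN,
}


def round_to_integer(value: str, mode: str) -> str:
    """Round `value` to an integer, which is one digit for the inputs in the table."""
    d = Decimal(value)
    if mode == "UNNECESSARY":
        # The decimal module has no UNNECESSARY mode; emulate it by raising
        # whenever the value is not already an integer.
        if d != d.to_integral_value():
            raise ArithmeticError("Rounding necessary")
        return str(int(d))
    return str(int(d.quantize(Decimal(1), rounding=MODES[mode])))


for value in ["5.5", "2.5", "1.6", "-2.5", "-5.5"]:
    print(value, [round_to_integer(value, m) for m in MODES])
```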
-
set_idle_state_retention
(duration: datetime.timedelta)[source]¶ Specifies a retention time interval for how long idle state, i.e., state which was not updated, will be retained.
State will never be cleared until it has been idle for at least the duration, and will never be kept once it has been idle for more than 1.5 x duration.
When new data arrives for previously cleaned-up state, the new data will be handled as if it was the first data. This can result in previous results being overwritten.
Set to 0 (zero) to never clean-up the state.
Example:
>>> import datetime
>>> table_config = TableConfig()
>>> table_config.set_idle_state_retention(datetime.timedelta(days=1))
Note
Cleaning up state requires additional bookkeeping which becomes less expensive for larger differences of minTime and maxTime. The difference between minTime and maxTime must be at least 5 minutes.
- Parameters
duration – The retention time interval for which idle state is retained. Set to 0 (zero) to never clean-up the state.
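The retention rule above can be modeled in plain Python, without PyFlink. This minimal sketch assumes the upper bound is exactly 1.5 x duration, as stated above, and classifies a piece of state by how long it has been idle. The helper names are hypothetical, and the exact clean-up moment between the two bounds is decided by the runtime.

```python
from datetime import timedelta


def retention_bounds(duration: timedelta) -> tuple:
    """Return (min, max) idle time; max is derived as 1.5 x duration."""
    return duration, duration * 1.5


def idle_state_status(idle: timedelta, duration: timedelta) -> str:
    """Classify state that has been idle for `idle` under the documented rule."""
    if duration == timedelta(0):
        return "kept"            # a zero duration disables clean-up entirely
    min_time, max_time = retention_bounds(duration)
    if idle < min_time:
        return "kept"            # never cleared before `duration` has passed
    if idle > max_time:
        return "cleared"         # never kept beyond 1.5 x duration
    return "may be cleared"      # somewhere in between, up to the runtime


print(idle_state_status(timedelta(hours=30), timedelta(days=1)))
```

With a one-day duration, state idle for 30 hours falls between the 24-hour and 36-hour bounds, so it may or may not have been cleaned up yet.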
-
set_idle_state_retention_time
(min_time: datetime.timedelta, max_time: datetime.timedelta)[source]¶ Specifies a minimum and a maximum time interval for how long idle state, i.e., state which was not updated, will be retained.
State will never be cleared until it has been idle for at least the minimum time, and will never be kept once it has been idle for more than the maximum time.
When new data arrives for previously cleaned-up state, the new data will be handled as if it was the first data. This can result in previous results being overwritten.
Set to 0 (zero) to never clean-up the state.
Example:
>>> import datetime
>>> table_config = TableConfig()
>>> table_config.set_idle_state_retention_time(datetime.timedelta(days=1),
...                                            datetime.timedelta(days=3))
Note
Cleaning up state requires additional bookkeeping which becomes less expensive for larger differences of minTime and maxTime. The difference between minTime and maxTime must be at least 5 minutes.
Method set_idle_state_retention_time is deprecated. Use
set_idle_state_retention()
instead. Currently, setting max_time has no effect: max_time is derived directly from min_time as 1.5 x min_time.
- Parameters
min_time – The minimum time interval for which idle state is retained. Set to 0 (zero) to never clean-up the state.
max_time – The maximum time interval for which idle state is retained. Must be at least 5 minutes greater than min_time. Set to 0 (zero) to never clean-up the state.
-
set_local_timezone
(timezone_id: str)[source]¶ Sets the local timezone id for timestamp with local time zone.
- Parameters
timezone_id – The timezone id, either an abbreviation such as “PST”, a full name such as “America/Los_Angeles”, or a custom timezone_id such as “GMT-08:00”.
-
set_max_generated_code_length
(max_generated_code_length: int)[source]¶ Sets the threshold above which generated code will be split into sub-function calls. Java has a maximum method length of 64 KB. This setting allows for finer granularity if necessary. Default is 64000.
-
set_null_check
(null_check: bool)[source]¶ Sets the NULL check. If enabled, all fields need to be checked for NULL first.
-
set_python_executable
(python_exec: str)[source]¶ Sets the path of the python interpreter which is used to execute the python udf workers.
e.g. “/usr/local/bin/python3”.
If a python UDF depends on a specific python version that does not exist in the cluster, the method
pyflink.table.TableEnvironment.add_python_archive()
can be used to upload a virtual environment. The path of the python interpreter contained in the uploaded environment can then be specified via this method.
Example:
# command executed in shell
# assume that the relative path of python interpreter is py_env/bin/python
$ zip -r py_env.zip py_env

# python code
>>> table_env.add_python_archive("py_env.zip")
>>> table_env.get_config().set_python_executable("py_env.zip/py_env/bin/python")
Note
Please make sure the uploaded python environment matches the platform that the cluster is running on and that the python version is 3.6 or higher.
Note
The python udf worker depends on Apache Beam (version == 2.27.0). Please ensure that the specified environment meets the above requirements.
- Parameters
python_exec – The path of python interpreter.
New in version 1.10.0.
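The shell step in the example can also be done from Python with the standard library. This sketch uses a hypothetical helper, make_python_archive, and assumes a Unix-style virtual environment layout (bin/python). It zips the environment so that its directory name stays the top-level entry, mirroring zip -r py_env.zip py_env, and returns the interpreter path in the form the example passes to set_python_executable.

```python
import os
import shutil


def make_python_archive(env_dir: str) -> tuple:
    """Zip `env_dir` and return (archive_path, python_exec); hypothetical helper."""
    env_dir = os.path.abspath(env_dir)
    parent, env_name = os.path.split(env_dir)
    # root_dir/base_dir keep "py_env/..." as the paths inside the archive.
    archive_path = shutil.make_archive(
        os.path.join(parent, env_name), "zip", root_dir=parent, base_dir=env_name)
    archive_name = os.path.basename(archive_path)           # e.g. "py_env.zip"
    python_exec = f"{archive_name}/{env_name}/bin/python"   # e.g. "py_env.zip/py_env/bin/python"
    return archive_path, python_exec
```

The two returned values would then be passed to table_env.add_python_archive(archive_path) and table_env.get_config().set_python_executable(python_exec), as in the example above.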
-
-
class
pyflink.table.
GroupedTable
(java_table, t_env)[source]¶ Bases:
object
A table that has been grouped on a set of grouping keys.
-
aggregate
(func: Union[str, pyflink.table.expression.Expression, pyflink.table.udf.UserDefinedAggregateFunctionWrapper]) → pyflink.table.table.AggregatedTable[source]¶ Performs an aggregate operation with an aggregate function. You have to close the aggregate with a select statement.
Example:
>>> agg = udaf(lambda a: (a.mean(), a.max()),
...            result_type=DataTypes.ROW(
...                [DataTypes.FIELD("a", DataTypes.FLOAT()),
...                 DataTypes.FIELD("b", DataTypes.INT())]),
...            func_type="pandas")
>>> tab.group_by(tab.a).aggregate(agg(tab.b).alias("c", "d")).select("a, c, d")
>>> # take all the columns as inputs
>>> # pd is a pandas.DataFrame
>>> agg_row = udaf(lambda pd: (pd.a.mean(), pd.b.max()),
...                result_type=DataTypes.ROW(
...                    [DataTypes.FIELD("a", DataTypes.FLOAT()),
...                     DataTypes.FIELD("b", DataTypes.INT())]),
...                func_type="pandas")
>>> tab.group_by(tab.a).aggregate(agg_row.alias("c", "d")).select("a, c, d")
- Parameters
func – user-defined aggregate function.
- Returns
The result table.
New in version 1.13.0.
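To make the semantics of group_by(...).aggregate(...) concrete, the plain-Python sketch below (no PyFlink involved) computes what the first example's agg produces for each group: the mean and the maximum of column b for each value of a. The helper name and the sample rows are made up for illustration.

```python
from collections import defaultdict
from statistics import mean


def group_aggregate(rows, key, value):
    """Emulate tab.group_by(key).aggregate(agg(value)) with agg = (mean, max)."""
    groups = defaultdict(list)
    for row in rows:
        groups[row[key]].append(row[value])
    # One output row per group: the group key, then the aggregate fields (c, d).
    return {k: (mean(vs), max(vs)) for k, vs in groups.items()}


rows = [{"a": 1, "b": 10}, {"a": 1, "b": 20}, {"a": 2, "b": 7}]
print(group_aggregate(rows, "a", "b"))
```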
-
flat_aggregate
(func: Union[str, pyflink.table.expression.Expression, pyflink.table.udf.UserDefinedAggregateFunctionWrapper]) → pyflink.table.table.FlatAggregateTable[source]¶ Performs a flat_aggregate operation on a grouped table. flat_aggregate takes a
TableAggregateFunction
which returns multiple rows. Use a selection after flat_aggregate.
Example:
>>> table_agg = udtaf(MyTableAggregateFunction())
>>> tab.group_by(tab.c).flat_aggregate(table_agg(tab.a).alias("a")).select("c, a")
>>> # take all the columns as inputs
>>> class Top2(TableAggregateFunction):
...     def emit_value(self, accumulator):
...         yield Row(accumulator[0])
...         yield Row(accumulator[1])
...
...     def create_accumulator(self):
...         return [None, None]
...
...     def accumulate(self, accumulator, *args):
...         value = args[0][0]  # args[0] is the input Row; use its first column
...         if value is not None:
...             if accumulator[0] is None or value > accumulator[0]:
...                 accumulator[1] = accumulator[0]
...                 accumulator[0] = value
...             elif accumulator[1] is None or value > accumulator[1]:
...                 accumulator[1] = value
...
...     def get_accumulator_type(self):
...         return DataTypes.ARRAY(DataTypes.BIGINT())
...
...     def get_result_type(self):
...         return DataTypes.ROW(
...             [DataTypes.FIELD("a", DataTypes.BIGINT())])
>>> top2 = udtaf(Top2())
>>> tab.group_by(tab.c).flat_aggregate(top2.alias("a")).select("c, a")
- Parameters
func – user-defined table aggregate function.
- Returns
The result table.
New in version 1.13.0.
-
select
(*fields: Union[str, pyflink.table.expression.Expression]) → pyflink.table.table.Table[source]¶ Performs a selection operation on a grouped table. Similar to an SQL SELECT statement. The field expressions can contain complex expressions and aggregations.
Example:
>>> tab.group_by(tab.key).select(tab.key, tab.value.avg.alias('average'))
>>> tab.group_by("key").select("key, value.avg as average")
- Parameters
fields – Expression string that contains group keys and aggregate function calls.
- Returns
The result table.
-
-
class
pyflink.table.
GroupWindowedTable
(java_group_windowed_table, t_env)[source]¶ Bases:
object
A table that has been windowed for
GroupWindow
.
-
group_by
(*fields: Union[str, pyflink.table.expression.Expression]) → pyflink.table.table.WindowGroupedTable[source]¶ Groups the elements by a mandatory window and one or more optional grouping attributes. The window is specified by referring to its alias.
If no additional grouping attribute is specified and if the input is a streaming table, the aggregation will be performed by a single task, i.e., with parallelism 1.
Aggregations are performed per group and defined by a subsequent
select()
clause similar to SQL SELECT-GROUP-BY query.
Example:
>>> from pyflink.table.expressions import col, lit
>>> from pyflink.table.window import Tumble
>>> tab.window(Tumble.over(lit(10).minutes).on(tab.rowtime).alias('w')) \
...     .group_by(col('w')) \
...     .select(tab.a.sum.alias('a'),
...             col('w').start.alias('b'),
...             col('w').end.alias('c'),
...             col('w').rowtime.alias('d'))
- Parameters
fields – Group keys.
- Returns
A window grouped table.
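A tumbling window such as the 10-minute one in the example assigns each row to exactly one non-overlapping window. The plain-Python sketch below models that assignment and the per-window sum, with the window start and end as the key. It assumes rowtimes in epoch milliseconds, windows aligned to the epoch (no offset), and ignores watermarks and late data; the helper name is hypothetical.

```python
from collections import defaultdict

WINDOW_MS = 10 * 60 * 1000  # a 10-minute tumbling window, as in the example


def tumble_sum(rows):
    """rows: iterable of (rowtime_ms, a). Returns {(start, end): sum of a}."""
    sums = defaultdict(int)
    for rowtime, a in rows:
        start = rowtime - rowtime % WINDOW_MS   # window start aligned to the epoch
        sums[(start, start + WINDOW_MS)] += a   # end is exclusive: [start, end)
    return dict(sums)


print(tumble_sum([(0, 1), (599_999, 2), (600_000, 5), (1_250_000, 4)]))
```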
-
-
class
pyflink.table.
OverWindowedTable
(java_over_windowed_table, t_env)[source]¶ Bases:
object
A table that has been windowed for
OverWindow
. Unlike group windows, which are specified in the GROUP BY clause, over windows do not collapse rows. Instead, over window aggregates compute an aggregate for each input row over a range of its neighboring rows.
-
select
(*fields: Union[str, pyflink.table.expression.Expression]) → pyflink.table.table.Table[source]¶ Performs a selection operation on an over-windowed table. Similar to an SQL SELECT statement. The field expressions can contain complex expressions and aggregations.
Example:
>>> over_windowed_table.select(col('c'),
...                            col('b').count.over(col('ow')),
...                            col('e').sum.over(col('ow')))
>>> over_windowed_table.select("c, b.count over ow, e.sum over ow")
- Parameters
fields – Expression string.
- Returns
The result table.
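The difference from group windows shows up in the row count: an over window emits one result per input row. The plain-Python sketch below assumes that the window ow in the example is partitioned by c, ordered by rowtime, and spans all preceding rows up to the current one (UNBOUNDED PRECEDING); these choices, the helper name, and the absence of NULLs are assumptions for illustration.

```python
from collections import defaultdict


def over_count_sum(rows):
    """rows: (rowtime, c, b, e) tuples; returns one (c, count(b), sum(e)) per row."""
    count = defaultdict(int)
    total = defaultdict(int)
    out = []
    for _, c, b, e in sorted(rows, key=lambda r: r[0]):  # order by rowtime
        count[c] += 1   # running count within the partition of c (no NULLs assumed)
        total[c] += e   # running sum within the partition of c
        out.append((c, count[c], total[c]))
    return out


print(over_count_sum([(1, "x", 10, 1), (2, "y", 20, 5), (3, "x", 30, 2)]))
```

Three input rows produce three output rows; nothing is collapsed.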
-
-
class
pyflink.table.
WindowGroupedTable
(java_window_grouped_table, t_env)[source]¶ Bases:
object
A table that has been windowed and grouped for
GroupWindow
.
-
aggregate
(func: Union[str, pyflink.table.expression.Expression, pyflink.table.udf.UserDefinedAggregateFunctionWrapper]) → pyflink.table.table.AggregatedTable[source]¶ Performs an aggregate operation on a window grouped table. You have to close the aggregate with a select statement.
Example:
>>> from pyflink.table.expressions import col, lit
>>> from pyflink.table.window import Tumble
>>> w = Tumble.over(lit(10).minutes).on(col("rowtime")).alias("w")
>>> agg = udaf(lambda a: (a.mean(), a.max()),
...            result_type=DataTypes.ROW(
...                [DataTypes.FIELD("a", DataTypes.FLOAT()),
...                 DataTypes.FIELD("b", DataTypes.INT())]),
...            func_type="pandas")
>>> window_grouped_table = tab.window(w).group_by(col("w"))
>>> window_grouped_table.aggregate(agg(col("b")).alias("c", "d")) \
...     .select(col("c"), col("d"))
>>> # take all the columns as inputs
>>> # pd is a pandas.DataFrame
>>> agg_row = udaf(lambda pd: (pd.a.mean(), pd.b.max()),
...                result_type=DataTypes.ROW(
...                    [DataTypes.FIELD("a", DataTypes.FLOAT()),
...                     DataTypes.FIELD("b", DataTypes.INT())]),
...                func_type="pandas")
>>> tab.window(w).group_by(col("w"), col("a")) \
...     .aggregate(agg_row.alias("c", "d")) \
...     .select(col("a"), col("c"), col("d"))
- Parameters
func – user-defined aggregate function.
- Returns
The result table.
New in version 1.13.0.
-
select
(*fields: Union[str, pyflink.table.expression.Expression]) → pyflink.table.table.Table[source]¶ Performs a selection operation on a window grouped table. Similar to an SQL SELECT statement. The field expressions can contain complex expressions and aggregations.
Example:
>>> window_grouped_table.select(col('key'),
...                             col('window').start,
...                             col('value').avg.alias('valavg'))
>>> window_grouped_table.select("key, window.start, value.avg as valavg")
- Parameters
fields – Expression string.
- Returns
The result table.
-
-
class
pyflink.table.
ScalarFunction
[source]¶ Bases:
pyflink.table.udf.UserDefinedFunction
Base interface for user-defined scalar function. A user-defined scalar function maps zero, one, or multiple scalar values to a new scalar value.
New in version 1.10.0.
-
class
pyflink.table.
TableFunction
[source]¶ Bases:
pyflink.table.udf.UserDefinedFunction
Base interface for user-defined table function. A user-defined table function maps zero, one, or multiple scalar values to zero, one, or multiple rows.
New in version 1.11.0.
-
class
pyflink.table.
AggregateFunction
[source]¶ Bases:
pyflink.table.udf.ImperativeAggregateFunction
Base interface for user-defined aggregate function. A user-defined aggregate function maps scalar values of multiple rows to a new scalar value.
New in version 1.12.0.
-
abstract
get_value
(accumulator: ACC) → T[source]¶ Called every time an aggregation result should be materialized. The returned value could be either an early and incomplete result (periodically emitted as data arrives) or the final result of the aggregation.
- Parameters
accumulator – the accumulator which contains the current intermediate results
- Returns
the aggregation result
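The call sequence around get_value can be illustrated without a cluster. The class below borrows the method names of AggregateFunction (create_accumulator, accumulate, get_value) but does not subclass it, and the run driver is a hypothetical stand-in for the runtime. It computes a mean and calls get_value after every input, which corresponds to the early, incomplete results described above.

```python
class MeanAgg:
    """Mimics the AggregateFunction contract in plain Python; not a PyFlink class."""

    def create_accumulator(self):
        return [0, 0]  # [sum, count]

    def accumulate(self, accumulator, value):
        if value is not None:  # skip NULL inputs
            accumulator[0] += value
            accumulator[1] += 1

    def get_value(self, accumulator):
        # No non-NULL input yet means there is no mean to report.
        return None if accumulator[1] == 0 else accumulator[0] / accumulator[1]


def run(agg, values):
    """Hypothetical driver: feed values and materialize a result after each one."""
    acc = agg.create_accumulator()
    results = []
    for v in values:
        agg.accumulate(acc, v)
        results.append(agg.get_value(acc))  # early result; the last one is final
    return results


print(run(MeanAgg(), [2, None, 4]))
```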
-
abstract
-
class
pyflink.table.
TableAggregateFunction
[source]¶ Bases:
pyflink.table.udf.ImperativeAggregateFunction
Base class for a user-defined table aggregate function. A user-defined table aggregate function maps scalar values of multiple rows to zero, one, or multiple rows (or structured types). If an output record consists of only one field, the structured record can be omitted, and a scalar value can be emitted that will be implicitly wrapped into a row by the runtime.
New in version 1.13.0.
-
abstract
emit_value
(accumulator: ACC) → Iterable[T][source]¶ Called every time an aggregation result should be materialized. The returned value could be either an early and incomplete result (periodically emitted as data arrives) or the final result of the aggregation.
- Parameters
accumulator – the accumulator which contains the current aggregated results.
- Returns
multiple aggregated results
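Because emit_value returns an iterable, one accumulator can produce zero, one, or several output rows. The plain-Python sketch below follows the accumulate logic of the Top2 example in flat_aggregate, but it takes a single value instead of a Row and yields plain values instead of Row objects so that it runs without PyFlink; the flat_aggregate driver is hypothetical, and skipping empty slots is a choice made here.

```python
class Top2:
    """Keeps the two largest values seen so far; no PyFlink types involved."""

    def create_accumulator(self):
        return [None, None]  # [largest, second largest]

    def accumulate(self, accumulator, value):
        if value is None:
            return
        if accumulator[0] is None or value > accumulator[0]:
            accumulator[1] = accumulator[0]
            accumulator[0] = value
        elif accumulator[1] is None or value > accumulator[1]:
            accumulator[1] = value

    def emit_value(self, accumulator):
        # Yields zero, one, or two values; empty slots are skipped in this sketch.
        for v in accumulator:
            if v is not None:
                yield v


def flat_aggregate(func, values):
    """Hypothetical driver: aggregate all values, then emit the result rows."""
    acc = func.create_accumulator()
    for v in values:
        func.accumulate(acc, v)
    return list(func.emit_value(acc))


print(flat_aggregate(Top2(), [3, 9, None, 5, 1]))
```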
-
abstract
-
class
pyflink.table.
FunctionContext
(base_metric_group)[source]¶ Bases:
object
Used to obtain global runtime information about the context in which the user-defined function is executed. The information includes the metric group, and global job parameters, etc.
-
class
pyflink.table.
DataView
[source]¶ Bases:
abc.ABC
A DataView is a collection type that can be used in the accumulator of a user-defined
pyflink.table.AggregateFunction
. Depending on the context in which the function is used, a DataView can be backed by a normal collection or a state backend.
-
class
pyflink.table.
ListView
[source]¶ Bases:
pyflink.table.data_view.DataView
,typing.Generic
A
DataView
that provides list-like functionality in the accumulator of an AggregateFunction when large amounts of data are expected.
-
class
pyflink.table.
MapView
[source]¶ Bases:
typing.Generic
A
DataView
that provides dict-like functionality in the accumulator of an AggregateFunction when large amounts of data are expected.
-
is_empty
() → bool[source]¶ Returns True if the map view contains no key-value mappings, otherwise False.
-
put
(key: K, value: V) → None[source]¶ Inserts a value for the given key into the map view. If the map view already contains a value for the key, the existing value is overwritten.
-
-
class
pyflink.table.
TableDescriptor
(j_table_descriptor)[source]¶ Bases:
object
Describes a CatalogTable representing a source or sink.
TableDescriptor is a template for creating a CatalogTable instance. It closely resembles the “CREATE TABLE” SQL DDL statement, containing schema, connector options, and other characteristics. Since tables in Flink are typically backed by external systems, the descriptor describes how a connector (and possibly its format) are configured.
This can be used to register a table in the Table API, see
create_temporary_table()
in TableEnvironment.
-
class
Builder
(j_builder)[source]¶ Bases:
object
Builder for TableDescriptor.
-
build
() → pyflink.table.table_descriptor.TableDescriptor[source]¶ Returns an immutable instance of
TableDescriptor
.
-
comment
(comment: str) → pyflink.table.table_descriptor.TableDescriptor.Builder[source]¶ Defines the comment for this table.
-
format
(format: Union[str, FormatDescriptor], format_option: pyflink.common.config_options.ConfigOption[str] = None) → pyflink.table.table_descriptor.TableDescriptor.Builder[source]¶ Defines the format to be used for this table.
Note that not every connector requires a format to be specified, while others may use multiple formats.
Example:
>>> TableDescriptor.for_connector("kafka") \
...     .format(FormatDescriptor.for_format("json")
...             .option("ignore-parse-errors", "true")
...             .build())
will result in the options:
'format' = 'json'
'json.ignore-parse-errors' = 'true'
-
option
(key: Union[str, pyflink.common.config_options.ConfigOption], value) → pyflink.table.table_descriptor.TableDescriptor.Builder[source]¶ Sets the given option on the table.
Option keys must be fully specified. When defining options for a Format, use format(FormatDescriptor) instead.
Example:
>>> TableDescriptor.for_connector("kafka") \
...     .option("scan.startup.mode", "latest-offset") \
...     .build()
-
-
class
-
class
pyflink.table.
FormatDescriptor
(j_format_descriptor)[source]¶ Bases:
object
Describes a Format and its options for use with
TableDescriptor
. Formats are responsible for encoding and decoding data in table connectors. Note that not every connector has a format, while others may have multiple formats (e.g. the Kafka connector has separate formats for keys and values). Common formats are “json”, “csv”, “avro”, etc.
-
class
Builder
(j_builder)[source]¶ Bases:
object
Builder for FormatDescriptor.
-
build
() → pyflink.table.table_descriptor.FormatDescriptor[source]¶ Returns an immutable instance of
FormatDescriptor
.
-
option
(key: Union[str, pyflink.common.config_options.ConfigOption], value) → pyflink.table.table_descriptor.FormatDescriptor.Builder[source]¶ Sets the given option on the format.
Note that format options must not be prefixed with the format identifier itself here.
Example:
>>> FormatDescriptor.for_format("json") \
...     .option("ignore-parse-errors", "true") \
...     .build()
will automatically be converted into its prefixed form:
'format' = 'json'
'json.ignore-parse-errors' = 'true'
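The prefixing described above can be modeled in a few lines of plain Python. The function below is a hypothetical illustration of how a format identifier and its unprefixed options become the flat table options shown in the example; it only covers the plain 'format' key, and rejecting already-prefixed keys is this sketch's own check based on the note above.

```python
def expand_format_options(format_id: str, format_options: dict) -> dict:
    """Flatten a format and its unprefixed options into table options (hypothetical)."""
    prefix = format_id + "."
    options = {"format": format_id}
    for key, value in format_options.items():
        if key.startswith(prefix):
            # Format options must be given without the format identifier prefix.
            raise ValueError(f"option {key!r} must not start with {prefix!r}")
        options[prefix + key] = value
    return options


print(expand_format_options("json", {"ignore-parse-errors": "true"}))
```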
-
-
class
-
class
pyflink.table.
Schema
(j_schema)[source]¶ Bases:
object
Schema of a table or view.
A schema represents the schema part of a CREATE TABLE (schema) WITH (options) DDL statement in SQL. It defines columns of different kinds, constraints, time attributes, and watermark strategies. It is possible to reference objects (such as functions or types) across different catalogs.
This class is used in the API and catalogs to define an unresolved schema that will be translated to ResolvedSchema. Some methods of this class perform basic validation, however, the main validation happens during the resolution. Thus, an unresolved schema can be incomplete and might be enriched or merged with a different schema at a later stage.
Since an instance of this class is unresolved, it should not be directly persisted. The str() shows only a summary of the contained objects.
-
class
Builder
(j_builder)[source]¶ Bases:
object
A builder for constructing an immutable but still unresolved Schema.
-
column
(column_name: str, data_type: Union[str, pyflink.table.types.DataType]) → pyflink.table.schema.Schema.Builder[source]¶ Declares a physical column that is appended to this schema.
Physical columns are regular columns known from databases. They define the names, the types, and the order of fields in the physical data. Thus, physical columns represent the payload that is read from and written to an external system. Connectors and formats use these columns (in the defined order) to configure themselves. Other kinds of columns can be declared between physical columns but will not influence the final physical schema.
- Parameters
column_name – Column name
data_type – Data type of the column
-
column_by_expression
(column_name: str, expr: Union[str, pyflink.table.expression.Expression]) → pyflink.table.schema.Schema.Builder[source]¶ Declares a computed column that is appended to this schema.
Computed columns are virtual columns that are generated by evaluating an expression that can reference other columns declared in the same table. Both physical columns and metadata columns can be accessed. The column itself is not physically stored within the table. The column’s data type is derived automatically from the given expression and does not have to be declared manually.
Computed columns are commonly used for defining time attributes. For example, the computed column can be used if the original field is not TIMESTAMP(3) type or is nested in a JSON string.
Example:
>>> Schema.new_builder() \
...     .column_by_expression("ts", "orig_ts - INTERVAL '60' MINUTE") \
...     .column_by_metadata("orig_ts", DataTypes.TIMESTAMP(3), "timestamp")
- Parameters
column_name – Column name
expr – Computation of the column
-
column_by_metadata
(column_name: str, data_type: Union[pyflink.table.types.DataType, str], metadata_key: str = None, is_virtual: bool = False) → pyflink.table.schema.Schema.Builder[source]¶ Declares a metadata column that is appended to this schema.
Metadata columns allow access to connector- and/or format-specific fields for every row of a table. For example, a metadata column can be used to read and write the timestamp from and to Kafka records for time-based operations. The connector and format documentation lists the available metadata fields for every component.
Every metadata field is identified by a string-based key and has a documented data type. The metadata key can be omitted if the column name should be used as the identifying metadata key. For convenience, the runtime will perform an explicit cast if the data type of the column differs from the data type of the metadata field. Of course, this requires that the two data types are compatible.
By default, a metadata column can be used for both reading and writing. However, in many cases an external system provides more read-only metadata fields than writable fields. Therefore, it is possible to exclude metadata columns from persisting by setting the is_virtual flag to True.
- Parameters
column_name – Column name
data_type – Data type of the column
metadata_key – Identifying metadata key; if None, the column name will be used as the metadata key
is_virtual – Whether the column is virtual, i.e. excluded from persisting (default: False)
-
from_fields
(field_names: List[str], field_data_types: List[pyflink.table.types.DataType]) → pyflink.table.schema.Schema.Builder[source]¶ Adopts the given field names and field data types as physical columns of the schema.
-
from_row_data_type
(data_type: pyflink.table.types.DataType) → pyflink.table.schema.Schema.Builder[source]¶ Adopts all fields of the given row as physical columns of the schema.
-
from_schema
(unresolved_schema: pyflink.table.schema.Schema) → pyflink.table.schema.Schema.Builder[source]¶ Adopts all members from the given unresolved schema.
-
primary_key
(*column_names: str) → pyflink.table.schema.Schema.Builder[source]¶ Declares a primary key constraint for a set of given columns. A primary key uniquely identifies a row in a table. None of the columns in a primary key can be nullable. The primary key is informational only: it will not be enforced, but it can be used for optimizations. It is the data owner’s responsibility to ensure uniqueness of the data.
The primary key will be assigned a generated name in the format PK_col1_col2.
- Parameters
column_names – Columns that form a unique primary key
-
primary_key_named
(constraint_name: str, *column_names: str) → pyflink.table.schema.Schema.Builder[source]¶ Declares a primary key constraint for a set of given columns. A primary key uniquely identifies a row in a table. None of the columns in a primary key can be nullable. The primary key is informational only: it will not be enforced, but it can be used for optimizations. It is the data owner’s responsibility to ensure uniqueness of the data.
- Parameters
constraint_name – Name for the primary key, can be used to reference the constraint
column_names – Columns that form a unique primary key
-
watermark
(column_name: str, watermark_expr: Union[str, pyflink.table.expression.Expression]) → pyflink.table.schema.Schema.Builder[source]¶ Declares that the given column should serve as an event-time (i.e. rowtime) attribute and specifies a corresponding watermark strategy as an expression.
The column must be of type TIMESTAMP(3) or TIMESTAMP_LTZ(3) and be a top-level column in the schema. It may be a computed column.
The watermark generation expression is evaluated by the framework for every record during runtime. The framework will periodically emit the largest generated watermark. If the current watermark is still identical to the previous one, or is null, or the value of the returned watermark is smaller than that of the last emitted one, then no new watermark will be emitted. A watermark is emitted in an interval defined by the configuration.
Any scalar expression can be used for declaring a watermark strategy for in-memory/temporary tables. However, currently, only SQL expressions can be persisted in a catalog. The expression’s return data type must be TIMESTAMP(3). User-defined functions (also defined in different catalogs) are supported.
Example:
>>> Schema.new_builder().watermark("ts", "ts - INTERVAL '5' SECOND")
- Parameters
column_name – The column name used as a rowtime attribute
watermark_expr – The expression used for watermark generation
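The emission rules above can be traced with a simplified, single-task model in plain Python. This sketch assumes the example strategy ts - INTERVAL '5' SECOND and represents each periodic emission interval as a list of record timestamps; the helper name and the batching are hypothetical, and this is not how Flink implements watermark generation internally.

```python
from datetime import datetime, timedelta

DELAY = timedelta(seconds=5)  # from the example: ts - INTERVAL '5' SECOND


def emitted_watermarks(batches):
    """batches: lists of record timestamps seen between two periodic emissions.

    Returns the watermarks emitted at the end of each interval.
    """
    emitted = []
    largest = None
    for batch in batches:
        for ts in batch:
            wm = None if ts is None else ts - DELAY  # evaluated for every record
            if wm is not None and (largest is None or wm > largest):
                largest = wm
        # Emit only a non-null watermark that advanced past the last emitted one.
        if largest is not None and (not emitted or largest > emitted[-1]):
            emitted.append(largest)
    return emitted


t0 = datetime(2021, 1, 1, 12, 0, 0)
print(emitted_watermarks([[t0, t0 + timedelta(seconds=10)], [t0 + timedelta(seconds=3)], [],
                          [t0 + timedelta(seconds=20)]]))
```

The second and third intervals emit nothing: the late record only produces a smaller watermark, and the empty interval produces none.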
-
-
class
-
class
pyflink.table.
Module
(j_module)[source]¶ Bases:
object
Modules define a set of metadata, including functions, user defined types, operators, rules, etc. Metadata from modules is regarded as built-in or system metadata that users can take advantage of.
New in version 1.12.0.
-
class
pyflink.table.
ModuleEntry
(name: str, used: bool, j_module_entry=None)[source]¶ Bases:
object
A POJO to represent a module’s name and use status.
-
class
pyflink.table.
SqlDialect
[source]¶ Bases:
object
Enumeration of valid SQL compatibility modes.
In most cases, the built-in compatibility mode should be sufficient. For some features, e.g. the “INSERT INTO T PARTITION(a=’xxx’) …” grammar, you may need to switch to the Hive dialect.
We may introduce other SQL dialects in the future.
DEFAULT: Flink’s default SQL behavior.
HIVE: SQL dialect that allows some Apache Hive specific grammar.
Note: We might never support all of the Hive grammar. See the documentation for supported features.
-
DEFAULT
= 0¶
-
HIVE
= 1¶
-
-
class
pyflink.table.
DataTypes
[source]¶ Bases:
object
A
DataType
can be used to declare input and/or output types of operations. This class enumerates all supported data types of the Table & SQL API.
-
static
ARRAY
(element_type: pyflink.table.types.DataType, nullable: bool = True) → pyflink.table.types.ArrayType[source]¶ Data type of an array of elements with same subtype.
Compared to the SQL standard, the maximum cardinality of an array cannot be specified but is fixed at 2147483647(0x7fffffff). Also, any valid type is supported as a subtype.
- Parameters
element_type –
DataType
of each element in the array.
nullable – boolean, whether the type can be null (None) or not.
-
static
BIGINT
(nullable: bool = True) → pyflink.table.types.BigIntType[source]¶ Data type of an 8-byte signed integer with values from -9,223,372,036,854,775,808 to 9,223,372,036,854,775,807.
- Parameters
nullable – boolean, whether the type can be null (None) or not.
-
static
BINARY
(length: int, nullable: bool = True) → pyflink.table.types.BinaryType[source]¶ Data type of a fixed-length binary string (=a sequence of bytes).
- Parameters
length – int, the number of bytes. It must have a value between 1 and 2147483647(0x7fffffff) (both inclusive).
nullable – boolean, whether the type can be null (None) or not.
Note
BinaryType is not supported yet.
-
static
BOOLEAN
(nullable: bool = True) → pyflink.table.types.BooleanType[source]¶ Data type of a boolean with a (possibly) three-valued logic of TRUE, FALSE, UNKNOWN.
- Parameters
nullable – boolean, whether the type can be null (None) or not.
-
static
BYTES
(nullable: bool = True) → pyflink.table.types.VarBinaryType[source]¶ Data type of a variable-length binary string (=a sequence of bytes) with defined maximum length. This is a shortcut for
DataTypes.VARBINARY(2147483647)
.
- Parameters
nullable – boolean, whether the type can be null (None) or not.
-
static
CHAR
(length: int, nullable: bool = True) → pyflink.table.types.CharType[source]¶ Data type of a fixed-length character string.
- Parameters
length – int, the string representation length. It must have a value between 1 and 2147483647(0x7fffffff) (both inclusive).
nullable – boolean, whether the type can be null (None) or not.
Note
CharType is not supported yet.
-
static
DATE
(nullable: bool = True) → pyflink.table.types.DateType[source]¶ Data type of a date consisting of year-month-day with values ranging from
0000-01-01
to 9999-12-31. Compared to the SQL standard, the range starts at year 0000.
- Parameters
nullable – boolean, whether the type can be null (None) or not.
-
static
DAY
(precision: int = 2) → pyflink.table.types.Resolution[source]¶ Resolution in days.
- Parameters
precision – int, the number of digits of days. It must have a value between 1 and 6 (both inclusive), (default: 2).
- Returns
the specified
Resolution
.
-
static
DECIMAL
(precision: int, scale: int, nullable: bool = True) → pyflink.table.types.DecimalType[source]¶ Data type of a decimal number with fixed precision and scale.
- Parameters
precision – the number of digits in a number. It must have a value between 1 and 38 (both inclusive).
scale – the number of digits to the right of the decimal point. It must have a value between 0 and precision (both inclusive).
nullable – boolean, whether the type can be null (None) or not.
Note
The precision must be 38 and the scale must be 18 currently.
-
static
DOUBLE
(nullable: bool = True) → pyflink.table.types.DoubleType[source]¶ Data type of an 8-byte double precision floating point number.
- Parameters
nullable – boolean, whether the type can be null (None) or not.
-
static
FIELD
(name: str, data_type: pyflink.table.types.DataType, description: str = None) → pyflink.table.types.RowField[source]¶ Field definition with field name, data type, and a description.
- Parameters
name – string, name of the field.
data_type –
DataType
of the field.
description – string, description of the field.
-
static
FLOAT
(nullable: bool = True) → pyflink.table.types.FloatType[source]¶ Data type of a 4-byte single precision floating point number.
- Parameters
nullable – boolean, whether the type can be null (None) or not.
-
static
HOUR
() → pyflink.table.types.Resolution[source]¶ Resolution in hours.
- Returns
Resolution
-
static
INT
(nullable: bool = True) → pyflink.table.types.IntType[source]¶ Data type of a 4-byte signed integer with values from -2,147,483,648 to 2,147,483,647.
- Parameters
nullable – boolean, whether the type can be null (None) or not.
-
static
INTERVAL
(upper_resolution: pyflink.table.types.Resolution, lower_resolution: pyflink.table.types.Resolution = None) → Union[pyflink.table.types.DayTimeIntervalType, pyflink.table.types.YearMonthIntervalType][source]¶ Data type of a temporal interval. There are two types of temporal intervals: day-time intervals with up to nanosecond granularity or year-month intervals with up to month granularity.
An interval of day-time consists of
+days hours:minutes:seconds.fractional
with values ranging from -999999 23:59:59.999999999 to +999999 23:59:59.999999999. The type must be parameterized to one of the following resolutions: interval of days, interval of days to hours, interval of days to minutes, interval of days to seconds, interval of hours, interval of hours to minutes, interval of hours to seconds, interval of minutes, interval of minutes to seconds, or interval of seconds. The value representation is the same for all types of resolutions. For example, an interval of seconds of 70 is always represented in an interval-of-days-to-seconds format (with default precisions): +00 00:01:10.000000.
An interval of year-month consists of
+years-months
with values ranging from -9999-11 to +9999-11. The type must be parameterized to one of the following resolutions: interval of years, interval of years to months, or interval of months. The value representation is the same for all types of resolutions. For example, an interval of months of 50 is always represented in an interval-of-years-to-months format (with default year precision): +04-02.
Examples:
INTERVAL(DAY(2), SECOND(9)) for a day-time interval or INTERVAL(YEAR(4), MONTH()) for a year-month interval.
- Parameters
upper_resolution – Resolution, the upper resolution of the interval.
lower_resolution – Resolution, the lower resolution of the interval.
Note
the upper_resolution must be MONTH for YearMonthIntervalType, SECOND for DayTimeIntervalType and the lower_resolution must be None currently.
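The representation rules above can be illustrated with a small pure-Python sketch that reproduces the two documented examples. The helpers format_day_to_second and format_year_to_month are hypothetical (not PyFlink APIs); they assume the default precisions (two digits for days and years, six fractional digits for seconds) and a fixed 86,400-second day.

```python
# Hypothetical helpers (not part of PyFlink) that mimic the documented
# string representation of interval values with default precisions.

def format_day_to_second(total_seconds: int, nanos: int = 0,
                         day_precision: int = 2,
                         fractional_precision: int = 6) -> str:
    """Render a day-time interval in the days-to-seconds format."""
    sign = "-" if total_seconds < 0 else "+"
    days, rem = divmod(abs(total_seconds), 86_400)  # assumes 86,400 s per day
    hours, rem = divmod(rem, 3_600)
    minutes, seconds = divmod(rem, 60)
    # Keep the leading `fractional_precision` digits of the nanosecond part.
    fraction = f"{nanos:09d}"[:fractional_precision]
    suffix = f".{fraction}" if fractional_precision > 0 else ""
    return f"{sign}{days:0{day_precision}d} {hours:02d}:{minutes:02d}:{seconds:02d}{suffix}"


def format_year_to_month(total_months: int, year_precision: int = 2) -> str:
    """Render a year-month interval in the years-to-months format."""
    sign = "-" if total_months < 0 else "+"
    years, months = divmod(abs(total_months), 12)
    return f"{sign}{years:0{year_precision}d}-{months:02d}"


print(format_day_to_second(70))  # +00 00:01:10.000000
print(format_year_to_month(50))  # +04-02
```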
-
static
LIST_VIEW
(element_type: pyflink.table.types.DataType) → pyflink.table.types.ListViewType[source]¶ Data type of a pyflink.table.data_view.ListView. It can only be used in the accumulator type declaration of an aggregate function.
- Parameters
element_type – DataType of each element in the list view.
-
static
MAP
(key_type: pyflink.table.types.DataType, value_type: pyflink.table.types.DataType, nullable: bool = True) → pyflink.table.types.MapType[source]¶ Data type of an associative array that maps keys to values. A map cannot contain duplicate keys; each key can map to at most one value.
There is no restriction on key types; it is the responsibility of the user to ensure uniqueness. The map type is an extension to the SQL standard.
- Parameters
key_type – DataType of the keys in the map.
value_type – DataType of the values in the map.
nullable – boolean, whether the type can be null (None) or not.
-
static
MAP_VIEW
(key_type: pyflink.table.types.DataType, value_type: pyflink.table.types.DataType) → pyflink.table.types.MapViewType[source]¶ Data type of a pyflink.table.data_view.MapView. It can only be used in the accumulator type declaration of an aggregate function.
- Parameters
key_type – DataType of the keys in the map view.
value_type – DataType of the values in the map view.
-
static
MINUTE
() → pyflink.table.types.Resolution[source]¶ Resolution in minutes.
- Returns
the specified Resolution.
-
static
MONTH
() → pyflink.table.types.Resolution[source]¶ Resolution in months.
- Returns
the specified Resolution.
-
static
MULTISET
(element_type: pyflink.table.types.DataType, nullable: bool = True) → pyflink.table.types.MultisetType[source]¶ Data type of a multiset (=bag). Unlike a set, it allows for multiple instances for each of its elements with a common subtype. Each unique value is mapped to some multiplicity.
There is no restriction on element types; it is the responsibility of the user to ensure uniqueness.
- Parameters
element_type – DataType of each element in the multiset.
nullable – boolean, whether the type can be null (None) or not.
-
static
NULL
() → pyflink.table.types.NullType[source]¶ Data type for representing untyped null (None) values. A null type has no other value except null (None); thus, it can be cast to any nullable type.
This type helps in representing unknown types in API calls that use a null (None) literal as well as bridging to formats such as JSON or Avro that define such a type as well.
The null type is an extension to the SQL standard.
Note
NullType is not supported yet.
-
static
ROW
(row_fields: List = [], nullable: bool = True) → pyflink.table.types.RowType[source]¶ Data type of a sequence of fields. A field consists of a field name, field type, and an optional description. The most specific type of a row of a table is a row type. In this case, each column of the row corresponds to the field of the row type that has the same ordinal position as the column.
Compared to the SQL standard, an optional field description simplifies the handling of complex structures.
- Parameters
row_fields – a list of row field types which can be created via DataTypes.FIELD().
nullable – boolean, whether the type can be null (None) or not.
-
static
SECOND
(precision: int = 6) → pyflink.table.types.Resolution[source]¶ Resolution in seconds and (possibly) fractional seconds.
- Parameters
precision – int, the number of digits of fractional seconds. It must have a value between 0 and 9 (both inclusive) (default: 6).
- Returns
the specified Resolution.
Note
The precision must be 3 currently.
-
static
SMALLINT
(nullable: bool = True) → pyflink.table.types.SmallIntType[source]¶ Data type of a 2-byte signed integer with values from -32,768 to 32,767.
- Parameters
nullable – boolean, whether the type can be null (None) or not.
-
static
STRING
(nullable: bool = True) → pyflink.table.types.VarCharType[source]¶ Data type of a variable-length character string with defined maximum length. This is a shortcut for DataTypes.VARCHAR(2147483647).
- Parameters
nullable – boolean, whether the type can be null (None) or not.
-
static
TIME
(precision: int = 0, nullable: bool = True) → pyflink.table.types.TimeType[source]¶ Data type of a time WITHOUT time zone.
An instance consists of hour:minute:second[.fractional] with up to nanosecond precision and values ranging from 00:00:00.000000000 to 23:59:59.999999999. Compared to the SQL standard, leap seconds (23:59:60 and 23:59:61) are not supported.
- Parameters
precision – int, the number of digits of fractional seconds. It must have a value between 0 and 9 (both inclusive).
nullable – boolean, whether the type can be null (None) or not.
Note
The precision must be 0 currently.
-
static
TIMESTAMP
(precision: int = 6, nullable: bool = True) → pyflink.table.types.TimestampType[source]¶ Data type of a timestamp WITHOUT time zone.
An instance consists of year-month-day hour:minute:second[.fractional] with up to nanosecond precision and values ranging from 0000-01-01 00:00:00.000000000 to 9999-12-31 23:59:59.999999999. Compared to the SQL standard, leap seconds (23:59:60 and 23:59:61) are not supported. This type does not store or represent a time zone. Instead, it is a description of the date, as used for birthdays, combined with the local time as seen on a wall clock. It cannot represent an instant on the time-line without additional information such as an offset or time zone.
- Parameters
precision – int, the number of digits of fractional seconds. It must have a value between 0 and 9 (both inclusive). (default: 6)
nullable – boolean, whether the type can be null (None) or not.
Note
The precision must be 3 currently.
-
static
TIMESTAMP_LTZ
(precision: int = 6, nullable: bool = True) → pyflink.table.types.LocalZonedTimestampType[source]¶ Data type of a timestamp WITH LOCAL time zone. This is a shortcut for DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(precision, nullable).
- Parameters
precision – int, the number of digits of fractional seconds. It must have a value between 0 and 9 (both inclusive). (default: 6)
nullable – boolean, whether the type can be null (None) or not.
See also
TIMESTAMP_WITH_LOCAL_TIME_ZONE()
-
static
TIMESTAMP_WITH_LOCAL_TIME_ZONE
(precision: int = 6, nullable: bool = True) → pyflink.table.types.LocalZonedTimestampType[source]¶ Data type of a timestamp WITH LOCAL time zone.
An instance consists of year-month-day hour:minute:second[.fractional] with up to nanosecond precision and values ranging from 0000-01-01 00:00:00.000000000 +14:59 to 9999-12-31 23:59:59.999999999 -14:59. Compared to the SQL standard, leap seconds (23:59:60 and 23:59:61) are not supported. The value is stored internally as a long value which stores all date and time fields, to a precision of nanoseconds, as well as the offset from UTC/Greenwich.
- Parameters
precision – int, the number of digits of fractional seconds. It must have a value between 0 and 9 (both inclusive). (default: 6)
nullable – boolean, whether the type can be null (None) or not.
Note
LocalZonedTimestampType only supports precision of 3 currently.
-
static
TINYINT
(nullable: bool = True) → pyflink.table.types.TinyIntType[source]¶ Data type of a 1-byte signed integer with values from -128 to 127.
- Parameters
nullable – boolean, whether the type can be null (None) or not.
-
static
VARBINARY
(length: int, nullable: bool = True) → pyflink.table.types.VarBinaryType[source]¶ Data type of a variable-length binary string (=a sequence of bytes).
- Parameters
length – int, the maximum number of bytes. It must have a value between 1 and 2147483647 (0x7fffffff) (both inclusive).
nullable – boolean, whether the type can be null (None) or not.
Note
The length limit must be 0x7fffffff (2147483647) currently.
-
static
VARCHAR
(length: int, nullable: bool = True) → pyflink.table.types.VarCharType[source]¶ Data type of a variable-length character string.
- Parameters
length – int, the maximum string representation length. It must have a value between 1 and 2147483647 (0x7fffffff) (both inclusive).
nullable – boolean, whether the type can be null (None) or not.
Note
The length limit must be 0x7fffffff (2147483647) currently.
-
static
YEAR
(precision: int = 2) → pyflink.table.types.Resolution[source]¶ Resolution in years with 2 digits for the number of years by default.
- Parameters
precision – the number of digits of years. It must have a value between 1 and 4 (both inclusive) (default: 2).
- Returns
the specified Resolution.
-
class
pyflink.table.
UserDefinedType
(nullable=True)[source]¶ Bases: pyflink.table.types.DataType
User-defined type (UDT).
Note
WARN: Flink Internal Use Only
-
classmethod
java_udt
()[source]¶ The class name of the paired Java UDT (could be ‘’, if there is no corresponding one).
-
class
pyflink.table.
Expression
(j_expr_or_property_name)[source]¶ Bases: typing.Generic
Expressions represent a logical tree for producing a computation result. Expressions might be literal values, function calls, or field references.
New in version 1.12.0.
-
property
abs
¶ Calculates the absolute value of the given value.
New in version 1.12.0.
-
property
acos
¶ Calculates the arc cosine of a given number.
New in version 1.12.0.
-
alias
(name: str, *extra_names: str) → pyflink.table.expression.Expression[T][source]¶ Specifies a name for an expression, i.e., a field.
Example:
>>> tab.select(col('a').alias('b'))
- Parameters
name – name for one field.
extra_names – additional names if the expression expands to multiple fields.
New in version 1.12.0.
-
property
asc
¶ Specifies ascending order of an expression (i.e., a field) for order_by.
Example:
>>> tab.order_by(col('a').asc)
New in version 1.12.0.
-
property
asin
¶ Calculates the arc sine of a given number.
New in version 1.12.0.
-
at
(index) → pyflink.table.expression.Expression[source]¶ Accesses the element of an array or map based on a key or an index (starting at 1).
- Parameters
index – index key or position of the element (array index starting at 1).
New in version 1.12.0.
-
property
atan
¶ Calculates the arc tangent of a given number.
New in version 1.12.0.
-
property
avg
¶ Returns the average (arithmetic mean) of the numeric field across all input values.
Example:
>>> tab \
...     .group_by(col("a")) \
...     .select(col("a"),
...             col("b").sum.alias("d"),
...             col("b").sum0.alias("e"),
...             col("b").min.alias("f"),
...             col("b").max.alias("g"),
...             col("b").count.alias("h"),
...             col("b").avg.alias("i"),
...             col("b").stddev_pop.alias("j"),
...             col("b").stddev_samp.alias("k"),
...             col("b").var_pop.alias("l"),
...             col("b").var_samp.alias("m"),
...             col("b").collect.alias("n"))
New in version 1.12.0.
-
between
(lower_bound, upper_bound) → pyflink.table.expression.Expression[bool][source]¶ Returns true if the given expression is between lower_bound and upper_bound (both inclusive), false otherwise. The parameters must be numeric types or identical comparable types.
e.g. lit(2.1).between(2.1, 2.1) leads to true, and lit("2018-05-05").to_date.between(lit("2018-05-01").to_date, lit("2018-05-10").to_date) leads to true.
- Parameters
lower_bound – numeric or comparable expression
upper_bound – numeric or comparable expression
New in version 1.12.0.
-
property
bin
¶ Returns a string representation of an integer numeric value in binary format. Returns null if numeric is null. E.g. 4 leads to "100" and 12 leads to "1100".
New in version 1.12.0.
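A minimal pure-Python model of bin that reproduces the examples above. to_bin is a hypothetical helper; None stands for SQL NULL, and negative inputs are deliberately left out of the sketch.

```python
from typing import Optional


def to_bin(value: Optional[int]) -> Optional[str]:
    """Hypothetical model of `bin` for non-negative integers; None models SQL NULL."""
    if value is None:
        return None
    if value < 0:
        # Negative inputs are outside what this sketch models.
        raise ValueError("this sketch only handles non-negative integers")
    return format(value, "b")


print(to_bin(4))   # 100
print(to_bin(12))  # 1100
```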
-
property
cardinality
¶ Returns the number of elements of an array or number of entries of a map.
New in version 1.12.0.
-
cast
(data_type: pyflink.table.types.DataType) → pyflink.table.expression.Expression[source]¶ Converts a value to a given data type.
e.g. lit("42").cast(DataTypes.INT()) leads to 42.
New in version 1.12.0.
-
ceil
(time_interval_unit: pyflink.table.expression.TimeIntervalUnit = None) → pyflink.table.expression.Expression[source]¶ If time_interval_unit is specified, it rounds up a time point to the given unit, e.g. lit("12:44:31").to_time.ceil(TimeIntervalUnit.MINUTE) leads to 12:45:00. Otherwise, it calculates the smallest integer greater than or equal to a given number.
New in version 1.12.0.
-
property
char_length
¶ Returns the length of a string.
See also
trim_leading(), trim_trailing(), trim(), replace(), char_length, upper_case, lower_case, init_cap, like(), similar(), position(), lpad(), rpad(), overlay(), regexp_replace(), regexp_extract(), substring(), from_base64, to_base64, ltrim, rtrim, repeat()
New in version 1.12.0.
-
property
collect
¶ Returns the multiset aggregate of the given expression.
Example:
>>> tab \
...     .group_by(col("a")) \
...     .select(col("a"),
...             col("b").sum.alias("d"),
...             col("b").sum0.alias("e"),
...             col("b").min.alias("f"),
...             col("b").max.alias("g"),
...             col("b").count.alias("h"),
...             col("b").avg.alias("i"),
...             col("b").stddev_pop.alias("j"),
...             col("b").stddev_samp.alias("k"),
...             col("b").var_pop.alias("l"),
...             col("b").var_samp.alias("m"),
...             col("b").collect.alias("n"))
New in version 1.12.0.
-
property
cos
¶ Calculates the cosine of a given number.
New in version 1.12.0.
-
property
cosh
¶ Calculates the hyperbolic cosine of a given number.
New in version 1.12.0.
-
property
cot
¶ Calculates the cotangent of a given number.
New in version 1.12.0.
-
property
count
¶ Returns the number of input rows for which the field is not null.
Example:
>>> tab \
...     .group_by(col("a")) \
...     .select(col("a"),
...             col("b").sum.alias("d"),
...             col("b").sum0.alias("e"),
...             col("b").min.alias("f"),
...             col("b").max.alias("g"),
...             col("b").count.alias("h"),
...             col("b").avg.alias("i"),
...             col("b").stddev_pop.alias("j"),
...             col("b").stddev_samp.alias("k"),
...             col("b").var_pop.alias("l"),
...             col("b").var_samp.alias("m"),
...             col("b").collect.alias("n"))
New in version 1.12.0.
-
property
day
¶ Creates an interval of the given number of days.
The produced expression is of type INTERVAL().
See also
year, years, quarter, quarters, month, months, week, weeks, day, days, hour, hours, minute, minutes, second, seconds, milli, millis
New in version 1.12.0.
-
property
days
¶ Creates an interval of the given number of days.
The produced expression is of type INTERVAL().
See also
year, years, quarter, quarters, month, months, week, weeks, day, days, hour, hours, minute, minutes, second, seconds, milli, millis
New in version 1.12.0.
-
property
desc
¶ Specifies descending order of an expression (i.e., a field) for order_by.
Example:
>>> tab.order_by(col('a').desc)
New in version 1.12.0.
-
property
distinct
¶ Similar to a SQL distinct aggregation clause such as COUNT(DISTINCT a), declares that an aggregation function is only applied on distinct input values.
Example:
>>> tab \
...     .group_by(col("a")) \
...     .select(col("a"), col("b").sum.distinct.alias("d"))
New in version 1.12.0.
-
property
element
¶ Returns the sole element of an array with a single element. Returns null if the array is empty. Throws an exception if the array has more than one element.
New in version 1.12.0.
-
property
end
¶ Returns the end time (exclusive) of a window when applied on a window reference.
e.g. if a window ends at 10:59:59.999 this property will return 11:00:00.000.
Example:
>>> orders.window(Tumble
...               .over(row_interval(2))
...               .on(col("a"))
...               .alias("w")) \
...     .group_by(col("c"), col("w")) \
...     .select(col("c"), col("w").start, col("w").end, col("w").proctime)
New in version 1.12.0.
-
property
exp
¶ Calculates Euler's number (e) raised to the given power.
New in version 1.12.0.
-
extract
(time_interval_unit: pyflink.table.expression.TimeIntervalUnit) → pyflink.table.expression.Expression[source]¶ Extracts parts of a time point or time interval. Returns the part as a long value. e.g. lit("2006-06-05").to_date.extract(TimeIntervalUnit.DAY) leads to 5.
New in version 1.12.0.
-
property
flatten
¶ Converts a Flink composite type (such as Tuple, POJO, etc.) and all of its direct subtypes into a flat representation where every subtype is a separate field.
New in version 1.12.0.
-
floor
(time_interval_unit: pyflink.table.expression.TimeIntervalUnit = None) → pyflink.table.expression.Expression[source]¶ If time_interval_unit is specified, it rounds down a time point to the given unit, e.g. lit("12:44:31").to_time.floor(TimeIntervalUnit.MINUTE) leads to 12:44:00. Otherwise, it calculates the largest integer less than or equal to a given number.
New in version 1.12.0.
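The time-unit rounding performed by ceil and floor can be sketched in pure Python for the MINUTE unit, reproducing the 12:44:31 examples. The helpers floor_to_minute and ceil_to_minute are hypothetical; wrapping 23:59:xx around to 00:00:00 is a choice of this sketch, not documented Flink behavior.

```python
import math
from datetime import date, datetime, time, timedelta


def floor_to_minute(t: time) -> time:
    """Round a time of day down to the start of its minute."""
    return t.replace(second=0, microsecond=0)


def ceil_to_minute(t: time) -> time:
    """Round a time of day up to the next minute boundary (no-op if already on one)."""
    floored = floor_to_minute(t)
    if floored == t:
        return t
    # `time` has no arithmetic, so borrow an arbitrary date; 23:59:xx wraps to 00:00:00.
    return (datetime.combine(date(2000, 1, 1), floored) + timedelta(minutes=1)).time()


t = time(12, 44, 31)
print(floor_to_minute(t))  # 12:44:00
print(ceil_to_minute(t))   # 12:45:00
# Without a unit, the numeric behavior corresponds to math.floor / math.ceil.
print(math.floor(-2.5), math.ceil(-2.5))  # -3 -2
```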
-
property
from_base64
¶ Returns the base string decoded with base64.
See also
trim_leading(), trim_trailing(), trim(), replace(), char_length, upper_case, lower_case, init_cap, like(), similar(), position(), lpad(), rpad(), overlay(), regexp_replace(), regexp_extract(), substring(), from_base64, to_base64, ltrim, rtrim, repeat()
New in version 1.12.0.
-
get
(name_or_index: Union[str, int]) → pyflink.table.expression.Expression[source]¶ Accesses the field of a Flink composite type (such as Tuple, POJO, etc.) by name or index and returns its value.
- Parameters
name_or_index – name or index of the field (similar to Flink’s field expressions).
New in version 1.12.0.
-
property
hex
¶ Returns a string representation of an integer numeric value or a string in hex format. Returns null if numeric or string is null.
E.g. a numeric 20 leads to “14”, a numeric 100 leads to “64”, and a string “hello,world” leads to “68656c6c6f2c776f726c64”.
New in version 1.12.0.
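A minimal pure-Python model of hex that reproduces the three examples above. to_hex is a hypothetical helper; it assumes strings are encoded as UTF-8 before conversion, emits lowercase digits to match the example output, and does not handle negative numbers.

```python
from typing import Optional, Union


def to_hex(value: Optional[Union[int, str]]) -> Optional[str]:
    """Hypothetical model of `hex`: integers in base 16, strings as UTF-8 byte hex."""
    if value is None:
        return None  # null in, null out
    if isinstance(value, str):
        return value.encode("utf-8").hex()
    if value < 0:
        raise ValueError("this sketch only handles non-negative integers")
    return format(value, "x")


print(to_hex(20))             # 14
print(to_hex(100))            # 64
print(to_hex("hello,world"))  # 68656c6c6f2c776f726c64
```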
-
property
hour
¶ Creates an interval of the given number of hours.
The produced expression is of type INTERVAL().
See also
year, years, quarter, quarters, month, months, week, weeks, day, days, hour, hours, minute, minutes, second, seconds, milli, millis
New in version 1.12.0.
-
property
hours
¶ Creates an interval of the given number of hours.
The produced expression is of type INTERVAL().
See also
year, years, quarter, quarters, month, months, week, weeks, day, days, hour, hours, minute, minutes, second, seconds, milli, millis
New in version 1.12.0.
-
if_null
(null_replacement) → pyflink.table.expression.Expression[source]¶ Returns null_replacement if the given expression is null; otherwise the expression is returned.
This function returns a data type that is very specific in terms of nullability. The returned type is the common type of both arguments but only nullable if the null_replacement is nullable.
The function allows passing nullable columns into a function or table that is declared with a NOT NULL constraint.
e.g. col("nullable_column").if_null(5) never returns null.
New in version 1.12.0.
-
in_
(first_element_or_table, *remaining_elements) → pyflink.table.expression.Expression[source]¶ If first_element_or_table is a Table, returns true if an expression exists in a given table sub-query. The sub-query table must consist of one column. This column must have the same data type as the expression.
Note
This operation is not supported in a streaming environment yet if first_element_or_table is a Table.
Otherwise, returns true if an expression exists in a given list of expressions. This is a shorthand for multiple OR conditions.
If the testing set contains null, the result will be null if the element cannot be found and true if it can be found. If the element is null, the result is always null.
e.g. lit("42").in_(1, 2, 3) leads to false.
Example:
>>> tab.where(col("a").in_(1, 2, 3))
>>> table_a.where(col("x").in_(table_b.select("y")))
New in version 1.12.0.
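The null-handling rules for a list of expressions can be summarized by a three-valued pure-Python model in which None stands for SQL NULL. sql_in is a hypothetical helper; it ignores sub-query tables and any implicit type coercion (such as comparing "42" with integers).

```python
from typing import Any, Iterable, Optional


def sql_in(element: Any, candidates: Iterable[Any]) -> Optional[bool]:
    """Three-valued model of `in_` over a list: True, False, or None (SQL NULL)."""
    candidates = list(candidates)
    if element is None:
        return None  # a null element always yields null
    if any(c is not None and c == element for c in candidates):
        return True
    # Not found: null if the testing set contains null, otherwise false.
    return None if any(c is None for c in candidates) else False


print(sql_in(2, [1, 2, 3]))     # True
print(sql_in(4, [1, 2, 3]))     # False
print(sql_in(4, [1, None, 3]))  # None
```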
-
property
init_cap
¶ Converts the initial letter of each word in a string to uppercase. It assumes the string contains only [A-Za-z0-9]; every other character is treated as whitespace.
See also
trim_leading(), trim_trailing(), trim(), replace(), char_length, upper_case, lower_case, init_cap, like(), similar(), position(), lpad(), rpad(), overlay(), regexp_replace(), regexp_extract(), substring(), from_base64, to_base64, ltrim, rtrim, repeat()
New in version 1.12.0.
-
property
is_false
¶ Returns true if the given boolean expression is false. False otherwise (for null and true).
New in version 1.12.0.
-
property
is_not_false
¶ Returns true if the given boolean expression is not false (for null and true). False otherwise.
New in version 1.12.0.
-
property
is_not_null
¶ Returns true if the given expression is not null.
New in version 1.12.0.
-
property
is_not_true
¶ Returns true if the given boolean expression is not true (for null and false). False otherwise.
New in version 1.12.0.
-
property
is_null
¶ Returns true if the given expression is null.
New in version 1.12.0.
-
property
is_true
¶ Returns true if the given boolean expression is true. False otherwise (for null and false).
New in version 1.12.0.
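A pure-Python sketch of the four boolean tests above, with None standing for SQL NULL; the function names mirror the properties but are hypothetical helpers, not PyFlink APIs. Each one returns a plain boolean, never None, which is what distinguishes these tests from ordinary comparisons.

```python
from typing import Optional


# Hypothetical models of the boolean tests; None stands for SQL NULL.
def is_true(v: Optional[bool]) -> bool:
    return v is True


def is_false(v: Optional[bool]) -> bool:
    return v is False


def is_not_true(v: Optional[bool]) -> bool:
    return v is not True


def is_not_false(v: Optional[bool]) -> bool:
    return v is not False


# Print a small truth table: input, is_true, is_false, is_not_true, is_not_false.
for v in (True, False, None):
    print(v, is_true(v), is_false(v), is_not_true(v), is_not_false(v))
```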
-
json_exists
(path: str, on_error: pyflink.table.expression.JsonExistsOnError = None) → pyflink.table.expression.Expression[bool][source]¶ Determines whether a JSON string satisfies a given search criterion.
This follows the ISO/IEC TR 19075-6 specification for JSON support in SQL.
Examples:
>>> lit('{"a": true}').json_exists('$.a')  # True
>>> lit('{"a": true}').json_exists('$.b')  # False
>>> lit('{"a": [{ "b": 1 }]}').json_exists('$.a[0].b')  # True
>>> lit('{"a": true}').json_exists('strict $.b', JsonExistsOnError.TRUE)  # True
>>> lit('{"a": true}').json_exists('strict $.b', JsonExistsOnError.FALSE)  # False
New in version 1.12.0.
-
json_value
(path: str, returning_type: pyflink.table.types.DataType = VarCharType(2147483647, true), on_empty: pyflink.table.expression.JsonValueOnEmptyOrError = <JsonValueOnEmptyOrError.NULL: (0, )>, default_on_empty: Any = None, on_error: pyflink.table.expression.JsonValueOnEmptyOrError = <JsonValueOnEmptyOrError.NULL: (0, )>, default_on_error: Any = None) → pyflink.table.expression.Expression[source]¶ Extracts a scalar from a JSON string.
This method searches a JSON string for a given path expression and returns the value if the value at that path is scalar. Non-scalar values cannot be returned. By default, the value is returned as DataTypes.STRING(). Using returning_type, a different type can be chosen; the following types are supported:
STRING
BOOLEAN
INT
DOUBLE
For empty path expressions or errors, a behavior can be defined to return null, raise an error, or return a defined default value instead.
Examples:
>>> lit('{"a": true}').json_value('$.a')
>>> lit('{"a": true}').json_value('$.a', DataTypes.BOOLEAN())
>>> lit('{"a": true}').json_value('lax $.b', DataTypes.BOOLEAN(),
...                               JsonValueOnEmptyOrError.DEFAULT, False)
>>> lit('{"a": true}').json_value('strict $.b', DataTypes.BOOLEAN(),
...                               JsonValueOnEmptyOrError.NULL, None,
...                               JsonValueOnEmptyOrError.DEFAULT, False)
New in version 1.12.0.
-
like
(pattern: Union[str, Expression[str]] = None) → pyflink.table.expression.Expression[bool][source]¶ Returns true if a string matches the specified LIKE pattern. e.g. 'Jo_n%' matches all strings that start with 'Jo(arbitrary letter)n'.
See also
trim_leading(), trim_trailing(), trim(), replace(), char_length, upper_case, lower_case, init_cap, like(), similar(), position(), lpad(), rpad(), overlay(), regexp_replace(), regexp_extract(), substring(), from_base64, to_base64, ltrim, rtrim, repeat()
New in version 1.12.0.
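A pure-Python sketch of LIKE matching that translates the pattern into a regular expression, reproducing the 'Jo_n%' example. The helper like is hypothetical; it assumes no ESCAPE character and case-sensitive matching.

```python
import re


def like(value: str, pattern: str) -> bool:
    """Model of SQL LIKE without an ESCAPE clause.

    '_' matches exactly one character and '%' matches any sequence
    (including the empty one); every other character matches itself.
    """
    regex = "".join(
        "." if ch == "_" else ".*" if ch == "%" else re.escape(ch)
        for ch in pattern
    )
    # The whole string must match, and '.' must also match newlines.
    return re.fullmatch(regex, value, flags=re.DOTALL) is not None


print(like("John Smith", "Jo_n%"))  # True
print(like("Jon", "Jo_n%"))         # False
```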
-
property
ln
¶ Calculates the natural logarithm of the given value.
New in version 1.12.0.
-
log
(base=None) → pyflink.table.expression.Expression[float][source]¶ Calculates the natural logarithm of the given value if base is not specified. Otherwise, calculates the logarithm of the given value to the given base.
New in version 1.12.0.
-
property
log10
¶ Calculates the base 10 logarithm of the given value.
New in version 1.12.0.
-
property
log2
¶ Calculates the base 2 logarithm of the given value.
New in version 1.12.0.
-
property
lower_case
¶ Returns all of the characters in a string in lower case using the rules of the default locale.
See also
trim_leading(), trim_trailing(), trim(), replace(), char_length, upper_case, lower_case, init_cap, like(), similar(), position(), lpad(), rpad(), overlay(), regexp_replace(), regexp_extract(), substring(), from_base64, to_base64, ltrim, rtrim, repeat()
New in version 1.12.0.
-
lpad
(length: Union[int, Expression[int]], pad: Union[str, Expression[str]]) → pyflink.table.expression.Expression[str][source]¶ Returns a string left-padded with the given pad string to a length of length characters. If the string is longer than length, the return value is shortened to length characters. e.g. lit('hi').lpad(4, '??') returns '??hi', lit('hi').lpad(1, '??') returns 'h'.
See also
trim_leading(), trim_trailing(), trim(), replace(), char_length, upper_case, lower_case, init_cap, like(), similar(), position(), lpad(), rpad(), overlay(), regexp_replace(), regexp_extract(), substring(), from_base64, to_base64, ltrim, rtrim, repeat()
New in version 1.12.0.
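lpad and its counterpart rpad (documented below) can be modeled in pure Python, reproducing the 'hi' examples. Both helpers are hypothetical; rejecting negative lengths and empty pad strings with ValueError is a choice of this sketch rather than documented Flink behavior.

```python
def _fill(pad: str, count: int) -> str:
    """Repeat `pad` and cut it to exactly `count` characters."""
    return (pad * count)[:count]


def lpad(s: str, length: int, pad: str) -> str:
    if length < 0 or not pad:
        raise ValueError("this sketch requires length >= 0 and a non-empty pad")
    if len(s) >= length:
        return s[:length]  # longer strings are shortened to `length` characters
    return _fill(pad, length - len(s)) + s


def rpad(s: str, length: int, pad: str) -> str:
    if length < 0 or not pad:
        raise ValueError("this sketch requires length >= 0 and a non-empty pad")
    if len(s) >= length:
        return s[:length]
    return s + _fill(pad, length - len(s))


print(lpad("hi", 4, "??"), lpad("hi", 1, "??"))  # ??hi h
print(rpad("hi", 4, "??"), rpad("hi", 1, "??"))  # hi?? h
```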
-
property
ltrim
¶ Returns the given string with its leading (left) whitespace removed.
See also
trim_leading(), trim_trailing(), trim(), replace(), char_length, upper_case, lower_case, init_cap, like(), similar(), position(), lpad(), rpad(), overlay(), regexp_replace(), regexp_extract(), substring(), from_base64, to_base64, ltrim, rtrim, repeat()
New in version 1.12.0.
-
property
max
¶ Returns the maximum value of the field across all input values.
Example:
>>> tab \
...     .group_by(col("a")) \
...     .select(col("a"),
...             col("b").sum.alias("d"),
...             col("b").sum0.alias("e"),
...             col("b").min.alias("f"),
...             col("b").max.alias("g"),
...             col("b").count.alias("h"),
...             col("b").avg.alias("i"),
...             col("b").stddev_pop.alias("j"),
...             col("b").stddev_samp.alias("k"),
...             col("b").var_pop.alias("l"),
...             col("b").var_samp.alias("m"),
...             col("b").collect.alias("n"))
New in version 1.12.0.
-
property
md5
¶ Returns the MD5 hash of the string argument; null if string is null.
- Returns
string of 32 hexadecimal digits or null.
New in version 1.12.0.
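The same kind of digest can be computed in pure Python with hashlib, which makes the "32 hexadecimal digits" shape easy to check. md5_hex is a hypothetical helper; it assumes the input string is encoded as UTF-8 and uses None to stand for SQL NULL.

```python
import hashlib
from typing import Optional


def md5_hex(s: Optional[str]) -> Optional[str]:
    """Hypothetical model of `md5`: hex digest of the UTF-8 bytes, None for null."""
    if s is None:
        return None
    return hashlib.md5(s.encode("utf-8")).hexdigest()


digest = md5_hex("")
print(digest)       # d41d8cd98f00b204e9800998ecf8427e
print(len(digest))  # 32
```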
-
property
milli
¶ Creates an interval of the given number of milliseconds.
The produced expression is of type INTERVAL().
See also
year, years, quarter, quarters, month, months, week, weeks, day, days, hour, hours, minute, minutes, second, seconds, milli, millis
New in version 1.12.0.
-
property
millis
¶ Creates an interval of the given number of milliseconds.
The produced expression is of type INTERVAL().
See also
year, years, quarter, quarters, month, months, week, weeks, day, days, hour, hours, minute, minutes, second, seconds, milli, millis
New in version 1.12.0.
-
property
min
¶ Returns the minimum value of the field across all input values.
Example:
>>> tab \
...     .group_by(col("a")) \
...     .select(col("a"),
...             col("b").sum.alias("d"),
...             col("b").sum0.alias("e"),
...             col("b").min.alias("f"),
...             col("b").max.alias("g"),
...             col("b").count.alias("h"),
...             col("b").avg.alias("i"),
...             col("b").stddev_pop.alias("j"),
...             col("b").stddev_samp.alias("k"),
...             col("b").var_pop.alias("l"),
...             col("b").var_samp.alias("m"),
...             col("b").collect.alias("n"))
New in version 1.12.0.
-
property
minute
¶ Creates an interval of the given number of minutes.
The produced expression is of type INTERVAL().
See also
year, years, quarter, quarters, month, months, week, weeks, day, days, hour, hours, minute, minutes, second, seconds, milli, millis
New in version 1.12.0.
-
property
minutes
¶ Creates an interval of the given number of minutes.
The produced expression is of type INTERVAL().
See also
year, years, quarter, quarters, month, months, week, weeks, day, days, hour, hours, minute, minutes, second, seconds, milli, millis
New in version 1.12.0.
-
property
month
¶ Creates an interval of the given number of months.
The produced expression is of type INTERVAL().
See also
year, years, quarter, quarters, month, months, week, weeks, day, days, hour, hours, minute, minutes, second, seconds, milli, millis
New in version 1.12.0.
-
property
months
¶ Creates an interval of the given number of months.
The produced expression is of type INTERVAL().
See also
year, years, quarter, quarters, month, months, week, weeks, day, days, hour, hours, minute, minutes, second, seconds, milli, millis
New in version 1.12.0.
-
not_between
(lower_bound, upper_bound) → pyflink.table.expression.Expression[bool][source]¶ Returns true if the given expression is not between lower_bound and upper_bound (both inclusive), false otherwise. The parameters must be numeric types or identical comparable types.
e.g. lit(2.1).not_between(2.1, 2.1) leads to false, and lit("2018-05-05").to_date.not_between(lit("2018-05-01").to_date, lit("2018-05-10").to_date) leads to false.
- Parameters
lower_bound – numeric or comparable expression
upper_bound – numeric or comparable expression
New in version 1.12.0.
-
over
(alias) → pyflink.table.expression.Expression[source]¶ Defines an aggregation to be used for a previously specified over window.
Example:
>>> tab.window(Over
...            .partition_by(col('c'))
...            .order_by(col('rowtime'))
...            .preceding(row_interval(2))
...            .following(CURRENT_ROW)
...            .alias("w")) \
...     .select(col('c'), col('a'), col('a').count.over(col('w')))
New in version 1.12.0.
-
overlay
(new_string: Union[str, Expression[str]], starting: Union[int, Expression[int]], length: Union[int, Expression[int]] = None) → pyflink.table.expression.Expression[str][source]¶ Replaces a substring of the string with new_string, starting at a position (starting at 1). e.g. lit('xxxxxtest').overlay('xxxx', 6) leads to 'xxxxxxxxx'; lit('xxxxxtest').overlay('xxxx', 6, 2) leads to 'xxxxxxxxxst'.
See also
trim_leading(), trim_trailing(), trim(), replace(), char_length, upper_case, lower_case, init_cap, like(), similar(), position(), lpad(), rpad(), overlay(), regexp_replace(), regexp_extract(), substring(), from_base64, to_base64, ltrim, rtrim, repeat()
New in version 1.12.0.
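A pure-Python model of overlay that reproduces both examples above. overlay here is a hypothetical helper; defaulting length to the length of new_string is an assumption consistent with the first example.

```python
from typing import Optional


def overlay(s: str, new_string: str, starting: int, length: Optional[int] = None) -> str:
    """Hypothetical model of `overlay` with a 1-based `starting` position.

    Replaces `length` characters of `s` (default: len(new_string)) beginning
    at position `starting` with `new_string`.
    """
    if length is None:
        length = len(new_string)
    start = starting - 1  # convert the 1-based position to a 0-based index
    return s[:start] + new_string + s[start + length:]


print(overlay("xxxxxtest", "xxxx", 6))     # xxxxxxxxx
print(overlay("xxxxxtest", "xxxx", 6, 2))  # xxxxxxxxxst
```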
-
position
(haystack: Union[str, Expression[str]] = None) → pyflink.table.expression.Expression[int][source]¶ Returns the position of the string in another string, starting at 1. Returns 0 if the string could not be found. e.g. lit('a').position('bbbbba') leads to 6.
See also
trim_leading(), trim_trailing(), trim(), replace(), char_length, upper_case, lower_case, init_cap, like(), similar(), position(), lpad(), rpad(), overlay(), regexp_replace(), regexp_extract(), substring(), from_base64, to_base64, ltrim, rtrim, repeat()
New in version 1.12.0.
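A pure-Python model of position built on str.find, reproducing the 'bbbbba' example. The helper is hypothetical, and returning None for null input is an assumption of this sketch.

```python
from typing import Optional


def position(needle: Optional[str], haystack: Optional[str]) -> Optional[int]:
    """Hypothetical model of `position`: 1-based index of the first match, 0 if absent."""
    if needle is None or haystack is None:
        return None  # assumption: null input gives null
    # str.find returns -1 when absent, which becomes 0 here.
    return haystack.find(needle) + 1


print(position("a", "bbbbba"))  # 6
print(position("c", "bbbbba"))  # 0
```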
-
property
proctime
¶ Declares a field as the proctime attribute for indicating, accessing, and working in Flink’s processing time.
New in version 1.12.0.
-
property
quarter
¶ Creates an interval of the given number of quarters.
The produced expression is of type INTERVAL().
See also
year, years, quarter, quarters, month, months, week, weeks, day, days, hour, hours, minute, minutes, second, seconds, milli, millis
New in version 1.12.0.
-
property
quarters
¶ Creates an interval of the given number of quarters.
The produced expression is of type INTERVAL().
See also
year, years, quarter, quarters, month, months, week, weeks, day, days, hour, hours, minute, minutes, second, seconds, milli, millis
New in version 1.12.0.
-
regexp_extract
(regex: Union[str, Expression[str]], extract_index: Union[int, Expression[int]] = None) → pyflink.table.expression.Expression[str][source]¶ Returns a string extracted with a specified regular expression and a regex match group index.
See also
trim_leading(), trim_trailing(), trim(), replace(), char_length, upper_case, lower_case, init_cap, like(), similar(), position(), lpad(), rpad(), overlay(), regexp_replace(), regexp_extract(), substring(), from_base64, to_base64, ltrim, rtrim, repeat()
New in version 1.12.0.
-
regexp_replace
(regex: Union[str, Expression[str]], replacement: Union[str, Expression[str]]) → pyflink.table.expression.Expression[str][source]¶ Returns a string in which all substrings that match the regular expression are consecutively replaced with replacement.
See also
trim_leading()
,trim_trailing()
,trim()
,replace()
,char_length
,upper_case
,lower_case
,init_cap
,like()
,similar()
,position()
,lpad()
,rpad()
,overlay()
,regexp_replace()
,regexp_extract()
,substring()
,from_base64
,to_base64
,ltrim
,rtrim
,repeat()
New in version 1.12.0.
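A rough plain-Python analogue of regexp_replace() is re.sub, which also replaces every non-overlapping match. regexp_replace_model is a hypothetical helper. If Flink follows Java regex conventions, as assumed here, note that group references in the replacement string are written $1 in Java but \1 in Python.

```python
import re


def regexp_replace_model(s: str, regex: str, replacement: str) -> str:
    """Replace every non-overlapping match of `regex` in `s` with `replacement`."""
    # re.sub interprets backslash escapes such as \1 in `replacement`.
    return re.sub(regex, replacement, s)


print(regexp_replace_model("foobar", "oo|ar", ""))  # fb
```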
-
repeat
(n: Union[int, Expression[int]]) → pyflink.table.expression.Expression[str][source]¶ Returns a string that repeats the base string n times.
See also
trim_leading()
,trim_trailing()
,trim()
,replace()
,char_length
,upper_case
,lower_case
,init_cap
,like()
,similar()
,position()
,lpad()
,rpad()
,overlay()
,regexp_replace()
,regexp_extract()
,substring()
,from_base64
,to_base64
,ltrim
,rtrim
,repeat()
New in version 1.12.0.
-
replace
(search: Union[str, Expression[str]] = None, replacement: Union[str, Expression[str]] = None) → pyflink.table.expression.Expression[str][source]¶ Returns a new string which replaces all the occurrences of the search target with the replacement string (non-overlapping).
e.g. lit('This is a test String.').replace(' ', '_') leads to 'This_is_a_test_String.'
See also
trim_leading()
,trim_trailing()
,trim()
,replace()
,char_length
,upper_case
,lower_case
,init_cap
,like()
,similar()
,position()
,lpad()
,rpad()
,overlay()
,regexp_replace()
,regexp_extract()
,substring()
,from_base64
,to_base64
,ltrim
,rtrim
,repeat()
New in version 1.12.0.
-
round
(places: Union[int, Expression[int]])[source]¶ Rounds the given number to `places` places right of the decimal point.
e.g. lit(646.646).round(2) leads to 646.65, lit(646.646).round(3) leads to 646.646, lit(646.646).round(0) leads to 647, lit(646.646).round(-2) leads to 600.
New in version 1.12.0.
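Python's decimal module can reproduce the examples above. This sketch assumes half-up rounding, which is consistent with the examples but is an assumption here. round_model is a hypothetical helper. Python's built-in round() uses round-half-to-even instead, so round(2.5) is 2.

```python
from decimal import Decimal, ROUND_HALF_UP


def round_model(value: float, places: int) -> Decimal:
    """Round `value` to `places` digits right of the decimal point (half-up)."""
    # 10**-places: 0.01 for places=2, 1E+2 (hundreds) for places=-2.
    step = Decimal(1).scaleb(-places)
    # Go through str() so the float's shortest repr, not its binary value, is rounded.
    return Decimal(str(value)).quantize(step, rounding=ROUND_HALF_UP)


print(round_model(646.646, 2))   # 646.65
print(round_model(646.646, 0))   # 647
print(round_model(646.646, -2))  # 6E+2, i.e. 600
print(round_model(2.5, 0))       # 3
```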
-
property
rowtime
¶ Declares a field as the rowtime attribute for indicating, accessing, and working in Flink’s event time.
See also
New in version 1.12.0.
-
rpad
(length: Union[int, Expression[int]], pad: Union[str, Expression[str]]) → pyflink.table.expression.Expression[str][source]¶ Returns a string right-padded with the given pad string to a total of `length` characters. If the string is longer than `length`, the return value is shortened to `length` characters. e.g. lit('hi').rpad(4, '??') returns 'hi??', lit('hi').rpad(1, '??') returns 'h'.
See also
trim_leading()
,trim_trailing()
,trim()
,replace()
,char_length
,upper_case
,lower_case
,init_cap
,like()
,similar()
,position()
,lpad()
,rpad()
,overlay()
,regexp_replace()
,regexp_extract()
,substring()
,from_base64
,to_base64
,ltrim
,rtrim
,repeat()
New in version 1.12.0.
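The padding and shortening rules above can be modeled in plain Python. rpad_model is a hypothetical helper. Raising ValueError for a negative length or an empty pad is a choice made for this sketch, not documented Flink behavior.

```python
def rpad_model(s: str, length: int, pad: str) -> str:
    """Right-pad s with repetitions of pad to exactly `length` characters."""
    if length < 0:
        raise ValueError("length must be non-negative")
    if len(s) >= length:
        return s[:length]  # too long: shorten to `length` characters
    if not pad:
        raise ValueError("pad must be non-empty")
    needed = length - len(s)
    # Repeat pad enough times, then cut it to the exact number of characters needed.
    fill = (pad * (needed // len(pad) + 1))[:needed]
    return s + fill


print(rpad_model("hi", 4, "??"))  # hi??
print(rpad_model("hi", 1, "??"))  # h
print(rpad_model("hi", 5, "ab"))  # hiaba
```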
-
property
rtrim
¶ Returns the given string with trailing (right-side) whitespace removed.
See also
trim_leading()
,trim_trailing()
,trim()
,replace()
,char_length
,upper_case
,lower_case
,init_cap
,like()
,similar()
,position()
,lpad()
,rpad()
,overlay()
,regexp_replace()
,regexp_extract()
,substring()
,from_base64
,to_base64
,ltrim
,rtrim
,repeat()
New in version 1.12.0.
-
property
second
¶ Creates an interval of the given number of seconds.
The produced expression is of type
INTERVAL().
See also
year
,years
,quarter
,quarters
,month
,months
,week
,weeks
,day
,days
,hour
,hours
,minute
,minutes
,second
,seconds
,milli
,millis
New in version 1.12.0.
-
property
seconds
¶ Creates an interval of the given number of seconds.
The produced expression is of type
INTERVAL().
See also
year
,years
,quarter
,quarters
,month
,months
,week
,weeks
,day
,days
,hour
,hours
,minute
,minutes
,second
,seconds
,milli
,millis
New in version 1.12.0.
-
property
sha1
¶ Returns the SHA-1 hash of the string argument; null if string is null.
- Returns
string of 40 hexadecimal digits or null.
New in version 1.12.0.
-
sha2
(hash_length: Union[int, Expression[int]]) → pyflink.table.expression.Expression[str][source]¶ Returns the hash for the given string expression using the SHA-2 family of hash functions (SHA-224, SHA-256, SHA-384, or SHA-512).
- Parameters
hash_length – bit length of the result (either 224, 256, 384, or 512)
- Returns
string or null if one of the arguments is null.
New in version 1.12.0.
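Python's hashlib provides the same SHA-2 variants, so the documented digest lengths are easy to check. Each hexadecimal digit encodes 4 bits, so a 224-, 256-, 384-, or 512-bit hash is 56, 64, 96, or 128 digits long, and 160-bit SHA-1 gives 40. sha2_model is a hypothetical helper. Hashing the string as UTF-8 bytes and raising ValueError for an unsupported hash_length are assumptions of this sketch.

```python
import hashlib
from typing import Optional

_SHA2 = {
    224: hashlib.sha224,
    256: hashlib.sha256,
    384: hashlib.sha384,
    512: hashlib.sha512,
}


def sha2_model(s: Optional[str], hash_length: Optional[int]) -> Optional[str]:
    """Hex digest of `s` using the SHA-2 variant selected by `hash_length`."""
    if s is None or hash_length is None:
        return None  # null in, null out
    if hash_length not in _SHA2:
        raise ValueError("hash_length must be 224, 256, 384, or 512")
    # Assumption: the string is hashed as UTF-8 bytes.
    return _SHA2[hash_length](s.encode("utf-8")).hexdigest()


# Prints the digest length for each variant: 56, 64, 96, 128.
for bits in (224, 256, 384, 512):
    print(bits, len(sha2_model("abc", bits)))
```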
-
property
sha224
¶ Returns the SHA-224 hash of the string argument; null if string is null.
- Returns
string of 56 hexadecimal digits or null.
New in version 1.12.0.
-
property
sha256
¶ Returns the SHA-256 hash of the string argument; null if string is null.
- Returns
string of 64 hexadecimal digits or null.
New in version 1.12.0.
-
property
sha384
¶ Returns the SHA-384 hash of the string argument; null if string is null.
- Returns
string of 96 hexadecimal digits or null.
New in version 1.12.0.
-
property
sha512
¶ Returns the SHA-512 hash of the string argument; null if string is null.
- Returns
string of 128 hexadecimal digits or null.
New in version 1.12.0.
-
property
sign
¶ Calculates the signum of a given number.
e.g. lit(1.23).sign leads to 1.00, lit(-1.23).sign leads to -1.00.
New in version 1.12.0.
-
similar
(pattern: Union[str, Expression[str]] = None) → pyflink.table.expression.Expression[bool][source]¶ Returns true if a string matches the specified SQL regex pattern, e.g. 'A+' matches all strings that consist of at least one A.
See also
trim_leading()
,trim_trailing()
,trim()
,replace()
,char_length
,upper_case
,lower_case
,init_cap
,like()
,similar()
,position()
,lpad()
,rpad()
,overlay()
,regexp_replace()
,regexp_extract()
,substring()
,from_base64
,to_base64
,ltrim
,rtrim
,repeat()
New in version 1.12.0.
-
property
sin
¶ Calculates the sine of a given number.
New in version 1.12.0.
-
property
sinh
¶ Calculates the hyperbolic sine of a given number.
New in version 1.12.0.
-
property
sqrt
¶ Calculates the square root of a given value.
New in version 1.12.0.
-
property
start
¶ Returns the start time (inclusive) of a window when applied on a window reference.
Example:
>>> from pyflink.table.expressions import col, row_interval
>>> from pyflink.table.window import Tumble
>>> tab.window(Tumble.over(row_interval(2))
...                  .on(col("a"))
...                  .alias("w")) \
...     .group_by(col("c"), col("w")) \
...     .select(col("c"), col("w").start, col("w").end, col("w").proctime)
See also
New in version 1.12.0.
-
property
stddev_pop
¶ Returns the population standard deviation of an expression (the square root of var_pop).
Example:
>>> from pyflink.table.expressions import col
>>> tab \
...     .group_by(col("a")) \
...     .select(col("a"),
...             col("b").sum.alias("d"),
...             col("b").sum0.alias("e"),
...             col("b").min.alias("f"),
...             col("b").max.alias("g"),
...             col("b").count.alias("h"),
...             col("b").avg.alias("i"),
...             col("b").stddev_pop.alias("j"),
...             col("b").stddev_samp.alias("k"),
...             col("b").var_pop.alias("l"),
...             col("b").var_samp.alias("m"),
...             col("b").collect.alias("n"))
New in version 1.12.0.
-
property
stddev_samp
¶ Returns the sample standard deviation of an expression (the square root of var_samp).
Example:
>>> from pyflink.table.expressions import col
>>> tab \
...     .group_by(col("a")) \
...     .select(col("a"),
...             col("b").sum.alias("d"),
...             col("b").sum0.alias("e"),
...             col("b").min.alias("f"),
...             col("b").max.alias("g"),
...             col("b").count.alias("h"),
...             col("b").avg.alias("i"),
...             col("b").stddev_pop.alias("j"),
...             col("b").stddev_samp.alias("k"),
...             col("b").var_pop.alias("l"),
...             col("b").var_samp.alias("m"),
...             col("b").collect.alias("n"))
New in version 1.12.0.
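Python's statistics module can check the relationships stated above: stddev_pop is the square root of var_pop, and the same holds for the sample variants. This sketch assumes NULL inputs are skipped, as SQL aggregates conventionally do. It returns None where the aggregate is undefined: no values at all, or fewer than two for the sample variants. Flink's exact NULL results are an assumption here, and the function names are hypothetical.

```python
import math
import statistics
from typing import Iterable, List, Optional


def _non_null(values: Iterable[Optional[float]]) -> List[float]:
    # SQL aggregates conventionally skip NULL inputs; None stands in for NULL.
    return [v for v in values if v is not None]


def var_pop(values: Iterable[Optional[float]]) -> Optional[float]:
    xs = _non_null(values)
    return statistics.pvariance(xs) if xs else None


def var_samp(values: Iterable[Optional[float]]) -> Optional[float]:
    xs = _non_null(values)
    # The sample variance divides by n - 1, so it needs at least two values.
    return statistics.variance(xs) if len(xs) > 1 else None


def stddev_pop(values: Iterable[Optional[float]]) -> Optional[float]:
    v = var_pop(values)
    return None if v is None else math.sqrt(v)


def stddev_samp(values: Iterable[Optional[float]]) -> Optional[float]:
    v = var_samp(values)
    return None if v is None else math.sqrt(v)


data = [2, 4, 4, 4, None, 5, 5, 7, 9]
print(var_pop(data), stddev_pop(data))
print(var_samp(data), stddev_samp(data))
```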
-
substring
(begin_index: Union[int, Expression[int]], length: Union[int, Expression[int]] = None) → pyflink.table.expression.Expression[str][source]¶ Creates a substring of the given string at the given index for a given length.
- Parameters
begin_index – first character of the substring (starting at 1, inclusive)
length – number of characters of the substring
See also
trim_leading()
,trim_trailing()
,trim()
,replace()
,char_length
,upper_case
,lower_case
,init_cap
,like()
,similar()
,position()
,lpad()
,rpad()
,overlay()
,regexp_replace()
,regexp_extract()
,substring()
,from_base64
,to_base64
,ltrim
,rtrim
,repeat()
New in version 1.12.0.
-
property
sum
¶ Returns the sum of the numeric field across all input values. If all values are null, null is returned.
Example:
>>> from pyflink.table.expressions import col
>>> tab \
...     .group_by(col("a")) \
...     .select(col("a"),
...             col("b").sum.alias("d"),
...             col("b").sum0.alias("e"),
...             col("b").min.alias("f"),
...             col("b").max.alias("g"),
...             col("b").count.alias("h"),
...             col("b").avg.alias("i"),
...             col("b").stddev_pop.alias("j"),
...             col("b").stddev_samp.alias("k"),
...             col("b").var_pop.alias("l"),
...             col("b").var_samp.alias("m"),
...             col("b").collect.alias("n"))
New in version 1.12.0.
-
property
sum0
¶ Returns the sum of the numeric field across all input values. If all values are null, 0 is returned.
Example:
>>> from pyflink.table.expressions import col
>>> tab \
...     .group_by(col("a")) \
...     .select(col("a"),
...             col("b").sum.alias("d"),
...             col("b").sum0.alias("e"),
...             col("b").min.alias("f"),
...             col("b").max.alias("g"),
...             col("b").count.alias("h"),
...             col("b").avg.alias("i"),
...             col("b").stddev_pop.alias("j"),
...             col("b").stddev_samp.alias("k"),
...             col("b").var_pop.alias("l"),
...             col("b").var_samp.alias("m"),
...             col("b").collect.alias("n"))
New in version 1.12.0.
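sum and sum0 differ only in how they treat an input whose values are all null. The plain-Python model below uses None for NULL. sum_model and sum0_model are hypothetical helpers, and treating an empty input the same as an all-NULL input is an assumption of this sketch.

```python
from typing import Iterable, Optional


def sum_model(values: Iterable[Optional[float]]) -> Optional[float]:
    """SUM: NULLs are ignored; if every input is NULL, the result is NULL."""
    non_null = [v for v in values if v is not None]
    return sum(non_null) if non_null else None


def sum0_model(values: Iterable[Optional[float]]) -> float:
    """SUM0: like SUM, but returns 0 instead of NULL."""
    return sum(v for v in values if v is not None)


print(sum_model([None, None]), sum0_model([None, None]))  # None 0
```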
-
property
tan
¶ Calculates the tangent of a given number.
New in version 1.12.0.
-
property
tanh
¶ Calculates the hyperbolic tangent of a given number.
New in version 1.12.0.
-
then
(if_true, if_false) → pyflink.table.expression.Expression[source]¶ Ternary conditional operator that decides which of two other expressions should be evaluated based on an evaluated boolean condition.
e.g. (lit(42) > 5).then("A", "B") leads to "A".
- Parameters
if_true – expression to be evaluated if condition holds
if_false – expression to be evaluated if condition does not hold
New in version 1.12.0.
-
property
to_base64
¶ Returns the base64-encoded result of the input string.
See also
trim_leading()
,trim_trailing()
,trim()
,replace()
,char_length
,upper_case
,lower_case
,init_cap
,like()
,similar()
,position()
,lpad()
,rpad()
,overlay()
,regexp_replace()
,regexp_extract()
,substring()
,from_base64
,to_base64
,ltrim
,rtrim
,repeat()
New in version 1.12.0.
-
property
to_date
¶ Parses a date string in the form “yyyy-MM-dd” to a SQL Date. It’s equivalent to col.cast(DataTypes.DATE()).
Example:
>>> lit("2016-06-15").to_date
New in version 1.12.0.
-
property
to_time
¶ Parses a time string in the form “HH:mm:ss” to a SQL Time. It’s equivalent to col.cast(DataTypes.TIME()).
Example:
>>> lit("3:30:00").to_time
New in version 1.12.0.
-
property
to_timestamp
¶ Parses a timestamp string in the form “yyyy-MM-dd HH:mm:ss[.SSS]” to a SQL Timestamp. It’s equivalent to col.cast(DataTypes.TIMESTAMP(3)).
Example:
>>> lit('2016-06-15 3:30:00.001').to_timestamp
New in version 1.12.0.
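The patterns above correspond roughly to Python strptime formats: "yyyy-MM-dd" maps to "%Y-%m-%d", "HH:mm:ss" to "%H:%M:%S", and ".SSS" to a fractional-seconds field. The sketch below parses the three example literals in plain Python for comparison. The helper names are hypothetical, and the sketch does not model TIMESTAMP(3) precision handling.

```python
from datetime import date, datetime, time


def to_date_model(s: str) -> date:
    return datetime.strptime(s, "%Y-%m-%d").date()


def to_time_model(s: str) -> time:
    # %H also accepts a single-digit hour such as "3".
    return datetime.strptime(s, "%H:%M:%S").time()


def to_timestamp_model(s: str) -> datetime:
    # The fractional part ([.SSS]) is optional, so pick the format accordingly.
    fmt = "%Y-%m-%d %H:%M:%S.%f" if "." in s else "%Y-%m-%d %H:%M:%S"
    return datetime.strptime(s, fmt)


print(to_date_model("2016-06-15"))
print(to_time_model("3:30:00"))
print(to_timestamp_model("2016-06-15 3:30:00.001"))
```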
-
trim
(character: Union[str, Expression[str]] = None) → pyflink.table.expression.Expression[str][source]¶ Removes leading and trailing space characters from the given string if character is None. Otherwise, removes leading and trailing specified characters from the given string.
See also
trim_leading()
,trim_trailing()
,trim()
,replace()
,char_length
,upper_case
,lower_case
,init_cap
,like()
,similar()
,position()
,lpad()
,rpad()
,overlay()
,regexp_replace()
,regexp_extract()
,substring()
,from_base64
,to_base64
,ltrim
,rtrim
,repeat()
New in version 1.12.0.
-
trim_leading
(character: Union[str, Expression[str]] = None) → pyflink.table.expression.Expression[str][source]¶ Removes leading space characters from the given string if character is None. Otherwise, removes leading specified characters from the given string.
See also
trim_leading()
,trim_trailing()
,trim()
,replace()
,char_length
,upper_case
,lower_case
,init_cap
,like()
,similar()
,position()
,lpad()
,rpad()
,overlay()
,regexp_replace()
,regexp_extract()
,substring()
,from_base64
,to_base64
,ltrim
,rtrim
,repeat()
New in version 1.12.0.
-
trim_trailing
(character: Union[str, Expression[str]] = None) → pyflink.table.expression.Expression[str][source]¶ Removes trailing space characters from the given string if character is None. Otherwise, removes trailing specified characters from the given string.
See also
trim_leading()
,trim_trailing()
,trim()
,replace()
,char_length
,upper_case
,lower_case
,init_cap
,like()
,similar()
,position()
,lpad()
,rpad()
,overlay()
,regexp_replace()
,regexp_extract()
,substring()
,from_base64
,to_base64
,ltrim
,rtrim
,repeat()
New in version 1.12.0.
-
truncate
(n: Union[int, Expression[int]] = 0) → pyflink.table.expression.Expression[~T][source]¶ Returns a number truncated to n decimal places. If n is 0, the result has no decimal point or fractional part. n can be negative, in which case n digits left of the decimal point of the value become zero. e.g. lit(42.345).truncate(2) leads to 42.34, lit(42).truncate(-1) leads to 40.
New in version 1.12.0.
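Truncation is rounding toward zero, so the same decimal-module approach reproduces the examples above. truncate_model is a hypothetical helper, and applying it to negative numbers (toward zero, not toward negative infinity) is an assumption of this sketch.

```python
from decimal import Decimal, ROUND_DOWN


def truncate_model(value, n: int = 0) -> Decimal:
    """Drop digits beyond n places right of the decimal point (toward zero)."""
    # n=2 -> step 0.01; n=-1 -> step 1E+1, so the units digit becomes zero.
    step = Decimal(1).scaleb(-n)
    return Decimal(str(value)).quantize(step, rounding=ROUND_DOWN)


print(truncate_model(42.345, 2))  # 42.34
print(truncate_model(42.345))     # 42
```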
-
property
upper_case
¶ Returns all of the characters in a string in upper case using the rules of the default locale.
See also
trim_leading()
,trim_trailing()
,trim()
,replace()
,char_length
,upper_case
,lower_case
,init_cap
,like()
,similar()
,position()
,lpad()
,rpad()
,overlay()
,regexp_replace()
,regexp_extract()
,substring()
,from_base64
,to_base64
,ltrim
,rtrim
,repeat()
New in version 1.12.0.
-
property
var_pop
¶ Returns the population variance of an expression.
Example:
>>> from pyflink.table.expressions import col
>>> tab \
...     .group_by(col("a")) \
...     .select(col("a"),
...             col("b").sum.alias("d"),
...             col("b").sum0.alias("e"),
...             col("b").min.alias("f"),
...             col("b").max.alias("g"),
...             col("b").count.alias("h"),
...             col("b").avg.alias("i"),
...             col("b").stddev_pop.alias("j"),
...             col("b").stddev_samp.alias("k"),
...             col("b").var_pop.alias("l"),
...             col("b").var_samp.alias("m"),
...             col("b").collect.alias("n"))
New in version 1.12.0.
-
property
var_samp
¶ Returns the sample variance of a given expression.
Example:
>>> from pyflink.table.expressions import col
>>> tab \
...     .group_by(col("a")) \
...     .select(col("a"),
...             col("b").sum.alias("d"),
...             col("b").sum0.alias("e"),
...             col("b").min.alias("f"),
...             col("b").max.alias("g"),
...             col("b").count.alias("h"),
...             col("b").avg.alias("i"),
...             col("b").stddev_pop.alias("j"),
...             col("b").stddev_samp.alias("k"),
...             col("b").var_pop.alias("l"),
...             col("b").var_samp.alias("m"),
...             col("b").collect.alias("n"))
New in version 1.12.0.
-
property
week
¶ Creates an interval of the given number of weeks.
The produced expression is of type
INTERVAL().
See also
year
,years
,quarter
,quarters
,month
,months
,week
,weeks
,day
,days
,hour
,hours
,minute
,minutes
,second
,seconds
,milli
,millis
New in version 1.12.0.
-
property
weeks
¶ Creates an interval of the given number of weeks.
The produced expression is of type
INTERVAL().
See also
year
,years
,quarter
,quarters
,month
,months
,week
,weeks
,day
,days
,hour
,hours
,minute
,minutes
,second
,seconds
,milli
,millis
New in version 1.12.0.
-
class
pyflink.table.
TableSchema
(field_names: List[str] = None, data_types: List[pyflink.table.types.DataType] = None, j_table_schema=None)[source]¶ Bases:
object
A table schema that represents a table’s structure with field names and data types.
-
class
Builder
[source]¶ Bases:
object
Builder for creating a
TableSchema
.
-
build
() → pyflink.table.table_schema.TableSchema[source]¶ Returns a
TableSchema
instance.
- Returns
The
TableSchema
instance.
-
field
(name: str, data_type: pyflink.table.types.DataType) → pyflink.table.table_schema.TableSchema.Builder[source]¶ Add a field with name and data type.
The call order of this method determines the order of fields in the schema.
- Parameters
name – The field name.
data_type – The field data type.
- Returns
This object.
-
-
copy
() → pyflink.table.table_schema.TableSchema[source]¶ Returns a deep copy of the table schema.
- Returns
A deep copy of the table schema.
-
get_field_data_type
(field: Union[int, str]) → Optional[pyflink.table.types.DataType][source]¶ Returns the specified data type for the given field index or field name.
- Parameters
field – The index of the field or the name of the field.
- Returns
The data type of the specified field.
-
get_field_data_types
() → List[pyflink.table.types.DataType][source]¶ Returns all field data types as a list.
- Returns
A list of all field data types.
-
get_field_name
(field_index: int) → Optional[str][source]¶ Returns the specified name for the given field index.
- Parameters
field_index – The index of the field.
- Returns
The field name.
-
class
pyflink.table.
TableResult
(j_table_result)[source]¶ Bases:
object
A
TableResult
is the representation of the statement execution result.
New in version 1.11.0.
-
collect
() → pyflink.table.table_result.CloseableIterator[source]¶ Get the result contents as a closeable row iterator.
Note:
For SELECT operations, the job will not finish until all result data has been collected, so the job should be closed explicitly through the CloseableIterator#close method to avoid a resource leak. Calling CloseableIterator#close cancels the job and releases related resources.
For DML operations, Flink does not currently support getting the real affected row count, so the affected row count is always -1 (unknown) for every sink, and these counts are returned only after the job has finished. Calling CloseableIterator#close cancels the job.
For other operations, no Flink job is submitted (get_job_client() always returns None), and the result is bounded. Calling CloseableIterator#close does nothing.
The recommended way to call the CloseableIterator#close method looks like:
>>> table_result = t_env.execute_sql("select ...")
>>> with table_result.collect() as results:
...     for result in results:
...         ...
To fetch results locally, call either collect() or print(), but not both on the same TableResult instance.
- Returns
A CloseableIterator.
New in version 1.12.0.
-
get_job_client
() → Optional[pyflink.common.job_client.JobClient][source]¶ For DML and DQL statements, returns the JobClient associated with the submitted Flink job. For other statements (e.g. DDL, DCL), returns None.
- Returns
The job client, optional.
- Return type
New in version 1.11.0.
-
get_result_kind
() → pyflink.table.result_kind.ResultKind[source]¶ Return the ResultKind which represents the result type.
For DDL operation and USE operation, the result kind is always SUCCESS. For other operations, the result kind is always SUCCESS_WITH_CONTENT.
- Returns
The result kind.
New in version 1.11.0.
-
get_table_schema
() → pyflink.table.table_schema.TableSchema[source]¶ Get the schema of result.
The schema of DDL, USE, EXPLAIN:
+-------------+-------------+----------+
| column name | column type | comments |
+-------------+-------------+----------+
| result      | STRING      |          |
+-------------+-------------+----------+
The schema of SHOW:
+---------------+-------------+----------+
| column name   | column type | comments |
+---------------+-------------+----------+
| <object name> | STRING      |          |
+---------------+-------------+----------+
The column name of `SHOW CATALOGS` is "catalog name", the column name of `SHOW DATABASES` is "database name", the column name of `SHOW TABLES` is "table name", the column name of `SHOW VIEWS` is "view name", and the column name of `SHOW FUNCTIONS` is "function name".
The schema of DESCRIBE:
+------------------+-------------+-------------------------------------------------+
| column name      | column type | comments                                        |
+------------------+-------------+-------------------------------------------------+
| name             | STRING      | field name                                      |
+------------------+-------------+-------------------------------------------------+
| type             | STRING      | field type expressed as a String                |
+------------------+-------------+-------------------------------------------------+
| null             | BOOLEAN     | field nullability: true if a field is nullable, |
|                  |             | else false                                      |
+------------------+-------------+-------------------------------------------------+
| key              | BOOLEAN     | key constraint: 'PRI' for primary keys,         |
|                  |             | 'UNQ' for unique keys, else null                |
+------------------+-------------+-------------------------------------------------+
| computed column  | STRING      | computed column: string expression              |
|                  |             | if a field is computed column, else null        |
+------------------+-------------+-------------------------------------------------+
| watermark        | STRING      | watermark: string expression if a field is      |
|                  |             | watermark, else null                            |
+------------------+-------------+-------------------------------------------------+
The schema of INSERT (one column per sink):
+----------------------------+-------------+-----------------------+
| column name                | column type | comments              |
+----------------------------+-------------+-----------------------+
| (name of the insert table) | BIGINT      | the insert table name |
+----------------------------+-------------+-----------------------+
The schema of SELECT is the selected field names and types.
- Returns
The schema of result.
- Return type
New in version 1.11.0.
-
print
()[source]¶ Prints the result contents in tableau form to the client console.
This method has slightly different behaviors under different checkpointing settings.
For batch jobs or streaming jobs without checkpointing, this method has neither exactly-once nor at-least-once guarantee. Query results are immediately accessible by the clients once they’re produced, but exceptions will be thrown when the job fails and restarts.
For streaming jobs with exactly-once checkpointing, this method guarantees an end-to-end exactly-once record delivery. A result will be accessible by clients only after its corresponding checkpoint completes.
For streaming jobs with at-least-once checkpointing, this method guarantees an end-to-end at-least-once record delivery. Query results are immediately accessible by the clients once they’re produced, but it is possible for the same result to be delivered multiple times.
New in version 1.11.0.
-
wait
(timeout_ms: int = None)[source]¶ Wait if necessary for at most the given time (milliseconds) for the data to be ready.
For a select operation, this method will wait until the first row can be accessed locally. For an insert operation, this method will wait for the job to finish, because the result contains only one row. For other operations, this method will return immediately, because the result is already available locally.
New in version 1.12.0.
-
-
class
pyflink.table.
Row
(*args, **kwargs)[source]¶ Bases:
object
A row in Table. Its fields can be accessed:
like attributes (row.key)
like dictionary values (row[key])
key in row will search through the row's keys.
Row can be used to create a row object using named arguments; the fields will be sorted by name. A named argument may not be omitted to indicate that its value is None or missing; set it explicitly to None instead.
>>> row = Row(name="Alice", age=11)
>>> row
Row(age=11, name='Alice')
>>> row['name'], row['age']
('Alice', 11)
>>> row.name, row.age
('Alice', 11)
>>> 'name' in row
True
>>> 'wrong_key' in row
False
Row can also be used to create a Row-like class, which can then be used to create Row objects, such as:
>>> Person = Row("name", "age")
>>> Person
<Row(name, age)>
>>> 'name' in Person
True
>>> 'wrong_key' in Person
False
>>> Person("Alice", 11)
Row(name='Alice', age=11)
-
as_dict
(recursive=False)[source]¶ Returns the row as a dict.
Example:
>>> Row(name="Alice", age=11).as_dict() == {'name': 'Alice', 'age': 11}
True
>>> row = Row(key=1, value=Row(name='a', age=2))
>>> row.as_dict() == {'key': 1, 'value': Row(age=2, name='a')}
True
>>> row.as_dict(True) == {'key': 1, 'value': {'name': 'a', 'age': 2}}
True
- Parameters
recursive – whether nested Row objects are also converted into dicts (default: False).
-
class
pyflink.table.
RowKind
[source]¶ Bases:
enum.Enum
An enumeration of the kinds of changes that a row can describe in a changelog.
-
DELETE
= 3¶
-
INSERT
= 0¶
-
UPDATE_AFTER
= 2¶
-
UPDATE_BEFORE
= 1¶
-
-
class
pyflink.table.
ChangelogMode
(j_changelog_mode)[source]¶ Bases:
object
The set of changes contained in a changelog.
-
class
pyflink.table.
ExplainDetail
[source]¶ Bases:
object
ExplainDetail defines the types of details for explain result.
New in version 1.11.0.
-
CHANGELOG_MODE
= 1¶
-
ESTIMATED_COST
= 0¶
-
JSON_EXECUTION_PLAN
= 2¶
-
-
class
pyflink.table.
TableSource
(j_table_source)[source]¶ Bases:
object
Defines a table from an external system or location.
-
class
pyflink.table.
TableSink
(j_table_sink)[source]¶ Bases:
object
A
TableSink
specifies how to emit a table to an external system or location.
-
class
pyflink.table.
CsvTableSource
(source_path, field_names, field_types, field_delim=None, line_delim=None, quote_character=None, ignore_first_line=None, ignore_comments=None, lenient=None, empty_column_as_null=None)[source]¶ Bases:
pyflink.table.sources.TableSource
A
TableSource
for simple CSV files with a (logically) unlimited number of fields.
Example:
>>> CsvTableSource("/csv/file/path", ["a", "b"], [DataTypes.INT(), DataTypes.STRING()])
- Parameters
source_path (str) – The path to the CSV file.
field_names (collections.Iterable[str]) – The names of the table fields.
field_types (collections.Iterable[DataType]) – The types of the table fields.
field_delim (str, optional) – The field delimiter, “,” by default.
line_delim (str, optional) – The row delimiter, “\n” by default.
quote_character (str, optional) – An optional quote character for String values, null by default.
ignore_first_line (bool, optional) – Flag to ignore the first line, false by default.
ignore_comments (str, optional) – An optional prefix to indicate comments, null by default.
lenient (bool, optional) – Flag to skip records with parse errors instead of failing, false by default.
empty_column_as_null (bool, optional) – Treat empty column as null, false by default.
-
class
pyflink.table.
CsvTableSink
(field_names, field_types, path, field_delimiter=', ', num_files=-1, write_mode=None)[source]¶ Bases:
pyflink.table.sinks.TableSink
A simple
TableSink
to emit data as CSV files.
Example:
>>> CsvTableSink(["a", "b"], [DataTypes.INT(), DataTypes.STRING()],
...              "/csv/file/path", "|", 1, WriteMode.OVERWRITE)
- Parameters
field_names – The list of field names.
field_types – The list of field data types.
path – The output path to write the Table to.
field_delimiter – The field delimiter.
num_files – The number of files to write to.
write_mode – The write mode, which specifies whether existing files are overwritten: either WriteMode.NO_OVERWRITE or WriteMode.OVERWRITE.
-
class
pyflink.table.
ResultKind
[source]¶ Bases:
object
ResultKind defines the types of the result.
SUCCESS: the statement (e.g. DDL, USE) executes successfully, and the result only contains a simple “OK”.
SUCCESS_WITH_CONTENT: the statement (e.g. DML, DQL, SHOW) executes successfully, and the result contains important content.
New in version 1.11.0.
-
SUCCESS
= 0¶
-
SUCCESS_WITH_CONTENT
= 1¶
-
pyflink.table.expressions module¶
-
pyflink.table.expressions.
if_then_else
(condition: Union[bool, pyflink.table.expression.Expression[bool]], if_true, if_false) → pyflink.table.expression.Expression[source]¶ Ternary conditional operator that decides which of two other expressions should be evaluated based on an evaluated boolean condition.
e.g. if_then_else(col("f0") > 5, "A", "B") leads to "A" when f0 is greater than 5.
- Parameters
condition – boolean condition
if_true – expression to be evaluated if condition holds
if_false – expression to be evaluated if condition does not hold
New in version 1.12.0.
-
pyflink.table.expressions.
lit
(v, data_type: pyflink.table.types.DataType = None) → pyflink.table.expression.Expression[source]¶ Creates a SQL literal.
The data type is derived from the object's class and its value. For example, lit(12) leads to INT, lit("abc") leads to CHAR(3).
Example:
>>> tab.select(col("key"), lit("abc"))
New in version 1.12.0.
-
pyflink.table.expressions.
col
(name: str) → pyflink.table.expression.Expression[source]¶ Creates an expression which refers to a table’s field.
Example:
>>> tab.select(col("key"), col("value"))
- Parameters
name – the field name to refer to
New in version 1.12.0.
-
pyflink.table.expressions.
range_
(start: Union[str, int], end: Union[str, int]) → pyflink.table.expression.Expression[source]¶ Indicates a range from ‘start’ to ‘end’, which can be used in columns selection.
Example:
>>> tab.select(with_columns(range_('b', 'c')))
See also
New in version 1.12.0.
-
pyflink.table.expressions.
and_
(predicate0: Union[bool, pyflink.table.expression.Expression[bool]], predicate1: Union[bool, pyflink.table.expression.Expression[bool]], *predicates: Union[bool, pyflink.table.expression.Expression[bool]]) → pyflink.table.expression.Expression[bool][source]¶ Boolean AND in three-valued logic.
New in version 1.12.0.
-
pyflink.table.expressions.
or_
(predicate0: Union[bool, pyflink.table.expression.Expression[bool]], predicate1: Union[bool, pyflink.table.expression.Expression[bool]], *predicates: Union[bool, pyflink.table.expression.Expression[bool]]) → pyflink.table.expression.Expression[bool][source]¶ Boolean OR in three-valued logic.
New in version 1.12.0.
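Three-valued logic treats NULL as "unknown". A FALSE operand decides an AND and a TRUE operand decides an OR, regardless of any unknown operands; otherwise, an unknown operand makes the result unknown. The plain-Python model below uses None for NULL. and_model and or_model are hypothetical helpers that mirror the variadic form of and_() and or_().

```python
from typing import Optional


def and_model(*predicates: Optional[bool]) -> Optional[bool]:
    """Three-valued AND: FALSE dominates, then UNKNOWN (None), else TRUE."""
    if any(p is False for p in predicates):
        return False
    if any(p is None for p in predicates):
        return None
    return True


def or_model(*predicates: Optional[bool]) -> Optional[bool]:
    """Three-valued OR: TRUE dominates, then UNKNOWN (None), else FALSE."""
    if any(p is True for p in predicates):
        return True
    if any(p is None for p in predicates):
        return None
    return False


print(and_model(True, None), and_model(False, None))  # None False
print(or_model(True, None), or_model(False, None))    # True None
```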
-
pyflink.table.expressions.
UNBOUNDED_ROW
: Expression = <pyflink.table.expression.Expression object>¶ Offset constant to be used in the preceding clause of unbounded
Over
windows. Use this constant for a row-count interval. Unbounded over windows start with the first row of a partition.
New in version 1.12.0.
-
pyflink.table.expressions.
UNBOUNDED_RANGE
: Expression = <pyflink.table.expression.Expression object>¶ Offset constant to be used in the preceding clause of unbounded
Over
windows. Use this constant for a time interval. Unbounded over windows start with the first row of a partition.
New in version 1.12.0.
-
pyflink.table.expressions.
CURRENT_ROW
: Expression = <pyflink.table.expression.Expression object>¶ Offset constant to be used in the following clause of
Over
windows. Use this for setting the upper bound of the window to the current row.
New in version 1.12.0.
-
pyflink.table.expressions.
current_date
() → pyflink.table.expression.Expression[source]¶ Returns the current SQL date in local time zone.
New in version 1.12.0.
-
pyflink.table.expressions.
current_time
() → pyflink.table.expression.Expression[source]¶ Returns the current SQL time in local time zone.
New in version 1.12.0.
-
pyflink.table.expressions.
current_timestamp
() → pyflink.table.expression.Expression[source]¶ Returns the current SQL timestamp in local time zone; the return type of this expression is TIMESTAMP_LTZ.
New in version 1.12.0.
-
pyflink.table.expressions.
current_watermark
(rowtimeAttribute) → pyflink.table.expression.Expression[source]¶ Returns the current watermark for the given rowtime attribute, or NULL if no common watermark of all upstream operations is available at the current operation in the pipeline.
The function returns the watermark with the same type as the rowtime attribute, but with an adjusted precision of 3. For example, if the rowtime attribute is TIMESTAMP_LTZ(9), the function will return TIMESTAMP_LTZ(3).
If no watermark has been emitted yet, the function will return NULL. Users must take care of this when comparing against it, e.g. in order to filter out late data you can use
WHERE CURRENT_WATERMARK(ts) IS NULL OR ts > CURRENT_WATERMARK(ts)
New in version 1.12.0.
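The IS NULL branch in that predicate matters because, in SQL, comparing a value against NULL yields unknown, and the WHERE clause would then drop the row. The plain-Python model below spells out the predicate, with None standing in for NULL. keep_row is a hypothetical helper, not part of PyFlink.

```python
from datetime import datetime
from typing import Optional


def keep_row(ts: datetime, current_watermark: Optional[datetime]) -> bool:
    """Model of: CURRENT_WATERMARK(ts) IS NULL OR ts > CURRENT_WATERMARK(ts)."""
    # Before any watermark exists the watermark is NULL (None here); a plain
    # ts > NULL comparison would be unknown, so the IS NULL branch keeps the row.
    return current_watermark is None or ts > current_watermark


print(keep_row(datetime(2021, 1, 1, 10), datetime(2021, 1, 1, 11)))  # False (late)
```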
-
pyflink.table.expressions.
local_time
() → pyflink.table.expression.Expression[source]¶ Returns the current SQL time in local time zone.
New in version 1.12.0.
-
pyflink.table.expressions.
local_timestamp
() → pyflink.table.expression.Expression[source]¶ Returns the current SQL timestamp in local time zone; the return type of this expression is TIMESTAMP.
New in version 1.12.0.
-
pyflink.table.expressions.
temporal_overlaps
(left_time_point, left_temporal, right_time_point, right_temporal) → pyflink.table.expression.Expression[source]¶ Determines whether two anchored time intervals overlap. Time point and temporal are transformed into a range defined by two time points (start, end). The function evaluates left_end >= right_start && right_end >= left_start.
e.g. temporal_overlaps(lit("2:55:00").to_time, lit(1).hours, lit("3:30:00").to_time, lit(2).hours) leads to true.
- Parameters
left_time_point – The left time point
left_temporal – The time interval from the left time point
right_time_point – The right time point
right_temporal – The time interval from the right time point
- Returns
An expression which indicates whether two anchored time intervals overlap.
New in version 1.12.0.
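To make the overlap condition concrete, here is a plain-Python model using `datetime`. It is not the PyFlink implementation. It assumes non-negative intervals and attaches the times to an arbitrary date so that adding hours cannot wrap past midnight.

```python
from datetime import datetime, timedelta


def temporal_overlaps(left_time_point: datetime, left_temporal: timedelta,
                      right_time_point: datetime, right_temporal: timedelta) -> bool:
    """Plain-Python model of the overlap test (non-negative intervals only)."""
    left_start, left_end = left_time_point, left_time_point + left_temporal
    right_start, right_end = right_time_point, right_time_point + right_temporal
    # Each range must end no earlier than the other one starts.
    return left_end >= right_start and right_end >= left_start


day = datetime(2000, 1, 1)
# 02:55-03:55 vs 03:30-05:30, the example from the text
print(temporal_overlaps(day.replace(hour=2, minute=55), timedelta(hours=1),
                        day.replace(hour=3, minute=30), timedelta(hours=2)))  # True
# 01:00-02:00 vs 03:30-05:30 are disjoint
print(temporal_overlaps(day.replace(hour=1), timedelta(hours=1),
                        day.replace(hour=3, minute=30), timedelta(hours=2)))  # False
```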
-
pyflink.table.expressions.date_format(timestamp, format) → pyflink.table.expression.Expression
Formats a timestamp as a string using a specified format. The format must be compatible with MySQL’s date formatting syntax as used by the date_parse function.
For example, date_format(col("time"), "%Y, %d %M") results in strings formatted as "2017, 05 May".
- Parameters
timestamp – The timestamp to format as string.
format – The format of the string.
- Returns
The formatted timestamp as string.
New in version 1.12.0.
-
pyflink.table.expressions.timestamp_diff(time_point_unit: pyflink.table.expression.TimePointUnit, time_point1, time_point2) → pyflink.table.expression.Expression
Returns the (signed) number of TimePointUnit units between time_point1 and time_point2. For example, timestamp_diff(TimePointUnit.DAY, lit("2016-06-15").to_date, lit("2016-06-18").to_date) leads to 3.
- Parameters
time_point_unit – The unit to compute diff.
time_point1 – The first point in time.
time_point2 – The second point in time.
- Returns
The number of intervals as integer value.
New in version 1.12.0.
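The sign convention can be checked with a small Python model of the DAY unit on plain dates. `timestamp_diff_days` is a hypothetical helper, not part of PyFlink. It deliberately sticks to date values, because partial units from times of day would need separate rounding rules.

```python
from datetime import date


def timestamp_diff_days(time_point1: date, time_point2: date) -> int:
    """Signed number of whole days from time_point1 to time_point2."""
    # Subtracting two dates gives a timedelta whose .days is exact for date values.
    return (time_point2 - time_point1).days


print(timestamp_diff_days(date(2016, 6, 15), date(2016, 6, 18)))  # 3
print(timestamp_diff_days(date(2016, 6, 18), date(2016, 6, 15)))  # -3
```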
-
pyflink.table.expressions.array(head, *tail) → pyflink.table.expression.Expression
Creates an array of literals.
Example:
>>> tab.select(array(1, 2, 3))
New in version 1.12.0.
-
pyflink.table.expressions.row(head, *tail) → pyflink.table.expression.Expression
Creates a row of expressions.
Example:
>>> tab.select(row("key1", 1))
New in version 1.12.0.
-
pyflink.table.expressions.map_(key, value, *tail) → pyflink.table.expression.Expression
Creates a map of expressions.
Example:
>>> tab.select(
...     map_(
...         "key1", 1,
...         "key2", 2,
...         "key3", 3
...     ))
Note
Keys and values should have the same types for all entries.
New in version 1.12.0.
-
pyflink.table.expressions.row_interval(rows: int) → pyflink.table.expression.Expression
Creates an interval of rows.
Example:
>>> tab.over_window(Over
...     .partition_by(col('a'))
...     .order_by(col('proctime'))
...     .preceding(row_interval(4))
...     .following(CURRENT_ROW)
...     .alias('w'))
- Parameters
rows – the number of rows
New in version 1.12.0.
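`preceding(row_interval(4))` combined with `following(CURRENT_ROW)` bounds each row's window to itself plus at most four earlier rows of the same partition. Below is a plain-Python sketch of that bound. It assumes a single partition that is already sorted by the order-by attribute and a SUM aggregate. `bounded_rows_sum` is hypothetical.

```python
from typing import List


def bounded_rows_sum(values: List[int], preceding: int) -> List[int]:
    """For each row, sum it together with up to `preceding` earlier rows."""
    result = []
    for i in range(len(values)):
        # Rows i - preceding .. i, inclusive; max() clips the start at the first row.
        window = values[max(0, i - preceding): i + 1]
        result.append(sum(window))
    return result


print(bounded_rows_sum([1, 2, 3, 4, 5, 6], preceding=4))  # [1, 3, 6, 10, 15, 20]
```

The last row sums only 2 through 6, because row 1 has fallen outside the four-row bound.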
-
pyflink.table.expressions.pi() → pyflink.table.expression.Expression[float]
Returns a value that is closer than any other value to pi.
New in version 1.12.0.
-
pyflink.table.expressions.e() → pyflink.table.expression.Expression[float]
Returns a value that is closer than any other value to e.
New in version 1.12.0.
-
pyflink.table.expressions.rand(seed: Union[int, pyflink.table.expression.Expression[int]] = None) → pyflink.table.expression.Expression[float]
Returns a pseudorandom double value between 0.0 (inclusive) and 1.0 (exclusive), using an initial seed if one is specified. Two rand() functions return identical sequences of numbers if they have the same initial seed.
New in version 1.12.0.
-
pyflink.table.expressions.rand_integer(bound: Union[int, pyflink.table.expression.Expression[int]], seed: Union[int, pyflink.table.expression.Expression[int]] = None) → pyflink.table.expression.Expression
Returns a pseudorandom integer value between 0 (inclusive) and the specified bound (exclusive), using an initial seed if one is specified. Two rand_integer() functions return identical sequences of numbers if they have the same initial seed and the same bound.
New in version 1.12.0.
-
pyflink.table.expressions.atan2(y, x) → pyflink.table.expression.Expression[float]
Calculates the arc tangent of a given coordinate.
New in version 1.12.0.
-
pyflink.table.expressions.negative(v) → pyflink.table.expression.Expression
Returns the negation of a numeric value.
New in version 1.12.0.
-
pyflink.table.expressions.concat(first: Union[str, pyflink.table.expression.Expression[str]], *others: Union[str, pyflink.table.expression.Expression[str]]) → pyflink.table.expression.Expression[str]
Returns the string that results from concatenating the arguments. Returns NULL if any argument is NULL.
New in version 1.12.0.
-
pyflink.table.expressions.concat_ws(separator: Union[str, pyflink.table.expression.Expression[str]], first: Union[str, pyflink.table.expression.Expression[str]], *others: Union[str, pyflink.table.expression.Expression[str]]) → pyflink.table.expression.Expression[str]
Returns the string that results from concatenating the arguments, separated by the separator. Returns NULL if the separator is NULL.
Note
This function does not skip empty strings. However, it does skip any NULL values after the separator argument.
New in version 1.12.0.
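The NULL handling of concat and concat_ws differs, and the difference is easy to miss. Below is a plain-Python model with `None` standing in for SQL NULL. The `*_model` helpers are illustrations, not PyFlink functions.

```python
from typing import Optional


def concat_model(*args: Optional[str]) -> Optional[str]:
    # concat: a single NULL argument makes the whole result NULL.
    if any(arg is None for arg in args):
        return None
    return "".join(args)


def concat_ws_model(separator: Optional[str], *args: Optional[str]) -> Optional[str]:
    # concat_ws: a NULL separator yields NULL; NULL arguments are skipped,
    # while empty strings are kept and still receive a separator.
    if separator is None:
        return None
    return separator.join(arg for arg in args if arg is not None)


print(concat_model("a", None, "b"))              # None
print(concat_ws_model("-", "a", None, "", "b"))  # a--b
print(concat_ws_model(None, "a", "b"))           # None
```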
-
pyflink.table.expressions.uuid() → pyflink.table.expression.Expression[str]
Returns a UUID (Universally Unique Identifier) string (e.g., "3d3c68f7-f608-473f-b60c-b0c44ad4cc4e") according to RFC 4122 version 4 (pseudo-randomly generated). The UUID is generated using a cryptographically strong pseudo-random number generator.
New in version 1.12.0.
-
pyflink.table.expressions.null_of(data_type: pyflink.table.types.DataType) → pyflink.table.expression.Expression
Returns a null literal value of a given data type.
New in version 1.12.0.
-
pyflink.table.expressions.log(v, base=None) → pyflink.table.expression.Expression[float]
If base is specified, calculates the logarithm of the given value to the given base. Otherwise, calculates the natural logarithm of the given value.
New in version 1.12.0.
-
pyflink.table.expressions.with_columns(head, *tails) → pyflink.table.expression.Expression
Creates an expression that selects a range of columns. It can be used wherever an array of expressions is accepted, such as function calls, projections, or groupings.
A range can either be index-based or name-based. Indices start at 1 and boundaries are inclusive.
e.g. with_columns(range_("b", "c")) or with_columns(col("*"))
See also
New in version 1.12.0.
-
pyflink.table.expressions.without_columns(head, *tails) → pyflink.table.expression.Expression
Creates an expression that selects all columns except for the given range of columns. It can be used wherever an array of expressions is accepted, such as function calls, projections, or groupings.
A range can either be index-based or name-based. Indices start at 1 and boundaries are inclusive.
e.g. without_columns(range_("b", "c")) or without_columns(col("c"))
See also
New in version 1.12.0.
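The range rules above (1-based indices, inclusive boundaries, names or positions) can be modeled on a plain list of column names. `column_range` and `columns_without` are hypothetical helpers that mirror what with_columns(range_(...)) and without_columns(range_(...)) select. They do not call PyFlink.

```python
from typing import List, Union


def column_range(columns: List[str], start: Union[int, str], end: Union[int, str]) -> List[str]:
    """Inclusive range of columns, given by 1-based index or by column name."""
    # Convert names to 1-based positions so both forms share one code path.
    lo = columns.index(start) + 1 if isinstance(start, str) else start
    hi = columns.index(end) + 1 if isinstance(end, str) else end
    return columns[lo - 1: hi]  # the slice end is exclusive, so position `hi` is included


def columns_without(columns: List[str], excluded: List[str]) -> List[str]:
    return [c for c in columns if c not in excluded]


cols = ["a", "b", "c", "d"]
print(column_range(cols, "b", "c"))                          # ['b', 'c']
print(column_range(cols, 2, 3))                              # ['b', 'c']
print(columns_without(cols, column_range(cols, "b", "c")))   # ['a', 'd']
```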
-
pyflink.table.expressions.call(f: Union[str, pyflink.table.udf.UserDefinedFunctionWrapper], *args) → pyflink.table.expression.Expression
The first parameter f can be either a str or a Python user-defined function.
When it is a str, this is a call to a function that will be looked up in a catalog. There are two kinds of functions:
- System functions, which are identified by one-part names
- Catalog functions, which are always identified by three-part names (catalog, database, function)
Moreover, each function can be either a temporary function or a permanent one (which is stored in an external catalog).
Based on these two properties, the resolution order for looking up a function by the provided function name is:
Temporary system function
System function
Temporary catalog function
Catalog function
- Parameters
f – the path of the function or the Python user-defined function.
args – parameters of the user-defined function.
New in version 1.12.0.
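The lookup order can be pictured as a first-match search over four registries. This is a simplified plain-Python sketch. The dictionaries and `resolve` are hypothetical, and the sketch ignores how one-part names are qualified with the current catalog and database when catalog functions are searched.

```python
from typing import Callable, Dict, Optional

Registry = Dict[str, Callable[..., str]]


def resolve(name: str, temporary_system: Registry, system: Registry,
            temporary_catalog: Registry, catalog: Registry) -> Optional[Callable[..., str]]:
    """Return the first function registered under `name`, in the documented order."""
    for registry in (temporary_system, system, temporary_catalog, catalog):
        if name in registry:
            return registry[name]
    return None  # this sketch returns None when nothing matches


temporary_system = {"shout": lambda s: s.upper() + "!"}
system = {"shout": lambda s: s.upper(), "whisper": lambda s: s.lower()}

print(resolve("shout", temporary_system, system, {}, {})("hi"))    # HI!
print(resolve("whisper", temporary_system, system, {}, {})("HI"))  # hi
print(resolve("missing", temporary_system, system, {}, {}))        # None
```

The temporary system function `shout` shadows the system function of the same name because its registry is searched first.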
-
pyflink.table.expressions.call_sql(sql_expression: str) → pyflink.table.expression.Expression
A call to a SQL expression.
The given string is parsed and translated into a Table API expression during planning. Only the translated expression is evaluated during runtime.
Note: Currently, calls are limited to simple scalar expressions. Calls to aggregate or table-valued functions are not supported. Sub-queries are also not allowed.
- Parameters
sql_expression – SQL expression to be translated
New in version 1.12.0.
-
pyflink.table.expressions.source_watermark() → pyflink.table.expression.Expression
Source watermark declaration for schema.
This is a marker function that doesn’t have a concrete runtime implementation. It can only be used as a single expression for watermark strategies in schema declarations. The declaration will be pushed down into a table source that implements the SupportsSourceWatermark interface. The source will then emit system-defined watermarks.
Check the connector's documentation to see whether it supports source watermarks.
New in version 1.12.0.
pyflink.table.window module
-
class pyflink.table.window.Tumble
Helper class for creating a tumbling window. Tumbling windows are consecutive, non-overlapping windows of a specified fixed length. For example, a tumbling window with a size of 5 minutes groups elements into 5-minute intervals.
Example:
>>> from pyflink.table import expressions as expr
>>> from pyflink.table.window import Tumble
>>> Tumble.over(expr.lit(10).minutes) \
...     .on(expr.col("rowtime")) \
...     .alias("w")
>>> Tumble.over("10.minutes").on("rowtime").alias("w")
-
classmethod over(size: Union[str, pyflink.table.expression.Expression]) → pyflink.table.window.TumbleWithSize
Creates a tumbling window. Tumbling windows are consecutive, non-overlapping windows of a specified fixed length. For example, a tumbling window with a size of 5 minutes groups elements into 5-minute intervals.
- Parameters
size – The size of the window as time or row-count interval.
- Returns
A partially defined tumbling window.
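Epoch-aligned arithmetic shows how a tumbling window assigns a row. This plain-Python sketch assumes millisecond timestamps, windows aligned to the epoch with no offset, and non-negative timestamps. `tumble_window` is a hypothetical helper, not a PyFlink API.

```python
from typing import Tuple


def tumble_window(ts: int, size: int) -> Tuple[int, int]:
    """Return the [start, end) tumbling window that contains `ts`."""
    start = ts - (ts % size)  # round down to a multiple of the window size
    return start, start + size


MINUTE = 60 * 1000  # milliseconds
print(tumble_window(7 * MINUTE, 5 * MINUTE))   # (300000, 600000)
print(tumble_window(10 * MINUTE, 5 * MINUTE))  # (600000, 900000)
```

Because the end is exclusive, a row exactly on a boundary (10 minutes here) opens the next window instead of closing the previous one.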
-
class pyflink.table.window.Session
Helper class for creating a session window. The boundaries of session windows are defined by intervals of inactivity, i.e., a session window is closed if no event appears for a defined gap period.
Example:
>>> from pyflink.table import expressions as expr
>>> from pyflink.table.window import Session
>>> Session.with_gap(expr.lit(10).minutes) \
...     .on(expr.col("rowtime")) \
...     .alias("w")
>>> Session.with_gap("10.minutes").on("rowtime").alias("w")
-
classmethod with_gap(gap: Union[str, pyflink.table.expression.Expression]) → pyflink.table.window.SessionWithGap
Creates a session window. The boundaries of session windows are defined by intervals of inactivity, i.e., a session window is closed if no event appears for a defined gap period.
- Parameters
gap – Specifies how long (as interval of milliseconds) to wait for new data before closing the session window.
- Returns
A partially defined session window.
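The gap rule above can be sketched as a scan over sorted timestamps. An event that arrives before the current session times out extends that session, and anything later starts a new one. This is a hypothetical plain-Python model. In particular, treating an event that arrives exactly `gap` after the previous one as the start of a new session is an assumption of the sketch.

```python
from typing import List, Tuple


def session_windows(timestamps: List[int], gap: int) -> List[Tuple[int, int]]:
    """Group timestamps into sessions, returned as (first_event, last_event + gap)."""
    sessions: List[Tuple[int, int]] = []
    for ts in sorted(timestamps):
        if sessions and ts < sessions[-1][1]:
            # The event arrives before the current session times out: extend it.
            sessions[-1] = (sessions[-1][0], ts + gap)
        else:
            # No open session, or the gap has elapsed: start a new one.
            sessions.append((ts, ts + gap))
    return sessions


print(session_windows([1, 2, 10, 11, 30], gap=5))  # [(1, 7), (10, 16), (30, 35)]
```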
-
class pyflink.table.window.Slide
Helper class for creating a sliding window. Sliding windows have a fixed size and slide by a specified slide interval. If the slide interval is smaller than the window size, sliding windows overlap, so an element can be assigned to multiple windows.
For example, a sliding window of size 15 minutes with a 5-minute slide interval groups elements of 15 minutes and is evaluated every five minutes. Each element is contained in three consecutive window evaluations.
Example:
>>> from pyflink.table import expressions as expr
>>> from pyflink.table.window import Slide
>>> Slide.over(expr.lit(10).minutes) \
...     .every(expr.lit(5).minutes) \
...     .on(expr.col("rowtime")) \
...     .alias("w")
>>> Slide.over("10.minutes").every("5.minutes").on("rowtime").alias("w")
-
classmethod over(size: Union[str, pyflink.table.expression.Expression]) → pyflink.table.window.SlideWithSize
Creates a sliding window. Sliding windows have a fixed size and slide by a specified slide interval. If the slide interval is smaller than the window size, sliding windows are overlapping. Thus, an element can be assigned to multiple windows.
For example, a sliding window of size 15 minutes with 5 minutes sliding interval groups elements of 15 minutes and evaluates every five minutes. Each element is contained in three consecutive window evaluations.
- Parameters
size – The size of the window as time or row-count interval.
- Returns
A partially specified sliding window.
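The "three consecutive window evaluations" in the example follow from size / slide = 15 / 5 = 3. Below is a plain-Python sketch that lists the windows containing one timestamp. It assumes epoch-aligned windows without offset and a size that is a multiple of the slide. `sliding_windows` is a hypothetical helper.

```python
from typing import List, Tuple


def sliding_windows(ts: int, size: int, slide: int) -> List[Tuple[int, int]]:
    """Return every [start, end) sliding window that contains `ts`."""
    windows = []
    start = ts - (ts % slide)  # latest window start at or before ts
    while start > ts - size:   # [start, start + size) still contains ts
        windows.append((start, start + size))
        start -= slide
    return windows


# Size 15, slide 5 (e.g. minutes): each element lands in 15 / 5 = 3 windows.
print(sliding_windows(17, size=15, slide=5))  # [(15, 30), (10, 25), (5, 20)]
```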
-
class pyflink.table.window.Over
Helper class for creating an over window. Similar to SQL, over window aggregates compute an aggregate for each input row over a range of its neighboring rows.
Over-windows for batch tables are currently not supported.
Example:
>>> from pyflink.table import expressions as expr
>>> from pyflink.table.window import Over
>>> Over.partition_by(expr.col("a")) \
...     .order_by(expr.col("rowtime")) \
...     .preceding(expr.UNBOUNDED_RANGE) \
...     .alias("w")
>>> Over.partition_by("a").order_by("rowtime").preceding("unbounded_range").alias("w")
-
classmethod order_by(order_by: Union[str, pyflink.table.expression.Expression]) → pyflink.table.window.OverWindowPartitionedOrdered
Specifies the time attribute on which rows are ordered.
For streaming tables, reference a rowtime or proctime time attribute here to specify the time mode.
- Parameters
order_by – Field reference.
- Returns
An over window with defined order.
-
classmethod partition_by(*partition_by: Union[str, pyflink.table.expression.Expression]) → pyflink.table.window.OverWindowPartitioned
Partitions the elements on some partition keys.
Each partition is individually sorted and aggregate functions are applied to each partition separately.
- Parameters
partition_by – List of field references.
- Returns
An over window with defined partitioning.
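Partitioning and ordering together determine which rows each aggregate sees. The plain-Python sketch below assumes distinct rowtimes, so RANGE and ROWS bounds coincide. It also assumes a SUM over all preceding rows up to the current row, with each partition keeping its own running total. `running_sum_over` is hypothetical.

```python
from collections import defaultdict
from typing import DefaultDict, List, Tuple


def running_sum_over(rows: List[Tuple[str, int, int]]) -> List[Tuple[str, int, int, int]]:
    """rows are (partition_key, rowtime, value); each output row appends its running sum."""
    totals: DefaultDict[str, int] = defaultdict(int)
    out = []
    for key, rowtime, value in sorted(rows, key=lambda row: row[1]):  # order by rowtime
        totals[key] += value  # one accumulator per partition key
        out.append((key, rowtime, value, totals[key]))
    return out


rows = [("a", 1, 10), ("b", 2, 5), ("a", 3, 1), ("b", 4, 2)]
for result in running_sum_over(rows):
    print(result)
# ('a', 1, 10, 10)
# ('b', 2, 5, 5)
# ('a', 3, 1, 11)
# ('b', 4, 2, 7)
```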
-
class pyflink.table.window.GroupWindow(java_window)
A group window specification.
Group windows group rows based on time or row-count intervals and are therefore essentially a special type of groupBy. Just like groupBy, group windows allow computing aggregates on groups of elements.
Infinite streaming tables can only be grouped into time or row intervals. Hence window grouping is required to apply aggregations on streaming tables.
For finite batch tables, group windows provide shortcuts for time-based groupBy.
pyflink.table.descriptors module
-
class pyflink.table.descriptors.Rowtime
Rowtime descriptor for describing an event time attribute in the schema.
-
timestamps_from_extractor(extractor: str) → pyflink.table.descriptors.Rowtime
Sets a custom timestamp extractor to be used for the rowtime attribute.
- Parameters
extractor – The fully-qualified Java class name of the TimestampExtractor that extracts the rowtime attribute from the physical type. The TimestampExtractor must have a public no-argument constructor and must be loadable by the current Java classloader.
- Returns
This rowtime descriptor.
-
timestamps_from_field(field_name: str)
Sets a built-in timestamp extractor that converts an existing LONG or TIMESTAMP field into the rowtime attribute.
- Parameters
field_name – The field to convert into a rowtime attribute.
- Returns
This rowtime descriptor.
-
timestamps_from_source() → pyflink.table.descriptors.Rowtime
Sets a built-in timestamp extractor that converts the assigned timestamps from a DataStream API record into the rowtime attribute and thus preserves the assigned timestamps from the source.
Note
This extractor only works in streaming environments.
- Returns
This rowtime descriptor.
-
watermarks_from_source() → pyflink.table.descriptors.Rowtime
Sets a built-in watermark strategy which indicates that watermarks should be preserved from the underlying DataStream API, thus preserving the watermarks assigned by the source.
- Returns
This rowtime descriptor.
-
watermarks_from_strategy(strategy: str) → pyflink.table.descriptors.Rowtime
Sets a custom watermark strategy to be used for the rowtime attribute.
- Parameters
strategy – The fully-qualified Java class name of the WatermarkStrategy. The WatermarkStrategy must have a public no-argument constructor and must be loadable by the current Java classloader.
- Returns
This rowtime descriptor.
-
watermarks_periodic_ascending() → pyflink.table.descriptors.Rowtime
Sets a built-in watermark strategy for ascending rowtime attributes.
Emits a watermark of the maximum observed timestamp so far minus 1. Rows that have a timestamp equal to the max timestamp are not late.
- Returns
This rowtime descriptor.
-
watermarks_periodic_bounded(delay: int) → pyflink.table.descriptors.Rowtime
Sets a built-in watermark strategy for rowtime attributes which are out-of-order by a bounded time interval.
Emits watermarks which are the maximum observed timestamp minus the specified delay.
- Parameters
delay – Delay in milliseconds.
- Returns
This rowtime descriptor.
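Both periodic strategies above derive the watermark from the largest timestamp seen so far. The plain-Python sketch below computes the watermark after every record. This simplifies the real behavior, where watermarks are emitted periodically rather than per record. The helper names are hypothetical.

```python
from typing import Iterable, List, Optional


def periodic_watermarks(timestamps: Iterable[int], delay: int) -> List[int]:
    """Watermark after each record: maximum observed timestamp so far minus `delay` (ms)."""
    watermarks: List[int] = []
    max_ts: Optional[int] = None
    for ts in timestamps:
        max_ts = ts if max_ts is None else max(max_ts, ts)
        watermarks.append(max_ts - delay)
    return watermarks


def ascending_watermarks(timestamps: Iterable[int]) -> List[int]:
    # watermarks_periodic_ascending: maximum observed timestamp minus 1.
    return periodic_watermarks(timestamps, delay=1)


events = [1000, 2000, 1500, 3000]
print(ascending_watermarks(events))            # [999, 1999, 1999, 2999]
print(periodic_watermarks(events, delay=500))  # [500, 1500, 1500, 2500]
```

In this model, the ascending strategy is the bounded strategy with a 1 ms delay. That is why a row whose timestamp equals the maximum is still not late.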
-
class pyflink.table.descriptors.Schema(schema=None, fields=None, rowtime=None)
Describes a schema of a table.
Note
Field names are matched by the exact name by default (case sensitive).
-
field(field_name: str, field_type: Union[pyflink.table.types.DataType, str]) → pyflink.table.descriptors.Schema
Adds a field with the field name and the data type or type string. Required. This method can be called multiple times. The call order of this method also defines the order of the fields in a row. Type strings are described here: https://nightlies.apache.org/flink/flink-docs-stable/dev/table/connect.html#type-strings
- Parameters
field_name – The field name.
field_type – The data type or type string of the field.
- Returns
This schema object.
-
fields(fields: Dict[str, Union[pyflink.table.types.DataType, str]]) → pyflink.table.descriptors.Schema
Adds a set of fields, given as a dict that maps each field name to its data type or type string.
- Parameters
fields – Dict mapping each field name to its data type or type string, e.g. {'int_field': DataTypes.INT(), 'string_field': DataTypes.STRING()}.
- Returns
This schema object.
New in version 1.11.0.
-
from_origin_field(origin_field_name: str) → pyflink.table.descriptors.Schema
Specifies the origin of the previously defined field. The origin field is defined by a connector or format.
E.g. field("myString", DataTypes.STRING()).from_origin_field("CSV_MY_STRING")
Note
Field names are matched by the exact name by default (case sensitive).
- Parameters
origin_field_name – The origin field name.
- Returns
This schema object.
-