pyflink.table.table_environment.StreamTableEnvironment.from_data_stream
- StreamTableEnvironment.from_data_stream(data_stream: pyflink.datastream.data_stream.DataStream, *fields_or_schema: Union[pyflink.table.expression.Expression, pyflink.table.schema.Schema]) → pyflink.table.table.Table
When fields_or_schema is a sequence of Expression:
Converts the given DataStream into a Table with specified field names.
There are two modes for mapping original fields to the fields of the Table:
Reference input fields by name:
All fields in the schema definition are referenced by name (and possibly renamed using an alias (as)). Moreover, we can define proctime and rowtime attributes at arbitrary positions using arbitrary names (except names that already exist in the result schema). In this mode, fields can be reordered and projected out. This mode can be used for any input type.
Reference input fields by position:
In this mode, fields are simply renamed. Event-time attributes can either replace the field at their position in the input data (if it is of the correct type) or be appended at the end. Proctime attributes must be appended at the end. This mode can only be used if the input type has a defined field order (tuple, case class, Row) and none of the given field names matches a field name of the input type.
When fields_or_schema is a Schema:
Converts the given DataStream into a Table.
Column names and types of the Table are automatically derived from the TypeInformation of the DataStream. If the outermost record’s TypeInformation is a CompositeType, it will be flattened in the first level. Composite nested fields will not be accessible.
Since the DataStream API does not support changelog processing natively, this method assumes append-only/insert-only semantics during the stream-to-table conversion. Records of class Row must describe RowKind.INSERT changes.
By default, the stream record’s timestamp and watermarks are not propagated unless explicitly declared.
This method allows declaring a Schema for the resulting table. The declaration is similar to a CREATE TABLE DDL in SQL and allows you to:
enrich or overwrite automatically derived columns with a custom DataType
reorder columns
add computed or metadata columns next to the physical columns
access a stream record’s timestamp
declare a watermark strategy or propagate the DataStream watermarks
It is possible to declare a schema without physical/regular columns. In this case, those columns will be automatically derived and implicitly put at the beginning of the schema declaration.
The following examples illustrate common schema declarations and their semantics:
Example:
=== EXAMPLE 1 ===
no physical columns defined, they will be derived automatically,
e.g. BigDecimal becomes DECIMAL(38, 18)

>>> Schema.new_builder() \
...     .column_by_expression("c1", "f1 + 42") \
...     .column_by_expression("c2", "f1 - 1") \
...     .build()

equal to: CREATE TABLE (f0 STRING, f1 DECIMAL(38, 18), c1 AS f1 + 42, c2 AS f1 - 1)

=== EXAMPLE 2 ===
physical columns defined, input fields and columns will be mapped by name,
columns are reordered and their data type overwritten,
all columns must be defined to show up in the final table's schema

>>> Schema.new_builder() \
...     .column("f1", "DECIMAL(10, 2)") \
...     .column_by_expression("c", "f1 - 1") \
...     .column("f0", "STRING") \
...     .build()

equal to: CREATE TABLE (f1 DECIMAL(10, 2), c AS f1 - 1, f0 STRING)

=== EXAMPLE 3 ===
timestamp and watermarks can be added from the DataStream API,
physical columns will be derived automatically

>>> Schema.new_builder() \
...     .column_by_metadata("rowtime", "TIMESTAMP_LTZ(3)") \
...     .watermark("rowtime", "SOURCE_WATERMARK()") \
...     .build()

equal to: CREATE TABLE (
    f0 STRING,
    f1 DECIMAL(38, 18),
    rowtime TIMESTAMP_LTZ(3) METADATA,
    WATERMARK FOR rowtime AS SOURCE_WATERMARK()
)
Note
Calling from_data_stream with a Schema (case 2) is supported since Flink 1.14.0.
- Parameters
data_stream – The datastream to be converted.
fields_or_schema – The field expressions that map the original fields of the DataStream to the fields of the Table, or the customized schema for the final table.
- Returns
The converted Table.
New in version 1.12.0.