pyflink.table.table_environment.StreamTableEnvironment.to_changelog_stream#
- StreamTableEnvironment.to_changelog_stream(table: pyflink.table.table.Table, target_schema: Optional[pyflink.table.schema.Schema] = None, changelog_mode: Optional[pyflink.table.changelog_mode.ChangelogMode] = None) pyflink.datastream.data_stream.DataStream [source]#
Converts the given Table into a DataStream of changelog entries.
Compared to
to_data_stream()
, this method produces instances of Row and sets the RowKind flag that is contained in every record during runtime. The runtime behavior is similar to that of a DynamicTableSink.If you don’t specify the changelog_mode, the changelog containing all kinds of changes (enumerated in RowKind) as the default ChangelogMode.
The given Schema is used to configure the table runtime to convert columns and internal data structures to the desired representation. The following example shows how to convert a table column into a Row type.
Example:
>>> table_env.to_changelog_stream( ... table, ... Schema.new_builder() ... .column("id", DataTypes.BIGINT()) ... .column("payload", DataTypes.ROW( ... [DataTypes.FIELD("name", DataTypes.STRING()), ... DataTypes.FIELD("age", DataTypes.INT())])) ... .build())
Note that the type system of the table ecosystem is richer than the one of the DataStream API. The table runtime will make sure to properly serialize the output records to the first operator of the DataStream API. Afterwards, the Types semantics of the DataStream API need to be considered.
If the input table contains a single rowtime column, it will be propagated into a stream record’s timestamp. Watermarks will be propagated as well.
If the rowtime should not be a concrete field in the final Row anymore, or the schema should be symmetrical for both
from_changelog_stream()
andto_changelog_stream()
, the rowtime can also be declared as a metadata column that will be propagated into a stream record’s timestamp. It is possible to declare a schema without physical/regular columns. In this case, those columns will be automatically derived and implicitly put at the beginning of the schema declaration.The following examples illustrate common schema declarations and their semantics:
Example:
given a Table of (id INT, name STRING, my_rowtime TIMESTAMP_LTZ(3)) === EXAMPLE 1 === no physical columns defined, they will be derived automatically, the last derived physical column will be skipped in favor of the metadata column >>> Schema.new_builder() ... .column_by_metadata("rowtime", "TIMESTAMP_LTZ(3)") ... .build() equal to: CREATE TABLE (id INT, name STRING, rowtime TIMESTAMP_LTZ(3) METADATA) === EXAMPLE 2 === physical columns defined, all columns must be defined >>> Schema.new_builder() ... .column("id", "INT") ... .column("name", "STRING") ... .column_by_metadata("rowtime", "TIMESTAMP_LTZ(3)") ... .build() equal to: CREATE TABLE (id INT, name STRING, rowtime TIMESTAMP_LTZ(3) METADATA)
- Parameters
table – The Table to convert. It can be updating or insert-only.
target_schema – The Schema that decides about the final external representation in DataStream records.
changelog_mode – The required kinds of changes in the result changelog. An exception will be thrown if the given updating table cannot be represented in this changelog mode.
- Returns
The converted changelog stream of Row.