Class StreamTableEnvironmentImpl
java.lang.Object
  org.apache.flink.table.api.internal.TableEnvironmentImpl
    org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl
      org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl
All Implemented Interfaces:
StreamTableEnvironment, TableEnvironmentInternal, TableEnvironment
@Internal public final class StreamTableEnvironmentImpl extends AbstractStreamTableEnvironmentImpl implements StreamTableEnvironment
The implementation for a Java StreamTableEnvironment. This enables conversions from/to DataStream. It binds to a given StreamExecutionEnvironment.
-
-
Field Summary
-
Fields inherited from class org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl
executionEnvironment
-
Fields inherited from class org.apache.flink.table.api.internal.TableEnvironmentImpl
execEnv, functionCatalog, planner, resourceManager, tableConfig
-
-
Constructor Summary
StreamTableEnvironmentImpl(CatalogManager catalogManager, ModuleManager moduleManager, ResourceManager resourceManager, FunctionCatalog functionCatalog, TableConfig tableConfig, StreamExecutionEnvironment executionEnvironment, Planner planner, Executor executor, boolean isStreamingMode)
-
Method Summary
static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings)
Creates a table environment that is the entry point and central context for creating Table and SQL API programs that integrate with the Java-specific DataStream API.

StreamStatementSet createStatementSet()
Returns a StatementSet that accepts pipelines defined by DML statements or Table objects.

<T> void createTemporaryView(String path, DataStream<T> dataStream)
Creates a view from the given DataStream in a given path.

<T> void createTemporaryView(String path, DataStream<T> dataStream, Schema schema)
Creates a view from the given DataStream in a given path.

<T> void createTemporaryView(String path, DataStream<T> dataStream, Expression... fields)
Creates a view from the given DataStream in a given path with specified field names.

Table fromChangelogStream(DataStream<Row> dataStream)
Converts the given DataStream of changelog entries into a Table.

Table fromChangelogStream(DataStream<Row> dataStream, Schema schema)
Converts the given DataStream of changelog entries into a Table.

Table fromChangelogStream(DataStream<Row> dataStream, Schema schema, ChangelogMode changelogMode)
Converts the given DataStream of changelog entries into a Table.

<T> Table fromDataStream(DataStream<T> dataStream)
Converts the given DataStream into a Table.

<T> Table fromDataStream(DataStream<T> dataStream, Schema schema)
Converts the given DataStream into a Table.

<T> Table fromDataStream(DataStream<T> dataStream, Expression... fields)
Converts the given DataStream into a Table with specified field names.

<T> void registerDataStream(String name, DataStream<T> dataStream)
Creates a view from the given DataStream.

<T,ACC> void registerFunction(String name, AggregateFunction<T,ACC> aggregateFunction)
Registers an AggregateFunction under a unique name in the TableEnvironment's catalog.

<T,ACC> void registerFunction(String name, TableAggregateFunction<T,ACC> tableAggregateFunction)
Registers a TableAggregateFunction under a unique name in the TableEnvironment's catalog.

<T> void registerFunction(String name, TableFunction<T> tableFunction)
Registers a TableFunction under a unique name in the TableEnvironment's catalog.

<T> DataStream<T> toAppendStream(Table table, Class<T> clazz)
Converts the given Table into an append DataStream of a specified type.

<T> DataStream<T> toAppendStream(Table table, TypeInformation<T> typeInfo)
Converts the given Table into an append DataStream of a specified type.

DataStream<Row> toChangelogStream(Table table)
Converts the given Table into a DataStream of changelog entries.

DataStream<Row> toChangelogStream(Table table, Schema targetSchema)
Converts the given Table into a DataStream of changelog entries.

DataStream<Row> toChangelogStream(Table table, Schema targetSchema, ChangelogMode changelogMode)
Converts the given Table into a DataStream of changelog entries.

DataStream<Row> toDataStream(Table table)
Converts the given Table into a DataStream.

<T> DataStream<T> toDataStream(Table table, Class<T> targetClass)
Converts the given Table into a DataStream of the given Class.

<T> DataStream<T> toDataStream(Table table, AbstractDataType<?> targetDataType)
Converts the given Table into a DataStream of the given DataType.

<T> DataStream<Tuple2<Boolean,T>> toRetractStream(Table table, Class<T> clazz)
Converts the given Table into a DataStream of add and retract messages.

<T> DataStream<Tuple2<Boolean,T>> toRetractStream(Table table, TypeInformation<T> typeInfo)
Converts the given Table into a DataStream of add and retract messages.
-
Methods inherited from class org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl
asQueryOperation, attachAsDataStream, execEnv, extractTypeInformation, fromStreamInternal, lookupExecutor, qualifyQueryOperation, toStreamInternal, toStreamInternal, validateTimeCharacteristic, wrapWithChangeFlag
-
Methods inherited from class org.apache.flink.table.api.internal.TableEnvironmentImpl
compilePlan, compilePlanSql, create, create, createCatalog, createFunction, createFunction, createFunction, createFunction, createTable, createTable, createTemporaryFunction, createTemporaryFunction, createTemporaryFunction, createTemporarySystemFunction, createTemporarySystemFunction, createTemporarySystemFunction, createTemporaryTable, createTemporaryView, dropFunction, dropTemporaryFunction, dropTemporarySystemFunction, dropTemporaryTable, dropTemporaryView, executeCachedPlanInternal, executeInternal, executeInternal, executePlan, executeSql, explainInternal, explainPlan, explainSql, from, from, fromTableSource, fromValues, fromValues, fromValues, fromValues, fromValues, fromValues, generatePipelineFromQueryOperation, getCatalog, getCatalogManager, getCompletionHints, getConfig, getCurrentCatalog, getCurrentDatabase, getOperationTreeBuilder, getParser, getPlanner, listCatalogs, listDatabases, listFullModules, listFunctions, listModules, listTables, listTables, listTemporaryTables, listTemporaryViews, listUserDefinedFunctions, listViews, loadModule, loadPlan, registerCatalog, registerFunction, registerTable, registerTableSinkInternal, registerTableSourceInternal, scan, sqlQuery, translate, unloadModule, useCatalog, useDatabase, useModules, validateTableSource
-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
-
Methods inherited from interface org.apache.flink.table.api.TableEnvironment
compilePlanSql, createCatalog, createFunction, createFunction, createFunction, createFunction, createTable, createTemporaryFunction, createTemporaryFunction, createTemporaryFunction, createTemporarySystemFunction, createTemporarySystemFunction, createTemporarySystemFunction, createTemporaryTable, createTemporaryView, dropFunction, dropTemporaryFunction, dropTemporarySystemFunction, dropTemporaryTable, dropTemporaryView, executePlan, executeSql, explainSql, explainSql, from, from, fromValues, fromValues, fromValues, fromValues, fromValues, fromValues, getCatalog, getCompletionHints, getConfig, getCurrentCatalog, getCurrentDatabase, listCatalogs, listDatabases, listFullModules, listFunctions, listModules, listTables, listTables, listTemporaryTables, listTemporaryViews, listUserDefinedFunctions, listViews, loadModule, loadPlan, registerCatalog, registerFunction, registerTable, scan, sqlQuery, unloadModule, useCatalog, useDatabase, useModules
-
Methods inherited from interface org.apache.flink.table.api.internal.TableEnvironmentInternal
explainInternal
-
-
-
-
Constructor Detail
-
StreamTableEnvironmentImpl
public StreamTableEnvironmentImpl(CatalogManager catalogManager, ModuleManager moduleManager, ResourceManager resourceManager, FunctionCatalog functionCatalog, TableConfig tableConfig, StreamExecutionEnvironment executionEnvironment, Planner planner, Executor executor, boolean isStreamingMode)
-
-
Method Detail
-
create
public static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings)
Description copied from interface: StreamTableEnvironment
Creates a table environment that is the entry point and central context for creating Table and SQL API programs that integrate with the Java-specific DataStream API.
It is unified for bounded and unbounded data processing.
A stream table environment is responsible for:
- Converting a DataStream into a Table and vice versa.
- Connecting to external systems.
- Registering and retrieving Tables and other meta objects from a catalog.
- Executing SQL statements.
- Offering further configuration options.
Note: If you don't intend to use the DataStream API, TableEnvironment is meant for pure table programs.
- Parameters:
executionEnvironment - The Java StreamExecutionEnvironment of the TableEnvironment.
settings - The environment settings used to instantiate the TableEnvironment.
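The following is a minimal sketch of the round trip described above, not a definitive setup. It assumes the Flink Table API Java bridge is on the classpath and goes through the public StreamTableEnvironment.create(...) entry point, since this implementation class is marked @Internal. The class name and sample data are illustrative only.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class CreateTableEnvironmentExample {
    public static void main(String[] args) throws Exception {
        // The DataStream execution environment the table environment binds to.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Streaming mode is chosen here for illustration.
        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        // DataStream -> Table: an atomic type becomes a single derived column.
        DataStream<String> names = env.fromElements("alice", "bob");
        Table table = tableEnv.fromDataStream(names);

        // Table -> DataStream: insert-only rows are handed back to the DataStream API.
        DataStream<Row> rows = tableEnv.toDataStream(table);
        rows.print();

        env.execute();
    }
}
```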
-
registerFunction
public <T> void registerFunction(String name, TableFunction<T> tableFunction)
Description copied from interface: StreamTableEnvironment
Registers a TableFunction under a unique name in the TableEnvironment's catalog. Registered functions can be referenced in Table API and SQL queries.
- Specified by:
registerFunction in interface StreamTableEnvironment
- Type Parameters:
T - The type of the output row.
- Parameters:
name - The name under which the function is registered.
tableFunction - The TableFunction to register.
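As a hedged illustration of registering a TableFunction and referencing it from SQL, the sketch below defines a hypothetical SplitWords function and registers it under the made-up name split_words. It assumes a Flink version in which registerFunction is still available on StreamTableEnvironment; the view name and the sample sentences are illustrative as well.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TableFunction;

public class RegisterTableFunctionExample {

    /** Hypothetical function: emits one output row per whitespace-separated word. */
    public static class SplitWords extends TableFunction<String> {
        public void eval(String sentence) {
            for (String word : sentence.split("\\s+")) {
                collect(word);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // A DataStream of an atomic type is exposed as a single column named f0.
        DataStream<String> sentences = env.fromElements("hello flink", "table api");
        tableEnv.createTemporaryView("sentences", sentences);

        // Register the function under a unique name.
        tableEnv.registerFunction("split_words", new SplitWords());

        // Reference the registered function from SQL.
        Table words = tableEnv.sqlQuery(
                "SELECT word FROM sentences, LATERAL TABLE(split_words(f0)) AS T(word)");

        tableEnv.toDataStream(words).print();
        env.execute();
    }
}
```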
-
registerFunction
public <T,ACC> void registerFunction(String name, AggregateFunction<T,ACC> aggregateFunction)
Description copied from interface: StreamTableEnvironment
Registers an AggregateFunction under a unique name in the TableEnvironment's catalog. Registered functions can be referenced in Table API and SQL queries.
- Specified by:
registerFunction in interface StreamTableEnvironment
- Type Parameters:
T - The type of the output value.
ACC - The type of aggregate accumulator.
- Parameters:
name - The name under which the function is registered.
aggregateFunction - The AggregateFunction to register.
-
registerFunction
public <T,ACC> void registerFunction(String name, TableAggregateFunction<T,ACC> tableAggregateFunction)
Description copied from interface: StreamTableEnvironment
Registers a TableAggregateFunction under a unique name in the TableEnvironment's catalog. Registered functions can only be referenced in the Table API.
- Specified by:
registerFunction in interface StreamTableEnvironment
- Type Parameters:
T - The type of the output value.
ACC - The type of aggregate accumulator.
- Parameters:
name - The name under which the function is registered.
tableAggregateFunction - The TableAggregateFunction to register.
-
fromDataStream
public <T> Table fromDataStream(DataStream<T> dataStream)
Description copied from interface: StreamTableEnvironment
Converts the given DataStream into a Table.
Column names and types of the Table are automatically derived from the TypeInformation of the DataStream. If the outermost record's TypeInformation is a CompositeType, it will be flattened in the first level. TypeInformation that cannot be represented as one of the listed DataTypes will be treated as a black-box DataTypes.RAW(Class, TypeSerializer) type. Thus, composite nested fields will not be accessible.
Since the DataStream API does not support changelog processing natively, this method assumes append-only/insert-only semantics during the stream-to-table conversion. Records of type Row must describe RowKind.INSERT changes.
By default, the stream record's timestamp and watermarks are not propagated to downstream table operations unless explicitly declared via StreamTableEnvironment.fromDataStream(DataStream, Schema).
If the returned table is converted back to DataStream via StreamTableEnvironment.toDataStream(Table), the input DataStream of this method would be returned.
- Specified by:
fromDataStream in interface StreamTableEnvironment
- Type Parameters:
T - The external type of the DataStream.
- Parameters:
dataStream - The DataStream to be converted.
- Returns:
The converted Table.
- See Also:
StreamTableEnvironment.fromChangelogStream(DataStream)
-
fromDataStream
public <T> Table fromDataStream(DataStream<T> dataStream, Schema schema)
Description copied from interface: StreamTableEnvironment
Converts the given DataStream into a Table.
Column names and types of the Table are automatically derived from the TypeInformation of the DataStream. If the outermost record's TypeInformation is a CompositeType, it will be flattened in the first level. TypeInformation that cannot be represented as one of the listed DataTypes will be treated as a black-box DataTypes.RAW(Class, TypeSerializer) type. Thus, composite nested fields will not be accessible.
Since the DataStream API does not support changelog processing natively, this method assumes append-only/insert-only semantics during the stream-to-table conversion. Records of class Row must describe RowKind.INSERT changes.
By default, the stream record's timestamp and watermarks are not propagated to downstream table operations unless explicitly declared in the input schema.
This method allows declaring a Schema for the resulting table. The declaration is similar to a CREATE TABLE DDL in SQL and allows you to:
- enrich or overwrite automatically derived columns with a custom DataType
- reorder columns
- add computed or metadata columns next to the physical columns
- access a stream record's timestamp
- declare a watermark strategy or propagate the DataStream watermarks
It is possible to declare a schema without physical/regular columns. In this case, those columns will be automatically derived and implicitly put at the beginning of the schema declaration.
The following examples illustrate common schema declarations and their semantics:
    // given a DataStream of Tuple2<String, BigDecimal>

    // === EXAMPLE 1 ===

    // no physical columns defined, they will be derived automatically,
    // e.g. BigDecimal becomes DECIMAL(38, 18)

    Schema.newBuilder()
        .columnByExpression("c1", "f1 + 42")
        .columnByExpression("c2", "f1 - 1")
        .build()

    // equal to: CREATE TABLE (f0 STRING, f1 DECIMAL(38, 18), c1 AS f1 + 42, c2 AS f1 - 1)

    // === EXAMPLE 2 ===

    // physical columns defined, input fields and columns will be mapped by name,
    // columns are reordered and their data type overwritten,
    // all columns must be defined to show up in the final table's schema

    Schema.newBuilder()
        .column("f1", "DECIMAL(10, 2)")
        .columnByExpression("c", "f1 - 1")
        .column("f0", "STRING")
        .build()

    // equal to: CREATE TABLE (f1 DECIMAL(10, 2), c AS f1 - 1, f0 STRING)

    // === EXAMPLE 3 ===

    // timestamp and watermarks can be added from the DataStream API,
    // physical columns will be derived automatically

    Schema.newBuilder()
        .columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)") // extract timestamp into a column
        .watermark("rowtime", "SOURCE_WATERMARK()")      // declare watermarks propagation
        .build()

    // equal to:
    //     CREATE TABLE (
    //        f0 STRING,
    //        f1 DECIMAL(38, 18),
    //        rowtime TIMESTAMP_LTZ(3) METADATA,
    //        WATERMARK FOR rowtime AS SOURCE_WATERMARK()
    //     )
- Specified by:
fromDataStream in interface StreamTableEnvironment
- Type Parameters:
T - The external type of the DataStream.
- Parameters:
dataStream - The DataStream to be converted.
schema - The customized schema for the final table.
- Returns:
The converted Table.
- See Also:
StreamTableEnvironment.fromChangelogStream(DataStream, Schema)
-
fromChangelogStream
public Table fromChangelogStream(DataStream<Row> dataStream)
Description copied from interface: StreamTableEnvironment
Converts the given DataStream of changelog entries into a Table.
Compared to StreamTableEnvironment.fromDataStream(DataStream), this method consumes instances of Row and evaluates the RowKind flag that is contained in every record during runtime. The runtime behavior is similar to that of a DynamicTableSource.
This method expects a changelog containing all kinds of changes (enumerated in RowKind) as the default ChangelogMode. Use StreamTableEnvironment.fromChangelogStream(DataStream, Schema, ChangelogMode) to limit the kinds of changes (e.g. for upsert mode).
Column names and types of the Table are automatically derived from the TypeInformation of the DataStream. If the outermost record's TypeInformation is a CompositeType, it will be flattened in the first level. TypeInformation that cannot be represented as one of the listed DataTypes will be treated as a black-box DataTypes.RAW(Class, TypeSerializer) type. Thus, composite nested fields will not be accessible.
By default, the stream record's timestamp and watermarks are not propagated to downstream table operations unless explicitly declared via StreamTableEnvironment.fromChangelogStream(DataStream, Schema).
- Specified by:
fromChangelogStream in interface StreamTableEnvironment
- Parameters:
dataStream - The changelog stream of Row.
- Returns:
The converted Table.
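To make the default changelog mode concrete, here is a small sketch in which a stream of Row records carries all four RowKind flags that a retract-style update needs. It assumes the Flink Table API Java bridge is available; the view name InputTable, the sample names, and the scores are illustrative only.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

public class FromChangelogStreamExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Each Row carries its own RowKind; the update of "Alice" is expressed
        // as an UPDATE_BEFORE / UPDATE_AFTER pair.
        DataStream<Row> changelog = env.fromElements(
                Row.ofKind(RowKind.INSERT, "Alice", 12),
                Row.ofKind(RowKind.INSERT, "Bob", 5),
                Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", 12),
                Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 100));

        // The RowKind flags are evaluated at runtime; columns are derived as f0, f1.
        Table table = tableEnv.fromChangelogStream(changelog);

        tableEnv.createTemporaryView("InputTable", table);
        tableEnv.executeSql(
                "SELECT f0 AS name, SUM(f1) AS score FROM InputTable GROUP BY f0")
            .print();
    }
}
```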
-
fromChangelogStream
public Table fromChangelogStream(DataStream<Row> dataStream, Schema schema)
Description copied from interface: StreamTableEnvironment
Converts the given DataStream of changelog entries into a Table.
Compared to StreamTableEnvironment.fromDataStream(DataStream), this method consumes instances of Row and evaluates the RowKind flag that is contained in every record during runtime. The runtime behavior is similar to that of a DynamicTableSource.
This method expects a changelog containing all kinds of changes (enumerated in RowKind) as the default ChangelogMode. Use StreamTableEnvironment.fromChangelogStream(DataStream, Schema, ChangelogMode) to limit the kinds of changes (e.g. for upsert mode).
Column names and types of the Table are automatically derived from the TypeInformation of the DataStream. If the outermost record's TypeInformation is a CompositeType, it will be flattened in the first level. TypeInformation that cannot be represented as one of the listed DataTypes will be treated as a black-box DataTypes.RAW(Class, TypeSerializer) type. Thus, composite nested fields will not be accessible.
By default, the stream record's timestamp and watermarks are not propagated to downstream table operations unless explicitly declared in the input schema.
This method allows declaring a Schema for the resulting table. The declaration is similar to a CREATE TABLE DDL in SQL and allows you to:
- enrich or overwrite automatically derived columns with a custom DataType
- reorder columns
- add computed or metadata columns next to the physical columns
- access a stream record's timestamp
- declare a watermark strategy or propagate the DataStream watermarks
- declare a primary key
See StreamTableEnvironment.fromDataStream(DataStream, Schema) for more information and examples on how to declare a Schema.
- Specified by:
fromChangelogStream in interface StreamTableEnvironment
- Parameters:
dataStream - The changelog stream of Row.
schema - The customized schema for the final table.
- Returns:
The converted Table.
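Why a primary key matters for changelogs can be sketched without any Flink classes. The plain-Java model below is an assumption-laden simplification, not Flink's runtime: Kind mirrors the four RowKind values, Change is a hypothetical record type, and materialize applies a changelog to a map keyed by the primary key. It shows that a retract-style changelog and an upsert-style changelog can describe the same final table content.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ChangelogModel {

    /** Mirrors the four kinds of changes enumerated in RowKind (model only). */
    public enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /** Hypothetical changelog entry: a change flag, a primary key, and a value. */
    public static final class Change {
        final Kind kind;
        final String key;
        final int value;

        public Change(Kind kind, String key, int value) {
            this.kind = kind;
            this.key = key;
            this.value = value;
        }
    }

    /** Applies the changes in order to a table keyed by the primary key. */
    public static Map<String, Integer> materialize(List<Change> changelog) {
        Map<String, Integer> table = new LinkedHashMap<>();
        for (Change c : changelog) {
            switch (c.kind) {
                case INSERT:
                case UPDATE_AFTER:
                    // With a key, an UPDATE_AFTER alone can overwrite the old value.
                    table.put(c.key, c.value);
                    break;
                case UPDATE_BEFORE:
                case DELETE:
                    table.remove(c.key);
                    break;
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<Change> retract = Arrays.asList(
                new Change(Kind.INSERT, "Alice", 12),
                new Change(Kind.INSERT, "Bob", 5),
                new Change(Kind.UPDATE_BEFORE, "Alice", 12),
                new Change(Kind.UPDATE_AFTER, "Alice", 100));
        List<Change> upsert = Arrays.asList(
                new Change(Kind.INSERT, "Alice", 12),
                new Change(Kind.INSERT, "Bob", 5),
                new Change(Kind.UPDATE_AFTER, "Alice", 100));

        // Both changelogs lead to the same key/value content.
        System.out.println(materialize(retract).equals(materialize(upsert))); // prints "true"
    }
}
```

The upsert variant needs one record fewer per update, but only because the key identifies which row the UPDATE_AFTER replaces.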
-
fromChangelogStream
public Table fromChangelogStream(DataStream<Row> dataStream, Schema schema, ChangelogMode changelogMode)
Description copied from interface: StreamTableEnvironment
Converts the given DataStream of changelog entries into a Table.
Compared to StreamTableEnvironment.fromDataStream(DataStream), this method consumes instances of Row and evaluates the RowKind flag that is contained in every record during runtime. The runtime behavior is similar to that of a DynamicTableSource.
This method requires an explicitly declared ChangelogMode. For example, use ChangelogMode.upsert() if the stream will not contain RowKind.UPDATE_BEFORE, or ChangelogMode.insertOnly() for non-updating streams.
Column names and types of the Table are automatically derived from the TypeInformation of the DataStream. If the outermost record's TypeInformation is a CompositeType, it will be flattened in the first level. TypeInformation that cannot be represented as one of the listed DataTypes will be treated as a black-box DataTypes.RAW(Class, TypeSerializer) type. Thus, composite nested fields will not be accessible.
By default, the stream record's timestamp and watermarks are not propagated to downstream table operations unless explicitly declared in the input schema.
This method allows declaring a Schema for the resulting table. The declaration is similar to a CREATE TABLE DDL in SQL and allows you to:
- enrich or overwrite automatically derived columns with a custom DataType
- reorder columns
- add computed or metadata columns next to the physical columns
- access a stream record's timestamp
- declare a watermark strategy or propagate the DataStream watermarks
- declare a primary key
See StreamTableEnvironment.fromDataStream(DataStream, Schema) for more information and examples of how to declare a Schema.
- Specified by:
fromChangelogStream in interface StreamTableEnvironment
- Parameters:
dataStream - The changelog stream of Row.
schema - The customized schema for the final table.
changelogMode - The expected kinds of changes in the incoming changelog.
- Returns:
The converted Table.
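A hedged sketch of the upsert case mentioned above: the stream contains no UPDATE_BEFORE records, so the schema declares a primary key and the changelog mode is given explicitly. It assumes the Flink Table API Java bridge is available; the choice of f0 as key, the view name, and the sample data are illustrative.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

public class FromUpsertStreamExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // No UPDATE_BEFORE: the update of "Alice" arrives as a single UPDATE_AFTER.
        DataStream<Row> upserts = env.fromElements(
                Row.ofKind(RowKind.INSERT, "Alice", 12),
                Row.ofKind(RowKind.INSERT, "Bob", 5),
                Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 100));

        // Physical columns (f0, f1) are derived; only the primary key is declared.
        Table table = tableEnv.fromChangelogStream(
                upserts,
                Schema.newBuilder().primaryKey("f0").build(),
                ChangelogMode.upsert());

        tableEnv.createTemporaryView("InputTable", table);
        tableEnv.executeSql(
                "SELECT f0 AS name, SUM(f1) AS score FROM InputTable GROUP BY f0")
            .print();
    }
}
```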
-
createTemporaryView
public <T> void createTemporaryView(String path, DataStream<T> dataStream)
Description copied from interface: StreamTableEnvironment
Creates a view from the given DataStream in a given path. Registered views can be referenced in SQL queries.
See StreamTableEnvironment.fromDataStream(DataStream) for more information on how a DataStream is translated into a table.
Temporary objects can shadow permanent ones. If a permanent object in a given path exists, it will be inaccessible in the current session. To make the permanent object available again you can drop the corresponding temporary object.
- Specified by:
createTemporaryView in interface StreamTableEnvironment
- Type Parameters:
T - The type of the DataStream.
- Parameters:
path - The path under which the DataStream is created. See also the TableEnvironment class description for the format of the path.
dataStream - The DataStream out of which to create the view.
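The shadowing rule for temporary objects can be illustrated with a small plain-Java model. This is only a sketch of the lookup order described above, not how Flink's catalog manager is implemented; the class, its methods, and the path cat.db.orders are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class ShadowingCatalogModel {
    // Two separate registries: temporary objects are consulted first.
    private final Map<String, String> permanent = new HashMap<>();
    private final Map<String, String> temporary = new HashMap<>();

    public void createPermanent(String path, String definition) {
        permanent.put(path, definition);
    }

    public void createTemporary(String path, String definition) {
        temporary.put(path, definition);
    }

    /** Returns true if a temporary object existed under the path and was dropped. */
    public boolean dropTemporary(String path) {
        return temporary.remove(path) != null;
    }

    /** Resolves a path; a temporary object hides a permanent one with the same path. */
    public Optional<String> resolve(String path) {
        String temp = temporary.get(path);
        if (temp != null) {
            return Optional.of(temp);
        }
        return Optional.ofNullable(permanent.get(path));
    }

    public static void main(String[] args) {
        ShadowingCatalogModel catalog = new ShadowingCatalogModel();
        catalog.createPermanent("cat.db.orders", "permanent table");
        catalog.createTemporary("cat.db.orders", "temporary view");

        System.out.println(catalog.resolve("cat.db.orders").get()); // prints "temporary view"

        // Dropping the temporary object makes the permanent one visible again.
        catalog.dropTemporary("cat.db.orders");
        System.out.println(catalog.resolve("cat.db.orders").get()); // prints "permanent table"
    }
}
```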
-
createTemporaryView
public <T> void createTemporaryView(String path, DataStream<T> dataStream, Schema schema)
Description copied from interface: StreamTableEnvironment
Creates a view from the given DataStream in a given path. Registered views can be referenced in SQL queries.
See StreamTableEnvironment.fromDataStream(DataStream, Schema) for more information on how a DataStream is translated into a table.
Temporary objects can shadow permanent ones. If a permanent object in a given path exists, it will be inaccessible in the current session. To make the permanent object available again you can drop the corresponding temporary object.
- Specified by:
createTemporaryView in interface StreamTableEnvironment
- Type Parameters:
T - The type of the DataStream.
- Parameters:
path - The path under which the DataStream is created. See also the TableEnvironment class description for the format of the path.
dataStream - The DataStream out of which to create the view.
schema - The customized schema for the final table.
-
toDataStream
public DataStream<Row> toDataStream(Table table)
Description copied from interface: StreamTableEnvironment
Converts the given Table into a DataStream.
Since the DataStream API does not support changelog processing natively, this method assumes append-only/insert-only semantics during the table-to-stream conversion. The records of class Row will always describe RowKind.INSERT changes. Updating tables are not supported by this method and will produce an exception.
If you want to convert the Table to a specific class or data type, use StreamTableEnvironment.toDataStream(Table, Class) or StreamTableEnvironment.toDataStream(Table, AbstractDataType) instead.
Note that the type system of the table ecosystem is richer than the one of the DataStream API. The table runtime will make sure to properly serialize the output records to the first operator of the DataStream API. Afterwards, the Types semantics of the DataStream API need to be considered.
If the input table contains a single rowtime column, it will be propagated into a stream record's timestamp. Watermarks will be propagated as well.
- Specified by:
toDataStream in interface StreamTableEnvironment
- Parameters:
table - The Table to convert. It must be insert-only.
- Returns:
The converted DataStream.
- See Also:
StreamTableEnvironment.toDataStream(Table, AbstractDataType), StreamTableEnvironment.toChangelogStream(Table)
-
toDataStream
public <T> DataStream<T> toDataStream(Table table, Class<T> targetClass)
Description copied from interface: StreamTableEnvironment
Converts the given Table into a DataStream of the given Class.
See StreamTableEnvironment.toDataStream(Table, AbstractDataType) for more information on how a Table is translated into a DataStream.
This method is a shortcut for:
    tableEnv.toDataStream(table, DataTypes.of(targetClass))
Calling this method with a class of Row will redirect to StreamTableEnvironment.toDataStream(Table).
- Specified by:
toDataStream in interface StreamTableEnvironment
- Type Parameters:
T - External record.
- Parameters:
table - The Table to convert. It must be insert-only.
targetClass - The Class that decides about the final external representation in DataStream records.
- Returns:
The converted DataStream.
- See Also:
StreamTableEnvironment.toChangelogStream(Table, Schema)
-
toDataStream
public <T> DataStream<T> toDataStream(Table table, AbstractDataType<?> targetDataType)
Description copied from interface: StreamTableEnvironment
Converts the given Table into a DataStream of the given DataType.
The given DataType is used to configure the table runtime to convert columns and internal data structures to the desired representation. The following example shows how to convert the table columns into the fields of a POJO type.
    // given a Table of (name STRING, age INT)

    public static class MyPojo {
        public String name;
        public Integer age;

        // default constructor for DataStream API
        public MyPojo() {}

        // fully assigning constructor for field order in Table API
        public MyPojo(String name, Integer age) {
            this.name = name;
            this.age = age;
        }
    }

    tableEnv.toDataStream(table, DataTypes.of(MyPojo.class));
Since the DataStream API does not support changelog processing natively, this method assumes append-only/insert-only semantics during the table-to-stream conversion. Updating tables are not supported by this method and will produce an exception.
Note that the type system of the table ecosystem is richer than the one of the DataStream API. The table runtime will make sure to properly serialize the output records to the first operator of the DataStream API. Afterwards, the Types semantics of the DataStream API need to be considered.
If the input table contains a single rowtime column, it will be propagated into a stream record's timestamp. Watermarks will be propagated as well.
- Specified by:
toDataStream in interface StreamTableEnvironment
- Type Parameters:
T - External record.
- Parameters:
table - The Table to convert. It must be insert-only.
targetDataType - The DataType that decides about the final external representation in DataStream records.
- Returns:
The converted DataStream.
- See Also:
StreamTableEnvironment.toDataStream(Table), StreamTableEnvironment.toChangelogStream(Table, Schema)
-
toChangelogStream
public DataStream<Row> toChangelogStream(Table table)
Description copied from interface: StreamTableEnvironment
Converts the given Table into a DataStream of changelog entries.
Compared to StreamTableEnvironment.toDataStream(Table), this method produces instances of Row and sets the RowKind flag that is contained in every record during runtime. The runtime behavior is similar to that of a DynamicTableSink.
This method can emit a changelog containing all kinds of changes (enumerated in RowKind) that the given updating table requires as the default ChangelogMode. Use StreamTableEnvironment.toChangelogStream(Table, Schema, ChangelogMode) to limit the kinds of changes (e.g. for upsert mode).
Note that the type system of the table ecosystem is richer than the one of the DataStream API. The table runtime will make sure to properly serialize the output records to the first operator of the DataStream API. Afterwards, the Types semantics of the DataStream API need to be considered.
If the input table contains a single rowtime column, it will be propagated into a stream record's timestamp. Watermarks will be propagated as well.
- Specified by:
toChangelogStream in interface StreamTableEnvironment
- Parameters:
table - The Table to convert. It can be updating or insert-only.
- Returns:
The converted changelog stream of Row.
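As a sketch of the reverse direction, the example below builds an updating table with a grouped aggregation and converts it into a changelog stream. It assumes the Flink Table API Java bridge is available; the column names and values are illustrative. Because "Alice" occurs twice, the emitted rows are expected to include update kinds in addition to inserts, though the exact output order may vary.

```java
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.row;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class ToChangelogStreamExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // A grouped aggregation is an updating table: the sum for "Alice" changes
        // when the second record for that name is processed.
        Table scores = tableEnv
                .fromValues(row("Alice", 12), row("Alice", 2), row("Bob", 12))
                .as("name", "score")
                .groupBy($("name"))
                .select($("name"), $("score").sum());

        // Each emitted Row carries a RowKind flag describing the change.
        tableEnv.toChangelogStream(scores)
                .executeAndCollect()
                .forEachRemaining(System.out::println);
    }
}
```

Passing the same table to toDataStream(Table) would fail instead, since that method only accepts insert-only tables.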
-
toChangelogStream
public DataStream<Row> toChangelogStream(Table table, Schema targetSchema)
Description copied from interface: StreamTableEnvironment
Converts the given Table into a DataStream of changelog entries.
Compared to StreamTableEnvironment.toDataStream(Table), this method produces instances of Row and sets the RowKind flag that is contained in every record during runtime. The runtime behavior is similar to that of a DynamicTableSink.
This method can emit a changelog containing all kinds of changes (enumerated in RowKind) that the given updating table requires as the default ChangelogMode. Use StreamTableEnvironment.toChangelogStream(Table, Schema, ChangelogMode) to limit the kinds of changes (e.g. for upsert mode).
The given Schema is used to configure the table runtime to convert columns and internal data structures to the desired representation. The following example shows how to convert a table column into a POJO type.
    // given a Table of (id BIGINT, payload ROW<name STRING, age INT>)

    public static class MyPojo {
        public String name;
        public Integer age;

        // default constructor for DataStream API
        public MyPojo() {}

        // fully assigning constructor for field order in Table API
        public MyPojo(String name, Integer age) {
            this.name = name;
            this.age = age;
        }
    }

    tableEnv.toChangelogStream(
        table,
        Schema.newBuilder()
            .column("id", DataTypes.BIGINT())
            .column("payload", DataTypes.of(MyPojo.class)) // force an implicit conversion
            .build());
Note that the type system of the table ecosystem is richer than the one of the DataStream API. The table runtime will make sure to properly serialize the output records to the first operator of the DataStream API. Afterwards, the Types semantics of the DataStream API need to be considered.
If the input table contains a single rowtime column, it will be propagated into a stream record's timestamp. Watermarks will be propagated as well.
If the rowtime should not be a concrete field in the final Row anymore, or the schema should be symmetrical for both StreamTableEnvironment.fromChangelogStream(DataStream) and StreamTableEnvironment.toChangelogStream(Table), the rowtime can also be declared as a metadata column that will be propagated into a stream record's timestamp. It is possible to declare a schema without physical/regular columns. In this case, those columns will be automatically derived and implicitly put at the beginning of the schema declaration.
The following examples illustrate common schema declarations and their semantics:
    // given a Table of (id INT, name STRING, my_rowtime TIMESTAMP_LTZ(3))

    // === EXAMPLE 1 ===

    // no physical columns defined, they will be derived automatically,
    // the last derived physical column will be skipped in favor of the metadata column

    Schema.newBuilder()
        .columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")
        .build()

    // equal to: CREATE TABLE (id INT, name STRING, rowtime TIMESTAMP_LTZ(3) METADATA)

    // === EXAMPLE 2 ===

    // physical columns defined, all columns must be defined

    Schema.newBuilder()
        .column("id", "INT")
        .column("name", "STRING")
        .columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")
        .build()

    // equal to: CREATE TABLE (id INT, name STRING, rowtime TIMESTAMP_LTZ(3) METADATA)
- Specified by:
toChangelogStream in interface StreamTableEnvironment
- Parameters:
table - The Table to convert. It can be updating or insert-only.
targetSchema - The Schema that decides about the final external representation in DataStream records.
- Returns:
The converted changelog stream of Row.
-
toChangelogStream
public DataStream<Row> toChangelogStream(Table table, Schema targetSchema, ChangelogMode changelogMode)
Description copied from interface: StreamTableEnvironment
Converts the given Table into a DataStream of changelog entries.

Compared to StreamTableEnvironment.toDataStream(Table), this method produces instances of Row and sets the RowKind flag that is contained in every record during runtime. The runtime behavior is similar to that of a DynamicTableSink.

This method requires an explicitly declared ChangelogMode. For example, use ChangelogMode.upsert() if the stream will not contain RowKind.UPDATE_BEFORE, or ChangelogMode.insertOnly() for non-updating streams.

Note that the type system of the table ecosystem is richer than the one of the DataStream API. The table runtime will make sure to properly serialize the output records to the first operator of the DataStream API. Afterwards, the Types semantics of the DataStream API need to be considered.

If the input table contains a single rowtime column, it will be propagated into a stream record's timestamp. Watermarks will be propagated as well. However, it is also possible to write out the rowtime as a metadata column. See StreamTableEnvironment.toChangelogStream(Table, Schema) for more information and examples on how to declare a Schema.

- Specified by:
toChangelogStream in interface StreamTableEnvironment
- Parameters:
table - The Table to convert. It can be updating or insert-only.
targetSchema - The Schema that decides about the final external representation in DataStream records.
changelogMode - The required kinds of changes in the result changelog. An exception will be thrown if the given updating table cannot be represented in this changelog mode.
- Returns:
The converted changelog stream of Row.
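
To make the role of the per-record change flag more concrete, the following plain-Java sketch models how a downstream consumer might apply changelog entries to a keyed view. It is only an illustration and does not use Flink: the Kind enum and the Change class are hypothetical stand-ins for RowKind and Row, and each record is assumed to carry a key and a single value.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of applying changelog entries; no Flink classes are used.
class ChangelogSketch {

    // Hypothetical stand-in for RowKind (same four kinds of change).
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // Hypothetical stand-in for a Row with one key field and one value field.
    static final class Change {
        final Kind kind;
        final String key;
        final long value;

        Change(Kind kind, String key, long value) {
            this.kind = kind;
            this.key = key;
            this.value = value;
        }
    }

    // Applies the changes in order and returns the resulting keyed view.
    static Map<String, Long> materialize(List<Change> changes) {
        Map<String, Long> view = new LinkedHashMap<>();
        for (Change c : changes) {
            switch (c.kind) {
                case INSERT:
                case UPDATE_AFTER:
                    view.put(c.key, c.value); // new or updated value for the key
                    break;
                case UPDATE_BEFORE:
                    // For a keyed view the old value can be ignored; the
                    // UPDATE_AFTER entry that follows overwrites the key.
                    break;
                case DELETE:
                    view.remove(c.key);
                    break;
            }
        }
        return view;
    }

    public static void main(String[] args) {
        // An upsert-style changelog: updates arrive without UPDATE_BEFORE.
        List<Change> upsertLog = Arrays.asList(
                new Change(Kind.INSERT, "alice", 1L),
                new Change(Kind.UPDATE_AFTER, "alice", 2L),
                new Change(Kind.INSERT, "bob", 5L),
                new Change(Kind.DELETE, "bob", 5L));
        System.out.println(materialize(upsertLog));
    }
}
```

Because the view is keyed, the sketch produces the same result whether or not UPDATE_BEFORE entries are present, which is why an upsert changelog can omit them.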
-
createStatementSet
public StreamStatementSet createStatementSet()
Description copied from interface: TableEnvironment
Returns a StatementSet that accepts pipelines defined by DML statements or Table objects. The planner can optimize all added statements together and then submit them as one job.

- Specified by:
createStatementSet in interface StreamTableEnvironment
- Specified by:
createStatementSet in interface TableEnvironment
- Overrides:
createStatementSet in class TableEnvironmentImpl
- Returns:
statement set builder for the Java-specific DataStream API
-
fromDataStream
public <T> Table fromDataStream(DataStream<T> dataStream, Expression... fields)
Description copied from interface: StreamTableEnvironment
Converts the given DataStream into a Table with specified field names.

There are two modes for mapping original fields to the fields of the Table:

1. Reference input fields by name: All fields in the schema definition are referenced by name (and possibly renamed using an alias (as)). Moreover, we can define proctime and rowtime attributes at arbitrary positions using arbitrary names (except those that exist in the result schema). In this mode, fields can be reordered and projected out. This mode can be used for any input type, including POJOs.

Example:

DataStream<Tuple2<String, Long>> stream = ...

Table table = tableEnv.fromDataStream(
    stream,
    $("f1"),                // reorder and use the original field
    $("rowtime").rowtime(), // extract the internally attached timestamp into an event-time
                            // attribute named 'rowtime'
    $("f0").as("name")      // reorder and give the original field a better name
);

2. Reference input fields by position: In this mode, fields are simply renamed. Event-time attributes can replace the field on their position in the input data (if it is of correct type) or be appended at the end. Proctime attributes must be appended at the end. This mode can only be used if the input type has a defined field order (tuple, case class, Row) and none of the fields references a field of the input type.

Example:

DataStream<Tuple2<String, Long>> stream = ...

Table table = tableEnv.fromDataStream(
    stream,
    $("a"),                // rename the first field to 'a'
    $("b"),                // rename the second field to 'b'
    $("rowtime").rowtime() // extract the internally attached timestamp into an event-time
                           // attribute named 'rowtime'
);

- Specified by:
fromDataStream in interface StreamTableEnvironment
- Type Parameters:
T - The type of the DataStream.
- Parameters:
dataStream - The DataStream to be converted.
fields - The field expressions to map original fields of the DataStream to the fields of the Table.
- Returns:
The converted Table.
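
The choice between the two mapping modes follows from the rule stated above: by-position mapping applies only when the input type has a defined field order and none of the given fields references an input field. The following plain-Java sketch is a simplified model of that rule and is not Flink's actual resolution logic. The class name, the method name, and the reduction of field expressions to plain name strings are assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.List;

// Simplified, hypothetical model of the mode selection rule; no Flink classes are used.
class FieldMappingSketch {

    enum Mode { BY_NAME, BY_POSITION }

    // hasFieldOrder:   whether the input type has a defined field order (tuple, Row, ...)
    // inputFields:     field names of the input type, e.g. f0, f1 for a Tuple2
    // referencedNames: names referenced by the field expressions, before any alias
    static Mode resolveMode(
            boolean hasFieldOrder, List<String> inputFields, List<String> referencedNames) {
        if (!hasFieldOrder) {
            return Mode.BY_NAME; // e.g. POJOs can only be mapped by name
        }
        for (String name : referencedNames) {
            if (inputFields.contains(name)) {
                return Mode.BY_NAME; // at least one expression references an input field
            }
        }
        return Mode.BY_POSITION; // only new names: fields are simply renamed
    }

    public static void main(String[] args) {
        List<String> tupleFields = Arrays.asList("f0", "f1");
        // Mirrors the first example: $("f1"), $("rowtime").rowtime(), $("f0").as("name")
        System.out.println(resolveMode(true, tupleFields, Arrays.asList("f1", "rowtime", "f0")));
        // Mirrors the second example: $("a"), $("b"), $("rowtime").rowtime()
        System.out.println(resolveMode(true, tupleFields, Arrays.asList("a", "b", "rowtime")));
    }
}
```

Under these assumptions the first call resolves to BY_NAME and the second to BY_POSITION, matching the two examples.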
-
registerDataStream
public <T> void registerDataStream(String name, DataStream<T> dataStream)
Description copied from interface: StreamTableEnvironment
Creates a view from the given DataStream. Registered views can be referenced in SQL queries.

The field names of the Table are automatically derived from the type of the DataStream.

The view is registered in the namespace of the current catalog and database. To register the view in a different catalog use StreamTableEnvironment.createTemporaryView(String, DataStream).

Temporary objects can shadow permanent ones. If a permanent object in a given path exists, it will be inaccessible in the current session. To make the permanent object available again you can drop the corresponding temporary object.

- Specified by:
registerDataStream in interface StreamTableEnvironment
- Type Parameters:
T - The type of the DataStream to register.
- Parameters:
name - The name under which the DataStream is registered in the catalog.
dataStream - The DataStream to register.
-
createTemporaryView
public <T> void createTemporaryView(String path, DataStream<T> dataStream, Expression... fields)
Description copied from interface: StreamTableEnvironment
Creates a view from the given DataStream in a given path with specified field names. Registered views can be referenced in SQL queries.

There are two modes for mapping original fields to the fields of the View:

1. Reference input fields by name: All fields in the schema definition are referenced by name (and possibly renamed using an alias (as)). Moreover, we can define proctime and rowtime attributes at arbitrary positions using arbitrary names (except those that exist in the result schema). In this mode, fields can be reordered and projected out. This mode can be used for any input type, including POJOs.

Example:

DataStream<Tuple2<String, Long>> stream = ...

tableEnv.createTemporaryView(
    "cat.db.myTable",
    stream,
    $("f1"),                // reorder and use the original field
    $("rowtime").rowtime(), // extract the internally attached timestamp into an event-time
                            // attribute named 'rowtime'
    $("f0").as("name")      // reorder and give the original field a better name
);

2. Reference input fields by position: In this mode, fields are simply renamed. Event-time attributes can replace the field on their position in the input data (if it is of correct type) or be appended at the end. Proctime attributes must be appended at the end. This mode can only be used if the input type has a defined field order (tuple, case class, Row) and none of the fields references a field of the input type.

Example:

DataStream<Tuple2<String, Long>> stream = ...

tableEnv.createTemporaryView(
    "cat.db.myTable",
    stream,
    $("a"),                // rename the first field to 'a'
    $("b"),                // rename the second field to 'b'
    $("rowtime").rowtime() // adds an event-time attribute named 'rowtime'
);

Temporary objects can shadow permanent ones. If a permanent object in a given path exists, it will be inaccessible in the current session. To make the permanent object available again you can drop the corresponding temporary object.
- Specified by:
createTemporaryView in interface StreamTableEnvironment
- Type Parameters:
T - The type of the DataStream.
- Parameters:
path - The path under which the DataStream is created. See also the TableEnvironment class description for the format of the path.
dataStream - The DataStream out of which to create the view.
fields - The field expressions to map original fields of the DataStream to the fields of the View.
-
toAppendStream
public <T> DataStream<T> toAppendStream(Table table, Class<T> clazz)
Description copied from interface: StreamTableEnvironment
Converts the given Table into an append DataStream of a specified type.

The Table must only have insert (append) changes. If the Table is also modified by update or delete changes, the conversion will fail.

The fields of the Table are mapped to DataStream fields as follows:
- Row and Tuple types: Fields are mapped by position, field types must match.
- POJO DataStream types: Fields are mapped by field name, field types must match.

- Specified by:
toAppendStream in interface StreamTableEnvironment
- Type Parameters:
T - The type of the resulting DataStream.
- Parameters:
table - The Table to convert.
clazz - The class of the type of the resulting DataStream.
- Returns:
The converted DataStream.
-
toAppendStream
public <T> DataStream<T> toAppendStream(Table table, TypeInformation<T> typeInfo)
Description copied from interface: StreamTableEnvironment
Converts the given Table into an append DataStream of a specified type.

The Table must only have insert (append) changes. If the Table is also modified by update or delete changes, the conversion will fail.

The fields of the Table are mapped to DataStream fields as follows:
- Row and Tuple types: Fields are mapped by position, field types must match.
- POJO DataStream types: Fields are mapped by field name, field types must match.

- Specified by:
toAppendStream in interface StreamTableEnvironment
- Type Parameters:
T - The type of the resulting DataStream.
- Parameters:
table - The Table to convert.
typeInfo - The TypeInformation that specifies the type of the DataStream.
- Returns:
The converted DataStream.
-
toRetractStream
public <T> DataStream<Tuple2<Boolean,T>> toRetractStream(Table table, Class<T> clazz)
Description copied from interface: StreamTableEnvironment
Converts the given Table into a DataStream of add and retract messages. The message will be encoded as Tuple2. The first field is a Boolean flag, the second field holds the record of the specified type T.

A true Boolean flag indicates an add message, a false flag indicates a retract message.

The fields of the Table are mapped to DataStream fields as follows:
- Row and Tuple types: Fields are mapped by position, field types must match.
- POJO DataStream types: Fields are mapped by field name, field types must match.

- Specified by:
toRetractStream in interface StreamTableEnvironment
- Type Parameters:
T - The type of the requested record type.
- Parameters:
table - The Table to convert.
clazz - The class of the requested record type.
- Returns:
The converted DataStream.
-
toRetractStream
public <T> DataStream<Tuple2<Boolean,T>> toRetractStream(Table table, TypeInformation<T> typeInfo)
Description copied from interface: StreamTableEnvironment
Converts the given Table into a DataStream of add and retract messages. The message will be encoded as Tuple2. The first field is a Boolean flag, the second field holds the record of the specified type T.

A true Boolean flag indicates an add message, a false flag indicates a retract message.

The fields of the Table are mapped to DataStream fields as follows:
- Row and Tuple types: Fields are mapped by position, field types must match.
- POJO DataStream types: Fields are mapped by field name, field types must match.

- Specified by:
toRetractStream in interface StreamTableEnvironment
- Type Parameters:
T - The type of the requested record type.
- Parameters:
table - The Table to convert.
typeInfo - The TypeInformation of the requested record type.
- Returns:
The converted DataStream.
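
As an illustration of the add/retract encoding, the following plain-Java sketch folds a sequence of (flag, record) messages into the set of records that are currently valid. It does not use Flink: Map.Entry<Boolean, String> is a stand-in for Tuple2<Boolean, T>, and the class and method names are hypothetical.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of consuming add/retract messages; no Flink classes are used.
class RetractSketch {

    // Hypothetical helper that builds one message: true = add, false = retract.
    static Map.Entry<Boolean, String> msg(boolean add, String record) {
        return new SimpleEntry<>(add, record);
    }

    // Folds the messages in order into a map of record -> current multiplicity.
    static Map<String, Integer> fold(List<Map.Entry<Boolean, String>> messages) {
        Map<String, Integer> counts = new TreeMap<>();
        for (Map.Entry<Boolean, String> m : messages) {
            int delta = m.getKey() ? 1 : -1; // add raises, retract lowers the count
            String record = m.getValue();
            int updated = counts.getOrDefault(record, 0) + delta;
            if (updated == 0) {
                counts.remove(record); // record is fully retracted
            } else {
                counts.put(record, updated);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // An updating count per user: the old result is retracted before the new one is added.
        List<Map.Entry<Boolean, String>> messages = Arrays.asList(
                msg(true, "alice,1"),
                msg(false, "alice,1"),
                msg(true, "alice,2"),
                msg(true, "bob,1"));
        System.out.println(fold(messages));
    }
}
```

In this model an update appears as a retract message for the old record followed by an add message for the new one, so only the latest result per user remains after folding.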
-
-