public abstract class KafkaTableSourceSinkFactoryBase extends Object implements StreamTableSourceFactory<Row>, StreamTableSinkFactory<Row>
Factory for creating configured instances of KafkaTableSourceBase.

Nested classes/interfaces inherited from interface TableSourceFactory: TableSourceFactory.Context
Nested classes/interfaces inherited from interface TableSinkFactory: TableSinkFactory.Context
Constructor and Description |
---|
KafkaTableSourceSinkFactoryBase() |
Modifier and Type | Method and Description |
---|---|
protected abstract KafkaTableSinkBase | createKafkaTableSink(TableSchema schema, String topic, Properties properties, Optional<FlinkKafkaPartitioner<Row>> partitioner, SerializationSchema<Row> serializationSchema) Constructs the version-specific Kafka table sink. |
protected abstract KafkaTableSourceBase | createKafkaTableSource(TableSchema schema, Optional<String> proctimeAttribute, List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors, Map<String,String> fieldMapping, String topic, Properties properties, DeserializationSchema<Row> deserializationSchema, StartupMode startupMode, Map<KafkaTopicPartition,Long> specificStartupOffsets, long startupTimestampMillis) Constructs the version-specific Kafka table source. |
StreamTableSink<Row> | createStreamTableSink(Map<String,String> properties) Creates and configures a StreamTableSink using the given properties. |
StreamTableSource<Row> | createStreamTableSource(Map<String,String> properties) Creates and configures a StreamTableSource using the given properties. |
protected abstract String | kafkaVersion() Returns the Kafka version. |
Map<String,String> | requiredContext() Specifies the context that this factory has been implemented for. |
List<String> | supportedProperties() List of property keys that this factory can handle. |
protected abstract boolean | supportsKafkaTimestamps() True if the Kafka source supports Kafka timestamps, false otherwise. |
Methods inherited from class java.lang.Object: clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait

Methods inherited from interface StreamTableSourceFactory: createTableSource
Methods inherited from interface TableSourceFactory: createTableSource, createTableSource
Methods inherited from interface StreamTableSinkFactory: createTableSink
Methods inherited from interface TableSinkFactory: createTableSink, createTableSink
public Map<String,String> requiredContext()

Description copied from interface: TableFactory
Specifies the context that this factory has been implemented for.

Typical properties might be:
- connector.type
- format.type

Specified property versions allow the framework to provide backwards compatible properties in case of string format changes:
- connector.property-version
- format.property-version

An empty context means that the factory matches for all requests.

Specified by: requiredContext in interface TableFactory
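For illustration, here is a hedged sketch of what a Kafka factory's requiredContext() typically returns. The keys follow the typical properties listed above; the concrete values ("kafka", the version string, the property-version number) are assumptions, not the exact contents of any particular implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: keys and values are illustrative assumptions.
public class RequiredContextSketch {

    public Map<String, String> requiredContext() {
        Map<String, String> context = new HashMap<>();
        context.put("connector.type", "kafka");         // only match requests for the Kafka connector
        context.put("connector.version", "universal");  // version-specific factories differ in this value
        context.put("connector.property-version", "1"); // allows backwards compatible property changes
        return context;
    }
}
```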
public List<String> supportedProperties()

Description copied from interface: TableFactory
List of property keys that this factory can handle.

Example properties might be:
- schema.#.type
- schema.#.name
- connector.topic
- format.line-delimiter
- format.ignore-parse-errors
- format.fields.#.type
- format.fields.#.name

Note: Use "#" to denote an array of values where "#" represents one or more digits. Property versions like "format.property-version" must not be part of the supported properties.

In some cases it might be useful to declare wildcards "*". Wildcards can only be declared at the end of a property key. For example, if an arbitrary format should be supported:
- format.*

Note: Wildcards should be used with caution as they might swallow unsupported properties and thus might lead to undesired behavior.

Specified by: supportedProperties in interface TableFactory
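A hedged sketch of a supportedProperties() implementation, showing the "#" array placeholder and a trailing "format.*" wildcard. The concrete keys are examples only; a real Kafka factory declares considerably more.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: the keys below are a small illustrative subset.
public class SupportedPropertiesSketch {

    public List<String> supportedProperties() {
        List<String> properties = new ArrayList<>();
        properties.add("connector.topic");
        properties.add("connector.properties.#.key");   // "#" stands for one or more digits
        properties.add("connector.properties.#.value");
        properties.add("schema.#.name");
        properties.add("schema.#.type");
        properties.add("format.*");                      // wildcard: accept any key of the chosen format
        return properties;
    }
}
```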
public StreamTableSource<Row> createStreamTableSource(Map<String,String> properties)

Description copied from interface: StreamTableSourceFactory
Creates and configures a StreamTableSource using the given properties.

Specified by: createStreamTableSource in interface StreamTableSourceFactory<Row>

Parameters:
properties - normalized properties describing a stream table source.
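As a usage illustration, the following hedged sketch builds a normalized property map by hand and asks TableFactoryService (Flink 1.x) to locate a matching factory before calling createStreamTableSource. In practice the framework performs this lookup from a DDL statement or descriptor; the property keys and values shown are assumptions that vary across Flink versions and formats.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.table.factories.StreamTableSourceFactory;
import org.apache.flink.table.factories.TableFactoryService;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;

public class CreateStreamTableSourceSketch {

    public static void main(String[] args) {
        Map<String, String> properties = new HashMap<>();
        properties.put("connector.type", "kafka");        // matched against requiredContext()
        properties.put("connector.version", "universal"); // illustrative value
        properties.put("connector.topic", "my-topic");     // hypothetical topic name
        properties.put("format.type", "json");             // assumes a JSON format factory on the classpath
        properties.put("schema.0.name", "user_id");        // schema/format keys vary across Flink versions
        properties.put("schema.0.type", "BIGINT");

        // The service selects the factory whose requiredContext() matches and whose
        // supportedProperties() cover the remaining keys; the factory then builds the source.
        StreamTableSourceFactory<Row> factory =
                TableFactoryService.find(StreamTableSourceFactory.class, properties);
        StreamTableSource<Row> source = factory.createStreamTableSource(properties);
    }
}
```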
public StreamTableSink<Row> createStreamTableSink(Map<String,String> properties)

Description copied from interface: StreamTableSinkFactory
Creates and configures a StreamTableSink using the given properties.

Specified by: createStreamTableSink in interface StreamTableSinkFactory<Row>

Parameters:
properties - normalized properties describing a table sink.

protected abstract String kafkaVersion()

Returns the Kafka version.

protected abstract boolean supportsKafkaTimestamps()

True if the Kafka source supports Kafka timestamps, false otherwise.
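A minimal sketch of how a hypothetical version-specific subclass could implement these two hooks. The class name and the returned version string are stand-ins; the remaining abstract methods are omitted, which is why the sketch stays abstract.

```java
import org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase;

public abstract class Kafka011FactoryVersionSketch extends KafkaTableSourceSinkFactoryBase {

    @Override
    protected String kafkaVersion() {
        return "0.11"; // illustrative version string identifying the connector
    }

    @Override
    protected boolean supportsKafkaTimestamps() {
        return true;   // record timestamps are available from Kafka 0.10 onwards
    }
}
```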
protected abstract KafkaTableSourceBase createKafkaTableSource(TableSchema schema, Optional<String> proctimeAttribute, List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors, Map<String,String> fieldMapping, String topic, Properties properties, DeserializationSchema<Row> deserializationSchema, StartupMode startupMode, Map<KafkaTopicPartition,Long> specificStartupOffsets, long startupTimestampMillis)

Constructs the version-specific Kafka table source.

Parameters:
schema - Schema of the produced table.
proctimeAttribute - Field name of the processing time attribute.
rowtimeAttributeDescriptors - Descriptor for a rowtime attribute.
fieldMapping - Mapping for the fields of the table schema to fields of the physical returned type.
topic - Kafka topic to consume.
properties - Properties for the Kafka consumer.
deserializationSchema - Deserialization schema for decoding records from Kafka.
startupMode - Startup mode for the contained consumer.
specificStartupOffsets - Specific startup offsets; only relevant when startup mode is StartupMode.SPECIFIC_OFFSETS.
startupTimestampMillis - Startup timestamp in milliseconds; only relevant when startup mode is StartupMode.TIMESTAMP.
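To show how this hook is normally implemented, here is a hedged sketch of a hypothetical subclass that simply forwards every argument to a version-specific source class. Kafka011TableSource and its constructor are assumptions for illustration; the imports reflect a Flink 1.x package layout.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.KafkaTableSourceBase;
import org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.types.Row;

public abstract class Kafka011SourceFactorySketch extends KafkaTableSourceSinkFactoryBase {

    @Override
    protected KafkaTableSourceBase createKafkaTableSource(
            TableSchema schema,
            Optional<String> proctimeAttribute,
            List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
            Map<String, String> fieldMapping,
            String topic,
            Properties properties,
            DeserializationSchema<Row> deserializationSchema,
            StartupMode startupMode,
            Map<KafkaTopicPartition, Long> specificStartupOffsets,
            long startupTimestampMillis) {
        // Forward every argument unchanged to the version-specific source class.
        // Kafka011TableSource is a hypothetical stand-in; its constructor signature is assumed.
        return new Kafka011TableSource(
                schema,
                proctimeAttribute,
                rowtimeAttributeDescriptors,
                fieldMapping,
                topic,
                properties,
                deserializationSchema,
                startupMode,
                specificStartupOffsets,
                startupTimestampMillis);
    }
}
```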
protected abstract KafkaTableSinkBase createKafkaTableSink(TableSchema schema, String topic, Properties properties, Optional<FlinkKafkaPartitioner<Row>> partitioner, SerializationSchema<Row> serializationSchema)

Constructs the version-specific Kafka table sink.

Parameters:
schema - Schema of the produced table.
topic - Kafka topic to write to.
properties - Properties for the Kafka producer.
partitioner - Partitioner to select the Kafka partition for each item.
serializationSchema - Serialization schema for encoding records for Kafka.
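Analogously, a hedged sketch of the sink hook. Kafka011TableSink and its constructor are stand-ins for the version-specific class, and the imports again assume a Flink 1.x layout.

```java
import java.util.Optional;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.connectors.kafka.KafkaTableSinkBase;
import org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;

public abstract class Kafka011SinkFactorySketch extends KafkaTableSourceSinkFactoryBase {

    @Override
    protected KafkaTableSinkBase createKafkaTableSink(
            TableSchema schema,
            String topic,
            Properties properties,
            Optional<FlinkKafkaPartitioner<Row>> partitioner,
            SerializationSchema<Row> serializationSchema) {
        // Forward every argument unchanged to the version-specific sink class.
        // Kafka011TableSink is a hypothetical stand-in; its constructor signature is assumed.
        return new Kafka011TableSink(schema, topic, properties, partitioner, serializationSchema);
    }
}
```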