public class KafkaSourceBuilder<OUT> extends Object

The builder class for KafkaSource, to make it easier for the users to construct a KafkaSource.
The following example shows the minimum setup to create a KafkaSource that reads String values from a Kafka topic.
KafkaSource<String> source = KafkaSource
.<String>builder()
.setBootstrapServers(MY_BOOTSTRAP_SERVERS)
.setTopics(Arrays.asList(TOPIC1, TOPIC2))
.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
.build();
The bootstrap servers, topics/partitions to consume, and the record deserializer are required fields that must be set.
To specify the starting offsets of the KafkaSource, one can call setStartingOffsets(OffsetsInitializer).
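For instance, a sketch of starting from the consumer group's committed offsets (the group id is hypothetical; the other constants are assumed to be the same as in the example above):

```java
// Illustrative sketch, not from the original docs: start from the consumer
// group's committed offsets, falling back to the earliest offsets if the
// group has no committed offsets yet.
KafkaSource<String> source = KafkaSource
        .<String>builder()
        .setBootstrapServers(MY_BOOTSTRAP_SERVERS)
        .setTopics(Arrays.asList(TOPIC1, TOPIC2))
        .setGroupId("my-consumer-group")
        .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
        .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
        .build();
```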
By default the KafkaSource runs in Boundedness.CONTINUOUS_UNBOUNDED mode and never stops until the Flink job is canceled or fails. To let the KafkaSource run in Boundedness.CONTINUOUS_UNBOUNDED mode but stop at some given offsets, one can call setUnbounded(OffsetsInitializer). For example, the following KafkaSource stops after it consumes up to the latest partition offsets at the point when the Flink job started.
KafkaSource<String> source = KafkaSource
.<String>builder()
.setBootstrapServers(MY_BOOTSTRAP_SERVERS)
.setTopics(Arrays.asList(TOPIC1, TOPIC2))
.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
.setUnbounded(OffsetsInitializer.latest())
.build();
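For a true batch-style job, one could instead call setBounded(OffsetsInitializer), so that KafkaSource.getBoundedness() reports Boundedness.BOUNDED; a sketch under the same assumed constants:

```java
// Illustrative sketch: a bounded source that reads everything up to the
// latest offsets at the moment the job starts, then finishes.
KafkaSource<String> source = KafkaSource
        .<String>builder()
        .setBootstrapServers(MY_BOOTSTRAP_SERVERS)
        .setTopics(Arrays.asList(TOPIC1, TOPIC2))
        .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
        .setBounded(OffsetsInitializer.latest())
        .build();
```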
Check the Java docs of each individual method to learn more about the settings used to build a KafkaSource.
Modifier and Type | Field and Description
---|---
protected Properties | props
Modifier and Type | Method and Description
---|---
KafkaSource<OUT> | build() Build the KafkaSource.
KafkaSourceBuilder<OUT> | setBootstrapServers(String bootstrapServers) Sets the bootstrap servers for the KafkaConsumer of the KafkaSource.
KafkaSourceBuilder<OUT> | setBounded(OffsetsInitializer stoppingOffsetsInitializer) By default the KafkaSource is set to run in Boundedness.CONTINUOUS_UNBOUNDED manner and thus never stops until the Flink job fails or is canceled.
KafkaSourceBuilder<OUT> | setClientIdPrefix(String prefix) Sets the client id prefix of this KafkaSource.
KafkaSourceBuilder<OUT> | setDeserializer(KafkaRecordDeserializationSchema<OUT> recordDeserializer) Sets the deserializer of the ConsumerRecord for KafkaSource.
KafkaSourceBuilder<OUT> | setGroupId(String groupId) Sets the consumer group id of the KafkaSource.
KafkaSourceBuilder<OUT> | setPartitions(Set<org.apache.kafka.common.TopicPartition> partitions) Set a set of partitions to consume from.
KafkaSourceBuilder<OUT> | setProperties(Properties props) Set arbitrary properties for the KafkaSource and KafkaConsumer.
KafkaSourceBuilder<OUT> | setProperty(String key, String value) Set an arbitrary property for the KafkaSource and KafkaConsumer.
KafkaSourceBuilder<OUT> | setStartingOffsets(OffsetsInitializer startingOffsetsInitializer) Specify from which offsets the KafkaSource should start consuming by providing an OffsetsInitializer.
KafkaSourceBuilder<OUT> | setTopicPattern(Pattern topicPattern) Set a topic pattern to consume from, using the Java Pattern.
KafkaSourceBuilder<OUT> | setTopics(List<String> topics) Set a list of topics the KafkaSource should consume from.
KafkaSourceBuilder<OUT> | setTopics(String... topics) Set a list of topics the KafkaSource should consume from.
KafkaSourceBuilder<OUT> | setUnbounded(OffsetsInitializer stoppingOffsetsInitializer) By default the KafkaSource is set to run in Boundedness.CONTINUOUS_UNBOUNDED manner and thus never stops until the Flink job fails or is canceled.
KafkaSourceBuilder<OUT> | setValueOnlyDeserializer(DeserializationSchema<OUT> deserializationSchema) Sets the deserializer of the ConsumerRecord for KafkaSource.
protected Properties props
public KafkaSourceBuilder<OUT> setBootstrapServers(String bootstrapServers)
Sets the bootstrap servers for the KafkaConsumer of the KafkaSource.
Parameters:
bootstrapServers - the bootstrap servers of the Kafka cluster.

public KafkaSourceBuilder<OUT> setGroupId(String groupId)
Sets the consumer group id of the KafkaSource.
Parameters:
groupId - the group id of the KafkaSource.

public KafkaSourceBuilder<OUT> setTopics(List<String> topics)
Set a list of topics the KafkaSource should consume from. The topics in the list are expected to exist in the Kafka cluster; to allow topics to be created lazily, use setTopicPattern(Pattern) instead.
Parameters:
topics - the list of topics to consume from.
See Also:
KafkaConsumer.subscribe(Collection)
public KafkaSourceBuilder<OUT> setTopics(String... topics)
Set a list of topics the KafkaSource should consume from. The given topics are expected to exist in the Kafka cluster; to allow topics to be created lazily, use setTopicPattern(Pattern) instead.
Parameters:
topics - the list of topics to consume from.
See Also:
KafkaConsumer.subscribe(Collection)
public KafkaSourceBuilder<OUT> setTopicPattern(Pattern topicPattern)
Set a topic pattern to consume from, using the Java Pattern.
Parameters:
topicPattern - the pattern of the topic name to consume from.
See Also:
KafkaConsumer.subscribe(Pattern)
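Since the argument is a plain java.util.regex.Pattern, its matching behavior can be checked independently of Kafka. A small sketch with hypothetical topic names:

```java
import java.util.regex.Pattern;

public class TopicPatternDemo {
    public static void main(String[] args) {
        // Hypothetical pattern subscribing to every topic named "orders-<suffix>".
        Pattern topicPattern = Pattern.compile("orders-.*");

        System.out.println(topicPattern.matcher("orders-2024").matches());   // true
        System.out.println(topicPattern.matcher("payments-2024").matches()); // false

        // The same pattern would then be handed to the builder:
        // builder.setTopicPattern(topicPattern);
    }
}
```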
public KafkaSourceBuilder<OUT> setPartitions(Set<org.apache.kafka.common.TopicPartition> partitions)
Set a set of partitions to consume from.
Parameters:
partitions - the set of partitions to consume from.
See Also:
KafkaConsumer.assign(Collection)
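A sketch of pinning explicit partitions; the topic name and partition numbers are hypothetical, and the other constants are assumed from the class-level examples:

```java
// Illustrative sketch: subscribe only to partitions 0 and 1 of "my-topic".
Set<TopicPartition> partitions = new HashSet<>(Arrays.asList(
        new TopicPartition("my-topic", 0),
        new TopicPartition("my-topic", 1)));

KafkaSource<String> source = KafkaSource
        .<String>builder()
        .setBootstrapServers(MY_BOOTSTRAP_SERVERS)
        .setPartitions(partitions)
        .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
        .build();
```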
public KafkaSourceBuilder<OUT> setStartingOffsets(OffsetsInitializer startingOffsetsInitializer)
Specify from which offsets the KafkaSource should start consuming by providing an OffsetsInitializer.
The following OffsetsInitializers are commonly used and provided out of the box. Users can also implement their own OffsetsInitializer for custom behaviors.
- OffsetsInitializer.earliest() - starting from the earliest offsets. This is also the default OffsetsInitializer of the KafkaSource for starting offsets.
- OffsetsInitializer.latest() - starting from the latest offsets.
- OffsetsInitializer.committedOffsets() - starting from the committed offsets of the consumer group.
- OffsetsInitializer.committedOffsets(org.apache.kafka.clients.consumer.OffsetResetStrategy) - starting from the committed offsets of the consumer group. If there are no committed offsets, starting from the offsets specified by the OffsetResetStrategy.
- OffsetsInitializer.offsets(Map) - starting from the specified offsets for each partition.
- OffsetsInitializer.timestamp(long) - starting from the specified timestamp for each partition. Note that the guarantee here is that all the records in Kafka whose ConsumerRecord.timestamp() is greater than the given starting timestamp will be consumed. However, it is possible that some consumer records whose timestamp is smaller than the given starting timestamp are also consumed.
Parameters:
startingOffsetsInitializer - the OffsetsInitializer setting the starting offsets for the Source.

public KafkaSourceBuilder<OUT> setUnbounded(OffsetsInitializer stoppingOffsetsInitializer)
By default the KafkaSource is set to run in Boundedness.CONTINUOUS_UNBOUNDED manner and thus never stops until the Flink job fails or is canceled. To let the KafkaSource run as a streaming source but still stop at some point, one can set an OffsetsInitializer to specify the stopping offsets for each partition. When all the partitions have reached their stopping offsets, the KafkaSource will then exit.
This method is different from setBounded(OffsetsInitializer) in that after setting the stopping offsets with this method, KafkaSource.getBoundedness() will still return Boundedness.CONTINUOUS_UNBOUNDED even though it will stop at the stopping offsets specified by the stopping OffsetsInitializer.
The following OffsetsInitializers are commonly used and provided out of the box. Users can also implement their own OffsetsInitializer for custom behaviors.
- OffsetsInitializer.latest() - stops at the latest offsets of the partitions when the KafkaSource starts to run.
- OffsetsInitializer.committedOffsets() - stops at the committed offsets of the consumer group.
- OffsetsInitializer.offsets(Map) - stops at the specified offsets for each partition.
- OffsetsInitializer.timestamp(long) - stops at the specified timestamp for each partition. The guarantee of setting the stopping timestamp is that no Kafka records whose ConsumerRecord.timestamp() is greater than the given stopping timestamp will be consumed. However, it is possible that some records whose timestamp is smaller than the specified stopping timestamp are not consumed.
Parameters:
stoppingOffsetsInitializer - the OffsetsInitializer to specify the stopping offsets.
See Also:
setBounded(OffsetsInitializer)
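A sketch of a streaming job that nevertheless stops once records pass a fixed timestamp; the epoch-millis value is hypothetical, and the other constants are assumed from the class-level examples:

```java
// Illustrative sketch: getBoundedness() still reports CONTINUOUS_UNBOUNDED,
// but each partition stops once it reaches records at or beyond the timestamp.
KafkaSource<String> source = KafkaSource
        .<String>builder()
        .setBootstrapServers(MY_BOOTSTRAP_SERVERS)
        .setTopics(Arrays.asList(TOPIC1, TOPIC2))
        .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
        .setUnbounded(OffsetsInitializer.timestamp(1_700_000_000_000L))
        .build();
```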
public KafkaSourceBuilder<OUT> setBounded(OffsetsInitializer stoppingOffsetsInitializer)
By default the KafkaSource is set to run in Boundedness.CONTINUOUS_UNBOUNDED manner and thus never stops until the Flink job fails or is canceled. To let the KafkaSource run in Boundedness.BOUNDED manner and stop at some point, one can set an OffsetsInitializer to specify the stopping offsets for each partition. When all the partitions have reached their stopping offsets, the KafkaSource will then exit.
This method is different from setUnbounded(OffsetsInitializer) in that after setting the stopping offsets with this method, KafkaSource.getBoundedness() will return Boundedness.BOUNDED instead of Boundedness.CONTINUOUS_UNBOUNDED.
The following OffsetsInitializers are commonly used and provided out of the box. Users can also implement their own OffsetsInitializer for custom behaviors.
- OffsetsInitializer.latest() - stops at the latest offsets of the partitions when the KafkaSource starts to run.
- OffsetsInitializer.committedOffsets() - stops at the committed offsets of the consumer group.
- OffsetsInitializer.offsets(Map) - stops at the specified offsets for each partition.
- OffsetsInitializer.timestamp(long) - stops at the specified timestamp for each partition. The guarantee of setting the stopping timestamp is that no Kafka records whose ConsumerRecord.timestamp() is greater than the given stopping timestamp will be consumed. However, it is possible that some records whose timestamp is smaller than the specified stopping timestamp are not consumed.
Parameters:
stoppingOffsetsInitializer - the OffsetsInitializer to specify the stopping offsets.
See Also:
setUnbounded(OffsetsInitializer)
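A sketch of stopping at explicit per-partition offsets via OffsetsInitializer.offsets(Map); the topic name, offsets, and the Map value type are hypothetical assumptions for illustration:

```java
// Illustrative sketch: bounded read up to offset 1000 on each of two
// partitions of a hypothetical "my-topic".
Map<TopicPartition, Long> stoppingOffsets = new HashMap<>();
stoppingOffsets.put(new TopicPartition("my-topic", 0), 1000L);
stoppingOffsets.put(new TopicPartition("my-topic", 1), 1000L);

KafkaSource<String> source = KafkaSource
        .<String>builder()
        .setBootstrapServers(MY_BOOTSTRAP_SERVERS)
        .setTopics(Collections.singletonList("my-topic"))
        .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
        .setBounded(OffsetsInitializer.offsets(stoppingOffsets))
        .build();
```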
public KafkaSourceBuilder<OUT> setDeserializer(KafkaRecordDeserializationSchema<OUT> recordDeserializer)
Sets the deserializer of the ConsumerRecord for KafkaSource.
Parameters:
recordDeserializer - the deserializer for Kafka ConsumerRecord.

public KafkaSourceBuilder<OUT> setValueOnlyDeserializer(DeserializationSchema<OUT> deserializationSchema)
Sets the deserializer of the ConsumerRecord for KafkaSource. The given DeserializationSchema will be used to deserialize the value of the ConsumerRecord. The other information (e.g. key) in a ConsumerRecord will be ignored.
Parameters:
deserializationSchema - the DeserializationSchema to use for deserialization.

public KafkaSourceBuilder<OUT> setClientIdPrefix(String prefix)
Sets the client id prefix of this KafkaSource.
Parameters:
prefix - the client id prefix to use for this KafkaSource.

public KafkaSourceBuilder<OUT> setProperty(String key, String value)
Set an arbitrary property for the KafkaSource and KafkaConsumer. The valid keys can be found in ConsumerConfig and KafkaSourceOptions.
Note that the following keys will be overridden by the builder when the KafkaSource is created.
- key.deserializer is always set to ByteArrayDeserializer.
- value.deserializer is always set to ByteArrayDeserializer.
- auto.offset.reset.strategy is overridden by OffsetsInitializer.getAutoOffsetResetStrategy() for the starting offsets, which is by default OffsetsInitializer.earliest().
- partition.discovery.interval.ms is overridden to -1 when setBounded(OffsetsInitializer) has been invoked.
Parameters:
key - the key of the property.
value - the value of the property.

public KafkaSourceBuilder<OUT> setProperties(Properties props)
Set arbitrary properties for the KafkaSource and KafkaConsumer. The valid keys can be found in ConsumerConfig and KafkaSourceOptions.
Note that the following keys will be overridden by the builder when the KafkaSource is created.
- key.deserializer is always set to ByteArrayDeserializer.
- value.deserializer is always set to ByteArrayDeserializer.
- auto.offset.reset.strategy is overridden by OffsetsInitializer.getAutoOffsetResetStrategy() for the starting offsets, which is by default OffsetsInitializer.earliest().
- partition.discovery.interval.ms is overridden to -1 when setBounded(OffsetsInitializer) has been invoked.
- client.id is overridden to "client.id.prefix-RANDOM_LONG", or "group.id-RANDOM_LONG" if the client id prefix is not set.
Parameters:
props - the properties to set for the KafkaSource.

public KafkaSource<OUT> build()
Build the KafkaSource.

Copyright © 2014–2022 The Apache Software Foundation. All rights reserved.