@Internal
public abstract class AbstractFetcher<T,KPH> extends Object

Type Parameters:
T - The type of elements deserialized from Kafka's byte records, and emitted into the Flink data streams.
KPH - The type of topic/partition identifier used by Kafka in the specific version.
This fetcher base class implements the logic around emitting records and tracking offsets, as well as around the optional timestamp assignment and watermark generation.
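For orientation, a version-specific fetcher extends this class and implements the abstract hooks listed in the summaries below. The following is a minimal, hypothetical skeleton rather than actual connector code; the class name, the field names, and the import paths (assumed from the Flink 1.11+ Kafka connector layout) are illustrative, and KPH is bound to the Kafka client's TopicPartition:

```java
import java.util.Map;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.SerializedValue;
import org.apache.kafka.common.TopicPartition;

// Hypothetical version-specific fetcher: binds KPH to the Kafka client's TopicPartition.
public class MyKafkaFetcher<T> extends AbstractFetcher<T, TopicPartition> {

    private volatile boolean running = true;

    protected MyKafkaFetcher(
            SourceFunction.SourceContext<T> sourceContext,
            Map<KafkaTopicPartition, Long> seedPartitionsWithInitialOffsets,
            SerializedValue<WatermarkStrategy<T>> watermarkStrategy,
            ProcessingTimeService processingTimeProvider,
            long autoWatermarkInterval,
            ClassLoader userCodeClassLoader,
            MetricGroup consumerMetricGroup,
            boolean useMetrics) throws Exception {
        // Forward the documented constructor arguments to the base class.
        super(sourceContext, seedPartitionsWithInitialOffsets, watermarkStrategy,
                processingTimeProvider, autoWatermarkInterval, userCodeClassLoader,
                consumerMetricGroup, useMetrics);
    }

    @Override
    public void runFetchLoop() throws Exception {
        // Poll unassignedPartitionsQueue and emit records; see the sketch in the
        // field detail for unassignedPartitionsQueue further down.
    }

    @Override
    public void cancel() {
        running = false;
    }

    @Override
    protected TopicPartition createKafkaPartitionHandle(KafkaTopicPartition partition) {
        // The "handle" is simply the client library's own partition identifier.
        return new TopicPartition(partition.getTopic(), partition.getPartition());
    }

    @Override
    protected void doCommitInternalOffsetsToKafka(
            Map<KafkaTopicPartition, Long> offsets,
            KafkaCommitCallback commitCallback) throws Exception {
        // Increment each offset by 1 and hand it to the Kafka client; see the
        // sketch under commitInternalOffsetsToKafka further down.
    }
}
```

The method details later on this page are illustrated by filling in these stubs.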
Modifier and Type | Field and Description
---|---
protected Object | checkpointLock - The lock that guarantees that record emission and state updates are atomic, from the view of taking a checkpoint.
protected SourceFunction.SourceContext<T> | sourceContext - The source context to emit records and watermarks to.
protected ClosableBlockingQueue<KafkaTopicPartitionState<T,KPH>> | unassignedPartitionsQueue - Queue of partitions that are not yet assigned to any Kafka clients for consuming.
protected WatermarkOutput | watermarkOutput - Wrapper around our SourceContext for allowing the WatermarkGenerator to emit watermarks and mark idleness.
Modifier | Constructor and Description
---|---
protected | AbstractFetcher(SourceFunction.SourceContext<T> sourceContext, Map<KafkaTopicPartition,Long> seedPartitionsWithInitialOffsets, SerializedValue<WatermarkStrategy<T>> watermarkStrategy, ProcessingTimeService processingTimeProvider, long autoWatermarkInterval, ClassLoader userCodeClassLoader, MetricGroup consumerMetricGroup, boolean useMetrics)
Modifier and Type | Method and Description
---|---
void | addDiscoveredPartitions(List<KafkaTopicPartition> newPartitions) - Adds a list of newly discovered partitions to the fetcher for consuming.
abstract void | cancel()
void | commitInternalOffsetsToKafka(Map<KafkaTopicPartition,Long> offsets, KafkaCommitCallback commitCallback) - Commits the given partition offsets to the Kafka brokers (or to ZooKeeper for older Kafka versions).
protected abstract KPH | createKafkaPartitionHandle(KafkaTopicPartition partition) - Creates the Kafka version specific representation of the given topic partition.
protected abstract void | doCommitInternalOffsetsToKafka(Map<KafkaTopicPartition,Long> offsets, KafkaCommitCallback commitCallback)
protected void | emitRecordsWithTimestamps(Queue<T> records, KafkaTopicPartitionState<T,KPH> partitionState, long offset, long kafkaEventTimestamp) - Emits a record attaching a timestamp to it.
abstract void | runFetchLoop()
HashMap<KafkaTopicPartition,Long> | snapshotCurrentState() - Takes a snapshot of the partition offsets.
protected List<KafkaTopicPartitionState<T,KPH>> | subscribedPartitionStates() - Gets all partitions (with partition state) that this fetcher is subscribed to.
protected final SourceFunction.SourceContext<T> sourceContext
The source context to emit records and watermarks to.

protected final WatermarkOutput watermarkOutput
Wrapper around our SourceContext for allowing the WatermarkGenerator to emit watermarks and mark idleness.

protected final Object checkpointLock
The lock that guarantees that record emission and state updates are atomic, from the view of taking a checkpoint.

protected final ClosableBlockingQueue<KafkaTopicPartitionState<T,KPH>> unassignedPartitionsQueue
Queue of partitions that are not yet assigned to any Kafka clients for consuming. runFetchLoop() should continuously poll this queue for unassigned partitions, and start consuming them accordingly. All partitions added to this queue are guaranteed to have been added to subscribedPartitionStates already.
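Continuing the hypothetical skeleton from the class description above, a much simplified runFetchLoop() that honors this queue contract might look as follows. The running and consumer fields, the deserialize(...) helper, and ClosableBlockingQueue.pollBatch() returning null when nothing is queued are assumptions; Kafka client and java.util/java.time imports are omitted:

```java
@Override
public void runFetchLoop() throws Exception {
    while (running) {
        // 1) Pick up partitions added via addDiscoveredPartitions(...) or initial seeding.
        List<KafkaTopicPartitionState<T, TopicPartition>> newPartitions =
                unassignedPartitionsQueue.pollBatch();   // assumed ClosableBlockingQueue API
        if (newPartitions != null && !newPartitions.isEmpty()) {
            // Reassign the consumer to the full subscribed set (queued partitions are
            // guaranteed to be in subscribedPartitionStates() already).
            List<TopicPartition> assignment = new ArrayList<>();
            for (KafkaTopicPartitionState<T, TopicPartition> p : subscribedPartitionStates()) {
                assignment.add(p.getKafkaPartitionHandle());
            }
            consumer.assign(assignment);
        }

        // 2) Fetch from Kafka and emit, partition by partition.
        ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
        for (KafkaTopicPartitionState<T, TopicPartition> partition : subscribedPartitionStates()) {
            for (ConsumerRecord<byte[], byte[]> record
                    : records.records(partition.getKafkaPartitionHandle())) {
                Queue<T> deserialized = deserialize(record); // assumed deserialization helper
                emitRecordsWithTimestamps(
                        deserialized, partition, record.offset(), record.timestamp());
            }
        }
    }
}
```

As the checkpointLock description above implies, emitRecordsWithTimestamps(...) emits under the checkpoint lock and advances the partition's offset state, so the loop itself does not need to synchronize.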
protected AbstractFetcher(SourceFunction.SourceContext<T> sourceContext, Map<KafkaTopicPartition,Long> seedPartitionsWithInitialOffsets, SerializedValue<WatermarkStrategy<T>> watermarkStrategy, ProcessingTimeService processingTimeProvider, long autoWatermarkInterval, ClassLoader userCodeClassLoader, MetricGroup consumerMetricGroup, boolean useMetrics) throws Exception

Throws:
Exception
public void addDiscoveredPartitions(List<KafkaTopicPartition> newPartitions) throws IOException, ClassNotFoundException

Adds a list of newly discovered partitions to the fetcher for consuming.

This method creates the partition state holder for each new partition, using KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET as the starting offset. It uses the earliest offset because there may be a delay in discovering a partition after it was created and started receiving records.

After the state representation for a partition is created, it is added to the unassigned partitions queue to await consumption.

Parameters:
newPartitions - discovered partitions to add
Throws:
IOException
ClassNotFoundException
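As a caller-side sketch (the surrounding discovery mechanism is an assumption, not part of this class), a consumer that periodically discovers partitions simply hands them to the fetcher:

```java
// Hypothetical discovery hook in the consumer that owns the fetcher;
// discoverNewPartitions() stands in for the consumer's own discovery logic.
void onPartitionDiscovery(AbstractFetcher<String, ?> fetcher) throws Exception {
    List<KafkaTopicPartition> newlyDiscovered = discoverNewPartitions();
    if (!newlyDiscovered.isEmpty()) {
        // Each partition is seeded at KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET
        // and placed on unassignedPartitionsQueue until runFetchLoop() picks it up.
        fetcher.addDiscoveredPartitions(newlyDiscovered);
    }
}
```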
protected final List<KafkaTopicPartitionState<T,KPH>> subscribedPartitionStates()

Gets all partitions (with partition state) that this fetcher is subscribed to.
public abstract void cancel()
public final void commitInternalOffsetsToKafka(Map<KafkaTopicPartition,Long> offsets, @Nonnull KafkaCommitCallback commitCallback) throws Exception

Commits the given partition offsets to the Kafka brokers (or to ZooKeeper for older Kafka versions). This method is only ever called when the offset commit mode of the consumer is OffsetCommitMode.ON_CHECKPOINTS.

The given offsets are the internal checkpointed offsets, representing the last processed record of each partition. Version-specific implementations of this method need to hold the contract that the given offsets must be incremented by 1 before committing them, so that committed offsets to Kafka represent "the next record to process".

Parameters:
offsets - The offsets to commit to Kafka (implementations must increment offsets by 1 before committing).
commitCallback - The callback that the user should trigger when a commit request completes or fails.
Throws:
Exception - This method forwards exceptions.

protected abstract void doCommitInternalOffsetsToKafka(Map<KafkaTopicPartition,Long> offsets, @Nonnull KafkaCommitCallback commitCallback) throws Exception

Throws:
Exception
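Filling in the doCommitInternalOffsetsToKafka stub from the skeleton above, a simplified sketch of the +1 contract could look like this. The consumer field is an assumed KafkaConsumer<byte[], byte[]>; the real connector hands the commit to its dedicated consumer thread instead of calling the client directly, since KafkaConsumer is not thread-safe. Kafka client imports are omitted:

```java
@Override
protected void doCommitInternalOffsetsToKafka(
        Map<KafkaTopicPartition, Long> offsets,
        KafkaCommitCallback commitCallback) throws Exception {

    Map<TopicPartition, OffsetAndMetadata> kafkaOffsets = new HashMap<>();
    for (KafkaTopicPartitionState<T, TopicPartition> partition : subscribedPartitionStates()) {
        Long checkpointedOffset = offsets.get(partition.getKafkaTopicPartition());
        if (checkpointedOffset != null) {
            // The checkpointed offset is the last processed record; Kafka expects
            // "the next record to process", hence the +1.
            kafkaOffsets.put(
                    partition.getKafkaPartitionHandle(),
                    new OffsetAndMetadata(checkpointedOffset + 1));
        }
    }

    consumer.commitAsync(kafkaOffsets, (committed, error) -> {
        if (error == null) {
            commitCallback.onSuccess();
        } else {
            commitCallback.onException(error);
        }
    });
}
```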
protected abstract KPH createKafkaPartitionHandle(KafkaTopicPartition partition)

Creates the Kafka version specific representation of the given topic partition.

Parameters:
partition - The Flink representation of the Kafka topic partition.

public HashMap<KafkaTopicPartition,Long> snapshotCurrentState()

Takes a snapshot of the partition offsets.

Important: This method must be called under the checkpoint lock.
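To illustrate the lock requirement, a minimal sketch from inside a subclass, using the inherited checkpointLock field:

```java
// Inside a subclass of AbstractFetcher: take the offset snapshot atomically with
// record emission by holding the inherited checkpointLock.
HashMap<KafkaTopicPartition, Long> currentOffsets;
synchronized (checkpointLock) {
    currentOffsets = snapshotCurrentState();
}
// currentOffsets maps each subscribed partition to its last processed offset and
// can be stored in the operator's checkpoint state.
```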
protected void emitRecordsWithTimestamps(Queue<T> records, KafkaTopicPartitionState<T,KPH> partitionState, long offset, long kafkaEventTimestamp)

Emits a record attaching a timestamp to it.

Parameters:
records - The records to emit
partitionState - The state of the Kafka partition from which the record was fetched
offset - The offset of the corresponding Kafka record
kafkaEventTimestamp - The timestamp of the Kafka record

Copyright © 2014–2023 The Apache Software Foundation. All rights reserved.