T - The type of elements deserialized from Kafka's byte records and emitted into the Flink data streams.
KPH - The type of topic/partition identifier used by Kafka in the specific version.
@Internal public abstract class AbstractFetcher<T,KPH> extends Object
This fetcher base class implements the logic around emitting records and tracking offsets, as well as around the optional timestamp assignment and watermark generation.
Modifier and Type | Field and Description |
---|---|
protected SourceFunction.SourceContext<T> | sourceContext: The source context to emit records and watermarks to. |
protected ClosableBlockingQueue<KafkaTopicPartitionState<KPH>> | unassignedPartitionsQueue: Queue of partitions that are not yet assigned to any Kafka clients for consuming. |
Modifier | Constructor and Description |
---|---|
protected | AbstractFetcher(SourceFunction.SourceContext<T> sourceContext, Map<KafkaTopicPartition,Long> seedPartitionsWithInitialOffsets, SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, ProcessingTimeService processingTimeProvider, long autoWatermarkInterval, ClassLoader userCodeClassLoader, MetricGroup consumerMetricGroup, boolean useMetrics) |
Modifier and Type | Method and Description |
---|---|
void | addDiscoveredPartitions(List<KafkaTopicPartition> newPartitions): Adds a list of newly discovered partitions to the fetcher for consuming. |
abstract void | cancel() |
void | commitInternalOffsetsToKafka(Map<KafkaTopicPartition,Long> offsets, KafkaCommitCallback commitCallback): Commits the given partition offsets to the Kafka brokers (or to ZooKeeper for older Kafka versions). |
protected abstract KPH | createKafkaPartitionHandle(KafkaTopicPartition partition): Creates the Kafka version-specific representation of the given topic partition. |
protected abstract void | doCommitInternalOffsetsToKafka(Map<KafkaTopicPartition,Long> offsets, KafkaCommitCallback commitCallback) |
protected void | emitRecord(T record, KafkaTopicPartitionState<KPH> partitionState, long offset): Emits a record without attaching an existing timestamp to it. |
protected void | emitRecordWithTimestamp(T record, KafkaTopicPartitionState<KPH> partitionState, long offset, long timestamp): Emits a record attaching a timestamp to it. |
abstract void | runFetchLoop() |
HashMap<KafkaTopicPartition,Long> | snapshotCurrentState(): Takes a snapshot of the partition offsets. |
protected List<KafkaTopicPartitionState<KPH>> | subscribedPartitionStates(): Gets all partitions (with partition state) that this fetcher is subscribed to. |
protected final SourceFunction.SourceContext<T> sourceContext
The source context to emit records and watermarks to.
protected final ClosableBlockingQueue<KafkaTopicPartitionState<KPH>> unassignedPartitionsQueue
Queue of partitions that are not yet assigned to any Kafka clients for consuming. Implementations of runFetchLoop() should continuously poll this queue for unassigned partitions and start consuming them accordingly.
All partitions added to this queue are guaranteed to have already been added to subscribedPartitionStates.
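The polling contract described for unassignedPartitionsQueue can be illustrated with a minimal sketch. It is not Flink code: FetchLoopSketch and pollUnassignedPartitions are hypothetical names, partitions are modeled as plain strings, and a java.util.concurrent.LinkedBlockingQueue stands in for ClosableBlockingQueue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical sketch of one fetch-loop iteration; not part of the Flink API. */
final class FetchLoopSketch {
    // Stand-in for ClosableBlockingQueue<KafkaTopicPartitionState<KPH>>.
    final BlockingQueue<String> unassignedPartitionsQueue = new LinkedBlockingQueue<>();
    // Partitions this (pretend) Kafka client has already taken over.
    final List<String> assignedPartitions = new ArrayList<>();

    /** Claims every partition currently queued and returns how many were claimed. */
    int pollUnassignedPartitions() {
        List<String> newlyQueued = new ArrayList<>();
        // drainTo removes all currently queued elements without blocking.
        unassignedPartitionsQueue.drainTo(newlyQueued);
        assignedPartitions.addAll(newlyQueued);
        return newlyQueued.size();
    }
}
```

A version-specific runFetchLoop() would presumably call something like this on every iteration, before fetching records from the partitions it has already been assigned.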
protected AbstractFetcher(SourceFunction.SourceContext<T> sourceContext, Map<KafkaTopicPartition,Long> seedPartitionsWithInitialOffsets, SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, ProcessingTimeService processingTimeProvider, long autoWatermarkInterval, ClassLoader userCodeClassLoader, MetricGroup consumerMetricGroup, boolean useMetrics) throws Exception
Throws:
Exception
public void addDiscoveredPartitions(List<KafkaTopicPartition> newPartitions) throws IOException, ClassNotFoundException
Adds a list of newly discovered partitions to the fetcher for consuming.
This method creates the partition state holder for each new partition, using KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET as the starting offset. The earliest offset is used because there may be a delay between a partition being created (and starting to receive records) and its discovery.
After the state representation for a partition is created, it is added to the unassigned partitions queue to await consumption.
Parameters:
newPartitions - discovered partitions to add
Throws:
IOException
ClassNotFoundException
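The ordering described for addDiscoveredPartitions (create state with the earliest-offset sentinel, then enqueue) can be sketched as follows. This is an illustration under stated assumptions, not the Flink implementation: DiscoverySketch is a hypothetical class, partitions are plain strings, and the EARLIEST_OFFSET value here is a placeholder rather than the real value of KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET.

```java
import java.util.ArrayDeque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

/** Hypothetical sketch of registering newly discovered partitions. */
final class DiscoverySketch {
    // Placeholder sentinel; the real constant's numeric value is not reproduced here.
    static final long EARLIEST_OFFSET = Long.MIN_VALUE;

    // Stand-in for the fetcher's subscribed partition states (partition -> offset).
    final Map<String, Long> subscribedPartitionStates = new LinkedHashMap<>();
    // Stand-in for unassignedPartitionsQueue.
    final Queue<String> unassignedPartitionsQueue = new ArrayDeque<>();

    void addDiscoveredPartitions(List<String> newPartitions) {
        for (String partition : newPartitions) {
            // Register the state first, so the partition counts as subscribed...
            subscribedPartitionStates.put(partition, EARLIEST_OFFSET);
            // ...before the fetch loop can see it in the queue.
            unassignedPartitionsQueue.add(partition);
        }
    }
}
```

Registering state before enqueueing is what makes it safe for a consumer of the queue to assume that every queued partition is already among the subscribed partition states.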
protected final List<KafkaTopicPartitionState<KPH>> subscribedPartitionStates()
Gets all partitions (with partition state) that this fetcher is subscribed to.
public abstract void cancel()
public final void commitInternalOffsetsToKafka(Map<KafkaTopicPartition,Long> offsets, @Nonnull KafkaCommitCallback commitCallback) throws Exception
Commits the given partition offsets to the Kafka brokers (or to ZooKeeper for older Kafka versions). This method is only ever called when the offset commit mode of the consumer is OffsetCommitMode.ON_CHECKPOINTS.
The given offsets are the internal checkpointed offsets, representing the last processed record of each partition. Version-specific implementations of this method must honor the contract that the given offsets are incremented by 1 before being committed, so that the offsets committed to Kafka represent "the next record to process".
Parameters:
offsets - The offsets to commit to Kafka (implementations must increment offsets by 1 before committing).
commitCallback - The callback that the user should trigger when a commit request completes or fails.
Throws:
Exception - This method forwards exceptions.
protected abstract void doCommitInternalOffsetsToKafka(Map<KafkaTopicPartition,Long> offsets, @Nonnull KafkaCommitCallback commitCallback) throws Exception
Throws:
Exception
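The "increment by 1 before committing" contract can be made concrete with a small sketch. OffsetCommitSketch and toCommittableOffsets are hypothetical names, and partitions are modeled as strings rather than KafkaTopicPartition; a real doCommitInternalOffsetsToKafka implementation would go on to hand the result to the version-specific Kafka client.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of the offset conversion required before committing. */
final class OffsetCommitSketch {
    /** Converts "last processed record" offsets into "next record to process" offsets. */
    static Map<String, Long> toCommittableOffsets(Map<String, Long> checkpointedOffsets) {
        Map<String, Long> committable = new HashMap<>();
        for (Map.Entry<String, Long> entry : checkpointedOffsets.entrySet()) {
            // Kafka expects the offset of the next record, hence +1.
            committable.put(entry.getKey(), entry.getValue() + 1);
        }
        return committable;
    }
}
```

For example, a checkpointed offset of 41 for a partition would be committed as 42, so that a consumer resuming from the committed offset does not reprocess record 41.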
protected abstract KPH createKafkaPartitionHandle(KafkaTopicPartition partition)
Creates the Kafka version-specific representation of the given topic partition.
Parameters:
partition - The Flink representation of the Kafka topic partition.
public HashMap<KafkaTopicPartition,Long> snapshotCurrentState()
Takes a snapshot of the partition offsets.
Important: This method must be called under the checkpoint lock.
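One way to see why the checkpoint lock matters is the sketch below. It assumes, without confirmation from this page, that record emission and the offset update happen together under the same lock, so that a snapshot never observes an emitted record whose offset has not yet been recorded. SnapshotSketch, its fields, and the explicit lock check are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of snapshotting offsets under a checkpoint lock. */
final class SnapshotSketch {
    final Object checkpointLock = new Object();
    final List<String> emitted = new ArrayList<>();
    final Map<String, Long> offsets = new HashMap<>();

    /** Assumed behavior: emit the record and record its offset atomically. */
    void emitRecord(String record, String partition, long offset) {
        synchronized (checkpointLock) {
            emitted.add(record);
            offsets.put(partition, offset);
        }
    }

    /** Copies the current offsets; the caller must already hold checkpointLock. */
    HashMap<String, Long> snapshotCurrentState() {
        if (!Thread.holdsLock(checkpointLock)) {
            throw new IllegalStateException("must be called under the checkpoint lock");
        }
        return new HashMap<>(offsets);
    }
}
```

A caller would wrap the call in `synchronized (sketch.checkpointLock) { ... }`; calling it without holding the lock fails fast in this sketch.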
protected void emitRecord(T record, KafkaTopicPartitionState<KPH> partitionState, long offset) throws Exception
Emits a record without attaching an existing timestamp to it.
Implementation Note: This method is kept brief to be JIT-inlining friendly. That keeps the fast path efficient; the extended paths are called as separate methods.
Parameters:
record - The record to emit
partitionState - The state of the Kafka partition from which the record was fetched
offset - The offset of the record
Throws:
Exception
protected void emitRecordWithTimestamp(T record, KafkaTopicPartitionState<KPH> partitionState, long offset, long timestamp) throws Exception
Emits a record attaching a timestamp to it.
Implementation Note: This method is kept brief to be JIT-inlining friendly. That keeps the fast path efficient; the extended paths are called as separate methods.
Parameters:
record - The record to emit
partitionState - The state of the Kafka partition from which the record was fetched
offset - The offset of the record
Throws:
Exception
Copyright © 2014–2019 The Apache Software Foundation. All rights reserved.