T
- The type of elements deserialized from Kafka's byte records, and emitted into the Flink data streams.
KPH
- The type of topic/partition identifier used by Kafka in the specific version.
public abstract class AbstractFetcher<T,KPH> extends Object
This fetcher base class implements the logic around emitting records and tracking offsets, as well as around the optional timestamp assignment and watermark generation.
Modifier and Type | Field and Description |
---|---|
protected Object | checkpointLock: The lock that guarantees that record emission and state updates are atomic, from the view of taking a checkpoint |
protected static int | NO_TIMESTAMPS_WATERMARKS |
protected static int | PERIODIC_WATERMARKS |
protected static int | PUNCTUATED_WATERMARKS |
protected SourceFunction.SourceContext<T> | sourceContext: The source context to emit records and watermarks to |
protected int | timestampWatermarkMode: The mode describing whether the fetcher also generates timestamps and watermarks |
protected boolean | useMetrics: Flag whether to register metrics for the fetcher |
Modifier | Constructor and Description |
---|---|
protected | AbstractFetcher(SourceFunction.SourceContext<T> sourceContext, Map<KafkaTopicPartition,Long> assignedPartitionsWithInitialOffsets, SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, ProcessingTimeService processingTimeProvider, long autoWatermarkInterval, ClassLoader userCodeClassLoader, boolean useMetrics) |
Modifier and Type | Method and Description |
---|---|
protected void | addOffsetStateGauge(MetricGroup metricGroup): Add current and committed offsets to metric group |
abstract void | cancel() |
abstract void | commitInternalOffsetsToKafka(Map<KafkaTopicPartition,Long> offsets, KafkaCommitCallback commitCallback): Commits the given partition offsets to the Kafka brokers (or to ZooKeeper for older Kafka versions). |
abstract KPH | createKafkaPartitionHandle(KafkaTopicPartition partition): Creates the Kafka version specific representation of the given topic partition. |
protected void | emitRecord(T record, KafkaTopicPartitionState<KPH> partitionState, long offset): Emits a record without attaching an existing timestamp to it. |
protected void | emitRecordWithTimestamp(T record, KafkaTopicPartitionState<KPH> partitionState, long offset, long timestamp): Emits a record attaching a timestamp to it. |
protected void | emitRecordWithTimestampAndPeriodicWatermark(T record, KafkaTopicPartitionState<KPH> partitionState, long offset, long kafkaEventTimestamp): Record emission, if a timestamp will be attached from an assigner that is also a periodic watermark generator. |
protected void | emitRecordWithTimestampAndPunctuatedWatermark(T record, KafkaTopicPartitionState<KPH> partitionState, long offset, long kafkaEventTimestamp): Record emission, if a timestamp will be attached from an assigner that is also a punctuated watermark generator. |
abstract void | runFetchLoop() |
HashMap<KafkaTopicPartition,Long> | snapshotCurrentState(): Takes a snapshot of the partition offsets. |
protected KafkaTopicPartitionState<KPH>[] | subscribedPartitionStates(): Gets all partitions (with partition state) that this fetcher is subscribed to. |
protected static final int NO_TIMESTAMPS_WATERMARKS
protected static final int PERIODIC_WATERMARKS
protected static final int PUNCTUATED_WATERMARKS
protected final SourceFunction.SourceContext<T> sourceContext
protected final Object checkpointLock
protected final int timestampWatermarkMode
protected final boolean useMetrics
protected AbstractFetcher(SourceFunction.SourceContext<T> sourceContext, Map<KafkaTopicPartition,Long> assignedPartitionsWithInitialOffsets, SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, ProcessingTimeService processingTimeProvider, long autoWatermarkInterval, ClassLoader userCodeClassLoader, boolean useMetrics) throws Exception
Exception
protected final KafkaTopicPartitionState<KPH>[] subscribedPartitionStates()
public abstract void cancel()
public abstract KPH createKafkaPartitionHandle(KafkaTopicPartition partition)
partition
- The Flink representation of the Kafka topic partition.
public abstract void commitInternalOffsetsToKafka(Map<KafkaTopicPartition,Long> offsets, @Nonnull KafkaCommitCallback commitCallback) throws Exception
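Commits the given partition offsets to the Kafka brokers (or to ZooKeeper for older Kafka versions). This method is only ever called when the offset commit mode of the consumer is OffsetCommitMode.ON_CHECKPOINTS.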
The given offsets are the internal checkpointed offsets, representing the last processed record of each partition. Version-specific implementations of this method must honor the contract that the given offsets are incremented by 1 before they are committed, so that the offsets committed to Kafka represent "the next record to process".
offsets
- The offsets to commit to Kafka (implementations must increment offsets by 1 before committing).
commitCallback
- The callback that the user should trigger when a commit request completes or fails.
Exception
- This method forwards exceptions.
public HashMap<KafkaTopicPartition,Long> snapshotCurrentState()
Takes a snapshot of the partition offsets.
Important: This method must be called under the checkpoint lock.
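The relationship between the snapshotted offsets and the offsets committed to Kafka can be illustrated with a minimal, self-contained sketch. It does not use the Flink classes: `OffsetSnapshotSketch`, `recordProcessed`, `snapshot`, and `toCommitOffsets` are hypothetical names, and a plain `String` stands in for `KafkaTopicPartition`. It only assumes what the text above states: the snapshot holds the last processed offset per partition, the caller holds the checkpoint lock while snapshotting, and the committed value is that offset plus 1.

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetSnapshotSketch {
    // Stand-in for the fetcher's checkpointLock field.
    final Object checkpointLock = new Object();

    // Last processed offset per partition. The String key is a hypothetical
    // stand-in for KafkaTopicPartition.
    private final Map<String, Long> lastProcessedOffsets = new HashMap<>();

    // Records that the given offset of the given partition has been processed.
    void recordProcessed(String partition, long offset) {
        synchronized (checkpointLock) {
            lastProcessedOffsets.put(partition, offset);
        }
    }

    // Copies the current offsets. As with snapshotCurrentState(), the caller
    // is expected to hold checkpointLock while calling this method.
    HashMap<String, Long> snapshot() {
        return new HashMap<>(lastProcessedOffsets);
    }

    // Converts "last processed" offsets into "next record to process" offsets,
    // which is the form the commit contract asks for.
    static Map<String, Long> toCommitOffsets(Map<String, Long> checkpointed) {
        Map<String, Long> result = new HashMap<>();
        for (Map.Entry<String, Long> entry : checkpointed.entrySet()) {
            result.put(entry.getKey(), entry.getValue() + 1);
        }
        return result;
    }

    public static void main(String[] args) {
        OffsetSnapshotSketch sketch = new OffsetSnapshotSketch();
        sketch.recordProcessed("orders-0", 41L);

        Map<String, Long> snapshot;
        synchronized (sketch.checkpointLock) {
            snapshot = sketch.snapshot();
        }

        Map<String, Long> toCommit = toCommitOffsets(snapshot);
        System.out.println(toCommit.get("orders-0")); // prints 42
    }
}
```

The snapshot copies the map rather than exposing it, so later record emission cannot change the offsets of a checkpoint that has already been taken.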
protected void emitRecord(T record, KafkaTopicPartitionState<KPH> partitionState, long offset) throws Exception
Implementation Note: This method is kept brief to be JIT inlining friendly. That keeps the fast path efficient; the extended paths are called as separate methods.
record
- The record to emit
partitionState
- The state of the Kafka partition from which the record was fetched
offset
- The offset of the record
Exception
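The atomicity that the checkpoint lock is described as providing (record emission and offset update happen together, from the view of a checkpoint) can be sketched as follows. This is an illustration only, not the Flink implementation: `EmitUnderLockSketch`, `PartitionState`, and the `emitted` list are hypothetical stand-ins for the fetcher, `KafkaTopicPartitionState`, and the source context.

```java
import java.util.ArrayList;
import java.util.List;

public class EmitUnderLockSketch {
    // Hypothetical stand-in for KafkaTopicPartitionState: it only tracks
    // the offset of the last emitted record.
    static final class PartitionState {
        private long offset = -1L;

        long getOffset() {
            return offset;
        }

        void setOffset(long offset) {
            this.offset = offset;
        }
    }

    // Stand-in for the fetcher's checkpointLock field.
    final Object checkpointLock = new Object();

    // Stand-in for the source context that records are emitted to.
    final List<String> emitted = new ArrayList<>();

    // Emits the record and updates the partition offset in one critical
    // section, so a checkpoint taken under the same lock never observes
    // one change without the other.
    void emitRecord(String record, PartitionState state, long offset) {
        synchronized (checkpointLock) {
            emitted.add(record);
            state.setOffset(offset);
        }
    }

    public static void main(String[] args) {
        EmitUnderLockSketch fetcher = new EmitUnderLockSketch();
        PartitionState state = new PartitionState();

        fetcher.emitRecord("a", state, 7L);
        fetcher.emitRecord("b", state, 8L);

        System.out.println(fetcher.emitted + " @ " + state.getOffset()); // prints [a, b] @ 8
    }
}
```

Keeping the critical section this small matches the implementation note: the common path stays short, and anything more involved can live in separate methods.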
protected void emitRecordWithTimestamp(T record, KafkaTopicPartitionState<KPH> partitionState, long offset, long timestamp) throws Exception
Implementation Note: This method is kept brief to be JIT inlining friendly. That keeps the fast path efficient; the extended paths are called as separate methods.
record
- The record to emit
partitionState
- The state of the Kafka partition from which the record was fetched
offset
- The offset of the record
Exception
protected void emitRecordWithTimestampAndPeriodicWatermark(T record, KafkaTopicPartitionState<KPH> partitionState, long offset, long kafkaEventTimestamp)
protected void emitRecordWithTimestampAndPunctuatedWatermark(T record, KafkaTopicPartitionState<KPH> partitionState, long offset, long kafkaEventTimestamp)
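How the three mode constants relate to the emission methods can be sketched with a small dispatch helper. The page does not show the constants' values or the fetcher's actual dispatch logic, so the values 0, 1, and 2 below are assumptions, and `WatermarkModeSketch` and `emitPathFor` are hypothetical names. The sketch only maps each mode to the method that the summary table associates with it.

```java
public class WatermarkModeSketch {
    // Assumed values; the documentation above lists only the constant names.
    static final int NO_TIMESTAMPS_WATERMARKS = 0;
    static final int PERIODIC_WATERMARKS = 1;
    static final int PUNCTUATED_WATERMARKS = 2;

    // Returns the name of the emission method that corresponds to the mode.
    static String emitPathFor(int timestampWatermarkMode) {
        switch (timestampWatermarkMode) {
            case NO_TIMESTAMPS_WATERMARKS:
                // No assigner configured: plain emission.
                return "emitRecord";
            case PERIODIC_WATERMARKS:
                // Assigner that is also a periodic watermark generator.
                return "emitRecordWithTimestampAndPeriodicWatermark";
            case PUNCTUATED_WATERMARKS:
                // Assigner that is also a punctuated watermark generator.
                return "emitRecordWithTimestampAndPunctuatedWatermark";
            default:
                throw new IllegalArgumentException(
                        "Unknown timestamp/watermark mode: " + timestampWatermarkMode);
        }
    }

    public static void main(String[] args) {
        System.out.println(emitPathFor(PERIODIC_WATERMARKS));
        // prints emitRecordWithTimestampAndPeriodicWatermark
    }
}
```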
protected void addOffsetStateGauge(MetricGroup metricGroup)
metricGroup
- The metric group to use

Copyright © 2014–2018 The Apache Software Foundation. All rights reserved.