@Internal public class KinesisDataFetcher<T> extends Object
The fetcher manages two states: 1) last seen shard ids of each subscribed stream (used for continuous shard discovery), and 2) last processed sequence numbers of each subscribed shard. Since operations on the second state will be performed by multiple threads, these operations should only be done using the handler methods provided in this class.
Modifier and Type | Class and Description |
---|---|
static interface |
KinesisDataFetcher.FlinkKinesisProxyFactory
Factory to create Kinesis proxy instances used by a fetcher.
|
static interface |
KinesisDataFetcher.FlinkKinesisProxyV2Factory
Factory to create Kinesis proxy V@ instances used by a fetcher.
|
Modifier and Type | Field and Description |
---|---|
static KinesisShardAssigner |
DEFAULT_SHARD_ASSIGNER |
Modifier | Constructor and Description |
---|---|
protected |
KinesisDataFetcher(List<String> streams,
SourceFunction.SourceContext<T> sourceContext,
Object checkpointLock,
RuntimeContext runtimeContext,
Properties configProps,
KinesisDeserializationSchema<T> deserializationSchema,
KinesisShardAssigner shardAssigner,
AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner,
WatermarkTracker watermarkTracker,
AtomicReference<Throwable> error,
List<KinesisStreamShardState> subscribedShardsState,
HashMap<String,String> subscribedStreamsToLastDiscoveredShardIds,
KinesisDataFetcher.FlinkKinesisProxyFactory kinesisProxyFactory,
KinesisDataFetcher.FlinkKinesisProxyV2Factory kinesisProxyV2Factory) |
|
KinesisDataFetcher(List<String> streams,
SourceFunction.SourceContext<T> sourceContext,
RuntimeContext runtimeContext,
Properties configProps,
KinesisDeserializationSchema<T> deserializationSchema,
KinesisShardAssigner shardAssigner,
AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner,
WatermarkTracker watermarkTracker)
Creates a Kinesis Data Fetcher.
|
Modifier and Type | Method and Description |
---|---|
void |
advanceLastDiscoveredShardOfStream(String stream,
String shardId)
Updates the last discovered shard of a subscribed stream; only updates if the update is
valid.
|
void |
awaitTermination()
After calling
shutdownFetcher() , this can be called to await the
fetcher shutdown. |
protected void |
closeRecordPublisherFactory()
Closes recordRecordPublisherFactory.
|
static StreamShardHandle |
convertToStreamShardHandle(StreamShardMetadata streamShardMetadata)
Utility function to convert
StreamShardMetadata into StreamShardHandle . |
static StreamShardMetadata |
convertToStreamShardMetadata(StreamShardHandle streamShardHandle)
Utility function to convert
StreamShardHandle into StreamShardMetadata . |
protected static HashMap<String,String> |
createInitialSubscribedStreamsToLastDiscoveredShardsState(List<String> streams)
Utility function to create an initial map of the last discovered shard id of each subscribed
stream, set to null; This is called in the constructor; correct values will be set later on
by calling advanceLastDiscoveredShardOfStream().
|
protected RecordPublisher |
createRecordPublisher(SequenceNumber sequenceNumber,
Properties configProps,
MetricGroup metricGroup,
StreamShardHandle subscribedShard) |
protected RecordPublisherFactory |
createRecordPublisherFactory() |
protected ShardConsumer<T> |
createShardConsumer(Integer subscribedShardStateIndex,
StreamShardHandle subscribedShard,
SequenceNumber lastSequenceNum,
MetricGroup metricGroup,
KinesisDeserializationSchema<T> shardDeserializer)
Create a new shard consumer.
|
protected ExecutorService |
createShardConsumersThreadPool(String subtaskName) |
protected void |
deregisterStreamConsumer()
Deregisters stream consumers.
|
List<StreamShardHandle> |
discoverNewShardsToSubscribe()
A utility function that does the following:
|
protected void |
emitRecordAndUpdateState(T record,
long recordTimestamp,
int shardStateIndex,
SequenceNumber lastSequenceNumber)
Prepare a record and hand it over to the
RecordEmitter , which may collect it
asynchronously. |
protected void |
emitWatermark()
Called periodically to emit a watermark.
|
protected Properties |
getConsumerConfiguration() |
protected long |
getCurrentTimeMillis()
Return the current system time.
|
List<KinesisStreamShardState> |
getSubscribedShardsState() |
static boolean |
isThisSubtaskShouldSubscribeTo(int shardHash,
int totalNumberOfConsumerSubtasks,
int indexOfThisConsumerSubtask)
Utility function to determine whether a shard should be subscribed by this consumer subtask.
|
int |
registerNewSubscribedShardState(KinesisStreamShardState newSubscribedShardState)
Register a new subscribed shard state.
|
void |
runFetcher()
Starts the fetcher.
|
protected boolean |
shouldAdvanceLastDiscoveredShardId(String shardId,
String lastSeenShardIdOfStream)
Given lastSeenShardId, check if last discovered shardId should be advanced.
|
void |
shutdownFetcher()
Starts shutting down the fetcher.
|
HashMap<StreamShardMetadata,SequenceNumber> |
snapshotState()
Creates a snapshot of the current last processed sequence numbers of each subscribed shard.
|
protected void |
stopWithError(Throwable throwable)
Called by created threads to pass on errors.
|
protected void |
updateState(int shardStateIndex,
SequenceNumber lastSequenceNumber)
Update the shard to last processed sequence number state.
|
public static final KinesisShardAssigner DEFAULT_SHARD_ASSIGNER
public KinesisDataFetcher(List<String> streams, SourceFunction.SourceContext<T> sourceContext, RuntimeContext runtimeContext, Properties configProps, KinesisDeserializationSchema<T> deserializationSchema, KinesisShardAssigner shardAssigner, AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner, WatermarkTracker watermarkTracker)
streams
- the streams to subscribe tosourceContext
- context of the source functionruntimeContext
- this subtask's runtime contextconfigProps
- the consumer configuration propertiesdeserializationSchema
- deserialization schema@VisibleForTesting protected KinesisDataFetcher(List<String> streams, SourceFunction.SourceContext<T> sourceContext, Object checkpointLock, RuntimeContext runtimeContext, Properties configProps, KinesisDeserializationSchema<T> deserializationSchema, KinesisShardAssigner shardAssigner, AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner, WatermarkTracker watermarkTracker, AtomicReference<Throwable> error, List<KinesisStreamShardState> subscribedShardsState, HashMap<String,String> subscribedStreamsToLastDiscoveredShardIds, KinesisDataFetcher.FlinkKinesisProxyFactory kinesisProxyFactory, @Nullable KinesisDataFetcher.FlinkKinesisProxyV2Factory kinesisProxyV2Factory)
protected ShardConsumer<T> createShardConsumer(Integer subscribedShardStateIndex, StreamShardHandle subscribedShard, SequenceNumber lastSequenceNum, MetricGroup metricGroup, KinesisDeserializationSchema<T> shardDeserializer) throws InterruptedException
subscribedShardStateIndex
- the state index of the shard this consumer is subscribed tosubscribedShard
- the shard this consumer is subscribed tolastSequenceNum
- the sequence number in the shard to start consumingmetricGroup
- the metric group to report metrics toInterruptedException
protected RecordPublisherFactory createRecordPublisherFactory()
protected RecordPublisher createRecordPublisher(SequenceNumber sequenceNumber, Properties configProps, MetricGroup metricGroup, StreamShardHandle subscribedShard) throws InterruptedException
InterruptedException
public void runFetcher() throws Exception
shutdownFetcher()
.Exception
- the first error or exception thrown by the fetcher or any of the threads
created by the fetcher.public HashMap<StreamShardMetadata,SequenceNumber> snapshotState()
public void shutdownFetcher()
runFetcher()
to complete. Once called, the shutdown procedure will be
executed and all shard consuming threads will be interrupted.@VisibleForTesting protected void closeRecordPublisherFactory()
@VisibleForTesting protected void deregisterStreamConsumer()
public void awaitTermination() throws InterruptedException
shutdownFetcher()
, this can be called to await the
fetcher shutdown.InterruptedException
protected void stopWithError(Throwable throwable)
public void advanceLastDiscoveredShardOfStream(String stream, String shardId)
protected boolean shouldAdvanceLastDiscoveredShardId(String shardId, String lastSeenShardIdOfStream)
public List<StreamShardHandle> discoverNewShardsToSubscribe() throws InterruptedException
1. Find new shards for each stream that we haven't seen before 2. For each new shard, determine whether this consumer subtask should subscribe to them; if yes, it is added to the returned list of shards 3. Update the subscribedStreamsToLastDiscoveredShardIds state so that we won't get shards that we have already seen before the next time this function is called
InterruptedException
protected Properties getConsumerConfiguration()
protected void emitRecordAndUpdateState(T record, long recordTimestamp, int shardStateIndex, SequenceNumber lastSequenceNumber)
RecordEmitter
, which may collect it
asynchronously. This method is called by ShardConsumer
s.record
- the record to collectrecordTimestamp
- timestamp to attach to the collected recordshardStateIndex
- index of the shard to update in subscribedShardsState; this index
should be the returned value from registerNewSubscribedShardState(KinesisStreamShardState)
, called when
the shard state was registered.lastSequenceNumber
- the last sequence number value to updateprotected final void updateState(int shardStateIndex, SequenceNumber lastSequenceNumber)
ShardConsumer
s.shardStateIndex
- index of the shard to update in subscribedShardsState; this index
should be the returned value from registerNewSubscribedShardState(KinesisStreamShardState)
, called when
the shard state was registered.lastSequenceNumber
- the last sequence number value to updatepublic int registerNewSubscribedShardState(KinesisStreamShardState newSubscribedShardState)
newSubscribedShardState
- the new shard state that this fetcher is to be subscribed to@VisibleForTesting protected long getCurrentTimeMillis()
@VisibleForTesting protected void emitWatermark()
Shards that have not received an update for a certain interval are considered inactive so as to not hold back the watermark indefinitely. When all shards are inactive, the subtask will be marked as temporarily idle to not block downstream operators.
public static boolean isThisSubtaskShouldSubscribeTo(int shardHash, int totalNumberOfConsumerSubtasks, int indexOfThisConsumerSubtask)
shardHash
- hash code for the shardtotalNumberOfConsumerSubtasks
- total number of consumer subtasksindexOfThisConsumerSubtask
- index of this consumer subtask@VisibleForTesting protected ExecutorService createShardConsumersThreadPool(String subtaskName)
@VisibleForTesting public List<KinesisStreamShardState> getSubscribedShardsState()
protected static HashMap<String,String> createInitialSubscribedStreamsToLastDiscoveredShardsState(List<String> streams)
streams
- the list of subscribed streamspublic static StreamShardMetadata convertToStreamShardMetadata(StreamShardHandle streamShardHandle)
StreamShardHandle
into StreamShardMetadata
.streamShardHandle
- the StreamShardHandle
to be convertedStreamShardMetadata
objectpublic static StreamShardHandle convertToStreamShardHandle(StreamShardMetadata streamShardMetadata)
StreamShardMetadata
into StreamShardHandle
.streamShardMetadata
- the StreamShardMetadata
to be convertedStreamShardHandle
objectCopyright © 2014–2021 The Apache Software Foundation. All rights reserved.