@Internal public class KinesisProxy extends Object implements KinesisProxyInterface
NOTE: The AWS KCL library contains a similar implementation, com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy. This implementation differs
mainly in that it can perform operations against arbitrary Kinesis streams. The Flink Kinesis
Connector needs this capability because the consumer may read from multiple Kinesis streams
simultaneously.
Modifier | Constructor and Description |
---|---|
protected | KinesisProxy(Properties configProps): Create a new KinesisProxy based on the supplied configuration properties. |
Modifier and Type | Method and Description |
---|---|
static KinesisProxyInterface | create(Properties configProps): Creates a Kinesis proxy. |
protected com.amazonaws.services.kinesis.AmazonKinesis | createKinesisClient(Properties configProps): Create the Kinesis client, using the provided configuration properties and default ClientConfiguration. |
protected com.amazonaws.services.kinesis.model.DescribeStreamResult | describeStream(String streamName, String startShardId): Get metadata for a Kinesis stream, which contains information about which shards this Kinesis stream possesses. |
com.amazonaws.services.kinesis.model.GetRecordsResult | getRecords(String shardIterator, int maxRecordsToGet): Get the next batch of data records using a specific shard iterator. |
String | getShardIterator(StreamShardHandle shard, String shardIteratorType, Object startingMarker): Get a shard iterator from the specified position in a shard. |
GetShardListResult | getShardList(Map<String,String> streamNamesWithLastSeenShardIds): Get the shard list of multiple Kinesis streams, ignoring the shards of each stream before a specified last seen shard id. |
protected static boolean | isRecoverableException(com.amazonaws.AmazonServiceException ex): Determines whether the exception is recoverable using exponential backoff. |
protected boolean | isRecoverableSdkClientException(com.amazonaws.SdkClientException ex): Determines whether the exception is recoverable using exponential backoff. |
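The startingMarker argument of getShardIterator must match the iterator type: null for TRIM_HORIZON or LATEST, a Date for AT_TIMESTAMP, and a String sequence number for AT_SEQUENCE_NUMBER or AFTER_SEQUENCE_NUMBER. The following is a minimal sketch of that rule only. The class StartingMarkerCheck, its enum, and isValidMarker are hypothetical names introduced for illustration and are not part of the connector.

```java
import java.util.Date;

/** Hypothetical validator; class, enum, and method names are illustrative only. */
public class StartingMarkerCheck {

    /** The five iterator types listed in the getShardIterator documentation. */
    public enum IteratorType {
        TRIM_HORIZON, LATEST, AT_TIMESTAMP, AT_SEQUENCE_NUMBER, AFTER_SEQUENCE_NUMBER
    }

    /**
     * Returns true if the marker has the shape documented for the iterator type.
     * An unknown type name makes IteratorType.valueOf throw IllegalArgumentException.
     */
    public static boolean isValidMarker(String shardIteratorType, Object startingMarker) {
        switch (IteratorType.valueOf(shardIteratorType)) {
            case TRIM_HORIZON:
            case LATEST:
                // No starting position is needed for these types.
                return startingMarker == null;
            case AT_TIMESTAMP:
                return startingMarker instanceof Date;
            case AT_SEQUENCE_NUMBER:
            case AFTER_SEQUENCE_NUMBER:
                // The sequence number is passed as a String.
                return startingMarker instanceof String;
            default:
                return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isValidMarker("LATEST", null));
        System.out.println(isValidMarker("AT_TIMESTAMP", new Date()));
        System.out.println(isValidMarker("AT_SEQUENCE_NUMBER", 42L));
    }
}
```

A check of this kind could run before the call so that a mismatched marker fails fast instead of surfacing as a service error.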
protected KinesisProxy(Properties configProps)
Create a new KinesisProxy based on the supplied configuration properties.
Parameters:
configProps - configuration properties containing AWS credential and AWS region info

protected com.amazonaws.services.kinesis.AmazonKinesis createKinesisClient(Properties configProps)
Create the Kinesis client, using the provided configuration properties and default
ClientConfiguration. Derived classes can override this method to customize the client
configuration.

public static KinesisProxyInterface create(Properties configProps)
Creates a Kinesis proxy.
Parameters:
configProps - configuration properties

public com.amazonaws.services.kinesis.model.GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) throws InterruptedException
Description copied from interface: KinesisProxyInterface
Get the next batch of data records using a specific shard iterator.
Specified by:
getRecords in interface KinesisProxyInterface
Parameters:
shardIterator - a shard iterator that encodes info about which shard to read and where to start reading
maxRecordsToGet - the maximum number of records to retrieve for this batch
Throws:
InterruptedException - this method retries with backoff if AWS Kinesis reports that the operation has exceeded the rate limit; this exception is thrown if the backoff is interrupted.

public GetShardListResult getShardList(Map<String,String> streamNamesWithLastSeenShardIds) throws InterruptedException
Description copied from interface: KinesisProxyInterface
Get the shard list of multiple Kinesis streams, ignoring the shards of each stream before a specified last seen shard id.
Specified by:
getShardList in interface KinesisProxyInterface
Parameters:
streamNamesWithLastSeenShardIds - a map with the stream name as key and the last seen shard id as value
Throws:
InterruptedException - this method retries with backoff if AWS Kinesis reports that the operation has exceeded the rate limit; this exception is thrown if the backoff is interrupted.

public String getShardIterator(StreamShardHandle shard, String shardIteratorType, @Nullable Object startingMarker) throws InterruptedException
Description copied from interface: KinesisProxyInterface
Get a shard iterator from the specified position in a shard. The returned shard iterator can be passed to KinesisProxyInterface.getRecords(String, int) to read data from the Kinesis shard.
Specified by:
getShardIterator in interface KinesisProxyInterface
Parameters:
shard - the shard to get the iterator for
shardIteratorType - the iterator type, defining how the shard is to be iterated (one of: TRIM_HORIZON, LATEST, AT_TIMESTAMP, AT_SEQUENCE_NUMBER, AFTER_SEQUENCE_NUMBER)
startingMarker - should be null if shardIteratorType is TRIM_HORIZON or LATEST, a Date value if shardIteratorType is AT_TIMESTAMP, or a String representing the sequence number if shardIteratorType is AT_SEQUENCE_NUMBER or AFTER_SEQUENCE_NUMBER
Throws:
InterruptedException - this method retries with backoff if AWS Kinesis reports that the operation has exceeded the rate limit; this exception is thrown if the backoff is interrupted.

protected boolean isRecoverableSdkClientException(com.amazonaws.SdkClientException ex)
Determines whether the exception is recoverable using exponential backoff.
Parameters:
ex - Exception to inspect
Returns:
true if the exception can be recovered from, else false

protected static boolean isRecoverableException(com.amazonaws.AmazonServiceException ex)
Determines whether the exception is recoverable using exponential backoff.
Parameters:
ex - Exception to inspect
Returns:
true if the exception can be recovered from, else false
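
As a rough illustration of how a recoverability check of this kind might drive retries with exponential backoff, consider the sketch below. It is not the connector's implementation. RetrySketch, callWithRetry, and their parameters are hypothetical, and a plain Predicate stands in for the AWS exception checks so that the example needs only the standard library.

```java
import java.util.concurrent.Callable;
import java.util.function.Predicate;

/** Hypothetical sketch: retry a call while its failures are classified as recoverable. */
public class RetrySketch {

    public static <T> T callWithRetry(
            Callable<T> call,
            Predicate<Exception> isRecoverable,
            int maxRetries,
            long baseBackoffMillis) throws Exception {
        int attempt = 0;
        while (true) {
            try {
                return call.call();
            } catch (Exception ex) {
                // Give up when the retry budget is spent or the error is not recoverable.
                if (attempt >= maxRetries || !isRecoverable.test(ex)) {
                    throw ex;
                }
                // Double the delay on every retry; the shift is capped to avoid overflow.
                long delayMillis = baseBackoffMillis * (1L << Math.min(attempt, 20));
                attempt++;
                // An interrupt during the backoff propagates as InterruptedException.
                Thread.sleep(delayMillis);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        String result = callWithRetry(
                () -> {
                    // Fail twice with a "recoverable" error, then succeed.
                    if (++calls[0] < 3) {
                        throw new IllegalStateException("throttled");
                    }
                    return "ok";
                },
                ex -> ex instanceof IllegalStateException,
                5,
                10L);
        System.out.println(result + " after " + calls[0] + " calls"); // prints "ok after 3 calls"
    }
}
```

Keeping the classification in a separate predicate mirrors the split in this class between the retrying methods and the isRecoverable checks, which derived classes can adjust independently.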

protected com.amazonaws.services.kinesis.model.DescribeStreamResult describeStream(String streamName, @Nullable String startShardId) throws InterruptedException
Get metadata for a Kinesis stream, which contains information about which shards this Kinesis stream possesses.
This method uses the "full jitter" approach described in the AWS article "Exponential Backoff and Jitter". This is necessary because the fetcher of every parallel subtask makes these calls concurrently; the jittered backoff helps distribute the calls across the fetchers over time.
Parameters:
streamName - the stream to describe
startShardId - which shard to start with for this describe operation
Throws:
InterruptedException
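
The "full jitter" idea mentioned for describeStream can be sketched as follows: compute an exponentially growing cap, bound it by a maximum, then sleep for a uniformly random time below that cap. This is a minimal sketch under stated assumptions, not the connector's code. The class FullJitterBackoff, the method nextDelayMillis, and the parameter values used in main are hypothetical.

```java
import java.util.Random;

/** Hypothetical helper; names and parameter values are illustrative, not the connector's own. */
public class FullJitterBackoff {

    private final Random random;

    public FullJitterBackoff(Random random) {
        this.random = random;
    }

    /**
     * Returns a delay in milliseconds drawn uniformly from
     * [0, min(maxMillis, baseMillis * power^attempt)).
     */
    public long nextDelayMillis(long baseMillis, long maxMillis, double power, int attempt) {
        // Exponential growth, bounded by the configured maximum.
        long cap = (long) Math.min((double) maxMillis, baseMillis * Math.pow(power, attempt));
        // Full jitter: pick anywhere between zero and the cap rather than sleeping for the cap itself.
        return (long) (random.nextDouble() * cap);
    }

    public static void main(String[] args) {
        FullJitterBackoff backoff = new FullJitterBackoff(new Random());
        for (int attempt = 0; attempt < 5; attempt++) {
            long delay = backoff.nextDelayMillis(1000L, 5000L, 1.5, attempt);
            System.out.println("attempt " + attempt + ": sleep " + delay + " ms");
        }
    }
}
```

Because each fetcher draws its own random delay, callers that were throttled at the same moment are unlikely to retry at the same moment, which is the effect the description above relies on.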
Copyright © 2014–2023 The Apache Software Foundation. All rights reserved.