Amazon Kinesis Data Streams SQL Connector #

Scan Source: Unbounded | Sink: Batch | Sink: Streaming Append Mode

The Kinesis connector allows for reading data from and writing data into Amazon Kinesis Data Streams (KDS).

Dependencies #

Only available for stable versions.

The Kinesis connector is not part of the binary distribution. See how to link with it for cluster execution here.

Versioning #

There are two available Table API and SQL distributions for the Kinesis connector. This results from an ongoing migration from the deprecated SourceFunction and SinkFunction interfaces to the new Source and Sink interfaces.

The Table API and SQL interfaces in Flink only allow one TableFactory for each connector identifier. Only one TableFactory with identifier kinesis can be included in your application’s dependencies.

The following table clarifies the underlying interface that is used depending on the distribution selected:

| Dependency | Connector Version | Source connector identifier (interface) | Sink connector identifier (interface) |
| --- | --- | --- | --- |
| flink-sql-connector-aws-kinesis-streams | 5.x or later | kinesis (Source) | kinesis (Sink) |
| flink-sql-connector-aws-kinesis-streams | 4.x or earlier | N/A (no source packaged) | kinesis (Sink) |
| flink-sql-connector-kinesis | 5.x or later | kinesis (Source), kinesis-legacy (SourceFunction) | kinesis (Sink) |
| flink-sql-connector-kinesis | 4.x or earlier | kinesis (SourceFunction) | kinesis (Sink) |

Only include one artifact, either flink-sql-connector-aws-kinesis-streams or flink-sql-connector-kinesis. Including both will result in clashing TableFactory names.

These docs target versions 5.x onwards. The main configuration section targets the kinesis identifier. For legacy configuration, see Connector Options (kinesis-legacy).

Migrating from v4.x to v5.x #

There is no state compatibility for Table API and SQL jobs between 4.x and 5.x, because the underlying implementation has changed.

Consider starting the job with the v5.x kinesis table with source.init.position set to AT_TIMESTAMP and a timestamp slightly before the time when the job with the v4.x kinesis table was stopped. Note that this may result in some records being re-processed.
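
As a rough sketch of that migration step, the table below starts reading a few minutes before an assumed stop time. The stop time (about 2024-05-01 12:00 UTC), the stream ARN, and the schema are placeholders rather than values from a real deployment. The timestamp is written in the default source.init.timestamp.format listed under Connector Options.

```sql
-- Assumption: the v4.x job was stopped at roughly 2024-05-01 12:00:00 UTC,
-- so the v5.x table starts reading five minutes earlier.
CREATE TABLE KinesisTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `category_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP(3)
)
WITH (
  'connector' = 'kinesis',
  'stream.arn' = 'arn:aws:kinesis:us-east-1:012345678901:stream/my-stream-name',
  'aws.region' = 'us-east-1',
  'source.init.position' = 'AT_TIMESTAMP',
  'source.init.timestamp' = '2024-05-01T11:55:00.000+00:00',
  'format' = 'csv'
);
```

Because records in the overlap window are read again, downstream consumers should tolerate duplicates for that period.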

How to create a Kinesis data stream table #

Follow the instructions from the Amazon KDS Developer Guide to set up a Kinesis stream. The following example shows how to create a table backed by a Kinesis data stream:

CREATE TABLE KinesisTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `category_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP(3)
)
PARTITIONED BY (user_id, item_id)
WITH (
  'connector' = 'kinesis',
  'stream.arn' = 'arn:aws:kinesis:us-east-1:012345678901:stream/my-stream-name',
  'aws.region' = 'us-east-1',
  'source.init.position' = 'LATEST',
  'format' = 'csv'
);

Available Metadata #

The kinesis table Source has a known bug that means VIRTUAL columns are not supported. Please use kinesis-legacy until the fix is completed.

The following metadata can be exposed as read-only (VIRTUAL) columns in a table definition. This is available in the kinesis-legacy connector only.

| Key | Data Type | Description |
| --- | --- | --- |
| timestamp | TIMESTAMP_LTZ(3) NOT NULL | The approximate time when the record was inserted into the stream. |
| shard-id | VARCHAR(128) NOT NULL | The unique identifier of the shard within the stream from which the record was read. |
| sequence-number | VARCHAR(128) NOT NULL | The unique identifier of the record within its shard. |

The extended CREATE TABLE example demonstrates the syntax for exposing these metadata fields:

CREATE TABLE KinesisTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `category_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP(3),
  `arrival_time` TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL,
  `shard_id` VARCHAR(128) NOT NULL METADATA FROM 'shard-id' VIRTUAL,
  `sequence_number` VARCHAR(128) NOT NULL METADATA FROM 'sequence-number' VIRTUAL
)
PARTITIONED BY (user_id, item_id)
WITH (
  'connector' = 'kinesis-legacy',
  'stream' = 'user_behavior',
  'aws.region' = 'us-east-2',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'csv'
);
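
Once declared, the metadata columns can be queried like any other column. The query below is an illustrative sketch only: it assumes that `ts` carries the event time from the record payload, and the alias arrival_delay_seconds is a made-up name.

```sql
-- Compare the payload timestamp with the approximate Kinesis arrival time,
-- and show which shard and sequence number each record came from.
SELECT
  `user_id`,
  `shard_id`,
  `sequence_number`,
  TIMESTAMPDIFF(SECOND, `ts`, `arrival_time`) AS arrival_delay_seconds
FROM KinesisTable;
```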

Connector Options #

Common Options

| Option | Required | Forwarded | Default | Type | Description |
| --- | --- | --- | --- | --- | --- |
| connector | required | no | (none) | String | Specify what connector to use. For Kinesis use 'kinesis' or 'kinesis-legacy'. See Versioning for details. |
| stream.arn | required | yes | (none) | String | ARN of the Kinesis data stream backing this table. |
| format | required | no | (none) | String | The format used to deserialize and serialize Kinesis data stream records. See Data Type Mapping for details. |
| aws.region | required | no | (none) | String | The AWS region where the stream is defined. |
| aws.endpoint | optional | no | (none) | String | The AWS endpoint for Kinesis (derived from the AWS region setting if not set). |
| aws.trust.all.certificates | optional | no | false | Boolean | If true, accepts all SSL certificates. This is not recommended for production environments and should only be used for testing purposes. |

Authentication Options

| Option | Required | Forwarded | Default | Type | Description |
| --- | --- | --- | --- | --- | --- |
| aws.credentials.provider | optional | no | AUTO | String | A credentials provider to use when authenticating against the Kinesis endpoint. See Authentication for details. |
| aws.credentials.basic.accesskeyid | optional | no | (none) | String | The AWS access key ID to use when setting credentials provider type to BASIC. |
| aws.credentials.basic.secretkey | optional | no | (none) | String | The AWS secret key to use when setting credentials provider type to BASIC. |
| aws.credentials.profile.path | optional | no | (none) | String | Optional configuration for profile path if credential provider type is set to be PROFILE. |
| aws.credentials.profile.name | optional | no | (none) | String | Optional configuration for profile name if credential provider type is set to be PROFILE. |
| aws.credentials.role.arn | optional | no | (none) | String | The role ARN to use when credential provider type is set to ASSUME_ROLE or WEB_IDENTITY_TOKEN. |
| aws.credentials.role.sessionName | optional | no | (none) | String | The role session name to use when credential provider type is set to ASSUME_ROLE or WEB_IDENTITY_TOKEN. |
| aws.credentials.role.externalId | optional | no | (none) | String | The external ID to use when credential provider type is set to ASSUME_ROLE. |
| aws.credentials.role.stsEndpoint | optional | no | (none) | String | The AWS endpoint for STS (derived from the AWS region setting if not set) to use when credential provider type is set to ASSUME_ROLE. |
| aws.credentials.role.provider | optional | no | (none) | String | The credentials provider that provides credentials for assuming the role when credential provider type is set to ASSUME_ROLE. Roles can be nested, so this value can again be set to ASSUME_ROLE. |
| aws.credentials.webIdentityToken.file | optional | no | (none) | String | The absolute path to the web identity token file that should be used if provider type is set to WEB_IDENTITY_TOKEN. |
| aws.credentials.custom.class | required only if credential provider is set to CUSTOM | no | (none) | String | The full path (in Java package notation) to the user-provided class to use if credential provider type is set to be CUSTOM, e.g. org.user_company.auth.CustomAwsCredentialsProvider. |

Source Options

| Option | Required | Forwarded | Default | Type | Description |
| --- | --- | --- | --- | --- | --- |
| source.init.position | optional | no | LATEST | String | Initial position to be used when reading from the table. See Start Reading Position for details. |
| source.init.timestamp | optional | no | (none) | String | The initial timestamp to start reading the Kinesis stream from (when source.init.position is AT_TIMESTAMP). See Start Reading Position for details. |
| source.init.timestamp.format | optional | no | yyyy-MM-dd'T'HH:mm:ss.SSSXXX | String | The date format of the initial timestamp to start reading the Kinesis stream from (when source.init.position is AT_TIMESTAMP). See Start Reading Position for details. |
| source.shard.discovery.interval | optional | no | 10 s | Duration | The interval between each attempt to discover new shards. |
| source.reader.type | optional | no | POLLING | String | The ReaderType to use for sources (POLLING or EFO). |
| source.shard.get-records.max-record-count | optional | no | 10000 | Integer | Only applicable to POLLING ReaderType. The maximum number of records to try to get each time we fetch records from an AWS Kinesis shard. |
| source.efo.consumer.name | optional | no | (none) | String | Only applicable to EFO ReaderType. The name of the EFO consumer to register with KDS. |
| source.efo.lifecycle | optional | no | JOB_MANAGED | String | Only applicable to EFO ReaderType. Determines whether the EFO consumer is managed by the Flink job (JOB_MANAGED or SELF_MANAGED). |
| source.efo.subscription.timeout | optional | no | 60 s | Duration | Only applicable to EFO ReaderType. Timeout for EFO consumer subscription. |
| source.efo.deregister.timeout | optional | no | 10 s | Duration | Only applicable to EFO ReaderType. Timeout for consumer deregistration. When the timeout is reached, execution continues as normal. |
| source.efo.describe.retry-strategy.attempts.max | optional | no | 100 | Integer | Only applicable to EFO ReaderType. Maximum number of attempts for the exponential backoff retry strategy when calling DescribeStreamConsumer. |
| source.efo.describe.retry-strategy.delay.min | optional | no | 2 s | Duration | Only applicable to EFO ReaderType. Base delay for the exponential backoff retry strategy when calling DescribeStreamConsumer. |
| source.efo.describe.retry-strategy.delay.max | optional | no | 60 s | Duration | Only applicable to EFO ReaderType. Max delay for the exponential backoff retry strategy when calling DescribeStreamConsumer. |

Sink Options

| Option | Required | Forwarded | Default | Type | Description |
| --- | --- | --- | --- | --- | --- |
| sink.partitioner | optional | yes | random or row-based | String | Optional output partitioning from Flink's partitions into Kinesis shards. See Sink Partitioning for details. |
| sink.partitioner-field-delimiter | optional | yes | \| | String | Optional field delimiter for a fields-based partitioner derived from a PARTITIONED BY clause. See Sink Partitioning for details. |
| sink.producer.* | optional | no | (none) | | Deprecated options previously used by the legacy connector. Options with equivalent alternatives in KinesisStreamsSink are matched to their respective properties. Unsupported options are logged as warnings. |
| sink.http-client.max-concurrency | optional | no | 10000 | Integer | Maximum number of allowed concurrent requests by KinesisAsyncClient. |
| sink.http-client.read-timeout | optional | no | 360000 | Integer | Maximum amount of time in ms for requests to be sent by KinesisAsyncClient. |
| sink.http-client.protocol.version | optional | no | HTTP2 | String | HTTP version used by the Kinesis client. |
| sink.batch.max-size | optional | yes | 500 | Integer | Maximum batch size of elements to be passed to KinesisAsyncClient to be written downstream. |
| sink.requests.max-inflight | optional | yes | 16 | Integer | Request threshold for uncompleted requests by KinesisAsyncClient before blocking new write requests and applying backpressure. |
| sink.requests.max-buffered | optional | yes | 10000 | Integer | Request buffer threshold for buffered requests by KinesisAsyncClient before blocking new write requests and applying backpressure. |
| sink.flush-buffer.size | optional | yes | 5242880 | Long | Threshold value in bytes for the writer buffer in KinesisAsyncClient before flushing. |
| sink.flush-buffer.timeout | optional | yes | 5000 | Long | Threshold time in milliseconds for an element to be in a buffer of KinesisAsyncClient before flushing. |
| sink.fail-on-error | optional | yes | false | Boolean | Flag used for retrying failed requests. If set, any request failure will not be retried and will fail the job. |
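
To show how several of these options combine, here is a hedged sketch of one source table and one sink table. The table names, schema, stream ARNs, role ARN, session name, and consumer name are all hypothetical placeholders, and the tuning values are arbitrary illustrations rather than recommendations.

```sql
-- Source: Enhanced Fan-Out (EFO) reader authenticating through an assumed role.
CREATE TABLE KinesisEfoSource (
  `user_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP(3)
)
WITH (
  'connector' = 'kinesis',
  'stream.arn' = 'arn:aws:kinesis:us-east-1:012345678901:stream/my-stream-name',
  'aws.region' = 'us-east-1',
  'aws.credentials.provider' = 'ASSUME_ROLE',
  'aws.credentials.role.arn' = 'arn:aws:iam::012345678901:role/my-flink-role',
  'aws.credentials.role.sessionName' = 'my-flink-session',
  'source.reader.type' = 'EFO',
  'source.efo.consumer.name' = 'my-flink-efo-consumer',
  'format' = 'json'
);

-- Sink: smaller batches, a shorter flush timeout, and fail-fast on write errors.
CREATE TABLE KinesisTunedSink (
  `user_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP(3)
)
WITH (
  'connector' = 'kinesis',
  'stream.arn' = 'arn:aws:kinesis:us-east-1:012345678901:stream/my-output-stream',
  'aws.region' = 'us-east-1',
  'sink.batch.max-size' = '200',
  'sink.flush-buffer.timeout' = '1000',
  'sink.fail-on-error' = 'true',
  'format' = 'json'
);
```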

Features #

Refer to the Kinesis DataStream API documentation for a more detailed description of the features.

Sink Partitioning #

Kinesis data streams consist of one or more shards, and the sink.partitioner option allows you to control how records written into a multi-shard Kinesis-backed table will be partitioned between its shards. Valid values are:

  • fixed: Kinesis PartitionKey values are derived from the Flink subtask index, so each Flink partition ends up in at most one Kinesis partition (assuming that no re-sharding takes place at runtime).
  • random: Kinesis PartitionKey values are assigned randomly. This is the default value for tables not defined with a PARTITIONED BY clause.
  • Custom FixedKinesisPartitioner subclass: e.g. 'org.mycompany.MyPartitioner'.

Records written into tables defining a PARTITIONED BY clause will always be partitioned based on a concatenated projection of the PARTITIONED BY fields. In this case, the sink.partitioner option cannot be used to modify this behavior (attempting to do this results in a configuration error). You can, however, use the sink.partitioner-field-delimiter option to set the delimiter of field values in the concatenated PartitionKey string (an empty string is also a valid delimiter).
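
The two cases described above might look as follows. This is a sketch under assumptions: the table names, schema, and stream ARN are placeholders, and '#' is just an arbitrary delimiter choice.

```sql
-- Case 1: PARTITIONED BY is present, so the PartitionKey is built from
-- user_id and item_id, joined by the configured delimiter.
-- sink.partitioner must not be set on this table.
CREATE TABLE PartitionedKinesisSink (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
)
PARTITIONED BY (user_id, item_id)
WITH (
  'connector' = 'kinesis',
  'stream.arn' = 'arn:aws:kinesis:us-east-1:012345678901:stream/my-stream-name',
  'aws.region' = 'us-east-1',
  'sink.partitioner-field-delimiter' = '#',
  'format' = 'json'
);

-- Case 2: no PARTITIONED BY clause, so sink.partitioner can be chosen explicitly.
CREATE TABLE FixedKinesisSink (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
)
WITH (
  'connector' = 'kinesis',
  'stream.arn' = 'arn:aws:kinesis:us-east-1:012345678901:stream/my-stream-name',
  'aws.region' = 'us-east-1',
  'sink.partitioner' = 'fixed',
  'format' = 'json'
);
```

Partitioning by fields keeps rows with the same field values on the same shard, which helps when downstream consumers rely on per-key ordering. The random default spreads load more evenly when ordering does not matter.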

Data Type Mapping #

Kinesis stores records as Base64-encoded binary data objects, so it doesn’t have a notion of internal record structure. Instead, Kinesis records are deserialized and serialized by formats, e.g. ‘avro’, ‘csv’, or ‘json’. To determine the data type of the messages in your Kinesis-backed tables, pick a suitable Flink format with the format keyword. Please refer to the Formats pages for more details.
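
As an illustration, the sketch below declares a table that uses the json format and writes one row to it. The table name, schema, stream ARN, and inserted values are placeholders chosen for this example.

```sql
-- The declared columns and the chosen format together define how each
-- Kinesis record payload is interpreted.
CREATE TABLE KinesisJsonTable (
  `user_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP(3)
)
WITH (
  'connector' = 'kinesis',
  'stream.arn' = 'arn:aws:kinesis:us-east-1:012345678901:stream/my-stream-name',
  'aws.region' = 'us-east-1',
  'format' = 'json'
);

-- The json format serializes this row into the payload of one Kinesis record.
INSERT INTO KinesisJsonTable
VALUES (CAST(1001 AS BIGINT), 'click', TIMESTAMP '2024-05-01 12:00:00.000');
```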

Connector Options (kinesis-legacy) #

Common Options

| Option | Required | Forwarded | Default | Type | Description |
| --- | --- | --- | --- | --- | --- |
| connector | required | no | (none) | String | Specify what connector to use. For the legacy Kinesis connector use 'kinesis-legacy'. See Versioning for details. |
| stream | required | yes | (none) | String | Name of the Kinesis data stream backing this table. |
| format | required | no | (none) | String | The format used to deserialize and serialize Kinesis data stream records. See Data Type Mapping for details. |
| aws.region | optional | no | (none) | String | The AWS region where the stream is defined. Either this or aws.endpoint is required. |
| aws.endpoint | optional | no | (none) | String | The AWS endpoint for Kinesis (derived from the AWS region setting if not set). Either this or aws.region is required. |
| aws.trust.all.certificates | optional | no | false | Boolean | If true, accepts all SSL certificates. |

Authentication Options

| Option | Required | Forwarded | Default | Type | Description |
| --- | --- | --- | --- | --- | --- |
| aws.credentials.provider | optional | no | AUTO | String | A credentials provider to use when authenticating against the Kinesis endpoint. See Authentication for details. |
| aws.credentials.basic.accesskeyid | optional | no | (none) | String | The AWS access key ID to use when setting credentials provider type to BASIC. |
| aws.credentials.basic.secretkey | optional | no | (none) | String | The AWS secret key to use when setting credentials provider type to BASIC. |
| aws.credentials.profile.path | optional | no | (none) | String | Optional configuration for profile path if credential provider type is set to be PROFILE. |
| aws.credentials.profile.name | optional | no | (none) | String | Optional configuration for profile name if credential provider type is set to be PROFILE. |
| aws.credentials.role.arn | optional | no | (none) | String | The role ARN to use when credential provider type is set to ASSUME_ROLE or WEB_IDENTITY_TOKEN. |
| aws.credentials.role.sessionName | optional | no | (none) | String | The role session name to use when credential provider type is set to ASSUME_ROLE or WEB_IDENTITY_TOKEN. |
| aws.credentials.role.externalId | optional | no | (none) | String | The external ID to use when credential provider type is set to ASSUME_ROLE. |
| aws.credentials.role.stsEndpoint | optional | no | (none) | String | The AWS endpoint for STS (derived from the AWS region setting if not set) to use when credential provider type is set to ASSUME_ROLE. |
| aws.credentials.role.provider | optional | no | (none) | String | The credentials provider that provides credentials for assuming the role when credential provider type is set to ASSUME_ROLE. Roles can be nested, so this value can again be set to ASSUME_ROLE. |
| aws.credentials.webIdentityToken.file | optional | no | (none) | String | The absolute path to the web identity token file that should be used if provider type is set to WEB_IDENTITY_TOKEN. |
| aws.credentials.custom.class | required only if credential provider is set to CUSTOM | no | (none) | String | The full path (in Java package notation) to the user-provided class to use if credential provider type is set to be CUSTOM, e.g. org.user_company.auth.CustomAwsCredentialsProvider. |

Source Options

| Option | Required | Forwarded | Default | Type | Description |
| --- | --- | --- | --- | --- | --- |
| scan.stream.initpos | optional | no | LATEST | String | Initial position to be used when reading from the table. See Start Reading Position for details. |
| scan.stream.initpos-timestamp | optional | no | (none) | String | The initial timestamp to start reading the Kinesis stream from (when scan.stream.initpos is AT_TIMESTAMP). See Start Reading Position for details. |
| scan.stream.initpos-timestamp-format | optional | no | yyyy-MM-dd'T'HH:mm:ss.SSSXXX | String | The date format of the initial timestamp to start reading the Kinesis stream from (when scan.stream.initpos is AT_TIMESTAMP). See Start Reading Position for details. |
| scan.stream.recordpublisher | optional | no | POLLING | String | The RecordPublisher type to use for sources. |
| scan.stream.efo.consumername | optional | no | (none) | String | The name of the EFO consumer to register with KDS. |
| scan.stream.efo.registration | optional | no | LAZY | String | Determines how and when consumer de-/registration is performed (LAZY, EAGER, or NONE). |
| scan.stream.efo.consumerarn | optional | no | (none) | String | The prefix of consumer ARN for a given stream. |
| scan.stream.efo.http-client.max-concurrency | optional | no | 10000 | Integer | Maximum number of allowed concurrent requests for the EFO client. |
| scan.shard-assigner | optional | no | default | String | The shard assigner used to map shards to Flink subtasks (default or uniform). You can also supply your own shard assigner via the Java Service Provider Interfaces (SPI). |
| scan.stream.describe.maxretries | optional | no | 50 | Integer | The maximum number of describeStream attempts if we get a recoverable exception. |
| scan.stream.describe.backoff.base | optional | no | 2000 | Long | The base backoff time (in milliseconds) between each describeStream attempt (for consuming from DynamoDB streams). |
| scan.stream.describe.backoff.max | optional | no | 5000 | Long | The maximum backoff time (in milliseconds) between each describeStream attempt (for consuming from DynamoDB streams). |
| scan.stream.describe.backoff.expconst | optional | no | 1.5 | Double | The power constant for exponential backoff between each describeStream attempt (for consuming from DynamoDB streams). |
| scan.list.shards.maxretries | optional | no | 10 | Integer | The maximum number of listShards attempts if we get a recoverable exception. |
| scan.list.shards.backoff.base | optional | no | 1000 | Long | The base backoff time (in milliseconds) between each listShards attempt. |
| scan.list.shards.backoff.max | optional | no | 5000 | Long | The maximum backoff time (in milliseconds) between each listShards attempt. |
| scan.list.shards.backoff.expconst | optional | no | 1.5 | Double | The power constant for exponential backoff between each listShards attempt. |
| scan.stream.describestreamconsumer.maxretries | optional | no | 50 | Integer | The maximum number of describeStreamConsumer attempts if we get a recoverable exception. |
| scan.stream.describestreamconsumer.backoff.base | optional | no | 2000 | Long | The base backoff time (in milliseconds) between each describeStreamConsumer attempt. |
| scan.stream.describestreamconsumer.backoff.max | optional | no | 5000 | Long | The maximum backoff time (in milliseconds) between each describeStreamConsumer attempt. |
| scan.stream.describestreamconsumer.backoff.expconst | optional | no | 1.5 | Double | The power constant for exponential backoff between each describeStreamConsumer attempt. |
| scan.stream.registerstreamconsumer.maxretries | optional | no | 10 | Integer | The maximum number of registerStream attempts if we get a recoverable exception. |
| scan.stream.registerstreamconsumer.timeout | optional | no | 60 | Integer | The maximum time in seconds to wait for a stream consumer to become active before giving up. |
| scan.stream.registerstreamconsumer.backoff.base | optional | no | 500 | Long | The base backoff time (in milliseconds) between each registerStream attempt. |
| scan.stream.registerstreamconsumer.backoff.max | optional | no | 2000 | Long | The maximum backoff time (in milliseconds) between each registerStream attempt. |
| scan.stream.registerstreamconsumer.backoff.expconst | optional | no | 1.5 | Double | The power constant for exponential backoff between each registerStream attempt. |
| scan.stream.deregisterstreamconsumer.maxretries | optional | no | 10 | Integer | The maximum number of deregisterStream attempts if we get a recoverable exception. |
| scan.stream.deregisterstreamconsumer.timeout | optional | no | 60 | Integer | The maximum time in seconds to wait for a stream consumer to deregister before giving up. |
| scan.stream.deregisterstreamconsumer.backoff.base | optional | no | 500 | Long | The base backoff time (in milliseconds) between each deregisterStream attempt. |
| scan.stream.deregisterstreamconsumer.backoff.max | optional | no | 2000 | Long | The maximum backoff time (in milliseconds) between each deregisterStream attempt. |
| scan.stream.deregisterstreamconsumer.backoff.expconst | optional | no | 1.5 | Double | The power constant for exponential backoff between each deregisterStream attempt. |
| scan.shard.subscribetoshard.maxretries | optional | no | 10 | Integer | The maximum number of subscribeToShard attempts if we get a recoverable exception. |
| scan.shard.subscribetoshard.backoff.base | optional | no | 1000 | Long | The base backoff time (in milliseconds) between each subscribeToShard attempt. |
| scan.shard.subscribetoshard.backoff.max | optional | no | 2000 | Long | The maximum backoff time (in milliseconds) between each subscribeToShard attempt. |
| scan.shard.subscribetoshard.backoff.expconst | optional | no | 1.5 | Double | The power constant for exponential backoff between each subscribeToShard attempt. |
| scan.shard.getrecords.maxrecordcount | optional | no | 10000 | Integer | The maximum number of records to try to get each time we fetch records from an AWS Kinesis shard. |
| scan.shard.getrecords.maxretries | optional | no | 3 | Integer | The maximum number of getRecords attempts if we get a recoverable exception. |
| scan.shard.getrecords.backoff.base | optional | no | 300 | Long | The base backoff time (in milliseconds) between getRecords attempts if we get a ProvisionedThroughputExceededException. |
| scan.shard.getrecords.backoff.max | optional | no | 1000 | Long | The maximum backoff time (in milliseconds) between getRecords attempts if we get a ProvisionedThroughputExceededException. |
| scan.shard.getrecords.backoff.expconst | optional | no | 1.5 | Double | The power constant for exponential backoff between each getRecords attempt. |
| scan.shard.getrecords.intervalmillis | optional | no | 200 | Long | The interval (in milliseconds) between each getRecords request to an AWS Kinesis shard. |
| scan.shard.getiterator.maxretries | optional | no | 3 | Integer | The maximum number of getShardIterator attempts if we get a ProvisionedThroughputExceededException. |
| scan.shard.getiterator.backoff.base | optional | no | 300 | Long | The base backoff time (in milliseconds) between getShardIterator attempts if we get a ProvisionedThroughputExceededException. |
| scan.shard.getiterator.backoff.max | optional | no | 1000 | Long | The maximum backoff time (in milliseconds) between getShardIterator attempts if we get a ProvisionedThroughputExceededException. |
| scan.shard.getiterator.backoff.expconst | optional | no | 1.5 | Double | The power constant for exponential backoff between each getShardIterator attempt. |
| scan.shard.discovery.intervalmillis | optional | no | 10000 | Integer | The interval (in milliseconds) between each attempt to discover new shards. |
| scan.shard.adaptivereads | optional | no | false | Boolean | The config to turn on adaptive reads from a shard. See the AdaptivePollingRecordPublisher documentation for details. |
| scan.shard.idle.interval | optional | no | -1 | Long | The interval (in milliseconds) after which to consider a shard idle for purposes of watermark generation. A positive value will allow the watermark to progress even when some shards don't receive new records. |
| shard.consumer.error.recoverable[0].exception | optional | no | (none) | String | User-specified Exception to retry indefinitely. Example value: `java.net.UnknownHostException`. This configuration is a zero-based array, so the specified exceptions must start with index 0. Specified exceptions must be valid Throwables on the classpath, or the connector will fail to initialize and fail fast. |
| scan.watermark.sync.interval | optional | no | 30000 | Long | The interval (in milliseconds) for periodically synchronizing the shared watermark state. |
| scan.watermark.lookahead.millis | optional | no | 0 | Long | The maximum delta (in milliseconds) allowed for the reader to advance ahead of the shared global watermark. |
| scan.watermark.sync.queue.capacity | optional | no | 100 | Integer | The maximum number of records that will be buffered before suspending consumption of a shard. |

Sink Options

| Option | Required | Forwarded | Default | Type | Description |
| --- | --- | --- | --- | --- | --- |
| sink.partitioner | optional | yes | random or row-based | String | Optional output partitioning from Flink's partitions into Kinesis shards. See Sink Partitioning for details. |
| sink.partitioner-field-delimiter | optional | yes | \| | String | Optional field delimiter for a fields-based partitioner derived from a PARTITIONED BY clause. See Sink Partitioning for details. |
| sink.producer.* | optional | no | (none) | | Deprecated options previously used by the legacy connector. Options with equivalent alternatives in KinesisStreamsSink are matched to their respective properties. Unsupported options are logged as warnings. |
| sink.http-client.max-concurrency | optional | no | 10000 | Integer | Maximum number of allowed concurrent requests by KinesisAsyncClient. |
| sink.http-client.read-timeout | optional | no | 360000 | Integer | Maximum amount of time in ms for requests to be sent by KinesisAsyncClient. |
| sink.http-client.protocol.version | optional | no | HTTP2 | String | HTTP version used by the Kinesis client. |
| sink.batch.max-size | optional | yes | 500 | Integer | Maximum batch size of elements to be passed to KinesisAsyncClient to be written downstream. |
| sink.requests.max-inflight | optional | yes | 16 | Integer | Request threshold for uncompleted requests by KinesisAsyncClient before blocking new write requests and applying backpressure. |
| sink.requests.max-buffered | optional | yes | 10000 | Integer | Request buffer threshold for buffered requests by KinesisAsyncClient before blocking new write requests and applying backpressure. |
| sink.flush-buffer.size | optional | yes | 5242880 | Long | Threshold value in bytes for the writer buffer in KinesisAsyncClient before flushing. |
| sink.flush-buffer.timeout | optional | yes | 5000 | Long | Threshold time in milliseconds for an element to be in a buffer of KinesisAsyncClient before flushing. |
| sink.fail-on-error | optional | yes | false | Boolean | Flag used for retrying failed requests. If set, any request failure will not be retried and will fail the job. |
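
For comparison with the kinesis identifier, a legacy table that tunes a few of the polling options could be sketched as below. The table name, stream name, timestamp, and tuning values are illustrative assumptions only.

```sql
-- Legacy source: start from a timestamp, poll less aggressively, retry
-- getRecords a few more times, and mark shards idle after one minute so
-- that watermarks can still advance.
CREATE TABLE KinesisLegacyTable (
  `user_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP(3)
)
WITH (
  'connector' = 'kinesis-legacy',
  'stream' = 'user_behavior',
  'aws.region' = 'us-east-2',
  'scan.stream.initpos' = 'AT_TIMESTAMP',
  'scan.stream.initpos-timestamp' = '2024-05-01T11:55:00.000+00:00',
  'scan.shard.getrecords.intervalmillis' = '500',
  'scan.shard.getrecords.maxretries' = '5',
  'scan.shard.idle.interval' = '60000',
  'format' = 'csv'
);
```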
