Kinesis

Amazon Kinesis Data Streams SQL Connector #

Scan Source: Unbounded Sink: Batch Sink: Streaming Append Mode

The Kinesis connector allows for reading data from and writing data into Amazon Kinesis Data Streams (KDS).

Dependencies #

There is no connector (yet) available for Flink version 1.20.

The Kinesis connector is not part of the binary distribution. See how to link with it for cluster execution here.

How to create a Kinesis data stream table #

Follow the instructions from the Amazon KDS Developer Guide to set up a Kinesis stream. The following example shows how to create a table backed by a Kinesis data stream:

CREATE TABLE KinesisTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `category_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP(3)
)
PARTITIONED BY (user_id, item_id)
WITH (
  'connector' = 'kinesis',
  'stream' = 'user_behavior',
  'aws.region' = 'us-east-2',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'csv'
);

Available Metadata #

The following metadata can be exposed as read-only (VIRTUAL) columns in a table definition.

Key Data Type Description
timestamp TIMESTAMP_LTZ(3) NOT NULL The approximate time when the record was inserted into the stream.
shard-id VARCHAR(128) NOT NULL The unique identifier of the shard within the stream from which the record was read.
sequence-number VARCHAR(128) NOT NULL The unique identifier of the record within its shard.

The extended CREATE TABLE example demonstrates the syntax for exposing these metadata fields:

CREATE TABLE KinesisTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `category_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP(3),
  `arrival_time` TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL,
  `shard_id` VARCHAR(128) NOT NULL METADATA FROM 'shard-id' VIRTUAL,
  `sequence_number` VARCHAR(128) NOT NULL METADATA FROM 'sequence-number' VIRTUAL
)
PARTITIONED BY (user_id, item_id)
WITH (
  'connector' = 'kinesis',
  'stream' = 'user_behavior',
  'aws.region' = 'us-east-2',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'csv'
);

Connector Options #

Option Required Forwarded Default Type Description
Common Options
connector
required no (none) String Specify what connector to use. For Kinesis use 'kinesis'.
stream
required yes (none) String Name of the Kinesis data stream backing this table.
format
required no (none) String The format used to deserialize and serialize Kinesis data stream records. See Data Type Mapping for details.
aws.region
optional no (none) String The AWS region where the stream is defined. Either this or aws.endpoint are required.
aws.endpoint
optional no (none) String The AWS endpoint for Kinesis (derived from the AWS region setting if not set). Either this or aws.region are required.
aws.trust.all.certificates
optional no false Boolean If true accepts all SSL certificates.
Authentication Options
aws.credentials.provider
optional no AUTO String A credentials provider to use when authenticating against the Kinesis endpoint. See Authentication for details.
aws.credentials.basic.accesskeyid
optional no (none) String The AWS access key ID to use when setting credentials provider type to BASIC.
aws.credentials.basic.secretkey
optional no (none) String The AWS secret key to use when setting credentials provider type to BASIC.
aws.credentials.profile.path
optional no (none) String Optional configuration for profile path if credential provider type is set to be PROFILE.
aws.credentials.profile.name
optional no (none) String Optional configuration for profile name if credential provider type is set to be PROFILE.
aws.credentials.role.arn
optional no (none) String The role ARN to use when credential provider type is set to ASSUME_ROLE or WEB_IDENTITY_TOKEN.
aws.credentials.role.sessionName
optional no (none) String The role session name to use when credential provider type is set to ASSUME_ROLE or WEB_IDENTITY_TOKEN.
aws.credentials.role.externalId
optional no (none) String The external ID to use when credential provider type is set to ASSUME_ROLE.
aws.credentials.role.stsEndpoint
optional no (none) String The AWS endpoint for STS (derived from the AWS region setting if not set) to use when credential provider type is set to ASSUME_ROLE.
aws.credentials.role.provider
optional no (none) String The credentials provider that provides credentials for assuming the role when credential provider type is set to ASSUME_ROLE. Roles can be nested, so this value can again be set to ASSUME_ROLE
aws.credentials.webIdentityToken.file
optional no (none) String The absolute path to the web identity token file that should be used if provider type is set to WEB_IDENTITY_TOKEN.
aws.credentials.custom.class
required only if credential provider is set to CUSTOM no (none) String The full path (in Java package notation) to the user provided class to use if credential provider type is set to be CUSTOM e.g. org.user_company.auth.CustomAwsCredentialsProvider.
Source Options
scan.stream.initpos
optional no LATEST String Initial position to be used when reading from the table. See Start Reading Position for details.
scan.stream.initpos-timestamp
optional no (none) String The initial timestamp to start reading Kinesis stream from (when scan.stream.initpos is AT_TIMESTAMP). See Start Reading Position for details.
scan.stream.initpos-timestamp-format
optional no yyyy-MM-dd'T'HH:mm:ss.SSSXXX String The date format of initial timestamp to start reading Kinesis stream from (when scan.stream.initpos is AT_TIMESTAMP). See Start Reading Position for details.
scan.stream.recordpublisher
optional no POLLING String The RecordPublisher type to use for sources. See Enhanced Fan-Out for details.
scan.stream.efo.consumername
optional no (none) String The name of the EFO consumer to register with KDS. See Enhanced Fan-Out for details.
scan.stream.efo.registration
optional no LAZY String Determine how and when consumer de-/registration is performed (LAZY|EAGER|NONE). See Enhanced Fan-Out for details.
scan.stream.efo.consumerarn
optional no (none) String The prefix of consumer ARN for a given stream. See Enhanced Fan-Out for details.
scan.stream.efo.http-client.max-concurrency
optional no 10000 Integer Maximum number of allowed concurrent requests for the EFO client. See Enhanced Fan-Out for details.
scan.shard-assigner
optional no default String The shard assigner used to map shards to Flink subtasks (default|uniform). You can also supply your own shard assigner via the Java Service Provider Interfaces (SPI).
scan.stream.describe.maxretries
optional no 50 Integer The maximum number of describeStream attempts if we get a recoverable exception.
scan.stream.describe.backoff.base
optional no 2000 Long The base backoff time (in milliseconds) between each describeStream attempt (for consuming from DynamoDB streams).
scan.stream.describe.backoff.max
optional no 5000 Long The maximum backoff time (in milliseconds) between each describeStream attempt (for consuming from DynamoDB streams).
scan.stream.describe.backoff.expconst
optional no 1.5 Double The power constant for exponential backoff between each describeStream attempt (for consuming from DynamoDB streams).
scan.list.shards.maxretries
optional no 10 Integer The maximum number of listShards attempts if we get a recoverable exception.
scan.list.shards.backoff.base
optional no 1000 Long The base backoff time (in milliseconds) between each listShards attempt.
scan.list.shards.backoff.max
optional no 5000 Long The maximum backoff time (in milliseconds) between each listShards attempt.
scan.list.shards.backoff.expconst
optional no 1.5 Double The power constant for exponential backoff between each listShards attempt.
scan.stream.describestreamconsumer.maxretries
optional no 50 Integer The maximum number of describeStreamConsumer attempts if we get a recoverable exception.
scan.stream.describestreamconsumer.backoff.base
optional no 2000 Long The base backoff time (in milliseconds) between each describeStreamConsumer attempt.
scan.stream.describestreamconsumer.backoff.max
optional no 5000 Long The maximum backoff time (in milliseconds) between each describeStreamConsumer attempt.
scan.stream.describestreamconsumer.backoff.expconst
optional no 1.5 Double The power constant for exponential backoff between each describeStreamConsumer attempt.
scan.stream.registerstreamconsumer.maxretries
optional no 10 Integer The maximum number of registerStream attempts if we get a recoverable exception.
scan.stream.registerstreamconsumer.timeout
optional no 60 Integer The maximum time in seconds to wait for a stream consumer to become active before giving up.
scan.stream.registerstreamconsumer.backoff.base
optional no 500 Long The base backoff time (in milliseconds) between each registerStream attempt.
scan.stream.registerstreamconsumer.backoff.max
optional no 2000 Long The maximum backoff time (in milliseconds) between each registerStream attempt.
scan.stream.registerstreamconsumer.backoff.expconst
optional no 1.5 Double The power constant for exponential backoff between each registerStream attempt.
scan.stream.deregisterstreamconsumer.maxretries
optional no 10 Integer The maximum number of deregisterStream attempts if we get a recoverable exception.
scan.stream.deregisterstreamconsumer.timeout
optional no 60 Integer The maximum time in seconds to wait for a stream consumer to deregister before giving up.
scan.stream.deregisterstreamconsumer.backoff.base
optional no 500 Long The base backoff time (in milliseconds) between each deregisterStream attempt.
scan.stream.deregisterstreamconsumer.backoff.max
optional no 2000 Long The maximum backoff time (in milliseconds) between each deregisterStream attempt.
scan.stream.deregisterstreamconsumer.backoff.expconst
optional no 1.5 Double The power constant for exponential backoff between each deregisterStream attempt.
scan.shard.subscribetoshard.maxretries
optional no 10 Integer The maximum number of subscribeToShard attempts if we get a recoverable exception.
scan.shard.subscribetoshard.backoff.base
optional no 1000 Long The base backoff time (in milliseconds) between each subscribeToShard attempt.
scan.shard.subscribetoshard.backoff.max
optional no 2000 Long The maximum backoff time (in milliseconds) between each subscribeToShard attempt.
scan.shard.subscribetoshard.backoff.expconst
optional no 1.5 Double The power constant for exponential backoff between each subscribeToShard attempt.
scan.shard.getrecords.maxrecordcount
optional no 10000 Integer The maximum number of records to try to get each time we fetch records from a AWS Kinesis shard.
scan.shard.getrecords.maxretries
optional no 3 Integer The maximum number of getRecords attempts if we get a recoverable exception.
scan.shard.getrecords.backoff.base
optional no 300 Long The base backoff time (in milliseconds) between getRecords attempts if we get a ProvisionedThroughputExceededException.
scan.shard.getrecords.backoff.max
optional no 1000 Long The maximum backoff time (in milliseconds) between getRecords attempts if we get a ProvisionedThroughputExceededException.
scan.shard.getrecords.backoff.expconst
optional no 1.5 Double The power constant for exponential backoff between each getRecords attempt.
scan.shard.getrecords.intervalmillis
optional no 200 Long The interval (in milliseconds) between each getRecords request to a AWS Kinesis shard in milliseconds.
scan.shard.getiterator.maxretries
optional no 3 Integer The maximum number of getShardIterator attempts if we get ProvisionedThroughputExceededException.
scan.shard.getiterator.backoff.base
optional no 300 Long The base backoff time (in milliseconds) between getShardIterator attempts if we get a ProvisionedThroughputExceededException.
scan.shard.getiterator.backoff.max
optional no 1000 Long The maximum backoff time (in milliseconds) between getShardIterator attempts if we get a ProvisionedThroughputExceededException.
scan.shard.getiterator.backoff.expconst
optional no 1.5 Double The power constant for exponential backoff between each getShardIterator attempt.
scan.shard.discovery.intervalmillis
optional no 10000 Integer The interval between each attempt to discover new shards.
scan.shard.adaptivereads
optional no false Boolean The config to turn on adaptive reads from a shard. See the AdaptivePollingRecordPublisher documentation for details.
scan.shard.idle.interval
optional no -1 Long The interval (in milliseconds) after which to consider a shard idle for purposes of watermark generation. A positive value will allow the watermark to progress even when some shards don't receive new records.
scan.watermark.sync.interval
optional no 30000 Long The interval (in milliseconds) for periodically synchronizing the shared watermark state.
scan.watermark.lookahead.millis
optional no 0 Long The maximum delta (in milliseconds) allowed for the reader to advance ahead of the shared global watermark.
scan.watermark.sync.queue.capacity
optional no 100 Integer The maximum number of records that will be buffered before suspending consumption of a shard.
Sink Options
sink.partitioner
optional yes random or row-based String Optional output partitioning from Flink's partitions into Kinesis shards. See Sink Partitioning for details.
sink.partitioner-field-delimiter
optional yes | String Optional field delimiter for a fields-based partitioner derived from a PARTITION BY clause. See Sink Partitioning for details.
sink.producer.*
optional no (none) Deprecated options previously used by the legacy connector. Options with equivalant alternatives in KinesisStreamsSink are matched to their respective properties. Unsupported options are logged out to user as warnings.
sink.http-client.max-concurrency
optional no 10000 Integer Maximum number of allowed concurrent requests by KinesisAsyncClient.
sink.http-client.read-timeout
optional no 360000 Integer Maximum amount of time in ms for requests to be sent by KinesisAsyncClient.
sink.http-client.protocol.version
optional no HTTP2 String Http version used by Kinesis Client.
sink.batch.max-size
optional yes 500 Integer Maximum batch size of elements to be passed to KinesisAsyncClient to be written downstream.
sink.requests.max-inflight
optional yes 16 Integer Request threshold for uncompleted requests by KinesisAsyncClientbefore blocking new write requests and applying backpressure.
sink.requests.max-buffered
optional yes 10000 String Request buffer threshold for buffered requests by KinesisAsyncClient before blocking new write requests and applying backpressure.
sink.flush-buffer.size
optional yes 5242880 Long Threshold value in bytes for writer buffer in KinesisAsyncClient before flushing.
sink.flush-buffer.timeout
optional yes 5000 Long Threshold time in milliseconds for an element to be in a buffer ofKinesisAsyncClient before flushing.
sink.fail-on-error
optional yes false Boolean Flag used for retrying failed requests. If set any request failure will not be retried and will fail the job.

Features #

Authorization #

Make sure to create an appropriate IAM policy to allow reading from / writing to the Kinesis data streams.

Authentication #

Depending on your deployment you would choose a different Credentials Provider to allow access to Kinesis. By default, the AUTO Credentials Provider is used. If the access key ID and secret key are set in the deployment configuration, this results in using the BASIC provider.

A specific AWSCredentialsProvider can be optionally set using the aws.credentials.provider setting. Supported values are:

  • AUTO - Use the default AWS Credentials Provider chain that searches for credentials in the following order: ENV_VARS, SYS_PROPS, WEB_IDENTITY_TOKEN, PROFILE, and EC2/ECS credentials provider.
  • BASIC - Use access key ID and secret key supplied as configuration.
  • ENV_VAR - Use AWS_ACCESS_KEY_ID & AWS_SECRET_ACCESS_KEY environment variables.
  • SYS_PROP - Use Java system properties aws.accessKeyId and aws.secretKey.
  • PROFILE - Use an AWS credentials profile to create the AWS credentials.
  • ASSUME_ROLE - Create AWS credentials by assuming a role. The credentials for assuming the role must be supplied.
  • WEB_IDENTITY_TOKEN - Create AWS credentials by assuming a role using Web Identity Token.
  • CUSTOM - Provide a custom class that implements the interface AWSCredentialsProvider and has a constructor MyCustomClass(java.util.Properties config). All connector properties will be passed down to this custom credential provider class via the constructor.

Start Reading Position #

You can configure table sources to start reading a table-backing Kinesis data stream from a specific position through the scan.stream.initpos option. Available values are:

  • LATEST: read shards starting from the latest record.
  • TRIM_HORIZON: read shards starting from the earliest record possible (data may be trimmed by Kinesis depending on the current retention settings of the backing stream).
  • AT_TIMESTAMP: read shards starting from a specified timestamp. The timestamp value should be specified through the scan.stream.initpos-timestamp in one of the following formats:
    • A non-negative double value representing the number of seconds that has elapsed since the Unix epoch (for example, 1459799926.480).
    • A value conforming to a user-defined SimpleDateFormat specified at scan.stream.initpos-timestamp-format. If a user does not define a format, the default pattern will be yyyy-MM-dd'T'HH:mm:ss.SSSXXX. For example, timestamp value is 2016-04-04 and user-defined format is yyyy-MM-dd, or timestamp value is 2016-04-04T19:58:46.480-00:00 and a user-defined format is not provided.

Sink Partitioning #

Kinesis data streams consist of one or more shards, and the sink.partitioner option allows you to control how records written into a multi-shard Kinesis-backed table will be partitioned between its shards. Valid values are:

  • fixed: Kinesis PartitionKey values derived from the Flink subtask index, so each Flink partition ends up in at most one Kinesis partition (assuming that no re-sharding takes place at runtime).
  • random: Kinesis PartitionKey values are assigned randomly. This is the default value for tables not defined with a PARTITION BY clause.
  • Custom FixedKinesisPartitioner subclass: e.g. 'org.mycompany.MyPartitioner'.
Records written into tables defining a PARTITION BY clause will always be partitioned based on a concatenated projection of the PARTITION BY fields. In this case, the sink.partitioner field cannot be used to modify this behavior (attempting to do this results in a configuration error). You can, however, use the sink.partitioner-field-delimiter option to set the delimiter of field values in the concatenated PartitionKey string (an empty string is also a valid delimiter).

Enhanced Fan-Out #

Enhanced Fan-Out (EFO) increases the maximum number of concurrent consumers per Kinesis data stream. Without EFO, all concurrent Kinesis consumers share a single read quota per shard. Using EFO, each consumer gets a distinct dedicated read quota per shard, allowing read throughput to scale with the number of consumers.

Note Using EFO will incur additional cost.

You can enable and configure EFO with the following properties:

  • scan.stream.recordpublisher: Determines whether to use EFO or POLLING.
  • scan.stream.efo.consumername: A name to identify the consumer when the above value is EFO.
  • scan.stream.efo.registration: Strategy for (de-)registration of EFO consumers with the name given by the scan.stream.efo.consumername value. Valid strategies are:
    • LAZY (default): Stream consumers are registered when the Flink job starts running. If the stream consumer already exists, it will be reused. This is the preferred strategy for the majority of applications. However, jobs with parallelism greater than 1 will result in tasks competing to register and acquire the stream consumer ARN. For jobs with very large parallelism this can result in an increased start-up time. The DescribeStreamConsumer operation has a limit of 20 transactions per second, this means application startup time will increase by roughly parallelism/20 seconds.
    • EAGER: Stream consumers are registered in the FlinkKinesisConsumer constructor. If the stream consumer already exists, it will be reused. This will result in registration occurring when the job is constructed, either on the Flink Job Manager or client environment submitting the job. Using this strategy results in a single thread registering and retrieving the stream consumer ARN, reducing startup time over LAZY (with large parallelism). However, consider that the client environment will require access to the AWS services.
    • NONE: Stream consumer registration is not performed by FlinkKinesisConsumer. Registration must be performed externally using the AWS CLI or SDK to invoke RegisterStreamConsumer. Stream consumer ARNs should be provided to the job via the consumer configuration.
  • scan.stream.efo.consumerarn.<stream-name>: ARNs identifying externally registered ARN-consumers (substitute <stream-name> with the name of your stream in the parameter name). Use this if you choose to use NONE as a scan.stream.efo.registration strategy.

Note For a given Kinesis data stream, each EFO consumer must have a unique name. However, consumer names do not have to be unique across data streams. Reusing a consumer name will result in existing subscriptions being terminated.

Note With the LAZY strategy, stream consumers are de-registered when the job is shutdown gracefully. In the event that a job terminates without executing the shutdown hooks, stream consumers will remain active. In this situation the stream consumers will be gracefully reused when the application restarts. With the NONE and EAGER strategies, stream consumer de-registration is not performed by FlinkKinesisConsumer.

Data Type Mapping #

Kinesis stores records as Base64-encoded binary data objects, so it doesn’t have a notion of internal record structure. Instead, Kinesis records are deserialized and serialized by formats, e.g. ‘avro’, ‘csv’, or ‘json’. To determine the data type of the messages in your Kinesis-backed tables, pick a suitable Flink format with the format keyword. Please refer to the Formats pages for more details.

Updates in 1.15 #

Kinesis table API connector sink data stream depends on FlinkKinesisProducer till 1.14, with the introduction of KinesisStreamsSink in 1.15 kinesis table API sink connector has been migrated to the new KinesisStreamsSink. Authentication options have been migrated identically while sink configuration options are now compatible with KinesisStreamsSink.

Options configuring FlinkKinesisProducer are now deprecated with fallback support for common configuration options with KinesisStreamsSink.

KinesisStreamsSink uses KinesisAsyncClient to send records to kinesis, which doesn’t support aggregation. In consequence, table options configuring aggregation in the deprecated FlinkKinesisProducer are now deprecated and will be ignored, this includes sink.producer.aggregation-enabled and sink.producer.aggregation-count.

Note Migrating applications with deprecated options will result in the incompatible deprecated options being ignored and warned to users.

Kinesis table API source connector still depends on FlinkKinesisConsumer with no change in configuration options.

Back to top