Amazon DynamoDB Connector #
The DynamoDB connector allows users to read from and write to Amazon DynamoDB.
As a source, the connector allows users to read the change data capture stream of a DynamoDB table using Amazon DynamoDB Streams.
As a sink, the connector allows users to write directly to Amazon DynamoDB tables using the BatchWriteItem API.
Dependency #
Apache Flink ships the connector for users to utilize. To use it, add the following Maven dependency to your project:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-dynamodb</artifactId>
    <version>5.0.0-1.20</version>
</dependency>
Amazon DynamoDB Streams Source #
The DynamoDB Streams source reads from Amazon DynamoDB Streams using the AWS v2 SDK for Java. Follow the instructions from the AWS docs to set up and configure the change data capture stream.
The actual events streamed to the DynamoDB Stream depend on the StreamViewType specified by the DynamoDB Stream itself. See the AWS docs for more information.
Usage #
The DynamoDbStreamsSource provides a fluent builder to construct an instance of the source. The code snippet below illustrates how to do so.
// Configure the DynamodbStreamsSource
Configuration sourceConfig = new Configuration();
sourceConfig.set(DynamodbStreamsSourceConfigConstants.STREAM_INITIAL_POSITION, DynamodbStreamsSourceConfigConstants.InitialPosition.TRIM_HORIZON); // This is optional, by default the connector will read from LATEST

// Create a new DynamoDbStreamsSource to read from the specified DynamoDB Stream.
DynamoDbStreamsSource<String> dynamoDbStreamsSource =
        DynamoDbStreamsSource.<String>builder()
                .setStreamArn("arn:aws:dynamodb:us-east-1:1231231230:table/test/stream/2024-04-11T07:14:19.380")
                .setSourceConfig(sourceConfig)
                // User must implement their own deserialization schema to translate change data capture events into custom data types
                .setDeserializationSchema(dynamodbDeserializationSchema)
                .build();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Specify watermarking strategy and the name of the DynamoDB Streams Source operator.
// Specify return type using TypeInformation.
// Specify UID of operator in line with Flink best practice.
DataStream<String> cdcEventsWithEventTimeWatermarks = env.fromSource(dynamoDbStreamsSource, WatermarkStrategy.<String>forMonotonousTimestamps().withIdleness(Duration.ofSeconds(1)), "DynamoDB Streams source")
        .returns(TypeInformation.of(String.class))
        .uid("custom-uid");
The same example in Scala:
// Configure the DynamodbStreamsSource
val sourceConfig = new Configuration()
sourceConfig.set(DynamodbStreamsSourceConfigConstants.STREAM_INITIAL_POSITION, DynamodbStreamsSourceConfigConstants.InitialPosition.TRIM_HORIZON) // This is optional, by default the connector will read from LATEST

// Create a new DynamoDbStreamsSource to read from the specified DynamoDB Stream.
val dynamoDbStreamsSource = DynamoDbStreamsSource.builder[String]()
        .setStreamArn("arn:aws:dynamodb:us-east-1:1231231230:table/test/stream/2024-04-11T07:14:19.380")
        .setSourceConfig(sourceConfig)
        // User must implement their own deserialization schema to translate change data capture events into custom data types
        .setDeserializationSchema(dynamodbDeserializationSchema)
        .build()

val env = StreamExecutionEnvironment.getExecutionEnvironment()

// Specify watermarking strategy and the name of the DynamoDB Streams Source operator.
// Specify return type using TypeInformation.
// Specify UID of operator in line with Flink best practice.
val cdcEventsWithEventTimeWatermarks = env.fromSource(dynamoDbStreamsSource, WatermarkStrategy.forMonotonousTimestamps[String]().withIdleness(Duration.ofSeconds(1)), "DynamoDB Streams source")
        .uid("custom-uid")
The above is a simple example of using the DynamoDbStreamsSource.
- The DynamoDB Stream being read from is specified using the stream ARN.
- Configuration for the Source is supplied using an instance of Flink’s Configuration class. The configuration keys can be taken from AWSConfigOptions (AWS-specific configuration) and DynamodbStreamsSourceConfigConstants (DynamoDB Streams Source configuration).
- The example specifies the starting position as TRIM_HORIZON (see Configuring Starting Position for more information).
- The deserialization format is SimpleStringSchema (see Deserialization Schema for more information).
- The distribution of shards across subtasks is controlled using the UniformShardAssigner (see Shard Assignment Strategy for more information).
- The example also specifies an increasing WatermarkStrategy, which means each record will be tagged with the event time specified using approximateCreationDateTime. Monotonically increasing watermarks will be generated, and subtasks will be considered idle if no record is emitted after 1 second.
Configuring Starting Position #
To specify the starting position of the DynamoDbStreamsSource, users can set the DynamodbStreamsSourceConfigConstants.STREAM_INITIAL_POSITION in configuration.
- LATEST: read all shards of the stream starting from the latest record.
- TRIM_HORIZON: read all shards of the stream starting from the earliest record possible (data is trimmed by DynamoDB after 24 hours).
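For example, to start reading from the oldest retained record rather than the default LATEST, set the option on the Configuration instance that is passed to the builder (this mirrors the snippet in the Usage section):
Configuration sourceConfig = new Configuration();
// Start from the earliest record still retained by DynamoDB instead of the default LATEST
sourceConfig.set(DynamodbStreamsSourceConfigConstants.STREAM_INITIAL_POSITION, DynamodbStreamsSourceConfigConstants.InitialPosition.TRIM_HORIZON);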
Deserialization Schema #
The DynamoDbStreamsSource provides the DynamoDbStreamsDeserializationSchema<T> interface to allow users to implement their own deserialization schema to convert DynamoDB change data capture events into custom event types.
The DynamoDbStreamsDeserializationSchema<T>#deserialize method takes in an instance of Record from the DynamoDB model. The Record can contain different content, depending on the configuration of the DynamoDB Stream. See the AWS docs for more information.
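As an illustration, the sketch below emits the new image of each change record as a String. It is a sketch only: the class name NewImageToStringDeserializationSchema is hypothetical, and the method signature (assumed here to be deserialize(Record, String, String, Collector<T>) plus getProducedType()) is an assumption modelled on other AWS connector deserialization schemas, so verify it against the connector's Javadoc before use.
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;

import software.amazon.awssdk.services.dynamodb.model.Record;

import java.io.IOException;

// Hypothetical schema: emits the new image of every change event as a String.
// The deserialize(...) signature below is an assumption; check the connector's Javadoc.
public class NewImageToStringDeserializationSchema
        implements DynamoDbStreamsDeserializationSchema<String> {

    @Override
    public void deserialize(Record record, String streamArn, String shardId, Collector<String> output)
            throws IOException {
        // The new image is only present for StreamViewType NEW_IMAGE or NEW_AND_OLD_IMAGES
        if (record.dynamodb() != null && record.dynamodb().hasNewImage()) {
            output.collect(record.dynamodb().newImage().toString());
        }
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return TypeInformation.of(String.class);
    }
}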
Event Ordering #
Events are written into DynamoDB Streams, maintaining ordering within the same primary key. This is done by ensuring that events within the same primary key are written to the same shard lineage. When there are shard splits (one parent shard splitting into two child shards), the ordering will be maintained as long as the parent shard is read completely before starting to read from the child shards.
The DynamoDbStreamsSource ensures that shards are assigned in a manner that respects parent-child shard ordering. This means that a shard will only be passed to the shard assigner once its parent shard has been read completely. This helps to ensure that events from the change data capture stream are read in order within the same DynamoDB primary key.
Shard Assignment Strategy #
The UniformShardAssigner allocates the shards of the DynamoDB Stream evenly across the parallel subtasks of the source operator. DynamoDB Stream shards are ephemeral and are created and deleted automatically, as required. The UniformShardAssigner allocates new shards to the subtask with the lowest number of currently allocated shards.
Users can also implement their own shard assignment strategy by implementing the DynamoDbStreamsShardAssigner interface.
Configuration #
Retry Strategy #
The DynamoDbStreamsSource interacts with Amazon DynamoDB using the AWS v2 SDK for Java. The retry strategy used by the AWS SDK client can be tuned using the following configuration options:
- DYNAMODB_STREAMS_RETRY_COUNT: Maximum number of API retries on retryable errors before the Flink job is restarted.
- DYNAMODB_STREAMS_EXPONENTIAL_BACKOFF_MIN_DELAY: The base delay used for calculation of the exponential backoff.
- DYNAMODB_STREAMS_EXPONENTIAL_BACKOFF_MAX_DELAY: The maximum delay for exponential backoff.
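A minimal sketch of tuning these options through the source Configuration. The value types used below (an integer retry count and java.time.Duration backoff delays) are assumptions; check DynamodbStreamsSourceConfigConstants for the exact option definitions.
Configuration sourceConfig = new Configuration();
// Assumed types: integer retry count, Duration-based backoff bounds
sourceConfig.set(DynamodbStreamsSourceConfigConstants.DYNAMODB_STREAMS_RETRY_COUNT, 10);
sourceConfig.set(DynamodbStreamsSourceConfigConstants.DYNAMODB_STREAMS_EXPONENTIAL_BACKOFF_MIN_DELAY, Duration.ofMillis(250));
sourceConfig.set(DynamodbStreamsSourceConfigConstants.DYNAMODB_STREAMS_EXPONENTIAL_BACKOFF_MAX_DELAY, Duration.ofSeconds(5));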
Shard Discovery #
The DynamoDbStreamsSource periodically discovers newly created shards on the DynamoDB Stream. These can come from shard splits or shard rotations. By default, the source discovers shards every 60 seconds. However, users can customize this to a smaller value by configuring SHARD_DISCOVERY_INTERVAL.
There is a known issue with shard discovery where the shard graph returned by DynamoDB might contain inconsistencies. In this case, the DynamoDbStreamsSource automatically detects the inconsistency and retries the shard discovery process. The maximum number of retries can be configured using DESCRIBE_STREAM_INCONSISTENCY_RESOLUTION_RETRY_COUNT.
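For example, a sketch of lowering the discovery interval and raising the inconsistency-resolution retry count; the value types (java.time.Duration for the interval, an integer for the retry count) are assumptions to verify against DynamodbStreamsSourceConfigConstants.
Configuration sourceConfig = new Configuration();
// Discover new shards more frequently than the 60 second default (assumed Duration-typed option)
sourceConfig.set(DynamodbStreamsSourceConfigConstants.SHARD_DISCOVERY_INTERVAL, Duration.ofSeconds(15));
// Allow a few more retries when the returned shard graph is inconsistent (assumed integer-typed option)
sourceConfig.set(DynamodbStreamsSourceConfigConstants.DESCRIBE_STREAM_INCONSISTENCY_RESOLUTION_RETRY_COUNT, 5);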
Amazon DynamoDB Sink #
The DynamoDB sink writes to Amazon DynamoDB using the AWS v2 SDK for Java. Follow the instructions from the Amazon DynamoDB Developer Guide to set up a table.
Properties sinkProperties = new Properties();
// Required
sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1");
// Optional, provide via alternative routes e.g. environment variables
sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");

ElementConverter<InputType, DynamoDbWriteRequest> elementConverter = new CustomElementConverter();

DynamoDbSink<InputType> dynamoDbSink =
        DynamoDbSink.<InputType>builder()
                .setDynamoDbProperties(sinkProperties)              // Required
                .setTableName("my-dynamodb-table")                  // Required
                .setElementConverter(elementConverter)              // Required
                .setOverwriteByPartitionKeys(singletonList("key"))  // Optional
                .setFailOnError(false)                              // Optional
                .setMaxBatchSize(25)                                // Optional
                .setMaxInFlightRequests(50)                         // Optional
                .setMaxBufferedRequests(10_000)                     // Optional
                .setMaxTimeInBufferMS(5000)                         // Optional
                .build();

flinkStream.sinkTo(dynamoDbSink);
The same example in Scala:
val sinkProperties = new Properties()
// Required
sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1")
// Optional, provide via alternative routes e.g. environment variables
sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id")
sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key")

val elementConverter = new CustomElementConverter()

val dynamoDbSink =
    DynamoDbSink.builder[InputType]()
        .setDynamoDbProperties(sinkProperties)              // Required
        .setTableName("my-dynamodb-table")                  // Required
        .setElementConverter(elementConverter)              // Required
        .setOverwriteByPartitionKeys(singletonList("key"))  // Optional
        .setFailOnError(false)                              // Optional
        .setMaxBatchSize(25)                                // Optional
        .setMaxInFlightRequests(50)                         // Optional
        .setMaxBufferedRequests(10_000)                     // Optional
        .setMaxTimeInBufferMS(5000)                         // Optional
        .build()

flinkStream.sinkTo(dynamoDbSink)
Configurations #
Flink’s DynamoDB sink is created by using the static builder DynamoDbSink.<InputType>builder().
- setDynamoDbProperties(Properties sinkProperties)
  - Required.
  - Supplies credentials, region and other parameters to the DynamoDB client.
- setTableName(String tableName)
  - Required.
  - Name of the table to sink to.
- setElementConverter(ElementConverter<InputType, DynamoDbWriteRequest> elementConverter)
  - Required.
  - Converts generic records of type InputType to DynamoDbWriteRequest.
- setOverwriteByPartitionKeys(List<String> partitionKeys)
  - Optional. Default: [].
  - Used to deduplicate write requests within each batch pushed to DynamoDB.
- setFailOnError(boolean failOnError)
  - Optional. Default: false.
  - Whether failed requests to write records are treated as fatal exceptions in the sink.
- setMaxBatchSize(int maxBatchSize)
  - Optional. Default: 25.
  - Maximum size of a batch to write.
- setMaxInFlightRequests(int maxInFlightRequests)
  - Optional. Default: 50.
  - The maximum number of in-flight requests allowed before the sink applies backpressure.
- setMaxBufferedRequests(int maxBufferedRequests)
  - Optional. Default: 10_000.
  - The maximum number of records that may be buffered in the sink before backpressure is applied.
- setMaxBatchSizeInBytes(int maxBatchSizeInBytes)
  - N/A.
  - This configuration is not supported, see FLINK-29854.
- setMaxTimeInBufferMS(int maxTimeInBufferMS)
  - Optional. Default: 5000.
  - The maximum time a record may stay in the sink before being flushed.
- setMaxRecordSizeInBytes(int maxRecordSizeInBytes)
  - N/A.
  - This configuration is not supported, see FLINK-29854.
- build()
  - Constructs and returns the DynamoDB sink.
Element Converter #
An element converter is used to convert from a record in the DataStream to a DynamoDbWriteRequest which the sink will write to the destination DynamoDB table. The DynamoDB sink allows the user to supply a custom element converter, or to use the provided DefaultDynamoDbElementConverter, which extracts the item schema from the element class; this requires the element class to be of a composite type (i.e. POJO, Tuple or Row). If TypeInformation of the elements is available, the schema is constructed eagerly by using DynamoDbTypeInformedElementConverter, as in new DynamoDbTypeInformedElementConverter(TypeInformation.of(MyPojo.class)).
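For reference, here is a minimal sketch of a custom element converter that writes a hypothetical Order POJO (with assumed getOrderId() and getTotal() accessors) to the table as a PUT request:
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.connector.dynamodb.sink.DynamoDbWriteRequest;
import org.apache.flink.connector.dynamodb.sink.DynamoDbWriteRequestType;

import software.amazon.awssdk.services.dynamodb.model.AttributeValue;

import java.util.HashMap;
import java.util.Map;

// Hypothetical converter: maps an Order POJO into a DynamoDB PUT request for the sink.
public class OrderElementConverter implements ElementConverter<Order, DynamoDbWriteRequest> {

    @Override
    public DynamoDbWriteRequest apply(Order order, SinkWriter.Context context) {
        Map<String, AttributeValue> item = new HashMap<>();
        item.put("orderId", AttributeValue.builder().s(order.getOrderId()).build());
        item.put("total", AttributeValue.builder().n(String.valueOf(order.getTotal())).build());

        return DynamoDbWriteRequest.builder()
                .setType(DynamoDbWriteRequestType.PUT)
                .setItem(item)
                .build();
    }
}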
Alternatively, when you are working with @DynamoDbBean objects you can use the DynamoDbBeanElementConverter. For more information on the supported annotations see here.
A sample application using a custom ElementConverter can be found here. A sample application using the DynamoDbBeanElementConverter can be found here.
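Below is a minimal sketch of the bean-based approach using a hypothetical Customer bean; the single-argument DynamoDbBeanElementConverter constructor used here is an assumption, so check the connector's Javadoc before relying on it.
import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbBean;
import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbPartitionKey;

// Hypothetical bean: the enhanced-client annotations require a no-arg constructor and getters/setters.
@DynamoDbBean
public class Customer {
    private String customerId;
    private String name;

    @DynamoDbPartitionKey
    public String getCustomerId() { return customerId; }
    public void setCustomerId(String customerId) { this.customerId = customerId; }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
}

// Wiring the bean into the sink (constructor form is an assumption):
DynamoDbSink<Customer> customerSink =
        DynamoDbSink.<Customer>builder()
                .setDynamoDbProperties(sinkProperties)
                .setTableName("customers")
                .setElementConverter(new DynamoDbBeanElementConverter<>(Customer.class))
                .build();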
Using Custom DynamoDB Endpoints #
It is sometimes desirable to have Flink operate as a consumer or producer against a DynamoDB VPC endpoint or a non-AWS DynamoDB endpoint such as Localstack; this is especially useful when performing functional testing of a Flink application. The AWS endpoint that would normally be inferred by the AWS region set in the Flink configuration must be overridden via a configuration property.
To override the AWS endpoint, set the AWSConfigConstants.AWS_ENDPOINT and AWSConfigConstants.AWS_REGION properties. The region will be used to sign the endpoint URL.
Properties producerConfig = new Properties();
producerConfig.put(AWSConfigConstants.AWS_REGION, "eu-west-1");
producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4566");
The same configuration in Scala:
val producerConfig = new Properties()
producerConfig.put(AWSConfigConstants.AWS_REGION, "eu-west-1")
producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id")
producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key")
producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4566")