DynamoDB

Amazon DynamoDB SQL Connector #

Sink: Batch Sink: Streaming Append & Upsert Mode

The DynamoDB connector allows for writing data into Amazon DynamoDB.

Dependencies #

Maven dependency SQL Client
<dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-dynamodb</artifactId>
      <version>5.0.0-1.19</version>
    </dependency>
Download

How to create a DynamoDB table #

Follow the instructions from the Amazon DynamoDB Developer Guide to set up a DynamoDB table. The following example shows how to create a table backed by a DynamoDB table with minimum required options:

CREATE TABLE DynamoDbTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `category_id` BIGINT,
  `behavior` STRING
)
WITH (
  'connector' = 'dynamodb',
  'table-name' = 'user_behavior',
  'aws.region' = 'us-east-2'
);

Connector Options #

Option Required Default Type Description
Common Options
connector
required (none) String Specify what connector to use. For DynamoDB use 'dynamodb'.
table-name
required (none) String Name of the DynamoDB table to use.
aws.region
required (none) String The AWS region where the DynamoDB table is defined.
aws.endpoint
optional (none) String The AWS endpoint for DynamoDB.
aws.trust.all.certificates
optional false Boolean If true accepts all SSL certificates.
Authentication Options
aws.credentials.provider
optional AUTO String A credentials provider to use when authenticating against the Kinesis endpoint. See Authentication for details.
aws.credentials.basic.accesskeyid
optional (none) String The AWS access key ID to use when setting credentials provider type to BASIC.
aws.credentials.basic.secretkey
optional (none) String The AWS secret key to use when setting credentials provider type to BASIC.
aws.credentials.profile.path
optional (none) String Optional configuration for profile path if credential provider type is set to be PROFILE.
aws.credentials.profile.name
optional (none) String Optional configuration for profile name if credential provider type is set to be PROFILE.
aws.credentials.role.arn
optional (none) String The role ARN to use when credential provider type is set to ASSUME_ROLE or WEB_IDENTITY_TOKEN.
aws.credentials.role.sessionName
optional (none) String The role session name to use when credential provider type is set to ASSUME_ROLE or WEB_IDENTITY_TOKEN.
aws.credentials.role.externalId
optional (none) String The external ID to use when credential provider type is set to ASSUME_ROLE.
aws.credentials.role.provider
optional (none) String The credentials provider that provides credentials for assuming the role when credential provider type is set to ASSUME_ROLE. Roles can be nested, so this value can again be set to ASSUME_ROLE
aws.credentials.webIdentityToken.file
optional (none) String The absolute path to the web identity token file that should be used if provider type is set to WEB_IDENTITY_TOKEN.
aws.credentials.custom.class
required only if credential provider is set to CUSTOM (none) String The full path (in Java package notation) to the user provided class to use if credential provider type is set to be CUSTOM e.g. org.user_company.auth.CustomAwsCredentialsProvider.
Sink Options
sink.batch.max-size
optional 25 Integer Maximum batch size of elements to be written to DynamoDB.
sink.requests.max-inflight
optional 50 Integer Maximum number of parallel batch requests to DynamoDB.
sink.requests.max-buffered
optional 10000 String Size of input buffer before applying backpressure to upstream job graph
sink.flush-buffer.timeout
optional 5000 Long Threshold time in ms for an element to be in a buffer before flushing.
sink.fail-on-error
optional false Boolean Flag used for retrying failed requests. If set any request failure will not be retried and will fail the job.
sink.ignore-nulls
optional false Boolean Determines whether null values should be ignored in the sink. If set to true, null values are excluded from processing.
HTTP Client Options
sink.http-client.max-concurrency
optional 10000 Integer Maximum number of allowed concurrent requests by the HTTP client.
sink.http-client.read-timeout
optional 360000 Integer Timeout for each read to the underlying socket.

Authorization #

Make sure to create an appropriate IAM policy to allow writing to the DynamoDB table.

Authentication #

Depending on your deployment you would choose an appropriate Credentials Provider to allow access to DynamoDB. By default, the AUTO Credentials Provider is used. If the access key ID and secret key are set in the deployment configuration, this results in using the BASIC provider.

A specific AWSCredentialsProvider can be optionally set using the aws.credentials.provider setting. Supported values are:

  • AUTO - Use the default AWS Credentials Provider chain that searches for credentials in the following order: ENV_VARS, SYS_PROPS, WEB_IDENTITY_TOKEN, PROFILE, and EC2/ECS credentials provider.
  • BASIC - Use access key ID and secret key supplied as configuration.
  • ENV_VAR - Use AWS_ACCESS_KEY_ID & AWS_SECRET_ACCESS_KEY environment variables.
  • SYS_PROP - Use Java system properties aws.accessKeyId and aws.secretKey.
  • PROFILE - Use an AWS credentials profile to create the AWS credentials.
  • ASSUME_ROLE - Create AWS credentials by assuming a role. The credentials for assuming the role must be supplied.
  • WEB_IDENTITY_TOKEN - Create AWS credentials by assuming a role using Web Identity Token.
  • CUSTOM - Provide a custom class that implements the interface AWSCredentialsProvider and has a constructor MyCustomClass(java.util.Properties config). All connector properties will be passed down to this custom credential provider class via the constructor.

Sink Partitioning #

The DynamoDB sink supports client side deduplication of data via the PARTITIONED BY clause. You can specify a list of partition keys, the sink will only send the latest record for each composite key within a batch. For example:

CREATE TABLE DynamoDbTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `category_id` BIGINT,
  `behavior` STRING
) PARTITIONED BY ( user_id )
WITH (
  'connector' = 'dynamodb',
  'table-name' = 'user_behavior',
  'aws.region' = 'us-east-2'
);

Notice #

The current implementation of the DynamoDB SQL connector is write-only and doesn’t provide an implementation for source queries. Queries similar to:

SELECT * FROM DynamoDbTable;

should result in an error similar to

Connector dynamodb can only be used as a sink. It cannot be used as a source.

Back to top