This documentation is for an unreleased version of Apache Flink CDC. We recommend you use the latest stable version.

MaxCompute Connector #

The MaxCompute connector can be used as the data sink of a pipeline to write data to MaxCompute. This document describes how to set up the MaxCompute connector.

What can the connector do? #

  • Automatic table creation if the table does not exist
  • Schema change synchronization
  • Data synchronization

Example #

A pipeline that reads data from MySQL and writes it to MaxCompute can be defined as follows:

source:
  type: mysql
  name: MySQL Source
  hostname: 127.0.0.1
  port: 3306
  username: admin
  password: pass
  tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
  server-id: 5401-5404

sink:
  type: maxcompute
  name: MaxCompute Sink
  accessId: ak
  accessKey: sk
  endpoint: endpoint
  project: flink_cdc
  bucketsNum: 8

pipeline:
  name: MySQL to MaxCompute Pipeline
  parallelism: 2

Connector Options #

| Option | Required | Default | Type | Description |
|---|---|---|---|---|
| type | required | (none) | String | Specifies which connector to use; here it should be 'maxcompute'. |
| name | optional | (none) | String | The name of the sink. |
| accessId | required | (none) | String | AccessKey ID of the Alibaba Cloud account or RAM user. You can obtain the AccessKey ID from the AccessKey management page. |
| accessKey | required | (none) | String | AccessKey Secret corresponding to the AccessKey ID. You can obtain the AccessKey Secret from the AccessKey management page. |
| endpoint | required | (none) | String | The connection address for the MaxCompute service. Configure the endpoint based on the region selected when creating the MaxCompute project and on the network connection method. For the values corresponding to each region and network, refer to Endpoint. |
| project | required | (none) | String | The name of the MaxCompute project. You can log in to the MaxCompute console and obtain the project name on the Workspace > Project Management page. |
| tunnelEndpoint | optional | (none) | String | The connection address for the MaxCompute Tunnel service. Typically, this is auto-routed based on the region where the specified project is located. Set it only in special network environments, such as when using a proxy. |
| quotaName | optional | (none) | String | The name of the exclusive resource group for MaxCompute data transfer. If not specified, the shared resource group is used. For details, refer to Using exclusive resource groups for MaxCompute. |
| stsToken | optional | (none) | String | Must be specified when authenticating with a temporary access token (STS Token) issued by a RAM role. |
| bucketsNum | optional | 16 | Integer | The number of buckets used when auto-creating MaxCompute Delta tables. For usage, refer to Delta Table Overview. |
| compressAlgorithm | optional | zlib | String | The data compression algorithm used when writing to MaxCompute. Currently supports raw (no compression), zlib, and snappy. |
| totalBatchSize | optional | 64MB | String | The size of the in-memory data buffer at the partition level (at the table level for non-partitioned tables). Buffers for different partitions (tables) are independent, and data is written to MaxCompute when the threshold is reached. |
| bucketBatchSize | optional | 4MB | String | The size of the in-memory data buffer at the bucket level. This is effective only when writing to Delta tables. Buffers for different buckets are independent, and the bucket data is written to MaxCompute when the threshold is reached. |
| numCommitThreads | optional | 16 | Integer | The number of partitions (tables) that can be processed simultaneously during the checkpoint stage. |
| numFlushConcurrent | optional | 4 | Integer | The number of buckets that can be written to MaxCompute simultaneously. This is effective only when writing to Delta tables. |
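
As a rough illustration of how the optional settings fit together, the sink block below combines STS authentication with the buffering and concurrency options. Every value is either a placeholder or the documented default, and `my_quota` is a hypothetical resource group name; treat this as a sketch rather than a recommended configuration.

```yaml
sink:
  type: maxcompute
  name: MaxCompute Sink
  accessId: ak                 # placeholder AccessKey ID
  accessKey: sk                # placeholder AccessKey Secret
  stsToken: sts-token          # placeholder; needed only with a RAM-role STS token
  endpoint: endpoint           # placeholder; depends on region and network type
  project: flink_cdc
  quotaName: my_quota          # hypothetical exclusive resource group name
  bucketsNum: 16               # buckets for auto-created Delta tables (default)
  compressAlgorithm: snappy    # one of: raw, zlib (default), snappy
  totalBatchSize: 64MB         # per-partition (or per-table) buffer (default)
  bucketBatchSize: 4MB         # per-bucket buffer, Delta tables only (default)
  numCommitThreads: 16         # partitions/tables handled in parallel at checkpoint (default)
  numFlushConcurrent: 4        # buckets written in parallel, Delta tables only (default)
```

`tunnelEndpoint` is left out here because, per the table, it is normally auto-routed and only needed in special network environments.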

Usage Instructions #

  • The connector supports automatic table creation and automatically maps table locations and data types between source tables and MaxCompute tables (see the mapping tables below). When the source table has a primary key, a MaxCompute Delta table is created; otherwise, a regular MaxCompute table (Append table) is created.
  • When writing to a regular MaxCompute table (Append table), delete operations are ignored and update operations are treated as insert operations.
  • Currently, only at-least-once delivery is supported. Delta tables can still achieve idempotent writes because of their primary keys.
  • For synchronization of table structure changes:
    • A new column can only be added as the last column.
    • A column type can only be changed to a compatible type. For compatible types, refer to ALTER TABLE.

Table Location Mapping #

When the connector automatically creates tables, it uses the following mapping to translate the location of each source table into the location of the MaxCompute table. Note that when the MaxCompute project does not support the Schema model, each synchronization task can only synchronize one MySQL database. The same applies to other data sources: the connector ignores the TableId.namespace information.

| Abstract in Flink CDC | MaxCompute Location | MySQL Location |
|---|---|---|
| project in the configuration file | project | (none) |
| TableId.namespace | schema (only when the MaxCompute project supports the Schema model; if not supported, this configuration is ignored) | database |
| TableId.tableName | table | table |
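
To make the mapping concrete, the sketch below restricts the MySQL source to a single database, which is the case required when the project does not support the Schema model. The table name `orders` is hypothetical, and the locations shown in the comments are derived from the mapping table rather than from observed output.

```yaml
source:
  type: mysql
  hostname: 127.0.0.1
  port: 3306
  username: admin
  password: pass
  # Only tables from one database (adb) match this pattern.
  tables: adb.\.*

sink:
  type: maxcompute
  accessId: ak
  accessKey: sk
  endpoint: endpoint
  project: flink_cdc

# Placement of a hypothetical source table adb.orders:
#   Schema model supported:     flink_cdc.adb.orders  (project.schema.table)
#   Schema model not supported: flink_cdc.orders      (TableId.namespace ignored)
```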

Data Type Mapping #

| Flink Type | MaxCompute Type |
|---|---|
| CHAR/VARCHAR | STRING |
| BOOLEAN | BOOLEAN |
| BINARY/VARBINARY | BINARY |
| DECIMAL | DECIMAL |
| TINYINT | TINYINT |
| SMALLINT | SMALLINT |
| INTEGER | INTEGER |
| BIGINT | BIGINT |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| TIME_WITHOUT_TIME_ZONE | STRING |
| DATE | DATE |
| TIMESTAMP_WITHOUT_TIME_ZONE | TIMESTAMP_NTZ |
| TIMESTAMP_WITH_LOCAL_TIME_ZONE | TIMESTAMP |
| TIMESTAMP_WITH_TIME_ZONE | TIMESTAMP |
| ARRAY | ARRAY |
| MAP | MAP |
| ROW | STRUCT |
