# MaxCompute Connector
The MaxCompute connector can be used as the Data Sink of a pipeline to write data to MaxCompute. This document describes how to set up the MaxCompute connector.
## What can the connector do?

- Automatic table creation if tables do not exist
- Schema change synchronization
- Data synchronization
## Example

The pipeline for reading data from MySQL and writing to MaxCompute can be defined as follows:

```yaml
source:
  type: mysql
  name: MySQL Source
  hostname: 127.0.0.1
  port: 3306
  username: admin
  password: pass
  tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
  server-id: 5401-5404

sink:
  type: maxcompute
  name: MaxCompute Sink
  accessId: ak
  accessKey: sk
  endpoint: endpoint
  project: flink_cdc
  bucketsNum: 8

pipeline:
  name: MySQL to MaxCompute Pipeline
  parallelism: 2
```
## Connector Options
Option | Required | Default | Type | Description |
---|---|---|---|---|
type | required | (none) | String | Specify what connector to use, here should be 'maxcompute'. |
name | optional | (none) | String | The name of the sink. |
accessId | required | (none) | String | AccessKey ID of an Alibaba Cloud account or RAM user. You can obtain it on the AccessKey management page. |
accessKey | required | (none) | String | AccessKey Secret corresponding to the AccessKey ID. You can obtain it on the AccessKey management page. |
endpoint | required | (none) | String | The connection address for the MaxCompute service. You need to configure the Endpoint based on the region selected when creating the MaxCompute project and the network connection method. For values corresponding to each region and network, please refer to Endpoint. |
project | required | (none) | String | The name of the MaxCompute project. You can log in to the MaxCompute console and obtain the MaxCompute project name on the Workspace > Project Management page. |
tunnelEndpoint | optional | (none) | String | The connection address for the MaxCompute Tunnel service. Typically, this configuration can be auto-routed based on the region where the specified project is located. It is used only in special network environments such as when using a proxy. |
quotaName | optional | (none) | String | The name of the exclusive resource group for MaxCompute data transfer. If not specified, the shared resource group is used. For details, refer to Using exclusive resource groups for MaxCompute. |
stsToken | optional | (none) | String | When using a temporary access token (STS Token) issued by a RAM role for authentication, this parameter must be specified. |
bucketsNum | optional | 16 | Integer | The number of buckets used when auto-creating MaxCompute Delta tables. For usage, refer to Delta Table Overview. |
compressAlgorithm | optional | zlib | String | The data compression algorithm used when writing to MaxCompute. Currently supports raw (no compression), zlib, and snappy. |
totalBatchSize | optional | 64MB | String | The size of the in-memory data buffer, at the partition level (at the table level for non-partitioned tables). Buffers for different partitions (tables) are independent, and data is written to MaxCompute when the threshold is reached. |
bucketBatchSize | optional | 4MB | String | The size of the in-memory data buffer, at the bucket level. This is effective only when writing to Delta tables. Buffers for different data buckets are independent, and the bucket data is written to MaxCompute when the threshold is reached. |
numCommitThreads | optional | 16 | Integer | The number of partitions (tables) that can be processed simultaneously during the checkpoint stage. |
numFlushConcurrent | optional | 4 | Integer | The number of buckets that can be written to MaxCompute simultaneously. This is effective only when writing to Delta tables. |
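As an illustration, a sink block that tunes several of the optional parameters above might look like this (the values are arbitrary examples, not recommendations):

```yaml
sink:
  type: maxcompute
  name: MaxCompute Sink
  accessId: ak
  accessKey: sk
  endpoint: endpoint
  project: flink_cdc
  # Optional tuning, using the options documented above; values are examples only.
  bucketsNum: 32            # buckets for auto-created Delta tables (default 16)
  compressAlgorithm: snappy # raw, zlib (default), or snappy
  totalBatchSize: 128MB     # per-partition (per-table) buffer threshold
  bucketBatchSize: 8MB      # per-bucket buffer threshold, Delta tables only
  numCommitThreads: 16      # partitions (tables) committed in parallel at checkpoint
  numFlushConcurrent: 8     # buckets flushed in parallel, Delta tables only
```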
## Usage Instructions
- The connector supports automatic table creation, mapping the table location and data types between the source tables and MaxCompute tables (see the mapping tables below). When the source table has a primary key, a MaxCompute Delta table is created automatically; otherwise, a regular MaxCompute table (Append table) is created.
- When writing to a regular MaxCompute table (Append table), delete operations are ignored and update operations are treated as insert operations.
- Currently, only at-least-once semantics are supported. Delta tables can achieve idempotent writes thanks to their primary key characteristics.
- For synchronization of table structure changes (see the sketch after this list):
  - A new column can only be added as the last column.
  - A column type can only be changed to a compatible type. For compatible types, refer to ALTER TABLE.
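How schema changes reach the sink is governed at the pipeline level. A minimal sketch, assuming the standard Flink CDC `schema.change.behavior` pipeline option (shown here for illustration; see the Flink CDC schema evolution documentation for the supported values):

```yaml
# Minimal sketch: let compatible schema changes (e.g. a column appended as
# the last column) evolve the MaxCompute table automatically.
# `schema.change.behavior` is a pipeline-level Flink CDC option, used here
# as an illustrative assumption rather than a MaxCompute-specific setting.
pipeline:
  name: MySQL to MaxCompute Pipeline
  parallelism: 2
  schema.change.behavior: evolve
```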
## Table Location Mapping
When the connector creates tables automatically, it uses the following mapping to derive the location of the MaxCompute table from the location of the source table. Note that when the MaxCompute project does not support the Schema model, each synchronization job can only synchronize one MySQL database. (The same applies to other data sources: the connector ignores the TableId.namespace information.)
Abstract in Flink CDC | MaxCompute Location | MySQL Location |
---|---|---|
project in the configuration file | project | (none) |
TableId.namespace | schema (Only when the MaxCompute project supports the Schema model. If not supported, this configuration will be ignored) | database |
TableId.tableName | table | table |
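If the default mapping does not fit, source tables can be remapped explicitly with Flink CDC's standard `route` block. A minimal sketch (the table names `adb.orders` and `ods_orders` are hypothetical; the sink-table format follows the location mapping above):

```yaml
# Hedged sketch: rename a source table on its way into MaxCompute.
# `route` is a standard Flink CDC pipeline block; all names are hypothetical.
route:
  - source-table: adb.orders
    sink-table: adb.ods_orders  # the namespace part is ignored when the project has no Schema model
```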
## Data Type Mapping
Flink Type | MaxCompute Type |
---|---|
CHAR/VARCHAR | STRING |
BOOLEAN | BOOLEAN |
BINARY/VARBINARY | BINARY |
DECIMAL | DECIMAL |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INTEGER | INTEGER |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
TIME_WITHOUT_TIME_ZONE | STRING |
DATE | DATE |
TIMESTAMP_WITHOUT_TIME_ZONE | TIMESTAMP_NTZ |
TIMESTAMP_WITH_LOCAL_TIME_ZONE | TIMESTAMP |
TIMESTAMP_WITH_TIME_ZONE | TIMESTAMP |
ARRAY | ARRAY |
MAP | MAP |
ROW | STRUCT |