This documentation is for an unreleased version of Apache Flink CDC. We recommend you use the latest stable version.
MaxCompute
MaxCompute Connector #
MaxCompute Pipeline 连接器可以用作 Pipeline 的 Data Sink,将数据写入MaxCompute。 本文档介绍如何设置 MaxCompute Pipeline 连接器。
连接器的功能 #
- 自动建表
- 表结构变更同步
- 数据实时同步
示例 #
从 MySQL 读取数据同步到 MaxCompute 的 Pipeline 可以定义如下:
source:
type: mysql
name: MySQL Source
hostname: 127.0.0.1
port: 3306
username: admin
password: pass
tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
server-id: 5401-5404
sink:
type: maxcompute
name: MaxCompute Sink
accessId: ak
accessKey: sk
endpoint: endpoint
project: flink_cdc
bucketSize: 8
pipeline:
name: MySQL to MaxCompute Pipeline
parallelism: 2
连接器配置项 #
Option | Required | Default | Type | Description |
---|---|---|---|---|
type | required | (none) | String | 指定要使用的连接器, 这里需要设置成 'maxcompute' . |
name | optional | (none) | String | Sink 的名称. |
accessId | required | (none) | String | 阿里云账号或RAM用户的AccessKey ID。您可以进入 AccessKey管理页面 获取AccessKey ID。 |
accessKey | required | (none) | String | AccessKey ID对应的AccessKey Secret。您可以进入 AccessKey管理页面 获取AccessKey Secret。 |
endpoint | required | (none) | String | MaxCompute服务的连接地址。您需要根据创建MaxCompute项目时选择的地域以及网络连接方式配置Endpoint。各地域及网络对应的Endpoint值,请参见 Endpoint。 |
project | required | (none) | String | MaxCompute项目名称。您可以登录 MaxCompute控制台,在 工作区 > 项目管理 页面获取MaxCompute项目名称。 |
tunnelEndpoint | optional | (none) | String | MaxCompute Tunnel服务的连接地址,通常这项配置可以根据指定的project所在的region进行自动路由。仅在使用代理等特殊网络环境下使用该配置。 |
quotaName | optional | (none) | String | MaxCompute 数据传输使用的独享资源组名称,如不指定该配置,则使用共享资源组。详情可以参考 使用 Maxcompute 独享资源组 |
stsToken | optional | (none) | String | 当使用RAM角色颁发的短时有效的访问令牌(STS Token)进行鉴权时,需要指定该参数。 |
bucketsNum | optional | 16 | Integer | 自动创建 MaxCompute Delta 表时使用的桶数。使用方式可以参考 Delta Table 概述 |
compressAlgorithm | optional | zlib | String | 写入MaxCompute时使用的数据压缩算法,当前支持raw (不进行压缩),zlib 和snappy 。 |
totalBatchSize | optional | 64MB | String | 内存中缓冲的数据量大小,单位为分区级(非分区表单位为表级),不同分区(表)的缓冲区相互独立,达到阈值后数据写入到MaxCompute。 |
bucketBatchSize | optional | 4MB | String | 内存中缓冲的数据量大小,单位为桶级,仅写入 Delta 表时生效。不同数据桶的缓冲区相互独立,达到阈值后将该桶数据写入到MaxCompute。 |
numCommitThreads | optional | 16 | Integer | checkpoint阶段,能够同时处理的分区(表)数量。 |
numFlushConcurrent | optional | 4 | Integer | 写入数据到MaxCompute时,能够同时写入的桶数量。仅写入 Delta 表时生效。 |
使用说明 #
- 链接器 支持自动建表,将MaxCompute表与源表的位置关系、数据类型进行自动映射(参见下文映射表),当源表有主键时,自动创建 MaxCompute Delta 表,否则创建普通 MaxCompute 表(Append表)
- 当写入普通 MaxCompute 表(Append表)时,会忽略
delete
操作,update
操作会被视为insert
操作 - 目前仅支持 at-least-once,Delta 表由于主键特性能够实现幂等写
- 对于表结构变更同步
- 新增列只能添加到最后一列
- 修改列类型,只能修改为兼容的类型。兼容表可以参考ALTER TABLE
表位置映射 #
链接器自动建表时,使用如下映射关系,将源表的位置信息映射到MaxCompute表的位置。注意,当MaxCompute项目不支持Schema模型时,每个同步任务仅能同步一个Mysql Database。(其他Datasource同理,链接器会忽略TableId.namespace信息)
Flink CDC 中抽象 | MaxCompute 位置 | Mysql 位置 |
---|---|---|
配置文件中project | project | (none) |
TableId.namespace | schema(仅当MaxCompute项目支持Schema模型时,如不支持,将忽略该配置) | database |
TableId.tableName | table | table |
数据类型映射 #
Flink Type | MaxCompute Type |
---|---|
CHAR/VARCHAR | STRING |
BOOLEAN | BOOLEAN |
BINARY/VARBINARY | BINARY |
DECIMAL | DECIMAL |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INTEGER | INTEGER |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
TIME_WITHOUT_TIME_ZONE | STRING |
DATE | DATE |
TIMESTAMP_WITHOUT_TIME_ZONE | TIMESTAMP_NTZ |
TIMESTAMP_WITH_LOCAL_TIME_ZONE | TIMESTAMP |
TIMESTAMP_WITH_TIME_ZONE | TIMESTAMP |
ARRAY | ARRAY |
MAP | MAP |
ROW | STRUCT |