MaxCompute
This documentation is for an unreleased version of Apache Flink CDC. We recommend you use the latest stable version.

MaxCompute Connector #

MaxCompute Pipeline 连接器可以用作 Pipeline 的 Data Sink,将数据写入MaxCompute。 本文档介绍如何设置 MaxCompute Pipeline 连接器。

连接器的功能 #

  • 自动建表
  • 表结构变更同步
  • 数据实时同步

示例 #

从 MySQL 读取数据同步到 MaxCompute 的 Pipeline 可以定义如下:

source:
  type: mysql
  name: MySQL Source
  hostname: 127.0.0.1
  port: 3306
  username: admin
  password: pass
  tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
  server-id: 5401-5404

sink:
  type: maxcompute
  name: MaxCompute Sink
  accessId: ak
  accessKey: sk
  endpoint: endpoint
  project: flink_cdc
  bucketSize: 8

pipeline:
  name: MySQL to MaxCompute Pipeline
  parallelism: 2

连接器配置项 #

Option Required Default Type Description
type required (none) String 指定要使用的连接器, 这里需要设置成 'maxcompute'.
name optional (none) String Sink 的名称.
accessId required (none) String 阿里云账号或RAM用户的AccessKey ID。您可以进入 AccessKey管理页面 获取AccessKey ID。
accessKey required (none) String AccessKey ID对应的AccessKey Secret。您可以进入 AccessKey管理页面 获取AccessKey Secret。
endpoint required (none) String MaxCompute服务的连接地址。您需要根据创建MaxCompute项目时选择的地域以及网络连接方式配置Endpoint。各地域及网络对应的Endpoint值,请参见 Endpoint
project required (none) String MaxCompute项目名称。您可以登录 MaxCompute控制台,在 工作区 > 项目管理 页面获取MaxCompute项目名称。
tunnelEndpoint optional (none) String MaxCompute Tunnel服务的连接地址,通常这项配置可以根据指定的project所在的region进行自动路由。仅在使用代理等特殊网络环境下使用该配置。
quotaName optional (none) String MaxCompute 数据传输使用的独享资源组名称,如不指定该配置,则使用共享资源组。详情可以参考 使用 Maxcompute 独享资源组
stsToken optional (none) String 当使用RAM角色颁发的短时有效的访问令牌(STS Token)进行鉴权时,需要指定该参数。
bucketsNum optional 16 Integer 自动创建 MaxCompute Delta 表时使用的桶数。使用方式可以参考 Delta Table 概述
compressAlgorithm optional zlib String 写入MaxCompute时使用的数据压缩算法,当前支持raw(不进行压缩),zlibsnappy
totalBatchSize optional 64MB String 内存中缓冲的数据量大小,单位为分区级(非分区表单位为表级),不同分区(表)的缓冲区相互独立,达到阈值后数据写入到MaxCompute。
bucketBatchSize optional 4MB String 内存中缓冲的数据量大小,单位为桶级,仅写入 Delta 表时生效。不同数据桶的缓冲区相互独立,达到阈值后将该桶数据写入到MaxCompute。
numCommitThreads optional 16 Integer checkpoint阶段,能够同时处理的分区(表)数量。
numFlushConcurrent optional 4 Integer 写入数据到MaxCompute时,能够同时写入的桶数量。仅写入 Delta 表时生效。

使用说明 #

  • 链接器 支持自动建表,将MaxCompute表与源表的位置关系、数据类型进行自动映射(参见下文映射表),当源表有主键时,自动创建 MaxCompute Delta 表,否则创建普通 MaxCompute 表(Append表)
  • 当写入普通 MaxCompute 表(Append表)时,会忽略delete操作,update操作会被视为insert操作
  • 目前仅支持 at-least-once,Delta 表由于主键特性能够实现幂等写
  • 对于表结构变更同步
    • 新增列只能添加到最后一列
    • 修改列类型,只能修改为兼容的类型。兼容表可以参考ALTER TABLE

表位置映射 #

链接器自动建表时,使用如下映射关系,将源表的位置信息映射到MaxCompute表的位置。注意,当MaxCompute项目不支持Schema模型时,每个同步任务仅能同步一个Mysql Database。(其他Datasource同理,链接器会忽略TableId.namespace信息)

Flink CDC 中抽象 MaxCompute 位置 Mysql 位置
配置文件中project project (none)
TableId.namespace schema(仅当MaxCompute项目支持Schema模型时,如不支持,将忽略该配置) database
TableId.tableName table table

数据类型映射 #

Flink Type MaxCompute Type
CHAR/VARCHAR STRING
BOOLEAN BOOLEAN
BINARY/VARBINARY BINARY
DECIMAL DECIMAL
TINYINT TINYINT
SMALLINT SMALLINT
INTEGER INTEGER
BIGINT BIGINT
FLOAT FLOAT
DOUBLE DOUBLE
TIME_WITHOUT_TIME_ZONE STRING
DATE DATE
TIMESTAMP_WITHOUT_TIME_ZONE TIMESTAMP_NTZ
TIMESTAMP_WITH_LOCAL_TIME_ZONE TIMESTAMP
TIMESTAMP_WITH_TIME_ZONE TIMESTAMP
ARRAY ARRAY
MAP MAP
ROW STRUCT

Back to top