StarRocks
This documentation is for an unreleased version of Apache Flink CDC. We recommend you use the latest stable version.

StarRocks Connector #

StarRocks Pipeline 连接器可以用作 Pipeline 的 Data Sink,将数据写入StarRocks。 本文档介绍如何设置 StarRocks Pipeline 连接器。

连接器的功能 #

  • 自动建表
  • 表结构变更同步
  • 数据实时同步

示例 #

从 MySQL 读取数据同步到 StarRocks 的 Pipeline 可以定义如下:

source:
   type: mysql
   name: MySQL Source
   hostname: 127.0.0.1
   port: 3306
   username: admin
   password: pass
   tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
   server-id: 5401-5404

sink:
  type: starrocks
  name: StarRocks Sink
  jdbc-url: jdbc:mysql://127.0.0.1:9030
  load-url: 127.0.0.1:8030
  username: root
  password: pass

pipeline:
   name: MySQL to StarRocks Pipeline
   parallelism: 2

连接器配置项 #

Option Required Default Type Description
type required (none) String 指定要使用的连接器, 这里需要设置成 'starrocks'.
name optional (none) String Sink 的名称.
jdbc-url required (none) String 用于访问 FE 节点上的 MySQL 服务器。多个地址用英文逗号(,)分隔。格式:`jdbc:mysql://fe_host1:fe_query_port1,fe_host2:fe_query_port2`。
load-url required (none) String 用于访问 FE 节点上的 HTTP 服务器。多个地址用英文分号(;)分隔。格式:`fe_host1:fe_http_port1;fe_host2:fe_http_port2`。
username required (none) String StarRocks 集群的用户名。
password required (none) String StarRocks 集群的用户密码。
sink.label-prefix optional (none) String 指定 Stream Load 使用的 label 前缀。
sink.connect.timeout-ms optional 30000 String 与 FE 建立 HTTP 连接的超时时间。取值范围:[100, 60000]。
sink.wait-for-continue.timeout-ms optional 30000 String 等待 FE HTTP 100-continue 应答的超时时间。取值范围:[3000, 60000]。
sink.buffer-flush.max-bytes optional 157286400 Long 内存中缓冲的数据量大小,缓冲区由所有导入的表共享,达到阈值后将选择一个或多个表的数据写入到StarRocks。 达到阈值后取值范围:[64MB, 10GB]。
sink.buffer-flush.interval-ms optional 300000 Long 每个表缓冲数据发送的间隔,用于控制数据写入 StarRocks 的延迟。单位是毫秒,取值范围:[1000, 3600000]。
sink.scan-frequency.ms optional 50 Long 连接器会定期检查每个表是否到达发送间隔,该配置控制检查频率,单位为毫秒。
sink.io.thread-count optional 2 Integer 用来执行 Stream Load 的线程数,不同表之间的导入可以并发执行。
sink.at-least-once.use-transaction-stream-load optional true Boolean at-least-once 下是否使用 transaction stream load。
sink.properties.* optional (none) String Stream Load 的参数,控制 Stream Load 导入行为。例如 参数 `sink.properties.timeout` 用来控制导入的超时时间。 全部参数和解释请参考 STREAM LOAD
table.create.num-buckets optional (none) Integer 自动创建 StarRocks 表时使用的桶数。对于 StarRocks 2.5 及之后的版本可以不设置,StarRocks 将会 自动设置分桶数量;对于 StarRocks 2.5 之前的版本必须设置。
table.create.properties.* optional (none) String 自动创建 StarRocks 表时使用的属性。比如: 如果使用 StarRocks 3.2 及之后的版本,'table.create.properties.fast_schema_evolution' = 'true' 将会打开 fast schema evolution 功能。 更多信息请参考 主键模型
table.schema-change.timeout optional 30min Duration StarRocks 侧执行 schema change 的超时时间,必须是秒的整数倍。超时后 StarRocks 将会取消 schema change,从而导致作业失败。

使用说明 #

  • 只支持主键表,因此源表必须有主键

  • 暂不支持 exactly-once,连接器 通过 at-least-once 和主键表实现幂等写

  • 对于自动建表

    • 分桶键和主键相同
    • 没有分区键
    • 分桶数由 table.create.num-buckets 控制。如果使用的 StarRocks 2.5 及之后的版本可以不设置,StarRocks 能够 自动设置分桶数量。对于 StarRocks 2.5 之前的版本必须设置,否则无法自动创建表。
  • 对于表结构变更同步

    • 只支持增删列
    • 新增列只能添加到最后一列
    • 如果使用 StarRocks 3.2 及之后版本,并且通过连接器来自动建表, 可以通过配置 table.create.properties.fast_schema_evolutiontrue 来加速 StarRocks 执行变更。
  • 对于数据同步,pipeline 连接器使用 StarRocks Sink 连接器 将数据写入 StarRocks,具体可以参考 Sink 文档

数据类型映射 #

CDC type StarRocks type NOTE
TINYINT TINYINT
SMALLINT SMALLINT
INT INT
BIGINT BIGINT
FLOAT FLOAT
DOUBLE DOUBLE
DECIMAL(p, s) DECIMAL(p, s)
BOOLEAN BOOLEAN
DATE DATE
TIMESTAMP DATETIME
TIMESTAMP_LTZ DATETIME
CHAR(n) where n <= 85 CHAR(n * 3) CDC 中长度表示字符数,而 StarRocks 中长度表示字节数。根据 UTF-8 编码,一个中文字符占用三个字节,因此 CDC 中的长度对应到 StarRocks 中为 n * 3。由于 StarRocks CHAR 类型的最大长度为255,所以只有当 CDC 中长度不超过85时,才将 CDC CHAR 映射到 StarRocks CHAR。
CHAR(n) where n > 85 VARCHAR(n * 3) CDC 中长度表示字符数,而 StarRocks 中长度表示字节数。根据 UTF-8 编码,一个中文字符占用三个字节,因此 CDC 中的长度对应到 StarRocks 中为 n * 3。由于 StarRocks CHAR 类型的最大长度为255,所以当 CDC 中长度超过85时,才将 CDC CHAR 映射到 StarRocks VARCHAR。
VARCHAR(n) VARCHAR(n * 3) CDC 中长度表示字符数,而 StarRocks 中长度表示字节数。根据 UTF-8 编码,一个中文字符占用三个字节,因此 CDC 中的长度对应到 StarRocks 中为 n * 3。

Back to top