StarRocks Connector #
StarRocks Pipeline 连接器可以用作 Pipeline 的 Data Sink,将数据写入StarRocks。 本文档介绍如何设置 StarRocks Pipeline 连接器。
连接器的功能 #
- 自动建表
- 表结构变更同步
- 数据实时同步
示例 #
从 MySQL 读取数据同步到 StarRocks 的 Pipeline 可以定义如下:
source:
type: mysql
name: MySQL Source
hostname: 127.0.0.1
port: 3306
username: admin
password: pass
tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
server-id: 5401-5404
sink:
type: starrocks
name: StarRocks Sink
jdbc-url: jdbc:mysql://127.0.0.1:9030
load-url: 127.0.0.1:8030
username: root
password: pass
pipeline:
name: MySQL to StarRocks Pipeline
parallelism: 2
连接器配置项 #
Option | Required | Default | Type | Description |
---|---|---|---|---|
type | required | (none) | String | 指定要使用的连接器, 这里需要设置成 'starrocks' . |
name | optional | (none) | String | Sink 的名称. |
jdbc-url | required | (none) | String | 用于访问 FE 节点上的 MySQL 服务器。多个地址用英文逗号(,)分隔。格式:`jdbc:mysql://fe_host1:fe_query_port1,fe_host2:fe_query_port2`。 |
load-url | required | (none) | String | 用于访问 FE 节点上的 HTTP 服务器。多个地址用英文分号(;)分隔。格式:`fe_host1:fe_http_port1;fe_host2:fe_http_port2`。 |
username | required | (none) | String | StarRocks 集群的用户名。 |
password | required | (none) | String | StarRocks 集群的用户密码。 |
sink.label-prefix | optional | (none) | String | 指定 Stream Load 使用的 label 前缀。 |
sink.connect.timeout-ms | optional | 30000 | String | 与 FE 建立 HTTP 连接的超时时间。取值范围:[100, 60000]。 |
sink.wait-for-continue.timeout-ms | optional | 30000 | String | 等待 FE HTTP 100-continue 应答的超时时间。取值范围:[3000, 60000]。 |
sink.buffer-flush.max-bytes | optional | 157286400 | Long | 内存中缓冲的数据量大小,缓冲区由所有导入的表共享,达到阈值后将选择一个或多个表的数据写入到StarRocks。 达到阈值后取值范围:[64MB, 10GB]。 |
sink.buffer-flush.interval-ms | optional | 300000 | Long | 每个表缓冲数据发送的间隔,用于控制数据写入 StarRocks 的延迟。单位是毫秒,取值范围:[1000, 3600000]。 |
sink.scan-frequency.ms | optional | 50 | Long | 连接器会定期检查每个表是否到达发送间隔,该配置控制检查频率,单位为毫秒。 |
sink.io.thread-count | optional | 2 | Integer | 用来执行 Stream Load 的线程数,不同表之间的导入可以并发执行。 |
sink.at-least-once.use-transaction-stream-load | optional | true | Boolean | at-least-once 下是否使用 transaction stream load。 |
sink.properties.* | optional | (none) | String | Stream Load 的参数,控制 Stream Load 导入行为。例如 参数 `sink.properties.timeout` 用来控制导入的超时时间。 全部参数和解释请参考 STREAM LOAD。 |
table.create.num-buckets | optional | (none) | Integer | 自动创建 StarRocks 表时使用的桶数。对于 StarRocks 2.5 及之后的版本可以不设置,StarRocks 将会 自动设置分桶数量;对于 StarRocks 2.5 之前的版本必须设置。 |
table.create.properties.* | optional | (none) | String | 自动创建 StarRocks 表时使用的属性。比如: 如果使用 StarRocks 3.2 及之后的版本,'table.create.properties.fast_schema_evolution' = 'true'
将会打开 fast schema evolution 功能。 更多信息请参考
主键模型。 |
table.schema-change.timeout | optional | 30min | Duration | StarRocks 侧执行 schema change 的超时时间,必须是秒的整数倍。超时后 StarRocks 将会取消 schema change,从而导致作业失败。 |
使用说明 #
-
只支持主键表,因此源表必须有主键
-
暂不支持 exactly-once,连接器 通过 at-least-once 和主键表实现幂等写
-
对于自动建表
- 分桶键和主键相同
- 没有分区键
- 分桶数由
table.create.num-buckets
控制。如果使用的 StarRocks 2.5 及之后的版本可以不设置,StarRocks 能够 自动设置分桶数量。对于 StarRocks 2.5 之前的版本必须设置,否则无法自动创建表。
-
对于表结构变更同步
- 只支持增删列
- 新增列只能添加到最后一列
- 如果使用 StarRocks 3.2 及之后版本,并且通过连接器来自动建表, 可以通过配置
table.create.properties.fast_schema_evolution
为true
来加速 StarRocks 执行变更。
-
对于数据同步,pipeline 连接器使用 StarRocks Sink 连接器 将数据写入 StarRocks,具体可以参考 Sink 文档。
数据类型映射 #
CDC type | StarRocks type | NOTE |
---|---|---|
TINYINT | TINYINT | |
SMALLINT | SMALLINT | |
INT | INT | |
BIGINT | BIGINT | |
FLOAT | FLOAT | |
DOUBLE | DOUBLE | |
DECIMAL(p, s) | DECIMAL(p, s) | |
BOOLEAN | BOOLEAN | |
DATE | DATE | |
TIMESTAMP | DATETIME | |
TIMESTAMP_LTZ | DATETIME | |
CHAR(n) where n <= 85 | CHAR(n * 3) | CDC 中长度表示字符数,而 StarRocks 中长度表示字节数。根据 UTF-8 编码,一个中文字符占用三个字节,因此 CDC 中的长度对应到 StarRocks 中为 n * 3。由于 StarRocks CHAR 类型的最大长度为255,所以只有当 CDC 中长度不超过85时,才将 CDC CHAR 映射到 StarRocks CHAR。 |
CHAR(n) where n > 85 | VARCHAR(n * 3) | CDC 中长度表示字符数,而 StarRocks 中长度表示字节数。根据 UTF-8 编码,一个中文字符占用三个字节,因此 CDC 中的长度对应到 StarRocks 中为 n * 3。由于 StarRocks CHAR 类型的最大长度为255,所以当 CDC 中长度超过85时,才将 CDC CHAR 映射到 StarRocks VARCHAR。 |
VARCHAR(n) | VARCHAR(n * 3) | CDC 中长度表示字符数,而 StarRocks 中长度表示字节数。根据 UTF-8 编码,一个中文字符占用三个字节,因此 CDC 中的长度对应到 StarRocks 中为 n * 3。 |