This documentation is for an unreleased version of Apache Flink CDC. We recommend you use the latest stable version.
StarRocks Connector
The StarRocks connector can be used as the data sink of a pipeline to write data to StarRocks. This document describes how to set up the StarRocks connector.
What can the connector do?
- Automatic table creation if the table does not exist
- Schema change synchronization
- Data synchronization
Example
A pipeline that reads data from MySQL and writes it to StarRocks can be defined as follows:
source:
  type: mysql
  name: MySQL Source
  hostname: 127.0.0.1
  port: 3306
  username: admin
  password: pass
  tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
  server-id: 5401-5404

sink:
  type: starrocks
  name: StarRocks Sink
  jdbc-url: jdbc:mysql://127.0.0.1:9030
  load-url: 127.0.0.1:8030
  username: root
  password: pass

pipeline:
  name: MySQL to StarRocks Pipeline
  parallelism: 2
Connector Options
Option | Required | Default | Type | Description |
---|---|---|---|---|
type | required | (none) | String | Specifies which connector to use. Must be `starrocks` here. |
name | optional | (none) | String | The name of the sink. |
jdbc-url | required | (none) | String | The address that is used to connect to the MySQL server of the FE. You can specify multiple addresses, which must be separated by a comma (,). Format: `jdbc:mysql://fe_host1:fe_query_port1,fe_host2:fe_query_port2,fe_host3:fe_query_port3`. |
load-url | required | (none) | String | The address that is used to connect to the HTTP server of the FE. You can specify multiple addresses, which must be separated by a semicolon (;). Format: `fe_host1:fe_http_port1;fe_host2:fe_http_port2`. |
username | required | (none) | String | User name to use when connecting to the StarRocks database. |
password | required | (none) | String | Password to use when connecting to the StarRocks database. |
sink.label-prefix | optional | (none) | String | The label prefix used by Stream Load. |
sink.connect.timeout-ms | optional | 30000 | String | The timeout for establishing HTTP connection. Valid values: 100 to 60000. |
sink.wait-for-continue.timeout-ms | optional | 30000 | String | Timeout in milliseconds to wait for the 100-continue response from the FE HTTP server. Valid values: 3000 to 600000. |
sink.buffer-flush.max-bytes | optional | 157286400 | Long | The maximum size of data that can be accumulated in memory before being sent to StarRocks at a time. The value ranges from 64 MB to 10 GB. This buffer is shared by all tables in the sink. If the buffer is full, the connector will choose one or more tables to flush. |
sink.buffer-flush.interval-ms | optional | 300000 | Long | The interval, in milliseconds, at which data is flushed for each table. |
sink.scan-frequency.ms | optional | 50 | Long | Scan frequency in milliseconds to check whether the buffered data for a table should be flushed because of reaching the flush interval. |
sink.io.thread-count | optional | 2 | Integer | Number of threads used for concurrent stream loads among different tables. |
sink.at-least-once.use-transaction-stream-load | optional | true | Boolean | Whether to use transaction stream load for at-least-once when it's available. |
sink.properties.* | optional | (none) | String | The parameters that control Stream Load behavior. For example, the parameter `sink.properties.timeout` specifies the timeout of Stream Load. For a list of supported parameters and their descriptions, see STREAM LOAD. |
table.create.num-buckets | optional | (none) | Integer | Number of buckets when creating a StarRocks table automatically. For StarRocks 2.5 or later, it's not required to set the option because StarRocks can determine the number of buckets automatically. For StarRocks prior to 2.5, you must set this option. |
table.create.properties.* | optional | (none) | String | Properties used for creating a StarRocks table. For example, `'table.create.properties.fast_schema_evolution' = 'true'` enables fast schema evolution if you are using StarRocks 3.2 or later. For more information, see how to create a primary key table. |
table.schema-change.timeout | optional | 30min | Duration | Timeout for a schema change on the StarRocks side. The value must be a whole number of seconds. StarRocks cancels the schema change when the timeout expires, which causes the sink to fail. |
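As an illustration of how several of these options combine, the following is a sketch of a sink block for a cluster with more than one FE. The host names, the label prefix, and all tuning values are hypothetical placeholders chosen for this example, not recommendations. Only the option names, the ports, and the separator rules (comma for `jdbc-url`, semicolon for `load-url`) are taken from this page.

```yaml
sink:
  type: starrocks
  name: StarRocks Sink
  # Hypothetical FE hosts. Query addresses are separated by commas.
  jdbc-url: "jdbc:mysql://fe1.example.com:9030,fe2.example.com:9030"
  # HTTP addresses are separated by semicolons.
  load-url: "fe1.example.com:8030;fe2.example.com:8030"
  username: root
  password: pass
  # Hypothetical prefix for Stream Load labels.
  sink.label-prefix: cdc_demo
  # 256 MB shared buffer (allowed range: 64 MB to 10 GB).
  sink.buffer-flush.max-bytes: 268435456
  # Flush each table at least once per minute.
  sink.buffer-flush.interval-ms: 60000
  # Allow up to four concurrent stream loads across tables.
  sink.io.thread-count: 4
  # Passed through to Stream Load; the value is illustrative only.
  sink.properties.timeout: "600"
```

Options that are left out keep the defaults listed in the table.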
Usage Notes
- Only StarRocks primary key tables are supported, so the source table must have primary keys.
- Exactly-once is not supported. The connector combines at-least-once delivery with primary key tables to make writes idempotent.
- When tables are created automatically:
  - the distribution keys are the same as the primary keys
  - there is no partition key
  - the number of buckets is controlled by `table.create.num-buckets`. With StarRocks 2.5 or later you do not need to set this option, because StarRocks can determine the number of buckets automatically. With earlier versions you must set it.
- For schema change synchronization:
  - only adding and dropping columns is supported
  - a new column is always added at the last position
  - if your StarRocks version is 3.2 or later and the connector creates the table automatically, you can set `table.create.properties.fast_schema_evolution` to `true` to speed up schema changes.
- For data synchronization, the pipeline connector uses the StarRocks Sink Connector to write data to StarRocks. See the sink documentation for how it works.
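The table-creation and schema-change notes above can be expressed as sink options. The following is a minimal sketch, assuming the connection settings from the example earlier on this page. The bucket count and the timeout are illustrative values, not recommendations.

```yaml
sink:
  type: starrocks
  name: StarRocks Sink
  jdbc-url: jdbc:mysql://127.0.0.1:9030
  load-url: 127.0.0.1:8030
  username: root
  password: pass
  # Needed only for StarRocks versions before 2.5; later versions can
  # determine the number of buckets automatically. 8 is illustrative.
  table.create.num-buckets: 8
  # Applies only to StarRocks 3.2 or later, and only to tables that the
  # connector creates automatically.
  table.create.properties.fast_schema_evolution: "true"
  # Must be a whole number of seconds. The sink fails if StarRocks
  # cancels a schema change after this timeout.
  table.schema-change.timeout: 60min
```

These options only affect tables that the connector creates. The source table must still have primary keys.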
Data Type Mapping
Flink CDC type | StarRocks type | Note |
---|---|---|
TINYINT | TINYINT | |
SMALLINT | SMALLINT | |
INT | INT | |
BIGINT | BIGINT | |
FLOAT | FLOAT | |
DOUBLE | DOUBLE | |
DECIMAL(p, s) | DECIMAL(p, s) | |
BOOLEAN | BOOLEAN | |
DATE | DATE | |
TIMESTAMP | DATETIME | |
TIMESTAMP_LTZ | DATETIME | |
CHAR(n) where n <= 85 | CHAR(n * 3) | CDC defines the length in characters, while StarRocks defines it in bytes. A Chinese character takes three bytes in UTF-8, so the StarRocks length is n * 3. Because the maximum length of a StarRocks CHAR is 255 bytes (85 * 3 = 255), a CDC CHAR is mapped to a StarRocks CHAR only when its length is at most 85. |
CHAR(n) where n > 85 | VARCHAR(n * 3) | Same length conversion as above (n characters become n * 3 bytes). Because n * 3 exceeds the StarRocks CHAR maximum of 255 when n is larger than 85, a CDC CHAR is mapped to a StarRocks VARCHAR in that case. |
VARCHAR(n) | VARCHAR(n * 3) | CDC defines the length in characters, while StarRocks defines it in bytes. A Chinese character takes three bytes in UTF-8, so the StarRocks length is n * 3. |