This documentation is for an unreleased version of Apache Flink CDC. We recommend you use the latest stable version.

StarRocks Connector

The StarRocks connector can be used as the data sink of a pipeline to write data to StarRocks. This document describes how to set up the connector.

What can the connector do?

  • Automatically create tables that do not yet exist
  • Synchronize schema changes
  • Synchronize data

Example

A pipeline that reads data from MySQL and writes it to StarRocks can be defined as follows:

source:
  type: mysql
  name: MySQL Source
  hostname: 127.0.0.1
  port: 3306
  username: admin
  password: pass
  tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
  server-id: 5401-5404

sink:
  type: starrocks
  name: StarRocks Sink
  # MySQL-protocol (query) address of the FE
  jdbc-url: jdbc:mysql://127.0.0.1:9030
  # HTTP address of the FE
  load-url: 127.0.0.1:8030
  username: root
  password: pass

pipeline:
  name: MySQL to StarRocks Pipeline
  parallelism: 2

Connector Options

| Option | Required | Default | Type | Description |
|---|---|---|---|---|
| type | required | (none) | String | The connector to use. Must be 'starrocks'. |
| name | optional | (none) | String | The name of the sink. |
| jdbc-url | required | (none) | String | The address used to connect to the MySQL server of the FE. Multiple addresses must be separated by a comma (,). Format: `jdbc:mysql://fe_host1:fe_query_port1,fe_host2:fe_query_port2,fe_host3:fe_query_port3`. |
| load-url | required | (none) | String | The address used to connect to the HTTP server of the FE. Multiple addresses must be separated by a semicolon (;). Format: `fe_host1:fe_http_port1;fe_host2:fe_http_port2`. |
| username | required | (none) | String | User name to use when connecting to the StarRocks database. |
| password | required | (none) | String | Password to use when connecting to the StarRocks database. |
| sink.label-prefix | optional | (none) | String | The label prefix used by Stream Load. |
| sink.connect.timeout-ms | optional | 30000 | String | Timeout in milliseconds for establishing the HTTP connection. Valid values: 100 to 60000. |
| sink.wait-for-continue.timeout-ms | optional | 30000 | String | Timeout in milliseconds to wait for the 100-continue response from the FE HTTP server. Valid values: 3000 to 600000. |
| sink.buffer-flush.max-bytes | optional | 157286400 | Long | The maximum size of data that can be accumulated in memory before being sent to StarRocks at a time. The default is 150 MB, and the value can range from 64 MB to 10 GB. This buffer is shared by all tables in the sink. If the buffer is full, the connector chooses one or more tables to flush. |
| sink.buffer-flush.interval-ms | optional | 300000 | Long | The interval, in milliseconds, at which data is flushed for each table. |
| sink.scan-frequency.ms | optional | 50 | Long | How often, in milliseconds, the connector checks whether the buffered data for a table should be flushed because the flush interval has been reached. |
| sink.io.thread-count | optional | 2 | Integer | Number of threads used for concurrent Stream Loads across different tables. |
| sink.at-least-once.use-transaction-stream-load | optional | true | Boolean | Whether to use transaction Stream Load for at-least-once when it's available. |
| sink.properties.* | optional | (none) | String | Parameters that control Stream Load behavior. For example, `sink.properties.timeout` specifies the timeout of Stream Load. For a list of supported parameters and their descriptions, see the StarRocks STREAM LOAD documentation. |
| table.create.num-buckets | optional | (none) | Integer | Number of buckets to use when a StarRocks table is created automatically. For StarRocks 2.5 or later, this option is not required because StarRocks can determine the number of buckets automatically. For StarRocks versions before 2.5, you must set this option. |
| table.create.properties.* | optional | (none) | String | Properties used when creating a StarRocks table. For example, 'table.create.properties.fast_schema_evolution' = 'true' enables fast schema evolution if you are using StarRocks 3.2 or later. For more information, see the StarRocks documentation on how to create a primary key table. |
| table.schema-change.timeout | optional | 30min | Duration | Timeout for a schema change on the StarRocks side. The value must be a whole number of seconds. StarRocks cancels the schema change after the timeout, which causes the sink to fail. |
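
To illustrate how several of these options combine, the sink block below lists more than one FE address and overrides a few flush and timeout settings. It is a sketch only: the host names `fe1`, `fe2`, and `fe3`, the label prefix, and every numeric value are assumptions chosen to fall inside the ranges documented above, not tuning recommendations.

```yaml
sink:
  type: starrocks
  name: StarRocks Sink
  # Multiple FE query addresses are separated by commas (hypothetical hosts).
  jdbc-url: jdbc:mysql://fe1:9030,fe2:9030,fe3:9030
  # Multiple FE HTTP addresses are separated by semicolons.
  load-url: fe1:8030;fe2:8030;fe3:8030
  username: root
  password: pass
  # Hypothetical prefix for Stream Load labels.
  sink.label-prefix: cdc_demo
  # 10 s, within the documented 100 to 60000 ms range.
  sink.connect.timeout-ms: 10000
  # 256 MB, within the documented 64 MB to 10 GB range.
  sink.buffer-flush.max-bytes: 268435456
  # Flush each table every 10 s instead of the default 300000 ms.
  sink.buffer-flush.interval-ms: 10000
  # Passed through to Stream Load as its timeout parameter.
  sink.properties.timeout: 600
```

A shorter flush interval reduces end-to-end latency at the cost of more, smaller Stream Load requests, so the right values depend on the workload.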

Usage Notes

  • Only StarRocks primary key tables are supported, so the source table must have a primary key.

  • Exactly-once is not supported. The connector combines at-least-once delivery with primary key tables to make writes idempotent.

  • For automatic table creation

    • the distribution keys are the same as the primary keys
    • there is no partition key
    • the number of buckets is controlled by table.create.num-buckets. If you are using StarRocks 2.5 or later, you do not need to set this option because StarRocks can determine the number of buckets automatically; otherwise you must set it.
  • For schema change synchronization

    • only adding and dropping columns is supported
    • a new column is always added at the last position
    • if your StarRocks version is 3.2 or later and the connector creates the table automatically, you can set table.create.properties.fast_schema_evolution to true to speed up schema changes.
  • For data synchronization, the pipeline connector uses the StarRocks Sink Connector to write data to StarRocks. See the StarRocks sink documentation for how it works.
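
The table-creation and schema-change notes above could translate into a sink block like the following. This is a minimal sketch under stated assumptions: the bucket count of 8 and the 10-minute timeout are arbitrary illustrative values, and the connection settings are copied from the example earlier on this page.

```yaml
sink:
  type: starrocks
  name: StarRocks Sink
  jdbc-url: jdbc:mysql://127.0.0.1:9030
  load-url: 127.0.0.1:8030
  username: root
  password: pass
  # Required for StarRocks versions before 2.5; can be omitted on 2.5 or later.
  # The value 8 is an arbitrary example.
  table.create.num-buckets: 8
  # Takes effect only on StarRocks 3.2 or later, for tables the connector creates.
  table.create.properties.fast_schema_evolution: "true"
  # Must be a whole number of seconds; the default is 30min.
  table.schema-change.timeout: 10min
```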

Data Type Mapping

| Flink CDC type | StarRocks type | Note |
|---|---|---|
| TINYINT | TINYINT | |
| SMALLINT | SMALLINT | |
| INT | INT | |
| BIGINT | BIGINT | |
| FLOAT | FLOAT | |
| DOUBLE | DOUBLE | |
| DECIMAL(p, s) | DECIMAL(p, s) | |
| BOOLEAN | BOOLEAN | |
| DATE | DATE | |
| TIMESTAMP | DATETIME | |
| TIMESTAMP_LTZ | DATETIME | |
| CHAR(n) where n <= 85 | CHAR(n * 3) | CDC defines the length in characters, while StarRocks defines it in bytes. In UTF-8, one Chinese character takes three bytes, so the length for StarRocks is n * 3. Because the maximum length of a StarRocks CHAR is 255, a CDC CHAR is mapped to a StarRocks CHAR only when the CDC length is no larger than 85. |
| CHAR(n) where n > 85 | VARCHAR(n * 3) | CDC defines the length in characters, while StarRocks defines it in bytes. In UTF-8, one Chinese character takes three bytes, so the length for StarRocks is n * 3. Because the maximum length of a StarRocks CHAR is 255, a CDC CHAR is mapped to a StarRocks VARCHAR when the CDC length is larger than 85. |
| VARCHAR(n) | VARCHAR(n * 3) | CDC defines the length in characters, while StarRocks defines it in bytes. In UTF-8, one Chinese character takes three bytes, so the length for StarRocks is n * 3. |
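
The threshold of 85 follows directly from the 255-byte CHAR limit and the factor of three. The block below restates the string-length rules from the table as a formula and applies them to a few sample lengths; the sample lengths are chosen for illustration only.

```latex
\[
3n \le 255 \iff n \le 85, \qquad
\mathrm{CHAR}(n) \mapsto
\begin{cases}
\mathrm{CHAR}(3n) & \text{if } n \le 85 \\
\mathrm{VARCHAR}(3n) & \text{if } n > 85
\end{cases}
\]
\[
\mathrm{CHAR}(85) \mapsto \mathrm{CHAR}(255), \quad
\mathrm{CHAR}(86) \mapsto \mathrm{VARCHAR}(258), \quad
\mathrm{VARCHAR}(100) \mapsto \mathrm{VARCHAR}(300)
\]
```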
