This documentation is for an unreleased version of Apache Flink CDC. We recommend you use the latest stable version.
OceanBase
OceanBase Connector #
OceanBase connector can be used as the Data Sink of the pipeline, and write data to OceanBase. This document describes how to set up the OceanBase connector.
What can the connector do? #
- Create table automatically if not exist
- Schema change synchronization
- Data synchronization
Example #
The pipeline for reading data from MySQL and sink to OceanBase can be defined as follows:
source:
type: mysql
hostname: mysql
port: 3306
username: mysqluser
password: mysqlpw
tables: mysql_2_oceanbase_test_17l13vc.\.*
server-id: 5400-5404
server-time-zone: UTC
sink:
type: oceanbase
url: jdbc:mysql://oceanbase:2881/test
username: root@test
password:
pipeline:
name: MySQL to OceanBase Pipeline
parallelism: 1
Connector Options #
Option | Required by Table API | Default | Type | Description |
---|---|---|---|---|
url | Yes | String | JDBC url. | |
username | Yes | String | The username. | |
password | Yes | String | The password. | |
schema-name | Yes | String | The schema name or database name. | |
table-name | Yes | String | The table name. | |
driver-class-name | No | com.mysql.cj.jdbc.Driver | String | The driver class name, use 'com.mysql.cj.jdbc.Driver' by default. And the connector does not contain the corresponding driver and needs to be introduced manually. |
druid-properties | No | String | Druid connection pool properties, multiple values are separated by semicolons. | |
sync-write | No | false | Boolean | Whether to write data synchronously, will not use buffer if it's set to 'true'. |
buffer-flush.interval | No | 1s | Duration | Buffer flush interval. Set '0' to disable scheduled flushing. |
buffer-flush.buffer-size | No | 1000 | Integer | Buffer size. |
max-retries | No | 3 | Integer | Max retry times on failure. |
memstore-check.enabled | No | true | Boolean | Whether enable memstore check. |
memstore-check.threshold | No | 0.9 | Double | Memstore usage threshold ratio relative to the limit value. |
memstore-check.interval | No | 30s | Duration | Memstore check interval. |
partition.enabled | No | false | Boolean | Whether to enable partition calculation and flush records by partitions. Only works when 'sync-write' and 'direct-load.enabled' are 'false'. |
Usage Notes #
-
Currently only supports MySQL tenants of OceanBase
-
Provides at-least-once semantics, exactly-once is not supported yet
-
For creating table automatically
- there is no partition key
-
For schema change synchronization
- Currently, only adding new columns and renaming columns are supported
- the new column will always be added to the last position
-
For data synchronization, the pipeline connector uses OceanBase Sink Connector to write data to OceanBase. You can see sink documentation for how it works.
Data Type Mapping #
CDC type | OceanBase type under MySQL tenant | NOTE |
---|---|---|
TINYINT | TINYINT | |
SMALLINT | SMALLINT | |
INT | INT | |
BIGINT | BIGINT | |
BINARY | BINARY | |
VARBINARY(n) when n <= 1048576 | VARBINARY(n) | |
VARBINARY(n) when n > 1048576 | LONGBLOB | |
FLOAT | FLOAT | |
DOUBLE | DOUBLE | |
DECIMAL(p, s) | DECIMAL(p, s) | |
BOOLEAN | BOOLEAN | |
DATE | DATE | |
TIME | TIME | |
TIMESTAMP | DATETIME | |
TIMESTAMP_TZ | TIMESTAMP | |
TIMESTAMP_LTZ | TIMESTAMP | |
CHAR(n) where n <= 256 | CHAR(n) | |
CHAR(n) where n > 256 | VARCHAR(n) | |
VARCHAR(n) where n <= 262144 | VARCHAR(n) | |
VARCHAR(n) where n > 262144 | TEXT |