This documentation is for an unreleased version of Apache Flink CDC. We recommend you use the latest stable version.

OceanBase CDC Connector #

The OceanBase CDC connector allows for reading snapshot data and incremental data from OceanBase. This document describes how to set up the OceanBase CDC connector to run SQL queries against OceanBase.

OceanBase CDC Solutions #

Glossary:

  • OceanBase CE: OceanBase Community Edition. It’s compatible with MySQL and has been open sourced at https://github.com/oceanbase/oceanbase.
  • OceanBase EE: OceanBase Enterprise Edition. It supports two compatibility modes: MySQL and Oracle. See https://en.oceanbase.com.
  • OceanBase Cloud: OceanBase Enterprise Edition on Cloud. See https://en.oceanbase.com/product/cloud.
  • Log Proxy CE: OceanBase Log Proxy Community Edition (CDC mode). It’s a proxy service which can fetch the commit log data of OceanBase CE. It has been open sourced at https://github.com/oceanbase/oblogproxy.
  • Log Proxy EE: OceanBase Log Proxy Enterprise Edition (CDC mode). It’s a proxy service which can fetch the commit log data of OceanBase EE. Limited support is available on OceanBase Cloud only; contact the provider's support for more details.
  • Binlog Service CE: OceanBase Binlog Service Community Edition. It is a solution for OceanBase CE that is compatible with the MySQL replication protocol. See the docs of Log Proxy CE (Binlog mode) for details.
  • Binlog Service EE: OceanBase Binlog Service Enterprise Edition. It is a solution for OceanBase EE MySQL mode that is compatible with the MySQL replication protocol. It is available only to users of Alibaba Cloud; see the User Guide.
  • MySQL Driver: mysql-connector-java, which can be used with OceanBase CE and OceanBase EE MySQL mode.
  • OceanBase Driver: The JDBC driver for OceanBase, which supports both MySQL mode and Oracle mode of all OceanBase versions. It’s open sourced at https://github.com/oceanbase/obconnector-j.

CDC Source Solutions for OceanBase:

| Database | Supported Driver | CDC Source Connector | Other Required Components |
| --- | --- | --- | --- |
| OceanBase CE | MySQL Driver: 5.1.4x, 8.0.x; OceanBase Driver: 2.4.x | OceanBase CDC Connector | Log Proxy CE |
| OceanBase CE | MySQL Driver: 8.0.x | MySQL CDC Connector | Binlog Service CE |
| OceanBase EE (MySQL Mode) | MySQL Driver: 5.1.4x, 8.0.x; OceanBase Driver: 2.4.x | OceanBase CDC Connector | Log Proxy EE |
| OceanBase EE (MySQL Mode) | MySQL Driver: 8.0.x | MySQL CDC Connector | Binlog Service EE |
| OceanBase EE (Oracle Mode) | OceanBase Driver: 2.4.x | OceanBase CDC Connector | Log Proxy EE (CDC Mode) |

Note: For users of OceanBase CE or OceanBase EE MySQL Mode, we recommend that you follow the MySQL CDC documentation to use the MySQL CDC source connector with the Binlog service.

Dependencies #

To set up the OceanBase CDC connector, use the dependency information below, either for projects that use a build automation tool (such as Maven or SBT) or for the SQL Client with SQL JAR bundles.

Maven dependency #

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-oceanbase-cdc</artifactId>
    <version>3.2-SNAPSHOT</version>
</dependency>

SQL Client JAR #

Download link is available only for stable releases.

Download flink-sql-connector-oceanbase-cdc-3.0.1.jar and put it under <FLINK_HOME>/lib/.

Note: Refer to flink-sql-connector-oceanbase-cdc for other released versions, which will be available in the Maven Central repository.

Because the licenses of the MySQL Driver and the OceanBase Driver are incompatible with the Flink CDC project, we can’t include them in the prebuilt connector JAR packages. You may need to configure the following dependencies manually.

| Dependency Item | Description |
| --- | --- |
| mysql:mysql-connector-java:8.0.27 | Used for connecting to MySQL tenant of OceanBase. |
| com.oceanbase:oceanbase-client:2.4.9 | Used for connecting to MySQL or Oracle tenant of OceanBase. |

Setup OceanBase and LogProxy Server #

  1. Set up the OceanBase cluster following the doc.

  2. Create a user with a password in the sys tenant. OceanBase LogProxy uses this user.

    mysql -h${host} -P${port} -uroot
    
    mysql> SHOW TENANT;
    mysql> CREATE USER ${sys_username} IDENTIFIED BY '${sys_password}';
    mysql> GRANT ALL PRIVILEGES ON *.* TO ${sys_username} WITH GRANT OPTION;
    
  3. Create a user in the tenant you want to monitor. This user is used to read snapshot data and change events.

  4. For users of OceanBase Community Edition, you need to get the rootserver-list. You can use the following command to get the value:

    mysql> show parameters like 'rootservice_list';
    

    For users of OceanBase Enterprise Edition, you need to get the config-url. You can use the following command to get the value:

    mysql> show parameters like 'obconfig_url';
    
  5. Set up OceanBase LogProxy. For users of OceanBase Community Edition, you can follow the docs (Chinese).

How to create an OceanBase CDC table #

The OceanBase CDC table can be defined as follows:

-- checkpoint every 3000 milliseconds
Flink SQL> SET 'execution.checkpointing.interval' = '3s';

-- register an OceanBase table 'orders' in Flink SQL
Flink SQL> CREATE TABLE orders (
    order_id     INT,
    order_date   TIMESTAMP(0),
    customer_name STRING,
    price        DECIMAL(10, 5),
    product_id   INT,
    order_status BOOLEAN,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'oceanbase-cdc',
    'scan.startup.mode' = 'initial',
    'username' = 'user@test_tenant#cluster_name',
    'password' = 'pswd',
    'tenant-name' = 'test_tenant',
    'database-name' = '^test_db$',
    'table-name' = '^orders$',
    'hostname' = '127.0.0.1',
    'port' = '2881',
    'rootserver-list' = '127.0.0.1:2882:2881',
    'logproxy.host' = '127.0.0.1',
    'logproxy.port' = '2983',
    'working-mode' = 'memory'
);

-- read snapshot data and change events from the orders table
Flink SQL> SELECT * FROM orders;

To use OceanBase Oracle mode, add the OceanBase JDBC JAR file to Flink and set up the enterprise edition of oblogproxy. You can then create a table in Flink as follows:

Flink SQL> CREATE TABLE orders (
    order_id     INT,
    order_date   TIMESTAMP(0),
    customer_name STRING,
    price        DECIMAL(10, 5),
    product_id   INT,
    order_status BOOLEAN,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'oceanbase-cdc',
    'scan.startup.mode' = 'initial',
    'username' = 'user@test_tenant#cluster_name',
    'password' = 'pswd',
    'tenant-name' = 'test_tenant',
    'database-name' = '^test_db$',
    'table-name' = '^orders$',
    'hostname' = '127.0.0.1',
    'port' = '2881',
    'compatible-mode' = 'oracle',
    'jdbc.driver' = 'com.oceanbase.jdbc.Driver',
    'config-url' = 'http://127.0.0.1:8080/services?Action=ObRootServiceInfo&User_ID=xxx&UID=xxx&ObRegion=xxx',
    'logproxy.host' = '127.0.0.1',
    'logproxy.port' = '2983',
    'working-mode' = 'memory'
);

You can also try the quickstart tutorial that syncs data from OceanBase to Elasticsearch. Please refer to the Flink CDC Tutorial for more information.

Connector Options #

The following table lists the options that the OceanBase CDC connector provides for both the SQL and DataStream APIs.

Note: The connector supports two ways to specify the tables to listen to. When both are used at the same time, it takes the union of their results.

  1. Use database-name and table-name to match database and table names in regex.
  2. Use table-list to match the exact value of database and table names.
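
The union of the two selection styles can be pictured with a small standalone sketch. It is only an illustration, not the connector's implementation: the class `TableSelector` and its method `isSelected` are hypothetical, and the sketch assumes that each regex must match the whole name and that `table-list` entries are comma-separated `db.table` strings.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical helper, not part of the connector: shows the union of both selection styles.
final class TableSelector {
    private final Pattern databasePattern; // from 'database-name', may be null
    private final Pattern tablePattern;    // from 'table-name', may be null
    private final Set<String> exactNames;  // from 'table-list'

    TableSelector(String databaseRegex, String tableRegex, String tableList) {
        this.databasePattern = databaseRegex == null ? null : Pattern.compile(databaseRegex);
        this.tablePattern = tableRegex == null ? null : Pattern.compile(tableRegex);
        this.exactNames = tableList == null
                ? Set.of()
                : Arrays.stream(tableList.split(","))
                        .map(String::trim)
                        .filter(name -> !name.isEmpty())
                        .collect(Collectors.toSet());
    }

    // A table is selected if either the regex pair or the exact list matches it.
    boolean isSelected(String database, String table) {
        boolean byRegex = databasePattern != null && tablePattern != null
                && databasePattern.matcher(database).matches()
                && tablePattern.matcher(table).matches();
        boolean byList = exactNames.contains(database + "." + table);
        return byRegex || byList;
    }

    public static void main(String[] args) {
        TableSelector selector =
                new TableSelector("^test_db$", "^orders$", "db1.table1, db2.table2");
        System.out.println(selector.isSelected("test_db", "orders")); // matched by regex
        System.out.println(selector.isSelected("db2", "table2"));     // matched by table-list
        System.out.println(selector.isSelected("test_db", "table2")); // matched by neither
    }
}
```

In this sketch a table such as `test_db.table2` is skipped because it satisfies neither rule, even though its database name matches the regex.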

| Option | Required | Default | Type | Description |
| --- | --- | --- | --- | --- |
| connector | required | (none) | String | Specify what connector to use, here should be 'oceanbase-cdc'. |
| scan.startup.mode | optional | initial | String | Specify the startup mode for OceanBase CDC consumer, valid enumerations are 'initial', 'latest-offset', 'timestamp' or 'snapshot'. |
| scan.startup.timestamp | optional | (none) | Long | Timestamp in seconds of the start point, only used for 'timestamp' startup mode. |
| username | required | (none) | String | Username to be used when connecting to OceanBase. |
| password | required | (none) | String | Password to be used when connecting to OceanBase. |
| tenant-name | optional | (none) | String | Tenant name of OceanBase to monitor, should be exact value. Required when 'scan.startup.mode' is not 'snapshot'. |
| database-name | optional | (none) | String | Database name of OceanBase to monitor, should be regular expression. |
| table-name | optional | (none) | String | Table name of OceanBase to monitor, should be regular expression. |
| table-list | optional | (none) | String | List of full names of tables, separated by commas, e.g. "db1.table1, db2.table2". |
| hostname | required | (none) | String | IP address or hostname of the OceanBase database server or OceanBase Proxy server. |
| port | required | (none) | Integer | Integer port number to connect to OceanBase. It can be the SQL port of OceanBase server, which is 2881 by default, or the port of OceanBase proxy service, which is 2883 by default. |
| connect.timeout | optional | 30s | Duration | The maximum time that the connector should wait after trying to connect to the OceanBase database server before timing out. |
| server-time-zone | optional | +00:00 | String | The session timezone which controls how temporal types are converted to STRING in OceanBase. Can be UTC offset in format "±hh:mm", or named time zones if the time zone information tables in the mysql database have been created and populated. |
| logproxy.host | optional | (none) | String | Hostname or IP address of OceanBase log proxy service. Required when 'scan.startup.mode' is not 'snapshot'. |
| logproxy.port | optional | (none) | Integer | Port number of OceanBase log proxy service. Required when 'scan.startup.mode' is not 'snapshot'. |
| logproxy.client.id | optional | By rule. | String | Id of a log proxy client connection, will be in format {flink_ip}_{process_id}_{timestamp}_{thread_id}_{tenant} by default. |
| rootserver-list | optional | (none) | String | The semicolon-separated list of OceanBase root servers in format `ip:rpc_port:sql_port`, required for OceanBase CE. |
| config-url | optional | (none) | String | The url to get the server info from the config server, required for OceanBase EE. |
| working-mode | optional | storage | String | Working mode of `obcdc` in LogProxy, can be `storage` or `memory`. |
| compatible-mode | optional | mysql | String | Compatible mode of OceanBase, can be `mysql` or `oracle`. |
| jdbc.driver | optional | com.mysql.cj.jdbc.Driver | String | JDBC driver class for snapshot reading. |
| jdbc.properties.* | optional | (none) | String | Option to pass custom JDBC URL properties. User can pass custom properties like 'jdbc.properties.useSSL' = 'false'. |
| obcdc.properties.* | optional | (none) | String | Option to pass custom configurations to the libobcdc, e.g. 'obcdc.properties.sort_trans_participants' = '1'. Please refer to obcdc parameters for more details. |
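
Because `rootserver-list` packs several values into one string, it can help to check the value before submitting a job. The following is a minimal sketch of such a check, assuming IPv4 addresses or hostnames without colons; `RootServerList` and `Entry` are hypothetical names and are not part of the connector.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical validator for the 'rootserver-list' option value.
final class RootServerList {
    // One root server in the documented ip:rpc_port:sql_port format.
    static final class Entry {
        final String ip;
        final int rpcPort;
        final int sqlPort;

        Entry(String ip, int rpcPort, int sqlPort) {
            this.ip = ip;
            this.rpcPort = rpcPort;
            this.sqlPort = sqlPort;
        }
    }

    // Splits on ';' and then on ':'; rejects entries that do not have exactly three parts.
    static List<Entry> parse(String value) {
        List<Entry> entries = new ArrayList<>();
        for (String item : value.split(";")) {
            String trimmed = item.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate a trailing semicolon
            }
            String[] parts = trimmed.split(":");
            if (parts.length != 3) {
                throw new IllegalArgumentException(
                        "Expected ip:rpc_port:sql_port but got: " + trimmed);
            }
            entries.add(new Entry(
                    parts[0], Integer.parseInt(parts[1]), Integer.parseInt(parts[2])));
        }
        return entries;
    }

    public static void main(String[] args) {
        for (Entry e : parse("127.0.0.1:2882:2881;127.0.0.2:2882:2881")) {
            System.out.println(e.ip + " rpc=" + e.rpcPort + " sql=" + e.sqlPort);
        }
    }
}
```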

Available Metadata #

The following format metadata can be exposed as read-only (VIRTUAL) columns in a table definition.

| Key | DataType | Description |
| --- | --- | --- |
| tenant_name | STRING | Name of the tenant that contains the row. |
| database_name | STRING | Name of the database that contains the row. |
| schema_name | STRING | Name of the schema that contains the row. |
| table_name | STRING NOT NULL | Name of the table that contains the row. |
| op_ts | TIMESTAMP_LTZ(3) NOT NULL | It indicates the time that the change was made in the database. If the record is read from snapshot of the table instead of the change stream, the value is always 0. |

The extended CREATE TABLE example demonstrates the syntax for exposing these metadata fields:

CREATE TABLE products (
    tenant_name STRING METADATA FROM 'tenant_name' VIRTUAL,
    db_name STRING METADATA FROM 'database_name' VIRTUAL,
    table_name STRING METADATA FROM 'table_name' VIRTUAL,
    operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
    order_id INT,
    order_date TIMESTAMP(0),
    customer_name STRING,
    price DECIMAL(10, 5),
    product_id INT,
    order_status BOOLEAN,
    PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
   'connector' = 'oceanbase-cdc',
   'scan.startup.mode' = 'initial',
   'username' = 'user@test_tenant',
   'password' = 'pswd',
   'tenant-name' = 'test_tenant',
   'database-name' = '^test_db$',
   'table-name' = '^orders$',
   'hostname' = '127.0.0.1',
   'port' = '2881',
   'rootserver-list' = '127.0.0.1:2882:2881',
   'logproxy.host' = '127.0.0.1',
   'logproxy.port' = '2983',
   'working-mode' = 'memory'
);

Features #

At-Least-Once Processing #

The OceanBase CDC connector is a Flink Source connector that reads a database snapshot first and then continues to read change events with at-least-once processing.

OceanBase is a distributed database whose log files are spread across different servers. Because it has no position information comparable to the MySQL binlog offset, a timestamp is the only available position mark. To ensure that no data is missed, liboblog (a C++ library for reading OceanBase log records) might read some log data from before the given timestamp. As a result, the connector may read duplicate data whose timestamp is close to the start point, so only ‘at-least-once’ can be guaranteed.
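
One common way to tolerate such duplicates downstream is to apply changes as idempotent writes keyed by the primary key. The sketch below illustrates the idea only; it is not something the connector does for you, and `IdempotentApply`, `Change`, and the `"UPSERT"`/`"DELETE"` operation names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sink-side logic: replayed events leave the final state unchanged.
final class IdempotentApply {
    // Simplified change event; op is either "UPSERT" or "DELETE".
    static final class Change {
        final String op;
        final int orderId;
        final String status;

        Change(String op, int orderId, String status) {
            this.op = op;
            this.orderId = orderId;
            this.status = status;
        }
    }

    // Applies changes in order, keyed by primary key (order_id).
    static Map<Integer, String> apply(List<Change> changes) {
        Map<Integer, String> state = new LinkedHashMap<>();
        for (Change change : changes) {
            if (change.op.equals("DELETE")) {
                state.remove(change.orderId);
            } else {
                state.put(change.orderId, change.status);
            }
        }
        return state;
    }

    public static void main(String[] args) {
        Change a = new Change("UPSERT", 1, "NEW");
        Change b = new Change("UPSERT", 2, "NEW");
        Change c = new Change("UPSERT", 1, "PAID");
        Change d = new Change("DELETE", 2, null);

        // The second list re-reads b and c, as can happen around the start timestamp.
        Map<Integer, String> clean = apply(List.of(a, b, c, d));
        Map<Integer, String> replayed = apply(List.of(a, b, c, b, c, d));
        System.out.println(clean.equals(replayed));
    }
}
```

The design choice here is that every write fully determines the row for its key, so re-reading an already applied stretch of the log and continuing from there converges to the same result.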

Startup Reading Position #

The config option scan.startup.mode specifies the startup mode for OceanBase CDC consumer. The valid enumerations are:

  • initial: Performs an initial snapshot of the monitored table upon first startup, then continues to read the latest commit log.
  • latest-offset: Never performs a snapshot of the monitored table upon first startup; reads only the commit log written since the connector started.
  • timestamp: Never performs a snapshot of the monitored table upon first startup; reads the commit log starting from the given scan.startup.timestamp.
  • snapshot: Performs only a snapshot of the monitored table.
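
For the timestamp mode, the options table specifies scan.startup.timestamp in seconds. A minimal sketch of deriving that value from a wall-clock time with the standard `java.time` API follows; the class name `StartupTimestamp` and the chosen date are illustrative assumptions.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;

// Hypothetical helper that computes a value for 'scan.startup.timestamp'.
final class StartupTimestamp {
    // Converts a local date-time at the given UTC offset to epoch seconds.
    static long toEpochSeconds(LocalDateTime dateTime, ZoneOffset offset) {
        return dateTime.toEpochSecond(offset);
    }

    public static void main(String[] args) {
        LocalDateTime start = LocalDateTime.of(2024, 1, 1, 0, 0, 0);
        long utc = toEpochSeconds(start, ZoneOffset.UTC);
        long plus8 = toEpochSeconds(start, ZoneOffset.ofHours(8));
        System.out.println("'scan.startup.timestamp' = '" + utc + "'");   // 1704067200
        System.out.println("'scan.startup.timestamp' = '" + plus8 + "'"); // 1704038400
    }
}
```

Note that the value is in seconds, not milliseconds, so the result of `System.currentTimeMillis()` would need to be divided by 1000 before use.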

Consume Commit Log #

The OceanBase CDC connector uses oblogclient to consume the commit log from OceanBase LogProxy.

DataStream Source #

The OceanBase CDC connector can also be used as a DataStream source. You can create a SourceFunction as follows:

import org.apache.flink.cdc.connectors.base.options.StartupOptions;
import org.apache.flink.cdc.connectors.oceanbase.OceanBaseSource;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class OceanBaseSourceExample {
   public static void main(String[] args) throws Exception {
      SourceFunction<String> oceanBaseSource =
              OceanBaseSource.<String>builder()
                      .startupOptions(StartupOptions.initial())
                      .hostname("127.0.0.1")
                      .port(2881)
                      .username("user@test_tenant")
                      .password("pswd")
                      .compatibleMode("mysql")
                      .jdbcDriver("com.mysql.cj.jdbc.Driver")
                      .tenantName("test_tenant")
                      .databaseName("^test_db$")
                      .tableName("^test_table$")
                      .logProxyHost("127.0.0.1")
                      .logProxyPort(2983)
                      .rsList("127.0.0.1:2882:2881")
                      .serverTimeZone("+08:00")
                      .deserializer(new JsonDebeziumDeserializationSchema())
                      .build();

      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

      // enable checkpoint
      env.enableCheckpointing(3000);

      env.addSource(oceanBaseSource).print().setParallelism(1);
      env.execute("Print OceanBase Snapshot + Change Events");
   }
}

Data Type Mapping #

Mysql Mode #

| OceanBase type | Flink SQL type | NOTE |
| --- | --- | --- |
| BOOLEAN, TINYINT(1), BIT(1) | BOOLEAN | |
| TINYINT | TINYINT | |
| SMALLINT, TINYINT UNSIGNED | SMALLINT | |
| INT, MEDIUMINT, SMALLINT UNSIGNED | INT | |
| BIGINT, INT UNSIGNED | BIGINT | |
| BIGINT UNSIGNED | DECIMAL(20, 0) | |
| REAL, FLOAT | FLOAT | |
| DOUBLE | DOUBLE | |
| NUMERIC(p, s), DECIMAL(p, s) where p <= 38 | DECIMAL(p, s) | |
| NUMERIC(p, s), DECIMAL(p, s) where 38 < p <= 65 | STRING | DECIMAL is equivalent to NUMERIC. The precision for DECIMAL data type is up to 65 in OceanBase, but the precision for DECIMAL is limited to 38 in Flink. So if you define a decimal column whose precision is greater than 38, you should map it to STRING to avoid precision loss. |
| DATE | DATE | |
| TIME [(p)] | TIME [(p)] | |
| DATETIME [(p)] | TIMESTAMP [(p)] | |
| TIMESTAMP [(p)] | TIMESTAMP_LTZ [(p)] | |
| CHAR(n) | CHAR(n) | |
| VARCHAR(n) | VARCHAR(n) | |
| BIT(n) | BINARY(⌈(n + 7) / 8⌉) | |
| BINARY(n) | BINARY(n) | |
| VARBINARY(N) | VARBINARY(N) | |
| TINYTEXT, TEXT, MEDIUMTEXT, LONGTEXT | STRING | |
| TINYBLOB, BLOB, MEDIUMBLOB, LONGBLOB | BYTES | |
| YEAR | INT | |
| ENUM | STRING | |
| SET | ARRAY<STRING> | As the SET data type in OceanBase is a string object that can have zero or more values, it should always be mapped to an array of string. |
| JSON | STRING | The JSON data type will be converted into STRING with JSON format in Flink. |
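
The DECIMAL rule in the table above can be written as a small decision function. This is an illustrative sketch only; `MysqlDecimalMapping` and `flinkType` are hypothetical names, and the limits 38 and 65 are taken from the note in the table.

```java
import java.math.BigDecimal;

// Hypothetical helper that picks a Flink SQL type for an OceanBase DECIMAL(p, s) column.
final class MysqlDecimalMapping {
    static final int FLINK_MAX_PRECISION = 38;
    static final int OCEANBASE_MAX_PRECISION = 65;

    static String flinkType(int precision, int scale) {
        if (precision < 1 || precision > OCEANBASE_MAX_PRECISION) {
            throw new IllegalArgumentException("precision out of range: " + precision);
        }
        // Precisions beyond Flink's limit are declared as STRING to avoid precision loss.
        return precision <= FLINK_MAX_PRECISION
                ? "DECIMAL(" + precision + ", " + scale + ")"
                : "STRING";
    }

    public static void main(String[] args) {
        System.out.println(flinkType(10, 5)); // DECIMAL(10, 5)
        System.out.println(flinkType(40, 2)); // STRING

        // A value delivered as STRING can still be parsed without loss on the consumer side.
        BigDecimal wide = new BigDecimal("1234567890123456789012345678901234567890.12");
        System.out.println(wide.toPlainString());
    }
}
```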

Oracle Mode #

| OceanBase type | Flink SQL type | NOTE |
| --- | --- | --- |
| NUMBER(1) | BOOLEAN | |
| NUMBER(p, s <= 0), p - s < 3 | TINYINT | |
| NUMBER(p, s <= 0), p - s < 5 | SMALLINT | |
| NUMBER(p, s <= 0), p - s < 10 | INT | |
| NUMBER(p, s <= 0), p - s < 19 | BIGINT | |
| NUMBER(p, s <= 0), 19 <= p - s <= 38 | DECIMAL(p - s, 0) | |
| NUMBER(p, s > 0) | DECIMAL(p, s) | |
| NUMBER(p, s <= 0), p - s > 38 | STRING | |
| FLOAT, BINARY_FLOAT | FLOAT | |
| BINARY_DOUBLE | DOUBLE | |
| DATE, TIMESTAMP [(p)] | TIMESTAMP [(p)] | |
| CHAR(n), NCHAR(n), VARCHAR(n), VARCHAR2(n), NVARCHAR2(n), CLOB | STRING | |
| RAW, BLOB, ROWID | BYTES | |
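
The NUMBER rows above depend on both precision and scale, so a worked sketch may make them easier to follow. It assumes that NUMBER(1) means p = 1 with s = 0 and that this row takes precedence over the TINYINT row; `OracleNumberMapping` and `flinkType` are hypothetical names, not connector API.

```java
// Hypothetical helper that applies the NUMBER(p, s) rows of the Oracle mode table.
final class OracleNumberMapping {
    static String flinkType(int p, int s) {
        if (p == 1 && s == 0) {
            return "BOOLEAN"; // assumption: NUMBER(1) is checked before the integer rows
        }
        if (s > 0) {
            return "DECIMAL(" + p + ", " + s + ")";
        }
        // With s <= 0 the value is an integer with up to p - s digits.
        int digits = p - s;
        if (digits < 3) {
            return "TINYINT";
        }
        if (digits < 5) {
            return "SMALLINT";
        }
        if (digits < 10) {
            return "INT";
        }
        if (digits < 19) {
            return "BIGINT";
        }
        if (digits <= 38) {
            return "DECIMAL(" + digits + ", 0)";
        }
        return "STRING";
    }

    public static void main(String[] args) {
        System.out.println(flinkType(1, 0));   // BOOLEAN
        System.out.println(flinkType(9, 0));   // INT
        System.out.println(flinkType(20, 0));  // DECIMAL(20, 0)
        System.out.println(flinkType(10, 2));  // DECIMAL(10, 2)
        System.out.println(flinkType(38, -2)); // STRING
    }
}
```

A negative scale widens the integer range, which is why NUMBER(38, -2) falls into the STRING row: it can hold up to 40 digits.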
