OceanBase
This documentation is for an unreleased version of Apache Flink CDC. We recommend you use the latest stable version.

OceanBase CDC 连接器 #

OceanBase CDC 连接器允许从 OceanBase 读取快照数据和增量数据。本文介绍了如何设置 OceanBase CDC 连接器以对 OceanBase 进行 SQL 查询。

OceanBase CDC 方案 #

名词解释:

  • OceanBase CE: OceanBase 社区版。OceanBase 的开源版本,兼容 MySQL https://github.com/oceanbase/oceanbase
  • OceanBase EE: OceanBase 企业版。OceanBase 的商业版本,支持 MySQL 和 Oracle 两种兼容模式 https://www.oceanbase.com
  • OceanBase Cloud: OceanBase 云数据库 https://www.oceanbase.com/product/cloud
  • Log Proxy CE: OceanBase 日志代理服务社区版。单独使用时支持 CDC 模式,是一个获取 OceanBase 社区版事务日志(commit log)的代理服务 https://github.com/oceanbase/oblogproxy
  • Log Proxy EE: OceanBase 日志代理服务企业版。单独使用时支持 CDC 模式,是一个获取 OceanBase 企业版事务日志(commit log)的代理服务,目前仅在 OceanBase Cloud 上提供有限的支持, 详情请咨询相关技术支持。
  • Binlog Service CE: OceanBase Binlog 服务社区版。OceanBase 社区版的一个兼容 MySQL 复制协议的解决方案,详情参考 Log Proxy CE Binlog 模式的文档。
  • Binlog Service EE: OceanBase Binlog 服务企业版。OceanBase 企业版 MySQL 模式的一个兼容 MySQL 复制协议的解决方案,仅可在阿里云使用,详情见操作指南
  • MySQL Driver: mysql-connector-java,可用于 OceanBase 社区版和 OceanBase 企业版 MySQL 模式。
  • OceanBase Driver: OceanBase JDBC 驱动,支持所有版本的 MySQL 和 Oracle 兼容模式 https://github.com/oceanbase/obconnector-j

OceanBase CDC 源端读取方案:

数据库类型 支持的驱动 CDC 连接器 其他用到的组件
OceanBase CE MySQL Driver: 5.1.4x, 8.0.x
OceanBase Driver: 2.4.x
OceanBase CDC Connector Log Proxy CE
MySQL Driver: 8.0.x MySQL CDC Connector Binlog Service CE
OceanBase EE (MySQL 模式) MySQL Driver: 5.1.4x, 8.0.x
OceanBase Driver: 2.4.x
OceanBase CDC Connector Log Proxy EE
MySQL Driver: 8.0.x MySQL CDC Connector Binlog Service EE
OceanBase EE (Oracle 模式) OceanBase Driver: 2.4.x OceanBase CDC Connector Log Proxy EE (CDC 模式)

注意: 对于使用 OceanBase 社区版或 OceanBase 企业版 MySQL 模式的用户,我们推荐参考 MySQL CDC 的文档,使用 MySQL CDC 连接器搭配 Binlog 服务。

依赖 #

为了使用 OceanBase CDC 连接器,您必须提供相关的依赖信息。以下依赖信息适用于使用自动构建工具(如 Maven 或 SBT)构建的项目和带有 SQL JAR 包的 SQL 客户端。

Maven dependency #

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-oceanbase-cdc</artifactId>
    <version>3.3-SNAPSHOT</version>
</dependency>

SQL Client JAR #

下载链接仅在已发布版本可用,请在文档网站左下角选择浏览已发布的版本。

下载flink-sql-connector-oceanbase-cdc-3.3-SNAPSHOT.jar<FLINK_HOME>/lib/ 目录下。

注意: 参考 flink-sql-connector-oceanbase-cdc 当前已发布的所有版本都可以在 Maven 中央仓库获取。

由于 MySQL Driver 和 OceanBase Driver 使用的开源协议都与 Flink CDC 项目不兼容,我们无法在 jar 包中提供驱动。 您可能需要手动配置以下依赖:

依赖名称 说明
mysql:mysql-connector-java:8.0.27 用于连接到 OceanBase 数据库的 MySQL 租户。
com.oceanbase:oceanbase-client:2.4.9 用于连接到 OceanBase 数据库的 MySQL 或 Oracle 租户。

配置 OceanBase 数据库和 Log Proxy 服务 #

  1. 按照 文档 配置 OceanBase 集群。

  2. 在 sys 租户中,为 oblogproxy 创建一个带密码的用户。

    mysql -h${host} -P${port} -uroot
    
    mysql> SHOW TENANT;
    mysql> CREATE USER ${sys_username} IDENTIFIED BY '${sys_password}';
    mysql> GRANT ALL PRIVILEGES ON *.* TO ${sys_username} WITH GRANT OPTION;
    
  3. 为你想要监控的租户创建一个用户,这个用户用来读取快照数据和变化事件数据。

  4. OceanBase 社区版用户需要获取rootserver-list,可以使用以下命令获取:

    mysql> SHOW PARAMETERS LIKE 'rootservice_list';
    

    OceanBase 企业版用户需要获取 config-url,可以使用以下命令获取:

    mysql> show parameters like 'obconfig_url';
    
  5. 设置 OceanBase LogProxy。 对于OceanBase社区版的用户,您可以按照此文档进行操作。

创建 OceanBase CDC 表 #

使用以下命令,创建 OceanBase CDC 表:

-- 每 3 秒做一次 checkpoint,用于测试,生产配置建议 5 到 10 分钟                  
Flink SQL> SET 'execution.checkpointing.interval' = '3s';

-- 在 Flink SQL 中创建 OceanBase 表 `orders`
Flink SQL> CREATE TABLE orders (
    order_id     INT,
    order_date   TIMESTAMP(0),
    customer_name STRING,
    price        DECIMAL(10, 5),
    product_id   INT,
    order_status BOOLEAN,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'oceanbase-cdc',
    'scan.startup.mode' = 'initial',
    'username' = 'user@test_tenant#cluster_name',
    'password' = 'pswd',
    'tenant-name' = 'test_tenant',
    'database-name' = '^test_db$',
    'table-name' = '^orders$',
    'hostname' = '127.0.0.1',
    'port' = '2881',
    'rootserver-list' = '127.0.0.1:2882:2881',
    'logproxy.host' = '127.0.0.1',
    'logproxy.port' = '2983',
    'working-mode' = 'memory'
);

-- 从表 orders 中读取快照数据和 binlog 数据
Flink SQL> SELECT * FROM orders;

如果您使用的是企业版的 OceanBase Oracle 模式,您需要先添加 OceanBase 的官方 JDBC 驱动 jar 包到 Flink 环境,并且部署企业版的 oblogproxy 服务,然后通过以下命令创建 OceanBase CDC 表:

Flink SQL> CREATE TABLE orders (
    order_id     INT,
    order_date   TIMESTAMP(0),
    customer_name STRING,
    price        DECIMAL(10, 5),
    product_id   INT,
    order_status BOOLEAN,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'oceanbase-cdc',
    'scan.startup.mode' = 'initial',
    'username' = 'user@test_tenant#cluster_name',
    'password' = 'pswd',
    'tenant-name' = 'test_tenant',
    'database-name' = '^test_db$',
    'table-name' = '^orders$',
    'hostname' = '127.0.0.1',
    'port' = '2881',
    'compatible-mode' = 'oracle',
    'jdbc.driver' = 'com.oceanbase.jdbc.Driver',
    'config-url' = 'http://127.0.0.1:8080/services?Action=ObRootServiceInfo&User_ID=xxx&UID=xxx&ObRegion=xxx',
    'logproxy.host' = '127.0.0.1',
    'logproxy.port' = '2983',
    'working-mode' = 'memory'
);

您也可以访问 Flink CDC 官网文档,快速体验将数据从 OceanBase 导入到 Elasticsearch。更多信息,参考 Flink CDC 官网文档

OceanBase CDC 连接器选项 #

OceanBase CDC 连接器包括用于 SQL 和 DataStream API 的选项,如下表所示。

注意:连接器支持两种方式来指定需要监听的表,两种方式同时使用时会监听两种方式匹配的所有表。

  1. 使用 database-nametable-name 匹配正则表达式中的数据库和表名。
  2. 使用 table-list 去匹配数据库名和表名的准确列表。
配置项 是否必选 默认值 类型 描述
connector String 指定要使用的连接器,此处为 'oceanbase-cdc'
scan.startup.mode initial String 指定 OceanBase CDC 消费者的启动模式。可取值为 'initial''latest-offset''timestamp''snapshot'
scan.startup.timestamp Long 起始点的时间戳,单位为秒。仅在启动模式为 'timestamp' 时可用。
username String 连接 OceanBase 数据库的用户的名称。
password String 连接 OceanBase 数据库时使用的密码。
tenant-name String 待监控 OceanBase 数据库的租户名,应该填入精确值。
database-name String 待监控 OceanBase 数据库的数据库名,应该是正则表达式。
table-name String 待监控 OceanBase 数据库的表名,应该是正则表达式。
table-list String 待监控 OceanBase 数据库的全路径的表名列表,逗号分隔,如:"db1.table1, db2.table2"。
hostname String OceanBase 数据库或 OceanBbase 代理 ODP 的 IP 地址或主机名。
port Integer OceanBase 数据库服务器的整数端口号。可以是 OceanBase 服务器的 SQL 端口号(默认值为 2881)
或 OceanBase代理服务的端口号(默认值为 2883)
connect.timeout 30s Duration 连接器在尝试连接到 OceanBase 数据库服务器超时前的最长时间。
server-time-zone +00:00 String 数据库服务器中的会话时区,用户控制 OceanBase 的时间类型如何转换为 STRING。
合法的值可以是格式为"±hh:mm"的 UTC 时区偏移量,
如果 mysql 数据库中的时区信息表已创建,合法的值则可以是创建的时区。
logproxy.host String OceanBase 日志代理服务 的 IP 地址或主机名。
logproxy.port Integer OceanBase 日志代理服务 的端口号。
logproxy.client.id 规则生成 String OceanBase日志代理服务的客户端连接 ID,默认值的生成规则是 {flink_ip}_{process_id}_{timestamp}_{thread_id}_{tenant}。
rootserver-list String OceanBase root 服务器列表,服务器格式为 `ip:rpc_port:sql_port`,
多个服务器地址使用英文分号 `;` 隔开,OceanBase 社区版本必填。
config-url String 从配置服务器获取服务器信息的 url, OceanBase 企业版本必填。
working-mode storage String 日志代理中 `libobcdc` 的工作模式 , 可以是 `storage` 或 `memory`。
compatible-mode mysql String OceanBase 的兼容模式,可以是 `mysql` 或 `oracle`。
jdbc.driver com.mysql.cj.jdbc.Driver String 全量读取时使用的 jdbc 驱动类名。
jdbc.properties.* String 传递自定义 JDBC URL 属性的选项。用户可以传递自定义属性,如 'jdbc.properties.useSSL' = 'false'。
obcdc.properties.* String 传递自定义 libobcdc 属性的选项,如 'obcdc.properties.sort_trans_participants' = '1'。详情参见 obcdc 配置项说明

支持的元数据 #

在创建表时,您可以使用以下格式的元数据作为只读列(VIRTUAL)。

列名 数据类型 描述
tenant_name STRING 当前记录所属的租户名称。
database_name STRING 当前记录所属的 db 名。
schema_name STRING 当前记录所属的 schema 名。
table_name STRING NOT NULL 当前记录所属的表名称。
op_ts TIMESTAMP_LTZ(3) NOT NULL 该值表示此修改在数据库中发生的时间。如果这条记录是该表在快照阶段读取的记录,则该值返回 0。

如下 SQL 展示了如何在表中使用这些元数据列:

CREATE TABLE products (
    tenant_name STRING METADATA FROM 'tenant_name' VIRTUAL,
    db_name STRING METADATA FROM 'database_name' VIRTUAL,
    table_name STRING METADATA  FROM 'table_name' VIRTUAL,
    operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
    order_id INT,
    order_date TIMESTAMP(0),
    customer_name STRING,
    price DECIMAL(10, 5),
    product_id INT,
    order_status BOOLEAN,
    PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
   'connector' = 'oceanbase-cdc',
   'scan.startup.mode' = 'initial',
   'username' = 'user@test_tenant',
   'password' = 'pswd',
   'tenant-name' = 'test_tenant',
   'database-name' = '^test_db$',
   'table-name' = '^orders$',
   'hostname' = '127.0.0.1',
   'port' = '2881',
   'rootserver-list' = '127.0.0.1:2882:2881',
   'logproxy.host' = '127.0.0.1',
   'logproxy.port' = '2983',
   'working-mode' = 'memory'
);

特性 #

At-Least-Once 处理 #

OceanBase CDC 连接器是一个 Flink Source 连接器。它将首先读取数据库快照,然后再读取变化事件,并进行 At-Least-Once 处理

OceanBase 数据库是一个分布式数据库,它的日志也分散在不同的服务器上。由于没有类似 MySQL binlog 偏移量的位置信息,OceanBase 数据库用时间戳作为位置标记。为确保读取完整的数据,liboblog(读取 OceanBase 日志记录的 C++ 库)可能会在给定的时间戳之前读取一些日志数据。因此,OceanBase 数据库可能会读到起始点附近时间戳的重复数据,可保证 At-Least-Once 处理

启动模式 #

配置选项 scan.startup.mode 指定 OceanBase CDC 连接器的启动模式。可用取值包括:

  • initial(默认):在首次启动时对受监视的数据库表执行初始快照,并继续读取最新的提交日志。
  • latest-offset:首次启动时,不对受监视的数据库表执行快照,仅从连接器启动时读取提交日志。
  • timestamp:在首次启动时不对受监视的数据库表执行初始快照,仅从指定的 scan.startup.timestamp 读取最新的提交日志。
  • snapshot: 仅对受监视的数据库表执行初始快照。

消费提交日志 #

OceanBase CDC 连接器使用 oblogclient 消费 OceanBase日志代理服务 中的事务日志。

DataStream Source #

OceanBase CDC 连接器也可以作为 DataStream Source 使用。您可以按照如下创建一个 SourceFunction:

import org.apache.flink.cdc.connectors.base.options.StartupOptions;
import org.apache.flink.cdc.connectors.oceanbase.OceanBaseSource;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class OceanBaseSourceExample {
   public static void main(String[] args) throws Exception {
      SourceFunction<String> oceanBaseSource =
              OceanBaseSource.<String>builder()
                      .startupOptions(StartupOptions.initial())
                      .hostname("127.0.0.1")
                      .port(2881)
                      .username("user@test_tenant")
                      .password("pswd")
                      .compatibleMode("mysql")
                      .jdbcDriver("com.mysql.cj.jdbc.Driver")
                      .tenantName("test_tenant")
                      .databaseName("^test_db$")
                      .tableName("^test_table$")
                      .logProxyHost("127.0.0.1")
                      .logProxyPort(2983)
                      .rsList("127.0.0.1:2882:2881")
                      .serverTimeZone("+08:00")
                      .deserializer(new JsonDebeziumDeserializationSchema())
                      .build();

      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

      // enable checkpoint
      env.enableCheckpointing(3000);

      env.addSource(oceanBaseSource).print().setParallelism(1);
      env.execute("Print OceanBase Snapshot + Change Events");
   }
}

数据类型映射 #

Mysql 模式 #

OceanBase 数据类型 Flink SQL 类型 描述
BOOLEAN
TINYINT(1)
BIT(1)
BOOLEAN
TINYINT TINYINT
SMALLINT
TINYINT UNSIGNED
SMALLINT
INT
MEDIUMINT
SMALLINT UNSIGNED
INT
BIGINT
INT UNSIGNED
BIGINT
BIGINT UNSIGNED DECIMAL(20, 0)
REAL
FLOAT
FLOAT
DOUBLE DOUBLE
NUMERIC(p, s)
DECIMAL(p, s)
where p <= 38
DECIMAL(p, s)
NUMERIC(p, s)
DECIMAL(p, s)
where 38 < p <=65
STRING DECIMAL 等同于 NUMERIC。在 OceanBase 数据库中,DECIMAL 数据类型的精度最高为 65。
但在 Flink 中,DECIMAL 的最高精度为 38。因此,
如果你定义了一个精度大于 38 的 DECIMAL 列,你应当将其映射为 STRING,以避免精度损失。
DATE DATE
TIME [(p)] TIME [(p)]
DATETIME [(p)] TIMESTAMP [(p)]
TIMESTAMP [(p)] TIMESTAMP_LTZ [(p)]
CHAR(n) CHAR(n)
VARCHAR(n) VARCHAR(n)
BIT(n) BINARY(⌈(n + 7) / 8⌉)
BINARY(n) BINARY(n)
VARBINARY(N) VARBINARY(N)
TINYTEXT
TEXT
MEDIUMTEXT
LONGTEXT
STRING
TINYBLOB
BLOB
MEDIUMBLOB
LONGBLOB
BYTES
YEAR INT
ENUM STRING
SET ARRAY<STRING> 因为 OceanBase 的 SET 类型是用包含一个或多个值的字符串对象表示,
所以映射到 Flink 时是一个字符串数组
JSON STRING JSON 类型的数据在 Flink 中会转化为 JSON 格式的字符串

Oracle 模式 #

OceanBase type Flink SQL type NOTE
NUMBER(1) BOOLEAN
NUMBER(p, s <= 0), p - s < 3 TINYINT
NUMBER(p, s <= 0), p - s < 5 SMALLINT
NUMBER(p, s <= 0), p - s < 10 INT
NUMBER(p, s <= 0), p - s < 19 BIGINT
NUMBER(p, s <= 0), 19 <=p - s <=38 DECIMAL(p - s, 0)
NUMBER(p, s > 0) DECIMAL(p, s)
NUMBER(p, s <= 0), p - s> 38 STRING
FLOAT
BINARY_FLOAT
FLOAT
BINARY_DOUBLE DOUBLE
DATE
TIMESTAMP [(p)]
TIMESTAMP [(p)]
CHAR(n)
NCHAR(n)
VARCHAR(n)
VARCHAR2(n)
NVARCHAR2(n)
CLOB
STRING
RAW
BLOB
ROWID
BYTES

Back to top