Db2 CDC Connector
This documentation is for an unreleased version of Apache Flink CDC. We recommend you use the latest stable version.

The Db2 CDC connector allows for reading snapshot data and incremental data from a Db2 database. This document describes how to set up the Db2 CDC connector to run SQL queries against Db2 databases.

Supported Databases #

Connector | Database | Driver
Db2-cdc | Db2: 11.5 | Db2 Driver: 11.5.0.0

    Dependencies #

    To set up the Db2 CDC connector, use the dependency information below. It covers both projects that use a build automation tool (such as Maven or SBT) and the SQL Client with SQL JAR bundles.

    Maven dependency #

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-db2-cdc</artifactId>
        <version>3.0</version>
    </dependency>

    SQL Client JAR #

    Download link is available only for stable releases.

    Download flink-sql-connector-db2-cdc-3.0-SNAPSHOT.jar and put it under <FLINK_HOME>/lib/.

    Note: flink-sql-connector-db2-cdc-XXX-SNAPSHOT versions correspond to the development branch. To use one, you need to download the source code and compile the JAR yourself. Prefer a released version, such as flink-sql-connector-db2-cdc-2.3.0.jar; released versions are available in the Maven Central repository.

    Setup Db2 server #

    Follow the setup steps in the Debezium Db2 Connector documentation.

    Notes #

    BOOLEAN type is not supported in SQL Replication on Db2 #

    Only snapshots can be taken from tables with BOOLEAN type columns. SQL Replication on Db2 currently does not support BOOLEAN, so Debezium cannot perform CDC on those tables. Consider replacing BOOLEAN with another type.

    How to create a Db2 CDC table #

    The Db2 CDC table can be defined as follows:

    -- checkpoint every 3 seconds                     
    Flink SQL> SET 'execution.checkpointing.interval' = '3s';   
    
    -- register a Db2 table 'products' in Flink SQL
    Flink SQL> CREATE TABLE products (
         ID INT NOT NULL,
         NAME STRING,
         DESCRIPTION STRING,
         WEIGHT DECIMAL(10,3)
         ) WITH (
         'connector' = 'db2-cdc',
         'hostname' = 'localhost',
         'port' = '50000',
         'username' = 'root',
         'password' = '123456',
         'database-name' = 'mydb',
         'schema-name' = 'myschema',
         'table-name' = 'products');
      
    -- read the snapshot and subsequent change events from the products table
    Flink SQL> SELECT * FROM products;
    

    Connector Options #

    Option | Required | Default | Type | Description
    connector | required | (none) | String | Specifies which connector to use; here it should be 'db2-cdc'.
    hostname | required | (none) | String | IP address or hostname of the Db2 database server.
    username | required | (none) | String | Name of the Db2 user to use when connecting to the Db2 database server.
    password | required | (none) | String | Password to use when connecting to the Db2 database server.
    database-name | required | (none) | String | Database name of the Db2 server to monitor.
    schema-name | required | (none) | String | Schema name of the Db2 database to monitor.
    table-name | required | (none) | String | Table name of the Db2 database to monitor.
    port | optional | 50000 | Integer | Integer port number of the Db2 database server.
    scan.startup.mode | optional | initial | String | Optional startup mode for the Db2 CDC consumer; valid values are "initial" and "latest-offset". See the Startup Reading Position section for details.
    server-time-zone | optional | (none) | String | The session time zone of the database server, e.g. "Asia/Shanghai". It controls how the TIMESTAMP type in Db2 is converted to STRING. If not set, ZoneId.systemDefault() is used to determine the server time zone.
    debezium.* | optional | (none) | String | Passes Debezium properties through to the Debezium Embedded Engine, which is used to capture data changes from the Db2 server. For example: 'debezium.snapshot.mode' = 'never'. See the Debezium Db2 Connector properties for more.
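
    The server-time-zone fallback described above can be illustrated with plain java.time. This is a minimal sketch and not the connector's own code: the class ServerTimeZoneSketch and both helper methods are hypothetical names introduced here. It assumes only what the option description states, namely that an unset option falls back to ZoneId.systemDefault(), and it shows why the choice matters: the same zone-less TIMESTAMP value maps to a different instant depending on the zone it is interpreted in.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;

public class ServerTimeZoneSketch {

    // Hypothetical helper: resolve the zone from the 'server-time-zone' option value,
    // falling back to the JVM default zone when the option is not set.
    public static ZoneId resolveServerTimeZone(String configured) {
        return (configured == null || configured.isEmpty())
                ? ZoneId.systemDefault()
                : ZoneId.of(configured);
    }

    // Hypothetical helper: interpret a zone-less TIMESTAMP value as wall-clock time
    // in the given server zone and return the corresponding epoch milliseconds.
    public static long toEpochMillis(LocalDateTime dbTimestamp, ZoneId serverZone) {
        return dbTimestamp.atZone(serverZone).toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        LocalDateTime ts = LocalDateTime.of(2020, 1, 1, 0, 0);

        ZoneId explicit = resolveServerTimeZone("Asia/Shanghai");
        ZoneId fallback = resolveServerTimeZone(null); // depends on the machine running the job

        System.out.println(explicit + " -> " + toEpochMillis(ts, explicit));
        System.out.println(fallback + " -> " + toEpochMillis(ts, fallback));
    }
}
```

    Because the fallback depends on the machine that runs the job, setting server-time-zone explicitly is the safer choice whenever the Flink cluster and the Db2 server may be in different zones.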

    Features #

    Startup Reading Position #

    The config option scan.startup.mode specifies the startup mode for the Db2 CDC consumer. The valid values are:

    • initial (default): Performs an initial snapshot of the monitored database tables upon first startup, then continues to read the latest change events.
    • latest-offset: Never performs a snapshot of the monitored database tables upon first startup; it reads only from the end of the change log, so it only sees changes made after the connector was started.

    Note: the scan.startup.mode option relies on Debezium's snapshot.mode configuration, so do not use the two together. If you specify both scan.startup.mode and debezium.snapshot.mode in the table DDL, scan.startup.mode may not take effect.
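
    The conflict in the note above can be sketched with plain java.util.Properties. This is an illustration under stated assumptions, not the connector's implementation: the class StartupModeSketch and its methods are hypothetical, the snapshot.mode values chosen for each startup mode are assumed for the example, and it is assumed that debezium.* options are applied last with the prefix stripped. Under those assumptions, a user-supplied debezium.snapshot.mode silently replaces the value derived from scan.startup.mode.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

public class StartupModeSketch {

    static final String DEBEZIUM_PREFIX = "debezium.";

    // Assumed mapping from scan.startup.mode to Debezium's snapshot.mode.
    // The concrete values are illustrative, not taken from the connector source.
    public static String snapshotModeFor(String startupMode) {
        switch (startupMode) {
            case "initial":
                return "initial";
            case "latest-offset":
                return "schema_only";
            default:
                throw new IllegalArgumentException(
                        "Unsupported scan.startup.mode: " + startupMode);
        }
    }

    // Build the Debezium properties from the table options of the DDL.
    public static Properties toDebeziumProperties(Map<String, String> tableOptions) {
        Properties props = new Properties();

        // Step 1: derive snapshot.mode from scan.startup.mode (default: initial).
        String startupMode = tableOptions.getOrDefault("scan.startup.mode", "initial");
        props.setProperty("snapshot.mode", snapshotModeFor(startupMode));

        // Step 2 (assumed order): pass-through options are applied afterwards,
        // so 'debezium.snapshot.mode' overwrites the value from step 1.
        for (Map.Entry<String, String> option : tableOptions.entrySet()) {
            if (option.getKey().startsWith(DEBEZIUM_PREFIX)) {
                String debeziumKey = option.getKey().substring(DEBEZIUM_PREFIX.length());
                props.setProperty(debeziumKey, option.getValue());
            }
        }
        return props;
    }

    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("scan.startup.mode", "latest-offset");
        options.put("debezium.snapshot.mode", "never");

        // The startup mode no longer decides the snapshot behavior.
        System.out.println(toDebeziumProperties(options).getProperty("snapshot.mode"));
    }
}
```

    With both options set, the sketch prints the pass-through value rather than the one derived from scan.startup.mode, which is the situation the note warns about.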

    DataStream Source #

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    
    import com.ververica.cdc.connectors.db2.Db2Source;
    import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
    
    public class Db2SourceExample {
      public static void main(String[] args) throws Exception {
        SourceFunction<String> db2Source =
                Db2Source.<String>builder()
                        .hostname("yourHostname")
                        .port(50000)
                        .database("yourDatabaseName") // set captured database
                        .tableList("yourSchemaName.yourTableName") // set captured table
                        .username("yourUsername")
                        .password("yourPassword")
                        // converts SourceRecord to JSON String
                        .deserializer(new JsonDebeziumDeserializationSchema())
                        .build();
    
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
        // enable checkpoint
        env.enableCheckpointing(3000);
    
        env.addSource(db2Source)
                .print()
                .setParallelism(1); // use parallelism 1 for sink to keep message ordering
    
        env.execute("Print Db2 Snapshot + Change Stream");
      }
    }
    

    Data Type Mapping #

    Db2 type | Flink SQL type | NOTE
    SMALLINT | SMALLINT |
    INTEGER | INT |
    BIGINT | BIGINT |
    REAL | FLOAT |
    DOUBLE | DOUBLE |
    NUMERIC(p, s), DECIMAL(p, s) | DECIMAL(p, s) |
    DATE | DATE |
    TIME | TIME |
    TIMESTAMP [(p)] | TIMESTAMP [(p)] |
    CHARACTER(n) | CHAR(n) |
    VARCHAR(n) | VARCHAR(n) |
    BINARY(n) | BINARY(n) |
    VARBINARY(N) | VARBINARY(N) |
    BLOB, CLOB, DBCLOB | BYTES |
    VARGRAPHIC, XML | STRING |
