MongoDB

MongoDB CDC Connector #

The MongoDB CDC connector allows for reading snapshot data and incremental data from MongoDB. This document describes how to setup the MongoDB CDC connector to run SQL queries against MongoDB.

Dependencies #

In order to setup the MongoDB CDC connector, the following table provides dependency information for both projects using a build automation tool (such as Maven or SBT) and SQL Client with SQL JAR bundles.

Maven dependency #

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-mongodb-cdc</artifactId>
    <version>3.2.1</version>
</dependency>

SQL Client JAR #

Download link is available only for stable releases.

Download flink-sql-connector-mongodb-cdc-3.2.1.jar and put it under <FLINK_HOME>/lib/.

Note: Refer to flink-sql-connector-mongodb-cdc, more released versions will be available in the Maven central warehouse.

Setup MongoDB #

Availability #

  • MongoDB version

    MongoDB version >= 3.6
    We use change streams feature (new in version 3.6) to capture change data.

  • Cluster Deployment

    replica sets or sharded clusters is required.

  • Storage Engine

    WiredTiger storage engine is required.

  • Replica set protocol version

    Replica set protocol version 1 (pv1) is required.
    Starting in version 4.0, MongoDB only supports pv1. pv1 is the default for all new replica sets created with MongoDB 3.2 or later.

  • Privileges

    changeStream and read privileges are required by MongoDB Kafka Connector.

    You can use the following example for simple authorization.
    For more detailed authorization, please refer to MongoDB Database User Roles.

    use admin;
    db.createRole(
        {
            role: "flinkrole",
            privileges: [{
                // Grant privileges on all non-system collections in all databases
                resource: { db: "", collection: "" },
                actions: [
                    "splitVector",
                    "listDatabases",
                    "listCollections",
                    "collStats",
                    "find",
                    "changeStream" ]
            }],
            roles: [
                // Read config.collections and config.chunks
                // for sharded cluster snapshot splitting.
                { role: 'read', db: 'config' }
            ]
        }
    );
    
    db.createUser(
      {
          user: 'flinkuser',
          pwd: 'flinkpw',
          roles: [
             { role: 'flinkrole', db: 'admin' }
          ]
      }
    );
    

How to create a MongoDB CDC table #

The MongoDB CDC table can be defined as following:

-- register a MongoDB table 'products' in Flink SQL
CREATE TABLE products (
  _id STRING, // must be declared
  name STRING,
  weight DECIMAL(10,3),
  tags ARRAY<STRING>, -- array
  price ROW<amount DECIMAL(10,2), currency STRING>, -- embedded document
  suppliers ARRAY<ROW<name STRING, address STRING>>, -- embedded documents
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb-cdc',
  'hosts' = 'localhost:27017,localhost:27018,localhost:27019',
  'username' = 'flinkuser',
  'password' = 'flinkpw',
  'database' = 'inventory',
  'collection' = 'products'
);

-- read snapshot and change events from products collection
SELECT * FROM products;

Note that

MongoDB’s change event record doesn’t have updated before message. So, we can only convert it to Flink’s UPSERT changelog stream. An upsert stream requires a unique key, so we must declare _id as primary key. We can’t declare other column as primary key, because delete operation does not contain the key and value besides _id and sharding key.

Connector Options #

Option Required Default Type Description
connector required (none) String Specify what connector to use, here should be mongodb-cdc.
scheme optional mongodb String The protocol connected to MongoDB. eg. mongodb or mongodb+srv.
hosts required (none) String The comma-separated list of hostname and port pairs of the MongoDB servers.
eg. localhost:27017,localhost:27018
username optional (none) String Name of the database user to be used when connecting to MongoDB.
This is required only when MongoDB is configured to use authentication.
password optional (none) String Password to be used when connecting to MongoDB.
This is required only when MongoDB is configured to use authentication.
database optional (none) String Name of the database to watch for changes. If not set then all databases will be captured.
The database also supports regular expressions to monitor multiple databases matching the regular expression.
collection optional (none) String Name of the collection in the database to watch for changes. If not set then all collections will be captured.
The collection also supports regular expressions to monitor multiple collections matching fully-qualified collection identifiers.
connection.options optional (none) String The ampersand-separated connection options of MongoDB. eg.
replicaSet=test&connectTimeoutMS=300000
scan.startup.mode optional initial String Optional startup mode for MongoDB CDC consumer, valid enumerations are "initial", "latest-offset" and "timestamp". Please see Startup Reading Position section for more detailed information.
scan.startup.timestamp-millis optional (none) Long Timestamp in millis of the start point, only used for 'timestamp' startup mode.
copy.existing.queue.size optional 10240 Integer The max size of the queue to use when copying data.
batch.size optional 1024 Integer The cursor batch size.
poll.max.batch.size optional 1024 Integer Maximum number of change stream documents to include in a single batch when polling for new data.
poll.await.time.ms optional 1000 Integer The amount of time to wait before checking for new results on the change stream.
heartbeat.interval.ms optional 0 Integer The length of time in milliseconds between sending heartbeat messages. Use 0 to disable.
scan.full-changelog optional false Boolean Whether try to generate full-mode changelog based on pre- and post-images in MongoDB. Refer to Full Changelog for more details. Supports MongoDB 6.0 and above only.
scan.incremental.snapshot.enabled optional false Boolean Whether enable incremental snapshot. The incremental snapshot feature only supports after MongoDB 4.0.
scan.incremental.snapshot.chunk.size.mb optional 64 Integer The chunk size mb of incremental snapshot.
scan.incremental.snapshot.chunk.samples optional 20 Integer The samples count per chunk when using sample partition strategy during incremental snapshot.
scan.incremental.close-idle-reader.enabled optional false Boolean Whether to close idle readers at the end of the snapshot phase.
The flink version is required to be greater than or equal to 1.14 when 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' is set to true.
If the flink version is greater than or equal to 1.15, the default value of 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' has been changed to true, so it does not need to be explicitly configured 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' = 'true'
scan.cursor.no-timeout optional true Boolean MongoDB server normally times out idle cursors after an inactivity period (10 minutes) to prevent excess memory use. Set this option to true to prevent that. Only available when parallelism snapshot is enabled.

Note: heartbeat.interval.ms is highly recommended setting a proper value larger than 0 if the collection changes slowly. The heartbeat event can push the resumeToken forward to avoid resumeToken being expired when we recover the Flink job from a checkpoint or savepoint.

Available Metadata #

The following format metadata can be exposed as read-only (VIRTUAL) columns in a table definition.

Key DataType Description
database_name STRING NOT NULL Name of the database that contain the row.
collection_name STRING NOT NULL Name of the collection that contain the row.
op_ts TIMESTAMP_LTZ(3) NOT NULL It indicates the time that the change was made in the database.
If the record is read from snapshot of the table instead of the change stream, the value is always 0.

The extended CREATE TABLE example demonstrates the syntax for exposing these metadata fields:

CREATE TABLE products (
    db_name STRING METADATA FROM 'database_name' VIRTUAL,
    collection_name STRING METADATA  FROM 'collection_name' VIRTUAL,
    operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
    _id STRING, // must be declared
    name STRING,
    weight DECIMAL(10,3),
    tags ARRAY<STRING>, -- array
    price ROW<amount DECIMAL(10,2), currency STRING>, -- embedded document
    suppliers ARRAY<ROW<name STRING, address STRING>>, -- embedded documents
    PRIMARY KEY(_id) NOT ENFORCED
) WITH (
    'connector' = 'mongodb-cdc',
    'hosts' = 'localhost:27017,localhost:27018,localhost:27019',
    'username' = 'flinkuser',
    'password' = 'flinkpw',
    'database' = 'inventory',
    'collection' = 'products'
);

Features #

Exactly-Once Processing #

The MongoDB CDC connector is a Flink Source connector which will read database snapshot first and then continues to read change stream events with exactly-once processing even failures happen.

Startup Reading Position #

The config option scan.startup.mode specifies the startup mode for MongoDB CDC consumer. The valid enumerations are:

  • initial (default): Performs an initial snapshot on the monitored database tables upon first startup, and continue to read the latest oplog.
  • latest-offset: Never to perform snapshot on the monitored database tables upon first startup, just read from the end of the oplog which means only have the changes since the connector was started.
  • timestamp: Skip snapshot phase and start reading oplog events from a specific timestamp.

For example in DataStream API:

MongoDBSource.builder()
    .startupOptions(StartupOptions.latest()) // Start from latest offset
    .startupOptions(StartupOptions.timestamp(1667232000000L) // Start from timestamp
    .build()

and with SQL:

CREATE TABLE mongodb_source (...) WITH (
    'connector' = 'mongodb-cdc',
    'scan.startup.mode' = 'latest-offset', -- Start from latest offset
    ...
    'scan.startup.mode' = 'timestamp', -- Start from timestamp
    'scan.startup.timestamp-millis' = '1667232000000' -- Timestamp under timestamp startup mode
    ...
)

Change Streams #

We integrate the MongoDB’s official Kafka Connector to read snapshot or change events from MongoDB and drive it by Debezium’s EmbeddedEngine.

Debezium’s EmbeddedEngine provides a mechanism for running a single Kafka Connect SourceConnector within an application’s process, and it can drive any standard Kafka Connect SourceConnector properly even which is not provided by Debezium.

We choose MongoDB’s official Kafka Connector instead of the Debezium’s MongoDB Connector because they use a different change data capture mechanism.

  • For Debezium’s MongoDB Connector, it reads the oplog.rs collection of each replica-set’s master node.
  • For MongoDB’s Kafka Connector, it subscribes Change Stream of MongoDB.

MongoDB’s oplog.rs collection doesn’t keep the changed record’s update before state, so it’s hard to extract the full document state by a single oplog.rs record and convert it to change log stream accepted by Flink (Insert Only, Upsert, All). Additionally, MongoDB 5 (released in July 2021) has changed the oplog format, so the current Debezium connector cannot be used with it.

Change Stream is a new feature provided by MongoDB 3.6 for replica sets and sharded clusters that allows applications to access real-time data changes without the complexity and risk of tailing the oplog.
Applications can use change streams to subscribe to all data changes on a single collection, a database, or an entire deployment, and immediately react to them.

Lookup Full Document for Update Operations is a feature provided by Change Stream which can configure the change stream to return the most current majority-committed version of the updated document. Because of this feature, we can easily collect the latest full document and convert the change log to Flink’s Upsert Changelog Stream.

By the way, Debezium’s MongoDB change streams exploration mentioned by DBZ-435 is on roadmap.
If it’s done, we can consider integrating two kinds of source connector for users to choose.

DataStream Source #

The MongoDB CDC connector can also be a DataStream source. You can create a SourceFunction as the following shows:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.cdc.connectors.mongodb.MongoDBSource;

public class MongoDBSourceExample {
    public static void main(String[] args) throws Exception {
        SourceFunction<String> sourceFunction = MongoDBSource.<String>builder()
                .hosts("localhost:27017")
                .username("flink")
                .password("flinkpw")
                .databaseList("inventory") // set captured database, support regex
                .collectionList("inventory.products", "inventory.orders") //set captured collections, support regex
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.addSource(sourceFunction)
                .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering

        env.execute();
    }
}

The MongoDB CDC incremental connector (after 2.3.0) can be used as the following shows:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.cdc.connectors.mongodb.source.MongoDBSource;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;

public class MongoDBIncrementalSourceExample {
    public static void main(String[] args) throws Exception {
        MongoDBSource<String> mongoSource =
                MongoDBSource.<String>builder()
                        .hosts("localhost:27017")
                        .databaseList("inventory") // set captured database, support regex
                        .collectionList("inventory.products", "inventory.orders") //set captured collections, support regex
                        .username("flink")
                        .password("flinkpw")
                        .deserializer(new JsonDebeziumDeserializationSchema())
                        .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // enable checkpoint
        env.enableCheckpointing(3000);
        // set the source parallelism to 2
        env.fromSource(mongoSource, WatermarkStrategy.noWatermarks(), "MongoDBIncrementalSource")
                .setParallelism(2)
                .print()
                .setParallelism(1);

        env.execute("Print MongoDB Snapshot + Change Stream");
    }
}

Note:

  • If database regex is used, readAnyDatabase role is required.
  • The incremental snapshot feature only supports after MongoDB 4.0.

Full Changelog #

MongoDB 6.0 and above supports emitting change stream events containing document before and after the change was made (aka. pre- and post-images).

  • The pre-image is the document before it was replaced, updated, or deleted. There is no pre-image for an inserted document.

  • The post-image is the document after it was inserted, replaced, or updated. There is no post-image for a deleted document.

MongoDB CDC could make uses of pre-image and post-images to generate full-mode changelog stream including Insert, Update Before, Update After, and Delete data rows, thereby avoiding additional ChangelogNormalize downstream node.

To enable this feature, here’s some prerequisites:

  • MongoDB version must be 6.0 or above;
  • Enable preAndPostImages feature at the database level:
db.runCommand({
  setClusterParameter: {
    changeStreamOptions: {
      preAndPostImages: {
        expireAfterSeconds: 'off' // replace with custom image expiration time
      }
    }
  }
})
  • Enable changeStreamPreAndPostImages feature for collections to be monitored:
db.runCommand({
  collMod: "<< collection name >>", 
  changeStreamPreAndPostImages: {
    enabled: true 
  } 
})
  • Enable MongoDB CDC’s scan.full-changelog feature:
MongoDBSource.builder()
    .scanFullChangelog(true)
    ...
    .build()

or with Flink SQL:

CREATE TABLE mongodb_source (...) WITH (
    'connector' = 'mongodb-cdc',
    'scan.full-changelog' = 'true',
    ...
)

Data Type Mapping #

BSON short for Binary JSON is a binary-encoded serialization of JSON-like format used to store documents and make remote procedure calls in MongoDB.

Flink SQL Data Type is similar to the SQL standard’s data type terminology which describes the logical type of a value in the table ecosystem. It can be used to declare input and/or output types of operations.

In order to enable Flink SQL to process data from heterogeneous data sources, the data types of heterogeneous data sources need to be uniformly converted to Flink SQL data types.

The following is the mapping of BSON type and Flink SQL type.

BSON type Flink SQL type
TINYINT
SMALLINT
Int
INT
Long BIGINT
FLOAT
Double DOUBLE
Decimal128 DECIMAL(p, s)
Boolean BOOLEAN
Date
Timestamp
DATE
Date
Timestamp
TIME
Date TIMESTAMP(3)
TIMESTAMP_LTZ(3)
Timestamp TIMESTAMP(0)
TIMESTAMP_LTZ(0)
String
ObjectId
UUID
Symbol
MD5
JavaScript
Regex
STRING
BinData BYTES
Object ROW
Array ARRAY
DBPointer ROW<$ref STRING, $id STRING>
GeoJSON Point : ROW<type STRING, coordinates ARRAY<DOUBLE>>
Line : ROW<type STRING, coordinates ARRAY<ARRAY< DOUBLE>>>
...

Reference #

Back to top