MongoDB
This documentation is for an unreleased version of Apache Flink CDC. We recommend you use the latest stable version.

MongoDB CDC 连接器 #

MongoDB CDC 连接器允许从 MongoDB 读取快照数据和增量数据。 本文档描述了如何设置 MongoDB CDC 连接器以针对 MongoDB 运行 SQL 查询。

依赖 #

为了设置 MongoDB CDC 连接器, 下表提供了使用构建自动化工具(如 Maven 或 SBT )和带有 SQLJar 捆绑包的 SQLClient 的两个项目的依赖关系信息。

Maven dependency #

<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-connector-mongodb-cdc</artifactId>
   <!--  请使用已发布的版本依赖,snapshot 版本的依赖需要本地自行编译。 -->
   <version>3.2-SNAPSHOT</version>
</dependency>

SQL Client JAR #

下载链接仅适用于稳定版本。

下载 flink-sql-connector-mongodb-cdc-3.0.1.jar, 把它放在 <FLINK_HOME>/lib/.

注意: 参考 flink-sql-connector-mongodb-cdc, 当前已发布的版本将在 Maven 中央仓库中提供。

设置 MongoDB #

可用性 #

  • MongoDB 版本

    MongoDB 版本 >= 3.6
    我们使用 更改流 功能(3.6 版中新增),以捕获更改数据。

  • 集群部署

    副本集 或者 分片集群 是必需的。

  • 存储引擎

    WiredTiger 存储引擎是必需的。

  • 副本集协议版本

    副本集协议版本 1 (pv1) 是必需的。
    从 4.0 版本开始,MongoDB 只支持pv1。 pv1 是使用 MongoDB 3.2 或更高版本创建的所有新副本集的默认值。

  • 权限

    changeStream and read 是 MongoDB Kafka Connector 必需权限。

    你可以使用以下示例进行简单的授权。
    有关更详细的授权, 请参照 MongoDB 数据库用户角色.

    use admin;
    db.createRole(
        {
            role: "flinkrole",
            privileges: [{
                // 所有数据库中所有非系统集合的 grant 权限
                resource: { db: "", collection: "" },
                actions: [
                    "splitVector",
                    "listDatabases",
                    "listCollections",
                    "collStats",
                    "find",
                    "changeStream" ]
            }],
            roles: [
               // 阅读 config.collections 和 config.chunks
               // 用于分片集群快照拆分。
                { role: 'read', db: 'config' }
            ]
        }
    );
    
    db.createUser(
      {
          user: 'flinkuser',
          pwd: 'flinkpw',
          roles: [
             { role: 'flinkrole', db: 'admin' }
          ]
      }
    );
    

如何创建 MongoDB CDC 表 #

MongoDB CDC 表可以定义如下:

-- 在 Flink SQL 中注册 MongoDB 表 `products`
CREATE TABLE products (
  _id STRING, // 必须声明
  name STRING,
  weight DECIMAL(10,3),
  tags ARRAY<STRING>, -- array
  price ROW<amount DECIMAL(10,2), currency STRING>, -- 嵌入式文档
  suppliers ARRAY<ROW<name STRING, address STRING>>, -- 嵌入式文档
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb-cdc',
  'hosts' = 'localhost:27017,localhost:27018,localhost:27019',
  'username' = 'flinkuser',
  'password' = 'flinkpw',
  'database' = 'inventory',
  'collection' = 'products'
);

-- 从 `products` 集合中读取快照和更改事件
SELECT * FROM products;

请注意

MongoDB 的更改事件记录在消息之前没有更新。因此,我们只能将其转换为 Flink 的 UPSERT 更改日志流。 upstart 流需要一个唯一的密钥,所以我们必须声明 _id 作为主键。 我们不能将其他列声明为主键, 因为删除操作不包含除 _idsharding key 之外的键和值。

连接器选项 #

Option Required Default Type Description
connector required (none) String 指定要使用的连接器,此处应为 mongodb-cdc.
scheme optional mongodb String 指定 MongoDB 连接协议。 eg. mongodb or mongodb+srv.
hosts required (none) String MongoDB 服务器的主机名和端口对的逗号分隔列表。
eg. localhost:27017,localhost:27018
username optional (none) String 连接到 MongoDB 时要使用的数据库用户的名称。
只有当 MongoDB 配置为使用身份验证时,才需要这样做。
password optional (none) String 连接到 MongoDB 时要使用的密码。
只有当 MongoDB 配置为使用身份验证时,才需要这样做。
database optional (none) String 要监视更改的数据库的名称。 如果未设置,则将捕获所有数据库。
该数据库还支持正则表达式来监视与正则表达式匹配的多个数据库。
collection optional (none) String 数据库中要监视更改的集合的名称。 如果未设置,则将捕获所有集合。
该集合还支持正则表达式来监视与完全限定的集合标识符匹配的多个集合。
connection.options optional (none) String MongoDB连接选项。 例如:
replicaSet=test&connectTimeoutMS=300000
scan.startup.mode optional initial String MongoDB CDC 消费者可选的启动模式, 合法的模式为 "initial","latest-offset" 和 "timestamp"。 请查阅 启动模式 章节了解更多详细信息。
scan.startup.timestamp-millis optional (none) Long 起始毫秒数, 仅适用于 'timestamp' 启动模式.
batch.size optional 1024 Integer Cursor 批次大小。
poll.max.batch.size optional 1024 Integer 轮询新数据时,单个批处理中要包含的更改流文档的最大数量。
poll.await.time.ms optional 1000 Integer 在更改流上检查新结果之前等待的时间。
heartbeat.interval.ms optional 0 Integer 心跳间隔(毫秒)。使用 0 禁用。
scan.incremental.snapshot.enabled optional false Boolean 是否启用增量快照。增量快照功能仅支持 MongoDB 4.0 之后的版本。
scan.incremental.snapshot.chunk.size.mb optional 64 Integer 增量快照的区块大小 mb。
scan.incremental.close-idle-reader.enabled optional false Boolean 是否在快照结束后关闭空闲的 Reader。 此特性需要 flink 版本大于等于 1.14 并且 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' 需要设置为 true。

注意: heartbeat.interval.ms 强烈建议设置一个大于 0 的适当值 如果集合更改缓慢. 当我们从检查点或保存点恢复 Flink 作业时,心跳事件可以向前推送 resumeToken,以避免 resumeToken 过期。

可用元数据 #

以下格式元数据可以在表定义中公开为只读(VIRTUAL)列。

Key DataType Description
database_name STRING NOT NULL 包含该行的数据库的名称。
collection_name STRING NOT NULL 包含该行的集合的名称。
op_ts TIMESTAMP_LTZ(3) NOT NULL 它指示在数据库中进行更改的时间。
如果记录是从表的快照而不是改变流中读取的,该值将始终为0。

扩展的 CREATE TABLE 示例演示了用于公开这些元数据字段的语法:

CREATE TABLE products (
    db_name STRING METADATA FROM 'database_name' VIRTUAL,
    collection_name STRING METADATA  FROM 'collection_name' VIRTUAL,
    operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
    _id STRING, // 必须声明
    name STRING,
    weight DECIMAL(10,3),
    tags ARRAY<STRING>, -- array
    price ROW<amount DECIMAL(10,2), currency STRING>, -- 嵌入式文档
    suppliers ARRAY<ROW<name STRING, address STRING>>, -- 嵌入式文档
    PRIMARY KEY(_id) NOT ENFORCED
) WITH (
    'connector' = 'mongodb-cdc',
    'hosts' = 'localhost:27017,localhost:27018,localhost:27019',
    'username' = 'flinkuser',
    'password' = 'flinkpw',
    'database' = 'inventory',
    'collection' = 'products'
);

特性 #

精确一次处理 #

MongoDB CDC 连接器是一个 Flink Source 连接器,它将首先读取数据库快照,然后在处理甚至失败时继续读取带有的更改流事件。

启动模式 #

配置选项scan.startup.mode指定 MySQL CDC 使用者的启动模式。有效枚举包括:

  • initial (默认):在第一次启动时对受监视的数据库表执行初始快照,并继续读取最新的 oplog。
  • latest-offset:首次启动时,从不对受监视的数据库表执行快照, 连接器仅从 oplog 的结尾处开始读取,这意味着连接器只能读取在连接器启动之后的数据更改。
  • timestamp:跳过快照阶段,从指定的时间戳开始读取 oplog 事件。

例如使用 DataStream API:

MongoDBSource.builder()
    .startupOptions(StartupOptions.latest()) // Start from latest offset
    .startupOptions(StartupOptions.timestamp(1667232000000L) // Start from timestamp
    .build()

and with SQL:

CREATE TABLE mongodb_source (...) WITH (
    'connector' = 'mongodb-cdc',
    'scan.startup.mode' = 'latest-offset', -- 从最晚位点启动
    ...
    'scan.incremental.snapshot.enabled' = 'true', -- 指定时间戳启动,需要开启增量快照读
    'scan.startup.mode' = 'timestamp', -- 指定时间戳启动模式
    'scan.startup.timestamp-millis' = '1667232000000' -- 启动毫秒时间
    ...
)

Notes:

  • ’timestamp’ 指定时间戳启动模式,需要开启增量快照读。

更改流 #

我们将 MongoDB’s official Kafka Connector 从 MongoDB 中读取快照或更改事件,并通过 Debezium 的 EmbeddedEngine 进行驱动。

Debezium 的 EmbeddedEngine 提供了一种在应用程序进程中运行单个 Kafka Connect SourceConnector 的机制,并且它可以正确地驱动任何标准的 Kafka Connect SourceConnector,即使它不是由 Debezium 提供的。

我们选择 MongoDB 的官方 Kafka连接器,而不是 Debezium 的MongoDB 连接器,因为它们使用了不同的更改数据捕获机制。

  • 对于 Debezium 的 MongoDB 连接器,它读取每个复制集主节点的 oplog.rs 集合。
  • 对于 MongoDB 的 Kafka 连接器,它订阅了 MongoDB 的 更改流

MongoDB 的oplog.rs 集合没有在状态之前保持更改记录的更新, 因此,很难通过单个 oplog.rs 记录提取完整的文档状态,并将其转换为 Flink 接受的更改日志流(Insert Only,Upsert,All)。 此外,MongoDB 5(2021 7月发布)改变了 oplog 格式,因此当前的 Debezium 连接器不能与其一起使用。

Change Stream是 MongoDB 3.6 为副本集和分片集群提供的一项新功能,它允许应用程序访问实时数据更改,而不会带来跟踪操作日志的复杂性和风险。
应用程序可以使用更改流来订阅单个集合上的所有数据更改, 数据库或整个部署,并立即对其做出反应。

查找更新操作的完整文档变更流提供的一项功能,它可以配置变更流以返回更新文档的最新多数提交版本。由于该功能,我们可以轻松收集最新的完整文档,并将更改日志转换为 Flink 的Upsert Changelog Stream

顺便说一句,DBZ-435提到的Debezium的MongoDB变更流探索,正在制定路线图。
如果完成了,我们可以考虑集成两种源连接器供用户选择。

DataStream Source #

MongoDB CDC 连接器也可以是一个数据流源。 你可以创建 SourceFunction,如下所示:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.cdc.connectors.mongodb.MongoDBSource;

public class MongoDBSourceExample {
    public static void main(String[] args) throws Exception {
        SourceFunction<String> sourceFunction = MongoDBSource.<String>builder()
                .hosts("localhost:27017")
                .username("flink")
                .password("flinkpw")
                .databaseList("inventory") // 设置捕获的数据库,支持正则表达式
                .collectionList("inventory.products", "inventory.orders") //设置捕获的集合,支持正则表达式
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.addSource(sourceFunction)
                .print().setParallelism(1); // 对 sink 使用并行度 1 以保持消息顺序

        env.execute();
    }
}

MongoDB CDC 增量连接器(2.3.0 之后)可以使用,如下所示:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.cdc.connectors.mongodb.source.MongoDBSource;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;

public class MongoDBIncrementalSourceExample {
    public static void main(String[] args) throws Exception {
        MongoDBSource<String> mongoSource =
                MongoDBSource.<String>builder()
                        .hosts("localhost:27017")
                        .databaseList("inventory") // 设置捕获的数据库,支持正则表达式
                        .collectionList("inventory.products", "inventory.orders") //设置捕获的集合,支持正则表达式
                        .username("flink")
                        .password("flinkpw")
                        .deserializer(new JsonDebeziumDeserializationSchema())
                        .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 启用检查点
        env.enableCheckpointing(3000);
        // 将 source 并行度设置为 2
        env.fromSource(mongoSource, WatermarkStrategy.noWatermarks(), "MongoDBIncrementalSource")
                .setParallelism(2)
                .print()
                .setParallelism(1);

        env.execute("Print MongoDB Snapshot + Change Stream");
    }
}

注意:

  • 如果使用数据库正则表达式,则需要 readAnyDatabase 角色。
  • 增量快照功能仅支持 MongoDB 4.0 之后的版本。

数据类型映射 #

BSON 二进制 JSON的缩写是一种类似 JSON 格式的二进制编码序列,用于在 MongoDB 中存储文档和进行远程过程调用。

Flink SQL Data Type 类似于 SQL 标准的数据类型术语,该术语描述了表生态系统中值的逻辑类型。它可以用于声明操作的输入和/或输出类型。

为了使 Flink SQL 能够处理来自异构数据源的数据,异构数据源的数据类型需要统一转换为 Flink SQL 数据类型。

以下是 BSON 类型和 Flink SQL 类型的映射。

BSON type Flink SQL type
TINYINT
SMALLINT
Int
INT
Long BIGINT
FLOAT
Double DOUBLE
Decimal128 DECIMAL(p, s)
Boolean BOOLEAN
Date
Timestamp
DATE
Date
Timestamp
TIME
Date TIMESTAMP(3)
TIMESTAMP_LTZ(3)
Timestamp TIMESTAMP(0)
TIMESTAMP_LTZ(0)
String
ObjectId
UUID
Symbol
MD5
JavaScript
Regex
STRING
BinData BYTES
Object ROW
Array ARRAY
DBPointer ROW<$ref STRING, $id STRING>
GeoJSON Point : ROW<type STRING, coordinates ARRAY<DOUBLE>>
Line : ROW<type STRING, coordinates ARRAY<ARRAY< DOUBLE>>>
...

参考 #

Back to top