MongoDB

MongoDB SQL 连接器 #

Scan Source: Bounded Lookup Source: Sync Mode Sink: Batch Sink: Streaming Append & Upsert Mode

MongoDB 连接器提供了从 MongoDB 中读取和写入数据的能力。 本文档介绍如何设置 MongoDB 连接器以对 MongoDB 运行 SQL 查询。

连接器可以在 upsert 模式下运行,使用在 DDL 上定义的主键与外部系统交换 UPDATE/DELETE 消息。

如果 DDL 上没有定义主键,则连接器只能以 append 模式与外部系统交换 INSERT 消息且不支持消费 UPDATE/DELETE 消息。

依赖 #

In order to use the MongoDB connector the following dependencies are required for both projects using a build automation tool (such as Maven or SBT) and SQL Client with SQL JAR bundles.

Maven dependency SQL Client
<dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-mongodb</artifactId>
      <version>1.1.2-1.19</version>
    </dependency>
Download

MongoDB 连接器目前并不包含在 Flink 的二进制发行版中,请查阅这里了解如何在集群运行中引用 MongoDB 连接器。

如何创建 MongoDB 表 #

MongoDB 表可以以如下方式定义:

-- 在 Flink SQL 中注册一张 MongoDB 'users' 表
CREATE TABLE MyUserTable (
  _id STRING,
  name STRING,
  age INT,
  status BOOLEAN,
  PRIMARY KEY (_id) NOT ENFORCED
) WITH (
   'connector' = 'mongodb',
   'uri' = 'mongodb://user:password@127.0.0.1:27017',
   'database' = 'my_db',
   'collection' = 'users'
);

-- 读取表 "T" 的数据并写入至 MongoDB 表
INSERT INTO MyUserTable
SELECT _id, name, age, status FROM T;

-- 从 MongoDB 表中读取数据
SELECT id, name, age, status FROM MyUserTable;

-- 将 MongoDB 表作为维度表关联
SELECT * FROM myTopic
LEFT JOIN MyUserTable FOR SYSTEM_TIME AS OF myTopic.proctime
ON myTopic.key = MyUserTable._id;

连接器参数 #

参数 是否必选 是否透传 默认值 数据类型 描述
connector
必填 (none) String 指定使用什么类型的连接器,这里应该是'mongodb'
uri
必填 (none) String MongoDB 连接字符串。
database
必填 (none) String 读取或写入的数据库。
collection
必填 (none) String 读取或写入的集合。
scan.fetch-size
可选 2048 Integer 每次循环读取时应该从游标中获取的行数。
scan.cursor.no-timeout
可选 true Boolean MongoDB 服务端通常会将空闲时间超过 10 分钟的 cursor 关闭,来节省内存开销。将这个参数设置为 true 可以 防止 cursor 因为读取时间过长或者背压导致的空闲而关闭。需要注意的是。如果 cursor 所在的会话空闲时间超过 了 30 分钟,MongoDB 会关闭当前会话以及由当前会话打开的 cursors,不管有没有设置 `noCursorTimeout()` 或者 `maxTimeMS()` 大于 30分钟。
scan.partition.strategy
可选 default String 设置分区策略。可以选择的分区策略有`single`,`sample`,`split-vector`,`sharded` 和 `default`。 请参阅分区扫描部分了解更多详情。
scan.partition.size
可选 64mb MemorySize 设置每个分区的内存大小。通过指定的分区大小,将 MongoDB 的一个集合切分成多个分区。
scan.partition.samples
可选 10 Integer 仅用于 `SAMPLE` 抽样分区策略,设置每个分区的样本数量。抽样分区器根据分区键对集合进行随机采样的方式计算分区边界。 `总的样本数量 = 每个分区的样本数量 * (文档总数 / 每个分区的文档数量)`。
lookup.cache
可选 NONE

枚举类型

可选值: NONE, PARTIAL
维表的缓存策略。 目前支持 NONE(不缓存)和 PARTIAL(只在外部数据库中查找数据时缓存)。
lookup.partial-cache.max-rows
可选 (none) Long 维表缓存的最大行数,若超过该值,则最老的行记录将会过期。 使用该配置时 "lookup.cache" 必须设置为 "PARTIAL”。请参阅下面的 Lookup Cache 部分了解更多详情。
lookup.partial-cache.expire-after-write
可选 (none) Duration 在记录写入缓存后该记录的最大保留时间。使用该配置时 "lookup.cache" 必须设置为 "PARTIAL”。 请参阅下面的 Lookup Cache 部分了解更多详情。
lookup.partial-cache.expire-after-access
可选 (none) Duration 在缓存中的记录被访问后该记录的最大保留时间。 使用该配置时 "lookup.cache" 必须设置为 "PARTIAL”。 请参阅下面的 Lookup Cache 部分了解更多详情。
lookup.partial-cache.caching-missing-key
可选 true Boolean 是否缓存维表中不存在的键,默认为true。 使用该配置时 "lookup.cache" 必须设置为 "PARTIAL”。
lookup.max-retries
可选 3 Integer 查询数据库失败的最大重试次数。
lookup.retry.interval
可选 1s Duration 查询数据库失败的最大重试时间。
sink.buffer-flush.max-rows
可选 1000 Integer 设置写入的最大批次大小。
sink.buffer-flush.interval
可选 1s Duration 设置写入的最大间隔时间。
sink.max-retries
可选 3 Integer 设置写入失败时最大重试次数。
sink.retry.interval
可选 1s Duration 设置写入失败时最大重试时间。
sink.parallelism
可选 (none) Integer 用于定义 MongoDB sink 算子的并行度。默认情况下,并行度是由框架决定:使用与上游链式算子相同的并行度。
sink.delivery-guarantee
可选 at-lease-once

Enum

可选值: none, at-least-once
设置投递保证。仅一次(exactly-once)的投递保证暂不支持。

特性 #

键处理 #

当写入数据到外部数据库时,Flink 会使用 DDL 中定义的主键。 如果定义了主键,则连接器将以 upsert 模式工作,否则连接器将以 append 模式工作。

在 MongoDB 连接器中主键用来计算 MongoDB 文档的保留主键 _id。 MongoDB 的文档 _id 是唯一的且不可变的,可以是除了 Array 外的任何 BSON 类型。 如果 _id 是嵌套的,嵌套字段的名称不能包含 ($) 符号。

MongoDB 的主键有一些限制。 在 MongoDB 4.2 之前,索引值的大小需要小于 1024 字节。 在 MongoDB 4.2 及之后版本,移除了索引大小的限制。 请参阅 Index Key Limit 了解更多详情。

MongoDB 连接器通过将 DDL 声明的主键进行组合,来生成文档的保留主键 _id。

  • 当 DDL 中声明的主键仅包含一个字段时,将字段数据转换为 bson 类型的值作为相应文档的_id。
  • 当 DDL 中声明的主键包含多个字段时,我们将所有声明的主键字段组合为一个 bson document 的嵌套类型,作为相应文档的_id。 举个例子,如果我们在 DDL 中定义主键为 PRIMARY KEY (f1, f2) NOT ENFORCED,那么相应文档的 _id 将会是 _id: {f1: v1, f2: v2} 的形式。

需要注意的是:当 _id 字段存在于 DDL 中,但是主键并没有指定为 _id 时,会产生歧义。 要么将 _id 字段声明为主键,或者将 _id 字段进行重命名。

有关 PRIMARY KEY 语法的更多详细信息,请参见 CREATE TABLE DDL

分区扫描 #

为了在并行 Source task 实例中加速读取数据,Flink 为 MongoDB table 提供了分区扫描的特性。

  • single: 将整个集合作为一个分区。
  • sample: 通过随机采样的方式来生成分区,快速但可能不均匀。
  • split-vector: 通过 MongoDB 计算分片的 splitVector 命令来生成分区,快速且均匀。 仅适用于未分片集合,需要 splitVector 权限。
  • sharded: 从 config.chunks 集合中直接读取分片集合的分片边界作为分区,不需要额外计算,快速且均匀。 仅适用于已经分片的集合,需要 config 数据库的读取权限。
  • default: 对分片集合使用 sharded 策略,对未分片集合使用 split-vector 策略。

Lookup Cache #

MongoDB 连接器可以用在时态表关联中作为一个可 lookup 的 source (又称为维表),当前只支持同步的查找模式。

默认情况下,lookup cache 是未启用的,你可以将 lookup.cache 设置为 PARTIAL 参数来启用。

lookup cache 的主要目的是用于提高时态表关联 MongoDB 连接器的性能。默认情况下,lookup cache 不开启,所以所有请求都会发送到外部数据库。 当 lookup cache 被启用时,每个进程(即 TaskManager)将维护一个缓存。Flink 将优先查找缓存,只有当缓存未查找到时才向外部数据库发送请求,并使用返回的数据更新缓存。 当缓存命中最大缓存行 lookup.partial-cache.max-rows 或当行超过 lookup.partial-cache.expire-after-writelookup.partial-cache.expire-after-access 指定的最大存活时间时,缓存中的行将被设置为已过期。 缓存中的记录可能不是最新的,用户可以将缓存记录超时设置为一个更小的值以获得更好的刷新数据,但这可能会增加发送到数据库的请求数。所以要做好吞吐量和正确性之间的平衡。

默认情况下,flink 会缓存主键的空查询结果,你可以通过将 lookup.partial-cache.caching-missing-key 设置为 false 来切换行为。

幂等写入 #

如果在 DDL 中定义了主键,MongoDB sink 将使用 upsert 语义而不是普通的 INSERT 语句。 我们将 DDL 中声明的主键进行组合作为 MongoDB 保留主键 _id,使用 upsert 模式进行写入,来保证写入的幂等性。

如果出现故障,Flink 作业会从上次成功的 checkpoint 恢复并重新处理,这可能导致在恢复过程中重复处理消息。 强烈推荐使用 upsert 模式,因为如果需要重复处理记录,它有助于避免违反数据库主键约束和产生重复数据。

过滤器下推 #

MongoDB 支持将 Flink SQL 的简单比较和逻辑过滤器下推以优化查询。 Flink SQL 过滤器到 MongoDB 查询操作符的映射如下表所示。

Flink SQL filters MongoDB Query Operators
= $eq
<> $ne
> $gt
>= $gte
< $lt
<= $lte
IS NULL $eq : null
IS NOT NULL $ne : null
OR $or
AND $and

数据类型映射 #

MongoDB BSON 类型到 Flink SQL 数据类型的映射如下表所示。

MongoDB BSON type Flink SQL type
ObjectId STRING
String STRING
Boolean BOOLEAN
Binary BINARY
VARBINARY
Int32 INTEGER
- TINYINT
SMALLINT
FLOAT
Int64 BIGINT
Double DOUBLE
Decimal128 DECIMAL
DateTime TIMESTAMP_LTZ(3)
Timestamp TIMESTAMP_LTZ(0)
Object ROW
Array ARRAY

对于 MongoDB 独有的类型,我们使用 Extended JSON 格式 将其映射为 Flink SQL STRING 类型。

MongoDB BSON type Flink SQL STRING
Symbol {"_value": {"$symbol": "12"}}
RegularExpression {"_value": {"$regularExpression": {"pattern": "^9$", "options": "i"}}}
JavaScript {"_value": {"$code": "function() { return 10; }"}}
DbPointer {"_value": {"$dbPointer": {"$ref": "db.coll", "$id": {"$oid": "63932a00da01604af329e33c"}}}}

Back to top