MongoDB

MongoDB 连接器 #

Flink 提供了 MongoDB 连接器使用至少一次(At-least-once)的语义在 MongoDB collection 中读取和写入数据。

要使用此连接器,请将以下依赖添加到你的项目中:

There is no connector (yet) available for Flink version 1.20.

MongoDB Source #

下面的例子介绍了如何构建一个 MongoDB Source:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.mongodb.source.MongoSource;
import org.apache.flink.connector.mongodb.source.reader.deserializer.MongoDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.bson.BsonDocument;

MongoSource<String> source = MongoSource.<String>builder()
        .setUri("mongodb://user:password@127.0.0.1:27017")
        .setDatabase("my_db")
        .setCollection("my_coll")
        .setProjectedFields("_id", "f0", "f1")
        .setFetchSize(2048)
        .setLimit(10000)
        .setNoCursorTimeout(true)
        .setPartitionStrategy(PartitionStrategy.SAMPLE)
        .setPartitionSize(MemorySize.ofMebiBytes(64))
        .setSamplesPerPartition(10)
        .setDeserializationSchema(new MongoDeserializationSchema<String>() {
            @Override
            public String deserialize(BsonDocument document) {
                return document.toJson();
            }

            @Override
            public TypeInformation<String> getProducedType() {
                return BasicTypeInfo.STRING_TYPE_INFO;
            }
        })
        .build();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "MongoDB-Source")
        .setParallelism(2)
        .print()
        .setParallelism(1);

配置项 #

Flink 的 MongoDB Source 可以通过 MongoSource.<OutputType>builder() 构造器构建。

  1. setUri(String uri)
  2. setDatabase(String database)
    • 必须。
    • 设置读取的数据库名称。
  3. setCollection(String collection)
    • 必须。
    • 设置读取的集合名称。
  4. setFetchSize(int fetchSize)
    • 可选。默认值: 2048
    • 设置每次循环读取时应该从游标中获取的行数。
  5. setNoCursorTimeout(boolean noCursorTimeout)
    • 可选。默认值: true
    • MongoDB 服务端通常会将空闲时间超过 10 分钟的 cursor 关闭,来节省内存开销。将这个参数设置为 true 可以 防止 cursor 因为读取时间过长或者背压导致的空闲而关闭。需要注意的是。如果 cursor 所在的会话空闲时间超过 了 30 分钟,MongoDB 会关闭当前会话以及由当前会话打开的 cursors,不管有没有设置 noCursorTimeout() 或者 maxTimeMS() 大于 30分钟。
  6. _setPartitionStrategy(PartitionStrategy partitionStrategy) _
    • 可选。 默认值: PartitionStrategy.DEFAULT
    • 设置分区策略。 可选的分区策略有 SINGLESAMPLESPLIT_VECTORSHARDEDDEFAULT。 查看 分区策略 章节获取详细介绍。
  7. setPartitionSize(MemorySize partitionSize)
    • 可选。默认值:64mb
    • 设置每个分区的内存大小。通过指定的分区大小,将 MongoDB 的一个集合切分成多个分区。 可以设置并行度,并行地读取这些分区,以提升整体的读取速度。
  8. setSamplesPerPartition(int samplesPerPartition)
    • 可选。默认值:10
    • 仅用于 SAMPLE 抽样分区策略,设置每个分区的样本数量。抽样分区器根据分区键对集合进行随机采样的方式计算分区边界。 总的样本数量 = 每个分区的样本数量 * (文档总数 / 每个分区的文档数量)
  9. setLimit(int limit)
    • 可选。默认值:-1
    • 限制每个 reader 最多读取文档的数量。如果我们设置了读取并行度大于 1,那么最多读取的文档数量等于 并行度 * 限制数量
  10. setProjectedFields(String… projectedFields)
    • 可选。
    • 设置读取文档的部分字段。如果没有设置,会读取文档的全部字段。
  11. setDeserializationSchema(MongoDeserializationSchema deserializationSchema)
    • 必须。
    • 设置 MongoDeserializationSchema 用于解析 MongoDB BSON 类型的文档。

分区策略 #

使用分区可以利用并行读取来加速整体的读取效率。目前提供了以下几种分区策略:

  • SINGLE:将整个集合作为一个分区。
  • SAMPLE:通过随机采样的方式来生成分区,快速但可能不均匀。
  • SPLIT_VECTOR:通过 MongoDB 计算分片的 splitVector 命令来生成分区,快速且均匀。 仅适用于未分片集合,需要 splitVector 权限。
  • SHARDED:从 config.chunks 集合中直接读取分片集合的分片边界作为分区,不需要额外计算,快速且均匀。 仅适用于已经分片的集合,需要 config 数据库的读取权限。
  • DEFAULT:对分片集合使用 SHARDED 策略,对未分片集合使用 SPLIT_VECTOR 策略。

MongoDB Sink #

下面的例子介绍了如何构建一个 MongoDB Sink:

import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.mongodb.sink.MongoSink;
import org.apache.flink.streaming.api.datastream.DataStream;

import com.mongodb.client.model.InsertOneModel;
import org.bson.BsonDocument;

DataStream<String> stream = ...;

MongoSink<String> sink = MongoSink.<String>builder()
        .setUri("mongodb://user:password@127.0.0.1:27017")
        .setDatabase("my_db")
        .setCollection("my_coll")
        .setBatchSize(1000)
        .setBatchIntervalMs(1000)
        .setMaxRetries(3)
        .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .setSerializationSchema(
                (input, context) -> new InsertOneModel<>(BsonDocument.parse(input)))
        .build();

stream.sinkTo(sink);

配置项 #

Flink 的 MongoDB Source 可以通过 MongoSink.<InputType>builder() 构造器构建。

  1. setUri(String uri)
  2. setDatabase(String database)
    • 必须。
    • 设置写入的数据库名称。
  3. setCollection(String collection)
    • 必须。
    • 设置写入的集合名称。
  4. setBatchSize(int batchSize)
    • 可选。默认值:1000
    • 设置写入的最大批次大小。可以设置为 -1 来禁用批式写入。
  5. setBatchIntervalMs(long batchIntervalMs)
    • 可选。默认值:1000.
    • 设置写入的最大间隔时间,单位为毫秒。可以设置为 -1 来禁用批式写入。
  6. setMaxRetries(int maxRetries)
    • 可选。默认值:3
    • 设置写入失败时最大重试次数。
  7. setDeliveryGuarantee(DeliveryGuarantee deliveryGuarantee)
    • 可选。默认值:DeliveryGuarantee.AT_LEAST_ONCE.
    • 设置投递保证。 仅一次(EXACTLY_ONCE)的投递保证暂不支持。
  8. setSerializationSchema(MongoSerializationSchema serializationSchema)
    • 必须。
    • 设置 MongoSerializationSchema 将输入类型转换为 MongoDB WriteModel

容错 #

当开启了 Flink checkpoint,Flink MongoDB Sink 保证至少一次 (at-least-once) 的写入语义. MongoWriter 在检查点时,会确认所有被缓存的写入操作被正确写入.

关于检查点和容错相关的更多信息,请参阅 容错文档.

下面的示例介绍了如何开启检查点:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // checkpoint every 5000 msecs
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.enableCheckpointing(5000) // checkpoint every 5000 msecs
env = StreamExecutionEnvironment.get_execution_environment()
# checkpoint every 5000 msecs
env.enable_checkpointing(5000)

重要提醒: 尽管默认的写入语义为至少一次 (AT_LEAST_ONCE),但检查点并不是默认开启的. 这会导致 Sink 缓冲写入请求,直到完成或 `MongoWriter` 自动刷新. 默认情况下,`MongoWriter` 会缓冲 1000 个新增的写入请求。需要使写入更加频繁,请参考 配置 MongoWriter

当开启 AT_LEAST_ONCE 投递时,通过指定明确的主键和 upsert 方式写入,可以实现仅一次 (exactly-once) 的写入语义。

配置 Mongo Writer #

内部的 MongoWriter 可以使用 MongoSinkBuilder 更精细化地配置来实现不一样的写入行为:

  • setBatchSize(int batchSize):设置写入的批次大小。 可以设置为 -1 来禁用批式写入。
  • setBatchIntervalMs(long batchIntervalMs):设置写入的最大间隔时间,单位为毫秒。 可以设置为 -1 来禁用批式写入。

当使用如下设置时,会产生不一样的写入行为:

  • 当缓存记录数超过最大批次大小,或者写入时间间隔超过限制时写入。
    • batchSize > 1 and batchInterval > 0
  • 仅在检查点写入。
    • batchSize == -1 and batchInterval == -1
  • 对每条记录进行写入。
    • batchSize == 1 or batchInterval == 0