MongoDB

MongoDB SQL Connector #

Scan Source: Bounded Lookup Source: Sync Mode Sink: Batch Sink: Streaming Append & Upsert Mode

The MongoDB connector allows for reading data from and writing data into MongoDB. This document describes how to set up the MongoDB connector to run SQL queries against MongoDB.

The connector can operate in upsert mode for exchanging UPDATE/DELETE messages with the external system using the primary key defined on the DDL.

If no primary key is defined on the DDL, the connector can only operate in append mode for exchanging INSERT only messages with external system.

Dependencies #

In order to use the MongoDB connector the following dependencies are required for both projects using a build automation tool (such as Maven or SBT) and SQL Client with SQL JAR bundles.

Maven dependency SQL Client
<dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-mongodb</artifactId>
      <version>1.1.0-1.17</version>
    </dependency>
Download

The MongoDB connector is not part of the binary distribution. See how to link with it for cluster execution here.

How to create a MongoDB table #

The MongoDB table can be defined as following:

-- register a MongoDB table 'users' in Flink SQL
CREATE TABLE MyUserTable (
  _id STRING,
  name STRING,
  age INT,
  status BOOLEAN,
  PRIMARY KEY (_id) NOT ENFORCED
) WITH (
   'connector' = 'mongodb',
   'uri' = 'mongodb://user:password@127.0.0.1:27017',
   'database' = 'my_db',
   'collection' = 'users'
);

-- write data into the MongoDB table from the other table "T"
INSERT INTO MyUserTable
SELECT _id, name, age, status FROM T;

-- scan data from the MongoDB table
SELECT id, name, age, status FROM MyUserTable;

-- temporal join the MongoDB table as a dimension table
SELECT * FROM myTopic
LEFT JOIN MyUserTable FOR SYSTEM_TIME AS OF myTopic.proctime
ON myTopic.key = MyUserTable._id;

Connector Options #

Option Required Forwarded Default Type Description
connector
required no (none) String Specify what connector to use, here should be 'mongodb'.
uri
required yes (none) String The MongoDB connection uri.
database
required yes (none) String The name of MongoDB database to read or write.
collection
required yes (none) String The name of MongoDB collection to read or write.
scan.fetch-size
optional yes 2048 Integer Gives the reader a hint as to the number of documents that should be fetched from the database per round-trip when reading.
scan.cursor.no-timeout
optional yes true Boolean MongoDB server normally times out idle cursors after an inactivity period (10 minutes) to prevent excess memory use. Set this option to true to prevent that. However, if the application takes longer than 30 minutes to process the current batch of documents, the session is marked as expired and closed.
scan.partition.strategy
optional no default String Specifies the partition strategy. Available strategies are `single`, `sample`, `split-vector`, `sharded` and `default`. See the following Partitioned Scan section for more details.
scan.partition.size
optional no 64mb MemorySize Specifies the partition memory size.
scan.partition.samples
optional no 10 Integer Specifies the samples count per partition. It only takes effect when the partition strategy is sample. The sample partitioner samples the collection, projects and sorts by the partition fields. Then uses every `scan.partition.samples` as the value to use to calculate the partition boundaries. The total number of samples taken is calculated as: `samples per partition * (count of documents / number of documents per partition)`.
lookup.cache
optional no NONE

Enum

Possible values: NONE, PARTIAL
The cache strategy for the lookup table. Currently supports NONE (no caching) and PARTIAL (caching entries on lookup operation in external database).
lookup.partial-cache.max-rows
optional no (none) Long The max number of rows of lookup cache, over this value, the oldest rows will be expired. "lookup.cache" must be set to "PARTIAL" to use this option. See the following Lookup Cache section for more details.
lookup.partial-cache.expire-after-write
optional no (none) Duration The max time to live for each rows in lookup cache after writing into the cache. "lookup.cache" must be set to "PARTIAL" to use this option. See the following Lookup Cache section for more details.
lookup.partial-cache.expire-after-access
optional no (none) Duration The max time to live for each rows in lookup cache after accessing the entry in the cache. "lookup.cache" must be set to "PARTIAL" to use this option. See the following Lookup Cache section for more details.
lookup.partial-cache.caching-missing-key
optional no true Boolean Whether to store an empty value into the cache if the lookup key doesn't match any rows in the table. "lookup.cache" must be set to "PARTIAL" to use this option.
lookup.max-retries
optional no 3 Integer The max retry times if lookup database failed.
lookup.retry.interval
optional no 1s Duration Specifies the retry time interval if lookup records from database failed.
sink.buffer-flush.max-rows
optional yes 1000 Integer Specifies the maximum number of buffered rows per batch request.
sink.buffer-flush.interval
optional yes 1s Duration Specifies the batch flush interval.
sink.max-retries
optional yes 3 Integer The max retry times if writing records to database failed.
sink.retry.interval
optional yes 1s Duration Specifies the retry time interval if writing records to database failed.
sink.parallelism
optional no (none) Integer Defines the parallelism of the MongoDB sink operator. By default, the parallelism is determined by the framework using the same parallelism of the upstream chained operator.
sink.delivery-guarantee
optional no at-lease-once

Enum

Possible values: none, at-least-once
Optional delivery guarantee when committing. The exactly-once guarantee is not supported yet.

Features #

Key handling #

The MongoDB sink can work in either upsert mode or append mode, depending on whether a primary key is defined. If a primary key is defined, the MongoDB sink works in upsert mode which can consume queries containing UPDATE/DELETE messages. If a primary key is not defined, the MongoDB sink works in append mode which can only consume queries containing INSERT only messages.

In MongoDB the primary key is used to calculate the MongoDB document _id. Its value must be unique and immutable in the collection, and may be of any BSON Type other than an Array. If the _id contains subfields, the subfield names cannot begin with a ($) symbol.

There are also some constraints on the primary key index. Before MongoDB 4.2, the total size of an index entry, which can include structural overhead depending on the BSON type, must be less than 1024 bytes. Starting in version 4.2, MongoDB removes the Index Key Limit. For more detailed introduction, you can refer to Index Key Limit.

The MongoDB connector generates a document _id for every row by compositing all primary key fields in the order defined in the DDL.

  • When there’s only a single field in the specified primary key, we convert the field data to bson value as _id of the corresponding document.
  • When there’s multiple fields in the specified primary key, we convert and composite these fields into a bson document as the _id of the corresponding document. For example, if have a primary key statement PRIMARY KEY (f1, f2) NOT ENFORCED, the extracted _id will be the form like _id: {f1: v1, f2: v2}.

Notice that it will be ambiguous if the _id field exists in DDL, but the primary key is not declared as _id. Either use the _id column as the key, or rename the _id column.

See CREATE TABLE DDL for more details about PRIMARY KEY syntax.

Partitioned Scan #

To accelerate reading data in parallel Source task instances, Flink provides partitioned scan feature for MongoDB collection. The following partition strategies are provided:

  • single: treats the entire collection as a single partition.
  • sample: samples the collection and generate partitions which is fast but possibly uneven.
  • split-vector: uses the splitVector command to generate partitions for non-sharded collections which is fast and even. The splitVector permission is required.
  • sharded: reads config.chunks (MongoDB splits a sharded collection into chunks, and the range of the chunks are stored within the collection) as the partitions directly. The sharded strategy only used for sharded collection which is fast and even. Read permission of config database is required.
  • default: uses sharded strategy for sharded collections otherwise using split vector strategy.

Lookup Cache #

MongoDB connector can be used in temporal join as a lookup source (aka. dimension table). Currently, only sync lookup mode is supported.

By default, lookup cache is not enabled. You can enable it by setting lookup.cache to PARTIAL.

The lookup cache is used to improve performance of temporal join the MongoDB connector. By default, lookup cache is not enabled, so all the requests are sent to external database. When lookup cache is enabled, each process (i.e. TaskManager) will hold a cache. Flink will lookup the cache first, and only send requests to external database when cache missing, and update cache with the rows returned. The oldest rows in cache will be expired when the cache hit to the max cached rows lookup.partial-cache.max-rows or when the row exceeds the max time to live specified by lookup.partial-cache.expire-after-write or lookup.partial-cache.expire-after-access. The cached rows might not be the latest, users can tune expiration options to a smaller value to have a better fresh data, but this may increase the number of requests send to database. So this is a balance between throughput and correctness.

By default, flink will cache the empty query result for a Primary key, you can toggle the behaviour by setting lookup.partial-cache.caching-missing-key to false.

Idempotent Writes #

MongoDB sink will use upsert semantics rather than plain INSERT statements if primary key is defined in DDL. We composite the primary key fields as the document _id which is the reserved primary key of MongoDB. Use upsert mode to write rows into MongoDB, which provides idempotence.

If there are failures, the Flink job will recover and re-process from last successful checkpoint, which can lead to re-processing messages during recovery. The upsert mode is highly recommended as it helps avoid constraint violations or duplicate data if records need to be re-processed.

Filters Pushdown #

MongoDB supports pushing down simple comparisons and logical filters to optimize queries. The mappings from Flink SQL filters to MongoDB query operators are listed in the following table.

Flink SQL filters MongoDB Query Operators
= $eq
<> $ne
> $gt
>= $gte
< $lt
<= $lte
IS NULL $eq : null
IS NOT NULL $ne : null
OR $or
AND $and

Data Type Mapping #

The field data type mappings from MongoDB BSON types to Flink SQL data types are listed in the following table.

MongoDB BSON type Flink SQL type
ObjectId STRING
String STRING
Boolean BOOLEAN
Binary BINARY
VARBINARY
Int32 INTEGER
- TINYINT
SMALLINT
FLOAT
Int64 BIGINT
Double DOUBLE
Decimal128 DECIMAL
DateTime TIMESTAMP_LTZ(3)
Timestamp TIMESTAMP_LTZ(0)
Object ROW
Array ARRAY

For specific types in MongoDB, we use Extended JSON format to map them to Flink SQL STRING type.

MongoDB BSON type Flink SQL STRING
Symbol {"_value": {"$symbol": "12"}}
RegularExpression {"_value": {"$regularExpression": {"pattern": "^9$", "options": "i"}}}
JavaScript {"_value": {"$code": "function() { return 10; }"}}
DbPointer {"_value": {"$dbPointer": {"$ref": "db.coll", "$id": {"$oid": "63932a00da01604af329e33c"}}}}

Back to top