Scan Source: Bounded | Lookup Source: Sync Mode | Sink: Batch | Sink: Streaming Append & Upsert Mode
The JDBC connector allows for reading data from and writing data into any relational database with a JDBC driver. This document describes how to set up the JDBC connector to run SQL queries against relational databases.
The JDBC sink operates in upsert mode for exchanging UPDATE/DELETE messages with the external system if a primary key is defined in the DDL; otherwise, it operates in append mode and does not support consuming UPDATE/DELETE messages.
In order to set up the JDBC connector, the following table provides dependency information for both projects using a build automation tool (such as Maven or SBT) and SQL Client with SQL JAR bundles.
Maven dependency | SQL Client JAR |
---|---|
flink-connector-jdbc_2.11 | Download |
A driver dependency is also required to connect to a specified database. Here are drivers currently supported:
Driver | Group Id | Artifact Id | JAR |
---|---|---|---|
MySQL | mysql | mysql-connector-java | Download |
PostgreSQL | org.postgresql | postgresql | Download |
Derby | org.apache.derby | derby | Download |
JDBC connector and drivers are not currently part of Flink’s binary distribution. See how to link with them for cluster execution here.
The JDBC table can be defined as follows:
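For example (a minimal sketch; the table name `MyUserTable`, its columns, and the MySQL connection settings are illustrative placeholders):

```sql
-- Illustrative sketch: register a JDBC table in Flink SQL.
-- Table name, columns, and connection settings are placeholders.
CREATE TABLE MyUserTable (
  id BIGINT,
  name STRING,
  age INT,
  status BOOLEAN
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/mydatabase',
  'table-name' = 'users',
  'username' = 'root',
  'password' = 'secret'
);
```

The connector options that can be used in the WITH clause are listed below.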
Option | Required | Default | Type | Description |
---|---|---|---|---|
connector | required | (none) | String | Specify what connector to use; here it should be 'jdbc'. |
url | required | (none) | String | The JDBC database url. |
table-name | required | (none) | String | The name of the JDBC table to connect to. |
driver | optional | (none) | String | The class name of the JDBC driver to use to connect to this URL. If not set, it will automatically be derived from the URL. |
username | optional | (none) | String | The JDBC user name. 'username' and 'password' must both be specified if either of them is specified. |
password | optional | (none) | String | The JDBC password. |
scan.partition.column | optional | (none) | String | The column name used for partitioning the input. See the following Partitioned Scan section for more details. |
scan.partition.num | optional | (none) | Integer | The number of partitions. |
scan.partition.lower-bound | optional | (none) | Integer | The smallest value of the first partition. |
scan.partition.upper-bound | optional | (none) | Integer | The largest value of the last partition. |
scan.fetch-size | optional | 0 | Integer | The number of rows that should be fetched from the database per round trip when reading. If the value specified is zero, the hint is ignored. |
lookup.cache.max-rows | optional | (none) | Integer | The max number of rows of lookup cache; above this value, the oldest rows will be expired. Lookup cache is disabled by default. See the following Lookup Cache section for more details. |
lookup.cache.ttl | optional | (none) | Duration | The max time to live for each row in the lookup cache; after this time, the oldest rows will be expired. Lookup cache is disabled by default. See the following Lookup Cache section for more details. |
lookup.max-retries | optional | 3 | Integer | The max retry times if a database lookup fails. |
sink.buffer-flush.max-rows | optional | 100 | Integer | The max size of buffered records before flush. Can be set to zero to disable it. |
sink.buffer-flush.interval | optional | 1s | Duration | The flush interval; after this amount of time, asynchronous threads will flush data. Can be set to '0' to disable it. Note, 'sink.buffer-flush.max-rows' can be set to '0' with the flush interval set, allowing for complete asynchronous processing of buffered actions. |
sink.max-retries | optional | 3 | Integer | The max retry times if writing records to the database fails. |
Flink uses the primary key that is defined in the DDL when writing data to external databases. The connector operates in upsert mode if the primary key is defined; otherwise, it operates in append mode.
In upsert mode, Flink inserts a new row or updates the existing row according to the primary key; this is how Flink ensures idempotence. To guarantee that the output result is as expected, it is recommended to define a primary key for the table and to make sure the primary key is one of the unique key sets or the primary key of the underlying database table. In append mode, Flink interprets all records as INSERT messages; the INSERT operation may fail if a primary key or unique constraint violation happens in the underlying database.
See CREATE TABLE DDL for more details about PRIMARY KEY syntax.
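For instance, declaring a primary key on the illustrative `MyUserTable` from the sketch above switches the sink from append mode to upsert mode (Flink requires primary key constraints to be declared NOT ENFORCED):

```sql
-- Sketch: the same illustrative table, now with a primary key,
-- so the JDBC sink writes with upsert semantics keyed on `id`.
CREATE TABLE MyUserTable (
  id BIGINT,
  name STRING,
  age INT,
  status BOOLEAN,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/mydatabase',
  'table-name' = 'users',
  'username' = 'root',
  'password' = 'secret'
);
```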
To accelerate reading data in parallel Source task instances, Flink provides the partitioned scan feature for the JDBC table.
The following scan partition options must all be specified if any of them is specified. They describe how to partition the table when reading in parallel from multiple tasks.
The scan.partition.column must be a numeric, date, or timestamp column from the table in question. Notice that scan.partition.lower-bound and scan.partition.upper-bound are just used to decide the partition stride, not for filtering the rows in the table, so all rows in the table will be partitioned and returned.
- scan.partition.column: The column name used for partitioning the input.
- scan.partition.num: The number of partitions.
- scan.partition.lower-bound: The smallest value of the first partition.
- scan.partition.upper-bound: The largest value of the last partition.
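A table enabling the partitioned scan might look like the following sketch; the `orders` table, its columns, and the bounds are hypothetical and simply mirror the options above:

```sql
-- Sketch: read the hypothetical `orders` table with 4 parallel partitions
-- over the numeric column `id`, assuming its values roughly span 1..10000.
CREATE TABLE Orders (
  id BIGINT,
  amount DECIMAL(10, 2)
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://localhost:5432/mydatabase',
  'table-name' = 'orders',
  'scan.partition.column' = 'id',
  'scan.partition.num' = '4',
  'scan.partition.lower-bound' = '1',
  'scan.partition.upper-bound' = '10000'
);
```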
The JDBC connector can be used in a temporal join as a lookup source (aka. dimension table). Currently, only sync lookup mode is supported.
By default, the lookup cache is not enabled. You can enable it by setting both lookup.cache.max-rows and lookup.cache.ttl.
The lookup cache is used to improve the performance of temporal joins with the JDBC connector. By default, the lookup cache is not enabled, so all requests are sent to the external database.
When the lookup cache is enabled, each process (i.e. TaskManager) will hold a cache. Flink will look up the cache first, only send requests to the external database on a cache miss, and update the cache with the rows returned.
The oldest rows in the cache will be expired when the cache reaches the maximum number of cached rows lookup.cache.max-rows or when a row exceeds the maximum time to live lookup.cache.ttl.
The cached rows might not be the latest; users can tune lookup.cache.ttl to a smaller value to have fresher data, but this may increase the number of requests sent to the database. So this is a balance between throughput and correctness.
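As a sketch of how the lookup source is typically used, the hypothetical `Orders` stream below is enriched against a JDBC-backed `Customers` dimension table with a temporal (lookup) join. The table names, cache settings, and the processing-time attribute `proc_time` on `Orders` are assumptions for illustration:

```sql
-- Sketch: JDBC dimension table with the lookup cache enabled.
CREATE TABLE Customers (
  customer_id BIGINT,
  customer_name STRING
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/mydatabase',
  'table-name' = 'customers',
  'lookup.cache.max-rows' = '5000',
  'lookup.cache.ttl' = '10min'
);

-- Enrich each order with the customer name at processing time.
SELECT o.order_id, o.customer_id, c.customer_name
FROM Orders AS o
JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
  ON o.customer_id = c.customer_id;
```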
The JDBC sink will use upsert semantics rather than plain INSERT statements if a primary key is defined in the DDL. Upsert semantics refer to atomically adding a new row or updating the existing row if there is a unique constraint violation in the underlying database, which provides idempotence.
If there are failures, the Flink job will recover and re-process from the last successful checkpoint, which can lead to messages being re-processed during recovery. The upsert mode is highly recommended, as it helps avoid constraint violations or duplicate data if records need to be re-processed.
Aside from failure recovery, the source topic may also naturally contain multiple records over time with the same primary key, making upserts desirable.
As there is no standard syntax for upsert, the following table describes the database-specific DML that is used.
Database | Upsert Grammar |
---|---|
MySQL | INSERT .. ON DUPLICATE KEY UPDATE .. |
PostgreSQL | INSERT .. ON CONFLICT .. DO UPDATE SET .. |
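For illustration, for a MySQL table `users` with primary key `id` and a value column `name`, the generated statement would look roughly like the following; this is a sketch of the pattern, not the connector's exact output:

```sql
-- Roughly what the MySQL dialect issues per buffered record (sketch):
INSERT INTO users (id, name)
VALUES (?, ?)
ON DUPLICATE KEY UPDATE name = VALUES(name);
```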
The JdbcCatalog enables users to connect Flink to relational databases over the JDBC protocol.
Currently, PostgresCatalog is the only implementation of the JDBC Catalog. PostgresCatalog supports only a limited set of Catalog methods; other Catalog methods are not supported yet.
Please refer to the Dependencies section for how to set up a JDBC connector and Postgres driver.
Postgres catalog supports the following options:
- name: required, name of the catalog.
- default-database: required, default database to connect to.
- username: required, username of the Postgres account.
- password: required, password of the account.
- base-url: required, should be of the format "jdbc:postgresql://<ip>:<port>", and should not contain the database name here.

PostgreSQL has an additional namespace, schema, besides database. A Postgres instance can have multiple databases, each database can have multiple schemas with a default one named “public”, and each schema can have multiple tables.
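A Postgres catalog can be registered from SQL roughly as follows (a sketch with placeholder connection settings; the catalog name `mypg` is supplied as the identifier rather than as an option):

```sql
-- Sketch: register a JdbcCatalog for Postgres with placeholder credentials.
CREATE CATALOG mypg WITH (
  'type' = 'jdbc',
  'default-database' = 'mydatabase',
  'username' = 'postgres',
  'password' = 'secret',
  'base-url' = 'jdbc:postgresql://localhost:5432'
);

-- Make it the current catalog for subsequent queries.
USE CATALOG mypg;
```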
In Flink, when querying tables registered by the Postgres catalog, users can use either schema_name.table_name or just table_name. The schema_name is optional and defaults to “public”.
Therefore, the metaspace mapping between Flink Catalog and Postgres is as follows:
Flink Catalog Metaspace Structure | Postgres Metaspace Structure |
---|---|
catalog name (defined in Flink only) | N/A |
database name | database name |
table name | [schema_name.]table_name |
The full path of a Postgres table in Flink should be "<catalog>.<db>.`<schema.table>`" if the schema is specified; note that <schema.table> should be escaped.
Here are some examples of accessing Postgres tables:
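These sketches assume the hypothetical `mypg` catalog registered above, a database `mydatabase`, and a `users` table:

```sql
-- Scan the table 'users' in the default schema 'public'.
SELECT * FROM mypg.mydatabase.users;
SELECT * FROM mypg.mydatabase.`public.users`;

-- Scan a table 'users' in a non-default schema; the schema-qualified
-- name must be escaped as a single identifier.
SELECT * FROM mypg.mydatabase.`custom_schema.users`;
```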
Flink supports connecting to several databases, each of which uses its own dialect, such as MySQL, PostgreSQL, and Derby. The Derby dialect is usually used for testing purposes. The field data type mappings from relational database data types to Flink SQL data types are listed in the following table; the mapping table can help define JDBC tables in Flink easily.