JDBC SQL Connector #
Scan Source: Bounded | Lookup Source: Sync Mode | Sink: Batch | Sink: Streaming Append & Upsert Mode
The JDBC connector allows for reading data from and writing data into any relational database with a JDBC driver. This document describes how to set up the JDBC connector to run SQL queries against relational databases.
The JDBC sink operates in upsert mode to exchange UPDATE/DELETE messages with the external system if a primary key is defined in the DDL; otherwise, it operates in append mode and doesn't support consuming UPDATE/DELETE messages.
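For example, a minimal sketch with a hypothetical orders table: declaring a primary key in the Flink DDL switches the sink to upsert mode, while omitting it yields an append-only sink.
-- upsert mode: UPDATE/DELETE messages are written as upserts keyed on order_id
CREATE TABLE jdbc_upsert_sink (
  order_id BIGINT,
  amount DECIMAL(10, 2),
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/mydatabase',
  'table-name' = 'orders'
);
-- append mode: the same sink without a primary key only accepts INSERT-only input
CREATE TABLE jdbc_append_sink (
  order_id BIGINT,
  amount DECIMAL(10, 2)
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/mydatabase',
  'table-name' = 'orders'
);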
Dependencies #
The JDBC connector is not part of the binary distribution. See how to link with it for cluster execution here.
A driver dependency is also required to connect to a specified database. Here are the drivers currently supported:
Driver | Group Id | Artifact Id | JAR |
---|---|---|---|
MySQL | mysql | mysql-connector-java | Download |
Oracle | com.oracle.database.jdbc | ojdbc8 | Download |
PostgreSQL | org.postgresql | postgresql | Download |
Derby | org.apache.derby | derby | Download |
SQL Server | com.microsoft.sqlserver | mssql-jdbc | Download |
CrateDB | io.crate | crate-jdbc | Download |
Db2 | com.ibm.db2.jcc | db2jcc | Download |
Trino | io.trino | trino-jdbc | Download |
OceanBase | com.oceanbase | oceanbase-client | Download |
JDBC connector and drivers are not part of Flink’s binary distribution. See how to link with them for cluster execution here.
How to create a JDBC table #
The JDBC table can be defined as follows:
-- register a MySQL table 'users' in Flink SQL
CREATE TABLE MyUserTable (
id BIGINT,
name STRING,
age INT,
status BOOLEAN,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/mydatabase',
'table-name' = 'users'
);
-- write data into the JDBC table from the other table "T"
INSERT INTO MyUserTable
SELECT id, name, age, status FROM T;
-- scan data from the JDBC table
SELECT id, name, age, status FROM MyUserTable;
-- temporal join the JDBC table as a dimension table
SELECT * FROM myTopic
LEFT JOIN MyUserTable FOR SYSTEM_TIME AS OF myTopic.proctime
ON myTopic.key = MyUserTable.id;
Connector Options #
Option | Required | Forwarded | Default | Type | Description |
---|---|---|---|---|---|
connector | required | no | (none) | String | Specify what connector to use; here it should be 'jdbc'. |
url | required | yes | (none) | String | The JDBC database URL. |
table-name | required | yes | (none) | String | The name of the JDBC table to connect to. |
driver | optional | yes | (none) | String | The class name of the JDBC driver used to connect to this URL. If not set, it is automatically derived from the URL. |
compatible-mode | optional | yes | (none) | String | The compatible mode of the database. |
username | optional | yes | (none) | String | The JDBC user name. 'username' and 'password' must both be specified if either of them is specified. |
password | optional | yes | (none) | String | The JDBC password. |
connection.max-retry-timeout | optional | yes | 60s | Duration | Maximum timeout between retries. The timeout should be in second granularity and must not be smaller than 1 second. |
scan.partition.column | optional | no | (none) | String | The column name used for partitioning the input. See the Partitioned Scan section for more details. |
scan.partition.num | optional | no | (none) | Integer | The number of partitions. |
scan.partition.lower-bound | optional | no | (none) | Integer | The smallest value of the first partition. |
scan.partition.upper-bound | optional | no | (none) | Integer | The largest value of the last partition. |
scan.fetch-size | optional | yes | 0 | Integer | The number of rows that should be fetched from the database per round trip when reading. If the value is zero, the hint is ignored. |
scan.auto-commit | optional | yes | true | Boolean | Sets the auto-commit flag on the JDBC driver, which determines whether each statement is committed in a transaction automatically. Some JDBC drivers, specifically Postgres, may require this to be set to false in order to stream results. |
lookup.cache | optional | yes | NONE | Enum. Possible values: NONE, PARTIAL | The cache strategy for the lookup table. Currently supports NONE (no caching) and PARTIAL (caching entries on lookup operations in the external database). |
lookup.partial-cache.max-rows | optional | yes | (none) | Long | The maximum number of rows in the lookup cache; beyond this value, the oldest rows expire. "lookup.cache" must be set to "PARTIAL" to use this option. See the Lookup Cache section for more details. |
lookup.partial-cache.expire-after-write | optional | yes | (none) | Duration | The maximum time to live for each row in the lookup cache after it is written into the cache. "lookup.cache" must be set to "PARTIAL" to use this option. See the Lookup Cache section for more details. |
lookup.partial-cache.expire-after-access | optional | yes | (none) | Duration | The maximum time to live for each row in the lookup cache after it is accessed in the cache. "lookup.cache" must be set to "PARTIAL" to use this option. See the Lookup Cache section for more details. |
lookup.partial-cache.cache-missing-key | optional | yes | true | Boolean | Whether to store an empty value in the cache if the lookup key doesn't match any rows in the table. "lookup.cache" must be set to "PARTIAL" to use this option. |
lookup.max-retries | optional | yes | 3 | Integer | The maximum number of retries if a database lookup fails. |
sink.buffer-flush.max-rows | optional | yes | 100 | Integer | The maximum number of buffered records before flushing. Can be set to zero to disable it. |
sink.buffer-flush.interval | optional | yes | 1s | Duration | The flush interval; after this time, asynchronous threads will flush buffered data. Can be set to '0' to disable it. Note that 'sink.buffer-flush.max-rows' can be set to '0' with the flush interval set, allowing for complete asynchronous processing of buffered actions. |
sink.max-retries | optional | yes | 3 | Integer | The maximum number of retries if writing records to the database fails. |
sink.parallelism | optional | no | (none) | Integer | Defines the parallelism of the JDBC sink operator. By default, the parallelism is determined by the framework using the same parallelism as the upstream chained operator. |
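As an illustration of how these options combine, the following is a hedged sketch (table, column names and values are hypothetical) of a JDBC table that uses a partitioned scan, a partial lookup cache and sink buffering:
CREATE TABLE mysql_users (
  id BIGINT,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/mydatabase',
  'table-name' = 'users',
  'username' = '...',
  'password' = '...',
  -- parallel, partitioned read over the numeric 'id' column
  'scan.partition.column' = 'id',
  'scan.partition.num' = '4',
  'scan.partition.lower-bound' = '1',
  'scan.partition.upper-bound' = '1000000',
  -- cache lookups when the table is used as a dimension table
  'lookup.cache' = 'PARTIAL',
  'lookup.partial-cache.max-rows' = '10000',
  'lookup.partial-cache.expire-after-write' = '10min',
  -- buffer writes before flushing to the database
  'sink.buffer-flush.max-rows' = '500',
  'sink.buffer-flush.interval' = '2s'
);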
Deprecated Options #
These deprecated options have been replaced by the new options listed above and will be removed eventually. Please consider using the new options first.
Option | Required | Forwarded | Default | Type | Description |
---|---|---|---|---|---|
lookup.cache.max-rows | optional | yes | (none) | Integer | Please set "lookup.cache" = "PARTIAL" and use "lookup.partial-cache.max-rows" instead. |
lookup.cache.ttl | optional | yes | (none) | Duration | Please set "lookup.cache" = "PARTIAL" and use "lookup.partial-cache.expire-after-write" instead. |
lookup.cache.caching-missing-key | optional | yes | true | Boolean | Please set "lookup.cache" = "PARTIAL" and use "lookup.partial-cache.cache-missing-key" instead. |
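For instance, a table definition that still relies on the deprecated cache options can be migrated by switching to the PARTIAL cache options; a sketch with an illustrative dimension table:
CREATE TABLE dim_users (
  id BIGINT,
  name STRING
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/mydatabase',
  'table-name' = 'users',
  -- instead of the deprecated 'lookup.cache.max-rows' and 'lookup.cache.ttl':
  'lookup.cache' = 'PARTIAL',
  'lookup.partial-cache.max-rows' = '10000',
  'lookup.partial-cache.expire-after-write' = '10min'
);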
When a primary key is defined in the DDL, the JDBC sink uses the following database-specific upsert grammar:
Database | Upsert Grammar |
---|---|
MySQL | INSERT .. ON DUPLICATE KEY UPDATE .. |
Oracle | MERGE INTO .. USING (..) ON (..) WHEN MATCHED THEN UPDATE SET (..) WHEN NOT MATCHED THEN INSERT (..) VALUES (..) |
PostgreSQL | INSERT .. ON CONFLICT .. DO UPDATE SET .. |
MS SQL Server | MERGE INTO .. USING (..) ON (..) WHEN MATCHED THEN UPDATE SET (..) WHEN NOT MATCHED THEN INSERT (..) VALUES (..) |
Db2 | MERGE INTO .. AS TARGET USING TABLE (VALUES (..)) AS SOURCE (..) ON (..) WHEN MATCHED THEN UPDATE SET .. WHEN NOT MATCHED THEN INSERT (..) VALUES (..) |
JDBC Catalog #
The JdbcCatalog enables users to connect Flink to relational databases over the JDBC protocol.
Currently, there are two JDBC catalog implementations, Postgres Catalog and MySQL Catalog. They support the following catalog methods:
// The supported methods by Postgres & MySQL Catalog.
databaseExists(String databaseName);
listDatabases();
getDatabase(String databaseName);
listTables(String databaseName);
getTable(ObjectPath tablePath);
tableExists(ObjectPath tablePath);
Other Catalog methods are currently not supported.
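Once a JDBC catalog is registered and selected (see the next section), these methods back the usual catalog SQL commands. A brief sketch, assuming a catalog named my_catalog with a database mydb that contains a users table:
SHOW DATABASES;           -- backed by listDatabases()
SHOW TABLES;              -- backed by listTables() on the current database
DESCRIBE mydb.users;      -- backed by getTable()
SELECT * FROM mydb.users; -- read the table through the catalog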
Usage of JDBC Catalog #
This section describes how to create and use a Postgres Catalog or MySQL Catalog. Please refer to the Dependencies section for how to set up the JDBC connector and the corresponding driver.
The JDBC catalog supports the following options:
- name: required, name of the catalog.
- default-database: required, default database to connect to.
- username: required, username of the Postgres/MySQL account.
- password: required, password of the account.
- base-url: required (should not contain the database name)
  - for Postgres Catalog this should be "jdbc:postgresql://<ip>:<port>"
  - for MySQL Catalog this should be "jdbc:mysql://<ip>:<port>"
SQL:
CREATE CATALOG my_catalog WITH(
'type' = 'jdbc',
'default-database' = '...',
'username' = '...',
'password' = '...',
'base-url' = '...'
);
USE CATALOG my_catalog;
Java:
import org.apache.flink.connector.jdbc.catalog.JdbcCatalog;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
TableEnvironment tableEnv = TableEnvironment.create(settings);

String name = "my_catalog";
String defaultDatabase = "mydb";
String username = "...";
String password = "...";
String baseUrl = "...";

JdbcCatalog catalog = new JdbcCatalog(name, defaultDatabase, username, password, baseUrl);
tableEnv.registerCatalog("my_catalog", catalog);
// set the JdbcCatalog as the current catalog of the session
tableEnv.useCatalog("my_catalog");
Scala:
import org.apache.flink.connector.jdbc.catalog.JdbcCatalog
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

val settings = EnvironmentSettings.inStreamingMode()
val tableEnv = TableEnvironment.create(settings)
val name = "my_catalog"
val defaultDatabase = "mydb"
val username = "..."
val password = "..."
val baseUrl = "..."
val catalog = new JdbcCatalog(name, defaultDatabase, username, password, baseUrl)
tableEnv.registerCatalog("my_catalog", catalog)
// set the JdbcCatalog as the current catalog of the session
tableEnv.useCatalog("my_catalog")
Python:
from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table.catalog import JdbcCatalog

environment_settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(environment_settings)
name = "my_catalog"
default_database = "mydb"
username = "..."
password = "..."
base_url = "..."
catalog = JdbcCatalog(name, default_database, username, password, base_url)
t_env.register_catalog("my_catalog", catalog)
# set the JdbcCatalog as the current catalog of the session
t_env.use_catalog("my_catalog")
YAML:
execution:
...
current-catalog: my_catalog # set the target JdbcCatalog as the current catalog of the session
current-database: mydb
catalogs:
- name: my_catalog
type: jdbc
default-database: mydb
username: ...
password: ...
base-url: ...
JDBC Catalog for PostgreSQL #
PostgreSQL Metaspace Mapping #
PostgreSQL has an additional namespace called schema besides database. A Postgres instance can have multiple databases, each database can have multiple schemas with a default one named “public”, and each schema can have multiple tables.
In Flink, when querying tables registered by the Postgres catalog, users can use either schema_name.table_name or just table_name. The schema_name is optional and defaults to “public”.
Therefore, the metaspace mapping between Flink Catalog and Postgres is as follows:
Flink Catalog Metaspace Structure | Postgres Metaspace Structure |
---|---|
catalog name (defined in Flink only) | N/A |
database name | database name |
table name | [schema_name.]table_name |
The full path of a Postgres table in Flink should be "<catalog>.<db>.`<schema.table>`" if a schema is specified; note that <schema.table> should be escaped.
Here are some examples to access Postgres tables:
-- scan table 'test_table' of 'public' schema (i.e. the default schema), the schema name can be omitted
SELECT * FROM mypg.mydb.test_table;
SELECT * FROM mydb.test_table;
SELECT * FROM test_table;
-- scan table 'test_table2' of 'custom_schema' schema,
-- the custom schema cannot be omitted and must be escaped together with the table name.
SELECT * FROM mypg.mydb.`custom_schema.test_table2`;
SELECT * FROM mydb.`custom_schema.test_table2`;
SELECT * FROM `custom_schema.test_table2`;
JDBC Catalog for MySQL #
MySQL Metaspace Mapping #
The databases in a MySQL instance are at the same mapping level as the databases under the catalog registered with the MySQL Catalog. A MySQL instance can have multiple databases, and each database can have multiple tables.
In Flink, when querying tables registered by the MySQL catalog, users can use either database.table_name or just table_name. The database name is optional and defaults to the default database specified when the MySQL Catalog was created.
Therefore, the metaspace mapping between Flink Catalog and MySQL Catalog is as follows:
Flink Catalog Metaspace Structure | MySQL Metaspace Structure |
---|---|
catalog name (defined in Flink only) | N/A |
database name | database name |
table name | table_name |
The full path of a MySQL table in Flink should be "`<catalog>`.`<db>`.`<table>`".
Here are some examples to access MySQL tables:
-- scan table 'test_table', the default database is 'mydb'.
SELECT * FROM mysql_catalog.mydb.test_table;
SELECT * FROM mydb.test_table;
SELECT * FROM test_table;
-- scan table 'test_table' with the given database.
SELECT * FROM mysql_catalog.given_database.test_table2;
SELECT * FROM given_database.test_table2;
JDBC Catalog for CrateDB #
CrateDB Metaspace Mapping #
CrateDB is similar to PostgreSQL, but it has only one database, which defaults to crate. It has an additional namespace called schema; a CrateDB instance can have multiple schemas with a default one named “doc”, and each schema can have multiple tables.
In Flink, when querying tables registered by the CrateDB catalog, users can use either schema_name.table_name or just table_name. The schema_name is optional and defaults to doc.
Therefore, the metaspace mapping between Flink Catalog and CrateDB is as follows:
Flink Catalog Metaspace Structure | CrateDB Metaspace Structure |
---|---|
catalog name (defined in Flink only) | N/A |
database name | database name (always crate ) |
table name | [schema_name.]table_name |
The full path of a CrateDB table in Flink should be "<catalog>.<db>.`<schema.table>`" if a schema is specified; note that <schema.table> should be escaped.
Here are some examples to access CrateDB tables:
-- scan table 'test_table' of 'doc' schema (i.e. the default schema), the schema name can be omitted
SELECT * FROM mycatalog.crate.doc.test_table;
SELECT * FROM crate.doc.test_table;
SELECT * FROM doc.test_table;
SELECT * FROM test_table;
-- scan table 'test_table2' of 'custom_schema' schema,
-- the custom schema cannot be omitted and must be escaped together with the table name.
SELECT * FROM mycatalog.crate.`custom_schema.test_table2`;
SELECT * FROM crate.`custom_schema.test_table2`;
SELECT * FROM `custom_schema.test_table2`;
Data Type Mapping #
Flink supports connecting to several databases using dialects such as MySQL, Oracle, PostgreSQL, CrateDB, Derby, SQL Server, Db2 and OceanBase. The Derby dialect is usually used for testing purposes. The field data type mappings from relational database data types to Flink SQL data types are listed in the table below; this mapping can help in defining JDBC tables in Flink easily.
MySQL type | Oracle type | PostgreSQL type | CrateDB type | SQL Server type | Db2 type | Trino type | OceanBase MySQL mode type | OceanBase Oracle mode type | Flink SQL type |
---|---|---|---|---|---|---|---|---|---|
TINYINT | | | | TINYINT | | TINYINT | TINYINT | | TINYINT |
SMALLINT<br>TINYINT UNSIGNED | | SMALLINT<br>INT2<br>SMALLSERIAL<br>SERIAL2 | SMALLINT<br>SHORT | SMALLINT | SMALLINT | SMALLINT | SMALLINT<br>TINYINT UNSIGNED | | SMALLINT |
INT<br>MEDIUMINT<br>SMALLINT UNSIGNED | | INTEGER<br>SERIAL | INTEGER<br>INT | INT | INTEGER | INTEGER | INT<br>MEDIUMINT<br>SMALLINT UNSIGNED | | INT |
BIGINT<br>INT UNSIGNED | | BIGINT<br>BIGSERIAL | BIGINT<br>LONG | BIGINT | BIGINT | | BIGINT<br>INT UNSIGNED | | BIGINT |
BIGINT UNSIGNED | | | | | | | BIGINT UNSIGNED | | DECIMAL(20, 0) |
FLOAT | BINARY_FLOAT | REAL<br>FLOAT4 | REAL<br>FLOAT | REAL | REAL | FLOAT | FLOAT | BINARY_FLOAT | FLOAT |
DOUBLE<br>DOUBLE PRECISION | BINARY_DOUBLE | FLOAT8<br>DOUBLE PRECISION | DOUBLE<br>DOUBLE PRECISION | FLOAT | DOUBLE | DOUBLE | DOUBLE | BINARY_DOUBLE | DOUBLE |
NUMERIC(p, s)<br>DECIMAL(p, s) | SMALLINT<br>FLOAT(s)<br>DOUBLE PRECISION<br>REAL<br>NUMBER(p, s) | NUMERIC(p, s)<br>DECIMAL(p, s) | NUMERIC(p, s) | DECIMAL(p, s) | NUMERIC(p, s)<br>DECIMAL(p, s) | DECIMAL(p, s) | NUMERIC(p, s)<br>DECIMAL(p, s) | FLOAT(s)<br>NUMBER(p, s) | DECIMAL(p, s) |
BOOLEAN<br>TINYINT(1) | | BOOLEAN | BOOLEAN | BIT | | BOOLEAN | BOOLEAN<br>TINYINT(1) | | BOOLEAN |
DATE | DATE | DATE | DATE (only in expressions - not stored type) | DATE | DATE | DATE | DATE | DATE | DATE |
TIME [(p)] | DATE | TIME [(p)] [WITHOUT TIMEZONE] | TIME (only in expressions - not stored type) | TIME(0) | TIME | TIME_WITHOUT_TIME_ZONE | TIME [(p)] | DATE | TIME [(p)] [WITHOUT TIMEZONE] |
DATETIME [(p)] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] | DATETIME<br>DATETIME2 | TIMESTAMP [(p)] | TIMESTAMP_WITHOUT_TIME_ZONE | DATETIME [(p)] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] |
CHAR(n)<br>VARCHAR(n)<br>TEXT | CHAR(n)<br>VARCHAR(n)<br>CLOB | CHAR(n)<br>CHARACTER(n)<br>VARCHAR(n)<br>CHARACTER VARYING(n)<br>TEXT | CHAR(n)<br>CHARACTER(n)<br>VARCHAR(n)<br>CHARACTER VARYING(n)<br>TEXT<br>STRING | CHAR(n)<br>NCHAR(n)<br>VARCHAR(n)<br>NVARCHAR(n)<br>TEXT<br>NTEXT | VARCHAR(n)<br>CHAR [(p)] | CHAR<br>VARCHAR | CHAR(n)<br>VARCHAR(n)<br>TEXT | CHAR(n)<br>NCHAR(n)<br>VARCHAR2(n)<br>CLOB | STRING |
BINARY<br>VARBINARY<br>BLOB | RAW(s)<br>BLOB | BYTEA | | BINARY(n)<br>VARBINARY(n) | | VARBINARY | BINARY<br>VARBINARY<br>BLOB | RAW(s)<br>BLOB | BYTES |
| | ARRAY | ARRAY | | | ARRAY | | | ARRAY |
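For example, when mirroring a MySQL table that uses BIGINT, DECIMAL(10, 2), BIGINT UNSIGNED and DATETIME(3) columns (a hypothetical orders table), the Flink DDL picks the mapped types from the table above:
CREATE TABLE orders_mirror (
  order_id BIGINT,              -- MySQL BIGINT
  total_amount DECIMAL(10, 2),  -- MySQL DECIMAL(10, 2)
  buyer_id DECIMAL(20, 0),      -- MySQL BIGINT UNSIGNED
  created_at TIMESTAMP(3),      -- MySQL DATETIME(3)
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/mydatabase',
  'table-name' = 'orders'
);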