JDBC SQL 连接器 #
Scan Source: Bounded Lookup Source: Sync Mode Sink: Batch Sink: Streaming Append & Upsert Mode
JDBC 连接器允许使用 JDBC 驱动向任意类型的关系型数据库读取或者写入数据。本文档描述了针对关系型数据库如何通过建立 JDBC 连接器来执行 SQL 查询。
如果在 DDL 中定义了主键,JDBC sink 将以 upsert 模式与外部系统交换 UPDATE/DELETE 消息;否则,它将以 append 模式与外部系统交换消息且不支持消费 UPDATE/DELETE 消息。
依赖 #
In order to use the JDBC connector the following dependencies are required for both projects using a build automation tool (such as Maven or SBT) and SQL Client with SQL JAR bundles.
Maven dependency | SQL Client |
---|---|
|
Download |
JDBC 连接器不是二进制发行版的一部分,请查阅这里了解如何在集群运行中引用 JDBC 连接器。
在连接到具体数据库时,也需要对应的驱动依赖,目前支持的驱动如下:
Driver | Group Id | Artifact Id | JAR |
---|---|---|---|
MySQL | mysql |
mysql-connector-java |
下载 |
Oracle | com.oracle.database.jdbc |
ojdbc8 |
下载 |
PostgreSQL | org.postgresql |
postgresql |
下载 |
Derby | org.apache.derby |
derby |
下载 |
当前,JDBC 连接器和驱动不在 Flink 二进制发布包中,请参阅这里了解在集群上执行时何连接它们。
如何创建 JDBC 表 #
JDBC table 可以按如下定义:
-- 在 Flink SQL 中注册一张 MySQL 表 'users'
CREATE TABLE MyUserTable (
id BIGINT,
name STRING,
age INT,
status BOOLEAN,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/mydatabase',
'table-name' = 'users'
);
-- 从另一张表 "T" 将数据写入到 JDBC 表中
INSERT INTO MyUserTable
SELECT id, name, age, status FROM T;
-- 查看 JDBC 表中的数据
SELECT id, name, age, status FROM MyUserTable;
-- JDBC 表在时态表关联中作为维表
SELECT * FROM myTopic
LEFT JOIN MyUserTable FOR SYSTEM_TIME AS OF myTopic.proctime
ON myTopic.key = MyUserTable.id;
连接器参数 #
参数 | 是否必填 | 默认值 | 类型 | 描述 |
---|---|---|---|---|
connector |
必填 | (none) | String | 指定使用什么类型的连接器,这里应该是'jdbc' 。 |
url |
必填 | (none) | String | JDBC 数据库 url。 |
table-name |
必填 | (none) | String | 连接到 JDBC 表的名称。 |
driver |
可选 | (none) | String | 用于连接到此 URL 的 JDBC 驱动类名,如果不设置,将自动从 URL 中推导。 |
username |
可选 | (none) | String | JDBC 用户名。如果指定了 'username' 和 'password' 中的任一参数,则两者必须都被指定。 |
password |
可选 | (none) | String | JDBC 密码。 |
connection.max-retry-timeout |
可选 | 60s | Duration | 最大重试超时时间,以秒为单位且不应该小于 1 秒。 |
scan.partition.column |
可选 | (none) | String | 用于将输入进行分区的列名。请参阅下面的分区扫描部分了解更多详情。 |
scan.partition.num |
可选 | (none) | Integer | 分区数。 |
scan.partition.lower-bound |
可选 | (none) | Integer | 第一个分区的最小值。 |
scan.partition.upper-bound |
可选 | (none) | Integer | 最后一个分区的最大值。 |
scan.fetch-size |
可选 | 0 | Integer | 每次循环读取时应该从数据库中获取的行数。如果指定的值为 '0' ,则该配置项会被忽略。 |
scan.auto-commit |
可选 | true | Boolean | 在 JDBC 驱动程序上设置 auto-commit 标志, 它决定了每个语句是否在事务中自动提交。有些 JDBC 驱动程序,特别是 Postgres,可能需要将此设置为 false 以便流化结果。 |
lookup.cache |
可选 | NONE | 枚举类型 可选值: NONE, PARTIAL |
维表的缓存策略。 目前支持 NONE(不缓存)和 PARTIAL(只在外部数据库中查找数据时缓存)。 |
lookup.cache.max-rows |
可选 | (none) | Integer | 维表缓存的最大行数,若超过该值,则最老的行记录将会过期。 使用该配置时 "lookup.cache" 必须设置为 "PARTIAL”。请参阅下面的 Lookup Cache 部分了解更多详情。 |
lookup.partial-cache.expire-after-write |
可选 | (none) | Duration | 在记录写入缓存后该记录的最大保留时间。 使用该配置时 "lookup.cache" 必须设置为 "PARTIAL”。请参阅下面的 Lookup Cache 部分了解更多详情。 |
lookup.partial-cache.expire-after-access |
可选 | (none) | Duration | 在缓存中的记录被访问后该记录的最大保留时间。 使用该配置时 "lookup.cache" 必须设置为 "PARTIAL”。请参阅下面的 Lookup Cache 部分了解更多详情。 |
lookup.partial-cache.cache-missing-key |
可选 | true | Boolean | 是否缓存维表中不存在的键,默认为true。 使用该配置时 "lookup.cache" 必须设置为 "PARTIAL”。 |
lookup.max-retries |
可选 | 3 | Integer | 查询数据库失败的最大重试时间。 |
sink.buffer-flush.max-rows |
可选 | 100 | Integer | flush 前缓存记录的最大值,可以设置为 '0' 来禁用它。 |
sink.buffer-flush.interval |
可选 | 1s | Duration | flush 间隔时间,超过该时间后异步线程将 flush 数据。可以设置为 '0' 来禁用它。注意, 为了完全异步地处理缓存的 flush 事件,可以将 'sink.buffer-flush.max-rows' 设置为 '0' 并配置适当的 flush 时间间隔。 |
sink.max-retries |
可选 | 3 | Integer | 写入记录到数据库失败后的最大重试次数。 |
sink.parallelism |
可选 | (none) | Integer | 用于定义 JDBC sink 算子的并行度。默认情况下,并行度是由框架决定:使用与上游链式算子相同的并行度。 |
已弃用的配置 #
这些弃用配置已经被上述的新配置代替,而且最终会被弃用。请优先考虑使用新配置。
Option | Required | Forwarded | Default | Type | Description |
---|---|---|---|---|---|
lookup.cache.max-rows |
optional | yes | (none) | Integer | 请配置 "lookup.cache" = "PARTIAL" 并使用 "lookup.partial-cache.max-rows" 代替 |
lookup.cache.ttl |
optional | yes | (none) | Duration | 请配置 "lookup.cache" = "PARTIAL" 并使用 "lookup.partial-cache.expire-after-write" 代替 |
lookup.cache.caching-missing-key |
optional | yes | true | Boolean | 请配置 "lookup.cache" = "PARTIAL" 并使用 "lookup.partial-cache.cache-missing-key" 代替 |
Database | Upsert Grammar |
---|---|
MySQL | INSERT .. ON DUPLICATE KEY UPDATE .. |
Oracle | MERGE INTO .. USING (..) ON (..) WHEN MATCHED THEN UPDATE SET (..) WHEN NOT MATCHED THEN INSERT (..) VALUES (..) |
PostgreSQL | INSERT .. ON CONFLICT .. DO UPDATE SET .. |
JDBC Catalog #
JdbcCatalog
允许用户通过 JDBC 协议将 Flink 连接到关系数据库。
目前,JDBC Catalog 有两个实现,即 Postgres Catalog 和 MySQL Catalog。目前支持如下 catalog 方法。其他方法目前尚不支持。
// Postgres Catalog & MySQL Catalog 支持的方法
databaseExists(String databaseName);
listDatabases();
getDatabase(String databaseName);
listTables(String databaseName);
getTable(ObjectPath tablePath);
tableExists(ObjectPath tablePath);
其他的 Catalog
方法现在尚不支持。
JDBC Catalog 的使用 #
本小节主要描述如果创建并使用 Postgres Catalog 或 MySQL Catalog。 请参阅 Dependencies 部分了解如何配置 JDBC 连接器和相应的驱动。
JDBC catalog 支持以下参数:
name
:必填,catalog 的名称。default-database
:必填,默认要连接的数据库。username
:必填,Postgres/MySQL 账户的用户名。password
:必填,账户的密码。base-url
:必填,(不应该包含数据库名)- 对于 Postgres Catalog
base-url
应为"jdbc:postgresql://<ip>:<port>"
的格式。 - 对于 MySQL Catalog
base-url
应为"jdbc:mysql://<ip>:<port>"
的格式。
- 对于 Postgres Catalog
CREATE CATALOG my_catalog WITH(
'type' = 'jdbc',
'default-database' = '...',
'username' = '...',
'password' = '...',
'base-url' = '...'
);
USE CATALOG my_catalog;
EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
TableEnvironment tableEnv = TableEnvironment.create(settings);
String name = "my_catalog";
String defaultDatabase = "mydb";
String username = "...";
String password = "...";
String baseUrl = "..."
JdbcCatalog catalog = new JdbcCatalog(name, defaultDatabase, username, password, baseUrl);
tableEnv.registerCatalog("my_catalog", catalog);
// 设置 JdbcCatalog 为会话的当前 catalog
tableEnv.useCatalog("my_catalog");
val settings = EnvironmentSettings.inStreamingMode()
val tableEnv = TableEnvironment.create(settings)
val name = "my_catalog"
val defaultDatabase = "mydb"
val username = "..."
val password = "..."
val baseUrl = "..."
val catalog = new JdbcCatalog(name, defaultDatabase, username, password, baseUrl)
tableEnv.registerCatalog("my_catalog", catalog)
// 设置 JdbcCatalog 为会话的当前 catalog
tableEnv.useCatalog("my_catalog")
from pyflink.table.catalog import JdbcCatalog
environment_settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(environment_settings)
name = "my_catalog"
default_database = "mydb"
username = "..."
password = "..."
base_url = "..."
catalog = JdbcCatalog(name, default_database, username, password, base_url)
t_env.register_catalog("my_catalog", catalog)
# 设置 JdbcCatalog 为会话的当前 catalog
t_env.use_catalog("my_catalog")
execution:
...
current-catalog: my_catalog # 设置目标 JdbcCatalog 为会话的当前 catalog
current-database: mydb
catalogs:
- name: my_catalog
type: jdbc
default-database: mydb
username: ...
password: ...
base-url: ...
JDBC Catalog for PostgreSQL #
PostgreSQL 元空间映射 #
除了数据库之外,postgreSQL 还有一个额外的命名空间 schema
。一个 Postgres 实例可以拥有多个数据库,每个数据库可以拥有多个 schema,其中一个 schema 默认名为 “public”,每个 schema 可以包含多张表。
在 Flink 中,当查询由 Postgres catalog 注册的表时,用户可以使用 schema_name.table_name
或只有 table_name
,其中 schema_name
是可选的,默认值为 “public”。
因此,Flink Catalog 和 Postgres 之间的元空间映射如下:
Flink Catalog Metaspace Structure | Postgres Metaspace Structure |
---|---|
catalog name (defined in Flink only) | N/A |
database name | database name |
table name | [schema_name.]table_name |
Flink 中的 Postgres 表的完整路径应该是 "<catalog>.<db>.`<schema.table>`"
。如果指定了 schema,请注意需要转义 <schema.table>
。
这里提供了一些访问 Postgres 表的例子:
-- 扫描 'public' schema(即默认 schema)中的 'test_table' 表,schema 名称可以省略
SELECT * FROM mypg.mydb.test_table;
SELECT * FROM mydb.test_table;
SELECT * FROM test_table;
-- 扫描 'custom_schema' schema 中的 'test_table2' 表,
-- 自定义 schema 不能省略,并且必须与表一起转义。
SELECT * FROM mypg.mydb.`custom_schema.test_table2`
SELECT * FROM mydb.`custom_schema.test_table2`;
SELECT * FROM `custom_schema.test_table2`;
JDBC Catalog for MySQL #
MySQL 元空间映射 #
MySQL 实例中的数据库与 MySQL Catalog 注册的 catalog 下的数据库处于同一个映射层级。一个 MySQL 实例可以拥有多个数据库,每个数据库可以包含多张表。
在 Flink 中,当查询由 MySQL catalog 注册的表时,用户可以使用 database.table_name
或只使用 table_name
,其中 database
是可选的,默认值为创建 MySQL Catalog 时指定的默认数据库。
因此,Flink Catalog 和 MySQL catalog 之间的元空间映射如下:
Flink Catalog Metaspace Structure | MySQL Metaspace Structure |
---|---|
catalog name (defined in Flink only) | N/A |
database name | database name |
table name | table_name |
Flink 中的 MySQL 表的完整路径应该是 "`<catalog>`.`<db>`.`<table>`"
。
这里提供了一些访问 MySQL 表的例子:
-- 扫描 默认数据库中的 'test_table' 表
SELECT * FROM mysql_catalog.mydb.test_table;
SELECT * FROM mydb.test_table;
SELECT * FROM test_table;
-- 扫描 'given_database' 数据库中的 'test_table2' 表,
SELECT * FROM mysql_catalog.given_database.test_table2;
SELECT * FROM given_database.test_table2;
数据类型映射 #
Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、Oracle、PostgreSQL、Derby 等。其中,Derby 通常是用于测试目的。下表列出了从关系数据库数据类型到 Flink SQL 数据类型的类型映射,映射表可以使得在 Flink 中定义 JDBC 表更加简单。
MySQL type | Oracle type | PostgreSQL type | Flink SQL type |
---|---|---|---|
TINYINT |
TINYINT |
||
SMALLINT TINYINT UNSIGNED |
SMALLINT INT2 SMALLSERIAL SERIAL2 |
SMALLINT |
|
INT MEDIUMINT SMALLINT UNSIGNED |
INTEGER SERIAL |
INT |
|
BIGINT INT UNSIGNED |
BIGINT BIGSERIAL |
BIGINT |
|
BIGINT UNSIGNED |
DECIMAL(20, 0) |
||
BIGINT |
BIGINT |
BIGINT |
|
FLOAT |
BINARY_FLOAT |
REAL FLOAT4 |
FLOAT |
DOUBLE DOUBLE PRECISION |
BINARY_DOUBLE |
FLOAT8 DOUBLE PRECISION |
DOUBLE |
NUMERIC(p, s) DECIMAL(p, s) |
SMALLINT FLOAT(s) DOUBLE PRECISION REAL NUMBER(p, s) |
NUMERIC(p, s) DECIMAL(p, s) |
DECIMAL(p, s) |
BOOLEAN TINYINT(1) |
BOOLEAN |
BOOLEAN |
|
DATE |
DATE |
DATE |
DATE |
TIME [(p)] |
DATE |
TIME [(p)] [WITHOUT TIMEZONE] |
TIME [(p)] [WITHOUT TIMEZONE] |
DATETIME [(p)] |
TIMESTAMP [(p)] [WITHOUT TIMEZONE] |
TIMESTAMP [(p)] [WITHOUT TIMEZONE] |
TIMESTAMP [(p)] [WITHOUT TIMEZONE] |
CHAR(n) VARCHAR(n) TEXT |
CHAR(n) VARCHAR(n) CLOB |
CHAR(n) CHARACTER(n) VARCHAR(n) CHARACTER VARYING(n) TEXT |
STRING |
BINARY VARBINARY BLOB |
RAW(s) BLOB |
BYTEA |
BYTES |
ARRAY |
ARRAY |