HBase SQL 连接器 #
Scan Source: Bounded Lookup Source: Sync Mode Sink: Batch Sink: Streaming Upsert Mode
HBase 连接器支持读取和写入 HBase 集群。本文档介绍如何使用 HBase 连接器基于 HBase 进行 SQL 查询。
HBase 连接器在 upsert 模式下运行,可以使用 DDL 中定义的主键与外部系统交换更新操作消息。但是主键只能基于 HBase 的 rowkey 字段定义。如果没有声明主键,HBase 连接器默认取 rowkey 作为主键。
依赖 #
There is no connector (yet) available for Flink version 1.20.
HBase 连接器不是二进制发行版的一部分,请查阅这里了解如何在集群运行中引用 HBase 连接器。
如何使用 HBase 表 #
所有 HBase 表的列簇必须定义为 ROW 类型,字段名对应列簇名(column family),嵌套的字段名对应列限定符名(column qualifier)。用户只需在表结构中声明查询中使用的的列簇和列限定符。除了 ROW 类型的列,剩下的原子数据类型字段(比如,STRING, BIGINT)将被识别为 HBase 的 rowkey,一张表中只能声明一个 rowkey。rowkey 字段的名字可以是任意的,如果是保留关键字,需要用反引号。
-- 在 Flink SQL 中注册 HBase 表 "mytable"
CREATE TABLE hTable (
rowkey INT,
family1 ROW<q1 INT>,
family2 ROW<q2 STRING, q3 BIGINT>,
family3 ROW<q4 DOUBLE, q5 BOOLEAN, q6 STRING>,
PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
'connector' = 'hbase-1.4',
'table-name' = 'mytable',
'zookeeper.quorum' = 'localhost:2181'
);
-- 用 ROW(...) 构造函数构造列簇,并往 HBase 表写数据。
-- 假设 "T" 的表结构是 [rowkey, f1q1, f2q2, f2q3, f3q4, f3q5, f3q6]
INSERT INTO hTable
SELECT rowkey, ROW(f1q1), ROW(f2q2, f2q3), ROW(f3q4, f3q5, f3q6) FROM T;
-- 从 HBase 表扫描数据
SELECT rowkey, family1, family3.q4, family3.q6 FROM hTable;
-- temporal join HBase 表,将 HBase 表作为维表
SELECT * FROM myTopic
LEFT JOIN hTable FOR SYSTEM_TIME AS OF myTopic.proctime
ON myTopic.key = hTable.rowkey;
可用的元数据 #
以下的连接器元数据可以在表定义中通过元数据列的形式获取。
R/W
列定义了一个元数据是可读的(R
)还是可写的(W
)。
只读列必须声明为 VIRTUAL
以在 INSERT INTO
操作中排除它们。
Key | Data Type | Description | R/W |
---|---|---|---|
timestamp |
TIMESTAMP_LTZ(3) NOT NULL |
HBase记录的时间戳。 | W |
ttl |
BIGINT NOT NULL |
HBase记录的生存时间(毫秒)。 | W |
连接器参数 #
参数 | 是否必选 | 默认值 | 数据类型 | 描述 |
---|---|---|---|---|
connector |
必选 | (none) | String | 指定使用的连接器, 支持的值如下 :
|
table-name |
必选 | (none) | String | 连接的 HBase 表名。默认该表在 "default" 命名空间下,指定命名空间下的表需要使用 "namespace:table"。 |
zookeeper.quorum |
必选 | (none) | String | HBase Zookeeper quorum 信息。 |
zookeeper.znode.parent |
可选 | /hbase | String | HBase 集群的 Zookeeper 根目录。 |
null-string-literal |
可选 | null | String | 当字符串值为 null 时的存储形式,默认存成 "null" 字符串。HBase 的 source 和 sink 的编解码将所有数据类型(除字符串外)将 null 值以空字节来存储。 |
sink.buffer-flush.max-size |
可选 | 2mb | MemorySize | 写入的参数选项。每次写入请求缓存行的最大大小。它能提升写入 HBase 数据库的性能,但是也可能增加延迟。设置为 "0" 关闭此选项。 |
sink.buffer-flush.max-rows |
可选 | 1000 | Integer | 写入的参数选项。 每次写入请求缓存的最大行数。它能提升写入 HBase 数据库的性能,但是也可能增加延迟。设置为 "0" 关闭此选项。 |
sink.buffer-flush.interval |
可选 | 1s | Duration | 写入的参数选项。刷写缓存行的间隔。它能提升写入 HBase 数据库的性能,但是也可能增加延迟。设置为 "0" 关闭此选项。注意:"sink.buffer-flush.max-size" 和 "sink.buffer-flush.max-rows" 同时设置为 "0",刷写选项整个异步处理缓存行为。 |
sink.ignore-null-value |
可选 | false | Boolean | 写入的参数选项。控制写入时是否需要忽略空值。 |
sink.parallelism |
可选 | (none) | Integer | 为 HBase sink operator 定义并行度。默认情况下,并行度由框架决定,和链在一起的上游 operator 一样。 |
lookup.async |
可选 | false | Boolean | 是否启用异步查找。如果为真,查找将是异步的。注意:异步方式只支持 hbase-2.2 连接器 |
lookup.cache |
可选 | NONE | 枚举类型 可选值: NONE, PARTIAL |
维表的缓存策略。 目前支持 NONE(不缓存)和 PARTIAL(只在外部数据库中查找数据时缓存)。 |
lookup.partial-cache.max-rows |
可选 | (none) | Long | 查找缓存的最大行数,超过这个值,最旧的行将过期。使用该配置时 "lookup.cache" 必须设置为 "PARTIAL”。 |
lookup.partial-cache.expire-after-write |
可选 | (none) | Duration | 在记录写入缓存后该记录的最大保留时间。 使用该配置时 "lookup.cache" 必须设置为 "PARTIAL”。 |
lookup.partial-cache.expire-after-access |
可选 | (none) | Duration | 在缓存中的记录被访问后该记录的最大保留时间。 使用该配置时 "lookup.cache" 必须设置为 "PARTIAL”。 |
lookup.partial-cache.caching-missing-key |
可选 | true | Boolean | 是否缓存维表中不存在的键,默认为true。 使用该配置时 "lookup.cache" 必须设置为 "PARTIAL”。 |
lookup.max-retries |
可选 | 3 | Integer | 查找数据库失败时的最大重试次数。 |
properties.* |
可选 | (无) | String |
可以设置任意 HBase 的配置项。后缀名必须匹配在 HBase 配置文档 中定义的配置键。Flink 将移除 "properties." 配置键前缀并将变换后的配置键和值传入底层的 HBase 客户端。
例如您可以设置 'properties.hbase.security.authentication' = 'kerberos' 等kerberos认证参数。
|
已弃用的配置 #
这些弃用配置已经被上述的新配置代替,而且最终会被弃用。请优先考虑使用新配置。
Option | Required | Forwarded | Default | Type | Description |
---|---|---|---|---|---|
lookup.cache.max-rows |
optional | yes | (none) | Integer | 请配置 "lookup.cache" = "PARTIAL" 并使用 "lookup.partial-cache.max-rows" 代替 |
lookup.cache.ttl |
optional | yes | (none) | Duration | 请配置 "lookup.cache" = "PARTIAL" 并使用 "lookup.partial-cache.expire-after-write" 代替 |
Flink 数据类型 | HBase 转换 |
---|---|
CHAR / VARCHAR / STRING |
|
BOOLEAN |
|
BINARY / VARBINARY |
返回 byte[] 。 |
DECIMAL |
|
TINYINT |
|
SMALLINT |
|
INT |
|
BIGINT |
|
FLOAT |
|
DOUBLE |
|
DATE |
从 1970-01-01 00:00:00 UTC 开始的天数,int 值。 |
TIME |
从 1970-01-01 00:00:00 UTC 开始天的毫秒数,int 值。 |
TIMESTAMP |
从 1970-01-01 00:00:00 UTC 开始的毫秒数,long 值。 |
ARRAY |
不支持 |
MAP / MULTISET |
不支持 |
ROW |
不支持 |