HBase SQL Connector #
Scan Source: Bounded Lookup Source: Sync Mode Sink: Batch Sink: Streaming Upsert Mode
The HBase connector allows for reading from and writing to an HBase cluster. This document describes how to setup the HBase Connector to run SQL queries against HBase.
HBase always works in upsert mode for exchange changelog messages with the external system using a primary key defined on the DDL. The primary key must be defined on the HBase rowkey field (rowkey field must be declared). If the PRIMARY KEY clause is not declared, the HBase connector will take rowkey as the primary key by default.
Dependencies #
There is no connector (yet) available for Flink version 2.0-preview1.
The HBase connector is not part of the binary distribution. See how to link with it for cluster execution here.
How to use HBase table #
All the column families in HBase table must be declared as ROW type, the field name maps to the column family name, and the nested field names map to the column qualifier names. There is no need to declare all the families and qualifiers in the schema, users can declare what’s used in the query. Except the ROW type fields, the single atomic type field (e.g. STRING, BIGINT) will be recognized as HBase rowkey. The rowkey field can be arbitrary name, but should be quoted using backticks if it is a reserved keyword.
-- register the HBase table 'mytable' in Flink SQL
CREATE TABLE hTable (
rowkey INT,
family1 ROW<q1 INT>,
family2 ROW<q2 STRING, q3 BIGINT>,
family3 ROW<q4 DOUBLE, q5 BOOLEAN, q6 STRING>,
PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
'connector' = 'hbase-1.4',
'table-name' = 'mytable',
'zookeeper.quorum' = 'localhost:2181'
);
-- use ROW(...) construction function construct column families and write data into the HBase table.
-- assuming the schema of "T" is [rowkey, f1q1, f2q2, f2q3, f3q4, f3q5, f3q6]
INSERT INTO hTable
SELECT rowkey, ROW(f1q1), ROW(f2q2, f2q3), ROW(f3q4, f3q5, f3q6) FROM T;
-- scan data from the HBase table
SELECT rowkey, family1, family3.q4, family3.q6 FROM hTable;
-- temporal join the HBase table as a dimension table
SELECT * FROM myTopic
LEFT JOIN hTable FOR SYSTEM_TIME AS OF myTopic.proctime
ON myTopic.key = hTable.rowkey;
Available Metadata #
The following connector metadata can be accessed as metadata columns in a table definition.
The R/W
column defines whether a metadata field is readable (R
) and/or writable (W
).
Read-only columns must be declared VIRTUAL
to exclude them during an INSERT INTO
operation.
Key | Data Type | Description | R/W |
---|---|---|---|
timestamp |
TIMESTAMP_LTZ(3) NOT NULL |
Timestamp for the HBase mutation. | W |
ttl |
BIGINT NOT NULL |
Time-to-live for the HBase mutation, in milliseconds. | W |
Connector Options #
Option | Required | Forwarded | Default | Type | Description |
---|---|---|---|---|---|
connector |
required | no | (none) | String | Specify what connector to use, valid values are:
|
table-name |
required | yes | (none) | String | The name of HBase table to connect. By default, the table is in 'default' namespace. To assign the table a specified namespace you need to use 'namespace:table'. |
zookeeper.quorum |
required | yes | (none) | String | The HBase Zookeeper quorum. |
zookeeper.znode.parent |
optional | yes | /hbase | String | The root dir in Zookeeper for HBase cluster. |
null-string-literal |
optional | yes | null | String | Representation for null values for string fields. HBase source and sink encodes/decodes empty bytes as null values for all types except string type. |
sink.buffer-flush.max-size |
optional | yes | 2mb | MemorySize | Writing option, maximum size in memory of buffered rows for each writing request.
This can improve performance for writing data to HBase database, but may increase the latency.
Can be set to '0' to disable it.
|
sink.buffer-flush.max-rows |
optional | yes | 1000 | Integer | Writing option, maximum number of rows to buffer for each writing request.
This can improve performance for writing data to HBase database, but may increase the latency.
Can be set to '0' to disable it.
|
sink.buffer-flush.interval |
optional | yes | 1s | Duration | Writing option, the interval to flush any buffered rows.
This can improve performance for writing data to HBase database, but may increase the latency.
Can be set to '0' to disable it. Note, both 'sink.buffer-flush.max-size' and 'sink.buffer-flush.max-rows'
can be set to '0' with the flush interval set allowing for complete async processing of buffered actions.
|
sink.ignore-null-value |
optional | yes | false | Boolean | Writing option, whether ignore null value or not. |
sink.parallelism |
optional | no | (none) | Integer | Defines the parallelism of the HBase sink operator. By default, the parallelism is determined by the framework using the same parallelism of the upstream chained operator. |
lookup.async |
optional | no | false | Boolean | Whether async lookup are enabled. If true, the lookup will be async. Note, async only supports hbase-2.2 connector. |
lookup.cache |
optional | yes | NONE | Enum Possible values: NONE, PARTIAL |
The cache strategy for the lookup table. Currently supports NONE (no caching) and PARTIAL (caching entries on lookup operation in external database). |
lookup.partial-cache.max-rows |
optional | yes | (none) | Long | The max number of rows of lookup cache, over this value, the oldest rows will be expired. "lookup.cache" must be set to "PARTIAL" to use this option. |
lookup.partial-cache.expire-after-write |
optional | yes | (none) | Duration | The max time to live for each rows in lookup cache after writing into the cache "lookup.cache" must be set to "PARTIAL" to use this option. |
lookup.partial-cache.expire-after-access |
optional | yes | (none) | Duration | The max time to live for each rows in lookup cache after accessing the entry in the cache. "lookup.cache" must be set to "PARTIAL" to use this option. |
lookup.partial-cache.caching-missing-key |
optional | yes | true | Boolean | Whether to store an empty value into the cache if the lookup key doesn't match any rows in the table. "lookup.cache" must be set to "PARTIAL" to use this option. |
lookup.max-retries |
optional | yes | 3 | Integer | The max retry times if lookup database failed. |
properties.* |
optional | no | (none) | String |
This can set and pass arbitrary HBase configurations. Suffix names must match the configuration key defined in HBase Configuration documentation. Flink will remove the "properties." key prefix and pass the transformed key and values to the underlying HBaseClient. For example, you can add a kerberos authentication parameter 'properties.hbase.security.authentication' = 'kerberos' .
|
Deprecated Options #
These deprecated options has been replaced by new options listed above and will be removed eventually. Please consider using new options first.
Option | Required | Forwarded | Default | Type | Description |
---|---|---|---|---|---|
lookup.cache.max-rows |
optional | yes | (none) | Integer | Please set "lookup.cache" = "PARTIAL" and use "lookup.partial-cache.max-rows" instead. |
lookup.cache.ttl |
optional | yes | (none) | Duration | Please set "lookup.cache" = "PARTIAL" and use "lookup.partial-cache.expire-after-write" instead. |
Flink SQL type | HBase conversion |
---|---|
CHAR / VARCHAR / STRING |
|
BOOLEAN |
|
BINARY / VARBINARY |
Returns byte[] as is. |
DECIMAL |
|
TINYINT |
|
SMALLINT |
|
INT |
|
BIGINT |
|
FLOAT |
|
DOUBLE |
|
DATE |
Stores the number of days since epoch as int value. |
TIME |
Stores the number of milliseconds of the day as int value. |
TIMESTAMP |
Stores the milliseconds since epoch as long value. |
ARRAY |
Not supported |
MAP / MULTISET |
Not supported |
ROW |
Not supported |