
Lookup Join #

A Lookup Join is used to enrich a table with data that is queried from Flink Table Store. The join requires one table to have a processing time attribute and the other table to be backed by a lookup source connector.

First, create a table and update it in real time.

-- Create a table store catalog
CREATE CATALOG my_catalog WITH (
  'type'='table-store',
  'warehouse'='hdfs://nn:8020/warehouse/path' -- or 'file:///tmp/foo/bar'
);

USE CATALOG my_catalog;

-- Create a table in table-store catalog
CREATE TABLE customers (
  id INT PRIMARY KEY NOT ENFORCED,
  name STRING,
  country STRING,
  zip STRING
);

-- Launch a streaming job to update customers table
INSERT INTO customers ...
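-- For illustration only: a hypothetical continuous sync job, assuming an
-- upstream changelog table named customer_source exists in the catalog.
INSERT INTO customers
SELECT id, name, country, zip FROM customer_source;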

-- Create a temporary left table, e.g. from Kafka
CREATE TEMPORARY TABLE Orders (
  order_id INT,
  total INT,
  customer_id INT,
  proc_time AS PROCTIME()
) WITH (
  'connector' = 'kafka',
  'topic' = '...',
  'properties.bootstrap.servers' = '...',
  'format' = 'csv',
  ...
);

Then, you can use this table in a lookup join.

-- enrich each order with customer information
SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS o
         JOIN customers FOR SYSTEM_TIME AS OF o.proc_time AS c
              ON o.customer_id = c.id;

The lookup join node maintains a local RocksDB cache and pulls the latest updates of the table in real time, fetching only the necessary data. Filter conditions are therefore very important for performance, as the sketch below shows.
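For example, a filter on the lookup side can keep the cache small. A minimal sketch, reusing the tables above (whether the predicate is pushed down to the source depends on the planner):

-- Hypothetical sketch: the filter on c.country, if pushed down to the source,
-- reduces the data pulled into the local cache
SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS o
         JOIN customers FOR SYSTEM_TIME AS OF o.proc_time AS c
              ON o.customer_id = c.id
WHERE c.country = 'US';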

To avoid excessive use of local disks, the lookup join feature is only suitable for tables with sizes below tens of millions of rows.

Note: Partitioned tables and tables without a primary key are not supported at the moment.

Projection pushdown can effectively reduce overhead. FLINK-29138 fixed a bug where projections could not be pushed down to the source, so it is preferable to use Flink 1.14.6, 1.15.3, 1.16.0, or later. Alternatively, you can cherry-pick the commit to your own Flink branch.
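As a hypothetical illustration (reusing the tables above), projection pushdown means a query that references fewer columns reads less data from the source:

-- With projection pushdown, only id and country are read from customers;
-- name and zip are pruned at the source
SELECT o.order_id, c.country
FROM Orders AS o
         JOIN customers FOR SYSTEM_TIME AS OF o.proc_time AS c
              ON o.customer_id = c.id;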

RocksDBOptions #

Options for the RocksDB cache. You can configure these options in the table's WITH clause or via dynamic table hints:

SELECT o.order_id, o.total, c.country, c.zip
  FROM Orders AS o JOIN customers /*+ OPTIONS('lookup.cache-rows'='20000') */
  FOR SYSTEM_TIME AS OF o.proc_time AS c ON o.customer_id = c.id;
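The same option can also be set in the table definition instead of a hint. A minimal sketch, assuming the customers table from above is being created:

-- Hypothetical: configuring the cache size in the table's WITH clause
CREATE TABLE customers (
  id INT PRIMARY KEY NOT ENFORCED,
  name STRING,
  country STRING,
  zip STRING
) WITH (
  'lookup.cache-rows' = '20000'
);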
lookup.cache-rows
  Default: 10000
  Type: Long
  Description: The maximum number of rows to store in the cache.

rocksdb.block.blocksize
  Default: 4 kb
  Type: MemorySize
  Description: The approximate size (in bytes) of user data packed per block.

rocksdb.block.cache-size
  Default: 8 mb
  Type: MemorySize
  Description: The amount of the cache for data blocks in RocksDB.

rocksdb.block.metadata-blocksize
  Default: 4 kb
  Type: MemorySize
  Description: Approximate size of partitioned metadata packed per block. Currently applied to the index block when the partitioned index/filters option is enabled.

rocksdb.bloom-filter.bits-per-key
  Default: 10.0
  Type: Double
  Description: Bits per key that the bloom filter will use; this only takes effect when the bloom filter is used.

rocksdb.bloom-filter.block-based-mode
  Default: false
  Type: Boolean
  Description: If true, RocksDB will use a block-based filter instead of a full filter; this only takes effect when the bloom filter is used.

rocksdb.compaction.level.max-size-level-base
  Default: 256 mb
  Type: MemorySize
  Description: The upper bound of the total size of base-level files in bytes.

rocksdb.compaction.level.target-file-size-base
  Default: 64 mb
  Type: MemorySize
  Description: The target file size for compaction, which determines a level-1 file size.

rocksdb.compaction.level.use-dynamic-size
  Default: false
  Type: Boolean
  Description: If true, RocksDB will pick the target size of each level dynamically. Starting from an empty DB, RocksDB makes the last level the base level, which means merging L0 data into the last level, until it exceeds max_bytes_for_level_base; it then repeats this process for the second-last level, and so on. For more information, please refer to RocksDB's documentation.

rocksdb.compaction.style
  Default: LEVEL
  Type: Enum
  Description: The specified compaction style for the DB. Candidate compaction styles are LEVEL, FIFO, UNIVERSAL, or NONE; Flink chooses 'LEVEL' as the default style.
  Possible values: "LEVEL", "UNIVERSAL", "FIFO", "NONE"

rocksdb.compression.type
  Default: LZ4_COMPRESSION
  Type: Enum
  Description: The compression type.
  Possible values: "NO_COMPRESSION", "SNAPPY_COMPRESSION", "ZLIB_COMPRESSION", "BZLIB2_COMPRESSION", "LZ4_COMPRESSION", "LZ4HC_COMPRESSION", "XPRESS_COMPRESSION", "ZSTD_COMPRESSION", "DISABLE_COMPRESSION_OPTION"

rocksdb.files.open
  Default: -1
  Type: Integer
  Description: The maximum number of open files (per stateful operator) that can be used by the DB; '-1' means no limit.

rocksdb.thread.num
  Default: 2
  Type: Integer
  Description: The maximum number of concurrent background flush and compaction jobs (per stateful operator).

rocksdb.use-bloom-filter
  Default: false
  Type: Boolean
  Description: If true, every newly created SST file will contain a Bloom filter. It is disabled by default.

rocksdb.writebuffer.count
  Default: 2
  Type: Integer
  Description: The maximum number of write buffers that are built up in memory.

rocksdb.writebuffer.number-to-merge
  Default: 1
  Type: Integer
  Description: The minimum number of write buffers that will be merged together before writing to storage.

rocksdb.writebuffer.size
  Default: 64 mb
  Type: MemorySize
  Description: The amount of data built up in memory (backed by an unsorted log on disk) before being converted to sorted on-disk files.
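Multiple options can be combined in a single hint. A hedged sketch (the values below are illustrative, not tuning recommendations):

-- Hypothetical tuning example: option values are illustrative only
SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS o
         JOIN customers /*+ OPTIONS(
             'lookup.cache-rows' = '50000',
             'rocksdb.block.cache-size' = '32 mb',
             'rocksdb.use-bloom-filter' = 'true') */
         FOR SYSTEM_TIME AS OF o.proc_time AS c
              ON o.customer_id = c.id;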