Lookup Joins
This documentation is for an unreleased version of Apache Flink Table Store. We recommend you use the latest stable version.

A lookup join is a type of join in streaming queries. It enriches a table with data queried from Table Store. The join requires one table to have a processing time attribute and the other table to be backed by a lookup source connector.

Table Store supports lookup joins on unpartitioned tables with primary keys in Flink. The following example illustrates this feature.

First, let’s create a Table Store table and update it in real time.

-- Create a table store catalog
CREATE CATALOG my_catalog WITH (
  'type'='table-store',
  'warehouse'='hdfs://nn:8020/warehouse/path' -- or 'file:///tmp/foo/bar'
);

USE CATALOG my_catalog;

-- Create a table in table-store catalog
CREATE TABLE customers (
    id INT PRIMARY KEY NOT ENFORCED,
    name STRING,
    country STRING,
    zip STRING
);

-- Launch a streaming job to update customers table
INSERT INTO customers ...

-- Create a temporary left table, for example one backed by Kafka
CREATE TEMPORARY TABLE Orders (
    order_id INT,
    total INT,
    customer_id INT,
    proc_time AS PROCTIME()
) WITH (
    'connector' = 'kafka',
    'topic' = '...',
    'properties.bootstrap.servers' = '...',
    'format' = 'csv'
    ...
);

You can now use customers in a lookup join query.

-- enrich each order with customer information
SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS o
JOIN customers
FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;
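
The query above is an inner join, so orders without a matching customer are dropped. As a hedged sketch: Flink SQL lookup joins generally also accept LEFT JOIN, and assuming that holds for Table Store lookup tables as well, the variant below would keep every order and return NULL for the customer columns when no match is found.

```sql
-- Sketch only: assumes LEFT JOIN is supported for this lookup source.
-- Orders with no matching customer are kept; c.country and c.zip are NULL.
SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS o
LEFT JOIN customers
FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;
```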

The lookup join operator maintains a local RocksDB cache and pulls the latest updates of the table in real time. It pulls only the necessary data, so your filter conditions are very important for performance.
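
To illustrate the point about filter conditions, here is a minimal sketch that restricts the lookup side to a single country. The literal 'US' is a hypothetical value chosen for illustration, and the sketch assumes the planner can apply the predicate on c to the lookup source so that fewer rows need to be cached locally.

```sql
-- Sketch only: 'US' is an illustrative value, not taken from the example data.
-- Selecting only the needed columns and filtering the lookup table
-- is intended to reduce the data the operator has to pull and cache.
SELECT o.order_id, o.total, c.zip
FROM Orders AS o
JOIN customers
FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id
WHERE c.country = 'US';
```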

This feature is suitable only for tables containing at most tens of millions of records, to avoid excessive use of local disks.

RocksDB Cache Options

The following options allow users to fine-tune RocksDB for better performance. You can specify them either in table properties or in dynamic table hints.

-- dynamic table hints example
SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS o JOIN customers /*+ OPTIONS('lookup.cache-rows'='20000') */
FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;
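
The other route mentioned above is table properties. As a rough sketch, assuming the catalog accepts Flink's ALTER TABLE ... SET syntax for this table, the same option could be stored on the table so that every lookup join picks it up without a hint. The value 20000 simply mirrors the hint example.

```sql
-- Sketch only: persist the option as a table property instead of a per-query hint.
ALTER TABLE customers SET ('lookup.cache-rows' = '20000');
```

A hint is scoped to the single query that carries it, while a table property applies to all jobs reading the table, so the hint is the safer place to experiment.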

| Key | Default | Type | Description |
| --- | --- | --- | --- |
| lookup.cache-rows | 10000 | Long | The maximum number of rows to store in the cache. |
| rocksdb.block.blocksize | 4 kb | MemorySize | The approximate size (in bytes) of user data packed per block. The default blocksize is '4KB'. |
| rocksdb.block.cache-size | 8 mb | MemorySize | The size of the cache for data blocks in RocksDB. The default block-cache size is '8MB'. |
| rocksdb.block.metadata-blocksize | 4 kb | MemorySize | The approximate size of partitioned metadata packed per block. Currently applies to index blocks when the partitioned index/filters option is enabled. The default blocksize is '4KB'. |
| rocksdb.bloom-filter.bits-per-key | 10.0 | Double | The number of bits per key that the bloom filter uses. This only takes effect when a bloom filter is used. The default value is 10.0. |
| rocksdb.bloom-filter.block-based-mode | false | Boolean | If true, RocksDB uses a block-based filter instead of a full filter. This only takes effect when a bloom filter is used. The default value is 'false'. |
| rocksdb.compaction.level.max-size-level-base | 256 mb | MemorySize | The upper bound of the total size of level base files in bytes. The default value is '256MB'. |
| rocksdb.compaction.level.target-file-size-base | 64 mb | MemorySize | The target file size for compaction, which determines the level-1 file size. The default value is '64MB'. |
| rocksdb.compaction.level.use-dynamic-size | false | Boolean | If true, RocksDB picks the target size of each level dynamically. Starting from an empty DB, RocksDB makes the last level the base level, which means merging L0 data into the last level until it exceeds max_bytes_for_level_base, and then repeats this process for the second-last level and so on. The default value is 'false'. For more information, refer to the RocksDB documentation. |
| rocksdb.compaction.style | LEVEL | Enum | The compaction style for the DB. Candidate compaction styles are LEVEL, FIFO, UNIVERSAL or NONE, and Flink chooses 'LEVEL' as the default style. Possible values: "LEVEL", "UNIVERSAL", "FIFO", "NONE". |
| rocksdb.compression.type | LZ4_COMPRESSION | Enum | The compression type. Possible values: "NO_COMPRESSION", "SNAPPY_COMPRESSION", "ZLIB_COMPRESSION", "BZLIB2_COMPRESSION", "LZ4_COMPRESSION", "LZ4HC_COMPRESSION", "XPRESS_COMPRESSION", "ZSTD_COMPRESSION", "DISABLE_COMPRESSION_OPTION". |
| rocksdb.files.open | -1 | Integer | The maximum number of open files (per stateful operator) that can be used by the DB. '-1' means no limit. The default value is '-1'. |
| rocksdb.thread.num | 2 | Integer | The maximum number of concurrent background flush and compaction jobs (per stateful operator). The default value is '2'. |
| rocksdb.use-bloom-filter | false | Boolean | If true, every newly created SST file will contain a Bloom filter. It is disabled by default. |
| rocksdb.writebuffer.count | 2 | Integer | The maximum number of write buffers that are built up in memory. The default value is '2'. |
| rocksdb.writebuffer.number-to-merge | 1 | Integer | The minimum number of write buffers that will be merged together before writing to storage. The default value is '1'. |
| rocksdb.writebuffer.size | 64 mb | MemorySize | The amount of data built up in memory (backed by an unsorted log on disk) before converting to a sorted on-disk file. The default writebuffer size is '64MB'. |
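
Several of the options listed here can be combined in one dynamic table hint. The following is a minimal sketch that raises the row cache and enables the Bloom filter for a single query. The specific values are illustrative assumptions, not tuning recommendations, and should be validated against your own workload.

```sql
-- Sketch only: option values are illustrative, not recommended settings.
SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS o JOIN customers /*+ OPTIONS(
    'lookup.cache-rows' = '50000',
    'rocksdb.use-bloom-filter' = 'true',
    'rocksdb.bloom-filter.bits-per-key' = '10.0'
) */
FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;
```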