Using the HiveCatalog, Apache Flink can be used for unified BATCH and STREAM processing of Apache Hive tables. This means Flink can be used as a more performant alternative to Hive’s batch engine, or to continuously read and write data into and out of Hive tables to power real-time data warehousing applications.
Flink supports reading data from Hive in both BATCH and STREAMING modes. When run as a BATCH application, Flink will execute its query over the state of the table at the point in time when the query is executed. STREAMING reads will continuously monitor the table and incrementally fetch new data as it is made available. Flink will read tables as bounded by default.

STREAMING reads support consuming both partitioned and non-partitioned tables. For partitioned tables, Flink will monitor the generation of new partitions and read them incrementally when available. For non-partitioned tables, Flink will monitor the generation of new files in the folder and read new files incrementally.
Key | Default | Type | Description |
---|---|---|---|
streaming-source.enable | false | Boolean | Enable streaming source or not. NOTE: Please make sure that each partition/file is written atomically; otherwise the reader may get incomplete data. |
streaming-source.partition.include | all | String | Option to set which partitions to read; the supported values are `all` and `latest`. `all` reads all partitions; `latest` reads the latest partition in the order defined by 'streaming-source.partition-order' and only works when the streaming Hive source table is used as a temporal table. By default the option is `all`. Flink supports temporal joins against the latest Hive partition by enabling 'streaming-source.enable' and setting 'streaming-source.partition.include' to 'latest'; at the same time, the user can set the partition comparison order and the data update interval with the partition-related options below. |
streaming-source.monitor-interval | None | Duration | Time interval for consecutively monitoring partitions/files. NOTE: The default interval for Hive streaming reads is '1 m', while the default interval for the Hive streaming temporal join is '60 m'. This is because of a framework limitation: in the current Hive streaming temporal join implementation every TM visits the Hive metastore, which may put pressure on the metastore; this will be improved in the future. |
streaming-source.partition-order | partition-name | String | The partition order of the streaming source; supported values are create-time, partition-time and partition-name. create-time compares the partition/file creation time; this is not the partition creation time in the Hive metastore but the folder/file modification time in the filesystem, so if the partition folder is somehow updated, e.g. a new file is added to it, this can affect how the data is consumed. partition-time compares the time extracted from the partition name. partition-name compares partition names in alphabetical order. For non-partitioned tables, this value should always be 'create-time'. By default the value is partition-name. This option is equivalent to the deprecated option 'streaming-source.consume-order'. |
streaming-source.consume-start-offset | None | String | Start offset for streaming consumption. How to parse and compare offsets depends on the order. For create-time and partition-time, it should be a timestamp string (yyyy-[m]m-[d]d [hh:mm:ss]). For partition-time, the partition time extractor is used to extract the time from the partition name. For partition-name, it is the partition name string (e.g. pt_year=2020/pt_mon=10/pt_day=01). |
SQL Hints can be used to apply configurations to a Hive table without changing its definition in the Hive metastore.
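For example, a bounded Hive table can be switched to a streaming read for a single query. A minimal sketch; the table name `hive_table` and the start offset are illustrative:

```sql
SELECT *
FROM hive_table
/*+ OPTIONS('streaming-source.enable'='true',
            'streaming-source.consume-start-offset'='2020-05-20') */;
```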
Flink is able to read from Hive defined views, but some limitations apply:
1) The Hive catalog must be set as the current catalog before you can query the view. This can be done with tableEnv.useCatalog(...) in the Table API or USE CATALOG ... in the SQL Client (see the snippet after this list).
2) Hive and Flink SQL have different syntax, e.g. different reserved keywords and literals. Make sure the view’s query is compatible with Flink grammar.
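A minimal sketch in the SQL Client, assuming a Hive catalog registered under the illustrative name `myhive` and a Hive view `my_hive_view`:

```sql
USE CATALOG myhive;
SELECT * FROM my_hive_view;
```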
Flink will automatically use vectorized reads of Hive tables when the file format and the column types support it. This feature is enabled by default. It may be disabled with the following configuration.
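A minimal sketch of turning the optimization off in the SQL Client, assuming the `table.exec.hive.fallback-mapred-reader` option (setting it to true falls back to the non-vectorized Hadoop mapred reader):

```sql
SET table.exec.hive.fallback-mapred-reader=true;
```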
By default, Flink will infer the optimal parallelism for its Hive readers based on the number of files and the number of blocks in each file.

Flink allows you to flexibly configure the policy of parallelism inference. You can configure the following parameters in TableConfig (note that these parameters affect all sources of the job):
Key | Default | Type | Description |
---|---|---|---|
table.exec.hive.infer-source-parallelism | true | Boolean | If true, source parallelism is inferred according to the number of splits. If false, the parallelism of the source is set by configuration. |
table.exec.hive.infer-source-parallelism.max | 1000 | Integer | Sets the maximum inferred parallelism for the source operator. |
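For example, in the SQL Client these keys can be set with SET statements (a sketch; the same keys can be set on TableConfig in the Table API, and the values shown are illustrative):

```sql
-- cap the inferred parallelism of Hive sources
SET table.exec.hive.infer-source-parallelism.max=100;
-- or disable inference entirely and rely on the configured parallelism
-- SET table.exec.hive.infer-source-parallelism=false;
```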
You can use a Hive table as a temporal table, and a stream can then correlate with the Hive table via a temporal join. Please see temporal join for more information about temporal joins.

Flink supports processing-time temporal joins against Hive tables; a processing-time temporal join always joins the latest version of the temporal table. Flink supports temporal joins against both partitioned and non-partitioned Hive tables; for partitioned tables, Flink can automatically track the latest partition of the Hive table.

NOTE: Flink does not support event-time temporal joins against Hive tables yet.
For a partitioned table that changes over time, we can read it as an unbounded stream. Each partition can act as a version of the temporal table if every partition contains the complete data of a version; the version of the temporal table keeps the data of that partition.

Flink supports automatically tracking the latest partition (version) of the temporal table in a processing-time temporal join; the latest partition (version) is defined by the 'streaming-source.partition-order' option. This is the most common case of using a Hive table as a dimension table in a Flink streaming application.
NOTE: This feature is only supported in Flink STREAMING mode.
The following demo shows a classical business pipeline: the dimension table comes from Hive and is updated once per day by a batch pipeline job or a Flink job, while the Kafka stream comes from real-time online business data or logs and needs to be joined with the dimension table to enrich the stream.
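A sketch of such a pipeline; all table names, columns, connector options and intervals are illustrative, and the Hive table properties must be set while the Hive dialect is active:

```sql
-- the Hive dimension table, updated daily with one complete partition per day
SET table.sql-dialect=hive;
CREATE TABLE dimension_table (
  product_id STRING,
  product_name STRING,
  unit_price DECIMAL(10, 4)
) PARTITIONED BY (pt_year STRING, pt_month STRING, pt_day STRING) TBLPROPERTIES (
  'streaming-source.enable' = 'true',
  'streaming-source.partition.include' = 'latest',
  'streaming-source.monitor-interval' = '12 h',
  'streaming-source.partition-order' = 'partition-name'
);

-- the Kafka fact table; a processing-time attribute is required for the temporal join
SET table.sql-dialect=default;
CREATE TABLE orders_table (
  order_id STRING,
  product_id STRING,
  order_amount DOUBLE,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json'
);

-- streaming SQL: join each order against the latest Hive partition
SELECT o.order_id, o.order_amount, dim.product_name, dim.unit_price
FROM orders_table AS o
JOIN dimension_table FOR SYSTEM_TIME AS OF o.proctime AS dim
ON o.product_id = dim.product_id;
```

With this setup, Flink reloads the data of the configured latest partition at the 'streaming-source.monitor-interval'.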
For a Hive table, we can read it out as a bounded stream. In this case, the Hive table can only track its latest version at the time the query is executed. The latest version of the table keeps all of the data of the Hive table.

When performing a temporal join against the latest Hive table, the Hive table is cached in slot memory and each record from the stream is joined against the table by key to decide whether a match is found. Using the latest Hive table as a temporal table does not require any additional configuration. Optionally, you can configure the TTL of the Hive table cache with the following property. After the cache expires, the Hive table will be scanned again to load the latest data.
Key | Default | Type | Description |
---|---|---|---|
lookup.join.cache.ttl | 60 min | Duration | The cache TTL (e.g. 10min) for the build table in a lookup join. By default the TTL is 60 minutes. NOTE: This option only works when looking up a bounded Hive table source; if you're using a streaming Hive source as the temporal table, please use 'streaming-source.monitor-interval' to configure the interval of data updates. |
The following demo shows how to load all the data of a Hive table as a temporal table.
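A sketch under the same illustrative assumptions as above (`orders_table` is the Kafka table defined in the previous example):

```sql
-- the Hive dimension table read as a bounded table and fully cached
SET table.sql-dialect=hive;
CREATE TABLE bounded_dimension_table (
  product_id STRING,
  product_name STRING,
  unit_price DECIMAL(10, 4)
) TBLPROPERTIES (
  'streaming-source.enable' = 'false',           -- default value, shown for clarity
  'streaming-source.partition.include' = 'all',  -- default value, shown for clarity
  'lookup.join.cache.ttl' = '12 h'
);

-- lookup join against the whole Hive table; the cache is reloaded after the TTL expires
SET table.sql-dialect=default;
SELECT o.order_id, o.order_amount, dim.product_name, dim.unit_price
FROM orders_table AS o
JOIN bounded_dimension_table FOR SYSTEM_TIME AS OF o.proctime AS dim
ON o.product_id = dim.product_id;
```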
Note: it is encouraged to set a relatively large value for streaming-source.monitor-interval (latest partition as temporal table) or lookup.join.cache.ttl (all partitions as temporal table). Otherwise, jobs are prone to performance issues as the table needs to be updated and reloaded too frequently.

Flink supports writing data to Hive in both BATCH and STREAMING modes. When run as a BATCH application, Flink will write to a Hive table only making those records visible when the job finishes. BATCH writes support both appending to and overwriting existing tables.
Data can also be inserted into particular partitions.
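A minimal sketch of batch inserts, assuming an illustrative partitioned Hive table `logs (msg STRING) PARTITIONED BY (dt STRING)` and a staging table `staging_logs` with the same columns:

```sql
-- append, deriving the partition value from the query result
INSERT INTO logs SELECT msg, dt FROM staging_logs;

-- overwrite a single static partition
INSERT OVERWRITE logs PARTITION (dt='2020-05-20')
SELECT msg FROM staging_logs WHERE dt = '2020-05-20';
```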
STREAMING writes continuously add new data to Hive, committing records and thus making them visible incrementally. Users control when/how to trigger commits with several properties. Insert overwrite is not supported for streaming writes.
The example below shows how the streaming sink can be used to run a streaming query that writes data from Kafka into a Hive table with partition commit, followed by a batch query that reads the data back out.
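A sketch of this pipeline; table names, columns and connector options are illustrative:

```sql
-- the partitioned Hive sink table with a partition-commit policy
SET table.sql-dialect=hive;
CREATE TABLE hive_table (
  user_id STRING,
  order_amount DOUBLE
) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
  'partition.time-extractor.timestamp-pattern' = '$dt $hr:00:00',
  'sink.partition-commit.trigger' = 'partition-time',
  'sink.partition-commit.delay' = '1 h',
  'sink.partition-commit.policy.kind' = 'metastore,success-file'
);

-- the Kafka source table with a watermark used for partition-time commits
SET table.sql-dialect=default;
CREATE TABLE kafka_table (
  user_id STRING,
  order_amount DOUBLE,
  log_ts TIMESTAMP(3),
  WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json'
);

-- streaming SQL: insert into the Hive table
INSERT INTO hive_table
SELECT user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH')
FROM kafka_table;

-- batch SQL: read the data back with partition pruning
SELECT * FROM hive_table WHERE dt='2020-05-20' AND hr='12';
```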
Please see the streaming sink for a full list of available configurations.
By default, for streaming writes, Flink only supports renaming committers, meaning the S3 filesystem cannot support exactly-once streaming writes. Exactly-once writes to S3 can be achieved by configuring the following parameter to false. This will instruct the sink to use Flink’s native writers, but it only works for parquet and orc file types. This configuration is set in the TableConfig and will affect all sinks of the job.
Key | Default | Type | Description |
---|---|---|---|
table.exec.hive.fallback-mapred-writer | true | Boolean | If false, use Flink's native writer to write parquet and orc files; if true, use the Hadoop mapred record writer to write parquet and orc files. |
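A minimal sketch of switching to the native writers in the SQL Client (the same key can be set on TableConfig in the Table API):

```sql
SET table.exec.hive.fallback-mapred-writer=false;
```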
Flink’s Hive integration has been tested against the following file formats: