This connector provides access to partitioned files in filesystems supported by the Flink FileSystem abstraction.
The file system connector itself is included in Flink and does not require an additional dependency. A corresponding format needs to be specified for reading and writing rows from and to a file system.
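The file system connector allows for reading and writing from a local or distributed filesystem. A filesystem table is defined with a CREATE TABLE statement whose options set the connector to filesystem and specify a path and a format.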
Attention Make sure to include Flink File System specific dependencies.
Attention File system sources for streaming are still under development. In the future, the community will add support for common streaming use cases, e.g., partition and directory monitoring.
Attention The behaviour of the file system connector differs considerably from the previous legacy filesystem connector: the path parameter specifies a directory, not a file, and you cannot get a human-readable file in the path that you declare.
Flink’s file system partition support uses the standard hive format. However, it does not require partitions to be pre-registered with a table catalog. Partitions are discovered and inferred based on directory structure. For example, a table partitioned based on the directory below would be inferred to contain datetime and hour partitions.
path
└── datetime=2019-08-25
    └── hour=11
        ├── part-0.parquet
        ├── part-1.parquet
    └── hour=12
        ├── part-0.parquet
└── datetime=2019-08-26
    └── hour=6
        ├── part-0.parquet
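The inference from directory names can be pictured with a small, self-contained sketch. It is not Flink's implementation; the class and method names are made up for illustration, and it only assumes that every directory segment of the form key=value contributes one partition column.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class PartitionPathSketch {

    /**
     * Collects key=value directory segments of a path relative to the table
     * root, in the order they appear. The last segment is treated as the
     * file name and is never interpreted as a partition.
     */
    static Map<String, String> inferPartitions(String relativePath) {
        Map<String, String> spec = new LinkedHashMap<>();
        String[] segments = relativePath.split("/");
        for (int i = 0; i < segments.length - 1; i++) {
            int eq = segments[i].indexOf('=');
            if (eq > 0) {
                spec.put(segments[i].substring(0, eq), segments[i].substring(eq + 1));
            }
        }
        return spec;
    }

    public static void main(String[] args) {
        // prints {datetime=2019-08-25, hour=11}
        System.out.println(inferPartitions("datetime=2019-08-25/hour=11/part-0.parquet"));
    }
}
```

Applied to the tree above, every part file under datetime=2019-08-25/hour=11 maps to the same two partition values, which is why no catalog registration is needed.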
The file system table supports both partition inserting and overwrite inserting. See INSERT Statement. When you run INSERT OVERWRITE on a partitioned table, only the corresponding partition is overwritten, not the entire table.
The file system connector supports multiple formats: csv, json, avro, parquet and orc. Compression for avro is configured through avro.codec.
The file system connector supports streaming writes, based on Flink’s Streaming File Sink to write records to file. Row-encoded Formats are csv and json. Bulk-encoded Formats are parquet, orc and avro.
You can write SQL directly to insert the stream data into a non-partitioned table. For a partitioned table, you can additionally configure partition related operations. See Partition Commit for details.
Data within the partition directories is split into part files. Each partition will contain at least one part file for each subtask of the sink that has received data for that partition. The in-progress part file is closed and a new part file is created according to the configurable rolling policy. The policy rolls part files based on size and on a timeout that specifies the maximum duration for which a file can be open.
Key | Default | Type | Description |
---|---|---|---|
sink.rolling-policy.file-size | 128MB | MemorySize | The maximum part file size before rolling. |
sink.rolling-policy.rollover-interval | 30 min | Duration | The maximum time duration a part file can stay open before rolling (by default 30 min to avoid too many small files). The frequency at which this is checked is controlled by the 'sink.rolling-policy.check-interval' option. |
sink.rolling-policy.check-interval | 1 min | Duration | The interval for checking time based rolling policies. This controls the frequency to check whether a part file should rollover based on 'sink.rolling-policy.rollover-interval'. |
NOTE: For bulk formats (parquet, orc, avro), the rolling policy in combination with the checkpoint interval (pending files become finished on the next checkpoint) controls the size and number of these parts.
NOTE: For row formats (csv, json), if you do not want to wait a long time before data becomes visible in the file system, set sink.rolling-policy.file-size or sink.rolling-policy.rollover-interval in the connector properties together with execution.checkpointing.interval in flink-conf.yaml. For the other formats (the bulk formats, such as avro and orc), it is enough to set execution.checkpointing.interval in flink-conf.yaml.
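The rolling decision described above can be sketched as a single predicate. This is an illustration under assumptions, not the sink's code: the class name is hypothetical, the constants are the documented defaults, and whether the real comparisons are strict or inclusive is not stated in this document.

```java
import java.time.Duration;

public class RollingPolicySketch {

    // Documented default of sink.rolling-policy.file-size (128MB).
    static final long MAX_FILE_SIZE_BYTES = 128L * 1024 * 1024;

    // Documented default of sink.rolling-policy.rollover-interval (30 min).
    static final Duration ROLLOVER_INTERVAL = Duration.ofMinutes(30);

    /**
     * Returns true when the in-progress part file should be closed, either
     * because it reached the size limit or because it has been open for the
     * rollover interval. In the sink, the time condition is only evaluated
     * every sink.rolling-policy.check-interval.
     */
    static boolean shouldRoll(long currentSizeBytes, Duration openFor) {
        boolean tooLarge = currentSizeBytes >= MAX_FILE_SIZE_BYTES;
        boolean openTooLong = openFor.compareTo(ROLLOVER_INTERVAL) >= 0;
        return tooLarge || openTooLong;
    }

    public static void main(String[] args) {
        System.out.println(shouldRoll(1024, Duration.ofMinutes(5)));   // small and young file
        System.out.println(shouldRoll(1024, Duration.ofMinutes(31)));  // open longer than 30 min
    }
}
```

Because the time condition is checked periodically, a file can stay open for up to the rollover interval plus one check interval before it is rolled.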
The file sink supports file compaction, which allows applications to use smaller checkpoint intervals without generating a large number of files.
Key | Default | Type | Description |
---|---|---|---|
auto-compaction | false | Boolean | Whether to enable automatic compaction in streaming sink or not. The data will be written to temporary files. After the checkpoint is completed, the temporary files generated by a checkpoint will be compacted. The temporary files are invisible before compaction. |
compaction.file-size | (none) | MemorySize | The compaction target file size, the default value is the rolling file size. |
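If enabled, file compaction merges multiple small files into larger files based on the target file size. When running file compaction in production, be aware that the files written by a checkpoint stay invisible until they have been compacted, so data becomes visible later than it would without compaction.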
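How small files might be grouped toward a target size can be illustrated with a simple greedy packing. This is only a sketch: the class name is hypothetical, and the grouping strategy is an assumption chosen for illustration, not a description of the algorithm the sink actually uses.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CompactionSketch {

    /**
     * Groups file sizes, in the given order, so that each group stays at or
     * below the target size where possible. A single file larger than the
     * target ends up in a group of its own.
     */
    static List<List<Long>> pack(List<Long> fileSizes, long targetSize) {
        List<List<Long>> groups = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        long currentSize = 0;
        for (long size : fileSizes) {
            // Start a new group when adding this file would exceed the target.
            if (!current.isEmpty() && currentSize + size > targetSize) {
                groups.add(current);
                current = new ArrayList<>();
                currentSize = 0;
            }
            current.add(size);
            currentSize += size;
        }
        if (!current.isEmpty()) {
            groups.add(current);
        }
        return groups;
    }

    public static void main(String[] args) {
        // prints [[40, 50], [30], [100], [10]]
        System.out.println(pack(Arrays.asList(40L, 50L, 30L, 100L, 10L), 100L));
    }
}
```

Each inner list stands for the temporary files that would be merged into one output file; the sizes are in arbitrary units.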
After writing a partition, it is often necessary to notify downstream applications, for example by adding the partition to a Hive metastore or by writing a _SUCCESS file in the directory. The file system sink contains a partition commit feature that allows configuring custom policies. Commit actions are based on a combination of triggers and policies.
NOTE: Partition Commit only works in dynamic partition inserting.
To define when to commit a partition, provide a partition commit trigger:
Key | Default | Type | Description |
---|---|---|---|
sink.partition-commit.trigger | process-time | String | Trigger type for partition commit. 'process-time': based on the time of the machine; it requires neither partition time extraction nor watermark generation. Commits the partition once the 'current system time' passes 'partition creation system time' plus 'delay'. 'partition-time': based on the time extracted from partition values; it requires watermark generation. Commits the partition once the 'watermark' passes 'time extracted from partition values' plus 'delay'. |
sink.partition-commit.delay | 0 s | Duration | The partition will not be committed until the delay has passed. For a daily partition, use '1 d'; for an hourly partition, use '1 h'. |
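There are two types of trigger: process-time and partition-time. Which one to use depends on when downstream should see the partition:
If you want to let downstream see the partition as soon as possible, no matter whether its data is complete or not: keep the default trigger 'process-time' and the default delay '0 s'.
If you want to let downstream see the partition only when its data is complete, and your job has watermark generation, and you can extract the time from partition values: set the trigger to 'partition-time' and the delay to the partition granularity, for example '1 h' for an hourly partition.
If you want to let downstream see the partition only when its data is complete, but there is no watermark, or the time cannot be extracted from partition values: keep the trigger 'process-time' and set the delay to the partition granularity, for example '1 h' for an hourly partition.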
Late data processing: When a record arrives for a partition that has already been committed, the record is still written into that partition, and the commit of the partition is then triggered again.
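The two trigger conditions from the option table can be written as two small predicates. This is a sketch with hypothetical names that mirrors the wording of the table ("passes" is read as strictly after); it does not reproduce the sink's internal classes.

```java
import java.time.Duration;
import java.time.Instant;

public class CommitTriggerSketch {

    /** process-time: current system time passes partition creation time plus delay. */
    static boolean processTimeReady(Instant now, Instant partitionCreated, Duration delay) {
        return now.isAfter(partitionCreated.plus(delay));
    }

    /** partition-time: watermark passes the time extracted from partition values plus delay. */
    static boolean partitionTimeReady(Instant watermark, Instant partitionTime, Duration delay) {
        return watermark.isAfter(partitionTime.plus(delay));
    }

    public static void main(String[] args) {
        // Hourly partition datetime=2019-08-25/hour=11 with a delay of 1 h.
        Instant partitionTime = Instant.parse("2019-08-25T11:00:00Z");
        Duration delay = Duration.ofHours(1);
        System.out.println(partitionTimeReady(Instant.parse("2019-08-25T11:59:00Z"), partitionTime, delay));
        System.out.println(partitionTimeReady(Instant.parse("2019-08-25T12:00:01Z"), partitionTime, delay));
    }
}
```

With a delay equal to the partition granularity, the hour=11 partition is only committed after the watermark has moved past 12:00, which is what makes the partition-time trigger suitable when downstream needs complete data.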
Time extractors define how time is extracted from partition values.
Key | Default | Type | Description |
---|---|---|---|
partition.time-extractor.kind | default | String | Time extractor to extract time from partition values. Supports default and custom. For default, you can configure a timestamp pattern. For custom, you should configure an extractor class. |
partition.time-extractor.class | (none) | String | The extractor class that implements the PartitionTimeExtractor interface. |
partition.time-extractor.timestamp-pattern | (none) | String | The 'default' construction way allows users to use partition fields to get a legal timestamp pattern. By default, 'yyyy-mm-dd hh:mm:ss' is read from the first field. If the timestamp should be extracted from a single partition field 'dt', configure: '$dt'. If the timestamp should be extracted from multiple partition fields, say 'year', 'month', 'day' and 'hour', configure: '$year-$month-$day $hour:00:00'. If the timestamp should be extracted from two partition fields 'dt' and 'hour', configure: '$dt $hour:00:00'. |
The default extractor is based on a timestamp pattern composed of your partition fields. You can also specify an implementation for fully custom partition extraction based on the PartitionTimeExtractor interface.
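To make the timestamp pattern concrete, the following sketch substitutes partition values into a pattern such as '$dt $hour:00:00' and parses the result. The class and method names are hypothetical, and it does not implement the PartitionTimeExtractor interface, whose exact signature is not given in this document. The formatter pattern is also an assumption: a single 'H' is used so that a one-digit hour such as hour=6 parses as well.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Map;

public class TimestampPatternSketch {

    // Assumed text layout after substitution, e.g. "2019-08-25 11:00:00".
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd H:mm:ss");

    /**
     * Replaces every $field placeholder with the value of that partition
     * field and parses the resulting text. Field names that are prefixes of
     * one another (for example d and dt) are not handled by this sketch.
     */
    static LocalDateTime extract(String pattern, Map<String, String> partitionValues) {
        String timestamp = pattern;
        for (Map.Entry<String, String> field : partitionValues.entrySet()) {
            timestamp = timestamp.replace("$" + field.getKey(), field.getValue());
        }
        return LocalDateTime.parse(timestamp, FORMAT);
    }

    public static void main(String[] args) {
        Map<String, String> values = Map.of("dt", "2019-08-25", "hour", "11");
        System.out.println(extract("$dt $hour:00:00", values));
    }
}
```

The extracted time is what the partition-time trigger compares against the watermark.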
The partition commit policy defines what action is taken when partitions are committed.
Key | Default | Type | Description |
---|---|---|---|
sink.partition-commit.policy.kind | (none) | String | Policy for committing a partition, that is, for notifying the downstream application that the partition has finished writing and is ready to be read. metastore: add the partition to the metastore. Only Hive tables support the metastore policy; the file system manages partitions through the directory structure. success-file: add a '_SUCCESS' file to the directory. custom: use a policy class to create a commit policy. Multiple policies can be configured at the same time: 'metastore,success-file'. |
sink.partition-commit.policy.class | (none) | String | The partition commit policy class that implements the PartitionCommitPolicy interface. Only works with the custom commit policy. |
sink.partition-commit.success-file.name | _SUCCESS | String | The file name for the success-file partition commit policy; the default is '_SUCCESS'. |
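You can provide your own commit policy by implementing the PartitionCommitPolicy interface, setting sink.partition-commit.policy.kind to custom and naming the class in sink.partition-commit.policy.class.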
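The shape of such a policy can be sketched without Flink on the classpath. In the example below, CommitPolicy is a hypothetical stand-in for PartitionCommitPolicy; the real interface, its method signature and its context type may differ, so treat this as an outline of the idea rather than code to drop into a job. The policy writes a marker file and is idempotent, because a partition can be committed more than once when late data arrives.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class CustomCommitPolicySketch {

    /** Hypothetical stand-in for Flink's PartitionCommitPolicy interface. */
    interface CommitPolicy {
        void commit(Path partitionPath) throws IOException;
    }

    /** Writes an empty marker file into the committed partition directory. */
    static class MarkerFilePolicy implements CommitPolicy {
        private final String fileName;

        MarkerFilePolicy(String fileName) {
            this.fileName = fileName;
        }

        @Override
        public void commit(Path partitionPath) throws IOException {
            Path marker = partitionPath.resolve(fileName);
            // A repeated commit (for example after late data) must not fail.
            if (Files.notExists(marker)) {
                Files.createFile(marker);
            }
        }
    }

    /** Commits a temporary partition directory twice and reports whether the marker exists. */
    static boolean demo() {
        try {
            Path partition = Files.createTempDirectory("partition-sketch");
            CommitPolicy policy = new MarkerFilePolicy("_SUCCESS");
            policy.commit(partition);
            policy.commit(partition); // second commit, as triggered by late data
            return Files.exists(partition.resolve("_SUCCESS"));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

A real implementation would typically do something the built-in policies do not cover, such as notifying an external system, but the same idempotency concern applies.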
A typical use of the file system connector is a streaming query that writes data from Kafka into a file system, followed by a batch query that reads that data back out.