This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.
Blocking Shuffle #
Flink supports a batch execution mode in both the DataStream API and Table / SQL for jobs executing over bounded input. In this mode, network exchanges occur via a blocking shuffle. Unlike the pipelined shuffle used for streaming applications, blocking exchanges persist data to some storage. Downstream tasks then fetch this data via the network. Such an exchange reduces the resources required to execute the job because it does not require the upstream and downstream tasks to run simultaneously.
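Batch execution mode, and with it blocking shuffle, can be enabled cluster-wide through the `execution.runtime-mode` setting in flink-conf.yaml (it can also be set per job; the snippet below is a minimal sketch):

```yaml
# Run bounded jobs in batch execution mode so that
# network exchanges use a blocking shuffle.
execution.runtime-mode: BATCH
```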
As a whole, Flink provides two different types of blocking shuffle: Hash shuffle and Sort shuffle. They are detailed in the following sections.
Hash Shuffle #
The default blocking shuffle implementation for 1.14 and lower, Hash Shuffle, has each upstream task persist its results in a separate file for each downstream task on the local disk of the TaskManager. When the downstream tasks run, they request partitions from the upstream TaskManagers, which read the files and transmit data via the network.
Hash Shuffle provides different mechanisms for writing and reading files:
- `file`: Writes files with normal File IO and reads and transmits files with Netty `FileRegion`. `FileRegion` relies on the `sendfile` system call to reduce the number of data copies and memory consumption.
- `mmap`: Writes and reads files with `mmap`.
- `auto`: Writes files with normal File IO. For file reading, it falls back to the `file` option on 32-bit machines and uses `mmap` on 64-bit machines. This avoids the file size limitation of the Java `mmap` implementation on 32-bit machines.
The mechanism can be chosen via the TaskManager configuration option taskmanager.network.blocking-shuffle.type.
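For example, the mechanism could be pinned in flink-conf.yaml (assuming the option name taskmanager.network.blocking-shuffle.type from the TaskManager configuration reference; `auto` is one of the values described above):

```yaml
# Select the Hash Shuffle read/write mechanism
# (one of: file, mmap, auto).
taskmanager.network.blocking-shuffle.type: auto
```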
This option is experimental and might be changed in the future.
If SSL is enabled, the `file` mechanism cannot use `FileRegion` and instead uses an un-pooled buffer to cache data before transmitting. This might cause direct memory OOM. Additionally, since the synchronous file reading might block Netty threads for some time, the SSL handshake timeout needs to be increased to avoid connection reset errors.
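If you hit such connection resets with SSL enabled, increasing the internal SSL handshake timeout may help; a sketch assuming the `security.ssl.internal.handshake-timeout` option from the security configuration (the value is illustrative, in milliseconds):

```yaml
# Give disk-bound Netty threads more time to complete
# the SSL handshake before the connection is reset.
security.ssl.internal.handshake-timeout: 60000
```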
The memory usage of `mmap` is not accounted for by configured memory limits, but some resource frameworks like Yarn will track this memory usage and kill the container if memory exceeds some threshold.
Hash Shuffle works well for small scale jobs on SSD, but it also has some disadvantages:
- If the job scale is large, it might create too many files, and it requires a large write buffer to write these files at the same time.
- On HDD, when multiple downstream tasks fetch their data simultaneously, random IO can become a bottleneck.
Sort Shuffle #
Sort Shuffle is another blocking shuffle implementation introduced in version 1.13, and it became the default blocking shuffle implementation in 1.15. Different from Hash Shuffle, Sort Shuffle writes only one file for each result partition. When the result partition is read by multiple downstream tasks concurrently, the data file is opened only once and shared by all readers. As a result, the cluster uses fewer resources like inodes and file descriptors, which improves stability. Furthermore, by writing fewer files and making a best effort to read data sequentially, Sort Shuffle can achieve better performance than Hash Shuffle, especially on HDD. Additionally, Sort Shuffle uses extra managed memory as the data reading buffer and does not rely on the `mmap` mechanism, thus it also works well with SSL. Please refer to FLINK-19582 and FLINK-19614 for more details about Sort Shuffle.
Here are some config options that might need adjustment when using sort blocking shuffle:
- taskmanager.network.sort-shuffle.min-buffers: Config option to control the data writing buffer size. For large scale jobs, you may need to increase this value; usually, several hundred megabytes of memory is enough. Because this memory is allocated from network memory, to increase this value you may also need to increase the total network memory by adjusting taskmanager.memory.network.fraction, taskmanager.memory.network.min and taskmanager.memory.network.max to avoid the potential “Insufficient number of network buffers” error.
- taskmanager.memory.framework.off-heap.batch-shuffle.size: Config option to control the data reading buffer size. For large scale jobs, you may need to increase this value; usually, several hundred megabytes of memory is enough. Because this memory is cut from the framework off-heap memory, to increase this value you also need to increase the total framework off-heap memory by adjusting taskmanager.memory.framework.off-heap.size to avoid the potential direct memory OOM error.
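Putting the two options together, a flink-conf.yaml sketch for a large scale job could look as follows. All values are illustrative assumptions, not recommendations; only the option names come from this section:

```yaml
# More write buffers per result partition; this memory comes
# out of the network memory pool, so grow that pool as well.
taskmanager.network.sort-shuffle.min-buffers: 2048
taskmanager.memory.network.fraction: 0.2
taskmanager.memory.network.min: 1gb
taskmanager.memory.network.max: 4gb

# Larger shared read buffer, cut from framework off-heap
# memory, so the off-heap budget must grow by the same amount.
taskmanager.memory.framework.off-heap.batch-shuffle.size: 512m
taskmanager.memory.framework.off-heap.size: 640m
```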
Sort Shuffle only sorts records by partition index rather than by the records themselves; that is to say, the sort is only used as a data clustering algorithm.
Choices of Blocking Shuffle #
In summary:
- For small scale jobs running on SSD, both implementations should work well.
- For large scale jobs or jobs running on HDD, Sort Shuffle should be more suitable.
To switch between Sort Shuffle and Hash Shuffle, you need to adjust the config option taskmanager.network.sort-shuffle.min-parallelism. It controls which shuffle implementation to use based on the parallelism of downstream tasks: if the parallelism is lower than the configured value, Hash Shuffle will be used; otherwise, Sort Shuffle will be used. For versions lower than 1.15, its default value is Integer.MAX_VALUE, so Hash Shuffle will be used by default. Since 1.15, its default value is 1, so Sort Shuffle will be used by default.
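For example, on 1.14 and lower, Sort Shuffle can be forced for every blocking exchange by lowering the threshold in flink-conf.yaml:

```yaml
# Use Sort Shuffle whenever downstream parallelism >= 1,
# i.e. for all blocking exchanges.
taskmanager.network.sort-shuffle.min-parallelism: 1
```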
Performance Tuning #
The following guidelines may help you achieve better performance, especially for large scale batch jobs:
- Always use Sort Shuffle on HDD because Sort Shuffle can largely improve stability and IO performance. Since 1.15, Sort Shuffle is already the default blocking shuffle implementation; for 1.14 and lower versions, you need to enable it manually by setting taskmanager.network.sort-shuffle.min-parallelism to 1.
- For both blocking shuffle implementations, you may consider enabling data compression to improve performance unless the data is hard to compress. Since 1.15, data compression is already enabled by default; for 1.14 and lower versions, you need to enable it manually.
- When Sort Shuffle is used, decreasing the number of exclusive buffers per channel and increasing the number of floating buffers per gate can help. For 1.14 and higher versions, it is suggested to set taskmanager.network.memory.buffers-per-channel to 0 and taskmanager.network.memory.floating-buffers-per-gate to a larger value (for example, 4096). This setting has two main advantages: 1) it decouples the network memory consumption from parallelism, so for large scale jobs the possibility of the “Insufficient number of network buffers” error is decreased; 2) network buffers are distributed among channels according to need, which can improve network buffer utilization and further improve performance.
- Increase the total size of network memory. Currently, the default network memory size is pretty modest. For large scale jobs, it’s suggested to increase the total network memory fraction to at least 0.2 to achieve better performance. At the same time, you may also need to adjust the lower bound and upper bound of the network memory size, please refer to the memory configuration document for more information.
- Increase the memory size for shuffle data write. As mentioned in the above section, for large scale jobs, it’s suggested to increase the number of write buffers per result partition to at least (2 * parallelism) if you have enough memory. Note that you may also need to increase the total size of network memory to avoid the “Insufficient number of network buffers” error after you increase this config value.
- Increase the memory size for shuffle data read. As mentioned in the above section, for large scale jobs, it’s suggested to increase the size of the shared read memory to a larger value (for example, 256M or 512M). Because this memory is cut from the framework off-heap memory, you must increase taskmanager.memory.framework.off-heap.size by the same size to avoid the direct memory OOM error.
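The buffer-related recommendations above could be expressed in flink-conf.yaml roughly as follows. The option names are the ones mentioned in this section; the values are only illustrative:

```yaml
# Decouple network memory from parallelism: no exclusive
# buffers per channel, a large floating pool per gate.
taskmanager.network.memory.buffers-per-channel: 0
taskmanager.network.memory.floating-buffers-per-gate: 4096

# Grow the overall network memory for large scale jobs.
taskmanager.memory.network.fraction: 0.2
```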
Troubleshooting #
Here are some exceptions you may encounter (rarely) and the corresponding solutions that may help:
| Exception | Possible solution |
| --- | --- |
| Insufficient number of network buffers | This means the amount of network memory is not enough to run the target job and you need to increase the total network memory size. Note that since 1.15, Sort Shuffle is used by default and its write buffers are also allocated from network memory (see taskmanager.network.sort-shuffle.min-buffers above). |
| Too many open files | This means that the number of available file descriptors is not enough. If you are using Hash Shuffle, consider switching to Sort Shuffle, which opens far fewer files, or increasing the system file descriptor limit. |
| Connection reset by peer | This usually means that the network is unstable or under heavy load. Other issues like the SSL handshake timeout mentioned above may also cause this problem. If you are using the file mechanism with SSL enabled, consider increasing the SSL handshake timeout as described above. |
| Network connection timeout | This usually means that the network is unstable or under heavy load; increasing the network connection timeout or enabling connection retry may help. |
| Socket read/write timeout | This may indicate that the network is slow or under heavy load; increasing the network send/receive buffer size may help. If the job is running in a Kubernetes environment, using the host network may also help. |
| Read buffer request timeout | This can happen only when you are using Sort Shuffle and means fierce contention for the shuffle read memory. Consider increasing taskmanager.memory.framework.off-heap.batch-shuffle.size, together with taskmanager.memory.framework.off-heap.size. |
| No space left on device | This usually means that the disk space or the inodes have been exhausted. Please consider extending the storage space or doing some cleanup. |
| Out of memory error | If you are using Hash Shuffle with the mmap mechanism, keep in mind that its memory usage is not accounted for by configured memory limits; otherwise, consider increasing the corresponding network or off-heap memory size as suggested in the sections above. |
| Container killed by external resource manager | There are several reasons that can lead to a container being killed, for example, killing a low priority container to make room for a high priority one, or the container using too many resources like memory and disk space. As mentioned in the above section, the memory used by mmap is not controlled by Flink but can be tracked by resource frameworks like Yarn, which may kill the container when a threshold is exceeded; switching to Sort Shuffle, which does not rely on mmap, may help. |