For single-node setups Flink is ready to go out of the box and you don’t need to change the default configuration to get started.
The out-of-the-box configuration will use your default Java installation. If you want to override the Java runtime to use, you can manually set the environment variable JAVA_HOME or the configuration key env.java.home in conf/flink-conf.yaml.
This page lists the most common options that are typically needed to set up a well-performing (distributed) installation. In addition, a full list of all available configuration parameters is given below.
All configuration is done in conf/flink-conf.yaml, which is expected to be a flat collection of YAML key-value pairs with the format key: value.
The system and run scripts parse the config at startup time. Changes to the configuration file require restarting the Flink JobManager and TaskManagers.
The configuration files for the TaskManagers can be different; Flink does not assume uniform machines in the cluster.
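To make the "flat key: value" format concrete, here is a minimal Python sketch of reading such a file. It is an illustration, not Flink's own loader: it assumes that `#` starts a comment and that each entry is split at the first `": "`, and the file content is a hypothetical excerpt.

```python
from __future__ import annotations


def parse_flat_config(text: str) -> dict[str, str]:
    """Read flat 'key: value' lines into a dict (illustrative sketch only)."""
    config: dict[str, str] = {}
    for raw in text.splitlines():
        # Assumption: '#' starts a comment that runs to the end of the line.
        line = raw.split("#", 1)[0].strip()
        if not line:
            continue
        # Split at the first ": " only, so values such as URIs keep their colons.
        key, sep, value = line.partition(": ")
        if not sep or not key.strip() or not value.strip():
            raise ValueError(f"not a flat 'key: value' pair: {raw!r}")
        config[key.strip()] = value.strip()
    return config


example = """
# hypothetical excerpt of conf/flink-conf.yaml
jobmanager.rpc.address: localhost
taskmanager.numberOfTaskSlots: 4
state.checkpoints.dir: hdfs://namenode:8020/flink/checkpoints
"""

conf = parse_flat_config(example)
print(conf["taskmanager.numberOfTaskSlots"])  # 4 (values stay strings)
```

Keeping every value as a string reflects that the file is only a flat list; interpreting values such as "1024m" or "10 s" is left to whatever reads each individual option.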
Key | Default | Description |
---|---|---|
jobmanager.heap.size | "1024m" | JVM heap size for the JobManager. |
taskmanager.heap.size | "1024m" | JVM heap size for the TaskManagers, which are the parallel workers of the system. On YARN setups, this value is automatically configured to the size of the TaskManager's YARN container, minus a certain tolerance value. |
parallelism.default | 1 | Default parallelism for jobs. |
taskmanager.numberOfTaskSlots | 1 | The number of parallel operator or user function instances that a single TaskManager can run. If this value is larger than 1, a single TaskManager takes multiple instances of a function or operator. That way, the TaskManager can utilize multiple CPU cores, but at the same time, the available memory is divided between the different operator or function instances. This value is typically proportional to the number of physical CPU cores that the TaskManager's machine has (e.g., equal to the number of cores, or half the number of cores). |
state.backend | (none) | The state backend to be used to store and checkpoint state. |
state.checkpoints.dir | (none) | The default directory used for storing the data files and metadata of checkpoints in a Flink-supported file system. The storage path must be accessible from all participating processes/nodes (i.e., all TaskManagers and JobManagers). |
state.savepoints.dir | (none) | The default directory for savepoints. Used by the state backends that write savepoints to file systems (MemoryStateBackend, FsStateBackend, RocksDBStateBackend). |
high-availability | "NONE" | Defines the high-availability mode used for the cluster execution. To enable high availability, set this mode to "ZOOKEEPER". |
high-availability.storageDir | (none) | File system path (URI) where Flink persists metadata in high-availability setups. |
security.ssl.internal.enabled | false | Turns on SSL for internal network communication. Optionally, specific components may override this through their own settings (rpc, data transport, REST, etc.). |
security.ssl.rest.enabled | false | Turns on SSL for external communication via the REST endpoints. |

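
How taskmanager.numberOfTaskSlots and job parallelism relate can be checked with simple arithmetic. This Python sketch assumes a cluster of identical TaskManagers and Flink's default slot sharing, under which a job needs as many slots as its highest operator parallelism; the cluster sizes are hypothetical.

```python
def total_slots(num_taskmanagers: int, slots_per_taskmanager: int) -> int:
    """Slots offered by a cluster of identical TaskManagers."""
    return num_taskmanagers * slots_per_taskmanager


def job_fits(max_operator_parallelism: int, num_taskmanagers: int,
             slots_per_taskmanager: int) -> bool:
    """Assumes default slot sharing: a job needs as many slots as its
    highest operator parallelism."""
    return max_operator_parallelism <= total_slots(num_taskmanagers, slots_per_taskmanager)


# Hypothetical cluster: 3 TaskManagers on 8-core machines, one slot per core.
print(total_slots(3, 8))    # 24
print(job_fits(24, 3, 8))   # True
print(job_fits(32, 3, 8))   # False
```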
Note: The following HDFS keys are deprecated; it is recommended to configure the Hadoop path with the environment variable HADOOP_CONF_DIR instead.
These parameters configure the default HDFS used by Flink. Setups that do not specify an HDFS configuration have to specify the full path to HDFS files (hdfs://address:port/path/to/files). Files will also be written with default HDFS parameters (block size, replication factor).
fs.hdfs.hadoopconf: The absolute path to the Hadoop File System’s (HDFS) configuration directory (OPTIONAL VALUE). Specifying this value allows programs to reference HDFS files using short URIs (hdfs:///path/to/files, without including the address and port of the NameNode in the file URI). Without this option, HDFS files can be accessed, but require fully qualified URIs like hdfs://address:port/path/to/files. This option also causes file writers to pick up HDFS’s default values for block sizes and replication factors. Flink will look for the “core-site.xml” and “hdfs-site.xml” files in the specified directory.
fs.hdfs.hdfsdefault: The absolute path of Hadoop’s own configuration file “hdfs-default.xml” (DEFAULT: null).
fs.hdfs.hdfssite: The absolute path of Hadoop’s own configuration file “hdfs-site.xml” (DEFAULT: null).
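
The difference between short and fully qualified HDFS URIs can be illustrated by how a short URI picks up the NameNode address from the Hadoop configuration. In this Python sketch, the helper `qualify_hdfs_uri` is hypothetical and `namenode:8020` stands in for whatever address the Hadoop configuration would supply; it only mirrors the behavior described above.

```python
from typing import Optional
from urllib.parse import urlsplit, urlunsplit


def qualify_hdfs_uri(uri: str, default_authority: Optional[str]) -> str:
    """Fill in the NameNode address for short URIs like hdfs:///path (sketch)."""
    parts = urlsplit(uri)
    if parts.scheme != "hdfs" or parts.netloc:
        return uri  # not HDFS, or already fully qualified
    if default_authority is None:
        # No Hadoop configuration: short URIs cannot be resolved.
        raise ValueError(f"{uri} needs the form hdfs://address:port/... without a Hadoop config")
    return urlunsplit(("hdfs", default_authority, parts.path, parts.query, parts.fragment))


print(qualify_hdfs_uri("hdfs:///path/to/files", "namenode:8020"))
# hdfs://namenode:8020/path/to/files
```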

Key | Default | Description |
---|---|---|
classloader.parent-first-patterns.additional | (none) | A (semicolon-separated) list of patterns that specifies which classes should always be resolved through the parent ClassLoader first. A pattern is a simple prefix that is checked against the fully qualified class name. These patterns are appended to "classloader.parent-first-patterns.default". |
classloader.parent-first-patterns.default | "java.; | A (semicolon-separated) list of patterns that specifies which classes should always be resolved through the parent ClassLoader first. A pattern is a simple prefix that is checked against the fully qualified class name. This setting should generally not be modified. To add another pattern, we recommend using "classloader.parent-first-patterns.additional" instead. |
classloader.resolve-order | "child-first" | Defines the class resolution strategy when loading classes from user code, meaning whether to first check the user code jar ("child-first") or the application classpath ("parent-first"). The default setting loads classes first from the user code jar, which means that user code jars can include and load different dependencies than Flink uses (transitively). |
io.tmp.dirs | 'LOCAL_DIRS' on Yarn. '_FLINK_TMP_DIR' on Mesos. System.getProperty("java.io.tmpdir") in standalone. | |
mode | "new" | Switch to select the execution mode. Possible values are 'new' and 'legacy'. |
parallelism.default | 1 | Default parallelism for jobs. |

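
How classloader.resolve-order interacts with the parent-first patterns can be sketched as a small decision function. The Python below assumes plain prefix matching against the fully qualified class name, as the pattern descriptions state; the function names and pattern lists are hypothetical, and the list is not the complete default value.

```python
def parse_patterns(default: str, additional: str = "") -> list:
    """Semicolon-separated prefixes; 'additional' is appended to 'default'."""
    items = default.split(";") + additional.split(";")
    return [p.strip() for p in items if p.strip()]


def loaded_first_by(class_name: str, resolve_order: str, patterns: list) -> str:
    """Which loader is asked first for class_name: 'parent' or 'user-code'."""
    if resolve_order == "parent-first":
        return "parent"
    if resolve_order != "child-first":
        raise ValueError(f"unknown resolve order: {resolve_order!r}")
    # Even with child-first, classes matching a parent-first prefix go to the parent.
    if any(class_name.startswith(prefix) for prefix in patterns):
        return "parent"
    return "user-code"


# Hypothetical pattern lists, not the complete default value.
patterns = parse_patterns("java.;scala.;org.apache.flink.", "com.example.shared.")
print(loaded_first_by("java.util.List", "child-first", patterns))                  # parent
print(loaded_first_by("com.google.common.base.Strings", "child-first", patterns))  # user-code
print(loaded_first_by("com.example.shared.Api", "child-first", patterns))          # parent
```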

Key | Default | Description |
---|---|---|
jobmanager.archive.fs.dir | (none) | |
jobmanager.execution.attempts-history-size | 16 | The maximum number of prior execution attempts kept in history. |
jobmanager.heap.size | "1024m" | JVM heap size for the JobManager. |
jobmanager.resourcemanager.reconnect-interval | 2000 | This option specifies the interval in order to trigger a resource manager reconnection if the connection to the resource manager has been lost. This option is only intended for internal use. |
jobmanager.rpc.address | (none) | The config parameter defining the network address to connect to for communication with the JobManager. This value is only interpreted in setups where a single JobManager with static name or address exists (simple standalone setups, or container setups with dynamic service name resolution). It is not used in many high-availability setups, when a leader-election service (like ZooKeeper) is used to elect and discover the JobManager leader from potentially multiple standby JobManagers. |
jobmanager.rpc.port | 6123 | The config parameter defining the network port to connect to for communication with the JobManager. Like jobmanager.rpc.address, this value is only interpreted in setups where a single JobManager with static name/address and port exists (simple standalone setups, or container setups with dynamic service name resolution). This config option is not used in many high-availability setups, when a leader-election service (like ZooKeeper) is used to elect and discover the JobManager leader from potentially multiple standby JobManagers. |
jobstore.cache-size | 52428800 | The job store cache size in bytes, which is used to keep completed jobs in memory. |
jobstore.expiration-time | 3600 | The time in seconds after which a completed job expires and is purged from the job store. |
slot.idle.timeout | 50000 | The timeout in milliseconds for an idle slot in the Slot Pool. |
slot.request.timeout | 300000 | The timeout in milliseconds for requesting a slot from the Slot Pool. |


Key | Default | Description |
---|---|---|
task.cancellation.interval | 30000 | Time interval between two successive task cancellation attempts in milliseconds. |
task.cancellation.timeout | 180000 | Timeout in milliseconds after which a task cancellation times out and leads to a fatal TaskManager error. A value of 0 deactivates the watchdog. |
task.cancellation.timers.timeout | 7500 | |
task.checkpoint.alignment.max-size | -1 | The maximum number of bytes that a checkpoint alignment may buffer. If the checkpoint alignment buffers more than the configured amount of data, the checkpoint is aborted (skipped). A value of -1 indicates that there is no limit. |
taskmanager.data.port | 0 | The TaskManager’s port used for data exchange operations. |
taskmanager.data.ssl.enabled | true | Enable SSL support for the TaskManager data transport. This is applicable only when the global flag for internal SSL (security.ssl.internal.enabled) is set to true. |
taskmanager.debug.memory.log | false | Flag indicating whether to start a thread that repeatedly logs the memory usage of the JVM. |
taskmanager.debug.memory.log-interval | 5000 | The interval (in ms) at which the log thread logs the current memory usage. |
taskmanager.exit-on-fatal-akka-error | false | Whether the quarantine monitor for TaskManagers shall be started. The quarantine monitor shuts down the actor system if it detects that it has quarantined another actor system or if it has been quarantined by another actor system. |
taskmanager.heap.size | "1024m" | JVM heap size for the TaskManagers, which are the parallel workers of the system. On YARN setups, this value is automatically configured to the size of the TaskManager's YARN container, minus a certain tolerance value. |
taskmanager.host | (none) | The hostname of the network interface that the TaskManager binds to. By default, the TaskManager searches for network interfaces that can connect to the JobManager and other TaskManagers. This option can be used to define a hostname if that strategy fails for some reason. Because different TaskManagers need different values for this option, it usually is specified in an additional non-shared TaskManager-specific config file. |
taskmanager.jvm-exit-on-oom | false | Whether to kill the TaskManager when the task thread throws an OutOfMemoryError. |
taskmanager.memory.fraction | 0.7 | The relative amount of memory (after subtracting the amount of memory used by network buffers) that the TaskManager reserves for sorting, hash tables, and caching of intermediate results. For example, a value of `0.8` means that a TaskManager reserves 80% of its memory for internal data buffers, leaving 20% of free memory for the TaskManager's heap for objects created by user-defined functions. This parameter is only evaluated if taskmanager.memory.size is not set. |
taskmanager.memory.off-heap | false | Memory allocation method (JVM heap or off-heap), used for managed memory of the TaskManager as well as the network buffers. |
taskmanager.memory.preallocate | false | Whether TaskManager managed memory should be pre-allocated when the TaskManager is starting. |
taskmanager.memory.segment-size | "32768" | Size of memory buffers used by the network stack and the memory manager. |
taskmanager.memory.size | "0" | Amount of memory to be allocated by the TaskManager's memory manager. If not set, a relative fraction will be allocated. |
taskmanager.network.detailed-metrics | false | Boolean flag to enable/disable more detailed metrics about inbound/outbound network queue lengths. |
taskmanager.network.memory.buffers-per-channel | 2 | Maximum number of network buffers to use for each outgoing/incoming channel (subpartition/input channel). In credit-based flow control mode, this indicates how many credits are exclusive in each input channel. It should be set to at least 2 for good performance: 1 buffer is for receiving in-flight data in the subpartition and 1 buffer is for parallel serialization. |
taskmanager.network.memory.floating-buffers-per-gate | 8 | Number of extra network buffers to use for each outgoing/incoming gate (result partition/input gate). In credit-based flow control mode, this indicates how many floating credits are shared among all the input channels. The floating buffers are distributed based on backlog (real-time output buffers in the subpartition) feedback, and can help relieve back-pressure caused by unbalanced data distribution among the subpartitions. This value should be increased in case of higher round-trip times between nodes and/or a larger number of machines in the cluster. |
taskmanager.network.memory.fraction | 0.1 | Fraction of JVM memory to use for network buffers. This determines how many streaming data exchange channels a TaskManager can have at the same time and how well buffered the channels are. If a job is rejected or you get a warning that the system does not have enough buffers available, increase this value or the min/max values below. Also note that "taskmanager.network.memory.min" and "taskmanager.network.memory.max" may override this fraction. |
taskmanager.network.memory.max | "1073741824" | Maximum memory size for network buffers. |
taskmanager.network.memory.min | "67108864" | Minimum memory size for network buffers. |
taskmanager.network.request-backoff.initial | 100 | Minimum backoff for partition requests of input channels. |
taskmanager.network.request-backoff.max | 10000 | Maximum backoff for partition requests of input channels. |
taskmanager.numberOfTaskSlots | 1 | The number of parallel operator or user function instances that a single TaskManager can run. If this value is larger than 1, a single TaskManager takes multiple instances of a function or operator. That way, the TaskManager can utilize multiple CPU cores, but at the same time, the available memory is divided between the different operator or function instances. This value is typically proportional to the number of physical CPU cores that the TaskManager's machine has (e.g., equal to the number of cores, or half the number of cores). |
taskmanager.registration.initial-backoff | "500 ms" | The initial registration backoff between two consecutive registration attempts. The backoff is doubled for each new registration attempt until it reaches the maximum registration backoff. |
taskmanager.registration.max-backoff | "30 s" | The maximum registration backoff between two consecutive registration attempts. The max registration backoff requires a time unit specifier (ms/s/min/h/d). |
taskmanager.registration.refused-backoff | "10 s" | The backoff after a registration has been refused by the JobManager before retrying to connect. |
taskmanager.registration.timeout | "5 min" | Defines the timeout for the TaskManager registration. If the duration is exceeded without a successful registration, then the TaskManager terminates. |
taskmanager.rpc.port | "0" | The TaskManager’s IPC port. Accepts a list of ports (“50100,50101”), ranges (“50100-50200”) or a combination of both. It is recommended to set a range of ports to avoid collisions when multiple TaskManagers are running on the same machine. |

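
The network-buffer and managed-memory options above combine into a small calculation. This Python sketch assumes that network memory is taskmanager.network.memory.fraction of the JVM memory clamped to the [min, max] range, and that managed memory is taskmanager.memory.fraction of what remains after subtracting the network memory, as the descriptions state. Flink's actual computation involves further details (for example, how the usable JVM memory is determined), so treat the numbers as approximate.

```python
MIB = 1024 * 1024


def network_memory(jvm_bytes: int, fraction: float = 0.1,
                   min_bytes: int = 64 * MIB, max_bytes: int = 1024 * MIB) -> int:
    """taskmanager.network.memory.fraction of the JVM memory, clamped to [min, max]."""
    return min(max_bytes, max(min_bytes, int(jvm_bytes * fraction)))


def managed_memory(jvm_bytes: int, network_bytes: int, fraction: float = 0.7) -> int:
    """taskmanager.memory.fraction of what is left after the network buffers
    (only relevant when taskmanager.memory.size is not set)."""
    return int((jvm_bytes - network_bytes) * fraction)


heap = 1024 * MIB                 # taskmanager.heap.size: "1024m"
net = network_memory(heap)        # 10 % of 1 GiB lies between 64 MiB and 1 GiB
buffers = net // 32768            # number of 32 KiB segments (taskmanager.memory.segment-size)
managed = managed_memory(heap, net)
print(net, buffers, managed)      # 107374182 3276 676457349
```

With a small heap (for example 256 MiB), the 10 % fraction falls below the 64 MiB minimum, so the minimum applies instead; with a very large heap, the 1 GiB maximum caps the network memory.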

Key | Default | Description |
---|---|---|
akka.ask.timeout | "10 s" | Timeout used for all futures and blocking Akka calls. If Flink fails due to timeouts, you should try to increase this value. Timeouts can be caused by slow machines or a congested network. The timeout value requires a time-unit specifier (ms/s/min/h/d). |
akka.client-socket-worker-pool.pool-size-factor | 1.0 | The pool size factor is used to determine thread pool size using the following formula: ceil(available processors * factor). The resulting size is then bounded by the pool-size-min and pool-size-max values. |
akka.client-socket-worker-pool.pool-size-max | 2 | Max number of threads to cap factor-based number to. |
akka.client-socket-worker-pool.pool-size-min | 1 | Min number of threads to cap factor-based number to. |
akka.client.timeout | "60 s" | Timeout for all blocking calls on the client side. |
akka.fork-join-executor.parallelism-factor | 2.0 | The parallelism factor is used to determine thread pool size using the following formula: ceil(available processors * factor). The resulting size is then bounded by the parallelism-min and parallelism-max values. |
akka.fork-join-executor.parallelism-max | 64 | Max number of threads to cap factor-based parallelism number to. |
akka.fork-join-executor.parallelism-min | 8 | Min number of threads to cap factor-based parallelism number to. |
akka.framesize | "10485760b" | Maximum size of messages that are sent between the JobManager and the TaskManagers. If Flink fails because messages exceed this limit, you should increase it. The message size requires a size-unit specifier. |
akka.jvm-exit-on-fatal-error | true | Exit JVM on fatal Akka errors. |
akka.log.lifecycle.events | false | Turns on Akka’s remote logging of events. Set this value to ‘true’ in case of debugging. |
akka.lookup.timeout | "10 s" | Timeout used for the lookup of the JobManager. The timeout value has to contain a time-unit specifier (ms/s/min/h/d). |
akka.retry-gate-closed-for | 50 | Milliseconds a gate should be closed for after a remote connection was disconnected. |
akka.server-socket-worker-pool.pool-size-factor | 1.0 | The pool size factor is used to determine thread pool size using the following formula: ceil(available processors * factor). The resulting size is then bounded by the pool-size-min and pool-size-max values. |
akka.server-socket-worker-pool.pool-size-max | 2 | Max number of threads to cap factor-based number to. |
akka.server-socket-worker-pool.pool-size-min | 1 | Min number of threads to cap factor-based number to. |
akka.ssl.enabled | true | Turns on SSL for Akka’s remote communication. This is applicable only when the global SSL flag security.ssl.enabled is set to true. |
akka.startup-timeout | (none) | Timeout after which the startup of a remote component is considered failed. |
akka.tcp.timeout | "20 s" | Timeout for all outbound connections. If you experience problems with connecting to a TaskManager due to a slow network, you should increase this value. |
akka.throughput | 15 | Number of messages that are processed in a batch before returning the thread to the pool. Low values denote fair scheduling, whereas high values can increase performance at the cost of unfairness. |
akka.transport.heartbeat.interval | "1000 s" | Heartbeat interval for Akka’s transport failure detector. Since Flink uses TCP, the detector is not necessary. Therefore, the detector is disabled by setting the interval to a very high value. If you need the transport failure detector, set the interval to a reasonable value. The interval value requires a time-unit specifier (ms/s/min/h/d). |
akka.transport.heartbeat.pause | "6000 s" | Acceptable heartbeat pause for Akka’s transport failure detector. Since Flink uses TCP, the detector is not necessary. Therefore, the detector is disabled by setting the pause to a very high value. If you need the transport failure detector, set the pause to a reasonable value. The pause value requires a time-unit specifier (ms/s/min/h/d). |
akka.transport.threshold | 300.0 | Threshold for the transport failure detector. Since Flink uses TCP, the detector is not necessary and, thus, the threshold is set to a high value. |
akka.watch.heartbeat.interval | "10 s" | Heartbeat interval for Akka’s DeathWatch mechanism to detect dead TaskManagers. If TaskManagers are wrongly marked dead because of lost or delayed heartbeat messages, then you should decrease this value or increase akka.watch.heartbeat.pause. A thorough description of Akka’s DeathWatch can be found in the Akka documentation. |
akka.watch.heartbeat.pause | "60 s" | Acceptable heartbeat pause for Akka’s DeathWatch mechanism. A low value does not allow an irregular heartbeat. If TaskManagers are wrongly marked dead because of lost or delayed heartbeat messages, then you should increase this value or decrease akka.watch.heartbeat.interval. A higher value increases the time to detect a dead TaskManager. A thorough description of Akka’s DeathWatch can be found in the Akka documentation. |
akka.watch.threshold | 12 | Threshold for the DeathWatch failure detector. A low value is prone to false positives, whereas a high value increases the time to detect a dead TaskManager. A thorough description of Akka’s DeathWatch can be found in the Akka documentation. |

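
The factor, min, and max options of each Akka thread pool combine via the formula stated in their descriptions. Below is a direct Python transcription of that formula, evaluated with the fork-join defaults from the table; the function name and the processor counts are hypothetical.

```python
import math


def pool_size(available_processors: int, factor: float, size_min: int, size_max: int) -> int:
    """ceil(available processors * factor), bounded by [size_min, size_max]."""
    return min(size_max, max(size_min, math.ceil(available_processors * factor)))


# akka.fork-join-executor defaults: factor 2.0, min 8, max 64
for cores in (2, 16, 48):
    print(cores, pool_size(cores, 2.0, 8, 64))
# 2 -> 8 (raised to the minimum), 16 -> 32, 48 -> 64 (capped at the maximum)
```

The same function covers the socket worker pools; with their defaults (factor 1.0, min 1, max 2), any machine with two or more processors ends up with 2 threads.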

Key | Default | Description |
---|---|---|
rest.address | (none) | The address that should be used by clients to connect to the server. |
rest.await-leader-timeout | 30000 | The time in ms that the client waits for the leader address, e.g., Dispatcher or WebMonitorEndpoint. |
rest.bind-address | (none) | The address that the server binds itself to. |
rest.client.max-content-length | 104857600 | The maximum content length in bytes that the client will handle. |
rest.connection-timeout | 15000 | The maximum time in ms for the client to establish a TCP connection. |
rest.idleness-timeout | 300000 | The maximum time in ms for a connection to stay idle before failing. |
rest.port | 8081 | The port that the server listens on / the client connects to. |
rest.retry.delay | 3000 | The time in ms that the client waits between retries (see also `rest.retry.max-attempts`). |
rest.retry.max-attempts | 20 | The number of retries the client will attempt if a retryable operation fails. |
rest.server.max-content-length | 104857600 | The maximum content length in bytes that the server will handle. |
rest.server.numThreads | 4 | The number of threads for the asynchronous processing of requests. |
rest.server.thread-priority | 5 | Thread priority of the REST server's executor for processing asynchronous requests. Lowering the thread priority will give Flink's main components more CPU time, whereas increasing it will allocate more time for the REST server's processing. |

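
rest.retry.max-attempts and rest.retry.delay describe a fixed-delay retry policy on the client side. The following is a minimal Python sketch of such a policy, not Flink's REST client: `call_with_retries` and `RetryableError` are hypothetical, it assumes max-attempts counts retries after the first call, and the sleep function is injectable so the example runs instantly.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class RetryableError(Exception):
    """Hypothetical marker for failures that are worth retrying."""


def call_with_retries(op: Callable[[], T], max_attempts: int = 20,
                      delay_ms: int = 3000,
                      sleep: Callable[[float], None] = time.sleep) -> T:
    """Run op; on RetryableError wait delay_ms and retry, at most max_attempts times.
    Assumption: max_attempts counts retries after the first call."""
    retries_left = max_attempts
    while True:
        try:
            return op()
        except RetryableError:
            if retries_left == 0:
                raise
            retries_left -= 1
            sleep(delay_ms / 1000.0)


calls = []


def flaky() -> str:
    calls.append(1)
    if len(calls) < 3:
        raise RetryableError("leader not yet available")
    return "ok"


waits = []
result = call_with_retries(flaky, sleep=waits.append)
print(result, len(calls), waits)  # ok 3 [3.0, 3.0]
```

With the defaults, a client following this policy would wait up to 20 × 3 s = 60 s in total before giving up.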

Key | Default | Description |
---|---|---|
blob.fetch.backlog | 1000 | The config parameter defining the backlog of BLOB fetches on the JobManager. |
blob.fetch.num-concurrent | 50 | The config parameter defining the maximum number of concurrent BLOB fetches that the JobManager serves. |
blob.fetch.retries | 5 | The config parameter defining the number of retries for failed BLOB fetches. |
blob.offload.minsize | 1048576 | The minimum size for messages to be offloaded to the BlobServer. |
blob.server.port | "0" | The config parameter defining the server port of the blob service. |
blob.service.cleanup.interval | 3600 | Cleanup interval of the blob caches at the TaskManagers (in seconds). |
blob.service.ssl.enabled | true | Flag to override SSL support for the blob service transport. |
blob.storage.directory | (none) | The config parameter defining the storage directory to be used by the blob server. |

Key | Default | Description |
---|---|---|
heartbeat.interval | 10000 | Time interval for requesting heartbeat from sender side. |
heartbeat.timeout | 50000 | Timeout for requesting and receiving heartbeat for both sender and receiver sides. |

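
The two heartbeat options describe an interval-based request and a timeout-based liveness check. The following is a minimal Python sketch of that logic, assuming both values are in milliseconds as the defaults suggest; `HeartbeatTracker` and its methods are hypothetical, not Flink's heartbeat manager, and timestamps are passed in explicitly so the example is deterministic.

```python
class HeartbeatTracker:
    """Marks a peer as timed out when no heartbeat arrives within timeout_ms."""

    def __init__(self, interval_ms: int = 10000, timeout_ms: int = 50000) -> None:
        self.interval_ms = interval_ms   # heartbeat.interval: how often to request one
        self.timeout_ms = timeout_ms     # heartbeat.timeout
        self.last_seen_ms = 0

    def should_request(self, now_ms: int, last_request_ms: int) -> bool:
        return now_ms - last_request_ms >= self.interval_ms

    def on_heartbeat(self, now_ms: int) -> None:
        self.last_seen_ms = now_ms

    def timed_out(self, now_ms: int) -> bool:
        return now_ms - self.last_seen_ms > self.timeout_ms


tracker = HeartbeatTracker()
tracker.on_heartbeat(now_ms=0)
print(tracker.timed_out(40000))                   # False: within the 50 s timeout
print(tracker.timed_out(60000))                   # True: no heartbeat for 60 s
print(tracker.timeout_ms // tracker.interval_ms)  # 5 requests fit into one timeout
```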
Key | Default | Description |
---|---|---|
security.ssl.algorithms | "TLS_RSA_WITH_AES_128_CBC_SHA" | The comma-separated list of standard SSL algorithms to be supported. |
security.ssl.internal.close-notify-flush-timeout | -1 | The timeout (in ms) for flushing the `close_notify` that was triggered by closing a channel. If the `close_notify` was not flushed in the given timeout, the channel will be closed forcibly. (-1 = use system default) |
security.ssl.internal.enabled | false | Turns on SSL for internal network communication. Optionally, specific components may override this through their own settings (rpc, data transport, REST, etc.). |
security.ssl.internal.handshake-timeout | -1 | The timeout (in ms) during the SSL handshake. (-1 = use system default) |
security.ssl.internal.key-password | (none) | The secret to decrypt the key in the keystore for Flink's internal endpoints (rpc, data transport, blob server). |
security.ssl.internal.keystore | (none) | The Java keystore file with SSL key and certificate, to be used by Flink's internal endpoints (rpc, data transport, blob server). |
security.ssl.internal.keystore-password | (none) | The secret to decrypt the keystore file for Flink's internal endpoints (rpc, data transport, blob server). |
security.ssl.internal.session-cache-size | -1 | The size of the cache used for storing SSL session objects. According to https://github.com/netty/netty/issues/832, you should always set this to an appropriate number to not run into a bug with stalling IO threads during garbage collection. (-1 = use system default) |
security.ssl.internal.session-timeout | -1 | The timeout (in ms) for the cached SSL session objects. (-1 = use system default) |
security.ssl.internal.truststore | (none) | The truststore file containing the public CA certificates to verify the peer for Flink's internal endpoints (rpc, data transport, blob server). |
security.ssl.internal.truststore-password | (none) | The password to decrypt the truststore for Flink's internal endpoints (rpc, data transport, blob server). |
security.ssl.key-password | (none) | The secret to decrypt the server key in the keystore. |
security.ssl.keystore | (none) | The Java keystore file to be used by the Flink endpoint for its SSL key and certificate. |
security.ssl.keystore-password | (none) | The secret to decrypt the keystore file. |
security.ssl.protocol | "TLSv1.2" | The SSL protocol version to be supported for the SSL transport. Note that it does not support a comma-separated list. |
security.ssl.rest.authentication-enabled | false | Turns on mutual SSL authentication for external communication via the REST endpoints. |
security.ssl.rest.enabled | false | Turns on SSL for external communication via the REST endpoints. |
security.ssl.rest.key-password | (none) | The secret to decrypt the key in the keystore for Flink's external REST endpoints. |
security.ssl.rest.keystore | (none) | The Java keystore file with SSL key and certificate, to be used by Flink's external REST endpoints. |
security.ssl.rest.keystore-password | (none) | The secret to decrypt the keystore file for Flink's external REST endpoints. |
security.ssl.rest.truststore | (none) | The truststore file containing the public CA certificates to verify the peer for Flink's external REST endpoints. |
security.ssl.rest.truststore-password | (none) | The password to decrypt the truststore for Flink's external REST endpoints. |
security.ssl.truststore | (none) | The truststore file containing the public CA certificates to be used by Flink endpoints to verify the peer’s certificate. |
security.ssl.truststore-password | (none) | The secret to decrypt the truststore. |
security.ssl.verify-hostname | true | Flag to enable the peer’s hostname verification during the SSL handshake. |

These parameters allow for advanced tuning. The default values are sufficient when running concurrent high-throughput jobs on a large cluster.

Key | Default | Description |
---|---|---|
taskmanager.network.netty.client.connectTimeoutSec | 120 | The Netty client connection timeout. |
taskmanager.network.netty.client.numThreads | -1 | The number of Netty client threads. |
taskmanager.network.netty.num-arenas | -1 | The number of Netty arenas. |
taskmanager.network.netty.sendReceiveBufferSize | 0 | The Netty send and receive buffer size. This defaults to the system buffer size (cat /proc/sys/net/ipv4/tcp_[rw]mem) and is 4 MiB in modern Linux. |
taskmanager.network.netty.server.backlog | 0 | The Netty server connection backlog. |
taskmanager.network.netty.server.numThreads | -1 | The number of Netty server threads. |
taskmanager.network.netty.transport | "nio" | The Netty transport type, either "nio" or "epoll". |

Key | Default | Description |
---|---|---|
web.access-control-allow-origin | "*" | |
web.address | (none) | |
web.backpressure.cleanup-interval | 600000 | |
web.backpressure.delay-between-samples | 50 | |
web.backpressure.num-samples | 100 | |
web.backpressure.refresh-interval | 60000 | |
web.checkpoints.history | 10 | |
web.history | 5 | |
web.log.path | (none) | |
web.refresh-interval | 3000 | |
web.ssl.enabled | true | |
web.submit.enable | true | |
web.timeout | 10000 | |
web.tmpdir | System.getProperty("java.io.tmpdir") | |
web.upload.dir | (none) | |

Key | Default | Description |
---|---|---|
fs.default-scheme | (none) | The default file system scheme, used for paths that do not declare a scheme explicitly. May contain an authority, e.g., host:port in case of an HDFS NameNode. |
fs.output.always-create-directory | false | File writers running with a parallelism larger than one create a directory for the output file path and put the different result files (one per parallel writer task) into that directory. If this option is set to "true", writers with a parallelism of 1 will also create a directory and place a single result file into it. If the option is set to "false", the writer will create the file directly at the output path, without creating a containing directory. |
fs.overwrite-files | false | Specifies whether file output writers should overwrite existing files by default. Set to "true" to overwrite by default, "false" otherwise. |

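
The effect of fs.default-scheme on paths without a scheme can be sketched as follows. The helper `apply_default_scheme` is hypothetical and mirrors only the described behavior, not Flink's file system code. It assumes absolute paths and a scheme value written as a URI prefix such as hdfs://namenode:8020/, including the optional authority the description allows.

```python
from typing import Optional
from urllib.parse import urlsplit


def apply_default_scheme(path: str, default_scheme: Optional[str]) -> str:
    """Prefix absolute, scheme-less paths with the configured default scheme (sketch)."""
    if urlsplit(path).scheme or default_scheme is None:
        return path  # an explicit scheme wins; without a default, keep the path as-is
    base = urlsplit(default_scheme)
    return f"{base.scheme}://{base.netloc}/{path.lstrip('/')}"


print(apply_default_scheme("/user/flink/input", "hdfs://namenode:8020/"))
# hdfs://namenode:8020/user/flink/input
print(apply_default_scheme("file:///tmp/out", "hdfs://namenode:8020/"))
# file:///tmp/out
```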

Key | Default | Description |
---|---|---|
compiler.delimited-informat.max-line-samples | 10 | The maximum number of line samples taken by the compiler for delimited inputs. The samples are used to estimate the number of records. This value can be overridden for a specific input with the input format’s parameters. |
compiler.delimited-informat.max-sample-len | 2097152 | The maximal length of a line sample that the compiler takes for delimited inputs. If the length of a single sample exceeds this value (possible because of misconfiguration of the parser), the sampling aborts. This value can be overridden for a specific input with the input format’s parameters. |
compiler.delimited-informat.min-line-samples | 2 | The minimum number of line samples taken by the compiler for delimited inputs. The samples are used to estimate the number of records. This value can be overridden for a specific input with the input format’s parameters. |

Key | Default | Description |
---|---|---|
taskmanager.runtime.hashjoin-bloom-filters | false | Flag to activate/deactivate bloom filters in the hybrid hash join implementation. In cases where the hash join needs to spill to disk (datasets larger than the reserved fraction of memory), these bloom filters can greatly reduce the number of spilled records, at the cost of some CPU cycles. |
taskmanager.runtime.max-fan | 128 | The maximal fan-in for external merge joins and fan-out for spilling hash tables. Limits the number of file handles per operator, but may cause intermediate merging/partitioning if set too small. |
taskmanager.runtime.sort-spilling-threshold | 0.8 | A sort operation starts spilling when this fraction of its memory budget is full. |

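
Why a small taskmanager.runtime.max-fan causes intermediate merging can be seen by counting merge rounds: when at most F files can be merged at once, R sorted runs need about ceil(log_F R) rounds. The Python sketch below simulates that count, assuming each round merges groups of up to F runs; the function name and run counts are hypothetical, and Flink's actual merge scheduling may differ.

```python
def merge_rounds(num_runs: int, max_fan: int = 128) -> int:
    """Rounds of merging until one run remains, merging up to max_fan runs at a time."""
    if max_fan < 2:
        raise ValueError("max_fan must be at least 2")
    rounds = 0
    while num_runs > 1:
        num_runs = -(-num_runs // max_fan)  # ceiling division: groups of up to max_fan runs
        rounds += 1
    return rounds


print(merge_rounds(100))        # 1: all runs fit into a single merge
print(merge_rounds(1000))       # 2: one intermediate round is needed
print(merge_rounds(1000, 10))   # 3: a smaller fan-in adds another round
```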
The configuration keys in this section are independent of the resource management framework in use (YARN, Mesos, Standalone, …).

Key | Default | Description |
---|---|---|
containerized.heap-cutoff-min | 600 | Minimum amount of heap memory to remove in containers, as a safety margin. |
containerized.heap-cutoff-ratio | 0.25 | Percentage of heap space to remove from containers (YARN / Mesos), to compensate for other JVM memory usage. |
local.number-resourcemanager | 1 | |
resourcemanager.job.timeout | "5 minutes" | Timeout for jobs that don't have a JobManager assigned as leader. |
resourcemanager.rpc.port | 0 | Defines the network port to connect to for communication with the resource manager. By default, the port of the JobManager is used, because the same ActorSystem is used. It is not possible to use this configuration key to define port ranges. |
resourcemanager.taskmanager-timeout | 30000 | The timeout for an idle TaskManager to be released. |

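
The two containerized.heap-cutoff-* options combine into the safety margin that is subtracted from a container's memory. This Python sketch assumes the cutoff is the larger of the ratio-based value and the minimum, with both measured in MB, and that the remainder is what the JVM heap may use; the function name and container sizes are hypothetical.

```python
def heap_after_cutoff(container_mb: int, ratio: float = 0.25, min_cutoff_mb: int = 600) -> int:
    """Container memory minus max(ratio * container, min_cutoff_mb), in MB."""
    cutoff = max(int(container_mb * ratio), min_cutoff_mb)
    if cutoff >= container_mb:
        raise ValueError(f"a cutoff of {cutoff} MB leaves no memory in a {container_mb} MB container")
    return container_mb - cutoff


print(heap_after_cutoff(4096))  # 3072: the 25 % cutoff (1024 MB) exceeds the 600 MB minimum
print(heap_after_cutoff(1024))  # 424: 25 % would be only 256 MB, so 600 MB is removed
```

Because of the fixed minimum, small containers lose a proportionally larger share of their memory to the cutoff.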
Key | Default | Description |
---|---|---|
yarn.application-attempts | (none) | Number of ApplicationMaster restarts. Note that the entire Flink cluster will restart and the YARN client will lose the connection. Also, the JobManager address will change and you’ll need to set the JobManager host:port manually. It is recommended to leave this option at 1. |
yarn.application-master.port | "0" | With this configuration option, users can specify a port, a range of ports, or a list of ports for the Application Master (and JobManager) RPC port. We recommend using the default value (0) to let the operating system choose an appropriate port. In particular, when multiple AMs are running on the same physical host, fixed port assignments prevent the AM from starting. For example, when running Flink on YARN in an environment with a restrictive firewall, this option allows specifying a range of allowed ports. |
yarn.appmaster.rpc.address | (none) | The hostname or address where the application master RPC system is listening. |
yarn.appmaster.rpc.port | -1 | The port where the application master RPC system is listening. |
yarn.containers.vcores | -1 | The number of virtual cores (vcores) per YARN container. By default, the number of vcores is set to the number of slots per TaskManager, if set, or to 1 otherwise. In order for this parameter to be used, your cluster must have CPU scheduling enabled. You can do this by configuring YARN to use the org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler. |
yarn.heartbeat-delay | 5 | Time in seconds between heartbeats with the ResourceManager. |
yarn.maximum-failed-containers | (none) | Maximum number of containers the system is going to reallocate in case of a failure. |
yarn.per-job-cluster.include-user-jar | "ORDER" | Defines whether user jars are included in the system class path for per-job clusters, as well as their position in the path. They can be positioned at the beginning ("FIRST"), at the end ("LAST"), or based on their name ("ORDER"). Setting this parameter to "DISABLED" causes the jar to be included in the user class path instead. |
yarn.properties-file.location | (none) | When a Flink job is submitted to YARN, the JobManager’s host and the number of available processing slots are written into a properties file, so that the Flink client is able to pick those details up. This configuration parameter allows changing the default location of that file (for example, for environments sharing a Flink installation between users). |
yarn.tags | (none) | A comma-separated list of tags to apply to the Flink YARN application. |
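The four values of yarn.per-job-cluster.include-user-jar can be pictured as different ways of assembling class paths. The Python sketch below is illustrative only: `build_classpaths` and the jar paths are hypothetical, and it assumes that "ORDER" means sorting all jars by file name.

```python
import os


def build_classpaths(system_jars, user_jars, mode="ORDER"):
    """Return (system_classpath, user_classpath) as lists of jar paths.

    `mode` mirrors the documented values of
    yarn.per-job-cluster.include-user-jar.
    """
    system_jars, user_jars = list(system_jars), list(user_jars)
    mode = mode.upper()
    if mode == "FIRST":
        return user_jars + system_jars, []
    if mode == "LAST":
        return system_jars + user_jars, []
    if mode == "ORDER":
        # Assumption: "based on their name" means sorting by file name.
        return sorted(system_jars + user_jars, key=os.path.basename), []
    if mode == "DISABLED":
        # User jars go to the separate user class path instead.
        return system_jars, user_jars
    raise ValueError(f"unknown mode: {mode}")


system = ["lib/flink-dist.jar", "lib/log4j.jar"]  # hypothetical paths
user = ["job/app.jar"]
print(build_classpaths(system, user, "ORDER"))
```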
Key | Default | Description |
---|---|---|
mesos.failover-timeout | 604800 | The failover timeout in seconds for the Mesos scheduler, after which running tasks are automatically shut down. |
mesos.initial-tasks | 0 | The number of initial workers to bring up when the master starts. This option is ignored unless Flink is in legacy mode. |
mesos.master | (none) | The Mesos master URL. The value should be in one of the following forms: |
mesos.maximum-failed-tasks | -1 | The maximum number of failed workers before the cluster fails. May be set to -1 to disable this feature. This option is ignored unless Flink is in legacy mode. |
mesos.resourcemanager.artifactserver.port | 0 | The config parameter defining the Mesos artifact server port to use. Setting the port to 0 will let the OS choose an available port. |
mesos.resourcemanager.artifactserver.ssl.enabled | true | Enables SSL for the Flink artifact server. Note that security.ssl.enabled also needs to be set to true to enable encryption. |
mesos.resourcemanager.framework.name | "Flink" | Mesos framework name. |
mesos.resourcemanager.framework.principal | (none) | Mesos framework principal. |
mesos.resourcemanager.framework.role | "*" | Mesos framework role definition. |
mesos.resourcemanager.framework.secret | (none) | Mesos framework secret. |
mesos.resourcemanager.framework.user | (none) | Mesos framework user. |
mesos.resourcemanager.tasks.port-assignments | (none) | Comma-separated list of configuration keys which represent a configurable port. All port keys will dynamically get a port assigned through Mesos. |
Key | Default | Description |
---|---|---|
mesos.constraints.hard.hostattribute | (none) | Constraints for task placement on Mesos based on agent attributes. Takes a comma-separated list of key:value pairs corresponding to the attributes exposed by the target Mesos agents. Example: az:eu-west-1a,series:t2 |
mesos.resourcemanager.tasks.bootstrap-cmd | (none) | A command which is executed before the TaskManager is started. |
mesos.resourcemanager.tasks.container.docker.force-pull-image | false | Instructs the Docker containerizer to forcefully pull the image rather than reuse a cached version. |
mesos.resourcemanager.tasks.container.docker.parameters | (none) | Custom parameters to be passed into the docker run command when using the Docker containerizer. Comma-separated list of "key=value" pairs. The "value" may contain '='. |
mesos.resourcemanager.tasks.container.image.name | (none) | Image name to use for the container. |
mesos.resourcemanager.tasks.container.type | "mesos" | Type of the containerization used: “mesos” or “docker”. |
mesos.resourcemanager.tasks.container.volumes | (none) | A comma-separated list of `[host_path:]container_path[:RO\|RW]`. This allows for mounting additional volumes into your container. |
mesos.resourcemanager.tasks.cpus | 0.0 | CPUs to assign to the Mesos workers. |
mesos.resourcemanager.tasks.gpus | 0 | GPUs to assign to the Mesos workers. |
mesos.resourcemanager.tasks.hostname | (none) | Optional value to define the TaskManager’s hostname. The pattern _TASK_ is replaced by the actual ID of the Mesos task. This can be used to configure the TaskManager to use Mesos DNS (e.g. _TASK_.flink-service.mesos) for name lookups. |
mesos.resourcemanager.tasks.mem | 1024 | Memory to assign to the Mesos workers in MB. |
mesos.resourcemanager.tasks.taskmanager-cmd | "$FLINK_HOME/bin/mesos-taskmanager.sh" | |
mesos.resourcemanager.tasks.uris | (none) | A comma-separated list of URIs of custom artifacts to be downloaded into the sandbox of Mesos workers. |
taskmanager.numberOfTaskSlots | 1 | The number of parallel operator or user function instances that a single TaskManager can run. If this value is larger than 1, a single TaskManager takes multiple instances of a function or operator. That way, the TaskManager can utilize multiple CPU cores, but at the same time, the available memory is divided between the different operator or function instances. This value is typically proportional to the number of physical CPU cores that the TaskManager's machine has (e.g., equal to the number of cores, or half the number of cores). |
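Several Mesos TaskManager keys take small list formats. The following Python sketch shows one way to read them as described in the table above: "key=value" Docker parameters (split on the first '=' only), "key:value" host attribute constraints, and `[host_path:]container_path[:RO|RW]` volumes. The function names and example values are hypothetical, and this is not Flink's parser; for instance, it does not handle commas inside values.

```python
def parse_pairs(value: str, separator: str) -> dict:
    """Parse a comma-separated list of key<separator>value pairs.

    Only the first separator splits key from value, so values may
    contain the separator (e.g. '=' in Docker parameters).
    """
    result = {}
    for pair in (p.strip() for p in value.split(",")):
        if not pair:
            continue
        key, found, val = pair.partition(separator)
        if not found or not key:
            raise ValueError(f"expected key{separator}value, got {pair!r}")
        result[key] = val
    return result


def parse_volume(spec: str):
    """Parse "[host_path:]container_path[:RO|RW]" into (host, container, mode).

    `host` and `mode` are None when they are omitted.
    """
    parts = spec.split(":")
    mode = None
    if len(parts) > 1 and parts[-1].upper() in ("RO", "RW"):
        mode = parts.pop().upper()
    if len(parts) == 1:
        return None, parts[0], mode
    if len(parts) == 2:
        return parts[0], parts[1], mode
    raise ValueError(f"invalid volume spec: {spec!r}")


print(parse_pairs("env=JAVA_OPTS=-Xmx1g,label=team", "="))
print(parse_pairs("az:eu-west-1a,series:t2", ":"))
print(parse_volume("/data:/mnt/data:RO"))
```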
Key | Default | Description |
---|---|---|
high-availability | "NONE" | Defines high-availability mode used for the cluster execution. To enable high-availability, set this mode to "ZOOKEEPER". |
high-availability.cluster-id | "/default" | The ID of the Flink cluster, used to separate multiple Flink clusters from each other. Needs to be set for standalone clusters but is automatically inferred in YARN and Mesos. |
high-availability.job.delay | (none) | The time a JobManager waits after a failover before it recovers the current jobs. |
high-availability.jobmanager.port | "0" | Optional port (range) used by the JobManager in high-availability mode. |
high-availability.storageDir | (none) | File system path (URI) where Flink persists metadata in high-availability setups. |
Key | Default | Description |
---|---|---|
high-availability.zookeeper.client.acl | "open" | Defines the ACL (open\|creator) to be configured on the ZooKeeper nodes. The configuration value can be set to “creator” if the ZooKeeper server configuration has the “authProvider” property mapped to use SASLAuthenticationProvider and the cluster is configured to run in secure mode (Kerberos). |
high-availability.zookeeper.client.connection-timeout | 15000 | Defines the connection timeout for ZooKeeper in ms. |
high-availability.zookeeper.client.max-retry-attempts | 3 | Defines the number of connection retries before the client gives up. |
high-availability.zookeeper.client.retry-wait | 5000 | Defines the pause between consecutive retries in ms. |
high-availability.zookeeper.client.session-timeout | 60000 | Defines the session timeout for the ZooKeeper session in ms. |
high-availability.zookeeper.path.checkpoint-counter | "/checkpoint-counter" | ZooKeeper root path (ZNode) for checkpoint counters. |
high-availability.zookeeper.path.checkpoints | "/checkpoints" | ZooKeeper root path (ZNode) for completed checkpoints. |
high-availability.zookeeper.path.jobgraphs | "/jobgraphs" | ZooKeeper root path (ZNode) for job graphs. |
high-availability.zookeeper.path.latch | "/leaderlatch" | Defines the znode of the leader latch which is used to elect the leader. |
high-availability.zookeeper.path.leader | "/leader" | Defines the znode of the leader which contains the URL to the leader and the current leader session ID. |
high-availability.zookeeper.path.mesos-workers | "/mesos-workers" | The ZooKeeper root path for persisting the Mesos worker information. |
high-availability.zookeeper.path.root | "/flink" | The root path under which Flink stores its entries in ZooKeeper. |
high-availability.zookeeper.path.running-registry | "/running_job_registry/" | |
high-availability.zookeeper.quorum | (none) | The ZooKeeper quorum to use when running Flink in a high-availability mode with ZooKeeper. |
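To see how high-availability.cluster-id keeps clusters apart, the Python sketch below composes znode paths under the assumption that a cluster's entries are nested as root, then cluster ID, then the individual sub-path (for example the leader znode). The nesting is an assumption for illustration, and `znode_path` and the second cluster ID are hypothetical.

```python
def znode_path(*segments: str) -> str:
    """Join ZooKeeper path segments into one absolute path.

    Leading, trailing, and duplicate slashes in the segments are normalized.
    """
    parts = [p for segment in segments for p in segment.split("/") if p]
    return "/" + "/".join(parts)


root = "/flink"  # high-availability.zookeeper.path.root
for cluster_id in ("/default", "/analytics"):  # high-availability.cluster-id
    print(znode_path(root, cluster_id, "/leader"))
```

Two clusters sharing the same quorum and root still end up in disjoint subtrees as long as their cluster IDs differ.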
Key | Default | Description |
---|---|---|
zookeeper.sasl.disable | false | |
zookeeper.sasl.login-context-name | "Client" | |
zookeeper.sasl.service-name | "zookeeper" | |
Key | Default | Description |
---|---|---|
security.kerberos.login.contexts | (none) | A comma-separated list of login contexts to provide the Kerberos credentials to (for example, `Client,KafkaClient` to use the credentials for ZooKeeper authentication and for Kafka authentication). |
security.kerberos.login.keytab | (none) | Absolute path to a Kerberos keytab file that contains the user credentials. |
security.kerberos.login.principal | (none) | Kerberos principal name associated with the keytab. |
security.kerberos.login.use-ticket-cache | true | Indicates whether to read from your Kerberos ticket cache. |
Key | Default | Description |
---|---|---|
env.hadoop.conf.dir | (none) | Path to the Hadoop configuration directory. It is required to read HDFS and/or YARN configuration. You can also set it via the HADOOP_CONF_DIR environment variable. |
env.java.opts | (none) | |
env.java.opts.jobmanager | (none) | |
env.java.opts.taskmanager | (none) | |
env.log.dir | (none) | Defines the directory where the Flink logs are saved. It has to be an absolute path. (Defaults to the log directory under Flink’s home.) |
env.log.max | 5 | The maximum number of old log files to keep. |
env.ssh.opts | (none) | Additional command line options passed to SSH clients when starting or stopping JobManager, TaskManager, and ZooKeeper services (start-cluster.sh, stop-cluster.sh, start-zookeeper-quorum.sh, stop-zookeeper-quorum.sh). |
env.yarn.conf.dir | (none) | Path to the YARN configuration directory. It is required to run Flink on YARN. You can also set it via the YARN_CONF_DIR environment variable. |
Key | Default | Description |
---|---|---|
state.backend | (none) | The state backend to be used to store and checkpoint state. |
state.backend.async | true | Option whether the state backend should use an asynchronous snapshot method where possible and configurable. Some state backends may not support asynchronous snapshots, or only support asynchronous snapshots, and ignore this option. |
state.backend.fs.memory-threshold | 1024 | The minimum size of state data files, in bytes. All state chunks smaller than that are stored inline in the root checkpoint metadata file. |
state.backend.incremental | false | Option whether the state backend should create incremental checkpoints, if possible. For an incremental checkpoint, only a diff from the previous checkpoint is stored, rather than the complete checkpoint state. Some state backends may not support incremental checkpoints and ignore this option. |
state.backend.local-recovery | false | |
state.checkpoints.dir | (none) | The default directory used for storing the data files and metadata of checkpoints in a Flink-supported filesystem. The storage path must be accessible from all participating processes/nodes (i.e., all TaskManagers and JobManagers). |
state.checkpoints.num-retained | 1 | The maximum number of completed checkpoints to retain. |
state.savepoints.dir | (none) | The default directory for savepoints. Used by the state backends that write savepoints to file systems (MemoryStateBackend, FsStateBackend, RocksDBStateBackend). |
taskmanager.state.local.root-dirs | (none) | |
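Two of the state options above describe simple rules: chunks smaller than state.backend.fs.memory-threshold are stored inline in the checkpoint metadata file, and only the last state.checkpoints.num-retained completed checkpoints are kept. A minimal Python model of both rules (hypothetical names, not Flink's classes):

```python
from collections import deque

MEMORY_THRESHOLD = 1024  # bytes; documented default of state.backend.fs.memory-threshold


def store_inline(chunk: bytes, threshold: int = MEMORY_THRESHOLD) -> bool:
    """True if a state chunk is small enough to be inlined in the metadata file."""
    return len(chunk) < threshold


class RetainedCheckpoints:
    """Keep only the most recent `num_retained` completed checkpoint IDs."""

    def __init__(self, num_retained: int = 1):
        if num_retained < 1:
            raise ValueError("num_retained must be >= 1")
        self._ids = deque(maxlen=num_retained)

    def add(self, checkpoint_id: int):
        """Register a completed checkpoint; return the discarded ID, if any."""
        full = len(self._ids) == self._ids.maxlen
        evicted = self._ids[0] if full else None
        self._ids.append(checkpoint_id)  # deque drops the oldest entry when full
        return evicted

    def retained(self):
        return list(self._ids)


history = RetainedCheckpoints(num_retained=2)
for checkpoint_id in (1, 2, 3):
    history.add(checkpoint_id)
print(history.retained())
```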
Key | Default | Description |
---|---|---|
state.backend.rocksdb.localdir | (none) | The local directory (on the TaskManager) where RocksDB puts its files. |
state.backend.rocksdb.timer-service.factory | "HEAP" | This determines the factory for the timer service state implementation. Options are either HEAP (heap-based, default) or ROCKSDB for an implementation based on RocksDB. |
Key | Default | Description |
---|---|---|
query.client.network-threads | 0 | Number of network (Netty's event loop) threads for the queryable state client. |
query.proxy.network-threads | 0 | Number of network (Netty's event loop) threads for the queryable state proxy. |
query.proxy.ports | "9069" | The port range of the queryable state proxy. The specified range can be a single port: "9123", a range of ports: "50100-50200", or a list of ranges and ports: "50100-50200,50300-50400,51234". |
query.proxy.query-threads | 0 | Number of query threads for the queryable state proxy. Uses the number of slots if set to 0. |
query.server.network-threads | 0 | Number of network (Netty's event loop) threads for the queryable state server. |
query.server.ports | "9067" | The port range of the queryable state server. The specified range can be a single port: "9123", a range of ports: "50100-50200", or a list of ranges and ports: "50100-50200,50300-50400,51234". |
query.server.query-threads | 0 | Number of query threads for the queryable state server. Uses the number of slots if set to 0. |
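The port range syntax accepted by query.proxy.ports and query.server.ports (and, per its description, yarn.application-master.port) can be expanded as in the following Python sketch. It only shows how a specification maps to candidate ports; `expand_ports` is a hypothetical helper, not Flink's parser.

```python
from typing import List


def expand_ports(spec: str) -> List[int]:
    """Expand "9123", "50100-50200" or "50100-50200,50300-50400,51234"."""
    ports = []
    for part in (p.strip() for p in spec.split(",")):
        if not part:
            continue
        if "-" in part:
            low, high = (int(x) for x in part.split("-", 1))
            if low > high:
                raise ValueError(f"empty port range: {part!r}")
            ports.extend(range(low, high + 1))  # ranges are inclusive
        else:
            ports.append(int(part))
    for port in ports:
        if not 0 <= port <= 65535:
            raise ValueError(f"port out of range: {port}")
    return ports


print(expand_ports("50100-50102,51234"))
```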
Key | Default | Description |
---|---|---|
metrics.latency.granularity | "subtask" | Defines the granularity of latency metrics. Accepted values are: |
metrics.latency.history-size | 128 | Defines the number of measured latencies to maintain at each operator. |
metrics.latency.interval | 2000 | Defines the interval in milliseconds at which latency tracking marks are emitted from the sources. Disables latency tracking if set to 0 or a negative value. Enabling this feature can significantly impact the performance of the cluster. |
metrics.reporter.<name>.<parameter> | (none) | Configures the parameter <parameter> for the reporter named <name>. |
metrics.reporter.<name>.class | (none) | The reporter class to use for the reporter named <name>. |
metrics.reporter.<name>.interval | (none) | The reporter interval to use for the reporter named <name>. |
metrics.reporters | (none) | |
metrics.scope.delimiter | "." | |
metrics.scope.jm | "<host>.jobmanager" | Defines the scope format string that is applied to all metrics scoped to a JobManager. |
metrics.scope.jm.job | "<host>.jobmanager.<job_name>" | Defines the scope format string that is applied to all metrics scoped to a job on a JobManager. |
metrics.scope.operator | "<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>" | Defines the scope format string that is applied to all metrics scoped to an operator. |
metrics.scope.task | "<host>.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>" | Defines the scope format string that is applied to all metrics scoped to a task. |
metrics.scope.tm | "<host>.taskmanager.<tm_id>" | Defines the scope format string that is applied to all metrics scoped to a TaskManager. |
metrics.scope.tm.job | "<host>.taskmanager.<tm_id>.<job_name>" | Defines the scope format string that is applied to all metrics scoped to a job on a TaskManager. |
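The metrics.scope.* formats are templates whose `<variable>` placeholders are filled in per metric. A minimal Python sketch, assuming the template is split into components on '.', placeholders are substituted, and the components are re-joined with metrics.scope.delimiter; `format_scope` and the variable values are hypothetical:

```python
import re

PLACEHOLDER = re.compile(r"<([a-z_]+)>")


def format_scope(template: str, variables: dict, delimiter: str = ".") -> str:
    """Expand a scope format such as "<host>.taskmanager.<tm_id>".

    Raises KeyError if the template uses a variable without a value.
    """
    components = template.split(".")
    expanded = [
        PLACEHOLDER.sub(lambda m: variables[m.group(1)], component)
        for component in components
    ]
    return delimiter.join(expanded)


values = {"host": "worker1", "tm_id": "a1b2", "job_name": "wordcount"}
print(format_scope("<host>.taskmanager.<tm_id>.<job_name>", values))
print(format_scope("<host>.taskmanager.<tm_id>", values, delimiter="_"))
```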
You have to configure jobmanager.archive.fs.dir in order to archive terminated jobs, and add it to the list of monitored directories via historyserver.archive.fs.dir if you want to display them via the HistoryServer’s web frontend.
jobmanager.archive.fs.dir: Directory to upload information about terminated jobs to. You have to add this directory to the list of monitored directories of the history server via historyserver.archive.fs.dir.
Key | Default | Description |
---|---|---|
historyserver.archive.fs.dir | (none) | Comma-separated list of directories to fetch archived jobs from. The history server will monitor these directories for archived jobs. You can configure the JobManager to archive jobs to a directory via `jobmanager.archive.fs.dir`. |
historyserver.archive.fs.refresh-interval | 10000 | Interval in milliseconds for refreshing the archived job directories. |
historyserver.web.address | (none) | Address of the HistoryServer's web interface. |
historyserver.web.port | 8082 | Port of the HistoryServer's web interface. |
historyserver.web.refresh-interval | 10000 | |
historyserver.web.ssl.enabled | false | Enables HTTPS access to the HistoryServer web frontend. This is applicable only when the global SSL flag security.ssl.enabled is set to true. |
historyserver.web.tmpdir | (none) | This configuration parameter allows defining the Flink web directory to be used by the history server web interface. The web interface will copy its static files into the directory. |
mode: Execution mode of Flink. Possible values are legacy and new. In order to start the legacy components, you have to specify legacy (DEFAULT: new).
If you ever see the exception java.io.IOException: Insufficient number of network buffers, you need to adapt the amount of memory used for network buffers in order for your program to run on your task managers.
Network buffers are a critical resource for the communication layers. They are used to buffer records before transmission over a network, and to buffer incoming data before dissecting it into records and handing them to the application. A sufficient number of network buffers is critical to achieve a good throughput.
In general, configure the task manager to have enough buffers that each logical network connection you expect to be open at the same time has a dedicated buffer. A logical network connection exists for each point-to-point exchange of data over the network, which typically happens at repartitioning or broadcasting steps (shuffle phase). In those, each parallel task inside the TaskManager has to be able to talk to all other parallel tasks.
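Network buffers are always allocated off-heap, i.e., outside of the JVM heap, irrespective of the value of taskmanager.memory.off-heap. This way, we can pass these buffers directly to the underlying network stack layers.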
Previously, the number of network buffers was set manually, which was quite error-prone (see below). Since Flink 1.3, it is possible to define a fraction of memory that is used for network buffers with the following configuration parameters:
- taskmanager.network.memory.fraction: Fraction of JVM memory to use for network buffers (DEFAULT: 0.1),
- taskmanager.network.memory.min: Minimum memory size for network buffers in bytes (DEFAULT: 64 MB),
- taskmanager.network.memory.max: Maximum memory size for network buffers in bytes (DEFAULT: 1 GB), and
- taskmanager.memory.segment-size: Size of memory buffers used by the memory manager and the network stack in bytes (DEFAULT: 32768 (= 32 KiBytes)).

The required number of buffers on a task manager is total-degree-of-parallelism (number of targets) * intra-node-parallelism (number of sources in one task manager) * n, with n being a constant that defines how many repartitioning/broadcasting steps you expect to be active at the same time. Since the intra-node-parallelism is typically the number of cores, and more than 4 repartitioning or broadcasting channels are rarely active in parallel, it frequently boils down to
$$(\#\text{slots per TM})^2 \times \#\text{TMs} \times 4$$
where #slots per TM is the number of slots per TaskManager and #TMs is the total number of task managers.
To support, for example, a cluster of 20 8-slot machines, you should use roughly 5000 network buffers for optimal throughput.
Each network buffer has a default size of 32 KiBytes. In the example above, the system would thus allocate roughly 160 MiBytes for network buffers.
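The rule of thumb and the fraction-based sizing above can be checked with a few lines of Python. This is a sketch, not Flink's code: it assumes the network memory is the configured fraction of the JVM memory clamped to the [min, max] bounds, and the helper names are hypothetical.

```python
KIB = 1024
MIB = 1024 * KIB
GIB = 1024 * MIB
SEGMENT_SIZE = 32 * KIB  # default of taskmanager.memory.segment-size


def required_buffers(slots_per_tm: int, num_tms: int, concurrent_shuffles: int = 4) -> int:
    """Rule of thumb: (#slots per TM)^2 * #TMs * n."""
    return slots_per_tm ** 2 * num_tms * concurrent_shuffles


def network_memory_bytes(jvm_bytes: int, fraction: float = 0.1,
                         min_bytes: int = 64 * MIB, max_bytes: int = 1 * GIB) -> int:
    """Assumed fraction-based sizing: fraction of JVM memory, clamped to [min, max]."""
    return min(max_bytes, max(min_bytes, int(fraction * jvm_bytes)))


buffers = required_buffers(slots_per_tm=8, num_tms=20)
print(buffers)                       # 5120
print(buffers * SEGMENT_SIZE / MIB)  # 160.0
print(network_memory_bytes(1 * GIB) // SEGMENT_SIZE)
```

With the defaults, a TaskManager with 1 GiB of JVM memory gets about 102 MiB (3276 buffers of 32 KiB), while a 256 MiB JVM is raised to the 64 MiB minimum.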
The number and size of network buffers can be configured with the following parameters: taskmanager.network.numberOfBuffers and taskmanager.memory.segment-size.
Although Flink aims to process as much data in main memory as possible, it is not uncommon that more data needs to be processed than there is memory available. Flink’s runtime is designed to write temporary data to disk to handle these situations.
The taskmanager.tmp.dirs parameter specifies a list of directories into which Flink writes temporary files. The paths of the directories need to be separated by ‘:’ (colon character). Flink will concurrently write (or read) one temporary file to (from) each configured directory. This way, temporary I/O can be evenly distributed over multiple independent I/O devices such as hard disks to improve performance. To leverage fast I/O devices (e.g., SSD, RAID, NAS), it is possible to specify a directory multiple times.
If the taskmanager.tmp.dirs parameter is not explicitly specified, Flink writes temporary data to the temporary directory of the operating system, such as /tmp in Linux systems.
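The effect of listing a directory more than once in taskmanager.tmp.dirs can be seen in a small Python model. It splits the value on ':', falls back to the OS temporary directory when unset, and assumes spill files are handed out round-robin over the listed entries (an assumption for illustration; the helper names are hypothetical).

```python
import itertools
import tempfile


def parse_tmp_dirs(value):
    """Split a taskmanager.tmp.dirs value on ':'; fall back to the OS temp dir."""
    if not value:
        return [tempfile.gettempdir()]
    return [d for d in value.split(":") if d]


def assign_spill_files(tmp_dirs, num_files):
    """Assign spill files to directories round-robin (assumed policy)."""
    cycle = itertools.cycle(tmp_dirs)
    return [next(cycle) for _ in range(num_files)]


dirs = parse_tmp_dirs("/ssd1:/hdd:/ssd1")  # fast SSD listed twice
print(assign_spill_files(dirs, 6))
```

Because /ssd1 appears twice, it receives twice as many temporary files as /hdd.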
Flink executes a program in parallel by splitting it into subtasks and scheduling these subtasks to processing slots.
Each Flink TaskManager provides processing slots in the cluster. The number of slots is typically proportional to the number of available CPU cores of each TaskManager. As a general recommendation, the number of available CPU cores is a good default for taskmanager.numberOfTaskSlots.
When starting a Flink application, users can supply the default number of slots to use for that job. The corresponding command-line option is therefore called -p (for parallelism). In addition, it is possible to set the number of slots in the programming APIs for the whole application and for individual operators.