Fault Tolerance
Flink’s fault tolerance mechanism recovers programs in the presence of failures and continues to execute them. Such failures include machine hardware failures, network failures, transient program failures, etc.
Streaming Fault Tolerance
Flink has a checkpointing mechanism that recovers streaming jobs after failures. The checkpointing mechanism requires a persistent (or durable) source that can be asked for prior records again (Apache Kafka is a good example of such a source).
The checkpointing mechanism stores the progress in the data sources and data sinks, the state of windows, as well as the user-defined state (see Working with State) consistently to provide exactly once processing semantics. Where the checkpoints are stored (e.g., JobManager memory, file system, database) depends on the configured state backend.
The docs on streaming fault tolerance describe in detail the technique behind Flink’s streaming fault tolerance mechanism.
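Where checkpoints are kept is controlled by the configured state backend. As a minimal sketch in Java (the HDFS URI is a hypothetical placeholder), a file system state backend can be set programmatically:

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// keep checkpoint data in a file system instead of JobManager memory;
// the HDFS URI below is an illustrative placeholder
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));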
To enable checkpointing, call enableCheckpointing(n) on the StreamExecutionEnvironment, where n is the checkpoint interval in milliseconds.
Other parameters for checkpointing include:

- Number of retries: The setNumberOfExecutionRetries() method defines how many times the job is restarted after a failure. When checkpointing is activated but this value is not explicitly set, the job is restarted infinitely often.
- Exactly-once vs. at-least-once: You can optionally pass a mode to the enableCheckpointing(n) method to choose between the two guarantee levels (see the sketch after the code examples below). Exactly-once is preferable for most applications. At-least-once may be relevant for certain super-low-latency (consistently a few milliseconds) applications.
- Number of concurrent checkpoints: By default, the system does not trigger another checkpoint while one is still in progress. This ensures that the topology does not spend too much time on checkpoints at the expense of processing the streams. It is possible to allow for multiple overlapping checkpoints, which is interesting for pipelines that have a certain processing delay (for example, because the functions call external services that need some time to respond) but that still want to checkpoint very frequently (every few hundred milliseconds) to re-process very little upon failures.
- Checkpoint timeout: The time after which a checkpoint-in-progress is aborted if it did not complete by then.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// start a checkpoint every 1000 ms
env.enableCheckpointing(1000);
// advanced options:
// set mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// checkpoints have to complete within one minute, or are discarded
env.getCheckpointConfig().setCheckpointTimeout(60000);
// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
val env = StreamExecutionEnvironment.getExecutionEnvironment()
// start a checkpoint every 1000 ms
env.enableCheckpointing(1000)
// advanced options:
// set mode to exactly-once (this is the default)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
// checkpoints have to complete within one minute, or are discarded
env.getCheckpointConfig.setCheckpointTimeout(60000)
// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
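The retry count and the guarantee mode from the list above can be configured as well. The following Java lines are a minimal sketch; the interval and retry count are arbitrary example values:

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// pass the guarantee level directly to enableCheckpointing
env.enableCheckpointing(500, CheckpointingMode.AT_LEAST_ONCE);
// restart the job at most 5 times after failures
env.setNumberOfExecutionRetries(5);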
Fault Tolerance Guarantees of Data Sources and Sinks
Flink can guarantee exactly-once state updates to user-defined state only when the source participates in the snapshotting mechanism. The following table lists the state update guarantees of Flink coupled with the bundled connectors.
Please read the documentation of each connector to understand the details of the fault tolerance guarantees.
Source | Guarantees | Notes |
---|---|---|
Apache Kafka | exactly once | Use the appropriate Kafka connector for your version |
AWS Kinesis Streams | exactly once | |
RabbitMQ | at most once (v 0.10) / exactly once (v 1.0) | |
Twitter Streaming API | at most once | |
Collections | exactly once | |
Files | exactly once | |
Sockets | at most once |
To guarantee end-to-end exactly-once record delivery (in addition to exactly-once state semantics), the data sink needs to take part in the checkpointing mechanism. The following table lists the delivery guarantees (assuming exactly-once state updates) of Flink coupled with bundled sinks:
Sink | Guarantees | Notes |
---|---|---|
HDFS rolling sink | exactly once | Implementation depends on Hadoop version |
Elasticsearch | at least once | |
Kafka producer | at least once | |
Cassandra sink | at least once / exactly once | exactly once only for idempotent updates |
AWS Kinesis Streams | at least once | |
File sinks | at least once | |
Socket sinks | at least once | |
Standard output | at least once | |
Redis sink | at least once |
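As an illustration of how these guarantees combine, the following Java sketch reads from Kafka (exactly-once state updates, given checkpointing) and writes back to Kafka (at-least-once delivery). The topic names, addresses, and the use of the Kafka 0.8 connector classes are assumptions for the example:

import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(1000); // checkpointing is what makes the source exactly-once

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092"); // example address
props.setProperty("zookeeper.connect", "localhost:2181"); // example address
props.setProperty("group.id", "example-group");           // example group id

// the consumer stores its partition offsets in Flink's checkpoints
DataStream<String> stream = env.addSource(
    new FlinkKafkaConsumer08<>("input-topic", new SimpleStringSchema(), props));

// the producer is at-least-once: records may be duplicated on recovery
stream.addSink(
    new FlinkKafkaProducer08<>("localhost:9092", "output-topic", new SimpleStringSchema()));

env.execute();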
Restart Strategies
Flink supports different restart strategies that control how jobs are restarted in case of a failure. The cluster can be started with a default restart strategy, which is used whenever no job-specific restart strategy has been defined. If a job is submitted with a restart strategy, that strategy overrides the cluster's default setting.
The default restart strategy is set via Flink's configuration file flink-conf.yaml. The configuration parameter restart-strategy defines which strategy is used. By default, the no-restart strategy is applied. See the following list of available restart strategies to learn which values are supported.

Each restart strategy comes with its own set of parameters which control its behaviour. These values are also set in the configuration file. The description of each restart strategy contains more information about the respective configuration values.
Restart Strategy | Value for restart-strategy |
---|---|
Fixed delay | fixed-delay |
Failure rate | failure-rate |
No restart | none |
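For example, to make the fixed delay strategy the cluster-wide default (the values here are illustrative), flink-conf.yaml could contain:

restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s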
Apart from defining a default restart strategy, it is possible to define a specific restart strategy for each Flink job. This restart strategy is set programmatically by calling the setRestartStrategy method on the ExecutionEnvironment. Note that this also works for the StreamExecutionEnvironment.
The following example shows how we can set a fixed delay restart strategy for our job. In case of a failure, the system tries to restart the job 3 times, waiting 10 seconds between successive restart attempts.
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // number of restart attempts
Time.of(10, TimeUnit.SECONDS) // delay
));
val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // number of restart attempts
Time.of(10, TimeUnit.SECONDS) // delay
))
Fixed Delay Restart Strategy
The fixed delay restart strategy attempts to restart the job a given number of times. If the maximum number of attempts is exceeded, the job eventually fails. Between two consecutive restart attempts, the restart strategy waits a fixed amount of time.
This strategy is set as the cluster default via the following configuration parameter in flink-conf.yaml:
restart-strategy: fixed-delay
Configuration Parameter | Description | Default Value |
---|---|---|
restart-strategy.fixed-delay.attempts | Number of restart attempts | 1 |
restart-strategy.fixed-delay.delay | Delay between two consecutive restart attempts | akka.ask.timeout |

For example:

restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s
The fixed delay restart strategy can also be set programmatically:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // number of restart attempts
Time.of(10, TimeUnit.SECONDS) // delay
));
val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // number of restart attempts
Time.of(10, TimeUnit.SECONDS) // delay
))
Restart Attempts
The number of times that Flink retries the execution before the job is declared as failed is configurable via the restart-strategy.fixed-delay.attempts parameter.
The default value is 1.
Retry Delays
Execution retries can be configured to be delayed. Delaying the retry means that after a failed execution, the re-execution does not start immediately, but only after a certain delay.
Delaying the retries can be helpful when the program interacts with external systems where for example connections or pending transactions should reach a timeout before re-execution is attempted.
The delay is configured via the restart-strategy.fixed-delay.delay parameter; its default value is the value of akka.ask.timeout.
Failure Rate Restart Strategy
The failure rate restart strategy restarts the job after a failure, but when the failure rate (failures per time interval) is exceeded, the job eventually fails. Between two consecutive restart attempts, the restart strategy waits a fixed amount of time.
This strategy is set as the cluster default via the following configuration parameter in flink-conf.yaml:
restart-strategy: failure-rate
Configuration Parameter | Description | Default Value |
---|---|---|
restart-strategy.failure-rate.max-failures-per-interval | Maximum number of restarts in the given time interval before failing a job | 1 |
restart-strategy.failure-rate.failure-rate-interval | Time interval for measuring the failure rate | 1 minute |
restart-strategy.failure-rate.delay | Delay between two consecutive restart attempts | akka.ask.timeout |

For example:

restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.delay: 10 s
The failure rate restart strategy can also be set programmatically:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.failureRateRestart(
3, // max failures per interval
Time.of(5, TimeUnit.MINUTES), // time interval for measuring failure rate
Time.of(10, TimeUnit.SECONDS) // delay
));
val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.failureRateRestart(
3, // max failures per interval
Time.of(5, TimeUnit.MINUTES), // time interval for measuring failure rate
Time.of(10, TimeUnit.SECONDS) // delay
))
No Restart Strategy
The job fails directly and no restart is attempted.
restart-strategy: none
The no restart strategy can also be set programmatically:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.noRestart());
val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.noRestart())