Class JobManagerCheckpointStorage
- java.lang.Object
-
- org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage
-
- All Implemented Interfaces:
Serializable, CheckpointStorage, ConfigurableCheckpointStorage
@PublicEvolving public class JobManagerCheckpointStorage extends Object implements CheckpointStorage, ConfigurableCheckpointStorage
This CheckpointStorage checkpoints state directly to the JobManager's memory (hence the name), but savepoints will be persisted to a file system.
This checkpoint storage is primarily for experimentation, quick local setups, or for streaming applications that have very small state: because it requires checkpoints to go through the JobManager's memory, larger state will occupy larger portions of the JobManager's main memory, reducing operational stability. For any other setup, the FileSystemCheckpointStorage should be used. The FileSystemCheckpointStorage checkpoints state directly to files rather than to the JobManager's memory, thus supporting larger state sizes and more highly available recovery.
State Size Considerations
State checkpointing with this storage is subject to the following conditions:
- Each individual state must not exceed the configured maximum state size (see getMaxStateSize()).
- All state from one task (i.e., the sum of all operator states and keyed states from all chained operators of the task) must not exceed what the RPC system supports, which is by default < 10 MB. That limit can be raised, but doing so is typically not advised.
- The sum of all states in the application times all retained checkpoints must comfortably fit into the JobManager's JVM heap space.
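The three conditions above can be sketched as a plain-Java check. This is only an illustration and not Flink code: `StateSizeCheck`, `fits`, `ASSUMED_RPC_LIMIT`, and the `heapBudget` parameter are hypothetical names introduced here, and the 10 MiB constant merely stands in for the "< 10 MB" default mentioned above.

```java
import java.util.List;

/** Hypothetical helper, not part of Flink: mirrors the three size conditions listed above. */
final class StateSizeCheck {
    // Default per-state limit quoted on this page (5242880 bytes).
    static final long DEFAULT_MAX_STATE_SIZE = 5L * 1024 * 1024;
    // Assumption: stand-in for the RPC limit, which the page only describes as "< 10 MB".
    static final long ASSUMED_RPC_LIMIT = 10L * 1024 * 1024;

    /**
     * @param taskStates one inner list per task, holding serialized state sizes in bytes
     * @param retained   number of retained checkpoints
     * @param heapBudget JobManager heap bytes available for checkpoint data (caller's estimate)
     */
    static boolean fits(List<List<Long>> taskStates, int retained, long heapBudget) {
        long total = 0;
        for (List<Long> task : taskStates) {
            long taskSum = 0;
            for (long size : task) {
                // Condition 1: each individual state stays within the maximum state size.
                if (size > DEFAULT_MAX_STATE_SIZE) {
                    return false;
                }
                taskSum += size;
            }
            // Condition 2: all state of one task stays below the RPC limit.
            if (taskSum >= ASSUMED_RPC_LIMIT) {
                return false;
            }
            total += taskSum;
        }
        // Condition 3: all states times all retained checkpoints fit into the heap budget.
        return total * retained <= heapBudget;
    }

    public static void main(String[] args) {
        List<List<Long>> states =
                List.of(List.of(1_000_000L, 2_000_000L), List.of(4_000_000L));
        System.out.println(fits(states, 3, 64L * 1024 * 1024));
    }
}
```

A check like this is a rough planning aid at best; actual serialized sizes depend on the job and should be measured rather than estimated.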
Persistence Guarantees
For the use cases where the state sizes can be handled by this checkpoint storage, the storage does guarantee persistence for savepoints, externalized checkpoints (if configured), and checkpoints (when high-availability is configured).
Configuration
As for all checkpoint storage, this storage policy can either be configured within the application (by creating the storage with the respective constructor parameters and setting it on the execution environment) or by specifying it in the Flink configuration.
If the checkpoint storage was specified in the application, it may pick up additional configuration parameters from the Flink configuration. For example, if the storage is configured in the application without a default savepoint directory, it will pick up a default savepoint directory specified in the Flink configuration of the running job/cluster. That behavior is implemented via the configure(ReadableConfig, ClassLoader) method.
- See Also:
- Serialized Form
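The fallback rule from the Configuration section (values set in the application win, unset values are taken from the cluster configuration) can be modelled in a few lines of plain Java. This is a sketch under stated assumptions, not the Flink implementation: `StorageSettings` is a hypothetical stand-in for the storage class, a `Map` stands in for `ReadableConfig`, and the key name `state.savepoints.dir` is assumed here for illustration and should be checked against the configuration reference of your Flink version.

```java
import java.util.Map;

/** Hypothetical stand-in for a checkpoint storage; it only models the fallback rule. */
final class StorageSettings {
    // Assumption: illustrative key for the default savepoint directory.
    static final String SAVEPOINT_DIR_KEY = "state.savepoints.dir";

    // null means "not specified in the application".
    final String savepointDir;

    StorageSettings(String savepointDir) {
        this.savepointDir = savepointDir;
    }

    /** Returns a copy in which an unset savepoint directory is filled from the cluster config. */
    StorageSettings configure(Map<String, String> clusterConfig) {
        String resolved = savepointDir != null
                ? savepointDir                          // application value wins
                : clusterConfig.get(SAVEPOINT_DIR_KEY); // otherwise fall back (may stay null)
        return new StorageSettings(resolved);
    }

    public static void main(String[] args) {
        Map<String, String> clusterConfig =
                Map.of(SAVEPOINT_DIR_KEY, "file:///tmp/savepoints");
        StorageSettings fromApp = new StorageSettings(null).configure(clusterConfig);
        System.out.println(fromApp.savepointDir);
    }
}
```

Returning a copy instead of mutating the receiver matches the description of configure(ReadableConfig, ClassLoader) as creating "a copy of this checkpoint storage".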
-
-
Field Summary
Fields (Modifier and Type, Field, Description):
static int
DEFAULT_MAX_STATE_SIZE
The default maximal size that the snapshotted memory state may have (5 MiB).
-
Constructor Summary
Constructors (Constructor, Description):
JobManagerCheckpointStorage()
Creates a new job manager checkpoint storage that accepts states whose serialized forms are up to the default state size (5 MiB).
JobManagerCheckpointStorage(int maxStateSize)
Creates a new JobManagerCheckpointStorage, setting optionally the paths to persist checkpoint metadata and savepoints to, as well as configuring state thresholds and asynchronous operations.
JobManagerCheckpointStorage(String checkpointPath)
Creates a new JobManagerCheckpointStorage, setting optionally the paths to persist checkpoint metadata and savepoints to, as well as configuring state thresholds and asynchronous operations.
JobManagerCheckpointStorage(Path checkpointPath, int maxStateSize)
Creates a new JobManagerCheckpointStorage, setting optionally the paths to persist checkpoint metadata and savepoints to, as well as configuring state thresholds and asynchronous operations.
-
Method Summary
Methods (Modifier and Type, Method, Description):
JobManagerCheckpointStorage configure(ReadableConfig config, ClassLoader classLoader)
Creates a copy of this checkpoint storage that uses the values defined in the configuration for fields that were not specified in this checkpoint storage.
CheckpointStorageAccess createCheckpointStorage(JobID jobId)
Creates a storage for checkpoints for the given job.
static JobManagerCheckpointStorage createFromConfig(ReadableConfig config, ClassLoader classLoader)
Creates a new JobManagerCheckpointStorage using the given configuration.
Path getCheckpointPath()
int getMaxStateSize()
Gets the maximum size that an individual state can have, as configured in the constructor (by default 5242880).
Path getSavepointPath()
CompletedCheckpointStorageLocation resolveCheckpoint(String pointer)
Resolves the given pointer to a checkpoint/savepoint into a checkpoint location.
String toString()
-
-
-
Field Detail
-
DEFAULT_MAX_STATE_SIZE
public static final int DEFAULT_MAX_STATE_SIZE
The default maximal size that the snapshotted memory state may have (5 MiB).
- See Also:
- Constant Field Values
-
-
Constructor Detail
-
JobManagerCheckpointStorage
public JobManagerCheckpointStorage()
Creates a new job manager checkpoint storage that accepts states whose serialized forms are up to the default state size (5 MiB). Checkpoint and default savepoint locations are used as specified in the runtime configuration.
-
JobManagerCheckpointStorage
public JobManagerCheckpointStorage(int maxStateSize)
Creates a new JobManagerCheckpointStorage, setting optionally the paths to persist checkpoint metadata and savepoints to, as well as configuring state thresholds and asynchronous operations.
WARNING: Increasing the size of this value beyond the default value (5242880) should be done with care. The checkpointed state needs to be sent to the JobManager via limited-size RPC messages, and the JobManager needs to be able to hold all aggregated state in its memory.
- Parameters:
maxStateSize
- The maximal size of the serialized state.
-
JobManagerCheckpointStorage
public JobManagerCheckpointStorage(String checkpointPath)
Creates a new JobManagerCheckpointStorage, setting optionally the paths to persist checkpoint metadata and savepoints to, as well as configuring state thresholds and asynchronous operations.
WARNING: Increasing the size of this value beyond the default value (5242880) should be done with care. The checkpointed state needs to be sent to the JobManager via limited-size RPC messages, and the JobManager needs to be able to hold all aggregated state in its memory.
- Parameters:
checkpointPath
- path to write checkpoint metadata to. If null, the value from the runtime configuration will be used.
-
JobManagerCheckpointStorage
public JobManagerCheckpointStorage(Path checkpointPath, int maxStateSize)
Creates a new JobManagerCheckpointStorage, setting optionally the paths to persist checkpoint metadata and savepoints to, as well as configuring state thresholds and asynchronous operations.
WARNING: Increasing the size of this value beyond the default value (5242880) should be done with care. The checkpointed state needs to be sent to the JobManager via limited-size RPC messages, and the JobManager needs to be able to hold all aggregated state in its memory.
- Parameters:
maxStateSize
- The maximal size of the serialized state.
-
-
Method Detail
-
getMaxStateSize
public int getMaxStateSize()
Gets the maximum size that an individual state can have, as configured in the constructor (by default 5242880).
- Returns:
- The maximum size that an individual state can have
-
getCheckpointPath
@Nullable public Path getCheckpointPath()
- Returns:
- The location where checkpoints will be externalized if set.
-
getSavepointPath
@Nullable public Path getSavepointPath()
- Returns:
- The default location where savepoints will be externalized if set.
-
configure
public JobManagerCheckpointStorage configure(ReadableConfig config, ClassLoader classLoader)
Creates a copy of this checkpoint storage that uses the values defined in the configuration for fields that were not specified in this checkpoint storage.
- Specified by:
configure
in interface ConfigurableCheckpointStorage
- Parameters:
config
- The configuration
classLoader
- The class loader that should be used to load the checkpoint storage.
- Returns:
- The re-configured variant of the checkpoint storage
-
createFromConfig
public static JobManagerCheckpointStorage createFromConfig(ReadableConfig config, ClassLoader classLoader) throws IllegalConfigurationException
Creates a new JobManagerCheckpointStorage using the given configuration.
- Parameters:
config
- The Flink configuration (loaded by the TaskManager).
classLoader
- The class loader that should be used to load the checkpoint storage.
- Returns:
- The created checkpoint storage.
- Throws:
IllegalConfigurationException
- If the configuration misses critical values, or specifies invalid values
-
resolveCheckpoint
public CompletedCheckpointStorageLocation resolveCheckpoint(String pointer) throws IOException
Description copied from interface: CheckpointStorage
Resolves the given pointer to a checkpoint/savepoint into a checkpoint location. The location supports reading the checkpoint metadata, or disposing the checkpoint storage location.
- Specified by:
resolveCheckpoint
in interface CheckpointStorage
- Parameters:
pointer
- The external checkpoint pointer to resolve.
- Returns:
- The checkpoint location handle.
- Throws:
IOException
- Thrown, if the state backend does not understand the pointer, or if the pointer could not be resolved due to an I/O error.
-
createCheckpointStorage
public CheckpointStorageAccess createCheckpointStorage(JobID jobId) throws IOException
Description copied from interface: CheckpointStorage
Creates a storage for checkpoints for the given job. The checkpoint storage is used to write checkpoint data and metadata.
- Specified by:
createCheckpointStorage
in interface CheckpointStorage
- Parameters:
jobId
- The job to store checkpoint data for.
- Returns:
- A checkpoint storage for the given job.
- Throws:
IOException
- Thrown if the checkpoint storage cannot be initialized.
-
-