################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from abc import ABCMeta
from py4j.java_gateway import get_java_class
from typing import Optional
from pyflink.java_gateway import get_gateway
__all__ = [
'CheckpointStorage',
'JobManagerCheckpointStorage',
'FileSystemCheckpointStorage',
'CustomCheckpointStorage']
def _from_j_checkpoint_storage(j_checkpoint_storage):
if j_checkpoint_storage is None:
return None
gateway = get_gateway()
JCheckpointStorage = gateway.jvm.org.apache.flink.runtime.state.CheckpointStorage
JJobManagerCheckpointStorage = gateway.jvm.org.apache.flink.runtime.state.storage \
.JobManagerCheckpointStorage
JFileSystemCheckpointStorage = gateway.jvm.org.apache.flink.runtime.state.storage \
.FileSystemCheckpointStorage
j_clz = j_checkpoint_storage.getClass()
if not get_java_class(JCheckpointStorage).isAssignableFrom(j_clz):
raise TypeError("%s is not an instance of CheckpointStorage." % j_checkpoint_storage)
if get_java_class(JJobManagerCheckpointStorage).isAssignableFrom(j_clz):
return JobManagerCheckpointStorage(j_jobmanager_checkpoint_storage=j_checkpoint_storage)
elif get_java_class(JFileSystemCheckpointStorage).isAssignableFrom(j_clz):
return FileSystemCheckpointStorage(j_filesystem_checkpoint_storage=j_checkpoint_storage)
else:
return CustomCheckpointStorage(j_checkpoint_storage)
class CheckpointStorage(object, metaclass=ABCMeta):
"""
Checkpoint storage defines how :class:`StateBackend`'s store their state for fault-tolerance
in streaming applications. Various implementations store their checkpoints in different fashions
and have different requirements and availability guarantees.
For example, :class:`JobManagerCheckpointStorage` stores checkpoints in the memory of the
`JobManager`. It is lightweight and without additional dependencies but is not scalable
and only supports small state sizes. This checkpoints storage policy is convenient for local
testing and development.
:class:`FileSystemCheckpointStorage` stores checkpoints in a filesystem. For systems like HDFS
NFS drives, S3, and GCS, this storage policy supports large state size, in the magnitude of many
terabytes while providing a highly available foundation for streaming applications. This
checkpoint storage policy is recommended for most production deployments.
**Raw Bytes Storage**
The `CheckpointStorage` creates services for raw bytes storage.
The raw bytes storage (through the CheckpointStreamFactory) is the fundamental service that
simply stores bytes in a fault tolerant fashion. This service is used by the JobManager to
store checkpoint and recovery metadata and is typically also used by the keyed- and operator-
state backends to store checkpoint state.
**Serializability**
Implementations need to be serializable(`java.io.Serializable`), because they are distributed
across parallel processes (for distributed execution) together with the streaming application
code.
Because of that `CheckpointStorage` implementations are meant to be like _factories_ that create
the proper state stores that provide access to the persistent layer. That way, the storage
policy can be very lightweight (contain only configurations) which makes it easier to be
serializable.
**Thread Safety**
Checkpoint storage implementations have to be thread-safe. Multiple threads may be creating
streams concurrently.
"""
def __init__(self, j_checkpoint_storage):
self._j_checkpoint_storage = j_checkpoint_storage
[docs]class JobManagerCheckpointStorage(CheckpointStorage):
"""
The `CheckpointStorage` checkpoints state directly to the JobManager's memory (hence the
name), but savepoints will be persisted to a file system.
This checkpoint storage is primarily for experimentation, quick local setups, or for streaming
applications that have very small state: Because it requires checkpoints to go through the
JobManager's memory, larger state will occupy larger portions of the JobManager's main memory,
reducing operational stability. For any other setup, the `FileSystemCheckpointStorage`
should be used. The `FileSystemCheckpointStorage` but checkpoints state directly to files
rather than to the JobManager's memory, thus supporting larger state sizes and more highly
available recovery.
**State Size Considerations**
State checkpointing with this checkpoint storage is subject to the following conditions:
- Each individual state must not exceed the configured maximum state size
(see :func:`get_max_state_size`.
- All state from one task (i.e., the sum of all operator states and keyed states from all
chained operators of the task) must not exceed what the RPC system supports, which is
be default < 10 MB. That limit can be configured up, but that is typically not advised.
- The sum of all states in the application times all retained checkpoints must comfortably
fit into the JobManager's JVM heap space.
**Persistence Guarantees**
For the use cases where the state sizes can be handled by this storage, it does
guarantee persistence for savepoints, externalized checkpoints (of configured), and checkpoints
(when high-availability is configured).
**Configuration**
As for all checkpoint storage, this type can either be configured within the application (by
creating the storage with the respective constructor parameters and setting it on the execution
environment) or by specifying it in the Flink configuration.
If the storage was specified in the application, it may pick up additional configuration
parameters from the Flink configuration. For example, if the backend if configured in the
application without a default savepoint directory, it will pick up a default savepoint
directory specified in the Flink configuration of the running job/cluster. That behavior is
implemented via the :func:`configure` method.
"""
# The default maximal size that the snapshotted memory state may have (5 MiBytes).
DEFAULT_MAX_STATE_SIZE = 5 * 1024 * 1024
def __init__(self,
checkpoint_path=None,
max_state_size=None,
j_jobmanager_checkpoint_storage=None):
"""
Creates a new JobManagerCheckpointStorage, setting optionally the paths to persist
checkpoint metadata to, as well as configuring state thresholds.
WARNING: Increasing the size of this value beyond the default value
(:data:`DEFAULT_MAX_STATE_SIZE`) should be done with care.
The checkpointed state needs to be send to the JobManager via limited size RPC messages,
and there and the JobManager needs to be able to hold all aggregated state in its memory.
Example:
::
>>> checkpoint_storage = JobManagerCheckpointStorage()
:param checkpoint_path: The path to write checkpoint metadata to. If none, the value from
the runtime configuration will be used.
:param max_state_size: The maximal size of the serialized state. If none, the
:data:`DEFAULT_MAX_STATE_SIZE` will be used.
:param j_jobmanager_checkpoint_storage: For internal use, please keep none.
"""
if j_jobmanager_checkpoint_storage is None:
gateway = get_gateway()
JJobManagerCheckpointStorage = gateway.jvm.org.apache.flink.runtime.state.storage\
.JobManagerCheckpointStorage
JPath = gateway.jvm.org.apache.flink.core.fs.Path
if checkpoint_path is not None:
checkpoint_path = JPath(checkpoint_path)
if max_state_size is None:
max_state_size = JJobManagerCheckpointStorage.DEFAULT_MAX_STATE_SIZE
j_jobmanager_checkpoint_storage = JJobManagerCheckpointStorage(checkpoint_path,
max_state_size)
super(JobManagerCheckpointStorage, self).__init__(j_jobmanager_checkpoint_storage)
def get_checkpoint_path(self) -> Optional[str]:
"""
Gets the base directory where all the checkpoints are stored.
The job-specific checkpoint directory is created inside this directory.
:return: The base directory for checkpoints.
"""
j_path = self._j_checkpoint_storage.getCheckpointPath()
if j_path is None:
return None
else:
return j_path.toString()
def get_max_state_size(self) -> int:
"""
Gets the maximum size that an individual state can have, as configured in the
constructor. By default :data:`DEFAULT_MAX_STATE_SIZE` will be used.
"""
return self._j_checkpoint_storage.getMaxStateSize()
def get_savepoint_path(self) -> Optional[str]:
"""
Gets the base directory where all the savepoints are stored.
The job-specific savepoint directory is created inside this directory.
:return: The base directory for savepoints.
"""
j_path = self._j_checkpoint_storage.getSavepointPath()
if j_path is None:
return None
else:
return j_path.toString()
def __str__(self):
return self._j_checkpoint_storage.toString()
[docs]class FileSystemCheckpointStorage(CheckpointStorage):
"""
`FileSystemCheckpointStorage` checkpoints state as files to a filesystem.
Each checkpoint will store all its files in a subdirectory that includes the
checkpoints number, such as `hdfs://namenode:port/flink-checkpoints/chk-17/`.
**State Size Considerations**
This checkpoint storage stores small state chunks directly with the metadata, to avoid creating
many small files. The threshold for that is configurable. When increasing this threshold, the
size of the checkpoint metadata increases. The checkpoint metadata of all retained completed
checkpoints needs to fit into the JobManager's heap memory. This is typically not a problem,
unless the threashold `get_min_file_size_threshold` is increased significantly.
**Persistence Guarantees**
Checkpoints from this checkpoint storage are as persistent and available as the filesystem
that it is written to. If the file system is a persistent distributed file system, this
checkpoint storage supports highly available setups. The backend additionally supports
savepoints and externalized checkpoints.
**Configuration**
As for all checkpoint storage policies, this backend can either be configured within the
application (by creating the storage with the respective constructor parameters and setting
it on the execution environment) or by specifying it in the Flink configuration.
If the checkpoint storage was specified in the application, it may pick up additional
configuration parameters from the Flink configuration. For example, if the storage is configured
in the application without a default savepoint directory, it will pick up a default savepoint
directory specified in the Flink configuration of the running job/cluster.
"""
# Maximum size of state that is stored with the metadata, rather than in files (1 MiByte).
MAX_FILE_STATE_THRESHOLD = 1024 * 1024
def __init__(self,
checkpoint_path=None,
file_state_size_threshold=None,
write_buffer_size=-1,
j_filesystem_checkpoint_storage=None):
"""
Creates a new FileSystemCheckpointStorage, setting the paths for the checkpoint data
in a file system.
All file systems for the file system scheme in the URI (e.g., `file://`, `hdfs://`, or
`s3://`) must be accessible via `FileSystem#get`.
For a Job targeting HDFS, this means that the URI must either specify the authority (host
and port), of the Hadoop configuration that describes that information must be in the
classpath.
Example:
::
>>> checkpoint_storage = FileSystemCheckpointStorage("hdfs://checkpoints")
:param checkpoint_path: The path to write checkpoint metadata to. If none, the value from
the runtime configuration will be used.
:param file_state_size_threshold: State below this size will be stored as part of the
metadata, rather than in files. If -1, the value configured
in the runtime configuration will be used, or the default
value (1KB) if nothing is configured.
:param write_buffer_size: Write buffer size used to serialize state. If -1, the value
configured in the runtime configuration will be used, or the
default value (4KB) if nothing is configured.
:param j_filesystem_checkpoint_storage: For internal use, please keep none.
"""
if j_filesystem_checkpoint_storage is None:
gateway = get_gateway()
JFileSystemCheckpointStorage = gateway.jvm.org.apache.flink.runtime.state.storage\
.FileSystemCheckpointStorage
JPath = gateway.jvm.org.apache.flink.core.fs.Path
if checkpoint_path is None:
raise ValueError("checkpoint_path must not be None")
else:
checkpoint_path = JPath(checkpoint_path)
if file_state_size_threshold is None:
file_state_size_threshold = -1
j_filesystem_checkpoint_storage = JFileSystemCheckpointStorage(
checkpoint_path,
file_state_size_threshold,
write_buffer_size)
super(FileSystemCheckpointStorage, self).__init__(j_filesystem_checkpoint_storage)
def get_checkpoint_path(self) -> str:
"""
Gets the base directory where all the checkpoints are stored.
The job-specific checkpoint directory is created inside this directory.
:return: The base directory for checkpoints.
"""
return self._j_checkpoint_storage.getCheckpointPath().toString()
def get_savepoint_path(self) -> Optional[str]:
"""
Gets the base directory where all the savepoints are stored.
The job-specific savepoint directory is created inside this directory.
:return: The base directory for savepoints.
"""
j_path = self._j_checkpoint_storage.getSavepointPath()
if j_path is None:
return None
else:
return j_path.toString()
def get_min_file_size_threshold(self) -> int:
"""
Gets the threshold below which state is stored as part of the metadata, rather than in
file. This threshold ensures the backend does not create a large amount of small files,
where potentially the file pointers are larget than the state itself.
"""
return self._j_checkpoint_storage.getMinFileSizeThreshold()
def get_write_buffer_size(self) -> int:
"""
Gets the write buffer size for created checkpoint streams.
"""
return self._j_checkpoint_storage.getWriteBufferSize()
def __str__(self):
return self._j_checkpoint_storage.toString()
[docs]class CustomCheckpointStorage(CheckpointStorage):
"""
A wrapper of customized java checkpoint storage.
"""
def __init__(self, j_custom_checkpoint_storage):
super(CustomCheckpointStorage, self).__init__(j_custom_checkpoint_storage)