Class ZooKeeperStateHandleStore<T extends Serializable>
- java.lang.Object
-
- org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore<T>
-
- Type Parameters:
T
- Type of state
- All Implemented Interfaces:
StateHandleStore<T,IntegerResourceVersion>
public class ZooKeeperStateHandleStore<T extends Serializable> extends Object implements StateHandleStore<T,IntegerResourceVersion>
Class which stores state via the providedRetrievableStateStorageHelper
and writes the returned state handle to ZooKeeper. The ZooKeeper node can be locked by creating an ephemeral child in the lock sub-path. The implementation only allows the deletion of the ZooKeeper node if the lock sub-path has no children. That way we protect concurrent accesses from different ZooKeeperStateHandleStore instances.Added state is persisted via
RetrievableStateHandles
, which in turn are written to ZooKeeper. This level of indirection is necessary to keep the amount of data in ZooKeeper small. ZooKeeper is build for data in the KB range whereas state can grow to multiple MBs.State modifications require some care, because it is possible that certain failures bring the state handle backend and ZooKeeper out of sync.
ZooKeeper holds the ground truth about state handles, i.e. the following holds:
State handle in ZooKeeper and not marked for deletion => State handle exists
State handle in ZooKeeper and marked for deletion => State handle might or might not be deleted, yet
State handle not in ZooKeeper => State handle does not exist
There can be lingering state handles when failures happen during operation. They need to be cleaned up manually (see FLINK-2513 about a possible way to overcome this).
-
-
Nested Class Summary
-
Nested classes/interfaces inherited from interface org.apache.flink.runtime.persistence.StateHandleStore
StateHandleStore.AlreadyExistException, StateHandleStore.NotExistException
-
-
Constructor Summary
Constructors Constructor Description ZooKeeperStateHandleStore(org.apache.flink.shaded.curator5.org.apache.curator.framework.CuratorFramework client, RetrievableStateStorageHelper<T> storage)
Creates aZooKeeperStateHandleStore
.
-
Method Summary
All Methods Instance Methods Concrete Methods Modifier and Type Method Description RetrievableStateHandle<T>
addAndLock(String pathInZooKeeper, T state)
Creates a state handle, stores it in ZooKeeper and locks it.void
clearEntries()
Recursively deletes all children.IntegerResourceVersion
exists(String pathInZooKeeper)
Returns the version of the node if it exists and is not marked for deletion or-1
.List<Tuple2<RetrievableStateHandle<T>,String>>
getAllAndLock()
Gets all available state handles from ZooKeeper and locks the respective state nodes.Collection<String>
getAllHandles()
Return a list of all valid name for state handles.RetrievableStateHandle<T>
getAndLock(String pathInZooKeeper)
Gets theRetrievableStateHandle
stored in the given ZooKeeper node and locks it.void
release(String pathInZooKeeper)
Releases the lock from the node under the given ZooKeeper path.void
releaseAll()
Releases all lock nodes of this ZooKeeperStateHandleStore.boolean
releaseAndTryRemove(String pathInZooKeeper)
Releases the lock for the given state node and tries to remove the state node if it is no longer locked.void
replace(String pathInZooKeeper, IntegerResourceVersion expectedVersion, T state)
Replaces a state handle in ZooKeeper and discards the old state handle.protected void
setStateHandle(String path, byte[] serializedStateHandle, int expectedVersion)
String
toString()
-
-
-
Constructor Detail
-
ZooKeeperStateHandleStore
public ZooKeeperStateHandleStore(org.apache.flink.shaded.curator5.org.apache.curator.framework.CuratorFramework client, RetrievableStateStorageHelper<T> storage)
Creates aZooKeeperStateHandleStore
.- Parameters:
client
- The Curator ZooKeeper client. Important: It is expected that the client's namespace ensures that the root path is exclusive for all state handles managed by this instance, e.g.client.usingNamespace("/stateHandles")
storage
- to persist the actual state and whose returned state handle is then written to ZooKeeper
-
-
Method Detail
-
addAndLock
public RetrievableStateHandle<T> addAndLock(String pathInZooKeeper, T state) throws PossibleInconsistentStateException, Exception
Creates a state handle, stores it in ZooKeeper and locks it. A locked node cannot be removed by anotherZooKeeperStateHandleStore
instance as long as this instance remains connected to ZooKeeper.Important: This will not store the actual state in ZooKeeper, but create a state handle and store it in ZooKeeper. This level of indirection makes sure that data in ZooKeeper is small.
The operation will fail if there is already a node under the given path.
- Specified by:
addAndLock
in interfaceStateHandleStore<T extends Serializable,IntegerResourceVersion>
- Parameters:
pathInZooKeeper
- Destination path in ZooKeeper (expected to *not* exist yet)state
- State to be added- Returns:
- The Created
RetrievableStateHandle
. - Throws:
PossibleInconsistentStateException
- if the write-to-ZooKeeper operation failed. This indicates that it's not clear whether the new state was successfully written to ZooKeeper or not. Proper error handling has to be applied on the caller's side.Exception
- If a ZooKeeper or state handle operation fails
-
replace
public void replace(String pathInZooKeeper, IntegerResourceVersion expectedVersion, T state) throws Exception
Replaces a state handle in ZooKeeper and discards the old state handle.- Specified by:
replace
in interfaceStateHandleStore<T extends Serializable,IntegerResourceVersion>
- Parameters:
pathInZooKeeper
- Destination path in ZooKeeper (expected to exist and start with a '/')expectedVersion
- Expected version of the node to replacestate
- The new state to replace the old one- Throws:
Exception
- If a ZooKeeper or state handle operation failsIllegalStateException
- if the replace operation shall be performed on a node that is not locked for this specific instance.
-
setStateHandle
@VisibleForTesting protected void setStateHandle(String path, byte[] serializedStateHandle, int expectedVersion) throws Exception
- Throws:
Exception
-
exists
public IntegerResourceVersion exists(String pathInZooKeeper) throws Exception
Returns the version of the node if it exists and is not marked for deletion or-1
.- Specified by:
exists
in interfaceStateHandleStore<T extends Serializable,IntegerResourceVersion>
- Parameters:
pathInZooKeeper
- Path in ZooKeeper to check- Returns:
- Version of the ZNode if the path exists and is not marked for deletion,
-1
otherwise. - Throws:
Exception
- If the ZooKeeper operation fails
-
getAndLock
public RetrievableStateHandle<T> getAndLock(String pathInZooKeeper) throws Exception
Gets theRetrievableStateHandle
stored in the given ZooKeeper node and locks it. A locked node cannot be removed by anotherZooKeeperStateHandleStore
instance as long as this instance remains connected to ZooKeeper.- Specified by:
getAndLock
in interfaceStateHandleStore<T extends Serializable,IntegerResourceVersion>
- Parameters:
pathInZooKeeper
- Path to the ZooKeeper node which contains the state handle- Returns:
- The retrieved state handle from the specified ZooKeeper node
- Throws:
IOException
- Thrown if the method failed to deserialize the stored state handleException
- Thrown if a ZooKeeper operation failed
-
getAllHandles
public Collection<String> getAllHandles() throws Exception
Description copied from interface:StateHandleStore
Return a list of all valid name for state handles. The result might contain nodes that are marked for deletion.- Specified by:
getAllHandles
in interfaceStateHandleStore<T extends Serializable,IntegerResourceVersion>
- Returns:
- List of valid state handle name. The name is key name in ConfigMap or child path name in ZooKeeper.
- Throws:
Exception
- if get handle operation failed
-
getAllAndLock
public List<Tuple2<RetrievableStateHandle<T>,String>> getAllAndLock() throws Exception
Gets all available state handles from ZooKeeper and locks the respective state nodes.If there is a concurrent modification, the operation is retried until it succeeds.
- Specified by:
getAllAndLock
in interfaceStateHandleStore<T extends Serializable,IntegerResourceVersion>
- Returns:
- All state handles from ZooKeeper.
- Throws:
Exception
- If a ZooKeeper or state handle operation fails
-
releaseAndTryRemove
public boolean releaseAndTryRemove(String pathInZooKeeper) throws Exception
Releases the lock for the given state node and tries to remove the state node if it is no longer locked.- Specified by:
releaseAndTryRemove
in interfaceStateHandleStore<T extends Serializable,IntegerResourceVersion>
- Parameters:
pathInZooKeeper
- Path of state handle to remove- Returns:
true
if the state handle could be deleted;false
, if the handle is locked by another connection.- Throws:
Exception
- If the ZooKeeper operation or discarding the state handle fails
-
release
public void release(String pathInZooKeeper) throws Exception
Releases the lock from the node under the given ZooKeeper path. If no lock exists, then nothing happens.- Specified by:
release
in interfaceStateHandleStore<T extends Serializable,IntegerResourceVersion>
- Parameters:
pathInZooKeeper
- Path describing the ZooKeeper node- Throws:
Exception
- if the delete operation of the lock node fails
-
releaseAll
public void releaseAll() throws Exception
Releases all lock nodes of this ZooKeeperStateHandleStore.- Specified by:
releaseAll
in interfaceStateHandleStore<T extends Serializable,IntegerResourceVersion>
- Throws:
Exception
- if the delete operation of a lock file fails
-
clearEntries
public void clearEntries() throws Exception
Recursively deletes all children.- Specified by:
clearEntries
in interfaceStateHandleStore<T extends Serializable,IntegerResourceVersion>
- Throws:
Exception
- ZK errors
-
-