T
- Type of statepublic class ZooKeeperStateHandleStore<T extends Serializable> extends Object implements StateHandleStore<T,IntegerResourceVersion>
RetrievableStateStorageHelper
and writes the
returned state handle to ZooKeeper. The ZooKeeper node can be locked by creating an ephemeral
child in the lock sub-path. The implementation only allows the deletion of the ZooKeeper node if
the lock sub-path has no children. That way we protect concurrent accesses from different
ZooKeeperStateHandleStore instances.
Added state is persisted via RetrievableStateHandles
, which in
turn are written to ZooKeeper. This level of indirection is necessary to keep the amount of data
in ZooKeeper small. ZooKeeper is build for data in the KB range whereas state can grow to
multiple MBs.
State modifications require some care, because it is possible that certain failures bring the state handle backend and ZooKeeper out of sync.
ZooKeeper holds the ground truth about state handles, i.e. the following holds:
State handle in ZooKeeper and not marked for deletion => State handle exists
State handle in ZooKeeper and marked for deletion => State handle might or might not be deleted, yet
State handle not in ZooKeeper => State handle does not exist
There can be lingering state handles when failures happen during operation. They need to be cleaned up manually (see FLINK-2513 about a possible way to overcome this).
StateHandleStore.AlreadyExistException, StateHandleStore.NotExistException
Constructor and Description |
---|
ZooKeeperStateHandleStore(org.apache.flink.shaded.curator5.org.apache.curator.framework.CuratorFramework client,
RetrievableStateStorageHelper<T> storage)
Creates a
ZooKeeperStateHandleStore . |
Modifier and Type | Method and Description |
---|---|
RetrievableStateHandle<T> |
addAndLock(String pathInZooKeeper,
T state)
Creates a state handle, stores it in ZooKeeper and locks it.
|
void |
clearEntries()
Recursively deletes all children.
|
IntegerResourceVersion |
exists(String pathInZooKeeper)
Returns the version of the node if it exists and is not marked for deletion or
-1
. |
List<Tuple2<RetrievableStateHandle<T>,String>> |
getAllAndLock()
Gets all available state handles from ZooKeeper and locks the respective state nodes.
|
Collection<String> |
getAllHandles()
Return a list of all valid name for state handles.
|
RetrievableStateHandle<T> |
getAndLock(String pathInZooKeeper)
Gets the
RetrievableStateHandle stored in the given ZooKeeper node and locks it. |
void |
release(String pathInZooKeeper)
Releases the lock from the node under the given ZooKeeper path.
|
void |
releaseAll()
Releases all lock nodes of this ZooKeeperStateHandleStore.
|
boolean |
releaseAndTryRemove(String pathInZooKeeper)
Releases the lock for the given state node and tries to remove the state node if it is no
longer locked.
|
void |
replace(String pathInZooKeeper,
IntegerResourceVersion expectedVersion,
T state)
Replaces a state handle in ZooKeeper and discards the old state handle.
|
protected void |
setStateHandle(String path,
byte[] serializedStateHandle,
int expectedVersion) |
String |
toString() |
public ZooKeeperStateHandleStore(org.apache.flink.shaded.curator5.org.apache.curator.framework.CuratorFramework client, RetrievableStateStorageHelper<T> storage)
ZooKeeperStateHandleStore
.client
- The Curator ZooKeeper client. Important: It is expected that
the client's namespace ensures that the root path is exclusive for all state handles
managed by this instance, e.g. client.usingNamespace("/stateHandles")
storage
- to persist the actual state and whose returned state handle is then written to
ZooKeeperpublic RetrievableStateHandle<T> addAndLock(String pathInZooKeeper, T state) throws PossibleInconsistentStateException, Exception
ZooKeeperStateHandleStore
instance as long as this instance remains
connected to ZooKeeper.
Important: This will not store the actual state in ZooKeeper, but create a state handle and store it in ZooKeeper. This level of indirection makes sure that data in ZooKeeper is small.
The operation will fail if there is already a node under the given path.
addAndLock
in interface StateHandleStore<T extends Serializable,IntegerResourceVersion>
pathInZooKeeper
- Destination path in ZooKeeper (expected to *not* exist yet)state
- State to be addedRetrievableStateHandle
.PossibleInconsistentStateException
- if the write-to-ZooKeeper operation failed. This
indicates that it's not clear whether the new state was successfully written to ZooKeeper
or not. Proper error handling has to be applied on the caller's side.Exception
- If a ZooKeeper or state handle operation failspublic void replace(String pathInZooKeeper, IntegerResourceVersion expectedVersion, T state) throws Exception
replace
in interface StateHandleStore<T extends Serializable,IntegerResourceVersion>
pathInZooKeeper
- Destination path in ZooKeeper (expected to exist and start with a '/')expectedVersion
- Expected version of the node to replacestate
- The new state to replace the old oneException
- If a ZooKeeper or state handle operation failsIllegalStateException
- if the replace operation shall be performed on a node that is
not locked for this specific instance.@VisibleForTesting protected void setStateHandle(String path, byte[] serializedStateHandle, int expectedVersion) throws Exception
Exception
public IntegerResourceVersion exists(String pathInZooKeeper) throws Exception
-1
.exists
in interface StateHandleStore<T extends Serializable,IntegerResourceVersion>
pathInZooKeeper
- Path in ZooKeeper to check-1
otherwise.Exception
- If the ZooKeeper operation failspublic RetrievableStateHandle<T> getAndLock(String pathInZooKeeper) throws Exception
RetrievableStateHandle
stored in the given ZooKeeper node and locks it. A
locked node cannot be removed by another ZooKeeperStateHandleStore
instance as long
as this instance remains connected to ZooKeeper.getAndLock
in interface StateHandleStore<T extends Serializable,IntegerResourceVersion>
pathInZooKeeper
- Path to the ZooKeeper node which contains the state handleIOException
- Thrown if the method failed to deserialize the stored state handleException
- Thrown if a ZooKeeper operation failedpublic Collection<String> getAllHandles() throws Exception
StateHandleStore
getAllHandles
in interface StateHandleStore<T extends Serializable,IntegerResourceVersion>
Exception
- if get handle operation failedpublic List<Tuple2<RetrievableStateHandle<T>,String>> getAllAndLock() throws Exception
If there is a concurrent modification, the operation is retried until it succeeds.
getAllAndLock
in interface StateHandleStore<T extends Serializable,IntegerResourceVersion>
Exception
- If a ZooKeeper or state handle operation failspublic boolean releaseAndTryRemove(String pathInZooKeeper) throws Exception
releaseAndTryRemove
in interface StateHandleStore<T extends Serializable,IntegerResourceVersion>
pathInZooKeeper
- Path of state handle to removetrue
if the state handle could be deleted; false
, if the handle is
locked by another connection.Exception
- If the ZooKeeper operation or discarding the state handle failspublic void release(String pathInZooKeeper) throws Exception
release
in interface StateHandleStore<T extends Serializable,IntegerResourceVersion>
pathInZooKeeper
- Path describing the ZooKeeper nodeException
- if the delete operation of the lock node failspublic void releaseAll() throws Exception
releaseAll
in interface StateHandleStore<T extends Serializable,IntegerResourceVersion>
Exception
- if the delete operation of a lock file failspublic void clearEntries() throws Exception
clearEntries
in interface StateHandleStore<T extends Serializable,IntegerResourceVersion>
Exception
- ZK errorsCopyright © 2014–2024 The Apache Software Foundation. All rights reserved.