T
- Type of statepublic class ZooKeeperStateHandleStore<T extends Serializable> extends Object implements StateHandleStore<T,IntegerResourceVersion>
RetrievableStateStorageHelper
and writes the
returned state handle to ZooKeeper. The ZooKeeper node can be locked by creating an ephemeral
child and only allowing the deletion of the ZooKeeper node if it does not have any children. That
way we protect concurrent accesses from different ZooKeeperStateHandleStore instances.
Added state is persisted via RetrievableStateHandles
, which in
turn are written to ZooKeeper. This level of indirection is necessary to keep the amount of data
in ZooKeeper small. ZooKeeper is build for data in the KB range whereas state can grow to
multiple MBs.
State modifications require some care, because it is possible that certain failures bring the state handle backend and ZooKeeper out of sync.
ZooKeeper holds the ground truth about state handles, i.e. the following holds:
State handle in ZooKeeper => State handle exists
But not:
State handle exists => State handle in ZooKeeper
There can be lingering state handles when failures happen during operation. They need to be cleaned up manually (see FLINK-2513 about a possible way to overcome this).
StateHandleStore.AlreadyExistException, StateHandleStore.NotExistException
Constructor and Description |
---|
ZooKeeperStateHandleStore(org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework client,
RetrievableStateStorageHelper<T> storage)
Creates a
ZooKeeperStateHandleStore . |
Modifier and Type | Method and Description |
---|---|
RetrievableStateHandle<T> |
addAndLock(String pathInZooKeeper,
T state)
Creates a state handle, stores it in ZooKeeper and locks it.
|
void |
clearEntries()
Recursively deletes all children.
|
IntegerResourceVersion |
exists(String pathInZooKeeper)
Returns the version of the node if it exists or
-1 if it doesn't. |
List<Tuple2<RetrievableStateHandle<T>,String>> |
getAllAndLock()
Gets all available state handles from ZooKeeper and locks the respective state nodes.
|
Collection<String> |
getAllHandles()
Return a list of all valid paths for state handles.
|
RetrievableStateHandle<T> |
getAndLock(String pathInZooKeeper)
Gets the
RetrievableStateHandle stored in the given ZooKeeper node and locks it. |
void |
release(String pathInZooKeeper)
Releases the lock from the node under the given ZooKeeper path.
|
void |
releaseAll()
Releases all lock nodes of this ZooKeeperStateHandleStore.
|
boolean |
releaseAndTryRemove(String pathInZooKeeper)
Releases the lock for the given state node and tries to remove the state node if it is no
longer locked.
|
void |
releaseAndTryRemoveAll()
Releases all lock nodes of this ZooKeeperStateHandleStores and tries to remove all state
nodes which are not locked anymore.
|
void |
replace(String pathInZooKeeper,
IntegerResourceVersion expectedVersion,
T state)
Replaces a state handle in ZooKeeper and discards the old state handle.
|
protected void |
setStateHandle(String path,
byte[] serializedStateHandle,
int expectedVersion) |
String |
toString() |
public ZooKeeperStateHandleStore(org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework client, RetrievableStateStorageHelper<T> storage)
ZooKeeperStateHandleStore
.client
- The Curator ZooKeeper client. Important: It is expected that
the client's namespace ensures that the root path is exclusive for all state handles
managed by this instance, e.g. client.usingNamespace("/stateHandles")
storage
- to persist the actual state and whose returned state handle is then written to
ZooKeeperpublic RetrievableStateHandle<T> addAndLock(String pathInZooKeeper, T state) throws PossibleInconsistentStateException, Exception
ZooKeeperStateHandleStore
instance as long as this instance remains
connected to ZooKeeper.
Important: This will not store the actual state in ZooKeeper, but create a state handle and store it in ZooKeeper. This level of indirection makes sure that data in ZooKeeper is small.
The operation will fail if there is already a node under the given path.
addAndLock
in interface StateHandleStore<T extends Serializable,IntegerResourceVersion>
pathInZooKeeper
- Destination path in ZooKeeper (expected to *not* exist yet)state
- State to be addedRetrievableStateHandle
.PossibleInconsistentStateException
- if the write-to-ZooKeeper operation failed. This
indicates that it's not clear whether the new state was successfully written to ZooKeeper
or not. Proper error handling has to be applied on the caller's side.Exception
- If a ZooKeeper or state handle operation failspublic void replace(String pathInZooKeeper, IntegerResourceVersion expectedVersion, T state) throws Exception
replace
in interface StateHandleStore<T extends Serializable,IntegerResourceVersion>
pathInZooKeeper
- Destination path in ZooKeeper (expected to exist and start with a '/')expectedVersion
- Expected version of the node to replacestate
- The new state to replace the old oneException
- If a ZooKeeper or state handle operation fails@VisibleForTesting protected void setStateHandle(String path, byte[] serializedStateHandle, int expectedVersion) throws Exception
Exception
public IntegerResourceVersion exists(String pathInZooKeeper) throws Exception
-1
if it doesn't.exists
in interface StateHandleStore<T extends Serializable,IntegerResourceVersion>
pathInZooKeeper
- Path in ZooKeeper to check-1
otherwise.Exception
- If the ZooKeeper operation failspublic RetrievableStateHandle<T> getAndLock(String pathInZooKeeper) throws Exception
RetrievableStateHandle
stored in the given ZooKeeper node and locks it. A
locked node cannot be removed by another ZooKeeperStateHandleStore
instance as long
as this instance remains connected to ZooKeeper.getAndLock
in interface StateHandleStore<T extends Serializable,IntegerResourceVersion>
pathInZooKeeper
- Path to the ZooKeeper node which contains the state handleIOException
- Thrown if the method failed to deserialize the stored state handleException
- Thrown if a ZooKeeper operation failedpublic Collection<String> getAllHandles() throws Exception
getAllHandles
in interface StateHandleStore<T extends Serializable,IntegerResourceVersion>
Exception
- if a ZooKeeper operation failspublic List<Tuple2<RetrievableStateHandle<T>,String>> getAllAndLock() throws Exception
If there is a concurrent modification, the operation is retried until it succeeds.
getAllAndLock
in interface StateHandleStore<T extends Serializable,IntegerResourceVersion>
Exception
- If a ZooKeeper or state handle operation failspublic boolean releaseAndTryRemove(String pathInZooKeeper) throws Exception
RetrievableStateHandle
stored under the given state
node if any.releaseAndTryRemove
in interface StateHandleStore<T extends Serializable,IntegerResourceVersion>
pathInZooKeeper
- Path of state handle to removeException
- If the ZooKeeper operation or discarding the state handle failspublic void releaseAndTryRemoveAll() throws Exception
The delete operation is executed asynchronously
releaseAndTryRemoveAll
in interface StateHandleStore<T extends Serializable,IntegerResourceVersion>
Exception
- if the delete operation failspublic void release(String pathInZooKeeper) throws Exception
release
in interface StateHandleStore<T extends Serializable,IntegerResourceVersion>
pathInZooKeeper
- Path describing the ZooKeeper nodeException
- if the delete operation of the lock node failspublic void releaseAll() throws Exception
releaseAll
in interface StateHandleStore<T extends Serializable,IntegerResourceVersion>
Exception
- if the delete operation of a lock file failspublic void clearEntries() throws Exception
clearEntries
in interface StateHandleStore<T extends Serializable,IntegerResourceVersion>
Exception
- ZK errorsCopyright © 2014–2022 The Apache Software Foundation. All rights reserved.