T
- Type of statepublic class ZooKeeperStateHandleStore<T extends Serializable> extends Object
RetrievableStateStorageHelper
and writes the
returned state handle to ZooKeeper. The ZooKeeper node can be locked by creating an ephemeral
child and only allowing the deletion of the ZooKeeper node if it does not have any children.
That way we protect concurrent accesses from different ZooKeeperStateHandleStore instances.
Added state is persisted via RetrievableStateHandles
,
which in turn are written to ZooKeeper. This level of indirection is necessary to keep the
amount of data in ZooKeeper small. ZooKeeper is build for data in the KB range whereas
state can grow to multiple MBs.
State modifications require some care, because it is possible that certain failures bring the state handle backend and ZooKeeper out of sync.
ZooKeeper holds the ground truth about state handles, i.e. the following holds:
State handle in ZooKeeper => State handle exists
But not:
State handle exists => State handle in ZooKeeper
There can be lingering state handles when failures happen during operation. They need to be cleaned up manually (see FLINK-2513 about a possible way to overcome this).
Constructor and Description |
---|
ZooKeeperStateHandleStore(org.apache.curator.framework.CuratorFramework client,
RetrievableStateStorageHelper<T> storage)
Creates a
ZooKeeperStateHandleStore . |
Modifier and Type | Method and Description |
---|---|
RetrievableStateHandle<T> |
addAndLock(String pathInZooKeeper,
T state)
Creates a state handle, stores it in ZooKeeper and locks it.
|
void |
deleteChildren()
Recursively deletes all children.
|
int |
exists(String pathInZooKeeper)
Returns the version of the node if it exists or
-1 if it doesn't. |
List<Tuple2<RetrievableStateHandle<T>,String>> |
getAllAndLock()
Gets all available state handles from ZooKeeper and locks the respective state nodes.
|
Collection<String> |
getAllPaths()
Return a list of all valid paths for state handles.
|
RetrievableStateHandle<T> |
getAndLock(String pathInZooKeeper)
Gets the
RetrievableStateHandle stored in the given ZooKeeper node and locks it. |
protected String |
getLockPath(String rootPath)
Returns the path for the lock node relative to the given path.
|
void |
release(String pathInZooKeeper)
Releases the lock from the node under the given ZooKeeper path.
|
void |
releaseAll()
Releases all lock nodes of this ZooKeeperStateHandleStore.
|
boolean |
releaseAndTryRemove(String pathInZooKeeper)
Releases the lock for the given state node and tries to remove the state node if it is no longer locked.
|
void |
releaseAndTryRemoveAll()
Releases all lock nodes of this ZooKeeperStateHandleStores and tries to remove all state nodes which
are not locked anymore.
|
void |
replace(String pathInZooKeeper,
int expectedVersion,
T state)
Replaces a state handle in ZooKeeper and discards the old state handle.
|
public ZooKeeperStateHandleStore(org.apache.curator.framework.CuratorFramework client, RetrievableStateStorageHelper<T> storage)
ZooKeeperStateHandleStore
.client
- The Curator ZooKeeper client. Important: It is
expected that the client's namespace ensures that the root
path is exclusive for all state handles managed by this
instance, e.g. client.usingNamespace("/stateHandles")
storage
- to persist the actual state and whose returned state handle is then written
to ZooKeeperpublic RetrievableStateHandle<T> addAndLock(String pathInZooKeeper, T state) throws Exception
ZooKeeperStateHandleStore
instance as long as this instance remains connected
to ZooKeeper.
Important: This will not store the actual state in ZooKeeper, but create a state handle and store it in ZooKeeper. This level of indirection makes sure that data in ZooKeeper is small.
The operation will fail if there is already an node under the given path
pathInZooKeeper
- Destination path in ZooKeeper (expected to *not* exist yet)state
- State to be addedRetrievableStateHandle
.Exception
- If a ZooKeeper or state handle operation failspublic void replace(String pathInZooKeeper, int expectedVersion, T state) throws Exception
pathInZooKeeper
- Destination path in ZooKeeper (expected to exist and start with a '/')expectedVersion
- Expected version of the node to replacestate
- The new state to replace the old oneException
- If a ZooKeeper or state handle operation failspublic int exists(String pathInZooKeeper) throws Exception
-1
if it doesn't.pathInZooKeeper
- Path in ZooKeeper to check-1
otherwise.Exception
- If the ZooKeeper operation failspublic RetrievableStateHandle<T> getAndLock(String pathInZooKeeper) throws Exception
RetrievableStateHandle
stored in the given ZooKeeper node and locks it. A
locked node cannot be removed by another ZooKeeperStateHandleStore
instance as long
as this instance remains connected to ZooKeeper.pathInZooKeeper
- Path to the ZooKeeper node which contains the state handleIOException
- Thrown if the method failed to deserialize the stored state handleException
- Thrown if a ZooKeeper operation failedpublic Collection<String> getAllPaths() throws Exception
Exception
- if a ZooKeeper operation failspublic List<Tuple2<RetrievableStateHandle<T>,String>> getAllAndLock() throws Exception
If there is a concurrent modification, the operation is retried until it succeeds.
Exception
- If a ZooKeeper or state handle operation fails@Nullable public boolean releaseAndTryRemove(String pathInZooKeeper) throws Exception
RetrievableStateHandle
stored under the given state node if any.pathInZooKeeper
- Path of state handle to removeException
- If the ZooKeeper operation or discarding the state handle failspublic void releaseAndTryRemoveAll() throws Exception
The delete operation is executed asynchronously
Exception
- if the delete operation failspublic void release(String pathInZooKeeper) throws Exception
pathInZooKeeper
- Path describing the ZooKeeper nodeException
- if the delete operation of the lock node failspublic void releaseAll() throws Exception
Exception
- if the delete operation of a lock file failspublic void deleteChildren() throws Exception
Exception
- ZK errorsCopyright © 2014–2020 The Apache Software Foundation. All rights reserved.