T - Type of state
public class KubernetesStateHandleStore<T extends Serializable> extends Object implements StateHandleStore<T,StringResourceVersion>
Stores state via the provided RetrievableStateStorageHelper and writes the
returned state handle to ConfigMap.
Added state is persisted via RetrievableStateHandles, which in
turn are written to ConfigMap. This level of indirection is necessary to keep the amount of data
in ConfigMap small. ConfigMap is built for data smaller than 1 MB, whereas state can grow to multiple
MBs and GBs.
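The indirection described above can be sketched with two in-memory maps. This is only an illustration, not Flink code: `IndirectionSketch`, the `storage://` pointer format, and both maps are assumptions made for the example, and the real class serializes a RetrievableStateHandle rather than a plain string pointer.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory illustration of the pointer indirection; not Flink code. */
final class IndirectionSketch {
    // Stands in for durable storage that holds the full serialized state.
    static final Map<String, byte[]> externalStorage = new HashMap<>();
    // Stands in for the ConfigMap data section, which should only hold small values.
    static final Map<String, String> configMapData = new HashMap<>();

    /** Serializes the state to external storage and records only a short pointer under the key. */
    static void add(String key, Serializable state) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(state);
        }
        String pointer = "storage://" + key; // assumed pointer format, for illustration only
        externalStorage.put(pointer, bytes.toByteArray());
        configMapData.put(key, pointer);
    }

    /** Follows the pointer stored under the key and deserializes the state it refers to. */
    static Object get(String key) throws IOException, ClassNotFoundException {
        byte[] stored = externalStorage.get(configMapData.get(key));
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(stored))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        add("checkpoint-1", new byte[2 * 1024 * 1024]); // 2 MB of state
        System.out.println("ConfigMap value: " + configMapData.get("checkpoint-1"));
        System.out.println("Restored bytes: " + ((byte[]) get("checkpoint-1")).length);
    }
}
```

However large the state grows, the value kept under the ConfigMap key stays a few dozen characters long in this sketch.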
This implementation differs substantially from ZooKeeperStateHandleStore.
Thanks to the transactional operation FlinkKubeClient.checkAndUpdateConfigMap(java.lang.String, java.util.function.Function<org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap, java.util.Optional<org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap>>),
only the leader can update the store. The lock-and-release protocol of the
ZooKeeper implementation is therefore not needed.
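A minimal sketch of the idea behind a leader-guarded check-and-update follows. It borrows only the shape of the update function (a `Function` returning an `Optional`) from the signature quoted above. `FakeConfigMap`, `FakeClient`, `putIfLeader`, and the `leaderIdentity` field are hypothetical names, and the in-process `synchronized` block merely stands in for whatever atomicity the real client provides.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

/** Hypothetical in-memory model of a leader-guarded check-and-update; not the Flink client. */
final class CheckAndUpdateSketch {
    /** Immutable stand-in for a ConfigMap: its data plus the identity of the current leader. */
    static final class FakeConfigMap {
        final Map<String, String> data;
        final String leaderIdentity;

        FakeConfigMap(Map<String, String> data, String leaderIdentity) {
            this.data = Collections.unmodifiableMap(new HashMap<>(data));
            this.leaderIdentity = leaderIdentity;
        }
    }

    /** Applies the update function atomically; an empty Optional means "do not write". */
    static final class FakeClient {
        private FakeConfigMap current;

        FakeClient(FakeConfigMap initial) {
            this.current = initial;
        }

        synchronized boolean checkAndUpdate(Function<FakeConfigMap, Optional<FakeConfigMap>> updater) {
            Optional<FakeConfigMap> updated = updater.apply(current);
            updated.ifPresent(configMap -> current = configMap);
            return updated.isPresent();
        }

        synchronized FakeConfigMap read() {
            return current;
        }
    }

    /** Writes key=value only if the caller's lock identity matches the recorded leader. */
    static boolean putIfLeader(FakeClient client, String lockIdentity, String key, String value) {
        return client.checkAndUpdate(configMap -> {
            if (!lockIdentity.equals(configMap.leaderIdentity)) {
                return Optional.empty(); // not the leader: leave the ConfigMap untouched
            }
            Map<String, String> data = new HashMap<>(configMap.data);
            data.put(key, value);
            return Optional.of(new FakeConfigMap(data, configMap.leaderIdentity));
        });
    }
}
```

Because the leadership check and the write happen inside one atomic step, a non-leader cannot slip a write in between them, which is why no separate lock entry is needed in this model.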
Nested classes/interfaces inherited from interface StateHandleStore: StateHandleStore.AlreadyExistException, StateHandleStore.NotExistException
Constructor and Description |
---|
KubernetesStateHandleStore(FlinkKubeClient kubeClient, String configMapName, RetrievableStateStorageHelper<T> storage, java.util.function.Predicate<String> configMapKeyFilter, String lockIdentity) Creates a KubernetesStateHandleStore. |
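The configMapKeyFilter argument is an ordinary java.util.function.Predicate<String>. The sketch below shows how such a filter can pick out the state handle keys when a ConfigMap also holds unrelated entries. The key names (`checkpointID-…`, `address`) and the helper methods are assumptions made for illustration, not the class's actual implementation.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/** Hypothetical illustration of a key filter over ConfigMap entries; not Flink code. */
final class KeyFilterSketch {
    /** Returns the keys accepted by the filter, in the map's iteration order. */
    static List<String> filteredKeys(Map<String, String> configMapData, Predicate<String> keyFilter) {
        List<String> keys = new ArrayList<>();
        for (String key : configMapData.keySet()) {
            if (keyFilter.test(key)) {
                keys.add(key);
            }
        }
        return keys;
    }

    /** Removes only the entries whose key is accepted by the filter. */
    static void clearFiltered(Map<String, String> configMapData, Predicate<String> keyFilter) {
        configMapData.keySet().removeIf(keyFilter);
    }

    public static void main(String[] args) {
        Map<String, String> data = new LinkedHashMap<>();
        data.put("checkpointID-1", "handle-1"); // assumed key naming
        data.put("checkpointID-2", "handle-2");
        data.put("address", "leader-address"); // unrelated entry sharing the ConfigMap
        Predicate<String> filter = key -> key.startsWith("checkpointID-");
        System.out.println(filteredKeys(data, filter));
        clearFiltered(data, filter);
        System.out.println(data.keySet());
    }
}
```

In this model, clearing the filtered keys leaves the unrelated `address` entry in place.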
Modifier and Type | Method and Description |
---|---|
RetrievableStateHandle<T> | addAndLock(String key, T state) Creates a state handle, stores it in ConfigMap. |
void | clearEntries() Remove all the filtered keys in the ConfigMap. |
StringResourceVersion | exists(String key) Returns the resource version of the ConfigMap. |
List<Tuple2<RetrievableStateHandle<T>,String>> | getAllAndLock() Gets all available state handles from Kubernetes. |
Collection<String> | getAllHandles() Return a list of all valid keys for state handles. |
RetrievableStateHandle<T> | getAndLock(String key) Gets the RetrievableStateHandle stored in the given ConfigMap. |
void | release(String name) Releases the lock on the specific state handle so that it could be deleted by other StateHandleStore. |
void | releaseAll() Releases all the locks on corresponding state handle so that it could be deleted by other StateHandleStore. |
boolean | releaseAndTryRemove(String key) Remove the key in state config map. |
void | releaseAndTryRemoveAll() Remove all the state handle keys in the ConfigMap and discard the states. |
void | replace(String key, StringResourceVersion resourceVersion, T state) Replaces a state handle in ConfigMap and discards the old state handle. |
String | toString() |
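The documentation for addAndLock and replace distinguishes between a write that definitely failed and one whose outcome is unknown (PossibleInconsistentStateException). One possible caller-side classification is sketched below. The exception classes here are local stand-ins with the same simple names, and `Store`, `Outcome`, and `tryAdd` are hypothetical. What a caller should actually do in each case depends on the application.

```java
/** Hypothetical caller-side handling of the documented exception types; not Flink code. */
final class AddErrorHandlingSketch {
    // Local stand-ins for the exception types named in this page.
    static class AlreadyExistException extends Exception {}
    static class PossibleInconsistentStateException extends Exception {}

    /** Minimal stand-in for the add operation of a state handle store. */
    interface Store {
        void addAndLock(String key, String state) throws Exception;
    }

    enum Outcome { STORED, DUPLICATE, UNKNOWN, FAILED }

    /** Maps each documented failure mode to a distinct outcome the caller can act on. */
    static Outcome tryAdd(Store store, String key, String state) {
        try {
            store.addAndLock(key, state);
            return Outcome.STORED;
        } catch (AlreadyExistException e) {
            return Outcome.DUPLICATE; // the key is already present
        } catch (PossibleInconsistentStateException e) {
            // The write may or may not have reached Kubernetes, so the caller
            // should not assume the persisted state is unreferenced.
            return Outcome.UNKNOWN;
        } catch (Exception e) {
            return Outcome.FAILED; // persisting or writing the handle failed
        }
    }
}
```

Keeping UNKNOWN separate from FAILED lets the caller avoid deleting state that the store might still point to.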
public KubernetesStateHandleStore(FlinkKubeClient kubeClient, String configMapName, RetrievableStateStorageHelper<T> storage, java.util.function.Predicate<String> configMapKeyFilter, String lockIdentity)
Creates a KubernetesStateHandleStore.
Parameters:
kubeClient - The Kubernetes client.
storage - To persist the actual state and whose returned state handle is then written to ConfigMap
configMapName - ConfigMap to store the state handle store pointer
configMapKeyFilter - filter to get the expected keys for state handle
lockIdentity - lock identity of current HA service

public RetrievableStateHandle<T> addAndLock(String key, T state) throws PossibleInconsistentStateException, Exception
Specified by:
addAndLock in interface StateHandleStore<T extends Serializable,StringResourceVersion>
Parameters:
key - Key in ConfigMap
state - State to be added
Throws:
AlreadyExistException - if the name already exists
PossibleInconsistentStateException - if the write-to-Kubernetes operation failed. This indicates that it's not clear whether the new state was successfully written to Kubernetes or not. No state was discarded. Proper error handling has to be applied on the caller's side.
Exception - if persisting state or writing state handle failed

public void replace(String key, StringResourceVersion resourceVersion, T state) throws Exception
Specified by:
replace in interface StateHandleStore<T extends Serializable,StringResourceVersion>
Parameters:
key - Key in ConfigMap
resourceVersion - resource version when checking existence via exists(java.lang.String).
state - State to be added
Throws:
NotExistException - if the name does not exist
PossibleInconsistentStateException - if a failure occurred during the update operation. It's unclear whether the operation actually succeeded or not. No state was discarded. The method's caller should handle this case properly.
Exception - if persisting state or writing state handle failed

public StringResourceVersion exists(String key) throws Exception
Specified by:
exists in interface StateHandleStore<T extends Serializable,StringResourceVersion>
Parameters:
key - Key in ConfigMap
Returns:
resource version in StringResourceVersion format.
Throws:
Exception - if the existence check failed

public RetrievableStateHandle<T> getAndLock(String key) throws Exception
Gets the RetrievableStateHandle stored in the given ConfigMap.
Specified by:
getAndLock in interface StateHandleStore<T extends Serializable,StringResourceVersion>
Parameters:
key - Key in ConfigMap
Throws:
IOException - if the method failed to deserialize the stored state handle
NotExistException - when the name does not exist
Exception - if getting the state handle from ConfigMap failed

public List<Tuple2<RetrievableStateHandle<T>,String>> getAllAndLock()
Specified by:
getAllAndLock in interface StateHandleStore<T extends Serializable,StringResourceVersion>

public Collection<String> getAllHandles() throws Exception
Specified by:
getAllHandles in interface StateHandleStore<T extends Serializable,StringResourceVersion>
Throws:
Exception - if getting the state handle names from ConfigMap failed.

public boolean releaseAndTryRemove(String key) throws Exception
RetrievableStateHandle stored under the given state node if any.
Specified by:
releaseAndTryRemove in interface StateHandleStore<T extends Serializable,StringResourceVersion>
Parameters:
key - Key to be removed from ConfigMap
Throws:
Exception - if removing the key or discarding the state failed

public void releaseAndTryRemoveAll() throws Exception
Specified by:
releaseAndTryRemoveAll in interface StateHandleStore<T extends Serializable,StringResourceVersion>
Throws:
Exception - when removing the keys or discarding the state failed

public void clearEntries() throws Exception
Specified by:
clearEntries in interface StateHandleStore<T extends Serializable,StringResourceVersion>
Throws:
Exception - when removing the keys failed

public void release(String name)
Releases the lock on the specific state handle so that it can be deleted by another StateHandleStore. If no lock exists or the underlying storage does not support locking, nothing will happen.
Specified by:
release in interface StateHandleStore<T extends Serializable,StringResourceVersion>

public void releaseAll()
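Releases all the locks on the corresponding state handles so that they can be deleted by another StateHandleStore. If no lock exists or the underlying storage does not support locking, nothing will happen.
Specified by:
releaseAll in interface StateHandleStore<T extends Serializable,StringResourceVersion>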
Copyright © 2014–2021 The Apache Software Foundation. All rights reserved.