Class KubernetesStateHandleStore<T extends Serializable>
- java.lang.Object
-
- org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStore<T>
-
- Type Parameters:
T
- Type of the state we're storing.
- All Implemented Interfaces:
StateHandleStore<T,StringResourceVersion>
public class KubernetesStateHandleStore<T extends Serializable> extends Object implements StateHandleStore<T,StringResourceVersion>
Class which stores state via the providedRetrievableStateStorageHelper
and writes the returned state handle to ConfigMap.Added state is persisted via
RetrievableStateHandles
, which in turn are written to ConfigMap. This level of indirection is necessary to keep the amount of data in ConfigMap small. ConfigMap is build for data less than 1MB whereas state can grow to multiple MBs and GBs.This is a very different implementation with
ZooKeeperStateHandleStore
. Benefit from theFlinkKubeClient.checkAndUpdateConfigMap(java.lang.String, java.util.function.Function<org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap, java.util.Optional<org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap>>)
transactional operation, we could guarantee that only the leader could update the store. Then we will completely get rid of the lock-and-release in Zookeeper implementation.
-
-
Nested Class Summary
-
Nested classes/interfaces inherited from interface org.apache.flink.runtime.persistence.StateHandleStore
StateHandleStore.AlreadyExistException, StateHandleStore.NotExistException
-
-
Constructor Summary
Constructors Constructor Description KubernetesStateHandleStore(FlinkKubeClient kubeClient, String configMapName, RetrievableStateStorageHelper<T> storage, Predicate<String> configMapKeyFilter, String lockIdentity)
Creates aKubernetesStateHandleStore
.
-
Method Summary
All Methods Instance Methods Concrete Methods Modifier and Type Method Description RetrievableStateHandle<T>
addAndLock(String key, T state)
Creates a state handle, stores it in ConfigMap.void
clearEntries()
Remove all the filtered keys in the ConfigMap.StringResourceVersion
exists(String key)
Returns the resource version of the ConfigMap.List<Tuple2<RetrievableStateHandle<T>,String>>
getAllAndLock()
Gets all available state handles from Kubernetes.Collection<String>
getAllHandles()
Return a list of all valid keys for state handles.RetrievableStateHandle<T>
getAndLock(String key)
Gets theRetrievableStateHandle
stored in the given ConfigMap.void
release(String name)
Releases the lock on the specific state handle so that it could be deleted by otherStateHandleStore
.void
releaseAll()
Releases all the locks on corresponding state handle so that it could be deleted by otherStateHandleStore
.boolean
releaseAndTryRemove(String key)
Remove the key in state config map.void
replace(String key, StringResourceVersion resourceVersion, T state)
Replaces a state handle in ConfigMap and discards the old state handle.String
toString()
-
-
-
Constructor Detail
-
KubernetesStateHandleStore
public KubernetesStateHandleStore(FlinkKubeClient kubeClient, String configMapName, RetrievableStateStorageHelper<T> storage, Predicate<String> configMapKeyFilter, @Nullable String lockIdentity)
Creates aKubernetesStateHandleStore
.- Parameters:
kubeClient
- The Kubernetes client.storage
- To persist the actual state and whose returned state handle is then written to ConfigMapconfigMapName
- ConfigMap to store the state handle store pointerconfigMapKeyFilter
- filter to get the expected keys for state handlelockIdentity
- lock identity of current HA service
-
-
Method Detail
-
addAndLock
public RetrievableStateHandle<T> addAndLock(String key, T state) throws PossibleInconsistentStateException, Exception
Creates a state handle, stores it in ConfigMap. We could guarantee that only the leader could update the ConfigMap. Since “Get(check the leader)-and-Update(write back to the ConfigMap)” is a transactional operation.- Specified by:
addAndLock
in interfaceStateHandleStore<T extends Serializable,StringResourceVersion>
- Parameters:
key
- Key in ConfigMapstate
- State to be added- Throws:
StateHandleStore.AlreadyExistException
- if the name already existsPossibleInconsistentStateException
- if the write-to-Kubernetes operation failed. This indicates that it's not clear whether the new state was successfully written to Kubernetes or not. No state was discarded. Proper error handling has to be applied on the caller's side.Exception
- if persisting state or writing state handle failed
-
replace
public void replace(String key, StringResourceVersion resourceVersion, T state) throws Exception
Replaces a state handle in ConfigMap and discards the old state handle. Wo do not lock resource version and then replace in Kubernetes. Since the ConfigMap is periodically updated by leader, the resource version changes very fast. We use a "check-existence and update" transactional operation instead.- Specified by:
replace
in interfaceStateHandleStore<T extends Serializable,StringResourceVersion>
- Parameters:
key
- Key in ConfigMapresourceVersion
- resource version when checking existence viaexists(java.lang.String)
.state
- State to be added- Throws:
StateHandleStore.NotExistException
- if the name does not existPossibleInconsistentStateException
- if a failure occurred during the update operation. It's unclear whether the operation actually succeeded or not. No state was discarded. The method's caller should handle this case properly.Exception
- if persisting state or writing state handle failed
-
exists
public StringResourceVersion exists(String key) throws Exception
Returns the resource version of the ConfigMap.- Specified by:
exists
in interfaceStateHandleStore<T extends Serializable,StringResourceVersion>
- Parameters:
key
- Key in ConfigMap- Returns:
- resource version in
StringResourceVersion
format. - Throws:
Exception
- if the check existence operation failed
-
getAndLock
public RetrievableStateHandle<T> getAndLock(String key) throws Exception
Gets theRetrievableStateHandle
stored in the given ConfigMap.- Specified by:
getAndLock
in interfaceStateHandleStore<T extends Serializable,StringResourceVersion>
- Parameters:
key
- Key in ConfigMap- Returns:
- The retrieved state handle from the specified ConfigMap and key
- Throws:
IOException
- if the method failed to deserialize the stored state handleStateHandleStore.NotExistException
- when the name does not existException
- if get state handle from ConfigMap failed
-
getAllAndLock
public List<Tuple2<RetrievableStateHandle<T>,String>> getAllAndLock()
Gets all available state handles from Kubernetes.- Specified by:
getAllAndLock
in interfaceStateHandleStore<T extends Serializable,StringResourceVersion>
- Returns:
- All state handles from ConfigMap.
-
getAllHandles
public Collection<String> getAllHandles() throws Exception
Return a list of all valid keys for state handles.- Specified by:
getAllHandles
in interfaceStateHandleStore<T extends Serializable,StringResourceVersion>
- Returns:
- List of valid state handle keys in Kubernetes ConfigMap
- Throws:
Exception
- if get state handle names from ConfigMap failed.
-
releaseAndTryRemove
public boolean releaseAndTryRemove(String key) throws Exception
Remove the key in state config map. As well as the state on external storage will be removed. It returns theRetrievableStateHandle
stored under the given state node if any.- Specified by:
releaseAndTryRemove
in interfaceStateHandleStore<T extends Serializable,StringResourceVersion>
- Parameters:
key
- Key to be removed from ConfigMap- Returns:
- True if the state handle isn't listed anymore.
- Throws:
Exception
- if removing the key or discarding the state failed
-
clearEntries
public void clearEntries() throws Exception
Remove all the filtered keys in the ConfigMap.- Specified by:
clearEntries
in interfaceStateHandleStore<T extends Serializable,StringResourceVersion>
- Throws:
Exception
- when removing the keys failed
-
release
public void release(String name)
Description copied from interface:StateHandleStore
Releases the lock on the specific state handle so that it could be deleted by otherStateHandleStore
. If no lock exists or the underlying storage does not support, nothing will happen.- Specified by:
release
in interfaceStateHandleStore<T extends Serializable,StringResourceVersion>
-
releaseAll
public void releaseAll()
Description copied from interface:StateHandleStore
Releases all the locks on corresponding state handle so that it could be deleted by otherStateHandleStore
. If no lock exists or the underlying storage does not support, nothing will happen.- Specified by:
releaseAll
in interfaceStateHandleStore<T extends Serializable,StringResourceVersion>
-
-