Type Parameters:
K - type of key.
N - type of namespace.
S - type of value.

public class CopyOnWriteStateMap<K,N,S> extends StateMap<K,N,S>
CopyOnWriteStateMap sacrifices some peak performance and memory efficiency for
features like incremental rehashing and asynchronous snapshots through copy-on-write.
Copy-on-write tries to minimize the amount of copying by maintaining version metadata for both
the map structure and the state objects. However, state objects must often be copied proactively
when they are handed to the user.
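The version-based copy-on-write idea described above can be sketched in a few lines. This is a simplified illustration, not the Flink implementation: the class CopyOnWriteSketch, its fields, and the single-snapshot restriction are all assumptions made for the example. Each entry remembers the map version at which its state object was created or last copied, and a state object is copied on access only while a snapshot might still read it.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

/** Hypothetical, simplified sketch of version-based copy-on-write (one snapshot at a time). */
final class CopyOnWriteSketch<K, S> {

    /** An entry records the map version at which its state object was created or last copied. */
    private static final class Entry<S> {
        S state;
        int stateVersion;

        Entry(S state, int stateVersion) {
            this.state = state;
            this.stateVersion = stateVersion;
        }
    }

    private final Map<K, Entry<S>> entries = new HashMap<>();
    private final UnaryOperator<S> copier;   // produces a deep copy of a state object
    private int mapVersion = 0;              // incremented each time a snapshot is taken
    private int highestSnapshotVersion = -1; // -1 means no snapshot is in progress
    int copies = 0;                          // counts proactive copies, for illustration only

    CopyOnWriteSketch(UnaryOperator<S> copier) {
        this.copier = copier;
    }

    void put(K key, S state) {
        entries.put(key, new Entry<>(state, mapVersion));
    }

    /** Returns a state object that the caller may mutate without affecting a running snapshot. */
    S get(K key) {
        Entry<S> entry = entries.get(key);
        if (entry == null) {
            return null;
        }
        if (entry.stateVersion <= highestSnapshotVersion) {
            // The snapshot may still read this object, so hand out a copy instead.
            entry.state = copier.apply(entry.state);
            entry.stateVersion = mapVersion;
            copies++;
        }
        return entry.state;
    }

    /** Takes a snapshot that shares the current state objects with the live map. */
    Map<K, S> snapshot() {
        Map<K, S> view = new HashMap<>();
        entries.forEach((key, entry) -> view.put(key, entry.state));
        highestSnapshotVersion = mapVersion;
        mapVersion++;
        return view;
    }

    /** After release, state objects are no longer copied on access. */
    void releaseSnapshot() {
        highestSnapshotVersion = -1;
    }
}
```

In this sketch only the first access to an entry after a snapshot pays for a copy; later accesses see the newer version and return the object directly.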
As with any state backend, users should not keep references to state objects obtained from the state backend outside the scope of the user function calls.
Some brief maintenance notes:
1) Flattening the underlying data structure from nested maps (namespace) -> (key) -> (state)
to one flat map (key, namespace) -> (state) brings certain performance trade-offs. In theory, the
flat map has one less level of indirection than the nested map. However, the nested map
naturally de-duplicates namespace objects for which #equals() is true, so the flattened version
can hold many redundant namespace objects. Those, in turn, can introduce more cache misses
because the namespace object must be followed on every operation to establish entry identity.
Copy-on-write can also add memory overhead, as does the metadata that tracks the copy-on-write
requirement (state and entry versions on CopyOnWriteStateMap.StateMapEntry).
2) A flat map structure is a lot easier when it comes to tracking copy-on-write of the map structure.
3) The nested structure had the (never used) advantage that whole namespaces could easily be dropped or iterated. This could give locality advantages for certain access patterns, e.g. iterating a namespace.
4) The serialization format changed from namespace-prefix compressed (as naturally provided by the old nested structure) to self-contained entries of the form (key, namespace, state).
5) Currently, a state map can only grow; it never shrinks under low load. This could easily be added if required.
6) Heap-based state backends like this can easily cause a lot of GC activity. Besides using G1 as the garbage collector, we should provide an additional state backend that operates on off-heap memory. This would sacrifice peak performance (due to de/serialization of objects) for a lower, but more constant throughput and potentially huge simplifications w.r.t. copy-on-write.
7) We could try a hybrid of serialized and object-based backends, where the key and namespace of each entry are both serialized in one byte array.
8) We could consider smaller types (e.g. short) for the version counters, together with a reset strategy that runs before overflow while no snapshot is in progress. However, this would have to touch all entries in the map.
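The trade-off in note 1 can be made concrete with a small sketch. The class FlatVsNestedSketch, its CompositeKey type, and the helper methods are hypothetical names introduced for illustration and do not appear in Flink. The nested layout keeps one namespace object per distinct namespace as the outer map key, while the flat layout stores a namespace reference in every entry, so equal but distinct namespace objects can accumulate.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

/** Hypothetical sketch comparing a nested and a flat (key, namespace) -> state layout. */
final class FlatVsNestedSketch {

    /** Composite key of the flat layout; every entry carries its own namespace reference. */
    static final class CompositeKey<K, N> {
        final K key;
        final N namespace;

        CompositeKey(K key, N namespace) {
            this.key = key;
            this.namespace = namespace;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof CompositeKey)) {
                return false;
            }
            CompositeKey<?, ?> other = (CompositeKey<?, ?>) o;
            return key.equals(other.key) && namespace.equals(other.namespace);
        }

        @Override
        public int hashCode() {
            return Objects.hash(key, namespace);
        }
    }

    /** Nested lookup: two map accesses, (namespace) -> (key) -> (state). */
    static <K, N, S> S getNested(Map<N, Map<K, S>> nested, K key, N namespace) {
        Map<K, S> byKey = nested.get(namespace);
        return byKey == null ? null : byKey.get(key);
    }

    /** Flat lookup: one map access on the composite key. */
    static <K, N, S> S getFlat(Map<CompositeKey<K, N>, S> flat, K key, N namespace) {
        return flat.get(new CompositeKey<>(key, namespace));
    }

    /** Counts namespace objects by identity, showing how many the flat layout retains. */
    static <K, N, S> int distinctNamespaceObjects(Map<CompositeKey<K, N>, S> flat) {
        Set<N> byIdentity = Collections.newSetFromMap(new IdentityHashMap<>());
        for (CompositeKey<K, N> compositeKey : flat.keySet()) {
            byIdentity.add(compositeKey.namespace);
        }
        return byIdentity.size();
    }
}
```

If three entries are inserted with three equal but separately allocated namespace objects, the nested map retains a single namespace object as its outer key, whereas distinctNamespaceObjects reports three for the flat map.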
This class was initially based on the HashMap implementation of the Android JDK, but is now
heavily customized towards the use case of a map for state entries. IMPORTANT: the
contracts for this class rely on the user not holding any references to objects returned by this
map beyond the life cycle of per-element operations. Phrased differently, all get-update-put
operations on a mapping should happen within one call of processElement. Otherwise, the user must
take deep copies, e.g. for caching purposes.
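A minimal sketch of the contract above, under stated assumptions: DeepCopyCachingSketch is a hypothetical class, a plain HashMap stands in for the state map, and the processElement method here only mimics the shape of a per-element call. The get-update-put sequence stays inside one call, and anything kept beyond that call is a copy rather than the live state object.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: keep get-update-put inside one call and cache only copies. */
final class DeepCopyCachingSketch {

    // Stand-in for a heap state map that hands out its live state objects.
    private final Map<String, List<Integer>> liveState = new HashMap<>();

    // Hypothetical user-side cache that outlives a single per-element call.
    private final Map<String, List<Integer>> cache = new HashMap<>();

    void processElement(String key, int value) {
        // get-update-put on the live state object happens within this one call.
        List<Integer> state = liveState.computeIfAbsent(key, k -> new ArrayList<>());
        state.add(value);

        // The cache stores a copy. Copying the list suffices here because Integer is
        // immutable; mutable element types would need to be copied as well.
        cache.put(key, new ArrayList<>(state));
    }

    List<Integer> cached(String key) {
        return cache.get(key);
    }

    List<Integer> live(String key) {
        return liveState.get(key);
    }
}
```

A reference obtained from cached(...) stays unchanged when later elements update the live state, which is the property the contract asks the user to guarantee.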
Modifier and Type | Class and Description |
---|---|
protected static class | CopyOnWriteStateMap.StateMapEntry<K,N,S>: One entry in the CopyOnWriteStateMap. |
Modifier and Type | Field and Description |
---|---|
static int | DEFAULT_CAPACITY: Default capacity for a CopyOnWriteStateMap. |
protected TypeSerializer<S> | stateSerializer: The serializer of the state. |
Modifier and Type | Method and Description |
---|---|
boolean | containsKey(K key, N namespace): Returns whether this map contains the specified key/namespace composite key. |
S | get(K key, N namespace): Returns the state for the composite of active key and given namespace. |
Stream<K> | getKeys(N namespace) |
InternalKvState.StateIncrementalVisitor<K,N,S> | getStateIncrementalVisitor(int recommendedMaxNumberOfReturnedRecords) |
TypeSerializer<S> | getStateSerializer() |
Iterator<StateEntry<K,N,S>> | iterator() |
void | put(K key, N namespace, S value): Maps the specified key/namespace composite key to the specified value. |
S | putAndGetOld(K key, N namespace, S state): Maps the composite of active key and given namespace to the specified state. |
void | releaseSnapshot(StateMapSnapshot<K,N,S,? extends StateMap<K,N,S>> snapshotToRelease): Releases a snapshot for this CopyOnWriteStateMap. |
void | remove(K key, N namespace): Removes the mapping for the composite of active key and given namespace. |
S | removeAndGetOld(K key, N namespace): Removes the mapping for the composite of active key and given namespace, returning the state that was found under the entry. |
void | setStateSerializer(TypeSerializer<S> stateSerializer) |
int | size(): Returns the total number of entries in this CopyOnWriteStateMap. |
int | sizeOfNamespace(Object namespace) |
CopyOnWriteStateMapSnapshot<K,N,S> | stateSnapshot(): Creates a snapshot of this CopyOnWriteStateMap, to be written in checkpointing. |
<T> void | transform(K key, N namespace, T value, StateTransformationFunction<S,T> transformation): Applies the given StateTransformationFunction to the state (1st input argument), using the given value as second input argument. |
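The transform method in the table above is described on this page as an optimization for the get-update-put pattern. The sketch below spells out those three steps on a plain HashMap. It is an illustration only: TransformSketch and its Transformation interface are hypothetical stand-ins that mirror the two-argument shape of StateTransformationFunction, and the namespace argument is omitted for brevity.

```java
import java.util.Map;

/** Hypothetical sketch of the get-update-put pattern that transform combines. */
final class TransformSketch {

    /** Stand-in with the same two-argument shape as StateTransformationFunction<S, T>. */
    interface Transformation<S, T> {
        S apply(S previousState, T value) throws Exception;
    }

    /** Reads the current state, applies the function, and stores the result as the new state. */
    static <K, S, T> void transform(
            Map<K, S> states, K key, T value, Transformation<S, T> transformation)
            throws Exception {
        S previous = states.get(key);                     // get (null if there is no mapping)
        S updated = transformation.apply(previous, value); // update
        states.put(key, updated);                         // put
    }
}
```

For example, a summing transformation such as (previous, v) -> previous == null ? v : previous + v turns two calls with the values 5 and 7 on the same key into a stored state of 12.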
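Methods inherited from class java.lang.Object: clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface java.lang.Iterable: forEach, spliterator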
public static final int DEFAULT_CAPACITY
Default capacity for a CopyOnWriteStateMap. Must be a power of two, greater than MINIMUM_CAPACITY and less than MAXIMUM_CAPACITY.

protected TypeSerializer<S> stateSerializer
The serializer of the state.
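The capacity constraint on DEFAULT_CAPACITY can be checked with a standard bit trick, sketched below. The concrete bounds used here (4 and 1 << 30) are assumptions for illustration, since the values of MINIMUM_CAPACITY and MAXIMUM_CAPACITY are not shown on this page. The bucketIndex helper shows the usual reason hash tables prefer power-of-two capacities; whether this class computes indices exactly this way is likewise an assumption.

```java
/** Hypothetical sketch of a power-of-two capacity check; the bounds are assumed values. */
final class CapacitySketch {

    // Assumed bounds for illustration only; the real constants are not shown on this page.
    static final int MINIMUM_CAPACITY = 4;
    static final int MAXIMUM_CAPACITY = 1 << 30;

    /** True if capacity is a power of two strictly between the two assumed bounds. */
    static boolean isValidCapacity(int capacity) {
        // A positive power of two has exactly one bit set, so clearing the lowest bit yields 0.
        boolean powerOfTwo = capacity > 0 && (capacity & (capacity - 1)) == 0;
        return powerOfTwo && capacity > MINIMUM_CAPACITY && capacity < MAXIMUM_CAPACITY;
    }

    /** With a power-of-two capacity, a bit mask maps any hash to a bucket index. */
    static int bucketIndex(int hash, int capacity) {
        return hash & (capacity - 1);
    }
}
```

The mask also handles negative hash codes, because it keeps only the low-order bits of the hash.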
public int size()
Returns the total number of entries in this CopyOnWriteStateMap. This is the sum of both sub-maps.

public S get(K key, N namespace)
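Description copied from class: StateMap
Returns the state for the composite of active key and given namespace.

public boolean containsKey(K key, N namespace)
Description copied from class: StateMap
Returns whether this map contains the specified key/namespace composite key.
Declared in superclass: containsKey in class StateMap<K,N,S>
Parameters:
key - the key in the composite key to search for. Not null.
namespace - the namespace in the composite key to search for. Not null.
Returns:
true if this map contains the specified key/namespace composite key, false otherwise.

public void put(K key, N namespace, S value)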
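Description copied from class: StateMap
Maps the specified key/namespace composite key to the specified value. Prefer this method over putAndGetOld(K, N, S) when the caller is not interested in the old state.

public S putAndGetOld(K key, N namespace, S state)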
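Description copied from class: StateMap
Maps the composite of active key and given namespace to the specified state.
Declared in superclass: putAndGetOld in class StateMap<K,N,S>
Parameters:
key - the key. Not null.
namespace - the namespace. Not null.
state - the state. Can be null.
Returns:
the state of the previous mapping, or null if there was no such mapping.

public void remove(K key, N namespace)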
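Description copied from class: StateMap
Removes the mapping for the composite of active key and given namespace. Prefer this method over removeAndGetOld(K, N) when the caller is not interested in the old state.

public S removeAndGetOld(K key, N namespace)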
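Description copied from class: StateMap
Removes the mapping for the composite of active key and given namespace, returning the state that was found under the entry.
Declared in superclass: removeAndGetOld in class StateMap<K,N,S>
Parameters:
key - the key of the mapping to remove. Not null.
namespace - the namespace of the mapping to remove. Not null.
Returns:
the state of the removed mapping, or null if no mapping for the specified key was found.

public <T> void transform(K key, N namespace, T value, StateTransformationFunction<S,T> transformation) throws Exception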
Description copied from class: StateMap
Applies the given StateTransformationFunction to the state (1st input argument), using the given value as second input argument. The result of StateTransformationFunction.apply(Object, Object) is then stored as the new state. This function is basically an optimization for the get-update-put pattern.
Declared in superclass: transform in class StateMap<K,N,S>
Parameters:
key - the key. Not null.
namespace - the namespace. Not null.
value - the value to use in transforming the state. Can be null.
transformation - the transformation function.
Throws:
Exception - if some exception happens in the transformation function.

@Nonnull public CopyOnWriteStateMapSnapshot<K,N,S> stateSnapshot()
Creates a snapshot of this CopyOnWriteStateMap, to be written in checkpointing. The snapshot integrity is protected through copy-on-write from the CopyOnWriteStateMap. Users should call releaseSnapshot(StateMapSnapshot) after using the returned object.
Declared in superclass: stateSnapshot in class StateMap<K,N,S>
Returns:
a snapshot of this CopyOnWriteStateMap, for checkpointing.

public void releaseSnapshot(StateMapSnapshot<K,N,S,? extends StateMap<K,N,S>> snapshotToRelease)
Releases a snapshot for this CopyOnWriteStateMap. This method should be called once a snapshot is no longer needed, so that the CopyOnWriteStateMap can stop considering this snapshot for copy-on-write, thus avoiding unnecessary object creation.
Declared in superclass: releaseSnapshot in class StateMap<K,N,S>
Parameters:
snapshotToRelease - the snapshot to release, which was previously created by this state map.

public TypeSerializer<S> getStateSerializer()

public void setStateSerializer(TypeSerializer<S> stateSerializer)

public int sizeOfNamespace(Object namespace)
Declared in superclass: sizeOfNamespace in class StateMap<K,N,S>

public InternalKvState.StateIncrementalVisitor<K,N,S> getStateIncrementalVisitor(int recommendedMaxNumberOfReturnedRecords)
Declared in superclass: getStateIncrementalVisitor in class StateMap<K,N,S>
Copyright © 2014–2024 The Apache Software Foundation. All rights reserved.