Class StateSerializerProvider<T>
- java.lang.Object
-
- org.apache.flink.runtime.state.StateSerializerProvider<T>
-
- Type Parameters:
T - the type of the state.
@Internal public abstract class StateSerializerProvider<T> extends Object
A StateSerializerProvider wraps the logic for obtaining serializers for registered state, either with the previous schema of the state in checkpoints or with the current schema of the state.
A provider can be created from either a registered state serializer or the snapshot of the previous state serializer. In the former case, if the state was restored and a snapshot of the previous state serializer is retrieved later on, the snapshot can be set on the provider, which then also checks the compatibility of the initially registered serializer. Similarly, in the latter case, if a new state serializer is registered later on, it can be set on the provider, which then also checks the compatibility of the newly registered serializer.
Simply put, the provider works in both directions: it is created first with either a registered serializer or the previous serializer's snapshot, and is later given the missing counterpart, that is, the previous serializer's snapshot (if the provider was created with a registered serializer) or a new registered state serializer (if the provider was created with a serializer snapshot). Either way, the newly registered serializer is checked for schema compatibility once both the new serializer and the previous serializer's snapshot are present.
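The two directions described above can be illustrated with a small stand-alone model. This is only a sketch under stated assumptions: `ProviderSketch` and `Compatibility` are hypothetical stand-ins rather than the Flink classes, a serializer or snapshot is reduced to an integer schema version, and both the version-comparison rule and the exception type used for repeated calls are choices made for the example.

```java
// Simplified stand-in model for illustration; these are NOT the Flink classes.
// A serializer or a serializer snapshot is reduced to an integer schema version.
enum Compatibility { COMPATIBLE_AS_IS, COMPATIBLE_AFTER_MIGRATION, INCOMPATIBLE }

final class ProviderSketch {
    private Integer registeredSchema; // schema of the registered serializer, null if none yet
    private Integer previousSchema;   // schema in the previous serializer's snapshot, null if none yet

    private ProviderSketch(Integer registeredSchema, Integer previousSchema) {
        this.registeredSchema = registeredSchema;
        this.previousSchema = previousSchema;
    }

    // Direction 1: start from a registered serializer, supply the snapshot later.
    static ProviderSketch fromNewRegisteredSerializer(int schema) {
        return new ProviderSketch(schema, null);
    }

    // Direction 2: start from the previous serializer's snapshot, register a serializer later.
    static ProviderSketch fromPreviousSerializerSnapshot(int schema) {
        return new ProviderSketch(null, schema);
    }

    Compatibility registerNewSerializerForRestoredState(int schema) {
        if (registeredSchema != null) {
            // Exception type is this sketch's choice.
            throw new UnsupportedOperationException("a serializer is already registered");
        }
        registeredSchema = schema;
        return checkCompatibility();
    }

    Compatibility setPreviousSerializerSnapshotForRestoredState(int schema) {
        if (previousSchema != null) {
            // Exception type is this sketch's choice.
            throw new UnsupportedOperationException("the previous snapshot is already set");
        }
        previousSchema = schema;
        return checkCompatibility();
    }

    // Toy rule (assumption): equal versions are compatible as is, a newer
    // registered version requires migration, an older one is incompatible.
    private Compatibility checkCompatibility() {
        int current = registeredSchema;
        int previous = previousSchema;
        if (current == previous) {
            return Compatibility.COMPATIBLE_AS_IS;
        }
        return current > previous
                ? Compatibility.COMPATIBLE_AFTER_MIGRATION
                : Compatibility.INCOMPATIBLE;
    }
}
```

In this model the compatibility check runs in whichever call supplies the second piece of information, so both creation orders end with the same kind of result.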
-
-
Method Summary
Modifier and Type | Method | Description
TypeSerializer<T> | currentSchemaSerializer() | Gets the serializer that recognizes the current serialization schema of the state.
static <T> StateSerializerProvider<T> | fromNewRegisteredSerializer(TypeSerializer<T> registeredStateSerializer) | Creates a StateSerializerProvider from the registered state serializer.
static <T> StateSerializerProvider<T> | fromPreviousSerializerSnapshot(TypeSerializerSnapshot<T> stateSerializerSnapshot) | Creates a StateSerializerProvider for restored state from the previous serializer's snapshot.
TypeSerializerSnapshot<T> | getPreviousSerializerSnapshot() | Gets the previous serializer snapshot.
protected void | invalidateCurrentSchemaSerializerAccess() | Invalidates access to the current schema serializer.
TypeSerializer<T> | previousSchemaSerializer() | Gets the serializer that recognizes the previous serialization schema of the state.
abstract TypeSerializerSchemaCompatibility<T> | registerNewSerializerForRestoredState(TypeSerializer<T> newSerializer) | For restored state, register a new serializer that potentially has a new serialization schema.
abstract TypeSerializerSchemaCompatibility<T> | setPreviousSerializerSnapshotForRestoredState(TypeSerializerSnapshot<T> previousSerializerSnapshot) | For restored state, set the state's previous serializer's snapshot.
-
-
-
Method Detail
-
fromPreviousSerializerSnapshot
public static <T> StateSerializerProvider<T> fromPreviousSerializerSnapshot(TypeSerializerSnapshot<T> stateSerializerSnapshot)
Creates a StateSerializerProvider for restored state from the previous serializer's snapshot.
Once a new serializer is registered for the state, it should be provided via the registerNewSerializerForRestoredState(TypeSerializer) method.
- Type Parameters:
T - the type of the state.
- Parameters:
stateSerializerSnapshot - the previous serializer's snapshot.
- Returns:
- a new StateSerializerProvider.
-
fromNewRegisteredSerializer
public static <T> StateSerializerProvider<T> fromNewRegisteredSerializer(TypeSerializer<T> registeredStateSerializer)
Creates a StateSerializerProvider from the registered state serializer.
If the state is a restored one and the previous serializer's snapshot is obtained later on, it should be supplied via the setPreviousSerializerSnapshotForRestoredState(TypeSerializerSnapshot) method.
- Type Parameters:
T - the type of the state.
- Parameters:
registeredStateSerializer - the new state's registered serializer.
- Returns:
- a new StateSerializerProvider.
-
currentSchemaSerializer
@Nonnull public final TypeSerializer<T> currentSchemaSerializer()
Gets the serializer that recognizes the current serialization schema of the state. This is the serializer that should be used for regular state serialization and deserialization after state has been restored.
If this provider was created from a restored state's serializer snapshot and no new serializer (with a new schema) has been registered for the state (for example, because the state was never accessed after it was restored), then the schema of the state remains identical. In this case, the serializer returned by this method is guaranteed to be the same as the one returned by previousSchemaSerializer().
If this provider was created from a serializer instance, then this method always returns that same serializer instance. If a snapshot of the previous serializer is later supplied via setPreviousSerializerSnapshotForRestoredState(TypeSerializerSnapshot), the initially supplied serializer instance will be checked for compatibility.
- Returns:
- a serializer that reads and writes in the current schema of the state.
-
previousSchemaSerializer
@Nonnull public final TypeSerializer<T> previousSchemaSerializer()
Gets the serializer that recognizes the previous serialization schema of the state. This is the serializer that should be used for restoring the state, i.e. when the state is still in the previous serialization schema.
This method only returns a serializer if this provider has the previous serializer's snapshot. Otherwise, trying to access the previous schema serializer will fail with an exception.
- Returns:
- a serializer that reads and writes in the previous schema of the state.
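The access rules for currentSchemaSerializer() and previousSchemaSerializer() can be sketched with a stand-alone model. `SerializerStub`, `SnapshotStub`, and `AccessSketch` are hypothetical names introduced for this illustration, not Flink types. The exception type and the caching of the restored serializer are assumptions of the sketch.

```java
// Hypothetical stand-ins for illustration; not the Flink types.
interface SerializerStub { String schemaName(); }
interface SnapshotStub { SerializerStub restoreSerializer(); }

final class AccessSketch {
    private final SerializerStub registered;     // null until a serializer is registered
    private final SnapshotStub previousSnapshot; // null when the state is new, not restored
    private SerializerStub restored;             // cached result of restoreSerializer()

    AccessSketch(SerializerStub registered, SnapshotStub previousSnapshot) {
        this.registered = registered;
        this.previousSnapshot = previousSnapshot;
    }

    SerializerStub previousSchemaSerializer() {
        if (previousSnapshot == null) {
            // Exception type is this sketch's choice.
            throw new UnsupportedOperationException("no previous serializer snapshot available");
        }
        if (restored == null) {
            // Restore once and reuse, so repeated calls return the same instance.
            restored = previousSnapshot.restoreSerializer();
        }
        return restored;
    }

    SerializerStub currentSchemaSerializer() {
        // Without a newly registered serializer the schema is unchanged,
        // so the previous schema serializer is also the current one.
        return registered != null ? registered : previousSchemaSerializer();
    }
}
```

In this model, a provider built only from a snapshot returns the same instance from both getters, while a provider built only from a serializer fails on previousSchemaSerializer().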
-
getPreviousSerializerSnapshot
@Nullable public final TypeSerializerSnapshot<T> getPreviousSerializerSnapshot()
Gets the previous serializer snapshot.
- Returns:
- the previous serializer snapshot, or null if the registered serializer was for a new state rather than a restored one.
-
registerNewSerializerForRestoredState
@Nonnull public abstract TypeSerializerSchemaCompatibility<T> registerNewSerializerForRestoredState(TypeSerializer<T> newSerializer)
For restored state, register a new serializer that potentially has a new serialization schema.
Users are allowed to register serializers for state only once. Therefore, this method is irrelevant if this provider was created with a serializer instance, since a state serializer has already been registered.
If this provider was created from a serializer snapshot, this method should be called at most once. The new serializer is checked for schema compatibility with the previous serializer's schema, and the result is returned to the caller. The caller is responsible for checking the result and reacting appropriately to it, as follows:
- TypeSerializerSchemaCompatibility.isCompatibleAsIs(): nothing needs to be done. currentSchemaSerializer() now returns the newly registered serializer.
- TypeSerializerSchemaCompatibility.isCompatibleAfterMigration(): state needs to be migrated before the serializer returned by currentSchemaSerializer() can be used. The migration should be performed by reading the state with previousSchemaSerializer(), and then writing it again with currentSchemaSerializer().
- TypeSerializerSchemaCompatibility.isIncompatible(): the registered serializer is incompatible. currentSchemaSerializer() can no longer return a serializer for the state, and therefore this provider shouldn't be used anymore.
- Returns:
- the schema compatibility of the new registered serializer, with respect to the previous serializer.
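The three outcomes listed above could be handled roughly as follows. This is a minimal stand-alone sketch, not the Flink API: `IntSerializer`, `DecimalSerializer`, `HexSerializer`, `CompatibilityResult`, and `MigrationSketch` are hypothetical names, and stored state is modelled as an array of encoded strings. The migration step follows the described pattern of reading with the previous schema serializer and writing with the current one.

```java
// Hypothetical stand-ins for illustration; not the Flink types.
enum CompatibilityResult { COMPATIBLE_AS_IS, COMPATIBLE_AFTER_MIGRATION, INCOMPATIBLE }

interface IntSerializer {
    String write(int value);
    int read(String encoded);
}

// "Previous schema": integers stored as decimal text.
final class DecimalSerializer implements IntSerializer {
    public String write(int value) { return Integer.toString(value); }
    public int read(String encoded) { return Integer.parseInt(encoded); }
}

// "Current schema": integers stored as hexadecimal text.
final class HexSerializer implements IntSerializer {
    public String write(int value) { return Integer.toHexString(value); }
    public int read(String encoded) { return Integer.parseUnsignedInt(encoded, 16); }
}

final class MigrationSketch {
    // Re-encodes every entry: read with the previous schema, write with the current one.
    static String[] migrate(String[] stored, IntSerializer previous, IntSerializer current) {
        String[] migrated = new String[stored.length];
        for (int i = 0; i < stored.length; i++) {
            migrated[i] = current.write(previous.read(stored[i]));
        }
        return migrated;
    }

    // Reacts to the compatibility result in the way the caller is expected to.
    static String[] applyResult(CompatibilityResult result, String[] stored,
                                IntSerializer previous, IntSerializer current) {
        switch (result) {
            case COMPATIBLE_AS_IS:
                return stored; // nothing needs to be done
            case COMPATIBLE_AFTER_MIGRATION:
                return migrate(stored, previous, current);
            default:
                // Exception type is this sketch's choice.
                throw new IllegalStateException("incompatible serializer; stop using the provider");
        }
    }
}
```

For example, calling `applyResult` with `COMPATIBLE_AFTER_MIGRATION`, a `DecimalSerializer` as the previous serializer, and a `HexSerializer` as the current one re-encodes each stored entry from decimal to hexadecimal text.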
-
setPreviousSerializerSnapshotForRestoredState
@Nonnull public abstract TypeSerializerSchemaCompatibility<T> setPreviousSerializerSnapshotForRestoredState(TypeSerializerSnapshot<T> previousSerializerSnapshot)
For restored state, set the state's previous serializer's snapshot.
Users are allowed to set the previous serializer's snapshot only once. Therefore, this method is irrelevant if this provider was created with a serializer snapshot, since the serializer snapshot has already been set.
If this provider was created from a serializer instance, this method should be called at most once. The initially registered state serializer is checked for schema compatibility with the previous serializer's schema, and the result is returned to the caller. The caller is responsible for checking the result and reacting appropriately to it, as follows:
- TypeSerializerSchemaCompatibility.isCompatibleAsIs(): nothing needs to be done. currentSchemaSerializer() continues to return the initially registered serializer.
- TypeSerializerSchemaCompatibility.isCompatibleAfterMigration(): state needs to be migrated before the serializer returned by currentSchemaSerializer() can be used. The migration should be performed by reading the state with previousSchemaSerializer(), and then writing it again with currentSchemaSerializer().
- TypeSerializerSchemaCompatibility.isIncompatible(): the registered serializer is incompatible. currentSchemaSerializer() can no longer return a serializer for the state, and therefore this provider shouldn't be used anymore.
- Parameters:
previousSerializerSnapshot - the state's previous serializer's snapshot.
- Returns:
- the schema compatibility of the initially registered serializer, with respect to the previous serializer.
-
invalidateCurrentSchemaSerializerAccess
protected final void invalidateCurrentSchemaSerializerAccess()
Invalidates access to the current schema serializer. This makes currentSchemaSerializer() fail when invoked.
Access to the current schema serializer should be invalidated by the methods registerNewSerializerForRestoredState(TypeSerializer) or setPreviousSerializerSnapshotForRestoredState(TypeSerializerSnapshot) once the registered serializer is determined to be incompatible.
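A minimal stand-alone model of the invalidation behaviour might look like the following. `InvalidationSketch` is a hypothetical class, a plain String stands in for a serializer, and the `incompatible` flag stands in for the outcome of a real compatibility check. The exception type is an assumption of the sketch.

```java
// Hypothetical model for illustration; a String stands in for a serializer.
final class InvalidationSketch {
    private String registeredSerializer;
    private boolean currentAccessInvalidated;

    String currentSchemaSerializer() {
        if (currentAccessInvalidated) {
            // Exception type is this sketch's choice.
            throw new IllegalStateException(
                    "registered serializer is incompatible with the restored state");
        }
        return registeredSerializer;
    }

    // Flips the flag that makes currentSchemaSerializer() fail from now on.
    private void invalidateCurrentSchemaSerializerAccess() {
        currentAccessInvalidated = true;
    }

    // 'incompatible' stands in for the outcome of a real compatibility check.
    void register(String serializer, boolean incompatible) {
        registeredSerializer = serializer;
        if (incompatible) {
            invalidateCurrentSchemaSerializerAccess();
        }
    }
}
```

Keeping the flag separate from the serializer reference lets the model distinguish "no serializer registered yet" from "a serializer was registered but must not be used".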
-
-