T - the type of the state.
@Internal public abstract class StateSerializerProvider<T> extends Object
A StateSerializerProvider wraps logic on how to obtain serializers for registered state, either with the previous schema of state in checkpoints or the current schema of state.
A provider can be created from either a registered state serializer or the snapshot of the previous state serializer. In the former case, if the state was restored and a snapshot of the previous state serializer is retrieved later on, the snapshot can be set on the provider, which then checks the compatibility of the initially registered serializer. Similarly, in the latter case, if a new state serializer is registered later on, it can be set on the provider, which then checks the compatibility of the newly registered serializer.
Simply put, the provider works in both directions: it is first created with either a registered serializer or the previous serializer's snapshot, and then receives the missing counterpart, namely the previous serializer's snapshot (if the provider was created with a registered serializer) or a new registered state serializer (if the provider was created with a serializer snapshot). Either way, the new registered serializer is checked for schema compatibility once both the new serializer and the previous serializer snapshot are present.
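The two directions described above can be illustrated with a small, self-contained model. This is only a sketch: ToySerializer, ToySnapshot, ToyCompatibility and ToyProvider are hypothetical stand-ins invented for illustration, not Flink classes, and the version-based compatibility rule as well as the exception thrown on a second registration are assumptions of the sketch.

```java
import java.util.Objects;

/** Hypothetical stand-in for a state serializer; it only carries a schema version. */
final class ToySerializer {
    final int schemaVersion;

    ToySerializer(int schemaVersion) {
        this.schemaVersion = schemaVersion;
    }
}

/** Hypothetical stand-in for a serializer snapshot written into a checkpoint. */
final class ToySnapshot {
    final int schemaVersion;

    ToySnapshot(int schemaVersion) {
        this.schemaVersion = schemaVersion;
    }
}

enum ToyCompatibility { COMPATIBLE_AS_IS, COMPATIBLE_AFTER_MIGRATION, INCOMPATIBLE }

/** Toy provider: created from either side, checks compatibility once both sides are present. */
final class ToyProvider {
    private ToySerializer registered; // null until a serializer is registered
    private ToySnapshot previous;     // null until a snapshot is supplied

    private ToyProvider(ToySerializer registered, ToySnapshot previous) {
        this.registered = registered;
        this.previous = previous;
    }

    static ToyProvider fromNewRegisteredSerializer(ToySerializer serializer) {
        return new ToyProvider(Objects.requireNonNull(serializer), null);
    }

    static ToyProvider fromPreviousSerializerSnapshot(ToySnapshot snapshot) {
        return new ToyProvider(null, Objects.requireNonNull(snapshot));
    }

    ToyCompatibility registerNewSerializerForRestoredState(ToySerializer serializer) {
        if (registered != null) {
            // Assumption of this sketch: a second registration is rejected.
            throw new UnsupportedOperationException("a serializer is already registered");
        }
        registered = Objects.requireNonNull(serializer);
        return check();
    }

    ToyCompatibility setPreviousSerializerSnapshotForRestoredState(ToySnapshot snapshot) {
        if (previous != null) {
            // Assumption of this sketch: a second snapshot is rejected.
            throw new UnsupportedOperationException("a snapshot is already set");
        }
        previous = Objects.requireNonNull(snapshot);
        return check();
    }

    // Assumed toy rule: equal versions are compatible as is, a newer registered
    // version requires migration, and an older registered version is incompatible.
    private ToyCompatibility check() {
        if (registered.schemaVersion == previous.schemaVersion) {
            return ToyCompatibility.COMPATIBLE_AS_IS;
        }
        return registered.schemaVersion > previous.schemaVersion
                ? ToyCompatibility.COMPATIBLE_AFTER_MIGRATION
                : ToyCompatibility.INCOMPATIBLE;
    }
}

final class ToyProviderDemo {
    public static void main(String[] args) {
        // Direction 1: registered serializer first, snapshot later.
        ToyProvider a = ToyProvider.fromNewRegisteredSerializer(new ToySerializer(2));
        System.out.println(a.setPreviousSerializerSnapshotForRestoredState(new ToySnapshot(1)));

        // Direction 2: snapshot first, registered serializer later.
        ToyProvider b = ToyProvider.fromPreviousSerializerSnapshot(new ToySnapshot(1));
        System.out.println(b.registerNewSerializerForRestoredState(new ToySerializer(2)));
    }
}
```

In this model both orders print COMPATIBLE_AFTER_MIGRATION, since the check runs only once both the serializer and the snapshot are known, regardless of which one arrived first.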
Modifier and Type | Method and Description |
---|---|
TypeSerializer<T> | currentSchemaSerializer(): Gets the serializer that recognizes the current serialization schema of the state. |
static <T> StateSerializerProvider<T> | fromNewRegisteredSerializer(TypeSerializer<T> registeredStateSerializer): Creates a StateSerializerProvider from the registered state serializer. |
static <T> StateSerializerProvider<T> | fromPreviousSerializerSnapshot(TypeSerializerSnapshot<T> stateSerializerSnapshot): Creates a StateSerializerProvider for restored state from the previous serializer's snapshot. |
TypeSerializerSnapshot<T> | getPreviousSerializerSnapshot(): Gets the previous serializer snapshot. |
protected void | invalidateCurrentSchemaSerializerAccess(): Invalidates access to the current schema serializer. |
TypeSerializer<T> | previousSchemaSerializer(): Gets the serializer that recognizes the previous serialization schema of the state. |
abstract TypeSerializerSchemaCompatibility<T> | registerNewSerializerForRestoredState(TypeSerializer<T> newSerializer): For restored state, register a new serializer that potentially has a new serialization schema. |
abstract TypeSerializerSchemaCompatibility<T> | setPreviousSerializerSnapshotForRestoredState(TypeSerializerSnapshot<T> previousSerializerSnapshot): For restored state, set the state's previous serializer's snapshot. |
public static <T> StateSerializerProvider<T> fromPreviousSerializerSnapshot(TypeSerializerSnapshot<T> stateSerializerSnapshot)
Creates a StateSerializerProvider for restored state from the previous serializer's snapshot.
Once a new serializer is registered for the state, it should be provided via the registerNewSerializerForRestoredState(TypeSerializer) method.
T - the type of the state.
stateSerializerSnapshot - the previous serializer's snapshot.
Returns: a new StateSerializerProvider.
public static <T> StateSerializerProvider<T> fromNewRegisteredSerializer(TypeSerializer<T> registeredStateSerializer)
Creates a StateSerializerProvider from the registered state serializer.
If the state is a restored one, and the previous serializer's snapshot is obtained later on, it should be supplied via the setPreviousSerializerSnapshotForRestoredState(TypeSerializerSnapshot) method.
T - the type of the state.
registeredStateSerializer - the new state's registered serializer.
Returns: a new StateSerializerProvider.
@Nonnull public final TypeSerializer<T> currentSchemaSerializer()
Gets the serializer that recognizes the current serialization schema of the state.
If this provider was created from a restored state's serializer snapshot, and no new serializer (with a new schema) has been registered for the state (i.e., because the state was never accessed after it was restored), then the schema of the state remains identical. In this case, it is guaranteed that the serializer returned by this method is the same as the one returned by previousSchemaSerializer().
If this provider was created from a serializer instance, then this method always returns that same serializer instance. If a snapshot of the previous serializer is supplied later on via setPreviousSerializerSnapshotForRestoredState(TypeSerializerSnapshot), then the initially supplied serializer instance will be checked for compatibility.
@Nonnull public final TypeSerializer<T> previousSchemaSerializer()
Gets the serializer that recognizes the previous serialization schema of the state.
This method only returns a serializer if this provider has the previous serializer's snapshot. Otherwise, trying to access the previous schema serializer will fail with an exception.
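The lookup rules for the current and previous schema serializers can be sketched as follows. ToySchemaResolver is a hypothetical class written for this illustration, not part of Flink. The use of a Supplier to model restoring a serializer from its snapshot, the caching of the restored serializer, and the choice of UnsupportedOperationException are all assumptions of the sketch.

```java
import java.util.function.Supplier;

/** Hypothetical resolver modelling the documented lookup rules; not a Flink class. */
final class ToySchemaResolver<S> {
    private final S registered;                 // null if no serializer was registered
    private final Supplier<S> previousRestorer; // null if no snapshot is available
    private S cachedPrevious;                   // restored lazily, then reused

    ToySchemaResolver(S registered, Supplier<S> previousRestorer) {
        this.registered = registered;
        this.previousRestorer = previousRestorer;
    }

    S previousSchemaSerializer() {
        if (previousRestorer == null) {
            // Assumed exception type; the documentation only says "fail with an exception".
            throw new UnsupportedOperationException("no previous serializer snapshot available");
        }
        if (cachedPrevious == null) {
            cachedPrevious = previousRestorer.get();
        }
        return cachedPrevious;
    }

    S currentSchemaSerializer() {
        // Without a newly registered serializer the schema is unchanged,
        // so the previous schema serializer is also the current one.
        return registered != null ? registered : previousSchemaSerializer();
    }
}

final class ToySchemaResolverDemo {
    public static void main(String[] args) {
        ToySchemaResolver<String> restoredOnly =
                new ToySchemaResolver<>(null, () -> new String("restored-serializer"));
        // Same instance is returned by both accessors.
        System.out.println(
                restoredOnly.currentSchemaSerializer() == restoredOnly.previousSchemaSerializer());

        ToySchemaResolver<String> registeredOnly =
                new ToySchemaResolver<>("registered-serializer", null);
        System.out.println(registeredOnly.currentSchemaSerializer());
    }
}
```

Caching the restored serializer is what lets the sketch return the identical instance from both accessors when only a snapshot is present.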
@Nullable public final TypeSerializerSnapshot<T> getPreviousSerializerSnapshot()
Gets the previous serializer snapshot.
@Nonnull public abstract TypeSerializerSchemaCompatibility<T> registerNewSerializerForRestoredState(TypeSerializer<T> newSerializer)
For restored state, registers a new serializer that potentially has a new serialization schema.
Users are allowed to register serializers for state only once. Therefore, this method is irrelevant if this provider was created with a serializer instance, since a state serializer has already been registered.
If this provider was created from a serializer snapshot, this method should be called at most once. The new serializer is checked for schema compatibility with the previous serializer's schema, and the result of that check is returned to the caller. The caller is responsible for checking the result and reacting appropriately to it, as follows:
- TypeSerializerSchemaCompatibility.isCompatibleAsIs(): nothing needs to be done. currentSchemaSerializer() now returns the newly registered serializer.
- TypeSerializerSchemaCompatibility.isCompatibleAfterMigration(): state needs to be migrated before the serializer returned by currentSchemaSerializer() can be used. The migration should be performed by reading the state with previousSchemaSerializer(), and then writing it again with currentSchemaSerializer().
- TypeSerializerSchemaCompatibility.isIncompatible(): the registered serializer is incompatible. currentSchemaSerializer() can no longer return a serializer for the state, and therefore this provider shouldn't be used anymore.
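The migration step for the "compatible after migration" case (read with the previous schema, write with the current schema) can be sketched in isolation. ToyCodec, IntCodec, LongCodec and MigrationSketch are hypothetical names invented for this example and are not Flink types. The sketch assumes a previous schema that stores values as 4-byte integers and a current schema that stores them as 8-byte longs.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical minimal serializer interface used only in this sketch. */
interface ToyCodec {
    byte[] write(long value);

    long read(byte[] bytes);
}

/** Stands in for the previous schema: values are stored as 4-byte ints. */
final class IntCodec implements ToyCodec {
    public byte[] write(long value) {
        return ByteBuffer.allocate(4).putInt(Math.toIntExact(value)).array();
    }

    public long read(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getInt();
    }
}

/** Stands in for the current schema: values are stored as 8-byte longs. */
final class LongCodec implements ToyCodec {
    public byte[] write(long value) {
        return ByteBuffer.allocate(8).putLong(value).array();
    }

    public long read(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong();
    }
}

final class MigrationSketch {
    /** Reads every entry with the previous codec and rewrites it with the current one. */
    static List<byte[]> migrate(List<byte[]> stored, ToyCodec previous, ToyCodec current) {
        List<byte[]> migrated = new ArrayList<>(stored.size());
        for (byte[] bytes : stored) {
            migrated.add(current.write(previous.read(bytes)));
        }
        return migrated;
    }

    public static void main(String[] args) {
        ToyCodec previous = new IntCodec();
        ToyCodec current = new LongCodec();

        List<byte[]> stored = new ArrayList<>();
        stored.add(previous.write(7));
        stored.add(previous.write(-3));

        List<byte[]> migrated = migrate(stored, previous, current);
        for (byte[] bytes : migrated) {
            System.out.println(bytes.length + " bytes, value " + current.read(bytes));
        }
    }
}
```

Only after such a rewrite has covered all stored entries is it safe to read the state with the current schema alone, which is why the migration has to finish before the new serializer is used.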
@Nonnull public abstract TypeSerializerSchemaCompatibility<T> setPreviousSerializerSnapshotForRestoredState(TypeSerializerSnapshot<T> previousSerializerSnapshot)
For restored state, sets the state's previous serializer's snapshot.
Users are allowed to set the previous serializer's snapshot only once. Therefore, this method is irrelevant if this provider was created with a serializer snapshot, since the serializer snapshot has already been set.
If this provider was created from a serializer instance, this method should be called at most once. The initially registered state serializer is checked for schema compatibility with the previous serializer's schema, and the result of that check is returned to the caller. The caller is responsible for checking the result and reacting appropriately to it, as follows:
- TypeSerializerSchemaCompatibility.isCompatibleAsIs(): nothing needs to be done. currentSchemaSerializer() continues to return the initially registered serializer.
- TypeSerializerSchemaCompatibility.isCompatibleAfterMigration(): state needs to be migrated before the serializer returned by currentSchemaSerializer() can be used. The migration should be performed by reading the state with previousSchemaSerializer(), and then writing it again with currentSchemaSerializer().
- TypeSerializerSchemaCompatibility.isIncompatible(): the registered serializer is incompatible. currentSchemaSerializer() can no longer return a serializer for the state, and therefore this provider shouldn't be used anymore.
previousSerializerSnapshot - the state's previous serializer's snapshot.
protected final void invalidateCurrentSchemaSerializerAccess()
Invalidates access to the current schema serializer, so that currentSchemaSerializer() fails when invoked.
Access to the current schema serializer should be invalidated by the methods registerNewSerializerForRestoredState(TypeSerializer) or setPreviousSerializerSnapshotForRestoredState(TypeSerializerSnapshot) once the registered serializer is determined to be incompatible.
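The invalidation pattern can be sketched as a simple guard flag. GuardedAccess is a hypothetical class made up for this illustration, not a Flink type, and the use of IllegalStateException is an assumption, since the text above only states that access fails.

```java
/** Hypothetical holder that refuses access once it has been invalidated. */
final class GuardedAccess<S> {
    private final S serializer;
    private boolean invalidated; // false until invalidate() is called

    GuardedAccess(S serializer) {
        this.serializer = serializer;
    }

    /** Meant to be called once the registered serializer is found to be incompatible. */
    void invalidate() {
        invalidated = true;
    }

    S get() {
        if (invalidated) {
            // Assumed exception type for this sketch.
            throw new IllegalStateException("registered serializer is incompatible");
        }
        return serializer;
    }
}

final class GuardedAccessDemo {
    public static void main(String[] args) {
        GuardedAccess<String> access = new GuardedAccess<>("registered-serializer");
        System.out.println(access.get());

        access.invalidate();
        try {
            access.get();
        } catch (IllegalStateException e) {
            System.out.println("access failed: " + e.getMessage());
        }
    }
}
```

Failing fast on access, rather than returning a serializer that cannot read the restored bytes, keeps an incompatible serializer from being used by accident.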
Copyright © 2014–2024 The Apache Software Foundation. All rights reserved.