This page is a guide for users who need custom serialization for their state. It covers how to provide a custom state serializer, along with guidelines and best practices for implementing serializers that allow state schema evolution.
If you’re simply using Flink’s own serializers, this page is irrelevant and can be skipped.
When registering a managed operator or keyed state, a StateDescriptor is required to specify the state’s name, as well as information about the type of the state. The type information is used by Flink’s type serialization framework to create appropriate serializers for the state.
It is also possible to bypass this completely and let Flink use your own custom serializer to serialize managed state, simply by directly instantiating the StateDescriptor with your own TypeSerializer implementation instead of type information.
This section explains the user-facing abstractions related to state serialization and schema evolution, and necessary internal details about how Flink interacts with these abstractions.
When restoring from savepoints, Flink allows changing the serializers used to read and write previously registered state, so that users are not locked in to any specific serialization schema. When state is restored, a new serializer will be registered for the state (i.e., the serializer that comes with the StateDescriptor used to access the state in the restored job). This new serializer may have a different schema than that of the previous serializer. Therefore, when implementing state serializers, besides the basic logic of reading / writing data, another important thing to keep in mind is how the serialization schema can be changed in the future.
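In this context, the term schema refers interchangeably to the data model of a state type and to the serialized binary format of a state type. The schema can change in a few cases, for example when the data model of the state type evolves (such as a field being added to or removed from a POJO used as state), or when the serialization format or configuration of the serializer changes.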
In order for the new execution to have information about the written schema of state and detect whether or not the schema has changed, a snapshot of the state serializer needs to be written along with the state bytes when a savepoint of an operator’s state is taken. This is abstracted as a TypeSerializerSnapshot, explained in the next subsection.
The TypeSerializerSnapshot abstraction
A serializer’s TypeSerializerSnapshot is point-in-time information that serves as the single source of truth about the state serializer’s write schema, as well as any additional information needed to restore a serializer identical to the one at that point in time. The logic for what is written, and read back at restore time, as the serializer snapshot is defined in the writeSnapshot and readSnapshot methods.
Note that the snapshot’s own write schema may also need to change over time (e.g. when you wish to add more information about the serializer to the snapshot). To facilitate this, snapshots are versioned, with the current version number defined in the getCurrentVersion method. On restore, when the serializer snapshot is read from a savepoint, the version of the schema in which the snapshot was written is provided to the readSnapshot method so that the read implementation can handle different versions.
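As an illustration of snapshot versioning, the following is a minimal, self-contained sketch that uses plain java.io streams rather than Flink’s classes. The class CharsetSnapshotSketch, its fields, and the helpers writeVersioned, readVersioned, and writeVersion1 are hypothetical names invented for this example. Only the method names writeSnapshot, readSnapshot, and getCurrentVersion mirror the ones described above, and their signatures here are simplified assumptions.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical snapshot of a serializer that is configured with a charset. */
class CharsetSnapshotSketch {
    // Assumed history: version 1 stored only the charset, version 2 added "lenient".
    static final int CURRENT_VERSION = 2;

    String charsetName = "UTF-8";
    boolean lenient = false;

    int getCurrentVersion() {
        return CURRENT_VERSION;
    }

    /** Always writes the latest snapshot format. */
    void writeSnapshot(DataOutputStream out) throws IOException {
        out.writeUTF(charsetName);
        out.writeBoolean(lenient);
    }

    /** Reads any known format, branching on the version the snapshot was written with. */
    void readSnapshot(int readVersion, DataInputStream in) throws IOException {
        if (readVersion < 1 || readVersion > CURRENT_VERSION) {
            throw new IOException("Unknown snapshot version: " + readVersion);
        }
        charsetName = in.readUTF();
        if (readVersion >= 2) {
            lenient = in.readBoolean();
        } else {
            lenient = false; // version 1 had no such flag, so fall back to a default
        }
    }

    /** Stand-in for the framework side: stores the version, then the snapshot bytes. */
    static byte[] writeVersioned(CharsetSnapshotSketch snapshot) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(snapshot.getCurrentVersion());
            snapshot.writeSnapshot(out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Stand-in for the framework side: reads the version and hands it to readSnapshot. */
    static CharsetSnapshotSketch readVersioned(byte[] data) {
        CharsetSnapshotSketch snapshot = new CharsetSnapshotSketch();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            snapshot.readSnapshot(in.readInt(), in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return snapshot;
    }

    /** Produces bytes as the assumed version 1 of this class would have written them. */
    static byte[] writeVersion1(String charsetName) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(1);
            out.writeUTF(charsetName);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }
}
```

In this sketch, bytes written in the older format can still be read because readSnapshot only consumes the fields that existed in the version it is told about.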
At restore time, the logic that detects whether or not the new serializer’s schema has changed should be implemented in the resolveSchemaCompatibility method. When previously registered state is registered again with new serializers in the restored execution of an operator, the new serializer is provided to the previous serializer’s snapshot via this method. This method returns a TypeSerializerSchemaCompatibility representing the result of the compatibility resolution, which can be one of the following:
- TypeSerializerSchemaCompatibility.compatibleAsIs(): this result signals that the new serializer is compatible, meaning that the new serializer has a schema identical to that of the previous serializer. It is possible that the new serializer has been reconfigured in the resolveSchemaCompatibility method so that it is compatible.
- TypeSerializerSchemaCompatibility.compatibleAfterMigration(): this result signals that the new serializer has a different serialization schema, and it is possible to migrate from the old schema by using the previous serializer (which recognizes the old schema) to read bytes into state objects, and then rewriting the objects back to bytes with the new serializer (which recognizes the new schema).
- TypeSerializerSchemaCompatibility.incompatible(): this result signals that the new serializer has a different serialization schema, but it is not possible to migrate from the old schema.
The last bit of detail is how the previous serializer is obtained in the case that migration is required. Another important role of a serializer’s TypeSerializerSnapshot is that it serves as a factory to restore the previous serializer. More specifically, the TypeSerializerSnapshot should implement the restoreSerializer method to instantiate a serializer instance that recognizes the previous serializer’s schema and configuration, and can therefore safely read data written by the previous serializer.
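To make the three outcomes and the factory role of the snapshot more concrete, here is a minimal sketch that models them with plain Java types. It is not Flink’s API: the enum Compatibility, the Serializer interface, the IntFormat and LongFormat classes, IntFormatSnapshot, and the restore helper are all hypothetical stand-ins, and the choice of a 4-byte versus 8-byte counter format is an assumption made only for illustration.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

class MigrationSketch {
    /** Stand-in for the three possible compatibility results. */
    enum Compatibility { COMPATIBLE_AS_IS, COMPATIBLE_AFTER_MIGRATION, INCOMPATIBLE }

    /** Minimal stand-in for a state serializer of counter values. */
    interface Serializer {
        byte[] serialize(long value);
        long deserialize(byte[] bytes);
    }

    /** "Old" schema: counters are written as 4-byte ints. */
    static final class IntFormat implements Serializer {
        public byte[] serialize(long value) {
            return ByteBuffer.allocate(4).putInt(Math.toIntExact(value)).array();
        }
        public long deserialize(byte[] bytes) {
            return ByteBuffer.wrap(bytes).getInt();
        }
    }

    /** "New" schema: counters are written as 8-byte longs. */
    static final class LongFormat implements Serializer {
        public byte[] serialize(long value) {
            return ByteBuffer.allocate(8).putLong(value).array();
        }
        public long deserialize(byte[] bytes) {
            return ByteBuffer.wrap(bytes).getLong();
        }
    }

    /** Stand-in for the snapshot that was written together with IntFormat state. */
    static final class IntFormatSnapshot {
        Compatibility resolveSchemaCompatibility(Serializer newSerializer) {
            if (newSerializer instanceof IntFormat) {
                return Compatibility.COMPATIBLE_AS_IS;
            }
            if (newSerializer instanceof LongFormat) {
                return Compatibility.COMPATIBLE_AFTER_MIGRATION;
            }
            return Compatibility.INCOMPATIBLE;
        }

        /** Factory role: re-creates a serializer that can read the old bytes. */
        Serializer restoreSerializer() {
            return new IntFormat();
        }
    }

    /** Resolves compatibility and, if needed, rewrites every entry with the new serializer. */
    static List<byte[]> restore(List<byte[]> stateBytes, IntFormatSnapshot snapshot, Serializer newSerializer) {
        switch (snapshot.resolveSchemaCompatibility(newSerializer)) {
            case COMPATIBLE_AS_IS:
                return stateBytes;
            case COMPATIBLE_AFTER_MIGRATION:
                Serializer previous = snapshot.restoreSerializer();
                List<byte[]> migrated = new ArrayList<>();
                for (byte[] entry : stateBytes) {
                    migrated.add(newSerializer.serialize(previous.deserialize(entry)));
                }
                return migrated;
            default:
                throw new IllegalStateException("New serializer is incompatible with the previous schema");
        }
    }
}
```

The point of the sketch is that the new serializer never has to understand the old bytes: reading them is delegated to whatever restoreSerializer returns.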
How Flink interacts with the TypeSerializer and TypeSerializerSnapshot abstractions
To wrap up, this section describes how Flink, or more specifically the state backends, interacts with the abstractions. The interaction differs slightly depending on the state backend, but this is orthogonal to the implementation of state serializers and their serializer snapshots.
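Off-heap state backends (e.g. RocksDBStateBackend)
- The registered TypeSerializer for the state is used to read / write state on every state access.
- When a savepoint is taken, the serializer snapshot is extracted via the TypeSerializer#snapshotConfiguration method.
- When the restored execution accesses the state with a new serializer, that serializer is provided to the restored snapshot of the previous serializer via TypeSerializerSnapshot#resolveSchemaCompatibility to check for schema compatibility.
- If migration is required, the previous serializer, which recognizes the old schema (schema A), is obtained from the serializer snapshot via TypeSerializerSnapshot#restoreSerializer(), and is used to deserialize state bytes to objects, which in turn are rewritten with the new serializer, which recognizes the new schema (schema B), to complete the migration. All entries of the accessed state are migrated together before processing continues.
Heap state backends (e.g. MemoryStateBackend, FsStateBackend)
- The registered TypeSerializer is maintained by the state backend.
- When a savepoint is taken, the serializer snapshot is extracted via the TypeSerializer#snapshotConfiguration method.
- On restore, the previous serializer is obtained from the serializer snapshot via TypeSerializerSnapshot#restoreSerializer(), and is used to deserialize state bytes to objects.
- When the restored execution accesses the state with a new serializer, that serializer is provided to the restored snapshot of the previous serializer via TypeSerializerSnapshot#resolveSchemaCompatibility to check for schema compatibility.
A serializer’s snapshot, being the single source of truth for how a registered state was serialized, serves as an entry point to reading state in savepoints. In order to restore and access previous state, it must be possible to restore the previous state serializer’s snapshot.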
Flink restores serializer snapshots by first instantiating the TypeSerializerSnapshot with its classname (written along with the snapshot bytes). Therefore, to avoid being subject to unintended classname changes or instantiation failures, TypeSerializerSnapshot classes should:
- avoid being implemented as anonymous classes or nested classes,
- have a public, nullary constructor for instantiation.
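To illustrate why the classname and the constructor matter, here is a minimal sketch of instantiating a class from a stored name using standard Java reflection. It is only a model of the idea and not Flink’s restore code: SketchSnapshot, RestorableSnapshot, UnrestorableSnapshot, and SnapshotInstantiationSketch are hypothetical names, and the assumption is that the stored name is looked up and a no-argument constructor is invoked.

```java
/** Stand-in for a serializer snapshot type; not Flink's interface. */
interface SketchSnapshot {}

/** Top-level class with a public nullary constructor: can be re-created from its name. */
class RestorableSnapshot implements SketchSnapshot {
    public RestorableSnapshot() {}
}

/** Only has a constructor with arguments, so there is nothing to call reflectively. */
class UnrestorableSnapshot implements SketchSnapshot {
    private final int fieldCount;

    public UnrestorableSnapshot(int fieldCount) {
        this.fieldCount = fieldCount;
    }
}

class SnapshotInstantiationSketch {
    /** Re-creates a snapshot object from the class name stored next to the snapshot bytes. */
    static SketchSnapshot instantiate(String className) throws ReflectiveOperationException {
        Class<?> clazz = Class.forName(className);
        return (SketchSnapshot) clazz.getDeclaredConstructor().newInstance();
    }

    /** Returns false if the class cannot be found or cannot be constructed without arguments. */
    static boolean canRestore(String className) {
        try {
            instantiate(className);
            return true;
        } catch (ReflectiveOperationException e) {
            return false;
        }
    }
}
```

In this model, a renamed or relocated class fails at the lookup step and a class without a nullary constructor fails at the construction step, which corresponds to the two failure modes named above.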
Avoid sharing the same TypeSerializerSnapshot class across different serializers
Since schema compatibility checks go through the serializer snapshots, having multiple serializers return the same TypeSerializerSnapshot class as their snapshot would complicate the implementation of the TypeSerializerSnapshot#resolveSchemaCompatibility and TypeSerializerSnapshot#restoreSerializer() methods.
This would also be a bad separation of concerns; a single serializer’s serialization schema, configuration, and restore logic should be consolidated in its own dedicated TypeSerializerSnapshot class.
Use the CompositeSerializerSnapshot utility for serializers that contain nested serializers
There may be cases where a TypeSerializer relies on other nested TypeSerializers; take for example Flink’s TupleSerializer, which is configured with nested TypeSerializers for the tuple fields. In this case, the snapshot of the outermost serializer should also contain snapshots of the nested serializers.
The CompositeSerializerSnapshot can be used specifically for this scenario. It wraps the logic of resolving the overall schema compatibility check result for the composite serializer. For an example of how it should be used, one can refer to Flink’s ListSerializerSnapshot implementation.
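As a rough illustration of what resolving an overall result for a composite serializer can look like, the sketch below combines the result for the outer serializer with the results of its nested serializers. The aggregation rule shown (any incompatible part makes the whole incompatible, any part needing migration makes the whole need migration) is an assumption chosen for this example and is not taken from Flink’s source; CompositeCompatibilitySketch, its Compatibility enum, and combine are hypothetical names.

```java
import java.util.List;

class CompositeCompatibilitySketch {
    /** Stand-in for the three possible compatibility results. */
    enum Compatibility { COMPATIBLE_AS_IS, COMPATIBLE_AFTER_MIGRATION, INCOMPATIBLE }

    /** Combines the outer serializer's own result with the results of its nested serializers. */
    static Compatibility combine(Compatibility outer, List<Compatibility> nested) {
        if (outer == Compatibility.INCOMPATIBLE) {
            return Compatibility.INCOMPATIBLE;
        }
        boolean migrationNeeded = outer == Compatibility.COMPATIBLE_AFTER_MIGRATION;
        for (Compatibility result : nested) {
            if (result == Compatibility.INCOMPATIBLE) {
                // One unreadable field makes the whole composite unreadable.
                return Compatibility.INCOMPATIBLE;
            }
            if (result == Compatibility.COMPATIBLE_AFTER_MIGRATION) {
                migrationNeeded = true;
            }
        }
        return migrationNeeded
                ? Compatibility.COMPATIBLE_AFTER_MIGRATION
                : Compatibility.COMPATIBLE_AS_IS;
    }
}
```

For a tuple-like serializer, nested would hold one result per field, each obtained by checking the new field serializer against the snapshot of the corresponding old field serializer.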
This section is a guide for API migration from serializers and serializer snapshots that existed before Flink 1.7.
Before Flink 1.7, serializer snapshots were implemented as a TypeSerializerConfigSnapshot (which is now deprecated, and will eventually be removed and fully replaced by the new TypeSerializerSnapshot interface). Moreover, the responsibility for serializer schema compatibility checks lived within the TypeSerializer, implemented in the TypeSerializer#ensureCompatibility(TypeSerializerConfigSnapshot) method.
Another major difference between the new and old abstractions is that the deprecated TypeSerializerConfigSnapshot did not have the capability of instantiating the previous serializer. Therefore, if your serializer still returns a subclass of TypeSerializerConfigSnapshot as its snapshot, the serializer instance itself will always be written to savepoints using Java serialization so that the previous serializer may be available at restore time. This is very undesirable, since whether or not restoring the job succeeds depends on the availability of the previous serializer’s class or, more generally, on whether the serializer instance can be read back at restore time using Java serialization. This means that you would be limited to the same serializer for your state, which could be problematic once you want to upgrade serializer classes or perform schema migration.
To be future-proof and have the flexibility to migrate your state serializers and schema, it is highly recommended to migrate from the old abstractions. The steps to do this are as follows:
1. Implement a new TypeSerializerSnapshot. This will be the new snapshot for your serializer.
2. Return the new TypeSerializerSnapshot as the serializer snapshot for your serializer in the TypeSerializer#snapshotConfiguration() method.
3. Restore the job from a savepoint taken before the migration, then take a new savepoint. At this step, the old TypeSerializerConfigSnapshot of the serializer must still exist in the classpath, and the implementation of the TypeSerializer#ensureCompatibility(TypeSerializerConfigSnapshot) method must not be removed. The purpose of this process is to replace the TypeSerializerConfigSnapshot written in old savepoints with the newly implemented TypeSerializerSnapshot for the serializer.
4. The new savepoint will contain the TypeSerializerSnapshot as the state serializer snapshot, and the serializer instance will no longer be written in the savepoint. At this point, it is safe to remove all implementations of the old abstraction (remove the old TypeSerializerConfigSnapshot implementation as well as the TypeSerializer#ensureCompatibility(TypeSerializerConfigSnapshot) method from the serializer).