public static class StateTtlConfig.Builder extends Object
Builder for the StateTtlConfig.
@Nonnull public StateTtlConfig.Builder setUpdateType(StateTtlConfig.UpdateType updateType)
updateType - The TTL update type, which configures when the last access timestamp is updated, thereby prolonging the state TTL.
@Nonnull public StateTtlConfig.Builder updateTtlOnCreateAndWrite()
@Nonnull public StateTtlConfig.Builder updateTtlOnReadAndWrite()
@Nonnull public StateTtlConfig.Builder setStateVisibility(@Nonnull StateTtlConfig.StateVisibility stateVisibility)
stateVisibility - The state visibility, which configures whether an expired user value can be returned.
@Nonnull public StateTtlConfig.Builder returnExpiredIfNotCleanedUp()
@Nonnull public StateTtlConfig.Builder neverReturnExpired()
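
The update type and state visibility settings above interact, and a small model may make that easier to see. The following is a minimal, self-contained sketch and not Flink code: the class TtlValueSketch, its two enums, and the caller-supplied clock are hypothetical names introduced only for illustration. It assumes a value counts as expired once the TTL has fully elapsed since the last access timestamp, and that nothing has cleaned the value up yet.

```java
/** Simplified model of one TTL-guarded value; not Flink code. */
final class TtlValueSketch {
    // Local stand-ins for this sketch only.
    enum UpdateType { ON_CREATE_AND_WRITE, ON_READ_AND_WRITE }
    enum Visibility { RETURN_EXPIRED_IF_NOT_CLEANED_UP, NEVER_RETURN_EXPIRED }

    private final long ttlMillis;
    private final UpdateType updateType;
    private final Visibility visibility;
    private String value;     // stored user value, null if nothing was written
    private long lastAccess;  // timestamp the TTL is measured from

    TtlValueSketch(long ttlMillis, UpdateType updateType, Visibility visibility) {
        this.ttlMillis = ttlMillis;
        this.updateType = updateType;
        this.visibility = visibility;
    }

    /** Creating or writing always refreshes the last access timestamp. */
    void write(String newValue, long now) {
        value = newValue;
        lastAccess = now;
    }

    /** Reads the value at time {@code now}, which the caller supplies. */
    String read(long now) {
        if (value == null) {
            return null;
        }
        boolean expired = now - lastAccess >= ttlMillis;
        if (expired) {
            // Nothing has cleaned the value up yet, so visibility decides.
            return visibility == Visibility.NEVER_RETURN_EXPIRED ? null : value;
        }
        if (updateType == UpdateType.ON_READ_AND_WRITE) {
            lastAccess = now; // a read prolongs the TTL only in this mode
        }
        return value;
    }

    public static void main(String[] args) {
        TtlValueSketch s = new TtlValueSketch(
                100, UpdateType.ON_READ_AND_WRITE, Visibility.NEVER_RETURN_EXPIRED);
        s.write("v", 0);
        System.out.println(s.read(60));   // live; the read refreshes the timestamp
        System.out.println(s.read(120));  // still live: 60 ms since the last read
        System.out.println(s.read(250));  // expired and hidden from the caller
    }
}
```

In this model, switching to ON_CREATE_AND_WRITE makes the read at time 120 find an expired value, because only the write at time 0 counts as an access.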
@Nonnull public StateTtlConfig.Builder setTtlTimeCharacteristic(@Nonnull StateTtlConfig.TtlTimeCharacteristic ttlTimeCharacteristic)
ttlTimeCharacteristic - The time characteristic, which configures the time scale to use for TTL.
@Nonnull public StateTtlConfig.Builder useProcessingTime()
@Nonnull public StateTtlConfig.Builder cleanupFullSnapshot()
@Nonnull public StateTtlConfig.Builder cleanupIncrementally(@Nonnegative int cleanupSize, boolean runCleanupForEveryRecord)
Upon every state access, this cleanup strategy checks a batch of state keys for expiration and removes the expired ones. It keeps a lazy iterator over all keys, with relaxed consistency if the backend supports it. As long as some state is accessed regularly, every key is eventually checked and cleaned up.
In addition to running on state access, the incremental cleanup can also run for every processed record. Caution: if many registered states use this option, all of them are iterated for every record to check whether there is something to clean up.
Note: if this state is never accessed, or no records are processed when runCleanupForEveryRecord is enabled, expired state will persist.
Note: Time spent on incremental cleanup increases record processing latency.
Note: At the moment, incremental cleanup is implemented only for the heap state backend. Setting it for RocksDB has no effect.
Note: If the heap state backend is used with synchronous snapshotting, the global iterator keeps a copy of all keys while iterating, because its implementation does not support concurrent modifications. Enabling this feature then increases memory consumption. Asynchronous snapshotting does not have this problem.
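
The incremental strategy described above can be pictured with a small, self-contained model. This is a sketch under stated assumptions and not Flink's implementation: the class IncrementalCleanupSketch and its methods are hypothetical, the clock is supplied by the caller, and the iterator walks a copied key list (comparable to the key copy mentioned in the last note) rather than a backend iterator with relaxed consistency.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

/** Simplified model of incremental TTL cleanup; not Flink code. */
final class IncrementalCleanupSketch {
    private final long ttlMillis;
    private final int cleanupSize; // max keys checked per state access
    private final Map<String, Long> lastAccess = new LinkedHashMap<>();
    private Iterator<String> cursor; // lazily created, survives across accesses

    IncrementalCleanupSketch(long ttlMillis, int cleanupSize) {
        this.ttlMillis = ttlMillis;
        this.cleanupSize = cleanupSize;
    }

    /** Any state access records the timestamp and triggers one cleanup step. */
    void touch(String key, long now) {
        lastAccess.put(key, now);
        cleanupStep(now);
    }

    /** Checks at most cleanupSize keys and returns how many were removed. */
    int cleanupStep(long now) {
        if (cursor == null || !cursor.hasNext()) {
            // Start a new pass over a copy of the keys, so the map can be
            // modified while the pass is in progress.
            cursor = new ArrayList<>(lastAccess.keySet()).iterator();
        }
        int removed = 0;
        for (int i = 0; i < cleanupSize && cursor.hasNext(); i++) {
            String key = cursor.next();
            Long ts = lastAccess.get(key); // null if removed since the copy
            if (ts != null && now - ts >= ttlMillis) {
                lastAccess.remove(key);
                removed++;
            }
        }
        return removed;
    }

    int size() {
        return lastAccess.size();
    }

    public static void main(String[] args) {
        IncrementalCleanupSketch state = new IncrementalCleanupSketch(10, 2);
        state.touch("a", 0);
        state.touch("b", 0);
        state.touch("c", 0);
        state.touch("d", 100); // finishes the current pass
        state.touch("d", 100); // starts a new pass and checks two more keys
        System.out.println(state.size());
    }
}
```

The model also reflects the first note: if touch is never called, cleanupStep never runs and expired entries stay in the map.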
cleanupSize - maximum number of keys pulled from the queue for cleanup whenever state is touched for any key
runCleanupForEveryRecord - whether to run incremental cleanup for each processed record
@Nonnull @Deprecated public StateTtlConfig.Builder cleanupInRocksdbCompactFilter()
Deprecated. Use cleanupInBackground() instead.
RocksDB runs periodic compaction of state updates and merges them to free storage. During this process, the TTL filter checks the timestamps of state entries and drops expired ones. The feature must first be activated in the RocksDB backend using the following Flink configuration option: state.backend.rocksdb.ttl.compaction.filter.enabled.
Due to the specifics of the RocksDB compaction filter, cleanup is not properly guaranteed if put and merge operations are used at the same time (see https://github.com/facebook/rocksdb/blob/master/include/rocksdb/compaction_filter.h#L69). This means that the TTL filter should be tested for List state with this caveat in mind.
@Nonnull public StateTtlConfig.Builder cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries)
The RocksDB compaction filter queries the current timestamp, which is used to check expiration, from Flink each time it has processed queryTimeAfterNumEntries state entries. Updating the timestamp more often can improve cleanup speed, but it decreases compaction performance because each update requires a JNI call from native code.
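
The trade-off above, between a fresher timestamp and fewer expensive clock queries, can be illustrated with a small model. This is a hedged sketch and not the RocksDB or Flink implementation: the class CachedClockFilterSketch is hypothetical, a LongSupplier stands in for the JNI call, and the model assumes one initial clock query when the filter is created.

```java
import java.util.function.LongSupplier;

/** Simplified model of a TTL filter with a cached timestamp; not Flink code. */
final class CachedClockFilterSketch {
    private final long ttlMillis;
    private final long queryTimeAfterNumEntries;
    private final LongSupplier clock; // stands in for the costly JNI call
    private long cachedNow;
    private long entriesSinceQuery;
    private long clockQueries;

    CachedClockFilterSketch(long ttlMillis, long queryTimeAfterNumEntries, LongSupplier clock) {
        this.ttlMillis = ttlMillis;
        this.queryTimeAfterNumEntries = queryTimeAfterNumEntries;
        this.clock = clock;
        this.cachedNow = clock.getAsLong(); // assumed initial query
        this.clockQueries = 1;
    }

    /** Returns true if the entry is expired relative to the cached timestamp. */
    boolean shouldDrop(long entryTimestamp) {
        if (entriesSinceQuery >= queryTimeAfterNumEntries) {
            cachedNow = clock.getAsLong(); // refresh after N processed entries
            clockQueries++;
            entriesSinceQuery = 0;
        }
        entriesSinceQuery++;
        return cachedNow - entryTimestamp >= ttlMillis;
    }

    long clockQueries() {
        return clockQueries;
    }

    public static void main(String[] args) {
        long[] now = {0};
        CachedClockFilterSketch filter = new CachedClockFilterSketch(10, 3, () -> now[0]);
        now[0] = 100; // time moves on, but the filter still holds timestamp 0
        for (int i = 0; i < 4; i++) {
            // The first three entries are judged against the stale timestamp.
            System.out.println(filter.shouldDrop(0));
        }
        System.out.println(filter.clockQueries());
    }
}
```

In this model, a smaller queryTimeAfterNumEntries lets expired entries be dropped sooner at the cost of more clock queries, which mirrors the trade-off described above.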
queryTimeAfterNumEntries - number of state entries the compaction filter processes before updating the current timestamp
@Nonnull @Deprecated public StateTtlConfig.Builder cleanupInBackground()
Depending on the backend actually used, the corresponding default cleanup will kick in if supported. If some specific cleanup is also configured, e.g. cleanupIncrementally(int, boolean) or cleanupInRocksdbCompactFilter(), then the specific one will kick in instead of the default.
@Nonnull public StateTtlConfig.Builder disableCleanupInBackground()
If some specific cleanup is configured, e.g. cleanupIncrementally(int, boolean) or cleanupInRocksdbCompactFilter(), this setting does not disable it.
@Nonnull public StateTtlConfig.Builder setTtl(@Nonnull Time ttl)
ttl - The TTL time.
@Nonnull public StateTtlConfig build()
Copyright © 2014–2020 The Apache Software Foundation. All rights reserved.