public class DefaultKeyedStateStore extends Object implements KeyedStateStore
RuntimeContext
.Modifier and Type | Field and Description |
---|---|
protected ExecutionConfig |
executionConfig |
protected KeyedStateBackend<?> |
keyedStateBackend |
Constructor and Description |
---|
DefaultKeyedStateStore(KeyedStateBackend<?> keyedStateBackend,
ExecutionConfig executionConfig) |
Modifier and Type | Method and Description |
---|---|
<IN,ACC,OUT> |
getAggregatingState(AggregatingStateDescriptor<IN,ACC,OUT> stateProperties)
Gets a handle to the system's key/value folding state.
|
<T,ACC> FoldingState<T,ACC> |
getFoldingState(FoldingStateDescriptor<T,ACC> stateProperties)
Gets a handle to the system's key/value folding state.
|
<T> ListState<T> |
getListState(ListStateDescriptor<T> stateProperties)
Gets a handle to the system's key/value list state.
|
<UK,UV> MapState<UK,UV> |
getMapState(MapStateDescriptor<UK,UV> stateProperties)
Gets a handle to the system's key/value map state.
|
protected <S extends State> |
getPartitionedState(StateDescriptor<S,?> stateDescriptor) |
<T> ReducingState<T> |
getReducingState(ReducingStateDescriptor<T> stateProperties)
Gets a handle to the system's key/value reducing state.
|
<T> ValueState<T> |
getState(ValueStateDescriptor<T> stateProperties)
Gets a handle to the system's key/value state.
|
protected final KeyedStateBackend<?> keyedStateBackend
protected final ExecutionConfig executionConfig
public DefaultKeyedStateStore(KeyedStateBackend<?> keyedStateBackend, ExecutionConfig executionConfig)
public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties)
KeyedStateStore
Because the scope of each value is the key of the currently processed element, and the elements are distributed by the Flink runtime, the system can transparently scale out and redistribute the state and KeyedStream.
The following code example shows how to implement a continuous counter that counts how many times elements of a certain key occur, and emits an updated count for that element on each occurrence.
DataStream<MyType> stream = ...;
KeyedStream<MyType> keyedStream = stream.keyBy("id");
keyedStream.map(new RichMapFunction<MyType, Tuple2<MyType, Long>>() {
private ValueState<Long> count;
public void open(Configuration cfg) {
state = getRuntimeContext().getState(
new ValueStateDescriptor<Long>("count", LongSerializer.INSTANCE, 0L));
}
public Tuple2<MyType, Long> map(MyType value) {
long count = state.value() + 1;
state.update(value);
return new Tuple2<>(value, count);
}
});
getState
in interface KeyedStateStore
T
- The type of value stored in the state.stateProperties
- The descriptor defining the properties of the stats.public <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties)
KeyedStateStore
KeyedStateStore.getState(ValueStateDescriptor)
, but is optimized for state that
holds lists. One can adds elements to the list, or retrieve the list as a whole.
This state is only accessible if the function is executed on a KeyedStream.
DataStream<MyType> stream = ...;
KeyedStream<MyType> keyedStream = stream.keyBy("id");
keyedStream.map(new RichFlatMapFunction<MyType, List<MyType>>() {
private ListState<MyType> state;
public void open(Configuration cfg) {
state = getRuntimeContext().getListState(
new ListStateDescriptor<>("myState", MyType.class));
}
public void flatMap(MyType value, Collector<MyType> out) {
if (value.isDivider()) {
for (MyType t : state.get()) {
out.collect(t);
}
} else {
state.add(value);
}
}
});
getListState
in interface KeyedStateStore
T
- The type of value stored in the state.stateProperties
- The descriptor defining the properties of the stats.public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties)
KeyedStateStore
KeyedStateStore.getState(ValueStateDescriptor)
, but is optimized for state that
aggregates values.
This state is only accessible if the function is executed on a KeyedStream.
DataStream<MyType> stream = ...;
KeyedStream<MyType> keyedStream = stream.keyBy("id");
keyedStream.map(new RichMapFunction<MyType, List<MyType>>() {
private ReducingState<Long> state;
public void open(Configuration cfg) {
state = getRuntimeContext().getReducingState(
new ReducingStateDescriptor<>("sum", (a, b) -> a + b, Long.class));
}
public Tuple2<MyType, Long> map(MyType value) {
state.add(value.count());
return new Tuple2<>(value, state.get());
}
});
getReducingState
in interface KeyedStateStore
T
- The type of value stored in the state.stateProperties
- The descriptor defining the properties of the stats.public <IN,ACC,OUT> AggregatingState<IN,OUT> getAggregatingState(AggregatingStateDescriptor<IN,ACC,OUT> stateProperties)
KeyedStateStore
KeyedStateStore.getState(ValueStateDescriptor)
, but is optimized for state that
aggregates values with different types.
This state is only accessible if the function is executed on a KeyedStream.
DataStream<MyType> stream = ...;
KeyedStream<MyType> keyedStream = stream.keyBy("id");
AggregateFunction<...> aggregateFunction = ...
keyedStream.map(new RichMapFunction<MyType, List<MyType>>() {
private AggregatingState<MyType, Long> state;
public void open(Configuration cfg) {
state = getRuntimeContext().getAggregatingState(
new AggregatingStateDescriptor<>("sum", aggregateFunction, Long.class));
}
public Tuple2<MyType, Long> map(MyType value) {
state.add(value);
return new Tuple2<>(value, state.get());
}
});
getAggregatingState
in interface KeyedStateStore
IN
- The type of the values that are added to the state.ACC
- The type of the accumulator (intermediate aggregation state).OUT
- The type of the values that are returned from the state.stateProperties
- The descriptor defining the properties of the stats.public <T,ACC> FoldingState<T,ACC> getFoldingState(FoldingStateDescriptor<T,ACC> stateProperties)
KeyedStateStore
KeyedStateStore.getState(ValueStateDescriptor)
, but is optimized for state that
aggregates values with different types.
This state is only accessible if the function is executed on a KeyedStream.
DataStream<MyType> stream = ...;
KeyedStream<MyType> keyedStream = stream.keyBy("id");
keyedStream.map(new RichMapFunction<MyType, List<MyType>>() {
private FoldingState<MyType, Long> state;
public void open(Configuration cfg) {
state = getRuntimeContext().getReducingState(
new FoldingStateDescriptor<>("sum", 0L, (a, b) -> a.count() + b, Long.class));
}
public Tuple2<MyType, Long> map(MyType value) {
state.add(value);
return new Tuple2<>(value, state.get());
}
});
getFoldingState
in interface KeyedStateStore
T
- The type of value stored in the state.stateProperties
- The descriptor defining the properties of the stats.public <UK,UV> MapState<UK,UV> getMapState(MapStateDescriptor<UK,UV> stateProperties)
KeyedStateStore
KeyedStateStore.getState(ValueStateDescriptor)
, but is optimized for state that
is composed of user-defined key-value pairs
This state is only accessible if the function is executed on a KeyedStream.
DataStream<MyType> stream = ...;
KeyedStream<MyType> keyedStream = stream.keyBy("id");
keyedStream.map(new RichMapFunction<MyType, List<MyType>>() {
private MapState<MyType, Long> state;
public void open(Configuration cfg) {
state = getRuntimeContext().getMapState(
new MapStateDescriptor<>("sum", MyType.class, Long.class));
}
public Tuple2<MyType, Long> map(MyType value) {
return new Tuple2<>(value, state.get(value));
}
});
getMapState
in interface KeyedStateStore
UK
- The type of the user keys stored in the state.UV
- The type of the user values stored in the state.stateProperties
- The descriptor defining the properties of the stats.protected <S extends State> S getPartitionedState(StateDescriptor<S,?> stateDescriptor) throws Exception
Exception
Copyright © 2014–2019 The Apache Software Foundation. All rights reserved.