State Bootstrapping #
Often times applications require some intial state provided by historical data in a file, database, or other system.
Because state is managed by Apache Flink’s snapshotting mechanism, for Stateful Function applications, that means
writing the intial state into a savepoint that can be used to start the job.
Users can bootstrap initial state for Stateful Functions applications using Flink’s State Processor API and a StatefulFunctionSavepointCreator
.
Attention: The savepoint creator currently only supports initializing the state for embedded Java functions.
To get started, include the following libraries in your application:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>statefun-flink-state-processor</artifactId>
<version>3.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-state-processor-api_2.12</artifactId>
<version>3.1.0</version>
</dependency>
State Bootstrap Function #
A StateBootstrapFunction
defines how to bootstrap state for a StatefulFunction
instance with a given input.
Each bootstrap functions instance directly corresponds to a StatefulFunction
type.
Likewise, each instance is uniquely identified by an address, represented by the type and id of the function being bootstrapped.
Any state that is persisted by a bootstrap functions instance will be available to the corresponding live StatefulFunction
instance having the same address.
For example, consider the following state bootstrap function:
public class MyStateBootstrapFunction implements StateBootstrapFunction {
@Persisted
private PersistedValue<MyState> state = PersistedValue.of("my-state", MyState.class);
@Override
public void bootstrap(Context context, Object input) {
state.set(extractStateFromInput(input));
}
}
Assume that this bootstrap function was provided for function type MyFunctionType
, and the id of the bootstrap function instance was id-13
.
The function writes persisted state of name my-state
using the given bootstrap data.
After restoring a Stateful Functions application from the savepoint generated using this bootstrap function, the stateful function instance with address (MyFunctionType, id-13)
will already have state values available under state name my-state
.
Creating A Savepoint #
Savepoints are created by defining certain metadata, such as max parallelism and state backend. The default state backend is RocksDB.
int maxParallelism = 128;
StatefulFunctionsSavepointCreator newSavepoint = new StatefulFunctionsSavepointCreator(maxParallelism);
Each input data set is registered in the savepoint creator with a [router]({{ site.baseurl }}/io-module/index.html#router) that routes each record to zero or more function instances. You may then register any number of function types to the savepoint creator, similar to how functions are registered within a stateful functions module. Finally, specify an output location for the resulting savepoint.
// Read data from a file, database, or other location
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
final DataSet<Tuple2<String, Integer>> userSeenCounts = env.fromElements(
Tuple2.of("foo", 4), Tuple2.of("bar", 3), Tuple2.of("joe", 2));
// Register the dataset with a router
newSavepoint.withBootstrapData(userSeenCounts, MyStateBootstrapFunctionRouter::new);
// Register a bootstrap function to process the records
newSavepoint.withStateBootstrapFunctionProvider(
new FunctionType("apache", "my-function"),
ignored -> new MyStateBootstrapFunction());
newSavepoint.write("file:///savepoint/path/");
env.execute();
For full details of how to use Flink’s DataSet
API, please check the official documentation.
Deployment #
After creating a new savpepoint, it can be used to provide the initial state for a Stateful Functions application.
When deploying based on an image, pass the -s
command to the Flink JobMaster image.
version: "2.1"
services:
master:
image: my-statefun-application-image
command: -s file:///savepoint/path
When deploying to a Flink session cluster, specify the savepoint argument in the Flink CLI.
$ ./bin/flink run -s file:///savepoint/path stateful-functions-job.jar