State Bootstrapping

State Bootstrapping #

Often times applications require some intial state provided by historical data in a file, database, or other system. Because state is managed by Apache Flink’s snapshotting mechanism, for Stateful Function applications, that means writing the intial state into a savepoint that can be used to start the job. Users can bootstrap initial state for Stateful Functions applications using Flink’s State Processor API and a StatefulFunctionSavepointCreator.

Attention: The savepoint creator currently only supports initializing the state for embedded Java functions.

To get started, include the following libraries in your application:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>statefun-flink-state-processor</artifactId>
    <version>3.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-state-processor-api_2.12</artifactId>
    <version>3.1.0</version>
</dependency>

State Bootstrap Function #

A StateBootstrapFunction defines how to bootstrap state for a StatefulFunction instance with a given input.

Each bootstrap functions instance directly corresponds to a StatefulFunction type. Likewise, each instance is uniquely identified by an address, represented by the type and id of the function being bootstrapped. Any state that is persisted by a bootstrap functions instance will be available to the corresponding live StatefulFunction instance having the same address.

For example, consider the following state bootstrap function:

public class MyStateBootstrapFunction implements StateBootstrapFunction {

	@Persisted
	private PersistedValue<MyState> state = PersistedValue.of("my-state", MyState.class);

	@Override
	public void bootstrap(Context context, Object input) {
		state.set(extractStateFromInput(input));
	}
 }

Assume that this bootstrap function was provided for function type MyFunctionType, and the id of the bootstrap function instance was id-13. The function writes persisted state of name my-state using the given bootstrap data. After restoring a Stateful Functions application from the savepoint generated using this bootstrap function, the stateful function instance with address (MyFunctionType, id-13) will already have state values available under state name my-state.

Creating A Savepoint #

Savepoints are created by defining certain metadata, such as max parallelism and state backend. The default state backend is RocksDB.

int maxParallelism = 128;
StatefulFunctionsSavepointCreator newSavepoint = new StatefulFunctionsSavepointCreator(maxParallelism);

Each input data set is registered in the savepoint creator with a [router]({{ site.baseurl }}/io-module/index.html#router) that routes each record to zero or more function instances. You may then register any number of function types to the savepoint creator, similar to how functions are registered within a stateful functions module. Finally, specify an output location for the resulting savepoint.

// Read data from a file, database, or other location
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

final DataSet<Tuple2<String, Integer>> userSeenCounts = env.fromElements(
	Tuple2.of("foo", 4), Tuple2.of("bar", 3), Tuple2.of("joe", 2));

// Register the dataset with a router
newSavepoint.withBootstrapData(userSeenCounts, MyStateBootstrapFunctionRouter::new);

// Register a bootstrap function to process the records
newSavepoint.withStateBootstrapFunctionProvider(
		new FunctionType("apache", "my-function"),
		ignored -> new MyStateBootstrapFunction());

newSavepoint.write("file:///savepoint/path/");

env.execute();

For full details of how to use Flink’s DataSet API, please check the official documentation.

Deployment #

After creating a new savpepoint, it can be used to provide the initial state for a Stateful Functions application.

When deploying based on an image, pass the -s command to the Flink JobMaster image.

version: "2.1"
services:
  master:
    image: my-statefun-application-image
    command: -s file:///savepoint/path

When deploying to a Flink session cluster, specify the savepoint argument in the Flink CLI.

$ ./bin/flink run -s file:///savepoint/path stateful-functions-job.jar