Use this SDK if you want your Stateful Functions application to consume events from, or output events to, Flink DataStreams. With it, you can combine pipelines written with the Flink DataStream API or higher-level libraries (such as the Table API or CEP; essentially anything that produces a DataStream) with the programming constructs provided by Stateful Functions to build complex streaming applications.
To use this, add the Flink DataStream Integration SDK as a dependency to your application:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>statefun-flink-datastream</artifactId>
    <version>2.2.2</version>
</dependency>
The following sections cover the important parts of getting started with the SDK. For the full code and a working example, please take a look at this example.
All DataStreams used as ingresses must contain stream elements of type RoutableMessage. Each RoutableMessage carries information about the target function's address alongside the input event payload.
You can use the RoutableMessageBuilder to transform your DataStreams:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> names = env.addSource(...);

// Wrap each name in a RoutableMessage addressed to the (example:greet) function,
// using the name itself as the function id.
DataStream<RoutableMessage> namesIngress =
    names.map(name ->
        RoutableMessageBuilder.builder()
            .withTargetAddress(new FunctionType("example", "greet"), name)
            .withMessageBody(name)
            .build()
    );
In the above example, we transformed a DataStream<String> into a DataStream<RoutableMessage> by mapping each element in the original stream to a RoutableMessage. Each message targets the function type (example:greet) and uses the element itself as the function id.
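To make the addressing concrete, the following sketch models the same mapping with plain Java collections. The names AddressingSketch, AddressedMessage, and toGreetMessages are hypothetical stand-ins invented for illustration; they are not part of the SDK, and a real RoutableMessage is built with RoutableMessageBuilder as in the snippet above. The sketch only shows that each input name becomes one message whose target is the function type example/greet and whose id is the name itself.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in types for illustration only; NOT the SDK's classes.
final class AddressingSketch {

    static final class AddressedMessage {
        final String namespace; // function type namespace, e.g. "example"
        final String name;      // function type name, e.g. "greet"
        final String id;        // id of the target function instance
        final String payload;   // the input event itself

        AddressedMessage(String namespace, String name, String id, String payload) {
            this.namespace = namespace;
            this.name = name;
            this.id = id;
            this.payload = payload;
        }

        @Override
        public String toString() {
            return namespace + "/" + name + "#" + id + " <- " + payload;
        }
    }

    // Mirrors names.map(...): one addressed message per input element,
    // using the name both as the target id and as the message body.
    static List<AddressedMessage> toGreetMessages(List<String> names) {
        return names.stream()
            .map(n -> new AddressedMessage("example", "greet", n, n))
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        for (AddressedMessage m : toGreetMessages(Arrays.asList("Alice", "Bob"))) {
            System.out.println(m);
        }
    }
}
```

Because the id is derived from the element, all events for the same name are addressed to the same function instance, while different names are addressed to different instances of the same function type.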
Once you have transformed your stream ingresses, you can bind the functions that consume the stream events, as well as the DataStream egresses that their outputs are produced to:
FunctionType GREET = new FunctionType("example", "greet");
FunctionType REMOTE_GREET = new FunctionType("example", "remote-greet");
EgressIdentifier<String> GREETINGS = new EgressIdentifier<>("example", "greetings", String.class);

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> names = env.addSource(...);

DataStream<RoutableMessage> namesIngress =
    names.map(name ->
        RoutableMessageBuilder.builder()
            .withTargetAddress(GREET, name)
            .withMessageBody(name)
            .build()
    );

StatefulFunctionEgressStreams egresses =
    StatefulFunctionDataStreamBuilder.builder("example")
        .withDataStreamAsIngress(namesIngress)
        .withRequestReplyRemoteFunction(
            RequestReplyFunctionBuilder.requestReplyFunctionBuilder(
                    REMOTE_GREET, URI.create("http://localhost:5000/statefun"))
                .withPersistedState("seen_count")
                .withMaxRequestDuration(Duration.ofSeconds(15))
                .withMaxNumBatchRequests(500))
        .withFunctionProvider(GREET, unused -> new MyFunction())
        .withEgressId(GREETINGS)
        .build(env);
As you can see, instead of binding functions, ingresses, and egresses through modules as you would with a typical Stateful Functions application, you bind them directly to the DataStream job using a StatefulFunctionDataStreamBuilder:
Remote functions are bound using the withRequestReplyRemoteFunction method. The specification of the remote function, such as the service endpoint and various connection configurations, can be set using the provided RequestReplyFunctionBuilder.
Embedded functions are bound using the withFunctionProvider method.
Egress identifiers used by functions need to be bound with the withEgressId method.
Finally, you can obtain an egress as a DataStream from the resulting StatefulFunctionEgressStreams:
EgressIdentifier<String> GREETINGS = new EgressIdentifier<>("example", "greetings", String.class);
StatefulFunctionEgressStreams egresses = ...
DataStream<String> greetingsEgress = egresses.getDataStreamForEgressId(GREETINGS);
The obtained egress DataStream can be further processed as in a typical Flink streaming application.
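The relationship between binding a function, binding an egress, and reading that egress back can be sketched with a toy in-memory model. Everything in the block below (ToyBindingSketch, ToyFunction, deliver, outputOf) is hypothetical and written only for illustration; the method names withFunctionProvider and withEgressId are borrowed from the builder to show the analogy, but this is not how the Stateful Functions runtime is implemented. The sketch assumes a single process, no state, and string-only payloads.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

// Toy in-memory model for illustration only. None of these types belong to the
// Stateful Functions SDK.
final class ToyBindingSketch {

    /** A function gets the target id, the payload, and a writer taking (egressId, value). */
    interface ToyFunction {
        void invoke(String id, String payload, BiConsumer<String, String> egressWriter);
    }

    private final Map<String, Supplier<ToyFunction>> providers = new HashMap<>();
    private final Map<String, List<String>> egresses = new HashMap<>();

    // Loosely analogous to binding an embedded function for a function type.
    ToyBindingSketch withFunctionProvider(String functionType, Supplier<ToyFunction> provider) {
        providers.put(functionType, provider);
        return this;
    }

    // Loosely analogous to binding an egress identifier before functions may write to it.
    ToyBindingSketch withEgressId(String egressId) {
        egresses.put(egressId, new ArrayList<>());
        return this;
    }

    // Routes one message to the function bound for its type.
    void deliver(String functionType, String id, String payload) {
        Supplier<ToyFunction> provider = providers.get(functionType);
        if (provider == null) {
            throw new IllegalStateException("No function bound for " + functionType);
        }
        provider.get().invoke(id, payload, this::write);
    }

    // Appends a value to a bound egress and rejects writes to unbound ones.
    private void write(String egressId, String value) {
        List<String> sink = egresses.get(egressId);
        if (sink == null) {
            throw new IllegalStateException("Egress not bound: " + egressId);
        }
        sink.add(value);
    }

    // Loosely analogous to looking up the output stream of one egress.
    List<String> outputOf(String egressId) {
        return egresses.get(egressId);
    }

    public static void main(String[] args) {
        ToyBindingSketch job = new ToyBindingSketch()
            .withFunctionProvider("example/greet",
                () -> (id, payload, out) -> out.accept("example/greetings", "Hello " + payload))
            .withEgressId("example/greetings");
        job.deliver("example/greet", "Alice", "Alice");
        System.out.println(job.outputOf("example/greetings"));
    }
}
```

In this model, a message for an unbound function type or a write to an unbound egress fails immediately, which is the reason the sketch requires both to be registered before any message is delivered.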
Like a typical Stateful Functions application, configuration specific to Stateful Functions is set through the flink-conf.yaml file, as explained here.
You can also override the base settings for each individual job:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

StatefulFunctionsConfig statefunConfig = StatefulFunctionsConfig.fromEnvironment(env);

statefunConfig.setGlobalConfiguration("someGlobalConfig", "foobar");
statefunConfig.setFactoryType(MessageFactoryType.WITH_KRYO_PAYLOADS);

StatefulFunctionEgressStreams egresses =
    StatefulFunctionDataStreamBuilder.builder("example")
        ...
        .withConfiguration(statefunConfig)
        .build(env);
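The override behavior can be pictured as a simple merge in which job-level values replace base values that share the same key. The sketch below uses plain maps rather than the SDK's configuration classes; ConfigOverrideSketch, effectiveConfig, the key anotherSetting, and the value fromFlinkConf are hypothetical names chosen for illustration, and the merge rule is an assumption about the intended precedence rather than a description of the actual implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Illustration only: plain maps standing in for configuration, not the SDK's config classes.
final class ConfigOverrideSketch {

    // Job-level entries replace base entries with the same key; all others are kept.
    // The base map is copied, so it is left unmodified.
    static Map<String, String> effectiveConfig(Map<String, String> base,
                                               Map<String, String> jobOverrides) {
        Map<String, String> merged = new HashMap<>(base);
        merged.putAll(jobOverrides);
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> base = new HashMap<>();
        base.put("someGlobalConfig", "fromFlinkConf"); // hypothetical base value
        base.put("anotherSetting", "unchanged");       // hypothetical key

        Map<String, String> overrides = new HashMap<>();
        overrides.put("someGlobalConfig", "foobar");   // per-job override

        System.out.println(effectiveConfig(base, overrides));
    }
}
```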