This SDK may be used if you want your Stateful Functions application to consume events from, or output events to, Flink DataStreams. Using this SDK, you may combine pipelines written with the Flink DataStream API or higher-level libraries (such as the Table API, CEP, etc.; basically anything that produces a DataStream) with the programming constructs provided by Stateful Functions to build complex streaming applications.
To use this, add the Flink DataStream Integration SDK as a dependency to your application.
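The sketch below shows the Maven dependency and assumes a Maven build. The version is left as a hypothetical placeholder property (`statefun.version`). Set it to the Stateful Functions release you are using.

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>statefun-flink-datastream</artifactId>
    <!-- hypothetical placeholder: set to the Stateful Functions release you use -->
    <version>${statefun.version}</version>
</dependency>
```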
The following sections cover the important parts of getting started with the SDK. For the full code and a working example, see the complete example in the Stateful Functions repository.
Preparing a DataStream Ingress
All DataStreams used as ingresses must contain stream elements of type RoutableMessage. Each RoutableMessage
carries information about the target function’s address alongside the input event payload.
You can use the RoutableMessageBuilder to transform your DataStreams. For example, a DataStream<String> can be transformed into a DataStream<RoutableMessage> by mapping each element of the original stream to a RoutableMessage, with every element targeted at the function type example:greet.
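The following is a minimal sketch of the transformation described above. It assumes the Stateful Functions 2.x package layout, in which RoutableMessage and RoutableMessageBuilder live under `org.apache.flink.statefun.flink.core.message`, so check the imports against your version. The class `GreeterIngress` and the helpers `toIngressMessage` and `toIngress` are hypothetical names. Each name serves as both the id of the target `example:greet` function instance and the message payload.

```java
import org.apache.flink.statefun.flink.core.message.RoutableMessage;
import org.apache.flink.statefun.flink.core.message.RoutableMessageBuilder;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.streaming.api.datastream.DataStream;

public class GreeterIngress {

    // Target function type "example:greet".
    static final FunctionType GREET = new FunctionType("example", "greet");

    // Hypothetical helper: wraps one name into a RoutableMessage addressed to the
    // "example:greet" instance whose id is the name; the name is also the payload.
    static RoutableMessage toIngressMessage(String name) {
        return RoutableMessageBuilder.builder()
                .withTargetAddress(GREET, name)
                .withMessageBody(name)
                .build();
    }

    // Hypothetical helper: turns a DataStream<String> into an ingress stream
    // of RoutableMessages, one message per input element.
    static DataStream<RoutableMessage> toIngress(DataStream<String> names) {
        return names.map(name -> toIngressMessage(name));
    }
}
```

Keeping the per-element mapping in a plain static method lets you check the addressing logic without starting a Flink job.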
Binding Functions, Ingresses, and Egresses
Once you have transformed your stream ingresses, you may start binding the functions that consume the stream events, as well as the DataStream egresses that functions write their outputs to.
Instead of binding functions, ingresses, and egresses through modules, as you would in a typical Stateful Functions application, you bind them directly to the DataStream job using a StatefulFunctionDataStreamBuilder.
Remote functions are bound using the withRequestReplyRemoteFunction method. The remote function's specification, such as its service endpoint and various connection settings, can be set using the provided RequestReplyFunctionBuilder.
Embedded functions are bound using withFunctionProvider.
Egress identifiers used by functions need to be bound with the withEgressId method.
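The sketch below binds one embedded function and one remote function, and registers an egress. It assumes the 2.x-era `StatefulFunctionDataStreamBuilder` and `RequestReplyFunctionBuilder` API, so verify the method names against your release. Several names are hypothetical choices for illustration:

- the class `GreeterBindings` and the embedded `GreetFunction`;
- the remote function type `example:remote-greet` and its endpoint URL;
- the egress `example:greetings`;
- the timeout and batch values.

```java
import java.net.URI;
import java.time.Duration;
import org.apache.flink.statefun.flink.core.message.RoutableMessage;
import org.apache.flink.statefun.flink.datastream.RequestReplyFunctionBuilder;
import org.apache.flink.statefun.flink.datastream.StatefulFunctionDataStreamBuilder;
import org.apache.flink.statefun.flink.datastream.StatefulFunctionEgressStreams;
import org.apache.flink.statefun.sdk.Context;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.StatefulFunction;
import org.apache.flink.statefun.sdk.io.EgressIdentifier;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class GreeterBindings {

    static final FunctionType GREET = new FunctionType("example", "greet");
    // Hypothetical remote function type, served at a hypothetical endpoint below.
    static final FunctionType REMOTE_GREET = new FunctionType("example", "remote-greet");
    // Hypothetical egress carrying String greetings.
    static final EgressIdentifier<String> GREETINGS =
            new EgressIdentifier<>("example", "greetings", String.class);

    // Embedded function: greets the name it receives and sends the greeting to the egress.
    static final class GreetFunction implements StatefulFunction {
        @Override
        public void invoke(Context context, Object input) {
            context.send(GREETINGS, "Hello, " + input + "!");
        }
    }

    static StatefulFunctionEgressStreams bind(
            StreamExecutionEnvironment env, DataStream<RoutableMessage> namesIngress) {
        return StatefulFunctionDataStreamBuilder.builder("example")
                // The stream of RoutableMessages prepared earlier acts as the ingress.
                .withDataStreamAsIngress(namesIngress)
                // Remote function: endpoint and connection settings via RequestReplyFunctionBuilder.
                .withRequestReplyRemoteFunction(
                        RequestReplyFunctionBuilder.requestReplyFunctionBuilder(
                                        REMOTE_GREET, URI.create("http://localhost:5000/statefun"))
                                .withMaxRequestDuration(Duration.ofSeconds(15))
                                .withMaxNumBatchRequests(500))
                // Embedded function: the provider creates instances for the GREET type.
                .withFunctionProvider(GREET, unused -> new GreetFunction())
                // Every egress a function sends to must be bound here.
                .withEgressId(GREETINGS)
                .build(env);
    }
}
```

`GreetFunction` sends to `GREETINGS`, which is why that identifier is also passed to `withEgressId`.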
Consuming a DataStream Egress
Finally, you can obtain an egress as a DataStream from the resulting StatefulFunctionEgressStreams.
The obtained egress DataStream can be further processed as in a typical Flink streaming application.
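The following is a minimal sketch of obtaining and further processing an egress stream. It assumes that a String egress with the hypothetical identifier `example:greetings` was bound through `withEgressId`. The class `GreetingsOutput`, the helper `consume`, the filter, and the `print()` sink are illustrative choices only.

```java
import org.apache.flink.statefun.flink.datastream.StatefulFunctionEgressStreams;
import org.apache.flink.statefun.sdk.io.EgressIdentifier;
import org.apache.flink.streaming.api.datastream.DataStream;

public class GreetingsOutput {

    // Hypothetical egress identifier; must match the one passed to withEgressId.
    static final EgressIdentifier<String> GREETINGS =
            new EgressIdentifier<>("example", "greetings", String.class);

    static DataStream<String> consume(StatefulFunctionEgressStreams egresses) {
        // The egress is exposed as an ordinary, typed DataStream.
        DataStream<String> greetings = egresses.getDataStreamForEgressId(GREETINGS);

        // From here on, regular DataStream operators and sinks apply.
        greetings
                .filter(greeting -> !greeting.isEmpty())
                .print();

        return greetings;
    }
}
```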
Configuration
As in a typical Stateful Functions application, configuration specific to Stateful Functions is set through the flink-conf.yaml file, as explained in the Stateful Functions configuration documentation.
You can also override these base settings for each individual job.
Attention: The setFlinkJobName method on StatefulFunctionsConfig has no effect when using this SDK.
You need to define the job name as you normally would via Flink's DataStream API.
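The sketch below covers both points of this section: it overrides settings for a single job and names the job through the DataStream API. It assumes `StatefulFunctionsConfig.fromEnvironment`, `setGlobalConfiguration`, and `withConfiguration` as found in the 2.x releases, so check them against the API of your version. The class `ConfiguredJob`, the key/value pair `someGlobalConfig`/`foobar`, and the job name `greeter-job` are hypothetical.

```java
import org.apache.flink.statefun.flink.core.StatefulFunctionsConfig;
import org.apache.flink.statefun.flink.core.message.RoutableMessage;
import org.apache.flink.statefun.flink.datastream.StatefulFunctionDataStreamBuilder;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ConfiguredJob {

    // Hypothetical job name, for illustration only.
    static final String JOB_NAME = "greeter-job";

    // Starts from the environment's base settings and overrides values for this job only.
    static StatefulFunctionsConfig perJobConfig(StreamExecutionEnvironment env) {
        StatefulFunctionsConfig config = StatefulFunctionsConfig.fromEnvironment(env);
        config.setGlobalConfiguration("someGlobalConfig", "foobar"); // hypothetical key/value
        return config;
    }

    static void run(StreamExecutionEnvironment env, DataStream<RoutableMessage> ingress)
            throws Exception {
        StatefulFunctionDataStreamBuilder.builder("example")
                .withDataStreamAsIngress(ingress)
                // bind function providers, remote functions, and egress ids here as usual
                .withConfiguration(perJobConfig(env))
                .build(env);

        // setFlinkJobName is ignored by this SDK, so the job is named when it is executed.
        env.execute(JOB_NAME);
    }
}
```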