A ProcessFunction combines event processing with timers and state, making it a powerful building
block for stream processing applications. This is the basis for creating event-driven applications
with Flink. It is very similar to a RichFlatMapFunction, but with the addition of timers.
If you’ve done the hands-on exercise in the Streaming Analytics training, you will recall that it
uses a tumbling event-time window (TumblingEventTimeWindows) to compute the sum of the tips for
each driver during each hour.
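For comparison, the result such a windowed pipeline should produce can be modeled in plain Java with no Flink dependency. Each fare is assigned to the hour-long window that contains its timestamp, and tips are summed per driver and window. This is only a sketch. The Fare class and its fields are hypothetical stand-ins for TaxiFare, and timestamps are assumed to be non-negative milliseconds.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model (no Flink) of what a keyed, hour-long tumbling event-time window
// computes: the sum of tips per driver per window. Fare is a hypothetical stand-in for
// TaxiFare, and timestamps are assumed to be non-negative epoch milliseconds.
final class TumblingTipSums {

    static final class Fare {
        final long driverId;
        final long eventTime; // epoch milliseconds
        final float tip;

        Fare(long driverId, long eventTime, float tip) {
            this.driverId = driverId;
            this.eventTime = eventTime;
            this.tip = tip;
        }
    }

    // Start of the tumbling window of length sizeMsec that contains timestamp.
    static long windowStart(long timestamp, long sizeMsec) {
        return timestamp - (timestamp % sizeMsec);
    }

    // driverId -> (window start -> sum of tips). This batch model sees all of the
    // input at once, so arrival order does not matter here.
    static Map<Long, Map<Long, Float>> hourlySums(List<Fare> fares, long sizeMsec) {
        Map<Long, Map<Long, Float>> result = new TreeMap<>();
        for (Fare fare : fares) {
            long start = windowStart(fare.eventTime, sizeMsec);
            result.computeIfAbsent(fare.driverId, id -> new TreeMap<>())
                  .merge(start, fare.tip, Float::sum);
        }
        return result;
    }
}
```

In a streaming job, out-of-order arrival is exactly what makes this harder than the batch model suggests, which is what the rest of this section deals with.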
It is reasonably straightforward, and educational, to do the same thing with a
KeyedProcessFunction. Let us begin by removing the window from that pipeline and applying a
KeyedProcessFunction directly to the keyed stream instead.
In this version, a KeyedProcessFunction called PseudoWindow is applied to a keyed stream, the
result of which is a DataStream<Tuple3<Long, Long, Float>> (the same kind of stream produced by
the implementation that uses Flink’s built-in time windows).
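The overall outline of PseudoWindow is a KeyedProcessFunction with an open method, used to set up
its state, and two callbacks: processElement, called as each fare arrives, and onTimer, called
when the current watermark indicates that a window is complete.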
Things to be aware of:
There are several types of ProcessFunctions. This is a KeyedProcessFunction, but there are also
CoProcessFunctions, BroadcastProcessFunctions, etc.
A KeyedProcessFunction is a kind of RichFunction. Being a RichFunction, it has access to the open
and getRuntimeContext methods needed for working with managed keyed state.
There are two callbacks to implement: processElement and onTimer. processElement is called with
each incoming event; onTimer is called when timers fire. These can be either event time or
processing time timers. Both processElement and onTimer are provided with a context object that
can be used to interact with a TimerService (among other things). Both callbacks are also passed a
Collector that can be used to emit results.
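The following is a toy, single-threaded model of this contract in plain Java. It shows how the work is divided between the two callbacks. It is not Flink's API. The runtime, Callbacks, and Context names are hypothetical, a List stands in for the Collector, and only event-time timers are modeled. A timer fires once the watermark reaches its timestamp. Duplicate registrations for the same key and timestamp collapse into one timer.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Toy model of the KeyedProcessFunction contract; NOT Flink's API, all names hypothetical.
// processElement runs once per event; onTimer runs when the watermark reaches a registered
// event-time timer. Both get a context (current key + timers) and a List as the collector.
final class ToyKeyedRuntime<K, IN, OUT> {

    interface Callbacks<K, IN, OUT> {
        void processElement(IN value, Context<K> ctx, List<OUT> out);

        void onTimer(long timestamp, Context<K> ctx, List<OUT> out);
    }

    static final class Context<K> {
        private final ToyKeyedRuntime<K, ?, ?> runtime;
        private final K key;

        Context(ToyKeyedRuntime<K, ?, ?> runtime, K key) {
            this.runtime = runtime;
            this.key = key;
        }

        K getCurrentKey() {
            return key;
        }

        long currentWatermark() {
            return runtime.watermark;
        }

        // At most one timer per (key, timestamp); Flink's timer service also deduplicates.
        void registerEventTimeTimer(long timestamp) {
            runtime.timers.computeIfAbsent(timestamp, t -> new LinkedHashSet<>()).add(key);
        }
    }

    private final Callbacks<K, IN, OUT> callbacks;
    private final TreeMap<Long, Set<K>> timers = new TreeMap<>(); // timestamp -> keys
    private long watermark = Long.MIN_VALUE;
    final List<OUT> output = new ArrayList<>();

    ToyKeyedRuntime(Callbacks<K, IN, OUT> callbacks) {
        this.callbacks = callbacks;
    }

    void processElement(K key, IN value) {
        callbacks.processElement(value, new Context<>(this, key), output);
    }

    // Advances the watermark and fires, in timestamp order, every timer <= the watermark.
    void advanceWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark);
        while (!timers.isEmpty() && timers.firstKey() <= watermark) {
            Map.Entry<Long, Set<K>> due = timers.pollFirstEntry();
            for (K key : due.getValue()) {
                callbacks.onTimer(due.getKey(), new Context<>(this, key), output);
            }
        }
    }
}
```

Suppose a callback registers a timer at each event's timestamp. Events for key "a" at 100 (seen twice) and key "b" at 50 then produce "b@50" when the watermark reaches 99. They produce "a@100" only when the watermark reaches 100, and it fires once.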
Because the fare events can arrive out of order, it will sometimes be necessary to process events
for one hour before having finished computing the results for the previous hour. In fact, if the
watermarking delay is much longer than the window length, then there may be many windows open
simultaneously, rather than just two. This implementation supports this by using a MapState that
maps the timestamp for the end of each window to the sum of the tips for that window.
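The state layout described above can be sketched in plain Java for a single driver. Keyed state is scoped to the current key, so one driver is enough to illustrate it. A map stands in for MapState and a sorted set for the registered timers. The class and field names are hypothetical, and so is the rounding convention, which uses the last millisecond of each window as its end timestamp. An out-of-order event can still land in an earlier window, so several windows can be open at once.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Plain-Java model (not Flink code) of one key's state in a PseudoWindow-style function:
// window-end timestamp -> running sum of tips, plus the event-time timers registered so far.
final class OpenWindows {
    private final long durationMsec;
    final Map<Long, Float> sumOfTips = new TreeMap<>(); // window end -> running sum (one driver)
    final TreeSet<Long> timers = new TreeSet<>();       // registered event-time timers
    long currentWatermark = Long.MIN_VALUE;

    OpenWindows(long durationMsec) {
        this.durationMsec = durationMsec;
    }

    // Last millisecond of the window containing eventTime (assumes eventTime >= 0).
    long endOfWindow(long eventTime) {
        return eventTime - (eventTime % durationMsec) + durationMsec - 1;
    }

    // Returns false when the event is late (at or behind the watermark) and is dropped.
    boolean processElement(long eventTime, float tip) {
        if (eventTime <= currentWatermark) {
            return false;
        }
        long end = endOfWindow(eventTime);
        timers.add(end);                       // callback for when this window completes
        sumOfTips.merge(end, tip, Float::sum); // add the tip to that window's running total
        return true;
    }
}
```

Consider three events with one-hour windows, arriving in the order hour + 10 ms, 20 ms, hour + 30 ms. They leave two windows open at the same time, ending at 3599999 and 7199999.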
Things to consider:
What happens with late events? Events that are behind the watermark (i.e., late) are being dropped. If you want to do something better than this, consider using a side output, which is explained in the next section.
This example uses a MapState where the keys are timestamps, and sets a Timer for that same
timestamp. This is a common pattern; it makes it easy and efficient to look up relevant information
when the timer fires.
The OnTimerContext context passed in to onTimer can be used to determine the current key.
Our pseudo-windows are triggered when the current watermark reaches the end of each hour, at which
point onTimer is called. This onTimer method removes the related entry from the MapState, which
has the effect of making it impossible to accommodate late events. This is the equivalent of
setting the allowedLateness to zero when working with Flink’s time windows.
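The following plain-Java model follows the whole lifecycle. Like the earlier sketches, it is hypothetical and has no Flink dependency. processElement adds each fare's tip under its window-end timestamp and registers a timer for that same timestamp. When the watermark reaches the timer, onTimer looks up the sum by that timestamp, emits (driverId, windowEnd, sum), and removes the entry. Any event for that window that arrives later is counted as late and dropped, which corresponds to an allowed lateness of zero.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// End-to-end plain-Java model (not Flink code) of a PseudoWindow over several drivers.
final class PseudoWindowModel {
    private final long durationMsec;
    // driverId -> (window end -> sum of tips); stands in for per-key MapState
    private final Map<Long, Map<Long, Float>> sumOfTips = new HashMap<>();
    // timer timestamp -> drivers with a timer at that time; stands in for the timer service
    private final TreeMap<Long, TreeSet<Long>> timers = new TreeMap<>();
    private long watermark = Long.MIN_VALUE;
    final List<String> emitted = new ArrayList<>();
    int droppedLate = 0;

    PseudoWindowModel(long durationMsec) {
        this.durationMsec = durationMsec;
    }

    void processElement(long driverId, long eventTime, float tip) {
        if (eventTime <= watermark) {
            droppedLate++; // its window may already have been emitted
            return;
        }
        long end = eventTime - (eventTime % durationMsec) + durationMsec - 1;
        timers.computeIfAbsent(end, t -> new TreeSet<>()).add(driverId); // timer at the map key
        sumOfTips.computeIfAbsent(driverId, d -> new HashMap<>()).merge(end, tip, Float::sum);
    }

    // Advances the watermark and fires, in timestamp order, every timer <= the watermark.
    void advanceWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark);
        while (!timers.isEmpty() && timers.firstKey() <= watermark) {
            Map.Entry<Long, TreeSet<Long>> due = timers.pollFirstEntry();
            for (long driverId : due.getValue()) {
                onTimer(driverId, due.getKey());
            }
        }
    }

    // The state key equals the timer's timestamp, so the lookup is a single remove.
    private void onTimer(long driverId, long timestamp) {
        Float sum = sumOfTips.get(driverId).remove(timestamp); // removing it = zero lateness
        emitted.add("(" + driverId + ", " + timestamp + ", " + sum + ")");
    }
}
```

For example, with one-hour windows, tips of 1.5 and 2.5 for driver 1 in the first hour are emitted as (1, 3599999, 4.0) once the watermark reaches 3599999. A later fare for driver 2 with timestamp 100 is then dropped as late.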
Flink provides MapState and ListState types that are optimized for RocksDB. Where possible,
these should be used instead of a ValueState object holding some sort of collection. The RocksDB
state backend can append to ListState without going through (de)serialization, and for MapState,
each key/value pair is a separate RocksDB object, so MapState can be efficiently accessed and
updated.
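A deliberately simplified cost model in plain Java illustrates the difference. It only counts how many map entries would have to be deserialized and serialized to change one entry. It illustrates the reasoning above and does not measure RocksDB. The class and method names are hypothetical.

```java
import java.util.Map;

// Toy cost model (an illustration, not a RocksDB measurement): counts map entries that must
// be deserialized and re-serialized to update one entry under two storage layouts.
final class StateCostModel {

    // Whole collection stored as one value (like a ValueState holding a map):
    // read every entry, change one, then write every entry back.
    static long valueStateUpdateCost(Map<Long, Float> stored, long key, float delta) {
        long touched = stored.size();         // deserialize the whole map
        stored.merge(key, delta, Float::sum);
        touched += stored.size();             // serialize the whole map again
        return touched;
    }

    // One stored object per key/value pair (like MapState on RocksDB):
    // read the old value if there is one, then write only the changed entry.
    static long mapStateUpdateCost(Map<Long, Float> stored, long key, float delta) {
        boolean existed = stored.containsKey(key);
        stored.merge(key, delta, Float::sum);
        return (existed ? 1 : 0) + 1;
    }
}
```

With 100 entries already stored, updating one existing entry touches 200 entries in the single-value layout, but only 2 in the per-entry layout.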
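There are several good reasons to want to have more than one output stream from a Flink operator, such as reporting exceptions, malformed events, late events, and operational alerts (for example, timed-out connections to external services).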
Side outputs are a convenient way to do this. Beyond error reporting, side outputs are also a good way to implement an n-way split of a stream.
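The sketch below models side outputs in plain Java as a rough illustration of an n-way split. It is not Flink's OutputTag or ProcessFunction API. The Tag class, the output and getSideOutput methods, and the routing rules are all hypothetical. Negative numbers and unparsable records each go to their own side output.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of side outputs (not Flink's API): each side output is identified by a
// named, typed tag, and any element can go to the main output or to a tagged output.
final class SideOutputSplit {

    // Hypothetical stand-in for a named, typed side-output tag.
    static final class Tag<T> {
        final String name;
        final Class<T> type;

        Tag(String name, Class<T> type) {
            this.name = name;
            this.type = type;
        }
    }

    static final Tag<Integer> NEGATIVE = new Tag<>("negative", Integer.class);
    static final Tag<String> MALFORMED = new Tag<>("malformed", String.class);

    final List<Integer> mainOutput = new ArrayList<>();
    private final Map<String, List<Object>> sideOutputs = new HashMap<>();

    // Emits a value to the side output identified by tag.
    <T> void output(Tag<T> tag, T value) {
        sideOutputs.computeIfAbsent(tag.name, n -> new ArrayList<>()).add(value);
    }

    // Returns everything emitted under this tag's name, cast to the tag's type.
    <T> List<T> getSideOutput(Tag<T> tag) {
        List<T> result = new ArrayList<>();
        for (Object o : sideOutputs.getOrDefault(tag.name, new ArrayList<>())) {
            result.add(tag.type.cast(o));
        }
        return result;
    }

    // Three-way split: non-negative numbers to the main output, negative numbers and
    // unparsable records each to their own side output.
    void process(String raw) {
        try {
            int value = Integer.parseInt(raw.trim());
            if (value < 0) {
                output(NEGATIVE, value);
            } else {
                mainOutput.add(value);
            }
        } catch (NumberFormatException e) {
            output(MALFORMED, raw);
        }
    }
}
```

Processing "3", "-2", "abc", and " 7 " leaves [3, 7] on the main output, [-2] on the negative side output, and ["abc"] on the malformed side output.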
You are now in a position to do something with the late events that were ignored in the previous section.
A side output channel is associated with an OutputTag<T>. These tags have generic types that
correspond to the type of the side output’s DataStream, and they have names.
Declaring the tag as a static OutputTag<TaxiFare> lets it be referenced both when emitting late
events in the processElement method of the PseudoWindow and when accessing the stream from this
side output in the main method of the job.
Alternatively, you can use two OutputTags with the same name to refer to the same side output, but if you do, they must have the same type.
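The same ideas can be modeled for late events. This plain-Java sketch uses hypothetical names throughout. An event at or behind the watermark goes to a side output instead of being dropped. The side output is later looked up through a second tag with the same name. A tag that reuses the name with a different type is rejected, which models the same-type requirement described above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (not Flink code) of sending late events to a side output and of
// resolving side outputs by tag name, with a type check for reused names.
final class LateEventSideOutput {

    // Hypothetical stand-in for a named, typed side-output tag.
    static final class Tag<T> {
        final String name;
        final Class<T> type;

        Tag(String name, Class<T> type) {
            this.name = name;
            this.type = type;
        }
    }

    private final Map<String, Class<?>> typesByName = new HashMap<>();
    private final Map<String, List<Object>> channels = new HashMap<>();
    final List<Long> onTime = new ArrayList<>();
    long watermark = Long.MIN_VALUE;

    // Resolves a tag to its channel by name: a second tag with the same name and type reaches
    // the same channel, while a same-named tag with a different type is rejected.
    private List<Object> channel(Tag<?> tag) {
        Class<?> known = typesByName.putIfAbsent(tag.name, tag.type);
        if (known != null && !known.equals(tag.type)) {
            throw new IllegalArgumentException(
                    "side output '" + tag.name + "' already uses type " + known.getName());
        }
        return channels.computeIfAbsent(tag.name, n -> new ArrayList<>());
    }

    // Late events (at or behind the watermark) go to the side output instead of being dropped.
    void processElement(long eventTime, Tag<Long> lateTag) {
        if (eventTime <= watermark) {
            channel(lateTag).add(eventTime);
        } else {
            onTime.add(eventTime);
        }
    }

    <T> List<T> getSideOutput(Tag<T> tag) {
        List<T> result = new ArrayList<>();
        for (Object o : channel(tag)) {
            result.add(tag.type.cast(o));
        }
        return result;
    }
}
```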
In this example you have seen how a
ProcessFunction can be used to reimplement a straightforward time
window. Of course, if Flink’s built-in windowing API meets your needs, by all means, go ahead and
use it. But if you find yourself considering doing something contorted with Flink’s windows, don’t
be afraid to roll your own.
ProcessFunctions are useful for many other use cases beyond computing analytics. The hands-on
exercise below provides an example of something completely different.
Another common use case for ProcessFunctions is expiring stale state. If you think back to the
Rides and Fares Exercise, where a RichCoFlatMapFunction is used to compute a simple join, the
sample solution assumes that the TaxiRides and TaxiFares are perfectly matched, one-to-one for
each rideId. If an event is lost, the other event for the same rideId will be held in state
forever. This could instead be implemented as a KeyedCoProcessFunction, and a timer could be used
to detect and clear any stale state.
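A minimal plain-Java model of that idea follows. It is a sketch under stated assumptions, not a KeyedCoProcessFunction. The class and method names, the string payloads, and the fixed timeout measured from each buffered event's timestamp are all hypothetical. Whichever of the ride or the fare arrives first is buffered under its rideId, along with an event-time timer. If its partner has not arrived when the watermark reaches that timer, the buffered event is cleared.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Plain-Java model (not Flink code) of a keyed two-input join that expires stale state.
final class ExpiringJoin {
    private final long timeoutMsec;
    private final Map<Long, String> pendingRides = new HashMap<>(); // rideId -> buffered ride
    private final Map<Long, String> pendingFares = new HashMap<>(); // rideId -> buffered fare
    private final TreeMap<Long, TreeSet<Long>> timers = new TreeMap<>(); // time -> rideIds
    final List<String> joined = new ArrayList<>();
    final List<Long> expired = new ArrayList<>();

    ExpiringJoin(long timeoutMsec) {
        this.timeoutMsec = timeoutMsec;
    }

    // First input: join with a buffered fare, or buffer the ride and set a timer.
    void processRide(long rideId, long eventTime, String ride) {
        String fare = pendingFares.remove(rideId);
        if (fare != null) {
            joined.add(ride + "+" + fare);
        } else {
            pendingRides.put(rideId, ride);
            registerTimer(rideId, eventTime + timeoutMsec);
        }
    }

    // Second input: the mirror image of processRide.
    void processFare(long rideId, long eventTime, String fare) {
        String ride = pendingRides.remove(rideId);
        if (ride != null) {
            joined.add(ride + "+" + fare);
        } else {
            pendingFares.put(rideId, fare);
            registerTimer(rideId, eventTime + timeoutMsec);
        }
    }

    private void registerTimer(long rideId, long time) {
        timers.computeIfAbsent(time, t -> new TreeSet<>()).add(rideId);
    }

    // Fires, in time order, every timer at or before the new watermark.
    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.firstKey() <= watermark) {
            for (long rideId : timers.pollFirstEntry().getValue()) {
                onTimer(rideId);
            }
        }
    }

    // Clears whatever is still buffered for this rideId; does nothing if the join completed.
    private void onTimer(long rideId) {
        boolean hadRide = pendingRides.remove(rideId) != null;
        boolean hadFare = pendingFares.remove(rideId) != null;
        if (hadRide || hadFare) {
            expired.add(rideId);
        }
    }

    int pendingCount() {
        return pendingRides.size() + pendingFares.size();
    }
}
```

In this model, a timer still fires after a successful join and simply finds nothing to clear. A Flink implementation could instead delete the timer with TimerService.deleteEventTimeTimer once both events have been seen.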
The hands-on exercise that goes with this section is the Long Ride Alerts Exercise.