Most operations require a user-defined function. This section lists the different ways to specify one. We also cover Accumulators, which can be used to gain insights into your Flink application.
The most basic way is to implement one of the provided interfaces:
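As a minimal sketch of this approach, the following defines a named class that implements a map function. So that it compiles without Flink on the classpath, it declares its own stand-in `MapFunction` interface (an assumption of this example, simplified to omit the `throws Exception` clause that Flink's interface declares). The class name `MyMapFunction` is illustrative.

```java
class InterfaceExample {
    // Stand-in for Flink's MapFunction, declared locally so the sketch is
    // self-contained. Simplified: Flink's version declares "throws Exception".
    interface MapFunction<T, O> {
        O map(T value);
    }

    // The user-defined function, written as a named class.
    static class MyMapFunction implements MapFunction<String, Integer> {
        @Override
        public Integer map(String value) {
            return Integer.parseInt(value);
        }
    }

    public static void main(String[] args) {
        MapFunction<String, Integer> fn = new MyMapFunction();
        System.out.println(fn.map("42"));
    }
}
```

With Flink itself, an instance of such a class would be passed to a transformation, for example `data.map(new MyMapFunction())`, where `data` is a hypothetical data set of strings.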
You can pass a function as an anonymous class:
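The same function can be sketched as an anonymous class. As before, the `MapFunction` interface here is a local stand-in (an assumption, simplified from Flink's interface) so the example runs on its own, and the `parser()` helper is hypothetical.

```java
class AnonymousClassExample {
    // Stand-in for Flink's MapFunction (simplified, no "throws Exception").
    interface MapFunction<T, O> {
        O map(T value);
    }

    // Hypothetical helper that builds the function inline, without a named class.
    static MapFunction<String, Integer> parser() {
        return new MapFunction<String, Integer>() {
            @Override
            public Integer map(String value) {
                return Integer.parseInt(value);
            }
        };
    }

    public static void main(String[] args) {
        System.out.println(parser().map("7"));
    }
}
```

In a Flink program the anonymous class would typically be written directly in the call to the transformation rather than returned from a helper.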
Flink also supports Java 8 Lambdas in the Java API.
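A short sketch of the lambda style follows. `FilterFunction` and `ReduceFunction` are declared locally as stand-ins for the Flink interfaces of the same names (an assumption that keeps the example self-contained), and the constant names are illustrative.

```java
class LambdaExample {
    // Stand-ins for Flink's single-method function interfaces (simplified).
    interface FilterFunction<T> {
        boolean filter(T value);
    }

    interface ReduceFunction<T> {
        T reduce(T value1, T value2);
    }

    // Because each interface has a single abstract method, a lambda can implement it.
    static final FilterFunction<String> IS_HTTP = s -> s.startsWith("http://");
    static final ReduceFunction<Integer> SUM = (i1, i2) -> i1 + i2;

    public static void main(String[] args) {
        System.out.println(IS_HTTP.filter("http://flink.apache.org"));
        System.out.println(SUM.reduce(1, 2));
    }
}
```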
All transformations that require a user-defined function can instead take a rich function as argument. For example, instead of a class that implements the MapFunction interface, you can write a class that extends RichMapFunction and pass it to a map transformation as usual.
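The switch from a plain function to a rich function can be sketched as follows. The abstract class `RichMapFunction` below is a local stand-in for Flink's class of the same name (an assumption, reduced to the `map` method only), and `MyMapFunction` is an illustrative name.

```java
class RichFunctionExample {
    // Stand-in for Flink's RichMapFunction, reduced to the map method.
    // Flink's class additionally provides open, close, and runtime context access.
    abstract static class RichMapFunction<IN, OUT> {
        public abstract OUT map(IN value);
    }

    // Same logic as a plain map function, but extending the rich variant.
    static class MyMapFunction extends RichMapFunction<String, Integer> {
        @Override
        public Integer map(String value) {
            return Integer.parseInt(value);
        }
    }

    public static void main(String[] args) {
        System.out.println(new MyMapFunction().map("42"));
    }
}
```

The only change at the definition site is `extends RichMapFunction` in place of `implements MapFunction`; the call that passes the function to the transformation stays the same.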
Rich functions can also be defined as an anonymous class:
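A sketch of the anonymous form, again using a local stand-in for `RichMapFunction` (an assumption of this example) and a hypothetical `parser()` helper:

```java
class AnonymousRichFunctionExample {
    // Stand-in for Flink's RichMapFunction, reduced to the map method.
    abstract static class RichMapFunction<IN, OUT> {
        public abstract OUT map(IN value);
    }

    // Hypothetical helper: the rich function is defined inline as an anonymous class.
    static RichMapFunction<String, Integer> parser() {
        return new RichMapFunction<String, Integer>() {
            @Override
            public Integer map(String value) {
                return Integer.parseInt(value);
            }
        };
    }

    public static void main(String[] args) {
        System.out.println(parser().map("13"));
    }
}
```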
In the Scala API, as already seen in previous examples, all operations accept lambda functions for describing the operation.
All transformations that take a lambda function as argument can instead take a rich function. For example, instead of passing a lambda, you can write a class that extends RichMapFunction and pass an instance of it to a map transformation. As in the Java API, rich functions can also be defined as an anonymous class.
Rich functions provide, in addition to the user-defined function (map, reduce, etc.), four methods: open, close, getRuntimeContext, and setRuntimeContext. These are useful for parameterizing the function (see Passing Parameters to Functions), creating and finalizing local state, accessing broadcast variables (see Broadcast Variables), accessing runtime information such as accumulators and counters (see Accumulators and Counters), and accessing information on iterations (see Iterations).
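To illustrate how open and close bracket the calls to the user-defined function, here is a minimal sketch. Everything in it is an assumption made for illustration: the stand-in `RichMapFunction` uses no-argument `open()` and `close()` methods, which simplifies Flink's real signatures, and the `run` method is a hypothetical driver that imitates the order in which a runtime would call the methods.

```java
import java.util.ArrayList;
import java.util.List;

class LifecycleExample {
    // Stand-in for Flink's RichMapFunction with simplified lifecycle hooks.
    abstract static class RichMapFunction<IN, OUT> {
        public void open() {}
        public void close() {}
        public abstract OUT map(IN value);
    }

    static class PrefixMapper extends RichMapFunction<String, String> {
        private String prefix;                          // local state
        final List<String> calls = new ArrayList<>();   // records the call order

        @Override
        public void open() {
            prefix = "item-";   // create local state before any element is processed
            calls.add("open");
        }

        @Override
        public String map(String value) {
            calls.add("map");
            return prefix + value;
        }

        @Override
        public void close() {
            prefix = null;      // finalize local state after the last element
            calls.add("close");
        }
    }

    // Hypothetical driver: open once, map every element, close once.
    static List<String> run(PrefixMapper fn, List<String> input) {
        List<String> out = new ArrayList<>();
        fn.open();
        try {
            for (String s : input) {
                out.add(fn.map(s));
            }
        } finally {
            fn.close();
        }
        return out;
    }
}
```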
Accumulators are simple constructs with an add operation and a final accumulated result, which is available after the job has ended.
The most straightforward accumulator is a counter: you can increment it using the Accumulator.add(V value) method. At the end of the job, Flink will sum up (merge) all partial results and send the result to the client. Accumulators are useful during debugging or if you quickly want to find out more about your data.
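The add-then-merge behaviour described above can be sketched in plain Java. The `Accumulator` interface and `IntCounter` class below are local stand-ins that mirror only a subset of the Flink types with the same names (an assumption of this example), and `countAcrossTasks` is a hypothetical method that plays the role of two parallel tasks followed by the final merge.

```java
class CounterMergeExample {
    // Stand-in for a subset of Flink's Accumulator interface.
    interface Accumulator<V, R> {
        void add(V value);
        R getLocalValue();
        void merge(Accumulator<V, R> other);
    }

    // Stand-in counter: value type and result type are both Integer.
    static class IntCounter implements Accumulator<Integer, Integer> {
        private int localValue = 0;

        @Override
        public void add(Integer value) {
            localValue += value;
        }

        @Override
        public Integer getLocalValue() {
            return localValue;
        }

        @Override
        public void merge(Accumulator<Integer, Integer> other) {
            localValue += other.getLocalValue();
        }
    }

    // Each "task" keeps a partial count; the partial results are merged at the end.
    static int countAcrossTasks() {
        IntCounter task1 = new IntCounter();
        IntCounter task2 = new IntCounter();
        task1.add(1);
        task1.add(1);
        task2.add(1);

        IntCounter total = new IntCounter();
        total.merge(task1);
        total.merge(task2);
        return total.getLocalValue();
    }
}
```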
Flink currently has built-in accumulators including IntCounter, LongCounter, DoubleCounter, and Histogram. Each of them implements the Accumulator interface.
How to use accumulators:
First you have to create an accumulator object (here a counter) in the user-defined transformation function where you want to use it.
Second you have to register the accumulator object, typically in the open() method of the rich function. Here you also define the name.
You can now use the accumulator anywhere in the operator function, including in the open() and close() methods.
The overall result will be stored in the JobExecutionResult object, which is returned from the execute() method of the execution environment (currently this only works if the execution waits for the completion of the job).
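The four steps above can be sketched end to end as follows. This is a simulation under stated assumptions, not Flink code: `IntCounter`, `RuntimeContext`, and `JobExecutionResult` are small local stand-ins named after the Flink types, `open` receives the context as a parameter instead of calling getRuntimeContext(), and the accumulator name "num-lines", the `LineCounter` class, and the `execute` method are illustrative.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class AccumulatorUsageExample {
    // Stand-in counter.
    static class IntCounter {
        private int value;
        void add(int x) { value += x; }
        int getLocalValue() { return value; }
    }

    // Stand-in runtime context that keeps registered accumulators by name.
    static class RuntimeContext {
        private final Map<String, IntCounter> accumulators = new HashMap<>();
        void addAccumulator(String name, IntCounter accumulator) {
            accumulators.put(name, accumulator);
        }
    }

    // Stand-in result object exposing the final accumulator values.
    static class JobExecutionResult {
        private final Map<String, Integer> results;
        JobExecutionResult(Map<String, Integer> results) { this.results = results; }
        Integer getAccumulatorResult(String name) { return results.get(name); }
    }

    // Plays the role of a rich map function.
    static class LineCounter {
        // Step 1: create the accumulator object in the function.
        private final IntCounter numLines = new IntCounter();

        // Step 2: register it under a name when the function is opened.
        void open(RuntimeContext context) {
            context.addAccumulator("num-lines", numLines);
        }

        // Step 3: use it while processing elements.
        String map(String line) {
            numLines.add(1);
            return line.trim();
        }
    }

    // Hypothetical stand-in for the environment's execute(): runs the function
    // over the input and collects the accumulator results.
    static JobExecutionResult execute(List<String> lines) {
        RuntimeContext context = new RuntimeContext();
        LineCounter fn = new LineCounter();
        fn.open(context);
        for (String line : lines) {
            fn.map(line);
        }
        Map<String, Integer> results = new HashMap<>();
        context.accumulators.forEach((name, acc) -> results.put(name, acc.getLocalValue()));
        return new JobExecutionResult(results);
    }
}
```

Step 4 is then a lookup by name on the returned object, for example `execute(lines).getAccumulatorResult("num-lines")`.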
All accumulators share a single namespace per job. Thus you can use the same accumulator in different operator functions of your job. Flink will internally merge all accumulators with the same name.
A note on accumulators and iterations: Currently the result of accumulators is only available after the overall job has ended. We plan to also make the result of the previous iteration available in the next iteration. You can use Aggregators to compute per-iteration statistics and base the termination of iterations on such statistics.
Custom accumulators:
To implement your own accumulator you simply have to write your implementation of the Accumulator interface. Feel free to create a pull request if you think your custom accumulator should be shipped with Flink.
You have the choice to implement either Accumulator or SimpleAccumulator.
Accumulator<V,R> is the most flexible: it defines a type V for the value to add and a result type R for the final result. For a histogram, for example, V is a number and R is a histogram. SimpleAccumulator is for cases where both types are the same, such as counters.
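The difference between the two interfaces can be sketched with two small custom accumulators. The interfaces below are local stand-ins covering only a subset of Flink's `Accumulator` and `SimpleAccumulator` (an assumption that keeps the example self-contained), and both implementing classes are hypothetical: a histogram whose value type is a number and whose result type is a map of bin counts, and a maximum tracker whose value and result types are the same.

```java
import java.util.Map;
import java.util.TreeMap;

class CustomAccumulatorExample {
    // Stand-ins for a subset of the Flink interfaces.
    interface Accumulator<V, R> {
        void add(V value);
        R getLocalValue();
        void merge(Accumulator<V, R> other);
    }

    interface SimpleAccumulator<T> extends Accumulator<T, T> {}

    // V = Integer (the value added), R = Map<Integer, Integer> (bin -> count).
    static class Histogram implements Accumulator<Integer, Map<Integer, Integer>> {
        private final TreeMap<Integer, Integer> bins = new TreeMap<>();

        @Override
        public void add(Integer value) {
            bins.merge(value, 1, Integer::sum);
        }

        @Override
        public Map<Integer, Integer> getLocalValue() {
            return bins;
        }

        @Override
        public void merge(Accumulator<Integer, Map<Integer, Integer>> other) {
            // Add the other histogram's counts bin by bin.
            other.getLocalValue().forEach((bin, count) -> bins.merge(bin, count, Integer::sum));
        }
    }

    // Value type and result type are both Integer, so SimpleAccumulator suffices.
    static class MaxAccumulator implements SimpleAccumulator<Integer> {
        private int max = Integer.MIN_VALUE;

        @Override
        public void add(Integer value) {
            max = Math.max(max, value);
        }

        @Override
        public Integer getLocalValue() {
            return max;
        }

        @Override
        public void merge(Accumulator<Integer, Integer> other) {
            max = Math.max(max, other.getLocalValue());
        }
    }
}
```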