Flink DataStream API Programming Guide
DataStream programs in Flink are regular programs that implement transformations on data streams (e.g., filtering, updating state, defining windows, aggregating). The data streams are initially created from various sources (e.g., message queues, socket streams, files). Results are returned via sinks, which may for example write the data to files, or to standard output (for example the command line terminal). Flink programs run in a variety of contexts, standalone, or embedded in other programs. The execution can happen in a local JVM, or on clusters of many machines.
Please see basic concepts for an introduction to the basic concepts of the Flink API.
In order to create your own Flink DataStream program, we encourage you to start with anatomy of a Flink Program and gradually add your own transformations. The remaining sections act as references for additional operations and advanced features.
- Example Program
- DataStream Transformations
- Data Sources
- Data Sinks
- Iterations
- Execution Parameters
- Debugging
Example Program
The following program is a complete, working example of a streaming window word count application that counts the words coming from a text socket in 5-second windows. You can copy and paste the code to run it locally.
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class WindowWordCount {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> dataStream = env
                .socketTextStream("localhost", 9999)
                .flatMap(new Splitter())
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1);

        dataStream.print();

        env.execute("Window WordCount");
    }

    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word : sentence.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }
}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
object WindowWordCount {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val text = env.socketTextStream("localhost", 9999)

    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
      .map { (_, 1) }
      .keyBy(0)
      .timeWindow(Time.seconds(5))
      .sum(1)

    counts.print

    env.execute("Window Stream WordCount")
  }
}
To run the example program, start the input stream with netcat first from a terminal:
nc -lk 9999
Just type some words, hitting return after each one. These will be the input to the word count program. If you want to see counts greater than 1, type the same word again and again within 5 seconds (increase the window size from 5 seconds if you cannot type that fast ☺).
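To make the windowing behaviour of the example more concrete, the following plain-Java sketch mimics what the job computes without using Flink at all. It assumes each input line carries an explicit, non-negative arrival timestamp in milliseconds and assigns it to a tumbling window by rounding the timestamp down to a multiple of the window size. The class and method names are hypothetical and exist only for this illustration; this is not how Flink implements windows internally.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class TumblingWordCountSketch {

    /** Start of the tumbling window that contains the given (non-negative) timestamp. */
    static long windowStart(long timestampMillis, long windowSizeMillis) {
        return timestampMillis - (timestampMillis % windowSizeMillis);
    }

    /** Counts words per window start; timestamps.get(i) is the arrival time of lines.get(i). */
    static Map<Long, Map<String, Integer>> count(
            List<String> lines, List<Long> timestamps, long windowSizeMillis) {
        Map<Long, Map<String, Integer>> result = new TreeMap<>();
        for (int i = 0; i < lines.size(); i++) {
            long start = windowStart(timestamps.get(i), windowSizeMillis);
            Map<String, Integer> counts = result.computeIfAbsent(start, k -> new TreeMap<>());
            // Same splitting rule as the Java Splitter above: one token per space.
            for (String word : lines.get(i).split(" ")) {
                counts.merge(word, 1, Integer::sum);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> lines = List.of("to be", "or not to be", "to");
        List<Long> times = List.of(1_000L, 4_000L, 6_000L);
        // prints {0={be=2, not=1, or=1, to=2}, 5000={to=1}}
        System.out.println(count(lines, times, 5_000L));
    }
}
```

The third line arrives after the 5-second boundary, so its word is counted in a new window rather than added to the earlier total.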
DataStream Transformations
Data transformations transform one or more DataStreams into a new DataStream. Programs can combine multiple transformations into sophisticated topologies.
This section gives a description of all the available transformations.
Transformation | Description |
---|---|
Map DataStream → DataStream | Takes one element and produces one element. For example, a map function can double the values of the input stream. |
FlatMap DataStream → DataStream | Takes one element and produces zero, one, or more elements. For example, a flatmap function can split sentences into words. |
Filter DataStream → DataStream | Evaluates a boolean function for each element and retains those for which the function returns true. For example, a filter can drop zero values. |
KeyBy DataStream → KeyedStream | Logically partitions a stream into disjoint partitions, each partition containing elements of the same key. Internally, this is implemented with hash partitioning. See keys on how to specify keys. This transformation returns a KeyedStream. |
Reduce KeyedStream → DataStream | A "rolling" reduce on a keyed data stream. Combines the current element with the last reduced value and emits the new value. |
Fold KeyedStream → DataStream | A "rolling" fold on a keyed data stream with an initial value. Combines the current element with the last folded value and emits the new value. For example, a fold function applied to the sequence (1,2,3,4,5) can emit the sequence "start-1", "start-1-2", "start-1-2-3", ... |
Aggregations KeyedStream → DataStream | Rolling aggregations on a keyed data stream. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy). |
Window KeyedStream → WindowedStream | Windows can be defined on already partitioned KeyedStreams. Windows group the data in each key according to some characteristic (e.g., the data that arrived within the last 5 seconds). See windows for a complete description of windows. |
WindowAll DataStream → AllWindowedStream | Windows can be defined on regular DataStreams. Windows group all the stream events according to some characteristic (e.g., the data that arrived within the last 5 seconds). See windows for a complete description of windows. WARNING: This is in many cases a non-parallel transformation. All records will be gathered in one task for the windowAll operator. |
Window Apply WindowedStream → DataStream AllWindowedStream → DataStream | Applies a general function to the window as a whole. For example, a window function can manually sum the elements of a window. Note: If you are using a windowAll transformation, you need to use an AllWindowFunction instead. |
Window Reduce WindowedStream → DataStream | Applies a functional reduce function to the window and returns the reduced value. |
Window Fold WindowedStream → DataStream | Applies a functional fold function to the window and returns the folded value. For example, a fold function applied to the sequence (1,2,3,4,5) can fold the sequence into the string "start-1-2-3-4-5". |
Aggregations on windows WindowedStream → DataStream | Aggregates the contents of a window. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy). |
Union DataStream* → DataStream | Union of two or more data streams, creating a new stream containing all the elements from all the streams. Note: If you union a data stream with itself you will get each element twice in the resulting stream. |
Window Join DataStream,DataStream → DataStream | Joins two data streams on a given key and a common window. |
Window CoGroup DataStream,DataStream → DataStream | Cogroups two data streams on a given key and a common window. |
Connect DataStream,DataStream → ConnectedStreams | "Connects" two data streams retaining their types, allowing for shared state between the two streams. |
CoMap, CoFlatMap ConnectedStreams → DataStream | Similar to map and flatMap on a connected data stream. |
Split DataStream → SplitStream | Splits the stream into two or more streams according to some criterion. |
Select SplitStream → DataStream | Selects one or more streams from a split stream. |
Iterate DataStream → IterativeStream → DataStream | Creates a "feedback" loop in the flow by redirecting the output of one operator to some previous operator. This is especially useful for defining algorithms that continuously update a model. A typical program starts with a stream and applies the iteration body continuously: elements that are greater than 0 are sent back to the feedback channel, and the rest of the elements are forwarded downstream. See iterations for a complete description. |
Extract Timestamps DataStream → DataStream | Extracts timestamps from records in order to work with windows that use event time semantics. See working with time. |
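The difference between the rolling Fold on a keyed stream and the Window Fold described in the table can be sketched in plain Java. The snippet below does not use the Flink API; the class and method names are hypothetical. It only reproduces the "start-..." example from the table: the rolling variant emits one result per input element, while the window variant emits a single result for the whole window.

```java
import java.util.ArrayList;
import java.util.List;

final class FoldSketch {

    /** Rolling fold: emits every intermediate result, one per input element. */
    static List<String> rollingFold(String initial, List<Integer> input) {
        List<String> emitted = new ArrayList<>();
        String current = initial;
        for (int value : input) {
            current = current + "-" + value;
            emitted.add(current);
        }
        return emitted;
    }

    /** Window fold: folds all elements of one window and emits only the final value. */
    static String windowFold(String initial, List<Integer> window) {
        String current = initial;
        for (int value : window) {
            current = current + "-" + value;
        }
        return current;
    }

    public static void main(String[] args) {
        List<Integer> input = List.of(1, 2, 3, 4, 5);
        System.out.println(rollingFold("start", input));
        // prints start-1-2-3-4-5
        System.out.println(windowFold("start", input));
    }
}
```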
The following transformations are available on data streams of Tuples:
Transformation | Description |
---|---|
Project DataStream → DataStream | Selects a subset of fields from the tuples. |
Physical partitioning
Flink also gives low-level control (if desired) over the exact stream partitioning after a transformation, via the following functions.
Transformation | Description |
---|---|
Custom partitioning DataStream → DataStream | Uses a user-defined Partitioner to select the target task for each element. |
Random partitioning DataStream → DataStream | Partitions elements randomly according to a uniform distribution. |
Rebalancing (Round-robin partitioning) DataStream → DataStream | Partitions elements round-robin, creating equal load per partition. Useful for performance optimization in the presence of data skew. |
Rescaling DataStream → DataStream | Partitions elements, round-robin, to a subset of downstream operations. This is useful if you want to have pipelines where you, for example, fan out from each parallel instance of a source to a subset of several mappers to distribute load but don't want the full rebalance that rebalance() would incur. This would require only local data transfers instead of transferring data over the network, depending on other configuration values such as the number of slots of TaskManagers. The subset of downstream operations to which the upstream operation sends elements depends on the degree of parallelism of both the upstream and downstream operation. For example, if the upstream operation has parallelism 2 and the downstream operation has parallelism 4, then one upstream operation would distribute elements to two downstream operations while the other upstream operation would distribute to the other two downstream operations. If, on the other hand, the downstream operation has parallelism 2 while the upstream operation has parallelism 4, then two upstream operations would distribute to one downstream operation while the other two upstream operations would distribute to the other downstream operation. In cases where the different parallelisms are not multiples of each other, one or several downstream operations will have a differing number of inputs from upstream operations. Please see this figure for a visualization of the connection pattern in the above example. |
Broadcasting DataStream → DataStream | Broadcasts elements to every partition. |
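The connection pattern described for Rescaling can be sketched as a small plain-Java function. This is a simplified model written for illustration only, not Flink's implementation: the class, the method name, and the block-wise assignment rule are assumptions chosen so that the two examples from the table (parallelism 2 to 4, and 4 to 2) come out as described. How Flink distributes the remainder when the parallelisms are not multiples of each other may differ from this sketch.

```java
import java.util.ArrayList;
import java.util.List;

final class RescaleSketch {

    /**
     * Returns the downstream subtask indices that upstream subtask upIdx sends to.
     * Within that subset, elements would be distributed round-robin.
     */
    static List<Integer> targets(int upIdx, int upParallelism, int downParallelism) {
        List<Integer> result = new ArrayList<>();
        if (downParallelism >= upParallelism) {
            // Fan out: each upstream subtask owns a contiguous block of downstream subtasks.
            int start = upIdx * downParallelism / upParallelism;
            int end = (upIdx + 1) * downParallelism / upParallelism;
            for (int i = start; i < end; i++) {
                result.add(i);
            }
        } else {
            // Fan in: several upstream subtasks share a single downstream subtask.
            result.add(upIdx * downParallelism / upParallelism);
        }
        return result;
    }

    public static void main(String[] args) {
        for (int up = 0; up < 2; up++) {
            System.out.println("2 -> 4, upstream " + up + " sends to " + targets(up, 2, 4));
        }
        for (int up = 0; up < 4; up++) {
            System.out.println("4 -> 2, upstream " + up + " sends to " + targets(up, 4, 2));
        }
    }
}
```

In contrast, a full rebalance would connect every upstream subtask to every downstream subtask.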
Task chaining and resource groups
Chaining two subsequent transformations means co-locating them within the same thread for better performance. Flink by default chains operators if this is possible (e.g., two subsequent map transformations). The API gives fine-grained control over chaining if desired:
Use StreamExecutionEnvironment.disableOperatorChaining() if you want to disable chaining in the whole job. For more fine-grained control, the following functions are available. Note that these functions can only be used right after a DataStream transformation as they refer to the previous transformation. For example, you can use someStream.map(...).startNewChain(), but you cannot use someStream.startNewChain().
A resource group is a slot in Flink, see slots. You can manually isolate operators in separate slots if desired.
Transformation | Description |
---|---|
Start new chain | Begins a new chain, starting with this operator. For example, in someStream.filter(...).map(...).startNewChain().map(...) the two mappers will be chained, and the filter will not be chained to the first mapper. |
Disable chaining | Does not chain the operator with others. For example, someStream.map(...).disableChaining() keeps the map operator out of any chain. |
Set slot sharing group | Sets the slot sharing group of an operation. Flink will put operations with the same slot sharing group into the same slot while keeping operations that don't have the slot sharing group in other slots. This can be used to isolate slots. The slot sharing group is inherited from input operations if all input operations are in the same slot sharing group. The name of the default slot sharing group is "default"; operations can explicitly be put into this group by calling slotSharingGroup("default"). |
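The inheritance rule for slot sharing groups can be written down as a short plain-Java function. This is a sketch of the rule as stated in the table, not Flink code; the class and method names are hypothetical. The table does not say what happens when the inputs are in different groups, so falling back to "default" in that case is an assumption of this sketch.

```java
import java.util.List;

final class SlotSharingSketch {

    static final String DEFAULT_GROUP = "default";

    /**
     * @param explicitGroup group set via slotSharingGroup(...), or null if none was set
     * @param inputGroups   already resolved groups of all input operations
     */
    static String resolve(String explicitGroup, List<String> inputGroups) {
        if (explicitGroup != null) {
            return explicitGroup; // an explicit setting always wins
        }
        String inherited = null;
        for (String group : inputGroups) {
            if (inherited == null) {
                inherited = group;
            } else if (!inherited.equals(group)) {
                return DEFAULT_GROUP; // inputs disagree (assumed fallback)
            }
        }
        // Sources have no inputs and therefore start in the default group.
        return inherited == null ? DEFAULT_GROUP : inherited;
    }

    public static void main(String[] args) {
        System.out.println(resolve(null, List.of("heavy", "heavy"))); // prints heavy
        System.out.println(resolve(null, List.of("heavy", "light"))); // prints default
        System.out.println(resolve("isolated", List.of("heavy")));    // prints isolated
    }
}
```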
Data Sources
Sources can be created by using StreamExecutionEnvironment.addSource(sourceFunction). You can either use one of the source functions that come with Flink or write a custom source by implementing the SourceFunction interface for non-parallel sources, or by implementing the ParallelSourceFunction interface or extending RichParallelSourceFunction for parallel sources.
There are several predefined stream sources accessible from the StreamExecutionEnvironment:
File-based:
- readTextFile(path) / TextInputFormat - Reads files line-wise and returns them as Strings.
- readFile(path) / Any input format - Reads files as dictated by the input format.
- readFileStream - Creates a stream by appending elements when there are changes to a file.
Socket-based:
- socketTextStream - Reads from a socket. Elements can be separated by a delimiter.
Collection-based:
- fromCollection(Collection) - Creates a data stream from a Java java.util.Collection. All elements in the collection must be of the same type.
- fromCollection(Iterator, Class) - Creates a data stream from an iterator. The class specifies the data type of the elements returned by the iterator.
- fromElements(T ...) - Creates a data stream from the given sequence of objects. All objects must be of the same type.
- fromParallelCollection(SplittableIterator, Class) - Creates a data stream from an iterator, in parallel. The class specifies the data type of the elements returned by the iterator.
- generateSequence(from, to) - Generates the sequence of numbers in the given interval, in parallel.
Custom:
- addSource - Attaches a new source function. For example, to read from Apache Kafka you can use addSource(new FlinkKafkaConsumer08<>(...)). See connectors for more details.
The Scala API provides the same file-based, socket-based, and custom sources. Its collection-based sources take Scala types and no explicit class argument: fromCollection(Seq), fromCollection(Iterator), fromElements(elements: _*), fromParallelCollection(SplittableIterator), and generateSequence(from, to).
Data Sinks
Data sinks consume DataStreams and forward them to files, sockets, external systems, or print them. Flink comes with a variety of built-in output formats that are encapsulated behind operations on the DataStreams:
- writeAsText() / TextOutputFormat - Writes elements line-wise as Strings. The Strings are obtained by calling the toString() method of each element.
- writeAsCsv(...) / CsvOutputFormat - Writes tuples as comma-separated value files. Row and field delimiters are configurable. The value for each field comes from the toString() method of the objects.
- print() / printToErr() - Prints the toString() value of each element on the standard out / standard error stream. Optionally, a prefix (msg) can be provided which is prepended to the output. This can help to distinguish between different calls to print. If the parallelism is greater than 1, the output will also be prepended with the identifier of the task which produced the output.
- writeUsingOutputFormat() / FileOutputFormat - Method and base class for custom file outputs. Supports custom object-to-bytes conversion.
- writeToSocket - Writes elements to a socket according to a SerializationSchema.
- addSink - Invokes a custom sink function. Flink comes bundled with connectors to other systems (such as Apache Kafka) that are implemented as sink functions.
Note that the write*() methods on DataStream are mainly intended for debugging purposes. They do not participate in Flink's checkpointing, which means these functions usually have at-least-once semantics. When data is flushed to the target system depends on the implementation of the OutputFormat, so not all elements sent to the OutputFormat show up in the target system immediately. Also, in failure cases, those records might be lost.
For reliable, exactly-once delivery of a stream into a file system, use the flink-connector-filesystem. Also, custom implementations through the .addSink(...) method can participate in Flink's checkpointing for exactly-once semantics.
Iterations
Iterative streaming programs implement a step function and embed it into an IterativeStream. As a DataStream program may never finish, there is no maximum number of iterations. Instead, you need to specify which part of the stream is fed back to the iteration and which part is forwarded downstream using a split transformation or a filter. Here, we show an example using filters. First, we define an IterativeStream:
IterativeStream<Integer> iteration = input.iterate();
Then, we specify the logic that will be executed inside the loop using a series of transformations (here a simple map transformation):
DataStream<Integer> iterationBody = iteration.map(/* this is executed many times */);
To close an iteration and define the iteration tail, call the closeWith(feedbackStream) method of the IterativeStream. The DataStream given to the closeWith function will be fed back to the iteration head. A common pattern is to use a filter to separate the part of the stream that is fed back from the part of the stream that is propagated forward. These filters can, e.g., define the “termination” logic, where an element is allowed to propagate downstream rather than being fed back.
iteration.closeWith(iterationBody.filter(/* one part of the stream */));
DataStream<Integer> output = iterationBody.filter(/* some other part of the stream */);
By default, the partitioning of the feedback stream is automatically set to be the same as the input of the iteration head. To override this, the user can set an optional boolean flag in the closeWith method.
For example, here is a program that continuously subtracts 1 from a series of integers until they reach zero:
DataStream<Long> someIntegers = env.generateSequence(0, 1000);

IterativeStream<Long> iteration = someIntegers.iterate();

DataStream<Long> minusOne = iteration.map(new MapFunction<Long, Long>() {
    @Override
    public Long map(Long value) throws Exception {
        return value - 1;
    }
});

DataStream<Long> stillGreaterThanZero = minusOne.filter(new FilterFunction<Long>() {
    @Override
    public boolean filter(Long value) throws Exception {
        return (value > 0);
    }
});

iteration.closeWith(stillGreaterThanZero);

DataStream<Long> lessThanZero = minusOne.filter(new FilterFunction<Long>() {
    @Override
    public boolean filter(Long value) throws Exception {
        return (value <= 0);
    }
});
Iterative streaming programs implement a step function and embed it into an IterativeStream. As a DataStream program may never finish, there is no maximum number of iterations. Instead, you need to specify which part of the stream is fed back to the iteration and which part is forwarded downstream using a split transformation or a filter. Here, we show an example iteration where the body (the part of the computation that is repeated) is a simple map transformation, and the elements that are fed back are distinguished from the elements that are forwarded downstream using filters.
val iteratedStream = someDataStream.iterate(
  iteration => {
    val iterationBody = iteration.map(/* this is executed many times */)
    // first element: fed back to the iteration head; second element: forwarded downstream
    (iterationBody.filter(/* one part of the stream */), iterationBody.filter(/* some other part of the stream */))
  })
By default, the partitioning of the feedback stream is automatically set to be the same as the input of the iteration head. To override this, the user can set an optional boolean flag in the closeWith method.
For example, here is a program that continuously subtracts 1 from a series of integers until they reach zero:
val someIntegers: DataStream[Long] = env.generateSequence(0, 1000)

val iteratedStream = someIntegers.iterate(
  iteration => {
    val minusOne = iteration.map( v => v - 1)
    val stillGreaterThanZero = minusOne.filter (_ > 0)
    val lessThanZero = minusOne.filter(_ <= 0)
    (stillGreaterThanZero, lessThanZero)
  }
)
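The data flow of the subtract-one example can be traced with a small plain-Java simulation. It does not use Flink: a queue stands in for the iteration head (original input plus feedback), and the class and method names are hypothetical. Unlike a real streaming job, the simulation terminates, because its input is finite and processed in a single thread.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class FeedbackLoopSketch {

    /** Subtracts 1 per pass; values still greater than 0 are fed back, the rest are emitted. */
    static List<Long> run(List<Long> input) {
        Deque<Long> head = new ArrayDeque<>(input); // iteration head: input plus feedback
        List<Long> output = new ArrayList<>();
        while (!head.isEmpty()) {
            long minusOne = head.poll() - 1;        // iteration body (the map)
            if (minusOne > 0) {
                head.add(minusOne);                 // feedback filter: value > 0
            } else {
                output.add(minusOne);               // output filter: value <= 0
            }
        }
        return output;
    }

    public static void main(String[] args) {
        // prints [-1, 0, 0, 0]
        System.out.println(run(List.of(0L, 1L, 2L, 3L)));
    }
}
```

The input 0 leaves the loop as -1 after one pass, while every positive input circulates until it reaches 0.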
Execution Parameters
The StreamExecutionEnvironment contains the ExecutionConfig, which allows setting job-specific configuration values for the runtime.
Please refer to execution configuration for an explanation of most parameters. These parameters pertain specifically to the DataStream API:
- setAutoWatermarkInterval(long milliseconds): Sets the interval for automatic watermark emission. You can get the current value with long getAutoWatermarkInterval().
Fault Tolerance
The Fault Tolerance Documentation describes the options and parameters to enable and configure Flink’s checkpointing mechanism.
Controlling Latency
By default, elements are not transferred on the network one-by-one (which would cause unnecessary network traffic) but are buffered. The size of the buffers (which are actually transferred between machines) can be set in the Flink config files. While this method is good for optimizing throughput, it can cause latency issues when the incoming stream is not fast enough. To control throughput and latency, you can use env.setBufferTimeout(timeoutMillis) on the execution environment (or on individual operators) to set a maximum wait time for the buffers to fill up. After this time, the buffers are sent automatically even if they are not full. The default value for this timeout is 100 ms.
Usage:
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setBufferTimeout(timeoutMillis);
env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);
val env = StreamExecutionEnvironment.createLocalEnvironment()
env.setBufferTimeout(timeoutMillis)
env.generateSequence(1,10).map(myMap).setBufferTimeout(timeoutMillis)
To maximize throughput, call setBufferTimeout(-1), which removes the timeout so that buffers are flushed only when they are full. To minimize latency, set the timeout to a value close to 0 (for example 5 or 10 ms). A buffer timeout of 0 should be avoided, because it can cause severe performance degradation.
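The trade-off between throughput and latency can be illustrated with a toy model in plain Java. This is not Flink's network stack: the class, the method, and the assumption of a flusher that fires at fixed multiples of the timeout are all simplifications made for this sketch. Elements are identified by their index in the arrival array, a buffer is sent when it is full or when the flusher fires, and a timeout of -1 disables the flusher.

```java
import java.util.ArrayList;
import java.util.List;

final class BufferTimeoutSketch {

    /** Returns the batches that would be sent, given sorted arrival times in milliseconds. */
    static List<List<Integer>> simulate(long[] arrivalMillis, int capacity, long timeoutMillis) {
        List<List<Integer>> batches = new ArrayList<>();
        List<Integer> buffer = new ArrayList<>();
        long nextFlush = timeoutMillis;
        for (int i = 0; i < arrivalMillis.length; i++) {
            // Fire every periodic flush that is due before this element arrives.
            while (timeoutMillis > 0 && nextFlush <= arrivalMillis[i]) {
                if (!buffer.isEmpty()) {
                    batches.add(buffer);
                    buffer = new ArrayList<>();
                }
                nextFlush += timeoutMillis;
            }
            buffer.add(i);
            if (buffer.size() == capacity) { // a full buffer is always sent immediately
                batches.add(buffer);
                buffer = new ArrayList<>();
            }
        }
        if (!buffer.isEmpty()) {
            batches.add(buffer); // end of the finite test input
        }
        return batches;
    }

    public static void main(String[] args) {
        long[] arrivals = {0, 10, 20, 150, 160, 400};
        // prints [[0, 1, 2], [3, 4], [5]]
        System.out.println(simulate(arrivals, 4, 100));
        // prints [[0, 1, 2, 3], [4, 5]]
        System.out.println(simulate(arrivals, 4, -1));
    }
}
```

With the 100 ms timeout, the first three elements are sent without waiting for a fourth one; without a timeout, they wait until the buffer is full, which yields fewer and larger batches at the cost of latency.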
Debugging
Before running a streaming program in a distributed cluster, it is a good idea to make sure that the implemented algorithm works as desired. Hence, implementing data analysis programs is usually an incremental process of checking results, debugging, and improving.
Flink provides features to significantly ease the development process of data analysis programs by supporting local debugging from within an IDE, injection of test data, and collection of result data. This section gives some hints on how to ease the development of Flink programs.
Local Execution Environment
A LocalStreamEnvironment starts a Flink system within the same JVM process it was created in. If you start the LocalStreamEnvironment from an IDE, you can set breakpoints in your code and easily debug your program.
A LocalStreamEnvironment is created and used as follows:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
DataStream<String> lines = env.addSource(/* some source */);
// build your program
env.execute();
val env = StreamExecutionEnvironment.createLocalEnvironment()
val lines = env.addSource(/* some source */)
// build your program
env.execute()
Collection Data Sources
Flink provides special data sources which are backed by Java collections to ease testing. Once a program has been tested, the sources and sinks can be easily replaced by sources and sinks that read from / write to external systems.
Collection data sources can be used as follows:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
// Create a DataStream from a list of elements
DataStream<Integer> myInts = env.fromElements(1, 2, 3, 4, 5);
// Create a DataStream from any Java collection
List<Tuple2<String, Integer>> data = ...
DataStream<Tuple2<String, Integer>> myTuples = env.fromCollection(data);
// Create a DataStream from an Iterator
Iterator<Long> longIt = ...
DataStream<Long> myLongs = env.fromCollection(longIt, Long.class);
val env = StreamExecutionEnvironment.createLocalEnvironment()
// Create a DataStream from a list of elements
val myInts = env.fromElements(1, 2, 3, 4, 5)
// Create a DataStream from any Collection
val data: Seq[(String, Int)] = ...
val myTuples = env.fromCollection(data)
// Create a DataStream from an Iterator
val longIt: Iterator[Long] = ...
val myLongs = env.fromCollection(longIt)
Note: Currently, the collection data source requires that data types and iterators implement Serializable. Furthermore, collection data sources cannot be executed in parallel (parallelism = 1).
Iterator Data Sink
Flink also provides a sink to collect DataStream results for testing and debugging purposes. It can be used as follows:
import org.apache.flink.contrib.streaming.DataStreamUtils;
DataStream<Tuple2<String, Integer>> myResult = ...
Iterator<Tuple2<String, Integer>> myOutput = DataStreamUtils.collect(myResult);
import org.apache.flink.contrib.streaming.DataStreamUtils
import scala.collection.JavaConverters.asScalaIteratorConverter
val myResult: DataStream[(String, Int)] = ...
val myOutput: Iterator[(String, Int)] = DataStreamUtils.collect(myResult.getJavaStream).asScala