This documentation is for an out-of-date version of Apache Flink. We recommend you use the latest stable version.

Event Time

In this section you will learn about writing time-aware Flink programs. Please take a look at Timely Stream Processing to learn about the concepts behind timely stream processing.

For information about how to use time in Flink programs refer to windowing and ProcessFunction.

A prerequisite for using event time processing is setting the right time characteristic. That setting defines how data stream sources behave (for example, whether they will assign timestamps), and what notion of time should be used by window operations like KeyedStream.timeWindow(Time.seconds(30)).

You can set the time characteristic using StreamExecutionEnvironment.setStreamTimeCharacteristic():

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();


DataStream<MyEvent> stream = env.addSource(new FlinkKafkaConsumer<MyEvent>(topic, schema, props));

    .keyBy( (event) -> event.getUser() )
    .reduce( (a, b) -> a.add(b) )
val env = StreamExecutionEnvironment.getExecutionEnvironment


val stream: DataStream[MyEvent] = env.addSource(new FlinkKafkaConsumer[MyEvent](topic, schema, props))

    .keyBy( _.getUser )
    .reduce( (a, b) => a.add(b) )
env = StreamExecutionEnvironment.get_execution_environment()


# alternatively:
# env.set_stream_time_characteristic(TimeCharacteristic.IngestionTime)
# env.set_stream_time_characteristic(TimeCharacteristic.ProcessingTime)

Note that in order to run this example in event time, the program needs to either use sources that directly define event time for the data and emit watermarks themselves, or the program must inject a Timestamp Assigner & Watermark Generator after the sources. Those functions describe how to access the event timestamps, and what degree of out-of-orderness the event stream exhibits.

Where to go next?

Back to top