Traces

Traces #

Flink exposes a tracing system that allows gathering and exposing traces to external systems.

Reporting traces #

You can access the tracing system from any user function that extends RichFunction by calling getRuntimeContext().getMetricGroup(). This method returns a MetricGroup object via which you can report a new single span trace.

Reporting single Span #

A Span represents something that happened in Flink at a certain point in time and that will be reported to a TraceReporter. To report a Span, use the MetricGroup#addSpan(SpanBuilder) method.

Currently we don’t support traces with multiple spans. Each Span is self-contained and represents things like a checkpoint or recovery.

public class MyClass {
    void doSomething() {
        // (...)
        metricGroup.addSpan(
                Span.builder(MyClass.class, "SomeAction")
                        .setStartTsMillis(startTs) // Optional
                        .setEndTsMillis(endTs) // Optional
                        .setAttribute("foo", "bar"));
    }
}

Currently reporting Spans from Python is not supported.

Reporter #

For information on how to set up Flink’s trace reporters please take a look at the trace reporters documentation.

System traces #

Flink reports the traces listed below.

The tables below generally feature 4 columns:

  • The “Scope” column describes the scope under which the trace is reported.

  • The “Name” column describes the name of the reported trace.

  • The “Attributes” column lists the names of all attributes that are reported with the given trace.

  • The “Description” column provides information as to what a given attribute is reporting.

Checkpointing and initialization #

Flink reports a single span trace for each checkpoint and each job initialization event once that event reaches a terminal state: COMPLETED or FAILED.

| Scope | Name | Attributes | Description |
|---|---|---|---|
| org.apache.flink.runtime.checkpoint.CheckpointStatsTracker | Checkpoint | startTs | Timestamp when the checkpoint has started. |
| | | endTs | Timestamp when the checkpoint has finished. |
| | | checkpointId | Id of the checkpoint. |
| | | checkpointedSize | Size in bytes of checkpointed state during this checkpoint. Might be smaller than fullSize if incremental checkpoints are used. |
| | | fullSize | Full size in bytes of the referenced state by this checkpoint. Might be larger than checkpointedSize if incremental checkpoints are used. |
| | | checkpointStatus | What was the state of this checkpoint: FAILED or COMPLETED. |
| | JobInitialization | startTs | Timestamp when the job initialization has started. |
| | | endTs | Timestamp when the job initialization has finished. |
| | | checkpointId (optional) | Id of the checkpoint that the job recovered from (if any). |
| | | fullSize | Full size in bytes of the referenced state by the checkpoint that was used during recovery (if any). |
| | | (Max/Sum)MailboxStartDurationMs | The aggregated (max and sum) across all subtasks duration between subtask being created until all classes and objects of that subtask are initialized. |
| | | (Max/Sum)ReadOutputDataDurationMs | The aggregated (max and sum) across all subtasks duration of reading unaligned checkpoint's output buffers. |
| | | (Max/Sum)InitializeStateDurationMs | The aggregated (max and sum) across all subtasks duration to initialize a state backend (including state files download time). |
| | | (Max/Sum)GateRestoreDurationMs | The aggregated (max and sum) across all subtasks duration of reading unaligned checkpoint's input buffers. |
| | | (Max/Sum)DownloadStateDurationMs (optional - currently only supported by RocksDB Incremental) | The aggregated (max and sum) across all subtasks duration of downloading state files from the DFS. |
| | | (Max/Sum)RestoredStateSizeBytes.[location] | The aggregated (max and sum) across all subtasks size of restored state by location. Possible locations are defined in Enum StateObjectSizeStatsCollector as LOCAL_MEMORY, LOCAL_DISK, REMOTE, UNKNOWN. |
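
To make the "(Max/Sum)" notation concrete, the following is a minimal plain-Java sketch of how one per-subtask value could be folded into a Max and a Sum attribute. It does not use any Flink classes and is not Flink's actual implementation: the class SubtaskAggregationSketch, the aggregate helper, and the sample durations are hypothetical, and the attribute names are assumed to be formed by prefixing "Max" or "Sum" to the base name given in the table.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; not part of Flink.
public class SubtaskAggregationSketch {

    /**
     * Folds per-subtask values into two attributes, "Max" + name and "Sum" + name.
     * Returns an empty map if no subtask reported a value.
     */
    public static Map<String, Long> aggregate(String name, List<Long> perSubtaskValues) {
        Map<String, Long> attributes = new LinkedHashMap<>();
        if (perSubtaskValues.isEmpty()) {
            return attributes;
        }
        long max = Long.MIN_VALUE;
        long sum = 0L;
        for (long value : perSubtaskValues) {
            max = Math.max(max, value);
            sum += value;
        }
        attributes.put("Max" + name, max);
        attributes.put("Sum" + name, sum);
        return attributes;
    }

    public static void main(String[] args) {
        // Assumed sample data: state initialization time in ms for three subtasks.
        List<Long> durations = List.of(120L, 340L, 95L);
        Map<String, Long> attributes = aggregate("InitializeStateDurationMs", durations);
        // Prints {MaxInitializeStateDurationMs=340, SumInitializeStateDurationMs=555}
        System.out.println(attributes);
    }
}
```

Read this way, the Max value points at the slowest subtask, while the Sum value reflects the total work across all subtasks. A large gap between Max and the average (Sum divided by parallelism) suggests skew.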
