Traces #
Flink exposes a tracing system that allows gathering and exposing traces to external systems.
Reporting traces #
You can access the tracing system from any user function that extends RichFunction by calling getRuntimeContext().getMetricGroup(). This method returns a MetricGroup object through which you can report a new single-span trace.
Reporting single Span #
A Span represents something that happened in Flink at a certain point in time and that will be reported to a TraceReporter. To report a Span, use the MetricGroup#addSpan(SpanBuilder) method.
Traces with multiple spans are currently not supported. Each Span is self-contained and represents things like a checkpoint or recovery.
public class MyClass {
    void doSomething() {
        // (...)
        // metricGroup was obtained via getRuntimeContext().getMetricGroup()
        metricGroup.addSpan(
                Span.builder(MyClass.class, "SomeAction")
                        .setStartTsMillis(startTs) // Optional
                        .setEndTsMillis(endTs) // Optional
                        .setAttribute("foo", "bar"));
    }
}
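The snippet above leaves open where startTs and endTs come from. The following is a minimal sketch, assuming wall-clock timestamps taken with System.currentTimeMillis() around the action being traced. TimedAction is a hypothetical helper written for this illustration and is not part of Flink; it uses only the Java standard library.

```java
import java.util.function.Supplier;

// Hypothetical helper, NOT part of Flink: records the wall-clock window of an action.
final class TimedAction<T> {
    final long startTsMillis;
    final long endTsMillis;
    final T result;

    private TimedAction(long startTsMillis, long endTsMillis, T result) {
        this.startTsMillis = startTsMillis;
        this.endTsMillis = endTsMillis;
        this.result = result;
    }

    /** Runs the action and captures the timestamps taken right before and right after it. */
    static <T> TimedAction<T> run(Supplier<T> action) {
        long startTs = System.currentTimeMillis();
        T result = action.get();
        long endTs = System.currentTimeMillis();
        return new TimedAction<>(startTs, endTs, result);
    }

    public static void main(String[] args) {
        TimedAction<Integer> timed = TimedAction.run(() -> 21 * 2);
        // In a Flink user function, these two values could be passed to
        // setStartTsMillis(...) and setEndTsMillis(...) on the span builder.
        System.out.println("result=" + timed.result
                + ", durationMs=" + (timed.endTsMillis - timed.startTsMillis));
    }
}
```

Because both setters are marked optional in the snippet above, this kind of bookkeeping is only needed when the span should carry explicit timestamps.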
Currently reporting Spans from Python is not supported.
Reporter #
For information on how to set up Flink’s trace reporters please take a look at the trace reporters documentation.
System traces #
Flink reports the traces listed below.
The tables below generally feature four columns:
- The “Scope” column describes the scope under which the trace is reported.
- The “Name” column describes the name of the reported trace.
- The “Attributes” column lists the names of all attributes that are reported with the given trace.
- The “Description” column explains what a given attribute reports.
Checkpointing and initialization #
Flink reports a single-span trace for each checkpoint and for each job initialization once that event reaches a terminal state: COMPLETED or FAILED.
| Scope | Name | Attributes | Description |
|---|---|---|---|
| org.apache.flink.runtime.checkpoint.CheckpointStatsTracker | Checkpoint | startTs | Timestamp when the checkpoint started. |
| | | endTs | Timestamp when the checkpoint finished. |
| | | checkpointId | Id of the checkpoint. |
| | | checkpointedSize | Size in bytes of the state checkpointed during this checkpoint. Might be smaller than fullSize if incremental checkpoints are used. |
| | | fullSize | Full size in bytes of the state referenced by this checkpoint. Might be larger than checkpointedSize if incremental checkpoints are used. |
| | | checkpointStatus | Final state of this checkpoint: FAILED or COMPLETED. |
| | JobInitialization | startTs | Timestamp when the job initialization started. |
| | | endTs | Timestamp when the job initialization finished. |
| | | checkpointId (optional) | Id of the checkpoint that the job recovered from (if any). |
| | | fullSize | Full size in bytes of the state referenced by the checkpoint that was used during recovery (if any). |
| | | (Max/Sum)MailboxStartDurationMs | Duration, aggregated (max and sum) across all subtasks, from the creation of a subtask until all classes and objects of that subtask are initialized. |
| | | (Max/Sum)ReadOutputDataDurationMs | Duration, aggregated (max and sum) across all subtasks, of reading the unaligned checkpoint's output buffers. |
| | | (Max/Sum)InitializeStateDurationMs | Duration, aggregated (max and sum) across all subtasks, of initializing a state backend (including state file download time). |
| | | (Max/Sum)GateRestoreDurationMs | Duration, aggregated (max and sum) across all subtasks, of reading the unaligned checkpoint's input buffers. |
| | | (Max/Sum)DownloadStateDurationMs (optional - currently only supported by RocksDB Incremental) | Duration, aggregated (max and sum) across all subtasks, of downloading state files from the DFS. |
| | | (Max/Sum)RestoredStateSizeBytes.[location] | Size of restored state by location, aggregated (max and sum) across all subtasks. Possible locations are defined in Enum StateObjectSizeStatsCollector as LOCAL_MEMORY, LOCAL_DISK, REMOTE, UNKNOWN. |
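To make the (Max/Sum) notation concrete, here is a minimal sketch of how one per-subtask measurement could collapse into two attributes. It assumes the attribute names are formed by prefixing the metric name with Max or Sum, which is one reading of the notation in the table. SubtaskAggregation and the sample durations are hypothetical and are not part of Flink.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, NOT part of Flink: illustrates max/sum aggregation across subtasks.
final class SubtaskAggregation {

    /**
     * Collapses non-negative per-subtask values into two attributes,
     * "Max<metricName>" and "Sum<metricName>". An empty list yields 0 for both.
     */
    static Map<String, Long> aggregate(String metricName, List<Long> perSubtaskValues) {
        long max = 0L;
        long sum = 0L;
        for (long value : perSubtaskValues) {
            max = Math.max(max, value);
            sum += value;
        }
        Map<String, Long> attributes = new LinkedHashMap<>();
        attributes.put("Max" + metricName, max);
        attributes.put("Sum" + metricName, sum);
        return attributes;
    }

    public static void main(String[] args) {
        // Made-up state initialization durations (ms) for three subtasks.
        Map<String, Long> attrs =
                aggregate("InitializeStateDurationMs", List.of(120L, 340L, 95L));
        // prints {MaxInitializeStateDurationMs=340, SumInitializeStateDurationMs=555}
        System.out.println(attrs);
    }
}
```

Read together, the two values hint at skew: a Max that is much larger than Sum divided by the number of subtasks suggests that a single slow subtask dominates the duration.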