Metrics #

Flink exposes a metric system that allows gathering and exposing metrics to external systems.

Registering metrics #

You can access the metric system from any user function that extends RichFunction by calling getRuntimeContext().getMetricGroup(). This method returns a MetricGroup object on which you can create and register new metrics.

Metric types #

Flink supports Counters, Gauges, Histograms and Meters.

Counter #

A Counter is used to count something. The current value can be in- or decremented using inc()/inc(long n) or dec()/dec(long n). You can create and register a Counter by calling counter(String name) on a MetricGroup.


public class MyMapper extends RichMapFunction<String, String> {
  private transient Counter counter;

  @Override
  public void open(Configuration config) {
    this.counter = getRuntimeContext()
      .getMetricGroup()
      .counter("myCounter");
  }

  @Override
  public String map(String value) throws Exception {
    this.counter.inc();
    return value;
  }
}

class MyMapper extends RichMapFunction[String,String] {
  @transient private var counter: Counter = _

  override def open(parameters: Configuration): Unit = {
    counter = getRuntimeContext()
      .getMetricGroup()
      .counter("myCounter")
  }

  override def map(value: String): String = {
    counter.inc()
    value
  }
}

class MyMapper(MapFunction):
    def __init__(self):
        self.counter = None

    def open(self, runtime_context: RuntimeContext):
        self.counter = runtime_context \
            .get_metrics_group() \
            .counter("my_counter")

    def map(self, value: str):
        self.counter.inc()
        return value

Alternatively, you can use your own Counter implementation:


public class MyMapper extends RichMapFunction<String, String> {
  private transient Counter counter;

  @Override
  public void open(Configuration config) {
    this.counter = getRuntimeContext()
      .getMetricGroup()
      .counter("myCustomCounter", new CustomCounter());
  }

  @Override
  public String map(String value) throws Exception {
    this.counter.inc();
    return value;
  }
}

class MyMapper extends RichMapFunction[String,String] {
  @transient private var counter: Counter = _

  override def open(parameters: Configuration): Unit = {
    counter = getRuntimeContext()
      .getMetricGroup()
      .counter("myCustomCounter", new CustomCounter())
  }

  override def map(value: String): String = {
    counter.inc()
    value
  }
}
Still not supported in the Python API.

Gauge #

A Gauge provides a value of any type on demand. In order to use a Gauge you must first create a class that implements the org.apache.flink.metrics.Gauge interface. There is no restriction for the type of the returned value. You can register a gauge by calling gauge(String name, Gauge gauge) on a MetricGroup.


public class MyMapper extends RichMapFunction<String, String> {
  private transient int valueToExpose = 0;

  @Override
  public void open(Configuration config) {
    getRuntimeContext()
      .getMetricGroup()
      .gauge("MyGauge", new Gauge<Integer>() {
        @Override
        public Integer getValue() {
          return valueToExpose;
        }
      });
  }

  @Override
  public String map(String value) throws Exception {
    valueToExpose++;
    return value;
  }
}

class MyMapper extends RichMapFunction[String,String] {
  @transient private var valueToExpose = 0

  override def open(parameters: Configuration): Unit = {
    getRuntimeContext()
      .getMetricGroup()
      .gauge[Int, ScalaGauge[Int]]("MyGauge", ScalaGauge[Int]( () => valueToExpose ) )
  }

  override def map(value: String): String = {
    valueToExpose += 1
    value
  }
}

class MyMapper(MapFunction):
    def __init__(self):
        self.value_to_expose = 0

    def open(self, runtime_context: RuntimeContext):
        runtime_context \
            .get_metrics_group() \
            .gauge("my_gauge", lambda: self.value_to_expose)

    def map(self, value: str):
        self.value_to_expose += 1
        return value

Note that reporters will turn the exposed object into a String, which means that a meaningful toString() implementation is required.
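
To illustrate why the string representation matters, here is a minimal, language-neutral sketch in plain Python. It is not Flink or PyFlink code: `QueueStats`, `OpaqueStats` and `report` are hypothetical names, and `__str__` stands in for Java's toString().

```python
from typing import Callable


class QueueStats:
    """Hypothetical value object that a gauge might expose."""

    def __init__(self, depth: int, oldest_age_ms: int) -> None:
        self.depth = depth
        self.oldest_age_ms = oldest_age_ms

    def __str__(self) -> str:
        # Plays the role of a meaningful toString() implementation.
        return f"depth={self.depth},oldestAgeMs={self.oldest_age_ms}"


class OpaqueStats:
    """Same idea, but without a custom string representation."""


def report(name: str, gauge: Callable[[], object]) -> str:
    """Toy reporter: read the gauge on demand and stringify its value."""
    return f"{name}={gauge()}"


readable = report("queue", lambda: QueueStats(depth=3, oldest_age_ms=120))
opaque = report("queue", lambda: OpaqueStats())
print(readable)  # queue=depth=3,oldestAgeMs=120
print(opaque)    # falls back to the default object representation
```

The second line of output contains only a class name and a memory address, which is of little use in a monitoring system.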

Histogram #

A Histogram measures the distribution of long values. You can register one by calling histogram(String name, Histogram histogram) on a MetricGroup.

public class MyMapper extends RichMapFunction<Long, Long> {
  private transient Histogram histogram;

  @Override
  public void open(Configuration config) {
    this.histogram = getRuntimeContext()
      .getMetricGroup()
      .histogram("myHistogram", new MyHistogram());
  }

  @Override
  public Long map(Long value) throws Exception {
    this.histogram.update(value);
    return value;
  }
}

class MyMapper extends RichMapFunction[Long,Long] {
  @transient private var histogram: Histogram = _

  override def open(parameters: Configuration): Unit = {
    histogram = getRuntimeContext()
      .getMetricGroup()
      .histogram("myHistogram", new MyHistogram())
  }

  override def map(value: Long): Long = {
    histogram.update(value)
    value
  }
}
Still not supported in the Python API.

Flink does not provide a default implementation for Histogram, but offers a Wrapper that allows usage of Codahale/DropWizard histograms. To use this wrapper add the following dependency in your pom.xml:

<dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-metrics-dropwizard</artifactId>
      <version>1.20.0</version>
</dependency>

You can then register a Codahale/DropWizard histogram like this:

public class MyMapper extends RichMapFunction<Long, Long> {
  private transient Histogram histogram;

  @Override
  public void open(Configuration config) {
    com.codahale.metrics.Histogram dropwizardHistogram =
      new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500));

    this.histogram = getRuntimeContext()
      .getMetricGroup()
      .histogram("myHistogram", new DropwizardHistogramWrapper(dropwizardHistogram));
  }

  @Override
  public Long map(Long value) throws Exception {
    this.histogram.update(value);
    return value;
  }
}

class MyMapper extends RichMapFunction[Long, Long] {
  @transient private var histogram: Histogram = _

  override def open(config: Configuration): Unit = {
    val dropwizardHistogram =
      new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500))

    histogram = getRuntimeContext()
      .getMetricGroup()
      .histogram("myHistogram", new DropwizardHistogramWrapper(dropwizardHistogram))
  }

  override def map(value: Long): Long = {
    histogram.update(value)
    value
  }
}
Still not supported in the Python API.
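
To give a feel for what a histogram backed by a sliding window of the most recent values reports, here is a minimal sketch in plain Python. It is neither PyFlink code nor the DropWizard implementation: `SlidingWindowHistogram` and its methods are hypothetical names, and the set of statistics is reduced to min, max and mean.

```python
from collections import deque
from statistics import mean
from typing import Deque, Dict


class SlidingWindowHistogram:
    """Toy histogram that keeps only the last `window_size` values."""

    def __init__(self, window_size: int) -> None:
        # A bounded deque silently drops the oldest value when full.
        self._values: Deque[int] = deque(maxlen=window_size)
        self._count = 0

    def update(self, value: int) -> None:
        self._values.append(value)
        self._count += 1

    def count(self) -> int:
        """Total number of values ever recorded, not just those in the window."""
        return self._count

    def statistics(self) -> Dict[str, float]:
        """Statistics over the values currently in the window."""
        if not self._values:
            return {"min": 0, "max": 0, "mean": 0.0}
        return {
            "min": min(self._values),
            "max": max(self._values),
            "mean": mean(self._values),
        }


histogram = SlidingWindowHistogram(window_size=3)
for value in [1, 2, 3, 4, 5]:
    histogram.update(value)

stats = histogram.statistics()
print(histogram.count(), stats)
```

With a window of 3, only the values 3, 4 and 5 contribute to the statistics, while the count still reflects all 5 updates.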

Meter #

A Meter measures an average throughput. An occurrence of an event can be registered with the markEvent() method. The occurrence of multiple events at the same time can be registered with the markEvent(long n) method. You can register a meter by calling meter(String name, Meter meter) on a MetricGroup.

public class MyMapper extends RichMapFunction<Long, Long> {
  private transient Meter meter;

  @Override
  public void open(Configuration config) {
    this.meter = getRuntimeContext()
      .getMetricGroup()
      .meter("myMeter", new MyMeter());
  }

  @Override
  public Long map(Long value) throws Exception {
    this.meter.markEvent();
    return value;
  }
}

class MyMapper extends RichMapFunction[Long,Long] {
  @transient private var meter: Meter = _

  override def open(config: Configuration): Unit = {
    meter = getRuntimeContext()
      .getMetricGroup()
      .meter("myMeter", new MyMeter())
  }

  override def map(value: Long): Long = {
    meter.markEvent()
    value
  }
}

class MyMapperMeter(MapFunction):
    def __init__(self):
        self.meter = None

    def open(self, runtime_context: RuntimeContext):
        # an average rate of events per second over 120s, default is 60s.
        self.meter = runtime_context \
            .get_metrics_group() \
            .meter("my_meter", time_span_in_seconds=120)

    def map(self, value: str):
        self.meter.mark_event()
        return value

Flink offers a Wrapper that allows usage of Codahale/DropWizard meters. To use this wrapper add the following dependency in your pom.xml:

<dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-metrics-dropwizard</artifactId>
      <version>1.20.0</version>
</dependency>

You can then register a Codahale/DropWizard meter like this:

public class MyMapper extends RichMapFunction<Long, Long> {
  private transient Meter meter;

  @Override
  public void open(Configuration config) {
    com.codahale.metrics.Meter dropwizardMeter = new com.codahale.metrics.Meter();

    this.meter = getRuntimeContext()
      .getMetricGroup()
      .meter("myMeter", new DropwizardMeterWrapper(dropwizardMeter));
  }

  @Override
  public Long map(Long value) throws Exception {
    this.meter.markEvent();
    return value;
  }
}

class MyMapper extends RichMapFunction[Long,Long] {
  @transient private var meter: Meter = _

  override def open(config: Configuration): Unit = {
    val dropwizardMeter: com.codahale.metrics.Meter = new com.codahale.metrics.Meter()

    meter = getRuntimeContext()
      .getMetricGroup()
      .meter("myMeter", new DropwizardMeterWrapper(dropwizardMeter))
  }

  override def map(value: Long): Long = {
    meter.markEvent()
    value
  }
}
Still not supported in the Python API.
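
The notion of "average rate over a time span" can be sketched in plain Python as follows. This is an illustrative model only, not how Flink or DropWizard compute the rate: `ToyMeter` is a hypothetical class, and timestamps are passed in explicitly (in seconds) so that the example is deterministic.

```python
from collections import deque
from typing import Deque, Tuple


class ToyMeter:
    """Toy meter: events per second, averaged over a fixed time span."""

    def __init__(self, time_span_in_seconds: int = 60) -> None:
        self._span = time_span_in_seconds
        self._events: Deque[Tuple[float, int]] = deque()  # (timestamp, n)
        self._count = 0

    def mark_event(self, now: float, n: int = 1) -> None:
        """Register n events that occurred at time `now`."""
        self._events.append((now, n))
        self._count += n

    def get_count(self) -> int:
        """Total number of events ever registered."""
        return self._count

    def get_rate(self, now: float) -> float:
        """Average events per second over the last `time_span_in_seconds`."""
        cutoff = now - self._span
        # Drop events that fell out of the time span.
        while self._events and self._events[0][0] <= cutoff:
            self._events.popleft()
        return sum(n for _, n in self._events) / self._span


meter = ToyMeter(time_span_in_seconds=120)
meter.mark_event(now=0.0)          # a single event
meter.mark_event(now=30.0, n=59)   # 59 events at once

rate_at_60 = meter.get_rate(now=60.0)    # 60 events / 120 s
rate_at_125 = meter.get_rate(now=125.0)  # the event at t=0 has expired
print(meter.get_count(), rate_at_60, rate_at_125)
```

Note that the count keeps growing, whereas the rate only reflects events inside the configured time span.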

Scope #

Every metric is assigned an identifier and a set of key-value pairs under which the metric will be reported.

The identifier is based on 3 components: a user-defined name when registering the metric, an optional user-defined scope and a system-provided scope. For example, if A.B is the system scope, C.D the user scope and E the name, then the identifier for the metric will be A.B.C.D.E.

You can configure which delimiter to use for the identifier (default: .) by setting the metrics.scope.delimiter key in the Flink configuration file.
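
The composition rule can be sketched in a few lines of plain Python. The function `metric_identifier` is a hypothetical helper for illustration, not part of any Flink API.

```python
from typing import Sequence


def metric_identifier(
    system_scope: Sequence[str],
    user_scope: Sequence[str],
    name: str,
    delimiter: str = ".",
) -> str:
    """Join system scope, user scope and metric name with the delimiter."""
    return delimiter.join([*system_scope, *user_scope, name])


default_id = metric_identifier(["A", "B"], ["C", "D"], "E")
custom_id = metric_identifier(["A", "B"], ["C", "D"], "E", delimiter="_")
print(default_id)  # A.B.C.D.E
print(custom_id)   # A_B_C_D_E
```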

User Scope #

You can define a user scope by calling MetricGroup#addGroup(String name), MetricGroup#addGroup(int name) or MetricGroup#addGroup(String key, String value). These methods affect what MetricGroup#getMetricIdentifier and MetricGroup#getScopeComponents return.


counter = getRuntimeContext()
  .getMetricGroup()
  .addGroup("MyMetrics")
  .counter("myCounter");

counter = getRuntimeContext()
  .getMetricGroup()
  .addGroup("MyMetricsKey", "MyMetricsValue")
  .counter("myCounter");

counter = getRuntimeContext()
  .getMetricGroup()
  .addGroup("MyMetrics")
  .counter("myCounter")

counter = getRuntimeContext()
  .getMetricGroup()
  .addGroup("MyMetricsKey", "MyMetricsValue")
  .counter("myCounter")

counter = runtime_context \
    .get_metrics_group() \
    .add_group("my_metrics") \
    .counter("my_counter")

counter = runtime_context \
    .get_metrics_group() \
    .add_group("my_metrics_key", "my_metrics_value") \
    .counter("my_counter")

System Scope #

The system scope contains context information about the metric, for example in which task it was registered or what job that task belongs to.

Which context information should be included can be configured by setting the following keys in the Flink configuration file. Each of these keys expects a format string that may contain constants (e.g. “taskmanager”) and variables (e.g. “<task_id>”), which will be replaced at runtime.

  • metrics.scope.jm
    • Default: <host>.jobmanager
    • Applied to all metrics that were scoped to a job manager.
  • metrics.scope.jm-job
    • Default: <host>.jobmanager.<job_name>
    • Applied to all metrics that were scoped to a job manager and job.
  • metrics.scope.tm
    • Default: <host>.taskmanager.<tm_id>
    • Applied to all metrics that were scoped to a task manager.
  • metrics.scope.tm-job
    • Default: <host>.taskmanager.<tm_id>.<job_name>
    • Applied to all metrics that were scoped to a task manager and job.
  • metrics.scope.task
    • Default: <host>.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>
    • Applied to all metrics that were scoped to a task.
  • metrics.scope.operator
    • Default: <host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>
    • Applied to all metrics that were scoped to an operator.

There are no restrictions on the number or order of variables. Variables are case sensitive.

The default scope for operator metrics will result in an identifier akin to localhost.taskmanager.1234.MyJob.MyOperator.0.MyMetric
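
As a rough illustration of how a format string turns into such an identifier, the following plain-Python sketch substitutes variables in a scope format. `render_scope` is a hypothetical helper, the variable values are made up, and leaving unknown variables untouched is an assumption of this sketch rather than documented Flink behavior.

```python
import re
from typing import Mapping

_VARIABLE = re.compile(r"<[^<>]+>")


def render_scope(scope_format: str, variables: Mapping[str, str]) -> str:
    """Replace every <variable> in the format string with its runtime value."""

    def substitute(match: "re.Match[str]") -> str:
        token = match.group(0)
        # Lookup is exact, so <HOST> does not match <host>.
        return variables.get(token, token)

    return _VARIABLE.sub(substitute, scope_format)


variables = {
    "<host>": "localhost",
    "<tm_id>": "1234",
    "<job_name>": "MyJob",
    "<operator_name>": "MyOperator",
    "<subtask_index>": "0",
}
operator_format = (
    "<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>"
)

identifier = render_scope(operator_format, variables) + ".MyMetric"
print(identifier)  # localhost.taskmanager.1234.MyJob.MyOperator.0.MyMetric
```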

If you also want to include the task name but omit the task manager information you can specify the following format:

metrics.scope.operator: <host>.<job_name>.<task_name>.<operator_name>.<subtask_index>

This could create the identifier localhost.MyJob.MySource_->_MyOperator.MyOperator.0.MyMetric.

Note that for this format string an identifier clash can occur should the same job be run multiple times concurrently, which can lead to inconsistent metric data. As such, it is advised either to use format strings that provide a certain degree of uniqueness by including IDs (e.g. <job_id>) or to assign unique names to jobs and operators.
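
The clash can be reproduced with a small plain-Python sketch. `render_scope` is a hypothetical helper and the job IDs are invented for illustration; the point is only that two concurrent runs of the same job differ in <job_id> but not in any of the names.

```python
import re
from typing import Dict, Mapping


def render_scope(scope_format: str, variables: Mapping[str, str]) -> str:
    """Replace every <variable> in the format string with its runtime value."""
    return re.sub(
        r"<[^<>]+>",
        lambda m: variables.get(m.group(0), m.group(0)),
        scope_format,
    )


def run_variables(job_id: str) -> Dict[str, str]:
    """Variables of one run of the same job; only the job ID differs."""
    return {
        "<host>": "localhost",
        "<job_id>": job_id,
        "<job_name>": "MyJob",
        "<task_name>": "MySource_->_MyOperator",
        "<operator_name>": "MyOperator",
        "<subtask_index>": "0",
    }


run_a = run_variables("hypothetical-job-id-a")
run_b = run_variables("hypothetical-job-id-b")

name_based = "<host>.<job_name>.<task_name>.<operator_name>.<subtask_index>"
id_based = "<host>.<job_id>.<task_name>.<operator_name>.<subtask_index>"

clash = render_scope(name_based, run_a) == render_scope(name_based, run_b)
unique = render_scope(id_based, run_a) != render_scope(id_based, run_b)
print(clash, unique)  # True True
```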

List of all Variables #

  • JobManager: <host>
  • TaskManager: <host>, <tm_id>
  • Job: <job_id>, <job_name>
  • Task: <task_id>, <task_name>, <task_attempt_id>, <task_attempt_num>, <subtask_index>
  • Operator: <operator_id>, <operator_name>, <subtask_index>

Important: For the Batch API, <operator_id> is always equal to <task_id>.

User Variables #

You can define a user variable by calling MetricGroup#addGroup(String key, String value). This method affects what MetricGroup#getMetricIdentifier, MetricGroup#getScopeComponents and MetricGroup#getAllVariables() return.

Important: User variables cannot be used in scope formats.


counter = getRuntimeContext()
  .getMetricGroup()
  .addGroup("MyMetricsKey", "MyMetricsValue")
  .counter("myCounter");

counter = getRuntimeContext()
  .getMetricGroup()
  .addGroup("MyMetricsKey", "MyMetricsValue")
  .counter("myCounter")
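
The difference between the two addGroup variants can be modeled with a toy metric group in plain Python. This is a sketch, not the Flink implementation: `ToyMetricGroup` is hypothetical, and storing the user variable under the name `<key>` is an assumption made here by analogy with the system variables.

```python
from typing import Dict, List, Optional, Sequence


class ToyMetricGroup:
    """Toy model of a metric group with scope components and variables."""

    def __init__(
        self,
        scope_components: Sequence[str],
        variables: Optional[Dict[str, str]] = None,
    ) -> None:
        self._scope: List[str] = list(scope_components)
        self._variables: Dict[str, str] = dict(variables or {})

    def add_group(self, name_or_key: str, value: Optional[str] = None) -> "ToyMetricGroup":
        if value is None:
            # Plain group: extends the scope only.
            return ToyMetricGroup(self._scope + [name_or_key], self._variables)
        # Key-value group: extends the scope AND defines a user variable.
        variables = dict(self._variables)
        variables[f"<{name_or_key}>"] = value
        return ToyMetricGroup(self._scope + [name_or_key, value], variables)

    def get_scope_components(self) -> List[str]:
        return list(self._scope)

    def get_all_variables(self) -> Dict[str, str]:
        return dict(self._variables)

    def get_metric_identifier(self, name: str, delimiter: str = ".") -> str:
        return delimiter.join(self._scope + [name])


root = ToyMetricGroup(["localhost", "taskmanager"])
plain = root.add_group("MyMetrics")
keyed = root.add_group("MyMetricsKey", "MyMetricsValue")

print(plain.get_metric_identifier("myCounter"), plain.get_all_variables())
print(keyed.get_metric_identifier("myCounter"), keyed.get_all_variables())
```

Both variants change the identifier, but only the key-value variant adds an entry to the variables.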

Reporter #

For information on how to set up Flink’s metric reporters please take a look at the metric reporters documentation.

System metrics #

By default, Flink gathers several metrics that provide deep insights into the current state. This section is a reference of all these metrics.

The tables below generally feature 5 columns:

  • The “Scope” column describes which scope format is used to generate the system scope. For example, if the cell contains “Operator” then the scope format for “metrics.scope.operator” is used. If the cell contains multiple values, separated by a slash, then the metrics are reported multiple times for different entities, like for both job- and taskmanagers.

  • The (optional) “Infix” column describes which infix is appended to the system scope.

  • The “Metrics” column lists the names of all metrics that are registered for the given scope and infix.

  • The “Description” column provides information as to what a given metric is measuring.

  • The “Type” column describes which metric type is used for the measurement.

Note that all dots in the infix/metric name columns are still subject to the “metrics.scope.delimiter” setting.

Thus, in order to infer the metric identifier:

  1. Take the scope-format based on the “Scope” column
  2. Append the value in the “Infix” column if present, and account for the “metrics.scope.delimiter” setting
  3. Append metric name.
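
The three steps above can be sketched in plain Python as follows. `infer_identifier` is a hypothetical helper, and the scope components are assumed to be already rendered (variables replaced by example values).

```python
from typing import List, Optional, Sequence


def infer_identifier(
    scope_components: Sequence[str],
    infix: Optional[str],
    metric_name: str,
    delimiter: str = ".",
) -> str:
    """Combine rendered scope, optional infix and metric name."""
    parts: List[str] = list(scope_components)   # step 1: scope format
    if infix:
        parts += infix.split(".")               # step 2: infix, if present
    parts += metric_name.split(".")             # step 3: metric name
    # Dots inside infix and metric name are replaced by the delimiter too.
    return delimiter.join(parts)


tm_scope = ["localhost", "taskmanager", "1234"]

with_dots = infer_identifier(tm_scope, "Status.JVM.Memory", "Heap.Used")
with_underscores = infer_identifier(
    tm_scope, "Status.JVM.Memory", "Heap.Used", delimiter="_"
)
without_infix = infer_identifier(
    ["localhost", "jobmanager"], None, "numRunningJobs"
)
print(with_dots)
print(with_underscores)
print(without_infix)
```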

CPU #

Scope Infix Metrics Description Type
Job-/TaskManager Status.JVM.CPU Load The recent CPU usage of the JVM. Gauge
Time The CPU time used by the JVM. Gauge

Memory #

The memory-related metrics require Oracle’s memory management (also included in OpenJDK’s Hotspot implementation) to be in place. Some metrics might not be exposed when using other JVM implementations (e.g. IBM’s J9).

Scope Infix Metrics Description Type
Job-/TaskManager Status.JVM.Memory Heap.Used The amount of heap memory currently used (in bytes). Gauge
Heap.Committed The amount of heap memory guaranteed to be available to the JVM (in bytes). Gauge
Heap.Max The maximum amount of heap memory that can be used for memory management (in bytes). This value might not necessarily be equal to the maximum value specified through -Xmx or the equivalent Flink configuration parameter. Some GC algorithms allocate heap memory that won't be available to the user code and is therefore not exposed through the heap metrics. Gauge
NonHeap.Used The amount of non-heap memory currently used (in bytes). Gauge
NonHeap.Committed The amount of non-heap memory guaranteed to be available to the JVM (in bytes). Gauge
NonHeap.Max The maximum amount of non-heap memory that can be used for memory management (in bytes). Gauge
Metaspace.Used The amount of memory currently used in the Metaspace memory pool (in bytes). Gauge
Metaspace.Committed The amount of memory guaranteed to be available to the JVM in the Metaspace memory pool (in bytes). Gauge
Metaspace.Max The maximum amount of memory that can be used in the Metaspace memory pool (in bytes). Gauge
Direct.Count The number of buffers in the direct buffer pool. Gauge
Direct.MemoryUsed The amount of memory used by the JVM for the direct buffer pool (in bytes). Gauge
Direct.TotalCapacity The total capacity of all buffers in the direct buffer pool (in bytes). Gauge
Mapped.Count The number of buffers in the mapped buffer pool. Gauge
Mapped.MemoryUsed The amount of memory used by the JVM for the mapped buffer pool (in bytes). Gauge
Mapped.TotalCapacity The total capacity of all buffers in the mapped buffer pool (in bytes). Gauge
Status.Flink.Memory Managed.Used The amount of managed memory currently used. Gauge
Managed.Total The total amount of managed memory. Gauge

Threads #

Scope Infix Metrics Description Type
Job-/TaskManager Status.JVM.Threads Count The total number of live threads. Gauge

GarbageCollection #

Scope Infix Metrics Description Type
Job-/TaskManager Status.JVM.GarbageCollector <Collector/All>.Count The total number of collections that have occurred for the given (or all) collector. Gauge
<Collector/All>.Time The total time spent performing garbage collection for the given (or all) collector. Gauge
<Collector/All>.TimeMsPerSecond The time (in milliseconds) spent garbage collecting per second for the given (or all) collector. Gauge

ClassLoader #

Scope Infix Metrics Description Type
Job-/TaskManager Status.JVM.ClassLoader ClassesLoaded The total number of classes loaded since the start of the JVM. Gauge
ClassesUnloaded The total number of classes unloaded since the start of the JVM. Gauge

Network #

Deprecated: use Default shuffle service metrics
Scope Infix Metrics Description Type
TaskManager Status.Network AvailableMemorySegments The number of unused memory segments. Gauge
TotalMemorySegments The number of allocated memory segments. Gauge
Task buffers inputQueueLength The number of queued input buffers. (ignores LocalInputChannels which are using blocking subpartitions) Gauge
outputQueueLength The number of queued output buffers. Gauge
inPoolUsage An estimate of the input buffers usage. (ignores LocalInputChannels) Gauge
inputFloatingBuffersUsage An estimate of the floating input buffers usage. (ignores LocalInputChannels) Gauge
inputExclusiveBuffersUsage An estimate of the exclusive input buffers usage. (ignores LocalInputChannels) Gauge
outPoolUsage An estimate of the output buffers usage. The pool usage can be > 100% if overdraft buffers are being used. Gauge
Network.<Input|Output>.<gate|partition>
(only available if taskmanager.network.detailed-metrics config option is set)
totalQueueLen Total number of queued buffers in all input/output channels. Gauge
minQueueLen Minimum number of queued buffers in all input/output channels. Gauge
maxQueueLen Maximum number of queued buffers in all input/output channels. Gauge
avgQueueLen Average number of queued buffers in all input/output channels. Gauge

Default shuffle service #

Metrics related to data exchange between task executors using Netty network communication.

Scope Infix Metrics Description Type
TaskManager Status.Shuffle.Netty AvailableMemorySegments The number of unused memory segments. Gauge
UsedMemorySegments The number of used memory segments. Gauge
TotalMemorySegments The number of allocated memory segments. Gauge
AvailableMemory The amount of unused memory in bytes. Gauge
UsedMemory The amount of used memory in bytes. Gauge
TotalMemory The amount of allocated memory in bytes. Gauge
RequestedMemoryUsage Experimental: The usage of the network memory. Shows (as a percentage) the total amount of memory requested by all subtasks. It can exceed 100% because not all requested memory is required for a subtask to make progress. However, if usage exceeds 100%, throughput can suffer greatly; consider increasing the available network memory or decreasing the configured size of the network buffer pools. Gauge
Task Shuffle.Netty.Input.Buffers inputQueueLength The number of queued input buffers. Gauge
inputQueueSize The real size of queued input buffers in bytes. The size for local input channels is always `0` since the local channel takes records directly from the output queue. Gauge
inPoolUsage An estimate of the input buffers usage. (ignores LocalInputChannels) Gauge
inputFloatingBuffersUsage An estimate of the floating input buffers usage. (ignores LocalInputChannels) Gauge
inputExclusiveBuffersUsage An estimate of the exclusive input buffers usage. (ignores LocalInputChannels) Gauge
Shuffle.Netty.Output.Buffers outputQueueLength The number of queued output buffers. Gauge
outputQueueSize The real size of queued output buffers in bytes. Gauge
outPoolUsage An estimate of the output buffers usage. The pool usage can be > 100% if overdraft buffers are being used. Gauge
Shuffle.Netty.<Input|Output>.<gate|partition>
(only available if taskmanager.network.detailed-metrics config option is set)
totalQueueLen Total number of queued buffers in all input/output channels. Gauge
minQueueLen Minimum number of queued buffers in all input/output channels. Gauge
maxQueueLen Maximum number of queued buffers in all input/output channels. Gauge
avgQueueLen Average number of queued buffers in all input/output channels. Gauge
Shuffle.Netty.Input numBytesInLocal The total number of bytes this task has read from a local source. Counter
numBytesInLocalPerSecond The number of bytes this task reads from a local source per second. Meter
numBytesInRemote The total number of bytes this task has read from a remote source. Counter
numBytesInRemotePerSecond The number of bytes this task reads from a remote source per second. Meter
numBuffersInLocal The total number of network buffers this task has read from a local source. Counter
numBuffersInLocalPerSecond The number of network buffers this task reads from a local source per second. Meter
numBuffersInRemote The total number of network buffers this task has read from a remote source. Counter
numBuffersInRemotePerSecond The number of network buffers this task reads from a remote source per second. Meter

Cluster #

Scope Metrics Description Type
JobManager numRegisteredTaskManagers The number of registered taskmanagers. Gauge
numPendingTaskManagers (only applicable to Native Kubernetes / YARN) The number of outstanding taskmanagers that Flink has requested. Gauge
numRunningJobs The number of running jobs. Gauge
taskSlotsAvailable The number of available task slots. Gauge
taskSlotsTotal The total number of task slots. Gauge

Availability #

The metrics in this table are available for each of the following job states: INITIALIZING, CREATED, RUNNING, RESTARTING, CANCELLING, FAILING. Whether these metrics are reported depends on the metrics.job.status.enable setting.

Evolving: The semantics of these metrics may change in later releases.

Scope Metrics Description Type
Job (only available on JobManager) <jobStatus>State For a given state, return 1 if the job is currently in that state, otherwise return 0. Gauge
<jobStatus>Time For a given state, if the job is currently in that state, return the time (in milliseconds) since the job transitioned into that state, otherwise return 0. Gauge
<jobStatus>TimeTotal For a given state, return how much time (in milliseconds) the job has spent in that state in total. Gauge
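
The relationship between the three metrics can be modeled with a small plain-Python tracker. This is a sketch, not Flink's implementation: `JobStatusTracker` is hypothetical, timestamps are passed in explicitly (in milliseconds), and counting the ongoing stay in the current state towards the total is an assumption of this sketch.

```python
from typing import Dict


class JobStatusTracker:
    """Toy model of <jobStatus>State, <jobStatus>Time and <jobStatus>TimeTotal."""

    def __init__(self, initial_status: str, now_ms: int) -> None:
        self._current = initial_status
        self._entered_at = now_ms
        self._completed_ms: Dict[str, int] = {}  # time of finished stays

    def transition(self, new_status: str, now_ms: int) -> None:
        elapsed = now_ms - self._entered_at
        self._completed_ms[self._current] = (
            self._completed_ms.get(self._current, 0) + elapsed
        )
        self._current = new_status
        self._entered_at = now_ms

    def state(self, status: str) -> int:
        return 1 if status == self._current else 0

    def time(self, status: str, now_ms: int) -> int:
        return now_ms - self._entered_at if status == self._current else 0

    def time_total(self, status: str, now_ms: int) -> int:
        return self._completed_ms.get(status, 0) + self.time(status, now_ms)


tracker = JobStatusTracker("RUNNING", now_ms=0)
tracker.transition("RESTARTING", now_ms=1000)
tracker.transition("RUNNING", now_ms=1500)

now = 2000
running = (
    tracker.state("RUNNING"),
    tracker.time("RUNNING", now),
    tracker.time_total("RUNNING", now),
)
restarting = (
    tracker.state("RESTARTING"),
    tracker.time("RESTARTING", now),
    tracker.time_total("RESTARTING", now),
)
print(running)     # (1, 500, 1500)
print(restarting)  # (0, 0, 500)
```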

Experimental

While the job is in the RUNNING state the metrics in this table provide additional details on what the job is currently doing. Whether these metrics are reported depends on the metrics.job.status.enable setting.

Scope Metrics Description Type
Job (only available on JobManager) deployingState Return 1 if the job is currently deploying* tasks, otherwise return 0. Gauge
deployingTime If the job is currently deploying* tasks, return the time (in milliseconds) since it started deploying them, otherwise return 0. Gauge
deployingTimeTotal Return how much time (in milliseconds) the job has spent deploying* tasks in total. Gauge

*A job is considered to be deploying tasks when:

  • for streaming jobs, any task is in the DEPLOYING state
  • for batch jobs, if at least 1 task is in the DEPLOYING state, and there are no INITIALIZING/RUNNING tasks
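
The two rules above can be written down as a small predicate. This is a plain-Python sketch for illustration: `is_deploying` and the `job_type` strings are hypothetical, and task states are given as plain strings.

```python
from typing import Iterable


def is_deploying(job_type: str, task_states: Iterable[str]) -> bool:
    """Return True if the job counts as 'deploying tasks' under the rules above."""
    states = list(task_states)
    any_deploying = "DEPLOYING" in states
    if job_type == "streaming":
        # Streaming: any task in DEPLOYING is enough.
        return any_deploying
    if job_type == "batch":
        # Batch: at least one DEPLOYING task and no INITIALIZING/RUNNING tasks.
        busy = any(s in ("INITIALIZING", "RUNNING") for s in states)
        return any_deploying and not busy
    raise ValueError(f"unknown job type: {job_type}")


mixed = ["RUNNING", "DEPLOYING", "CREATED"]
print(is_deploying("streaming", mixed))                 # True
print(is_deploying("batch", mixed))                     # False
print(is_deploying("batch", ["DEPLOYING", "CREATED"]))  # True
```

The same set of task states can therefore count as deploying for a streaming job but not for a batch job.
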
Scope Metrics Description Type
Job (only available on JobManager) uptime Attention: deprecated, use runningTime. Gauge
downtime Attention: deprecated, use restartingTime, cancellingTime, failingTime. Gauge
fullRestarts Attention: deprecated, use numRestarts. Gauge
numRestarts The total number of restarts since this job was submitted, including full restarts and fine-grained restarts. Gauge

Checkpointing #

Note that for failed checkpoints, metrics are updated on a best-effort basis and may not be accurate.

Scope Metrics Description Type
Job (only available on JobManager) lastCheckpointDuration The time it took to complete the last checkpoint (in milliseconds). Gauge
lastCheckpointSize The checkpointed size of the last checkpoint (in bytes). This metric can differ from lastCheckpointFullSize if incremental checkpoints or the changelog are enabled. Gauge
lastCompletedCheckpointId The identifier of the last completed checkpoint. Gauge
lastCheckpointFullSize The full size of the last checkpoint (in bytes). Gauge
lastCheckpointExternalPath The path where the last external checkpoint was stored. Gauge
lastCheckpointRestoreTimestamp Timestamp when the last checkpoint was restored at the coordinator (in milliseconds). Gauge
numberOfInProgressCheckpoints The number of in progress checkpoints. Gauge
numberOfCompletedCheckpoints The number of successfully completed checkpoints. Gauge
numberOfFailedCheckpoints The number of failed checkpoints. Gauge
totalNumberOfCheckpoints The number of total checkpoints (in progress, completed, failed). Gauge
Task checkpointAlignmentTime The time (in nanoseconds) that the last barrier alignment took to complete, or how long the current alignment has taken so far. This is the time between receiving the first and the last checkpoint barrier. You can find more information in the [Monitoring State and Checkpoints section](//nightlies.apache.org/flink/flink-docs-release-1.20/docs/ops/state/large_state_tuning/#monitoring-state-and-checkpoints). Gauge
checkpointStartDelayNanos The time (in nanoseconds) that elapsed between the creation of the last checkpoint and the time when the checkpointing process was started by this task. This delay shows how long it takes for the first checkpoint barrier to reach the task. A high value indicates back-pressure. If only a specific task has a long start delay, the most likely reason is data skew. Gauge
Job (only available on TaskManager) fileMerging.logicalFileCount The number of logical files of file merging mechanism. Gauge
fileMerging.logicalFileSize The total size of logical files of file merging mechanism on one task manager for one job. Gauge
fileMerging.physicalFileCount The number of physical files of file merging mechanism. Gauge
fileMerging.physicalFileSize The total size of physical files of file merging mechanism on one task manager for one job, usually larger than fileMerging.logicalFileSize. Gauge

State Access Latency #

Scope Metrics Description Type
Task/Operator stateClearLatency The latency of clear operation for state Histogram
valueStateGetLatency The latency of get operation for value state Histogram
valueStateUpdateLatency The latency of update operation for value state Histogram
listStateGetLatency The latency of get operation for list state Histogram
listStateAddLatency The latency of add operation for list state Histogram
listStateAddAllLatency The latency of addAll operation for list state Histogram
listStateUpdateLatency The latency of update operation for list state Histogram
listStateMergeNamespacesLatency The latency of merge namespace operation for list state Histogram
mapStateGetLatency The latency of get operation for map state Histogram
mapStatePutLatency The latency of put operation for map state Histogram
mapStatePutAllLatency The latency of putAll operation for map state Histogram
mapStateRemoveLatency The latency of remove operation for map state Histogram
mapStateContainsLatency The latency of contains operation for map state Histogram
mapStateEntriesInitLatency The init latency of entries operation for map state Histogram
mapStateKeysInitLatency The init latency of keys operation for map state Histogram
mapStateValuesInitLatency The init latency of values operation for map state Histogram
mapStateIteratorInitLatency The init latency of iterator operation for map state Histogram
mapStateIsEmptyLatency The latency of isEmpty operation for map state Histogram
mapStateIteratorHasNextLatency The latency of iterator#hasNext operation for map state Histogram
mapStateIteratorNextLatency The latency of iterator#next operation for map state Histogram
mapStateIteratorRemoveLatency The latency of iterator#remove operation for map state Histogram
aggregatingStateGetLatency The latency of get operation for aggregating state Histogram
aggregatingStateAddLatency The latency of add operation for aggregating state Histogram
aggregatingStateMergeNamespacesLatency The latency of merge namespace operation for aggregating state Histogram
reducingStateGetLatency The latency of get operation for reducing state Histogram
reducingStateAddLatency The latency of add operation for reducing state Histogram
reducingStateMergeNamespacesLatency The latency of merge namespace operation for reducing state Histogram

RocksDB #

Certain RocksDB native metrics are available but disabled by default. The full list is in the RocksDB native metrics section of the configuration documentation.

State Changelog #

Note that the metrics are only available via reporters.

Scope Metrics Description Type
Job (only available on TaskManager) numberOfUploadRequests Total number of upload requests made Counter
numberOfUploadFailures Total number of failed upload requests (request may be retried after the failure) Counter
attemptsPerUpload The number of attempts per upload Histogram
totalAttemptsPerUpload The distribution of the total number of attempts per upload Histogram
uploadBatchSizes The number of upload tasks (coming from one or more writers, i.e. backends/tasks) that were grouped together and form a single upload resulting in a single file Histogram
uploadLatenciesNanos The latency distributions of uploads Histogram
uploadSizes The size distributions of uploads Histogram
uploadQueueSize Current size of upload queue. Queue items can be packed together and form a single upload. Gauge
Task/Operator startedMaterialization The number of started materializations. Counter
completedMaterialization The number of successfully completed materializations. Counter
failedMaterialization The number of failed materializations. Counter
lastDurationOfMaterialization The duration of the last materialization (in milliseconds). Gauge
lastFullSizeOfMaterialization The full size of the materialization part of the last reported checkpoint (in bytes). Gauge
lastIncSizeOfMaterialization The incremental size of the materialization part of the last reported checkpoint (in bytes). Gauge
lastFullSizeOfNonMaterialization The full size of the non-materialization part of the last reported checkpoint (in bytes). Gauge
lastIncSizeOfNonMaterialization The incremental size of the non-materialization part of the last reported checkpoint (in bytes). Gauge

IO #

Scope Metrics Description Type
Job (only available on TaskManager) [<source_id>.[<source_subtask_index>.]]<operator_id>.<operator_subtask_index>.latency The latency distributions from a given source (subtask) to an operator subtask (in milliseconds), depending on the latency granularity. Histogram
Task numBytesInLocal Attention: deprecated, use Default shuffle service metrics. Counter
numBytesInLocalPerSecond Attention: deprecated, use Default shuffle service metrics. Meter
numBytesInRemote Attention: deprecated, use Default shuffle service metrics. Counter
numBytesInRemotePerSecond Attention: deprecated, use Default shuffle service metrics. Meter
numBuffersInLocal Attention: deprecated, use Default shuffle service metrics. Counter
numBuffersInLocalPerSecond Attention: deprecated, use Default shuffle service metrics. Meter
numBuffersInRemote Attention: deprecated, use Default shuffle service metrics. Counter
numBuffersInRemotePerSecond Attention: deprecated, use Default shuffle service metrics. Meter
numBytesOut The total number of bytes this task has emitted. Counter
numBytesOutPerSecond The number of bytes this task emits per second. Meter
numBuffersOut The total number of network buffers this task has emitted. Counter
numBuffersOutPerSecond The number of network buffers this task emits per second. Meter
numFiredTimers The total number of timers this task has fired. Counter
numFiredTimersPerSecond The number of timers this task fires per second. Meter
isBackPressured Whether the task is back-pressured. Gauge
idleTimeMsPerSecond The time (in milliseconds) this task is idle (has no data to process) per second. Idle time excludes back pressured time, so if the task is back pressured it is not idle. Meter
busyTimeMsPerSecond The time (in milliseconds) this task is busy (neither idle nor back pressured) per second. Can be NaN, if the value could not be calculated. Gauge
backPressuredTimeMsPerSecond The time (in milliseconds) this task is back pressured (soft or hard) per second. It's a sum of softBackPressuredTimeMsPerSecond and hardBackPressuredTimeMsPerSecond. Gauge
softBackPressuredTimeMsPerSecond The time (in milliseconds) this task is softly back pressured per second. A softly back pressured task is still responsive and capable of, for example, triggering unaligned checkpoints. Gauge
hardBackPressuredTimeMsPerSecond The time (in milliseconds) this task is back pressured in a hard way per second. While hard back pressured, the task is completely blocked and unresponsive, preventing, for example, unaligned checkpoints from triggering. Gauge
maxSoftBackPressuredTimeMs Maximum recorded duration of a single consecutive period of the task being softly back pressured in the last sampling period. Please check softBackPressuredTimeMsPerSecond and hardBackPressuredTimeMsPerSecond for more information. Gauge
maxHardBackPressuredTimeMs Maximum recorded duration of a single consecutive period of the task being in the hard back pressure state in the last sampling period. Please check softBackPressuredTimeMsPerSecond and hardBackPressuredTimeMsPerSecond for more information. Gauge
changelogBusyTimeMsPerSecond The time (in milliseconds) taken by the Changelog state backend to do IO operations, only positive when Changelog state backend is enabled. Please check 'state.changelog.dstl.dfs.upload.max-in-flight' for more information. Gauge
mailboxMailsPerSecond The number of actions processed from the task's mailbox per second which includes all actions, e.g., checkpointing, timer, or cancellation actions. Meter
mailboxLatencyMs The latency is the time that actions spend waiting in the task's mailbox before being processed. The metric is a statistic of the latency in milliseconds that is measured approximately once every second and includes the last 60 measurements. Histogram
mailboxQueueSize The number of actions in the task's mailbox that are waiting to be processed. Gauge
initializationTime The time (in milliseconds) that one task spends on initialization. Returns 0 when the task is not in initialization/running status. Most of the initialization time is usually spent restoring from the checkpoint. Counter
Task (only if buffer debloating is enabled and in non-source tasks) estimatedTimeToConsumeBuffersMs The time (in milliseconds) that the buffer debloater estimates is needed to consume all of the buffered data in the network exchange preceding this task. This value is calculated from the approximate amount of in-flight data and the calculated throughput. Gauge
debloatedBufferSize The desired buffer size (in bytes) calculated by the buffer debloater. The buffer debloater tries to reduce the buffer size when the amount of in-flight data (after taking into account the current throughput) exceeds the configured target value. Gauge
Task/Operator numRecordsIn The total number of records this operator/task has received. Counter
numRecordsInPerSecond The number of records this operator/task receives per second. Meter
numRecordsOut The total number of records this operator/task has emitted. Counter
numRecordsOutPerSecond The number of records this operator/task sends per second. Meter
numLateRecordsDropped The number of records this operator/task has dropped due to arriving late. Counter
currentInputWatermark The last watermark this operator/task has received (in milliseconds). Note: For operators/tasks with 2 inputs this is the minimum of the last received watermarks. Gauge
Operator currentInputNWatermark The last watermark this operator has received in its N'th input (in milliseconds), with index N starting from 1. For example currentInput1Watermark, currentInput2Watermark, ... Note: Only for operators with 2 or more inputs. Gauge
currentOutputWatermark The last watermark this operator has emitted (in milliseconds). Gauge
watermarkAlignmentDrift The current drift from the minimal watermark emitted by all sources/tasks/splits that belong to the same watermark group. Note: Available only when watermark alignment is enabled and the first common watermark is announced. You can configure the update interval in the WatermarkStrategy. Gauge
numSplitsProcessed The total number of InputSplits this data source has processed (if the operator is a data source). Gauge
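
The time-based task metrics in the table above are related: by their descriptions, back-pressured time is the sum of its soft and hard parts, and busy time is whatever is neither idle nor back pressured. The following is a minimal sketch of that arithmetic, assuming all inputs are in milliseconds per second so that the parts add up to roughly 1000. The function name and the clamping to 0 are illustrative choices, not Flink's implementation.

```python
import math


def estimate_busy_time_ms(idle_ms: float, soft_bp_ms: float, hard_bp_ms: float) -> float:
    """Estimate busyTimeMsPerSecond from the idle and back-pressure metrics."""
    # Busy time can be NaN when it cannot be calculated; propagate that here.
    if any(math.isnan(v) for v in (idle_ms, soft_bp_ms, hard_bp_ms)):
        return math.nan
    # backPressuredTimeMsPerSecond is the sum of the soft and hard parts.
    back_pressured_ms = soft_bp_ms + hard_bp_ms
    # Assumption: one second (1000 ms) splits into idle, back pressured and busy.
    return max(0.0, 1000.0 - idle_ms - back_pressured_ms)


print(estimate_busy_time_ms(200.0, 50.0, 150.0))  # 600.0
```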

Connectors #

Kafka Connectors #

Please refer to Kafka monitoring.

Kinesis Source #

Scope Metrics User Variables Description Type
Operator millisBehindLatest stream, shardId The number of milliseconds the consumer is behind the head of the stream, indicating how far behind current time the consumer is, for each Kinesis shard. A particular shard's metric can be specified by stream name and shard id. A value of 0 indicates record processing is caught up, and there are no new records to process at this moment. A value of -1 indicates that there is no reported value for the metric, yet. Gauge
Operator sleepTimeMillis stream, shardId The number of milliseconds the consumer spends sleeping before fetching records from Kinesis. A particular shard's metric can be specified by stream name and shard id. Gauge
Operator maxNumberOfRecordsPerFetch stream, shardId The maximum number of records requested by the consumer in a single getRecords call to Kinesis. If ConsumerConfigConstants.SHARD_USE_ADAPTIVE_READS is set to true, this value is adaptively calculated to maximize the 2 Mbps read limits from Kinesis. Gauge
Operator numberOfAggregatedRecordsPerFetch stream, shardId The number of aggregated Kinesis records fetched by the consumer in a single getRecords call to Kinesis. Gauge
Operator numberOfDeggregatedRecordsPerFetch stream, shardId The number of deaggregated Kinesis records fetched by the consumer in a single getRecords call to Kinesis. Gauge
Operator averageRecordSizeBytes stream, shardId The average size of a Kinesis record in bytes, fetched by the consumer in a single getRecords call. Gauge
Operator runLoopTimeNanos stream, shardId The actual time taken, in nanoseconds, by the consumer in the run loop. Gauge
Operator loopFrequencyHz stream, shardId The number of calls to getRecords in one second. Gauge
Operator bytesRequestedPerFetch stream, shardId The bytes requested (2 Mbps / loopFrequencyHz) in a single call to getRecords. Gauge
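
The relationship between loopFrequencyHz, bytesRequestedPerFetch and the adaptively calculated maxNumberOfRecordsPerFetch can be illustrated with a short calculation. This is only a sketch: it assumes the "2 Mbps" limit means 2 MiB per second, and the cap of 10,000 records per call and both function names are assumptions made for the example, not the connector's code.

```python
# Assumption: the "2 Mbps" shard read limit is interpreted as 2 MiB per second.
SHARD_BYTES_PER_SECOND_LIMIT = 2 * 1024 * 1024
# Assumption: upper bound on records per getRecords call, used to clamp the result.
MAX_RECORDS_PER_FETCH = 10_000


def bytes_requested_per_fetch(loop_frequency_hz: float) -> float:
    """Read budget for one getRecords call: per-second limit / calls per second."""
    if loop_frequency_hz <= 0:
        raise ValueError("loop_frequency_hz must be positive")
    return SHARD_BYTES_PER_SECOND_LIMIT / loop_frequency_hz


def adaptive_max_records(loop_frequency_hz: float, average_record_size_bytes: float) -> int:
    """Number of average-sized records that fit into one call's read budget."""
    if average_record_size_bytes <= 0:
        raise ValueError("average_record_size_bytes must be positive")
    records = int(bytes_requested_per_fetch(loop_frequency_hz) / average_record_size_bytes)
    # Keep the result within [1, MAX_RECORDS_PER_FETCH].
    return max(1, min(records, MAX_RECORDS_PER_FETCH))


print(bytes_requested_per_fetch(5.0))   # 419430.4
print(adaptive_max_records(5.0, 1024))  # 409
```

With five getRecords calls per second and records averaging 1 KiB, each call would ask for about 409 records under these assumptions.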

Kinesis Sink #

Scope Metrics Description Type
Operator numRecordsOutErrors (deprecated, please use numRecordsSendErrors) Number of rejected record writes. Counter
Operator numRecordsSendErrors Number of rejected record writes. Counter
Operator CurrentSendTime Number of ms taken for 1 round trip of the last request batch. Gauge

HBase Connectors #

Scope Metrics User Variables Description Type
Operator lookupCacheHitRate n/a Cache hit ratio for lookup. Gauge

System resources #

System resources reporting is disabled by default. When metrics.system-resource is enabled, the additional metrics listed below will be available on the JobManager and TaskManager. System resource metrics are updated periodically and present average values over a configured interval (metrics.system-resource-probing-interval).

System resources reporting requires an optional dependency to be present on the classpath (for example placed in Flink’s lib directory):

  • com.github.oshi:oshi-core:6.1.5 (licensed under MIT license)

Including its transitive dependencies:

  • net.java.dev.jna:jna-platform:jar:5.10.0
  • net.java.dev.jna:jna:jar:5.10.0

If the dependency is missing, the failure is reported as a warning message, such as a NoClassDefFoundError, logged by SystemResourcesMetricsInitializer during startup.

System CPU #

Scope Infix Metrics Description
Job-/TaskManager System.CPU Usage Overall % of CPU usage on the machine.
Idle % of CPU Idle time on the machine.
Sys % of System CPU time on the machine.
User % of User CPU time on the machine.
IOWait % of IOWait CPU time on the machine.
Irq % of Irq CPU time on the machine.
SoftIrq % of SoftIrq CPU time on the machine.
Nice % of Nice CPU time on the machine.
Steal % of Steal CPU time on the machine.
Load1min Average CPU load over 1 minute
Load5min Average CPU load over 5 minutes
Load15min Average CPU load over 15 minutes
UsageCPU* % of CPU usage for each processor

System memory #

Scope Infix Metrics Description
Job-/TaskManager System.Memory Available Available memory in bytes
Total Total memory in bytes
System.Swap Used Used swap bytes
Total Total swap in bytes

System network #

Scope Infix Metrics Description
Job-/TaskManager System.Network.INTERFACE_NAME ReceiveRate Average receive rate in bytes per second
SendRate Average send rate in bytes per second

Speculative Execution #

Metrics below can be used to measure the effectiveness of speculative execution.

Scope Metrics Description Type
Job (only available on JobManager) numSlowExecutionVertices Number of slow execution vertices at the moment. Gauge
numEffectiveSpeculativeExecutions Number of effective speculative execution attempts, i.e. speculative execution attempts which finish earlier than their corresponding original attempts. Counter

End-to-End latency tracking #

Flink allows you to track the latency of records travelling through the system. This feature is disabled by default. To enable latency tracking, you must set the latencyTrackingInterval to a positive number in either the Flink configuration or the ExecutionConfig.

At every latencyTrackingInterval, the sources emit a special record called a LatencyMarker. The marker contains a timestamp of the time at which it was emitted at the source. Latency markers cannot overtake regular user records, so if records are queuing up in front of an operator, the queuing time adds to the latency tracked by the marker.

Note that latency markers do not account for the time user records spend in operators, because the markers bypass them. In particular, the markers do not account for the time records spend, for example, in window buffers. The measured latency reflects operator time only when operators are unable to accept new records, so that records queue up.

The LatencyMarkers are used to derive a distribution of the latency between the sources of the topology and each downstream operator. These distributions are reported as histogram metrics. The granularity of these distributions can be controlled in the Flink configuration. For the highest granularity, subtask, Flink derives the latency distribution between every source subtask and every downstream subtask, which results in a quadratic (in terms of the parallelism) number of histograms.
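
To see why subtask granularity is costly, the number of latency histograms can be estimated for each granularity. The sketch below assumes three granularity levels named single, operator and subtask, with the meanings given in the comments. The function and its parameters are hypothetical and only illustrate the growth in the number of histograms.

```python
def latency_histogram_count(
    granularity: str,
    num_sources: int,
    source_parallelism: int,
    downstream_subtasks: int,
) -> int:
    """Estimate how many latency histograms a job would report."""
    if granularity == "single":
        # One histogram per downstream subtask, sources are not distinguished.
        return downstream_subtasks
    if granularity == "operator":
        # One histogram per (source, downstream subtask) pair.
        return num_sources * downstream_subtasks
    if granularity == "subtask":
        # One histogram per (source subtask, downstream subtask) pair.
        return num_sources * source_parallelism * downstream_subtasks
    raise ValueError(f"unknown granularity: {granularity}")


# Example: 2 sources and 5 downstream operators, all with parallelism 100.
for level in ("single", "operator", "subtask"):
    print(level, latency_histogram_count(level, 2, 100, 5 * 100))
# single 500
# operator 1000
# subtask 100000
```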

Currently, Flink assumes that the clocks of all machines in the cluster are in sync. We recommend setting up an automated clock synchronisation service (like NTP) to avoid false latency results.

Warning Enabling latency metrics can significantly impact the performance of the cluster (in particular for subtask granularity). It is highly recommended to only use them for debugging purposes.

State access latency tracking #

Flink also allows you to track the keyed state access latency for the standard Flink state backends and for customized state backends that extend AbstractStateBackend. This feature is disabled by default. To enable it, set state.latency-track.keyed-state-enabled to true in the Flink configuration.

Once keyed state access latency tracking is enabled, Flink samples the state access latency every N accesses, where N is defined by state.latency-track.sample-interval. This option has a default value of 100. A smaller value yields more accurate results but has a higher performance impact, since latency is sampled more frequently.

Because these latency metrics are histograms, state.latency-track.history-size controls the maximum number of recorded values kept in the history; it has a default value of 128. A larger value requires more memory but provides a more accurate result.
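
The interplay of the sample interval and the history size can be pictured with a small standalone model. This is a sketch of the idea only, not Flink's implementation: the class and method names are hypothetical, and the defaults mirror the values of 100 and 128 mentioned above.

```python
import time
from collections import deque


class SampledLatencyTracker:
    """Times every N-th access and keeps a bounded history of samples."""

    def __init__(self, sample_interval: int = 100, history_size: int = 128):
        if sample_interval < 1 or history_size < 1:
            raise ValueError("sample_interval and history_size must be >= 1")
        self.sample_interval = sample_interval
        # A bounded deque drops the oldest sample once history_size is reached.
        self.history = deque(maxlen=history_size)
        self._access_count = 0

    def should_sample(self) -> bool:
        """Return True on every sample_interval-th call."""
        self._access_count += 1
        if self._access_count >= self.sample_interval:
            self._access_count = 0
            return True
        return False

    def record(self, latency_nanos: int) -> None:
        self.history.append(latency_nanos)

    def track(self, operation):
        """Run a state access, measuring it only when it is sampled."""
        if not self.should_sample():
            return operation()
        start = time.perf_counter_ns()
        try:
            return operation()
        finally:
            self.record(time.perf_counter_ns() - start)


tracker = SampledLatencyTracker(sample_interval=100, history_size=128)
for _ in range(1000):
    tracker.track(lambda: None)  # stand-in for a state access
print(len(tracker.history))  # 10
```

Only one access in every hundred pays the cost of reading the clock, which is the trade-off between accuracy and overhead described above.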

Warning Enabling state-access-latency metrics may impact the performance. It is recommended to only use them for debugging purposes.

REST API integration #

Metrics can be queried through the Monitoring REST API.

Below is a list of available endpoints, with a sample JSON response. All endpoints take the form http://hostname:8081/jobmanager/metrics; below we list only the path part of the URLs.

Values in angle brackets are variables. For example, http://hostname:8081/jobs/<jobid>/metrics has to be requested as, for example, http://hostname:8081/jobs/7684be6004e4e955c2a558a9bc463f65/metrics.

Request metrics for a specific entity:

  • /jobmanager/metrics
  • /taskmanagers/<taskmanagerid>/metrics
  • /jobs/<jobid>/metrics
  • /jobs/<jobid>/vertices/<vertexid>/subtasks/<subtaskindex>/metrics

Request metrics aggregated across all entities of the respective type:

  • /taskmanagers/metrics
  • /jobs/metrics
  • /jobs/<jobid>/vertices/<vertexid>/subtasks/metrics
  • /jobs/<jobid>/vertices/<vertexid>/jm-operator-metrics

Request metrics aggregated over a subset of all entities of the respective type:

  • /taskmanagers/metrics?taskmanagers=A,B,C
  • /jobs/metrics?jobs=D,E,F
  • /jobs/<jobid>/vertices/<vertexid>/subtasks/metrics?subtask=1,2,3
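
The endpoint paths and subset filters listed above can be assembled with two small helpers. This is a minimal sketch: fill_path and metrics_url are hypothetical names, the host is the placeholder used in this section, and identifiers are assumed to be URL-safe already.

```python
def fill_path(template: str, **variables) -> str:
    """Replace <name> placeholders in an endpoint path with concrete values."""
    path = template
    for name, value in variables.items():
        path = path.replace(f"<{name}>", str(value))
    if "<" in path or ">" in path:
        raise ValueError(f"unresolved variable in path: {path}")
    return path


def metrics_url(base_url: str, path: str, **filters) -> str:
    """Append filters such as taskmanagers=A,B,C as a comma-separated query."""
    query = "&".join(
        f"{key}={','.join(str(v) for v in values)}"
        for key, values in filters.items()
    )
    url = base_url.rstrip("/") + path
    return f"{url}?{query}" if query else url


base = "http://hostname:8081"
print(metrics_url(base, "/taskmanagers/metrics", taskmanagers=["A", "B", "C"]))
# http://hostname:8081/taskmanagers/metrics?taskmanagers=A,B,C

job_path = fill_path("/jobs/<jobid>/metrics", jobid="7684be6004e4e955c2a558a9bc463f65")
print(metrics_url(base, job_path))
# http://hostname:8081/jobs/7684be6004e4e955c2a558a9bc463f65/metrics
```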

Warning Metric names can contain special characters that you need to escape when querying metrics. For example, “a_+_b” would be escaped to “a_%2B_b”.

List of characters that should be escaped:

Character Escape Sequence
# %23
$ %24
& %26
+ %2B
/ %2F
; %3B
= %3D
? %3F
@ %40
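
The escaping in the table above is standard percent-encoding, so it does not need to be done by hand. The sketch below uses urllib.parse.quote from the Python standard library with an empty safe set. The helper names are hypothetical, and quote also escapes characters that are not in the table, such as spaces.

```python
from urllib.parse import quote


def escape_metric_name(name: str) -> str:
    """Percent-encode a metric name for use in a query string."""
    # safe="" makes quote() escape "/" as well, which it leaves alone by default.
    return quote(name, safe="")


def get_parameter(names) -> str:
    """Build the get=... parameter; the separating commas stay unescaped."""
    return "get=" + ",".join(escape_metric_name(n) for n in names)


print(escape_metric_name("a_+_b"))           # a_%2B_b
print(get_parameter(["a_+_b", "metric2"]))   # get=a_%2B_b,metric2
```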

Request a list of available metrics:

GET /jobmanager/metrics

[
  {
    "id": "metric1"
  },
  {
    "id": "metric2"
  }
]

Request the values for specific (unaggregated) metrics:

GET /taskmanagers/ABCDE/metrics?get=metric1,metric2

[
  {
    "id": "metric1",
    "value": "34"
  },
  {
    "id": "metric2",
    "value": "2"
  }
]
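
Note that the values in this response are JSON strings. A client that wants numbers has to convert them, as in the following sketch. The function names are hypothetical, and fetch_metric_values assumes a reachable JobManager at the given URL.

```python
import json
from urllib.request import urlopen


def parse_metric_values(body: str) -> dict:
    """Turn a list of {"id", "value"} objects into a {id: value} mapping."""
    result = {}
    for entry in json.loads(body):
        raw = entry.get("value")
        try:
            result[entry["id"]] = float(raw)
        except (TypeError, ValueError):
            # Keep non-numeric or missing values unchanged.
            result[entry["id"]] = raw
    return result


def fetch_metric_values(url: str, timeout: float = 5.0) -> dict:
    """GET a metrics URL and parse the JSON body."""
    with urlopen(url, timeout=timeout) as response:
        return parse_metric_values(response.read().decode("utf-8"))


sample = '[{"id": "metric1", "value": "34"}, {"id": "metric2", "value": "2"}]'
print(parse_metric_values(sample))  # {'metric1': 34.0, 'metric2': 2.0}
```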

Request aggregated values for specific metrics:

GET /taskmanagers/metrics?get=metric1,metric2

[
  {
    "id": "metric1",
    "min": 1,
    "max": 34,
    "avg": 15,
    "sum": 45
  },
  {
    "id": "metric2",
    "min": 2,
    "max": 14,
    "avg": 7,
    "sum": 16
  }
]

Request specific aggregated values for specific metrics:

GET /taskmanagers/metrics?get=metric1,metric2&agg=min,max

[
  {
    "id": "metric1",
    "min": 1,
    "max": 34
  },
  {
    "id": "metric2",
    "min": 2,
    "max": 14
  }
]
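
To make the agg parameter concrete, the sketch below reproduces the sample numbers for metric1 from made-up per-TaskManager values. It assumes the aggregates are the plain minimum, maximum, average and sum over the selected entities. The function name and the input values are illustrative only.

```python
def aggregate(values, agg=("min", "max", "avg", "sum")) -> dict:
    """Compute the requested aggregates over per-entity metric values."""
    functions = {
        "min": min,
        "max": max,
        "sum": sum,
        "avg": lambda v: sum(v) / len(v),
    }
    unknown = [name for name in agg if name not in functions]
    if unknown:
        raise ValueError(f"unsupported aggregation: {unknown}")
    return {name: functions[name](values) for name in agg}


# Hypothetical values of metric1 on three TaskManagers.
per_taskmanager = [1, 10, 34]
print(aggregate(per_taskmanager))
# {'min': 1, 'max': 34, 'avg': 15.0, 'sum': 45}
print(aggregate(per_taskmanager, agg=("min", "max")))
# {'min': 1, 'max': 34}
```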

Dashboard integration #

Metrics that were gathered for each task or operator can also be visualized in the Dashboard. On the main page for a job, select the Metrics tab. After selecting one of the tasks in the top graph you can select metrics to display using the Add Metric drop-down menu.

  • Task metrics are listed as <subtask_index>.<metric_name>.
  • Operator metrics are listed as <subtask_index>.<operator_name>.<metric_name>.

Each metric will be visualized as a separate graph, with the x-axis representing time and the y-axis the measured value. All graphs are automatically updated every 10 seconds, and continue to do so when navigating to another page.

There is no limit as to the number of visualized metrics; however only numeric metrics can be visualized.
