Flink exposes a metric system that allows gathering and exposing metrics to external systems.
You can access the metric system from any user function that extends RichFunction by calling getRuntimeContext().getMetricGroup()
.
This method returns a MetricGroup
object on which you can create and register new metrics.
Flink supports Counters
, Gauges
, Histograms
and Meters
.
A Counter
is used to count something. The current value can be in- or decremented using inc()/inc(long n)
or dec()/dec(long n)
.
You can create and register a Counter
by calling counter(String name)
on a MetricGroup
.
Alternatively you can also use your own Counter
implementation:
A Gauge
provides a value of any type on demand. In order to use a Gauge
you must first create a class that implements the org.apache.flink.metrics.Gauge
interface.
There is no restriction for the type of the returned value.
You can register a gauge by calling gauge(String name, Gauge gauge)
on a MetricGroup
.
Note that reporters will turn the exposed object into a String
, which means that a meaningful toString()
implementation is required.
A Histogram
measures the distribution of long values.
You can register one by calling histogram(String name, Histogram histogram)
on a MetricGroup
.
Flink does not provide a default implementation for Histogram
, but offers a Wrapper that allows usage of Codahale/DropWizard histograms.
To use this wrapper add the following dependency in your pom.xml
:
You can then register a Codahale/DropWizard histogram like this:
A Meter
measures an average throughput. An occurrence of an event can be registered with the markEvent()
method. Occurrence of multiple events at the same time can be registered with markEvent(long n)
method.
You can register a meter by calling meter(String name, Meter meter)
on a MetricGroup
.
Flink offers a Wrapper that allows usage of Codahale/DropWizard meters.
To use this wrapper add the following dependency in your pom.xml
:
You can then register a Codahale/DropWizard meter like this:
Every metric is assigned an identifier and a set of key-value pairs under which the metric will be reported.
THe identifier is based on 3 components: the user-defined name when registering the metric, an optional user-defined scope and a system-provided scope.
For example, if A.B
is the system scope, C.D
the user scope and E
the name, then the identifier for the metric will be A.B.C.D.E
.
You can configure which delimiter to use for the identifier (default: .
) by setting the metrics.scope.delimiter
key in conf/flink-conf.yaml
.
You can define a user scope by calling MetricGroup#addGroup(String name)
, MetricGroup#addGroup(int name)
or Metric#addGroup(String key, String value)
.
These methods affect what MetricGroup#getMetricIdentifier
and MetricGroup#getScopeComponents
return.
The system scope contains context information about the metric, for example in which task it was registered or what job that task belongs to.
Which context information should be included can be configured by setting the following keys in conf/flink-conf.yaml
.
Each of these keys expect a format string that may contain constants (e.g. “taskmanager”) and variables (e.g. “<task_id>”) which will be replaced at runtime.
metrics.scope.jm
metrics.scope.jm.job
metrics.scope.tm
metrics.scope.tm.job
metrics.scope.task
metrics.scope.operator
There are no restrictions on the number or order of variables. Variables are case sensitive.
The default scope for operator metrics will result in an identifier akin to localhost.taskmanager.1234.MyJob.MyOperator.0.MyMetric
If you also want to include the task name but omit the task manager information you can specify the following format:
metrics.scope.operator: <host>.<job_name>.<task_name>.<operator_name>.<subtask_index>
This could create the identifier localhost.MyJob.MySource_->_MyOperator.MyOperator.0.MyMetric
.
Note that for this format string an identifier clash can occur should the same job be run multiple times concurrently, which can lead to inconsistent metric data. As such it is advised to either use format strings that provide a certain degree of uniqueness by including IDs (e.g <job_id>) or by assigning unique names to jobs and operators.
Important: For the Batch API, <operator_id> is always equal to <task_id>.
You can define a user variable by calling MetricGroup#addGroup(String key, String value)
.
This method affects what MetricGroup#getMetricIdentifier
, MetricGroup#getScopeComponents
and MetricGroup#getAllVariables()
returns.
Important: User variables cannot be used in scope formats.
Metrics can be exposed to an external system by configuring one or several reporters in conf/flink-conf.yaml
. These
reporters will be instantiated on each job and task manager when they are started.
metrics.reporter.<name>.<config>
: Generic setting <config>
for the reporter named <name>
.metrics.reporter.<name>.class
: The reporter class to use for the reporter named <name>
.metrics.reporter.<name>.interval
: The reporter interval to use for the reporter named <name>
.metrics.reporter.<name>.scope.delimiter
: The delimiter to use for the identifier (default value use metrics.scope.delimiter
) for the reporter named <name>
.metrics.reporters
: (optional) A comma-separated include list of reporter names. By default all configured reporters will be used.All reporters must at least have the class
property, some allow specifying a reporting interval
. Below,
we will list more settings specific to each reporter.
Example reporter configuration that specifies multiple reporters:
Important: The jar containing the reporter must be accessible when Flink is started by placing it in the /lib folder.
You can write your own Reporter
by implementing the org.apache.flink.metrics.reporter.MetricReporter
interface.
If the Reporter should send out reports regularly you have to implement the Scheduled
interface as well.
The following sections list the supported reporters.
You don’t have to include an additional dependency since the JMX reporter is available by default but not activated.
Parameters:
port
- (optional) the port on which JMX listens for connections.
In order to be able to run several instances of the reporter on one host (e.g. when one TaskManager is colocated with the JobManager) it is advisable to use a port range like 9250-9260
.
When a range is specified the actual port is shown in the relevant job or task manager log.
If this setting is set Flink will start an extra JMX connector for the given port/range.
Metrics are always available on the default local JMX interface.Example configuration:
Metrics exposed through JMX are identified by a domain and a list of key-properties, which together form the object name.
The domain always begins with org.apache.flink
followed by a generalized metric identifier. In contrast to the usual
identifier it is not affected by scope-formats, does not contain any variables and is constant across jobs.
An example for such a domain would be org.apache.flink.job.task.numBytesOut
.
The key-property list contains the values for all variables, regardless of configured scope formats, that are associated
with a given metric.
An example for such a list would be host=localhost,job_name=MyJob,task_name=MyTask
.
The domain thus identifies a metric class, while the key-property list identifies one (or multiple) instances of that metric.
In order to use this reporter you must copy /opt/flink-metrics-graphite-1.5.4.jar
into the /lib
folder
of your Flink distribution.
Parameters:
host
- the Graphite server hostport
- the Graphite server portprotocol
- protocol to use (TCP/UDP)Example configuration:
In order to use this reporter you must copy /opt/flink-metrics-prometheus-1.5.4.jar
into the /lib
folder
of your Flink distribution.
Parameters:
port
- (optional) the port the Prometheus exporter listens on, defaults to 9249. In order to be able to run several instances of the reporter on one host (e.g. when one TaskManager is colocated with the JobManager) it is advisable to use a port range like 9250-9260
.Example configuration:
Flink metric types are mapped to Prometheus metric types as follows:
Flink | Prometheus | Note |
---|---|---|
Counter | Gauge | Prometheus counters cannot be decremented. |
Gauge | Gauge | Only numbers and booleans are supported. |
Histogram | Summary | Quantiles .5, .75, .95, .98, .99 and .999 |
Meter | Gauge | The gauge exports the meter’s rate. |
All Flink metrics variables (see List of all Variables) are exported to Prometheus as labels.
In order to use this reporter you must copy /opt/flink-metrics-statsd-1.5.4.jar
into the /lib
folder
of your Flink distribution.
Parameters:
host
- the StatsD server hostport
- the StatsD server portExample configuration:
In order to use this reporter you must copy /opt/flink-metrics-datadog-1.5.4.jar
into the /lib
folder
of your Flink distribution.
Note any variables in Flink metrics, such as <host>
, <job_name>
, <tm_id>
, <subtask_index>
, <task_name>
, and <operator_name>
,
will be sent to Datadog as tags. Tags will look like host:localhost
and job_name:myjobname
.
Parameters:
apikey
- the Datadog API keytags
- (optional) the global tags that will be applied to metrics when sending to Datadog. Tags should be separated by comma onlyExample configuration:
In order to use this reporter you must copy /opt/flink-metrics-slf4j-1.5.4.jar
into the /lib
folder
of your Flink distribution.
Example configuration:
By default Flink gathers several metrics that provide deep insights on the current state. This section is a reference of all these metrics.
The tables below generally feature 5 columns:
The “Scope” column describes which scope format is used to generate the system scope. For example, if the cell contains “Operator” then the scope format for “metrics.scope.operator” is used. If the cell contains multiple values, separated by a slash, then the metrics are reported multiple times for different entities, like for both job- and taskmanagers.
The (optional)”Infix” column describes which infix is appended to the system scope.
The “Metrics” column lists the names of all metrics that are registered for the given scope and infix.
The “Description” column provides information as to what a given metric is measuring.
The “Type” column describes which metric type is used for the measurement.
Note that all dots in the infix/metric name columns are still subject to the “metrics.delimiter” setting.
Thus, in order to infer the metric identifier:
Scope | Infix | Metrics | Description | Type |
---|---|---|---|---|
Job-/TaskManager | Status.JVM.CPU | Load | The recent CPU usage of the JVM. | Gauge |
Time | The CPU time used by the JVM. | Gauge |
Scope | Infix | Metrics | Description | Type |
---|---|---|---|---|
Job-/TaskManager | Status.JVM.Memory | Heap.Used | The amount of heap memory currently used (in bytes). | Gauge |
Heap.Committed | The amount of heap memory guaranteed to be available to the JVM (in bytes). | Gauge | ||
Heap.Max | The maximum amount of heap memory that can be used for memory management (in bytes). | Gauge | ||
NonHeap.Used | The amount of non-heap memory currently used (in bytes). | Gauge | ||
NonHeap.Committed | The amount of non-heap memory guaranteed to be available to the JVM (in bytes). | Gauge | ||
NonHeap.Max | The maximum amount of non-heap memory that can be used for memory management (in bytes). | Gauge | ||
Direct.Count | The number of buffers in the direct buffer pool. | Gauge | ||
Direct.MemoryUsed | The amount of memory used by the JVM for the direct buffer pool (in bytes). | Gauge | ||
Direct.TotalCapacity | The total capacity of all buffers in the direct buffer pool (in bytes). | Gauge | ||
Mapped.Count | The number of buffers in the mapped buffer pool. | Gauge | ||
Mapped.MemoryUsed | The amount of memory used by the JVM for the mapped buffer pool (in bytes). | Gauge | ||
Mapped.TotalCapacity | The number of buffers in the mapped buffer pool (in bytes). | Gauge |
Scope | Infix | Metrics | Description | Type |
---|---|---|---|---|
Job-/TaskManager | Status.JVM.Threads | Count | The total number of live threads. | Gauge |
Scope | Infix | Metrics | Description | Type |
---|---|---|---|---|
Job-/TaskManager | Status.JVM.GarbageCollector | <GarbageCollector>.Count | The total number of collections that have occurred. | Gauge |
<GarbageCollector>.Time | The total time spent performing garbage collection. | Gauge |
Scope | Infix | Metrics | Description | Type |
---|---|---|---|---|
Job-/TaskManager | Status.JVM.ClassLoader | ClassesLoaded | The total number of classes loaded since the start of the JVM. | Gauge |
ClassesUnloaded | The total number of classes unloaded since the start of the JVM. | Gauge |
Scope | Infix | Metrics | Description | Type |
---|---|---|---|---|
TaskManager | Status.Network | AvailableMemorySegments | The number of unused memory segments. | Gauge |
TotalMemorySegments | The number of allocated memory segments. | Gauge | ||
Task | buffers | inputQueueLength | The number of queued input buffers. | Gauge |
outputQueueLength | The number of queued output buffers. | Gauge | ||
inPoolUsage | An estimate of the input buffers usage. | Gauge | ||
outPoolUsage | An estimate of the output buffers usage. | Gauge | ||
Network.<Input|Output>.<gate> (only available if taskmanager.net.detailed-metrics config option is set) |
totalQueueLen | Total number of queued buffers in all input/output channels. | Gauge | |
minQueueLen | Minimum number of queued buffers in all input/output channels. | Gauge | ||
maxQueueLen | Maximum number of queued buffers in all input/output channels. | Gauge | ||
avgQueueLen | Average number of queued buffers in all input/output channels. | Gauge |
Scope | Metrics | Description | Type |
---|---|---|---|
JobManager | numRegisteredTaskManagers | The number of registered taskmanagers. | Gauge |
numRunningJobs | The number of running jobs. | Gauge | |
taskSlotsAvailable | The number of available task slots. | Gauge | |
taskSlotsTotal | The total number of task slots. | Gauge |
Scope | Metrics | Description | Type |
---|---|---|---|
Job (only available on JobManager) | restartingTime | The time it took to restart the job, or how long the current restart has been in progress (in milliseconds). | Gauge |
uptime |
The time that the job has been running without interruption.
Returns -1 for completed jobs (in milliseconds). |
Gauge | |
downtime |
For jobs currently in a failing/recovering situation, the time elapsed during this outage.
Returns 0 for running jobs and -1 for completed jobs (in milliseconds). |
Gauge | |
fullRestarts | The total number of full restarts since this job was submitted (in milliseconds). | Gauge |
Scope | Metrics | Description | Type |
---|---|---|---|
Job (only available on JobManager) | lastCheckpointDuration | The time it took to complete the last checkpoint (in milliseconds). | Gauge |
lastCheckpointSize | The total size of the last checkpoint (in bytes). | Gauge | |
lastCheckpointExternalPath | The path where the last external checkpoint was stored. | Gauge | |
lastCheckpointRestoreTimestamp | Timestamp when the last checkpoint was restored at the coordinator (in milliseconds). | Gauge | |
lastCheckpointAlignmentBuffered | The number of buffered bytes during alignment over all subtasks for the last checkpoint (in bytes). | Gauge | |
numberOfInProgressCheckpoints | The number of in progress checkpoints. | Gauge | |
numberOfCompletedCheckpoints | The number of successfully completed checkpoints. | Gauge | |
numberOfFailedCheckpoints | The number of failed checkpoints. | Gauge | |
totalNumberOfCheckpoints | The number of total checkpoints (in progress, completed, failed). | Gauge | |
Task | checkpointAlignmentTime | The time in nanoseconds that the last barrier alignment took to complete, or how long the current alignment has taken so far (in nanoseconds). | Gauge |
Scope | Metrics | Description | Type |
---|---|---|---|
Job (only available on TaskManager) | <source_id>.<source_subtask_index>.<operator_id>.<operator_subtask_index>.latency | The latency distributions from a given source subtask to an operator subtask (in milliseconds). | Histogram |
Task | numBytesInLocal | The total number of bytes this task has read from a local source. | Counter |
numBytesInLocalPerSecond | The number of bytes this task reads from a local source per second. | Meter | |
numBytesInRemote | The total number of bytes this task has read from a remote source. | Counter | |
numBytesInRemotePerSecond | The number of bytes this task reads from a remote source per second. | Meter | |
Task | numBuffersInLocal | The total number of network buffers this task has read from a local source. | Counter |
numBuffersInLocalPerSecond | The number of network buffers this task reads from a local source per second. | Meter | |
numBuffersInRemote | The total number of network buffers this task has read from a remote source. | Counter | |
numBuffersInRemotePerSecond | The number of network buffers this task reads from a remote source per second. | Meter | |
numBytesOut | The total number of bytes this task has emitted. | Counter | |
numBytesOutPerSecond | The number of bytes this task emits per second. | Meter | |
numBuffersOut | The total number of network buffers this task has emitted. | Counter | |
numBuffersOutPerSecond | The number of network buffers this task emits per second. | Meter | |
Task/Operator | numRecordsIn | The total number of records this operator/task has received. | Counter |
numRecordsInPerSecond | The number of records this operator/task receives per second. | Meter | |
numRecordsOut | The total number of records this operator/task has emitted. | Counter | |
numRecordsOutPerSecond | The number of records this operator/task sends per second. | Meter | |
numLateRecordsDropped | The number of records this operator/task has dropped due to arriving late. | Counter | |
currentInputWatermark |
The last watermark this operator/tasks has received (in milliseconds).
Note: For operators/tasks with 2 inputs this is the minimum of the last received watermarks. |
Gauge | |
Operator | currentInput1Watermark |
The last watermark this operator has received in its first input (in milliseconds).
Note: Only for operators with 2 inputs. |
Gauge |
currentInput2Watermark |
The last watermark this operator has received in its second input (in milliseconds).
Note: Only for operators with 2 inputs. |
Gauge | |
currentOutputWatermark | The last watermark this operator has emitted (in milliseconds). | Gauge | |
numSplitsProcessed | The total number of InputSplits this data source has processed (if the operator is a data source). | Gauge |
Scope | Metrics | User Variables | Description | Type |
---|---|---|---|---|
Operator | commitsSucceeded | n/a | The total number of successful offset commits to Kafka, if offset committing is turned on and checkpointing is enabled. | Counter |
Operator | commitsFailed | n/a | The total number of offset commit failures to Kafka, if offset committing is turned on and checkpointing is enabled. Note that committing offsets back to Kafka is only a means to expose consumer progress, so a commit failure does not affect the integrity of Flink's checkpointed partition offsets. | Counter |
Operator | committedOffsets | topic, partition | The last successfully committed offsets to Kafka, for each partition. A particular partition's metric can be specified by topic name and partition id. | Gauge |
Operator | currentOffsets | topic, partition | The consumer's current read offset, for each partition. A particular partition's metric can be specified by topic name and partition id. | Gauge |
Scope | Metrics | User Variables | Description | Type |
---|---|---|---|---|
Operator | millisBehindLatest | stream, shardId | The number of milliseconds the consumer is behind the head of the stream, indicating how far behind current time the consumer is, for each Kinesis shard. A particular shard's metric can be specified by stream name and shard id. A value of 0 indicates record processing is caught up, and there are no new records to process at this moment. A value of -1 indicates that there is no reported value for the metric, yet. | Gauge |
Flink allows to track the latency of records traveling through the system. This feature is disabled by default.
To enable the latency tracking you must set the latencyTrackingInterval
to a positive number in either the
Flink configuration or ExecutionConfig
.
At the latencyTrackingInterval
, the sources will periodically emit a special record, called a LatencyMarker
.
The marker contains a timestamp from the time when the record has been emitted at the sources.
Latency markers can not overtake regular user records, thus if records are queuing up in front of an operator,
it will add to the latency tracked by the marker.
Note that the latency markers are not accounting for the time user records spend in operators as they are bypassing them. In particular the markers are not accounting for the time records spend for example in window buffers. Only if operators are not able to accept new records, thus they are queuing up, the latency measured using the markers will reflect that.
All intermediate operators keep a list of the last n
latencies from each source to compute
a latency distribution.
The sink operators keep a list from each source, and each parallel source instance to allow detecting
latency issues caused by individual machines.
Currently, Flink assumes that the clocks of all machines in the cluster are in sync. We recommend setting up an automated clock synchronisation service (like NTP) to avoid false latency results.
Warning Enabling latency metrics can significantly impact the performance of the cluster. It is highly recommended to only use them for debugging purposes.
Metrics can be queried through the Monitoring REST API.
Below is a list of available endpoints, with a sample JSON response. All endpoints are of the sample form http://hostname:8081/jobmanager/metrics
, below we list only the path part of the URLs.
Values in angle brackets are variables, for example http://hostname:8081/jobs/<jobid>/metrics
will have to be requested for example as http://hostname:8081/jobs/7684be6004e4e955c2a558a9bc463f65/metrics
.
Request metrics for a specific entity:
/jobmanager/metrics
/taskmanagers/<taskmanagerid>/metrics
/jobs/<jobid>/metrics
/jobs/<jobid>/vertices/<vertexid>/subtasks/<subtaskindex>
Request metrics aggregated across all entities of the respective type:
/taskmanagers/metrics
/jobs/metrics
/jobs/<jobid>/vertices/<vertexid>/subtasks/metrics
Request metrics aggregated over a subset of all entities of the respective type:
/taskmanagers/metrics?taskmanagers=A,B,C
/jobs/metrics?jobs=D,E,F
/jobs/<jobid>/vertices/<vertexid>/subtasks/metrics?subtask=1,2,3
Request a list of available metrics:
GET /jobmanager/metrics
Request the values for specific (unaggregated) metrics:
GET taskmanagers/ABCDE/metrics?get=metric1,metric2
Request aggregated values for specific metrics:
GET /taskmanagers/metrics?get=metric1,metric2
Request specific aggregated values for specific metrics:
GET /taskmanagers/metrics?get=metric1,metric2&agg=min,max
Metrics that were gathered for each task or operator can also be visualized in the Dashboard. On the main page for a
job, select the Metrics
tab. After selecting one of the tasks in the top graph you can select metrics to display using
the Add Metric
drop-down menu.
<subtask_index>.<metric_name>
.<subtask_index>.<operator_name>.<metric_name>
.Each metric will be visualized as a separate graph, with the x-axis representing time and the y-axis the measured value. All graphs are automatically updated every 10 seconds, and continue to do so when navigating to another page.
There is no limit as to the number of visualized metrics; however only numeric metrics can be visualized.