Metrics and Logging

Metrics #

The Flink Kubernetes Operator (Operator) extends the Flink Metric System, which allows gathering and exposing metrics to centralized monitoring solutions.

Different operator metrics can be turned on or off individually through the configuration. For details, see the metrics config reference.
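As a rough sketch, individual metric groups might be toggled through the operator's default configuration in values.yaml. The option names below are assumptions recalled from the operator's metric options and are not confirmed by this page; check each one against the metrics config reference for your operator version before relying on it.

```yaml
defaultConfiguration:
  create: true
  append: true
  flink-conf.yaml: |+
    # Assumed option names; confirm each in the metrics config reference.
    kubernetes.operator.jvm.metrics.enabled: false
    kubernetes.operator.josdk.metrics.enabled: true
    kubernetes.operator.kubernetes.client.metrics.enabled: true
    kubernetes.operator.resource.lifecycle.metrics.enabled: true
```

Turning off groups you do not consume is one way to reduce the number of series a reporter has to ship.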

The Operator gathers aggregate metrics about managed resources.

| Scope | Metrics | Description | Type |
|---|---|---|---|
| Namespace | FlinkDeployment/FlinkSessionJob.Count | Number of managed resources per namespace | Gauge |
| Namespace | FlinkDeployment.ResourceUsage.Cpu/Memory | Total resources used per namespace | Gauge |
| Namespace | FlinkDeployment.JmDeploymentStatus.<Status>.Count | Number of managed FlinkDeployment resources per <Status> per namespace. <Status> can take values from: READY, DEPLOYED_NOT_READY, DEPLOYING, MISSING, ERROR | Gauge |
| Namespace | FlinkDeployment.FlinkVersion.<FlinkVersion>.Count | Number of managed FlinkDeployment resources per <FlinkVersion> per namespace. <FlinkVersion> is retrieved via REST API from Flink JM. | Gauge |
| Namespace | FlinkDeployment/FlinkSessionJob.Lifecycle.State.<State>.Count | Number of managed resources currently in state <State> per namespace. <State> can take values from: CREATED, SUSPENDED, UPGRADING, DEPLOYED, STABLE, ROLLING_BACK, ROLLED_BACK, FAILED | Gauge |
| System/Namespace | FlinkDeployment/FlinkSessionJob.Lifecycle.State.<State>.TimeSeconds | Time spent in state <State> for a given resource. <State> can take values from: CREATED, SUSPENDED, UPGRADING, DEPLOYED, STABLE, ROLLING_BACK, ROLLED_BACK, FAILED | Histogram |
| System/Namespace | FlinkDeployment/FlinkSessionJob.Lifecycle.Transition.<Transition>.TimeSeconds | Time statistics for selected lifecycle state transitions. <Transition> can take values from: Resume, Upgrade, Suspend, Stabilization, Rollback, Submission | Histogram |

Lifecycle metrics #

Based on the resource status, the operator monitors resource lifecycle states.

The number of resources in each of these states, and the time spent in them, is tracked by the Lifecycle.State.<State>.Count and Lifecycle.State.<State>.TimeSeconds metrics.

In addition to the simple counts, we track a few selected state transitions:

  • Upgrade: End-to-end resource upgrade time from stable to stable
  • Resume: Time from suspended to stable
  • Suspend: Time for any suspend operation
  • Stabilization: Time from deployed to stable state
  • Rollback: Time from deployed to rolled_back state if the resource was rolled back
  • Submission: Flink resource submission time

Kubernetes Client Metrics #

The Operator gathers various metrics related to Kubernetes API server access.

| Scope | Metrics | Description | Type |
|---|---|---|---|
| System | KubeClient.HttpRequest.Count | Number of HTTP requests sent to the Kubernetes API Server | Counter |
| System | KubeClient.HttpRequest.<RequestMethod>.Count | Number of HTTP requests sent to the Kubernetes API Server per request method. <RequestMethod> can take values from: GET, POST, PUT, PATCH, DELETE, etc. | Counter |
| System | KubeClient.HttpRequest.Failed.Count | Number of failed HTTP requests that received no response from the Kubernetes API Server | Counter |
| System | KubeClient.HttpResponse.Count | Number of HTTP responses received from the Kubernetes API Server | Counter |
| System | KubeClient.HttpResponse.<ResponseCode>.Count | Number of HTTP responses received from the Kubernetes API Server per response code. <ResponseCode> can take values from: 200, 404, 503, etc. | Counter |
| System | KubeClient.HttpResponse.<ResponseCode>.NumPerSecond | Number of HTTP responses received from the Kubernetes API Server per response code per second. <ResponseCode> can take values from: 200, 404, 503, etc. | Meter |
| System | KubeClient.HttpRequest.NumPerSecond | Number of HTTP requests sent to the Kubernetes API Server per second | Meter |
| System | KubeClient.HttpRequest.Failed.NumPerSecond | Number of failed HTTP requests sent to the Kubernetes API Server per second | Meter |
| System | KubeClient.HttpResponse.NumPerSecond | Number of HTTP responses received from the Kubernetes API Server per second | Meter |
| System | KubeClient.HttpResponse.TimeNanos | Latency statistics obtained from the HTTP responses received from the Kubernetes API Server | Histogram |

Kubernetes Client Metrics by HTTP Response Code #

Additional metrics grouped by the HTTP response code received from the API server can be published by setting kubernetes.operator.kubernetes.client.metrics.http.response.code.groups.enabled to true.

| Scope | Metrics | Description | Type |
|---|---|---|---|
| System | KubeClient.HttpResponse.1xx.Count | Number of HTTP Code 1xx responses (informational) received from the Kubernetes API Server per response code. | Counter |
| System | KubeClient.HttpResponse.2xx.Count | Number of HTTP Code 2xx responses (success) received from the Kubernetes API Server per response code. | Counter |
| System | KubeClient.HttpResponse.3xx.Count | Number of HTTP Code 3xx responses (redirection) received from the Kubernetes API Server per response code. | Counter |
| System | KubeClient.HttpResponse.4xx.Count | Number of HTTP Code 4xx responses (client error) received from the Kubernetes API Server per response code. | Counter |
| System | KubeClient.HttpResponse.5xx.Count | Number of HTTP Code 5xx responses (server error) received from the Kubernetes API Server per response code. | Counter |
| System | KubeClient.HttpResponse.1xx.NumPerSecond | Number of HTTP Code 1xx responses (informational) received from the Kubernetes API Server per response code per second. | Meter |
| System | KubeClient.HttpResponse.2xx.NumPerSecond | Number of HTTP Code 2xx responses (success) received from the Kubernetes API Server per response code per second. | Meter |
| System | KubeClient.HttpResponse.3xx.NumPerSecond | Number of HTTP Code 3xx responses (redirection) received from the Kubernetes API Server per response code per second. | Meter |
| System | KubeClient.HttpResponse.4xx.NumPerSecond | Number of HTTP Code 4xx responses (client error) received from the Kubernetes API Server per response code per second. | Meter |
| System | KubeClient.HttpResponse.5xx.NumPerSecond | Number of HTTP Code 5xx responses (server error) received from the Kubernetes API Server per response code per second. | Meter |
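
The grouped response code metrics listed above are off unless the option is set. A minimal sketch of enabling them in values.yaml, assuming the same defaultConfiguration layout used for the reporter examples on this page:

```yaml
defaultConfiguration:
  create: true
  append: true
  flink-conf.yaml: |+
    # Publishes the 1xx to 5xx Count and NumPerSecond metrics.
    kubernetes.operator.kubernetes.client.metrics.http.response.code.groups.enabled: true
```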

JVM Metrics #

The Operator gathers metrics about the JVM process and exposes them in the same way as the core Flink System metrics. The list of metrics is not repeated in this document.

JOSDK Metrics #

The Flink operator also forwards metrics created by the Java Operator SDK framework itself under the JOSDK metric name prefix. Some of these metrics are reported at the system, namespace, and resource level.

Metric Reporters #

The well-known Metric Reporters are shipped in the operator image and are ready to use.

To configure metrics for the operator, prefix the metrics configuration keys with kubernetes.operator. (note the trailing dot). The prefix keeps Flink job and operator metrics configuration separate.

Let’s look at a few examples.

Slf4j #

The default metrics reporter in the operator is Slf4j. It does not require any external monitoring system and is enabled in the values.yaml file by default, mainly for demonstration purposes.

defaultConfiguration:
  create: true
  append: true
  flink-conf.yaml: |+
    kubernetes.operator.metrics.reporter.slf4j.factory.class: org.apache.flink.metrics.slf4j.Slf4jReporterFactory
    kubernetes.operator.metrics.reporter.slf4j.interval: 5 MINUTE

To use a more robust, production-grade monitoring solution, the reporter configuration needs to be changed.

How to Enable Prometheus (Example) #

The following example shows how to enable the Prometheus metric reporter:

defaultConfiguration:
  create: true
  append: true
  flink-conf.yaml: |+
    kubernetes.operator.metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
    kubernetes.operator.metrics.reporter.prom.port: 9999

Some metric reporters, including Prometheus, need a port to be exposed on the container. This can be achieved by defining a value for the otherwise empty metrics.port variable, either in the values.yaml file:

metrics:
  port: 9999

or using the option --set metrics.port=9999 on the command line.
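
Putting the two pieces together, a values.yaml for the Prometheus reporter might look like the sketch below. It only combines the snippets shown above. The comment about the two ports matching is an assumption based on the reporter listening on prom.port while metrics.port controls what the container exposes.

```yaml
# values.yaml (sketch): reporter configuration plus the exposed container port
defaultConfiguration:
  create: true
  append: true
  flink-conf.yaml: |+
    kubernetes.operator.metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
    kubernetes.operator.metrics.reporter.prom.port: 9999

metrics:
  # Assumed to need the same value as the reporter port above.
  port: 9999
```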

Set up Prometheus locally #

The Prometheus Operator, among other options, provides an elegant, declarative way to specify how groups of pods should be monitored using custom resources.

To install the Prometheus operator via Helm, run:

helm repo add prometheus-community https://prometheus-community.github.io/helm-charts
helm install prometheus prometheus-community/kube-prometheus-stack

The Grafana dashboard can be accessed through port-forwarding:

kubectl port-forward deployment/prometheus-grafana 3000

The credentials for the dashboard can be retrieved by:

kubectl get secret prometheus-grafana -o jsonpath="{.data.admin-user}" | base64 --decode ; echo
kubectl get secret prometheus-grafana -o jsonpath="{.data.admin-password}" | base64 --decode ; echo

To enable the operator metrics in Prometheus, create a pod-monitor.yaml file with the following content:

apiVersion: monitoring.coreos.com/v1
kind: PodMonitor
metadata:
  name: flink-kubernetes-operator
  labels:
    release: prometheus
spec:
  selector:
    matchLabels:
      app.kubernetes.io/name: flink-kubernetes-operator
  podMetricsEndpoints:
      - port: metrics

and apply it to your Kubernetes environment:

kubectl create -f pod-monitor.yaml

Once the custom resource is created in the Kubernetes environment, the operator metrics are ready to explore at http://localhost:3000/explore.

Logging #

The Operator controls the logging behaviour for Flink applications and the Operator itself using configuration files mounted externally via ConfigMaps. Configuration files with default values are shipped in the Helm chart. It is recommended to review and adjust them if needed in the values.yaml file before deploying the Operator in production environments.

To append to or override the default log configuration properties for the operator and for Flink deployments, define the log4j-operator.properties and log4j-console.properties keys respectively:

defaultConfiguration:
  create: true
  append: true
  log4j-operator.properties: |+
    # Flink Operator Logging Overrides
    # rootLogger.level = DEBUG
    # The monitor interval in seconds to enable log4j automatic reconfiguration
    # monitorInterval = 30
  log4j-console.properties: |+
    # Flink Deployment Logging Overrides
    # rootLogger.level = DEBUG
    # The monitor interval in seconds to enable log4j automatic reconfiguration
    # monitorInterval = 30

Logging in the operator is intentionally succinct and does not include contextual information such as the namespace or name of the FlinkDeployment objects. Instead, we rely on the MDC (Mapped Diagnostic Context) provided by the Java Operator SDK to access this information and use it directly in the log layout.

See the Java Operator SDK docs for more detail.
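
To illustrate how MDC values can surface in the log layout, the sketch below overrides the operator's console pattern through values.yaml. It rests on several assumptions that this page does not confirm: that the default configuration defines a console appender named console with a PatternLayout, and that the Java Operator SDK populates the MDC keys resource.namespace and resource.name. Check the shipped log4j-operator.properties and the Java Operator SDK docs before using it.

```yaml
defaultConfiguration:
  create: true
  append: true
  log4j-operator.properties: |+
    # Assumed appender name ("console") and assumed MDC keys
    # (resource.namespace, resource.name); %X{key} prints an MDC value.
    appender.console.layout.pattern = %d %-5level [%X{resource.namespace}/%X{resource.name}] %c{1} - %msg%n
```

With a pattern along these lines, each log line emitted during a reconciliation would carry the namespace and name of the resource being processed, without the log messages themselves having to repeat them.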

To learn more about accessing the job logs or changing the log level dynamically, check the corresponding section of the core documentation.

FlinkDeployment Logging Configuration #

Users can override the default log4j-console.properties settings on a per-deployment level by putting the entire log configuration into spec.logConfiguration:

spec:
  ...
  logConfiguration:
    log4j-console.properties: |+
      rootLogger.level = DEBUG
      rootLogger.appenderRef.file.ref = LogFile
      ...

FlinkDeployment Prometheus Configuration #

The following example shows how to enable the Prometheus metric reporter for the FlinkDeployment:

spec:
  ...
  flinkConfiguration:
    metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
    metrics.reporter.prom.port: 9249-9250