Configuration

Specifying Operator Configuration

The operator allows users to specify default configuration that will be shared by the Flink operator itself and the Flink deployments.

These configuration files are mounted externally via ConfigMaps. Configuration files with default values ship with the Helm chart. Review them and, if needed, adjust them in the values.yaml file before deploying the operator to production environments.

To append to the default configuration, simply define the flink-conf.yaml key in the defaultConfiguration section of the Helm values.yaml file:

defaultConfiguration:
  create: true
  # Set append to false to replace configuration files
  append: true
  flink-conf.yaml: |+
    # Flink Config Overrides
    kubernetes.operator.metrics.reporter.slf4j.factory.class: org.apache.flink.metrics.slf4j.Slf4jReporterFactory
    kubernetes.operator.metrics.reporter.slf4j.interval: 5 MINUTE

    kubernetes.operator.reconcile.interval: 15 s
    kubernetes.operator.observer.progress-check.interval: 5 s

To learn more about metrics and logging configuration, please refer to the dedicated docs page.

Dynamic Operator Configuration

The Kubernetes operator supports dynamic config changes through the operator ConfigMaps. Dynamic operator configuration is enabled by default and can be disabled by setting kubernetes.operator.dynamic.config.enabled to false. The interval for checking dynamic config changes is set by kubernetes.operator.dynamic.config.check.interval, which defaults to 5 minutes.

To verify that dynamic operator configuration updates are enabled, check that the deploy/flink-kubernetes-operator log contains:

2022-05-28 13:08:29,222 o.a.f.k.o.c.FlinkConfigManager [INFO ] Enabled dynamic config updates, checking config changes every PT5M
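
The two dynamic-config options described above can be set through the same defaultConfiguration section of the Helm values.yaml file that is used for other overrides. The fragment below is a minimal sketch; the 1 min interval is an illustrative value, not a recommendation.

```yaml
defaultConfiguration:
  create: true
  append: true
  flink-conf.yaml: |+
    # Check the operator ConfigMap for changes every minute instead of
    # the default 5 min (illustrative value)
    kubernetes.operator.dynamic.config.check.interval: 1 min

    # Alternatively, turn dynamic configuration off entirely
    # kubernetes.operator.dynamic.config.enabled: false
```

With the shorter interval, the log line shown above would be expected to report the new check period in place of PT5M.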

To change config values dynamically, edit the ConfigMap directly with the kubectl patch or kubectl edit command. For example, to change the reschedule interval, override kubernetes.operator.reconcile.interval.

To verify that kubernetes.operator.reconcile.interval has been updated to 30 seconds, check that the deploy/flink-kubernetes-operator log contains:

2022-05-28 13:08:30,115 o.a.f.k.o.c.FlinkConfigManager [INFO ] Updating default configuration to {kubernetes.operator.reconcile.interval=PT30S}
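
As a rough sketch of what such an edit might look like, the fragment below shows the relevant part of the operator ConfigMap after setting the reconcile interval to 30 seconds. The ConfigMap name flink-operator-config is an assumption; check the name created by your Helm release. Only the changed entry is shown, and any other entries already present under flink-conf.yaml should be left in place.

```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  # Assumed name; verify it against your own installation
  name: flink-operator-config
data:
  flink-conf.yaml: |
    # Existing entries stay as they are; only this line is added or changed
    kubernetes.operator.reconcile.interval: 30 s
```

The operator picks the change up on its next dynamic config check, so the log line above may take up to the configured check interval to appear.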

Operator Configuration Reference

System Configuration

General operator system configuration. Cannot be overridden on a per-resource basis.

| Key | Default | Type | Description |
| --- | --- | --- | --- |
| kubernetes.operator.dynamic.namespaces.enabled | false | Boolean | Enables dynamic change of watched/monitored namespaces. |
| kubernetes.operator.flink.client.cancel.timeout | 1 min | Duration | The timeout for the reconciler to wait for Flink to cancel a job. |
| kubernetes.operator.flink.client.timeout | 10 s | Duration | The timeout for the observer to wait for the Flink REST client to return. |
| kubernetes.operator.reconcile.interval | 1 min | Duration | The interval for the controller to reschedule the reconcile process. |
| kubernetes.operator.reconcile.parallelism | 10 | Integer | The maximum number of threads running the reconciliation loop. Use -1 for infinite. |
| kubernetes.operator.resource.cleanup.timeout | 1 min | Duration | The timeout for the resource cleanup to wait for Flink to shut down the cluster. |
| kubernetes.operator.retry.initial.interval | 5 s | Duration | Initial interval of automatic reconcile retries on recoverable errors. |
| kubernetes.operator.retry.interval.multiplier | 2.0 | Double | Interval multiplier of automatic reconcile retries on recoverable errors. |
| kubernetes.operator.retry.max.attempts | 10 | Integer | Max attempts of automatic reconcile retries on recoverable errors. |
| kubernetes.operator.user.artifacts.base.dir | "/opt/flink/artifacts" | String | The base directory in which to put the session job artifacts. |
| kubernetes.operator.watched.namespaces | "JOSDK_ALL_NAMESPACES" | String | Comma-separated list of namespaces the operator monitors for custom resources. |
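
Because system options cannot be overridden per resource, they belong in the operator default configuration. The fragment below is a sketch of tuning the retry and parallelism options from this section through the Helm values.yaml file; all values are illustrative only.

```yaml
defaultConfiguration:
  create: true
  append: true
  flink-conf.yaml: |+
    # Start retries later and grow the interval more slowly than the
    # defaults (5 s initial interval, multiplier 2.0, 10 attempts)
    kubernetes.operator.retry.initial.interval: 10 s
    kubernetes.operator.retry.interval.multiplier: 1.5
    kubernetes.operator.retry.max.attempts: 5

    # Allow more concurrent reconciliation threads (default 10)
    kubernetes.operator.reconcile.parallelism: 20
```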

Resource/User Configuration

These options can be configured at both the operator level and the per-resource level. When set under spec.flinkConfiguration of a Flink resource, a value overrides the default provided in the operator default configuration (flink-conf.yaml).
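
As an illustration of a per-resource override, the fragment below sketches a FlinkDeployment that sets a few of these options under spec.flinkConfiguration. The resource name and the chosen values are hypothetical, and the other fields a real deployment needs (image, Flink version, JobManager and TaskManager resources, job definition) are omitted.

```yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: example-deployment   # hypothetical name
spec:
  # ...remaining deployment fields omitted...
  flinkConfiguration:
    # Values are written as strings; each one overrides the operator
    # default for this resource only
    kubernetes.operator.deployment.rollback.enabled: "true"
    kubernetes.operator.deployment.readiness.timeout: "2 min"
    kubernetes.operator.savepoint.history.max.count: "5"
```

Options that are not listed here keep whatever value the operator default configuration provides.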

| Key | Default | Type | Description |
| --- | --- | --- | --- |
| kubernetes.operator.deployment.readiness.timeout | 1 min | Duration | The timeout for deployments to become ready/stable before being rolled back if rollback is enabled. |
| kubernetes.operator.deployment.rollback.enabled | false | Boolean | Whether to enable rolling back failed deployment upgrades. |
| kubernetes.operator.jm-deployment-recovery.enabled | true | Boolean | Whether to enable recovery of missing/deleted JobManager deployments. |
| kubernetes.operator.job.upgrade.ignore-pending-savepoint | false | Boolean | Whether to ignore a pending savepoint during job upgrade. |
| kubernetes.operator.job.upgrade.last-state-fallback.enabled | true | Boolean | Enables last-state fallback for savepoint upgrade mode. When the job is not running, so a savepoint cannot be triggered, but HA metadata is available for last-state restore, the operator can initiate the upgrade process if this flag is enabled. |
| kubernetes.operator.periodic.savepoint.interval | 0 ms | Duration | Interval at which periodic savepoints will be triggered. The triggering schedule is not guaranteed; savepoints will be triggered as part of the regular reconcile loop. |
| kubernetes.operator.savepoint.format.type | CANONICAL | Enum | Type of the binary format in which a savepoint should be taken. Possible values: "CANONICAL": a canonical format common to all state backends, which lets you switch state backends. "NATIVE": a format specific to the chosen state backend, in its native binary format, which might be faster to take and restore from than the canonical one. |
| kubernetes.operator.savepoint.history.max.age | 86400000 ms | Duration | Maximum age for savepoint history entries to retain. Due to lazy clean-up, the most recent savepoint may live longer than the max age. |
| kubernetes.operator.savepoint.history.max.count | 10 | Integer | Maximum number of savepoint history entries to retain. |
| kubernetes.operator.savepoint.trigger.grace-period | 1 min | Duration | The interval before a savepoint trigger attempt is marked as unsuccessful. |
| kubernetes.operator.user.artifacts.http.header | (none) | Map | Custom HTTP header for HttpArtifactFetcher. The header will be applied when getting the session job artifacts. Expected format: headerKey1:headerValue1,headerKey2:headerValue2. |

System Metrics Configuration

Operator system metrics configuration. Cannot be overridden on a per-resource basis.

| Key | Default | Type | Description |
| --- | --- | --- | --- |
| kubernetes.operator.josdk.metrics.enabled | true | Boolean | Enable forwarding of Java Operator SDK metrics to the Flink metric registry. |
| kubernetes.operator.jvm.metrics.enabled | true | Boolean | Enable Kubernetes Operator JVM metrics. |
| kubernetes.operator.kubernetes.client.metrics.enabled | true | Boolean | Enable KubernetesClient metrics for measuring the HTTP traffic to the Kubernetes API Server. |
| kubernetes.operator.metrics.histogram.sample.size | 1000 | Integer | Defines the number of measured samples when calculating statistics. |
| kubernetes.operator.metrics.scope.k8soperator.resource | "<host>.k8soperator.<namespace>.<name>.resource.<resourcens>.<resourcename>.<resourcetype>" | String | Defines the scope format string that is applied to all metrics scoped to the kubernetes operator resource. |
| kubernetes.operator.metrics.scope.k8soperator.resourcens | "<host>.k8soperator.<namespace>.<name>.namespace.<resourcens>.<resourcetype>" | String | Defines the scope format string that is applied to all metrics scoped to the kubernetes operator resource namespace. |
| kubernetes.operator.metrics.scope.k8soperator.system | "<host>.k8soperator.<namespace>.<name>.system" | String | Defines the scope format string that is applied to all metrics scoped to the kubernetes operator. |
| kubernetes.operator.resource.lifecycle.metrics.enabled | true | Boolean | Enable resource lifecycle state metrics. This enables both state and transition counts/histograms. |
| kubernetes.operator.resource.lifecycle.namespace.histograms.enabled | true | Boolean | In addition to the system level histograms, enable per namespace tracking of state and transition times. |
| kubernetes.operator.resource.metrics.enabled | true | Boolean | Enables metrics for FlinkDeployment and FlinkSessionJob custom resources. |

Advanced System Configuration

Advanced operator system configuration. Cannot be overridden on a per-resource basis.

| Key | Default | Type | Description |
| --- | --- | --- | --- |
| kubernetes.operator.config.cache.size | 1000 | Integer | Max config cache size. |
| kubernetes.operator.config.cache.timeout | 10 min | Duration | Expiration time for cached configs. |
| kubernetes.operator.dynamic.config.check.interval | 5 min | Duration | Time interval for checking config changes. |
| kubernetes.operator.dynamic.config.enabled | true | Boolean | Whether to enable on-the-fly config changes through the operator configmap. |
| kubernetes.operator.health.probe.enabled | true | Boolean | Enables health probe for the kubernetes operator. |
| kubernetes.operator.health.probe.port | 8085 | Integer | The port the health probe will use to expose the status. |
| kubernetes.operator.label.selector | (none) | String | Label selector of the custom resources to be watched. Please see https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#label-selectors for the format supported. |
| kubernetes.operator.observer.progress-check.interval | 10 s | Duration | The interval for observing status for in-progress operations such as deployment and savepoints. |
| kubernetes.operator.observer.rest-ready.delay | 10 s | Duration | Final delay before deployment is marked ready after port becomes accessible. |
| kubernetes.operator.savepoint.history.max.age.threshold | (none) | Duration | Maximum age threshold for savepoint history entries to retain. |
| kubernetes.operator.savepoint.history.max.count.threshold | (none) | Integer | Maximum number threshold of savepoint history entries to retain. |