This documentation is for an unreleased version of the Apache Flink Kubernetes Operator. We recommend you use the latest stable version.
Configuration #
Specifying Operator Configuration #
The operator allows users to specify default configuration that will be shared by the Flink operator itself and the Flink deployments.
These configuration files are mounted externally via ConfigMaps. Configuration files with default values are shipped in the Helm chart. Review them and, if needed, adjust them in the values.yaml file before deploying the operator in production environments.
To append to the default configuration, define the flink-conf.yaml key in the defaultConfiguration section of the Helm values.yaml file:
defaultConfiguration:
  create: true
  # Set append to false to replace configuration files
  append: true
  flink-conf.yaml: |+
    # Flink Config Overrides
    kubernetes.operator.metrics.reporter.slf4j.factory.class: org.apache.flink.metrics.slf4j.Slf4jReporterFactory
    kubernetes.operator.metrics.reporter.slf4j.interval: 5 MINUTE
    kubernetes.operator.reconcile.interval: 15 s
    kubernetes.operator.observer.progress-check.interval: 5 s
To learn more about metrics and logging configuration please refer to the dedicated docs page.
Flink Version and Namespace specific defaults #
The operator also supports default configuration overrides for selected Flink versions and namespaces. This is useful when behaviour changed across Flink versions, or when certain namespaces should be treated differently (for example, reconciled more or less frequently).
# Flink Version specific defaults
kubernetes.operator.default-configuration.flink-version.v1_17.k1: v1
kubernetes.operator.default-configuration.flink-version.v1_17.k2: v2
kubernetes.operator.default-configuration.flink-version.v1_17.k3: v3
# Namespace specific defaults
kubernetes.operator.default-configuration.namespace.ns1.k1: v1
kubernetes.operator.default-configuration.namespace.ns1.k2: v2
kubernetes.operator.default-configuration.namespace.ns2.k1: v1
Flink version specific defaults have higher precedence: when the same key is set at both levels, the version-specific value overrides the namespace default.
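As a sketch of this precedence rule, the fragment below sets the same option at both levels. The namespace name ns1 and the values are placeholders chosen for illustration; kubernetes.operator.savepoint.history.max.count is one of the resource-level options listed in the reference tables further down.

```yaml
# Namespace default (placeholder namespace "ns1", illustrative value)
kubernetes.operator.default-configuration.namespace.ns1.kubernetes.operator.savepoint.history.max.count: 5
# Flink version default for 1.17 (illustrative value)
kubernetes.operator.default-configuration.flink-version.v1_17.kubernetes.operator.savepoint.history.max.count: 20
# Following the precedence rule, a Flink 1.17 resource in ns1 would pick up 20
# for this key, because the version-specific default wins over the namespace default.
```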
Dynamic Operator Configuration #
The Kubernetes operator supports dynamic config changes through the operator ConfigMaps. Dynamic operator configuration is enabled by default and can be disabled by setting kubernetes.operator.dynamic.config.enabled to false. The interval for checking dynamic config changes is set by kubernetes.operator.dynamic.config.check.interval, which defaults to 5 minutes.
To verify that dynamic operator configuration updates are enabled, check that the deploy/flink-kubernetes-operator log contains:
2022-05-28 13:08:29,222 o.a.f.k.o.c.FlinkConfigManager [INFO ] Enabled dynamic config updates, checking config changes every PT5M
To change config values dynamically, edit the ConfigMap directly with the kubectl patch or kubectl edit command. For example, to change the reschedule interval, override kubernetes.operator.reconcile.interval.
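A minimal sketch of what the edited ConfigMap could look like after such a change. The ConfigMap name flink-operator-config and the namespace are assumptions about a typical Helm installation and may differ in your cluster; the 30 s value is only an example.

```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-operator-config   # assumed name; check with `kubectl get configmaps`
  namespace: default            # assumed namespace of the operator
data:
  flink-conf.yaml: |
    # Existing entries stay as they are; only the overridden key is shown here.
    kubernetes.operator.reconcile.interval: 30 s
```

The operator reads the change on its next dynamic config check, so it can take up to kubernetes.operator.dynamic.config.check.interval before the new value is applied.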
To verify that kubernetes.operator.reconcile.interval was updated to 30 seconds, check that the deploy/flink-kubernetes-operator log contains:
2022-05-28 13:08:30,115 o.a.f.k.o.c.FlinkConfigManager [INFO ] Updating default configuration to {kubernetes.operator.reconcile.interval=PT30S}
Leader Election and High Availability #
The operator supports high availability through leader election and standby operator instances. To enable leader election, add the following two mandatory operator configuration parameters:
kubernetes.operator.leader-election.enabled: true
kubernetes.operator.leader-election.lease-name: flink-operator-lease
The lease name must be unique in the current lease namespace. For more advanced config parameters, refer to the configuration reference.
Once leader election is enabled, you can increase the replicas value for the operator Deployment in the Helm chart to enable high availability. If replicas is greater than 1, you can define topologySpreadConstraints via operatorPod.topologySpreadConstraints.
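Putting these pieces together, a Helm values.yaml fragment for a highly available setup might look like the sketch below. The replica count, the constraint settings, and the pod label in the selector are illustrative assumptions; adjust them to your cluster and to the labels your operator pods actually carry.

```yaml
# values.yaml (sketch)
replicas: 2

defaultConfiguration:
  create: true
  append: true
  flink-conf.yaml: |+
    kubernetes.operator.leader-election.enabled: true
    kubernetes.operator.leader-election.lease-name: flink-operator-lease

operatorPod:
  topologySpreadConstraints:
    - maxSkew: 1
      topologyKey: kubernetes.io/hostname
      whenUnsatisfiable: ScheduleAnyway
      labelSelector:
        matchLabels:
          # Assumed pod label; replace with the labels used by your operator pods.
          app.kubernetes.io/name: flink-kubernetes-operator
```

Spreading by kubernetes.io/hostname keeps the standby instance on a different node than the leader where possible, so a single node failure does not take down both.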
Environment variables #
The operator exposes several environment variables which can be used for custom plugins.
Name | Description | FieldRef |
---|---|---|
HOST_IP | The host which the pod is deployed on | status.hostIP |
POD_IP | Pod IP | status.podIP |
POD_NAME | Pod Name | metadata.name |
Operator Configuration Reference #
System Configuration #
General operator system configuration. Cannot be overridden on a per-resource basis.
Key | Default | Type | Description |
---|---|---|---|
kubernetes.operator.dynamic.namespaces.enabled |
false | Boolean | Enables dynamic change of watched/monitored namespaces. |
kubernetes.operator.exception.field.max.length |
2048 | Integer | Maximum length of each exception field including stack trace to be included in CR status error field. |
kubernetes.operator.exception.stacktrace.enabled |
false | Boolean | Enable exception stacktrace to be included in CR status error field. |
kubernetes.operator.exception.stacktrace.max.length |
2048 | Integer | Maximum length of stacktrace to be included in CR status error field. |
kubernetes.operator.exception.throwable.list.max.count |
2 | Integer | Maximum number of throwables to be included in CR status error field. |
kubernetes.operator.flink.client.cancel.timeout |
1 min | Duration | The timeout for the reconciler to wait for Flink to cancel the job. |
kubernetes.operator.flink.client.timeout |
10 s | Duration | The timeout for the observer to wait for the Flink REST client to return. |
kubernetes.operator.leader-election.enabled |
false | Boolean | Enable leader election for the operator to allow running standby instances. |
kubernetes.operator.leader-election.lease-duration |
15 s | Duration | Leader election lease duration. |
kubernetes.operator.leader-election.lease-name |
(none) | String | Leader election lease name, must be unique for leases in the same namespace. |
kubernetes.operator.leader-election.renew-deadline |
10 s | Duration | Leader election renew deadline. |
kubernetes.operator.leader-election.retry-period |
2 s | Duration | Leader election retry period. |
kubernetes.operator.rate-limiter.limit |
5 | Integer | Max number of reconcile loops triggered within the rate limiter refresh period for each resource. Setting the limit <= 0 disables the limiter. |
kubernetes.operator.rate-limiter.refresh-period |
15 s | Duration | Operator rate limiter refresh period for each resource. |
kubernetes.operator.reconcile.interval |
1 min | Duration | The interval for the controller to reschedule the reconcile process. |
kubernetes.operator.reconcile.parallelism |
50 | Integer | The maximum number of threads running the reconciliation loop. Use -1 for infinite. |
kubernetes.operator.resource.cleanup.timeout |
5 min | Duration | The timeout for resource cleanup to wait for Flink to shut down the cluster. |
kubernetes.operator.retry.initial.interval |
5 s | Duration | Initial interval of retries on unhandled controller errors. |
kubernetes.operator.retry.interval.multiplier |
1.5 | Double | Interval multiplier of retries on unhandled controller errors. |
kubernetes.operator.retry.max.attempts |
15 | Integer | Max attempts of retries on unhandled controller errors. |
kubernetes.operator.retry.max.interval |
(none) | Duration | Max interval of retries on unhandled controller errors. |
kubernetes.operator.user.artifacts.base.dir |
"/opt/flink/artifacts" | String | The base dir to put the session job artifacts. |
kubernetes.operator.watched.namespaces |
"JOSDK_ALL_NAMESPACES" | String | Comma separated list of namespaces the operator monitors for custom resources. |
Resource/User Configuration #
These options can be configured on both an operator and a per-resource level. When set under spec.flinkConfiguration for the Flink resources, they override the default value provided in the operator default configuration (flink-conf.yaml).
Key | Default | Type | Description |
---|---|---|---|
kubernetes.operator.checkpoint.trigger.grace-period |
1 min | Duration | The interval before a checkpoint trigger attempt is marked as unsuccessful. |
kubernetes.operator.checkpoint.type |
FULL | Enum |
Type of checkpoint. Possible values:
|
kubernetes.operator.cluster.health-check.checkpoint-progress.enabled |
true | Boolean | Whether to enable checkpoint progress health check for clusters. |
kubernetes.operator.cluster.health-check.checkpoint-progress.window |
(none) | Duration | If no checkpoints are completed within the defined time window, the job is considered unhealthy. The minimum window size is `max(checkpointingInterval, checkpointTimeout) * (tolerableCheckpointFailures + 2)`, which also serves as the default value when checkpointing is enabled. For example with checkpoint interval 10 minutes and 0 tolerable failures, the default progress check window will be 20 minutes. |
kubernetes.operator.cluster.health-check.enabled |
false | Boolean | Whether to enable health check for clusters. |
kubernetes.operator.cluster.health-check.restarts.threshold |
64 | Integer | The threshold which is checked against the job restart count within a configured window. If the restart count reaches the threshold, a full cluster restart is initiated. |
kubernetes.operator.cluster.health-check.restarts.window |
2 min | Duration | The duration of the time window in which the job restart count is measured. |
kubernetes.operator.deployment.readiness.timeout |
5 min | Duration | The timeout for deployments to become ready/stable before being rolled back if rollback is enabled. |
kubernetes.operator.deployment.rollback.enabled |
false | Boolean | Whether to enable rolling back failed deployment upgrades. |
kubernetes.operator.exception.label.mapper |
Map | Key-Value pair where key is the REGEX to filter through the exception messages and value is the string to be included in CR status error label field if the REGEX matches. Expected format: headerKey1:headerValue1,headerKey2:headerValue2. | |
kubernetes.operator.jm-deployment-recovery.enabled |
true | Boolean | Whether to enable recovery of missing/deleted jobmanager deployments. |
kubernetes.operator.jm-deployment.shutdown-ttl |
1 d | Duration | Time after which jobmanager pods of terminal application deployments are shut down. |
kubernetes.operator.jm-deployment.startup.probe.enabled |
true | Boolean | Enable job manager startup probe to allow detecting when the jobmanager could not submit the job. |
kubernetes.operator.job.drain-on-savepoint-deletion |
false | Boolean | Indicate whether the job should be drained when stopping with savepoint. |
kubernetes.operator.job.restart.failed |
false | Boolean | Whether to restart failed jobs. |
kubernetes.operator.job.savepoint-on-deletion |
false | Boolean | Indicate whether a savepoint must be taken when deleting a FlinkDeployment or FlinkSessionJob. |
kubernetes.operator.job.upgrade.ignore-pending-savepoint |
false | Boolean | Whether to ignore pending savepoint during job upgrade. |
kubernetes.operator.job.upgrade.inplace-scaling.enabled |
true | Boolean | Whether to enable inplace scaling for Flink 1.18+ using the resource requirements API. On failure or earlier Flink versions it falls back to regular full redeployment. |
kubernetes.operator.job.upgrade.last-state-fallback.enabled |
true | Boolean | Enables last-state fallback for savepoint upgrade mode. When the job is not running, so a savepoint cannot be triggered, but HA metadata is available for last-state restore, the operator can initiate the upgrade process if this flag is enabled. |
kubernetes.operator.job.upgrade.last-state.max.allowed.checkpoint.age |
(none) | Duration | Max allowed checkpoint age for initiating last-state upgrades on running jobs. If a checkpoint is not available within the desired age (and nothing in progress) a savepoint will be triggered. |
kubernetes.operator.periodic.checkpoint.interval |
(none) | String | Option to enable automatic checkpoint triggering. Can be specified either as a Duration type (e.g. '10m') or as a cron expression in Quartz format (6 or 7 positions, see http://www.quartz-scheduler.org/documentation/quartz-2.3.0/tutorials/crontrigger.html). The triggering schedule is not guaranteed; checkpoints will be triggered as part of the regular reconcile loop. NOTE: checkpoints are generally managed by Flink. This setting isn't meant to replace Flink's checkpoint settings, but to complement them in special cases. For instance, a full checkpoint might need to be occasionally triggered to break the chain of incremental checkpoints and consolidate the partial incremental files. WARNING: not intended to be used together with the cron-based periodic checkpoint triggering. |
kubernetes.operator.periodic.savepoint.interval |
(none) | String | Option to enable automatic savepoint triggering. Can be specified either as a Duration type (e.g. '10m') or as a cron expression in Quartz format (6 or 7 positions, see http://www.quartz-scheduler.org/documentation/quartz-2.3.0/tutorials/crontrigger.html). The triggering schedule is not guaranteed; savepoints will be triggered as part of the regular reconcile loop. WARNING: not intended to be used together with the cron-based periodic savepoint triggering. |
kubernetes.operator.plugins.listeners.<listener-name>.class |
(none) | String | Custom plugins listener class, 'listener-name' is the name of the plugin listener, and its value is a fully qualified class name. |
kubernetes.operator.pod-template.merge-arrays-by-name |
false | Boolean | Configure the array merge behaviour during pod merging. Arrays can be either merged by position or name matching. |
kubernetes.operator.savepoint.cleanup.enabled |
true | Boolean | Whether to enable clean up of savepoint FlinkStateSnapshot resources. Savepoint state will be disposed of as well if the snapshot CR spec is configured as such. For automatic savepoints this can be configured via the kubernetes.operator.savepoint.dispose-on-delete config option. |
kubernetes.operator.savepoint.dispose-on-delete |
false | Boolean | Savepoint data for FlinkStateSnapshot resources created by the operator during upgrades and periodic savepoints will be disposed of automatically when the generated Kubernetes resource is deleted. |
kubernetes.operator.savepoint.format.type |
CANONICAL | Enum |
Type of the binary format in which a savepoint should be taken. Possible values:
|
kubernetes.operator.savepoint.history.max.age |
1 d | Duration | Maximum age for savepoint FlinkStateSnapshot resources to retain. Due to lazy clean-up, the most recent savepoint may live longer than the max age. |
kubernetes.operator.savepoint.history.max.count |
10 | Integer | Maximum number of savepoint FlinkStateSnapshot resources entries to retain. |
kubernetes.operator.savepoint.trigger.grace-period |
1 min | Duration | The interval before a savepoint trigger attempt is marked as unsuccessful. |
kubernetes.operator.snapshot.resource.enabled |
true | Boolean | Create new FlinkStateSnapshot resources for storing snapshots. Disable if you wish to use the deprecated mode and save snapshot results to FlinkDeployment/FlinkSessionJob status fields. The Operator will fallback to legacy mode during runtime if the CRD is not found, even if this value is true. |
kubernetes.operator.user.artifacts.http.header |
(none) | Map | Custom HTTP header for HttpArtifactFetcher. The header will be applied when getting the session job artifacts. Expected format: headerKey1:headerValue1,headerKey2:headerValue2. |
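As an illustration of a per-resource override, the sketch below sets a few of the options from this table under spec.flinkConfiguration of a FlinkDeployment. The resource name and the chosen values are placeholders, and the rest of the spec (image, job, resources) is omitted.

```yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: example-deployment   # placeholder name
spec:
  flinkConfiguration:
    # Overrides the operator-level default (false) for this resource only
    kubernetes.operator.job.restart.failed: "true"
    # Retain fewer savepoint snapshots than the default of 10
    kubernetes.operator.savepoint.history.max.count: "5"
    # Trigger a savepoint roughly every 6 hours as part of the reconcile loop
    kubernetes.operator.periodic.savepoint.interval: 6h
```

Options that are not set here keep the value from the operator default configuration.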
Autoscaler Configuration #
Like other resource options, these can be configured on both an operator and a per-resource level. When set under spec.flinkConfiguration for the Flink resources, they override the default value provided in the operator default configuration (flink-conf.yaml).
Note: The option prefix kubernetes.operator. was removed in FLIP-334, because the autoscaler module was decoupled from flink-kubernetes-operator.
Key | Default | Type | Description |
---|---|---|---|
job.autoscaler.backlog-processing.lag-threshold |
5 min | Duration | Lag threshold which will prevent unnecessary scalings while removing the pending messages responsible for the lag. |
job.autoscaler.catch-up.duration |
30 min | Duration | The target duration for fully processing any backlog after a scaling operation. Set to 0 to disable backlog based scaling. |
job.autoscaler.enabled |
false | Boolean | Enable job autoscaler module. |
job.autoscaler.excluded.periods |
List<String> | A (semicolon-separated) list of expressions indicating excluded periods during which autoscaling execution is forbidden. Each expression consists of two optional subexpressions concatenated with &&. One is a cron expression in Quartz format (6 or 7 positions); for example, * * 9-11,14-16 * * ? excludes 9:00:00am to 11:59:59am and 2:00:00pm to 4:59:59pm every day, and * * * ? * 2-6 excludes every weekday. See http://www.quartz-scheduler.org/documentation/quartz-2.3.0/tutorials/crontrigger.html for the usage of cron expressions. Caution: in most cases a cron expression is enough. The other subexpression is a daily expression, introduced because cron can only represent whole-hour periods without a minutes and seconds suffix. Its format is startTime-endTime, such as 9:30:30-10:50:20. To exclude 9:30:30-10:50:20 on Monday and Thursday, write 9:30:30-10:50:20 && * * * ? * 2,5 | |
job.autoscaler.flink.rest-client.timeout |
10 s | Duration | The timeout for waiting for the Flink REST client to return. |
job.autoscaler.history.max.age |
1 d | Duration | Maximum age for past scaling decisions to retain. |
job.autoscaler.history.max.count |
3 | Integer | Maximum number of past scaling decisions to retain per vertex. |
job.autoscaler.memory.gc-pressure.threshold |
1.0 | Double | Max allowed GC pressure (percentage spent garbage collecting) during scaling operations. Autoscaling will be paused if the GC pressure exceeds this limit. |
job.autoscaler.memory.heap-usage.threshold |
1.0 | Double | Max allowed percentage of heap usage during scaling operations. Autoscaling will be paused if the heap usage exceeds this threshold. |
job.autoscaler.memory.tuning.enabled |
false | Boolean | If enabled, the initial amount of memory specified for TaskManagers will be reduced/increased according to the observed needs. |
job.autoscaler.memory.tuning.maximize-managed-memory |
false | Boolean | If enabled and managed memory is used (e.g. RocksDB turned on), any reduction of heap, network, or metaspace memory will increase the managed memory. |
job.autoscaler.memory.tuning.overhead |
0.2 | Double | Overhead to add to tuning decisions (0-1). This ensures spare capacity and allows the memory to grow beyond the dynamically computed limits, but never beyond the original memory limits. |
job.autoscaler.memory.tuning.scale-down-compensation.enabled |
true | Boolean | If this option is enabled and memory tuning is enabled, TaskManager memory will be increased when scaling down. This ensures that after applying memory tuning there is sufficient memory when running with fewer TaskManagers. |
job.autoscaler.metrics.busy-time.aggregator |
MAX | Enum |
Metric aggregator to use for busyTime metrics. This affects how true processing/output rate will be computed. Using max allows us to handle jobs with data skew more robustly, while avg may provide better stability when we know that the load distribution is even. Possible values:
|
job.autoscaler.metrics.window |
15 min | Duration | Scaling metrics aggregation window size. |
job.autoscaler.observed-true-processing-rate.lag-threshold |
30 s | Duration | Lag threshold for enabling observed true processing rate measurements. |
job.autoscaler.observed-true-processing-rate.min-observations |
2 | Integer | Minimum number of observations used when estimating / switching to observed true processing rate. |
job.autoscaler.observed-true-processing-rate.switch-threshold |
0.15 | Double | Percentage threshold for switching to observed from busy time based true processing rate if the measurement is off by at least the configured fraction. For example 0.15 means we switch to observed if the busy time based computation is at least 15% higher during catchup. |
job.autoscaler.quota.cpu |
(none) | Double | Quota of the CPU count. When scaling would go beyond this number, the scaling is not going to happen. |
job.autoscaler.quota.memory |
(none) | MemorySize | Quota of the memory size. When scaling would go beyond this number, the scaling is not going to happen. |
job.autoscaler.restart.time |
5 min | Duration | Expected restart time to be used until the operator can determine it reliably from history. |
job.autoscaler.restart.time-tracking.enabled |
false | Boolean | Whether to use the actual observed rescaling restart times instead of the fixed 'job.autoscaler.restart.time' configuration. If set to true, the maximum restart duration over a number of samples will be used. The value of 'job.autoscaler.restart.time-tracking.limit' will act as an upper bound, and the value of 'job.autoscaler.restart.time' will still be used when there are no rescale samples. |
job.autoscaler.restart.time-tracking.limit |
15 min | Duration | Maximum cap for the observed restart time when 'job.autoscaler.restart.time-tracking.enabled' is set to true. |
job.autoscaler.scale-down.interval |
1 h | Duration | The delay time for scale down to be executed. If it is greater than 0, the scale down will be delayed. Delayed rescale can merge multiple scale downs within `scale-down.interval` into a single scale down, thereby reducing the number of rescales. Reducing the frequency of job restarts can improve job availability. Scale down is executed directly if the value is less than or equal to 0. |
job.autoscaler.scale-down.max-factor |
0.6 | Double | Max scale down factor. 1 means no limit on scale down, 0.6 means job can only be scaled down with 60% of the original parallelism. |
job.autoscaler.scale-up.max-factor |
100000.0 | Double | Max scale up factor. 2.0 means job can only be scaled up with 200% of the current parallelism. |
job.autoscaler.scaling.effectiveness.detection.enabled |
false | Boolean | Whether to enable detection of ineffective scaling operations and allow the autoscaler to block further scale ups. |
job.autoscaler.scaling.effectiveness.threshold |
0.1 | Double | Processing rate increase threshold for detecting ineffective scaling. 0.1 means if we do not accomplish at least 10% of the desired capacity increase with scaling, the action is marked ineffective. |
job.autoscaler.scaling.enabled |
true | Boolean | Enable vertex scaling execution by the autoscaler. If disabled, the autoscaler will only collect metrics and evaluate the suggested parallelism for each vertex but will not upgrade the jobs. |
job.autoscaler.scaling.event.interval |
30 min | Duration | Time interval to resend the identical event |
job.autoscaler.stabilization.interval |
5 min | Duration | Stabilization period in which no new scaling will be executed |
job.autoscaler.target.utilization |
0.7 | Double | Target vertex utilization |
job.autoscaler.target.utilization.boundary |
0.3 | Double | Target vertex utilization boundary. Scaling won't be performed if the processing capacity is within [target_rate / (target_utilization - boundary), target_rate / (target_utilization + boundary)]. |
job.autoscaler.vertex.exclude.ids |
List<String> | A (semicolon-separated) list of vertex ids in hexstring for which to disable scaling. Caution: For non-sink vertices this will still scale their downstream operators until https://issues.apache.org/jira/browse/FLINK-31215 is implemented. | |
job.autoscaler.vertex.max-parallelism |
200 | Integer | The maximum parallelism the autoscaler can use. Note that this limit will be ignored if it is higher than the max parallelism configured in the Flink config or directly on each operator. |
job.autoscaler.vertex.min-parallelism |
1 | Integer | The minimum parallelism the autoscaler can use. |
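A hedged sketch of how a handful of these options could be combined for a single resource under spec.flinkConfiguration. All values are illustrative rather than recommendations, and the surrounding FlinkDeployment fields are omitted. Note that the keys carry no kubernetes.operator. prefix.

```yaml
spec:
  flinkConfiguration:
    job.autoscaler.enabled: "true"
    # Shorter windows than the defaults, e.g. for experimenting on a test job
    job.autoscaler.stabilization.interval: 1m
    job.autoscaler.metrics.window: 5m
    job.autoscaler.target.utilization: "0.6"
    job.autoscaler.target.utilization.boundary: "0.2"
    job.autoscaler.vertex.max-parallelism: "50"
    # Daily expression && cron expression, as in the option description:
    # no autoscaling between 9:30:30 and 10:50:20 on Monday and Thursday
    job.autoscaler.excluded.periods: "9:30:30-10:50:20 && * * * ? * 2,5"
```

To observe the autoscaler's suggestions without letting it change the job, job.autoscaler.scaling.enabled can additionally be set to "false", as described in the table.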
Autoscaler Standalone Configuration #
Unlike other resource options, these options only work with the autoscaler standalone process.
Key | Default | Type | Description |
---|---|---|---|
autoscaler.standalone.control-loop.interval |
10 s | Duration | The interval of autoscaler standalone control loop. |
autoscaler.standalone.control-loop.parallelism |
100 | Integer | The parallelism of autoscaler standalone control loop. |
autoscaler.standalone.event-handler.type |
LOGGING | Enum |
The autoscaler event handler type. Possible values:
|
autoscaler.standalone.fetcher.flink-cluster.host |
"localhost" | String | The host name of flink cluster when the flink-cluster fetcher is used. |
autoscaler.standalone.fetcher.flink-cluster.port |
8081 | Integer | The port of flink cluster when the flink-cluster fetcher is used. |
autoscaler.standalone.jdbc.event-handler.ttl |
90 d | Duration | The time to live, based on create time, for the JDBC event handler records. When set to '0', the TTL strategy for the records is disabled. |
autoscaler.standalone.jdbc.password-env-variable |
"JDBC_PWD" | String | The environment variable name of the JDBC password when autoscaler.standalone.state-store.type or autoscaler.standalone.event-handler.type has been set to JDBC. In general, the environment variable name doesn't need to be changed. Users need to export the password using this environment variable. |
autoscaler.standalone.jdbc.url |
(none) | String | The JDBC URL when autoscaler.standalone.state-store.type or autoscaler.standalone.event-handler.type has been set to JDBC, such as: jdbc:mysql://localhost:3306/flink_autoscaler. This option is required when using the JDBC state store or JDBC event handler. |
autoscaler.standalone.jdbc.username |
(none) | String | The JDBC username when autoscaler.standalone.state-store.type or autoscaler.standalone.event-handler.type has been set to JDBC. |
autoscaler.standalone.state-store.type |
MEMORY | Enum |
The autoscaler state store type. Possible values:
|
System Metrics Configuration #
Operator system metrics configuration. Cannot be overridden on a per-resource basis.
Key | Default | Type | Description |
---|---|---|---|
kubernetes.operator.josdk.metrics.enabled |
true | Boolean | Enable forwarding of Java Operator SDK metrics to the Flink metric registry. |
kubernetes.operator.jvm.metrics.enabled |
true | Boolean | Enable Kubernetes Operator JVM metrics. |
kubernetes.operator.kubernetes.client.metrics.enabled |
true | Boolean | Enable KubernetesClient metrics for measuring the HTTP traffic to the Kubernetes API Server. |
kubernetes.operator.kubernetes.client.metrics.http.response.code.groups.enabled |
false | Boolean | Enable KubernetesClient metrics for measuring the HTTP traffic to the Kubernetes API Server by response code group, e.g. 1xx, 2xx. |
kubernetes.operator.metrics.histogram.sample.size |
1000 | Integer | Defines the number of measured samples when calculating statistics. |
kubernetes.operator.metrics.scope.k8soperator.resource |
"<host>.k8soperator.<namespace>.<name>.resource.<resourcens>.<resourcename>.<resourcetype>" | String | Defines the scope format string that is applied to all metrics scoped to the kubernetes operator resource. |
kubernetes.operator.metrics.scope.k8soperator.resourcens |
"<host>.k8soperator.<namespace>.<name>.namespace.<resourcens>.<resourcetype>" | String | Defines the scope format string that is applied to all metrics scoped to the kubernetes operator resource namespace. |
kubernetes.operator.metrics.scope.k8soperator.system |
"<host>.k8soperator.<namespace>.<name>.system" | String | Defines the scope format string that is applied to all metrics scoped to the kubernetes operator. |
kubernetes.operator.resource.lifecycle.metrics.enabled |
true | Boolean | Enable resource lifecycle state metrics. This enables both state and transition counts/histograms. |
kubernetes.operator.resource.lifecycle.namespace.histograms.enabled |
true | Boolean | In addition to the system level histograms, enable per namespace tracking of state and transition times. |
kubernetes.operator.resource.metrics.enabled |
true | Boolean | Enables metrics for FlinkDeployment and FlinkSessionJob custom resources. |
Advanced System Configuration #
Advanced operator system configuration. Cannot be overridden on a per-resource basis.
Key | Default | Type | Description |
---|---|---|---|
kubernetes.operator.cluster.resource-view.refresh-interval |
-1 min | Duration | How often to retrieve Kubernetes cluster resource usage information. This information is used to avoid running out of cluster resources when scaling up resources. Negative values disable the feature. |
kubernetes.operator.config.cache.size |
1000 | Integer | Max config cache size. |
kubernetes.operator.config.cache.timeout |
10 min | Duration | Expiration time for cached configs. |
kubernetes.operator.dynamic.config.check.interval |
5 min | Duration | Time interval for checking config changes. |
kubernetes.operator.dynamic.config.enabled |
true | Boolean | Whether to enable on-the-fly config changes through the operator configmap. |
kubernetes.operator.health.canary.resource.timeout |
1 min | Duration | Allowed max time between spec update and reconciliation for canary resources. |
kubernetes.operator.health.probe.enabled |
true | Boolean | Enables health probe for the kubernetes operator. |
kubernetes.operator.health.probe.port |
8085 | Integer | The port the health probe will use to expose the status. |
kubernetes.operator.label.selector |
(none) | String | Label selector of the custom resources to be watched. Please see https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#label-selectors for the format supported. |
kubernetes.operator.observer.progress-check.interval |
10 s | Duration | The interval for observing status for in-progress operations such as deployment and savepoints. |
kubernetes.operator.observer.rest-ready.delay |
10 s | Duration | Final delay before deployment is marked ready after port becomes accessible. |
kubernetes.operator.resource.deletion.propagation |
Foreground | Enum |
JM/TM Deployment deletion propagation. Possible values:
|
kubernetes.operator.savepoint.history.max.age.threshold |
(none) | Duration | Maximum age threshold for FlinkStateSnapshot resources to retain. |
kubernetes.operator.savepoint.history.max.count.threshold |
(none) | Integer | Maximum number threshold of savepoint FlinkStateSnapshot resources to retain. |
kubernetes.operator.startup.stop-on-informer-error |
true | Boolean | Whether informer errors should stop operator startup. If false, the startup will ignore recoverable errors, caused for example by RBAC issues and will retry periodically. |
kubernetes.operator.termination.timeout |
10 s | Duration | Operator shutdown timeout before reconciliation threads are killed. |
IPv6 Configuration #
If you run the Flink Operator in an IPv6 environment, a host name verification error is triggered due to a known bug in the OkHttp client. As a workaround until the OkHttp 5.0.0 release, set the environment variable below for both the Flink Operator and the Flink Deployment configuration.
KUBERNETES_DISABLE_HOSTNAME_VERIFICATION=true
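One possible way to set the variable in both places is sketched below. It assumes that the Helm chart exposes an operatorPod.env list and that the Flink side is configured through a pod template on the FlinkDeployment; the resource name is a placeholder, and both fragments are partial.

```yaml
# Operator side: Helm values.yaml fragment (assumes the chart supports operatorPod.env)
operatorPod:
  env:
    - name: KUBERNETES_DISABLE_HOSTNAME_VERIFICATION
      value: "true"
---
# Flink side: FlinkDeployment fragment using a pod template
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: example-deployment   # placeholder name
spec:
  podTemplate:
    spec:
      containers:
        - name: flink-main-container
          env:
            - name: KUBERNETES_DISABLE_HOSTNAME_VERIFICATION
              value: "true"
```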