This page describes how to deploy Flink natively on Kubernetes.
This Getting Started section guides you through setting up a fully functional Flink Cluster on Kubernetes.
Kubernetes is a popular container-orchestration system for automating computer application deployment, scaling, and management. Flink’s native Kubernetes integration allows you to directly deploy Flink on a running Kubernetes cluster. Moreover, Flink is able to dynamically allocate and de-allocate TaskManagers depending on the required resources because it can directly talk to Kubernetes.
The Getting Started section assumes a running Kubernetes cluster fulfilling the following requirements:
Kubernetes >= 1.9.
A KubeConfig, configurable via ~/.kube/config, with permission to list, create, and delete pods. You can verify permissions by running kubectl auth can-i <list|create|edit|delete> pods.
A default service account with RBAC permissions to create and delete pods.
If you have problems setting up a Kubernetes cluster, take a look at how to set up a Kubernetes cluster.
Once you have your Kubernetes cluster running and kubectl is configured to point to it, you can launch a Flink cluster in Session Mode with the bin/kubernetes-session.sh script.
Note: When using Minikube, you need to call minikube tunnel in order to expose Flink’s LoadBalancer service on Minikube.
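The session workflow can be sketched as the three commands below, which follow the upstream Flink quick start. This is a dry-run sketch under assumptions: RUN is a hypothetical helper (not part of Flink) that only prints each command unless you set RUN=env, and the example jar path assumes the commands are run from the root of a Flink distribution.

```shell
# Dry-run sketch: RUN is a hypothetical helper, not part of Flink.
# It prints each command by default; set RUN=env to execute for real.
RUN="${RUN:-echo}"
CLUSTER_ID="my-first-flink-cluster"

# (1) Start a Flink session cluster on Kubernetes.
START_CMD="./bin/kubernetes-session.sh -Dkubernetes.cluster-id=${CLUSTER_ID}"
$RUN $START_CMD

# (2) Submit an example job to the session cluster
#     (jar path assumed to exist in the Flink distribution).
SUBMIT_CMD="./bin/flink run --target kubernetes-session"
SUBMIT_CMD="${SUBMIT_CMD} -Dkubernetes.cluster-id=${CLUSTER_ID}"
SUBMIT_CMD="${SUBMIT_CMD} ./examples/streaming/TopSpeedWindowing.jar"
$RUN $SUBMIT_CMD

# (3) Stop the session cluster by deleting its deployment.
STOP_CMD="kubectl delete deployment/${CLUSTER_ID}"
$RUN $STOP_CMD
```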
Congratulations! You have successfully run a Flink application by deploying Flink on Kubernetes.
For production use, we recommend deploying Flink Applications in the Application Mode, as this mode provides better isolation for the Applications.
The Application Mode requires that the user code is bundled together with the Flink image because it runs the user code’s main() method on the cluster.
The Application Mode makes sure that all Flink components are properly cleaned up after the termination of the application.
The Flink community provides a base Docker image which can be used to bundle the user code.
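A minimal sketch of such a bundling step is shown below. It writes a three-line Dockerfile into a temporary build directory. The jar name my-flink-job.jar is a placeholder, and the sketch assumes the official flink image, which defines FLINK_HOME, and that the jar has been copied into the build directory before building.

```shell
# Sketch: generate a Dockerfile that adds a user jar to the Flink base image.
# my-flink-job.jar is a placeholder name for your own job jar.
BUILD_DIR="$(mktemp -d)"

cat > "${BUILD_DIR}/Dockerfile" <<'EOF'
FROM flink
RUN mkdir -p $FLINK_HOME/usrlib
COPY my-flink-job.jar $FLINK_HOME/usrlib/my-flink-job.jar
EOF

# Show the generated file; build it afterwards with, for example:
#   docker build -t custom-image-name "${BUILD_DIR}"
cat "${BUILD_DIR}/Dockerfile"
```

The quoted heredoc delimiter keeps $FLINK_HOME unexpanded, so it is resolved inside the image at build time rather than by the local shell.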
After creating and publishing the Docker image under custom-image-name, you can start an Application cluster by passing that image name to bin/flink via the kubernetes.container.image option.
Note: local is the only supported scheme in Application Mode.
The kubernetes.cluster-id option specifies the cluster name and must be unique.
If you do not specify this option, then Flink will generate a random name.
The kubernetes.container.image option specifies the image to start the pods with.
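The options described above can be combined as in the following dry-run sketch. The cluster id my-first-application-cluster and the jar location under /opt/flink/usrlib are illustrative assumptions taken from the upstream example, and RUN is a hypothetical helper that prints the command unless RUN=env is set.

```shell
# Dry-run sketch: prints the command by default; set RUN=env to execute.
RUN="${RUN:-echo}"
CLUSTER_ID="my-first-application-cluster"   # assumed, must be unique
IMAGE="custom-image-name"
JAR_URI="local:///opt/flink/usrlib/my-flink-job.jar"   # path inside the image

# Assemble the Application Mode submission command piece by piece.
APP_CMD="./bin/flink run-application --target kubernetes-application"
APP_CMD="${APP_CMD} -Dkubernetes.cluster-id=${CLUSTER_ID}"
APP_CMD="${APP_CMD} -Dkubernetes.container.image=${IMAGE}"
APP_CMD="${APP_CMD} ${JAR_URI}"
$RUN $APP_CMD
```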
Once the application cluster is deployed, you can interact with it through bin/flink, for example to list or cancel its jobs.
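As an illustration, listing and cancelling jobs on an application cluster might look like the dry-run sketch below. The cluster id is the assumed my-first-application-cluster, <jobId> is a placeholder you must replace, and RUN is a hypothetical helper that prints instead of executing.

```shell
# Dry-run sketch: prints the commands by default; set RUN=env to execute.
RUN="${RUN:-echo}"
CLUSTER_ID="my-first-application-cluster"   # assumed cluster id
JOB_ID="${JOB_ID:-<jobId>}"                 # placeholder, take it from the list output

# List the jobs running on the application cluster.
LIST_CMD="./bin/flink list --target kubernetes-application -Dkubernetes.cluster-id=${CLUSTER_ID}"
$RUN $LIST_CMD

# Cancel a running job by its id.
CANCEL_CMD="./bin/flink cancel --target kubernetes-application -Dkubernetes.cluster-id=${CLUSTER_ID} ${JOB_ID}"
$RUN $CANCEL_CMD
```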
You can override configurations set in conf/flink-conf.yaml by passing key-value pairs -Dkey=value to bin/flink.
Flink on Kubernetes does not support Per-Job Cluster Mode.
You have seen the deployment of a Session cluster in the Getting Started guide at the top of this page.
The Session Mode can be executed in two modes:
detached mode (default): kubernetes-session.sh deploys the Flink cluster on Kubernetes and then terminates.
attached mode (-Dexecution.attached=true): kubernetes-session.sh stays alive and allows entering commands to control the running Flink cluster. For example, stop stops the running Session cluster. Type help to list all supported commands.
In order to re-attach to a running Session cluster with the cluster id my-first-flink-cluster, start kubernetes-session.sh in attached mode with the same cluster id.
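A dry-run sketch of re-attaching, combining the cluster id with the attached-mode option described above; RUN is a hypothetical helper that prints the command unless RUN=env is set.

```shell
# Dry-run sketch: prints the command by default; set RUN=env to execute.
RUN="${RUN:-echo}"
CLUSTER_ID="my-first-flink-cluster"

# Re-attach to the existing session by reusing its cluster id in attached mode.
ATTACH_CMD="./bin/kubernetes-session.sh -Dkubernetes.cluster-id=${CLUSTER_ID} -Dexecution.attached=true"
$RUN $ATTACH_CMD
```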
You can override configurations set in conf/flink-conf.yaml by passing key-value pairs -Dkey=value to bin/kubernetes-session.sh.
In order to stop a running Session Cluster with cluster id my-first-flink-cluster, you can either delete the Flink deployment or send the stop command to kubernetes-session.sh running in attached mode.
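The second variant can be scripted by piping stop into an attached session, as in this dry-run sketch. RUN is a hypothetical helper that prints the command unless RUN=env is set; in dry-run mode the piped input is simply ignored.

```shell
# Dry-run sketch: prints the command by default; set RUN=env to execute.
RUN="${RUN:-echo}"
CLUSTER_ID="my-first-flink-cluster"

# Attach to the session and feed it the 'stop' command on standard input.
STOP_SESSION_CMD="./bin/kubernetes-session.sh -Dkubernetes.cluster-id=${CLUSTER_ID} -Dexecution.attached=true"
echo 'stop' | $RUN $STOP_SESSION_CMD
```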
The Kubernetes-specific configuration options are listed on the configuration page.
Flink uses the Fabric8 Kubernetes client to communicate with the Kubernetes APIServer to create and delete Kubernetes resources (e.g. Deployment, Pod, ConfigMap, Service), as well as to watch Pods and ConfigMaps. In addition to the Flink config options above, some expert options of the Fabric8 Kubernetes client can be configured via system properties or environment variables.
For example, users can use the following Flink config options to set the maximum number of concurrent requests, which allows running more jobs in a session cluster when Kubernetes HA Services are used.
Please note that each Flink job consumes 3 concurrent requests.
containerized.master.env.KUBERNETES_MAX_CONCURRENT_REQUESTS: 200
env.java.opts.jobmanager: "-Dkubernetes.max.concurrent.requests=200"
Flink’s Web UI and REST endpoint can be exposed in several ways via the kubernetes.rest-service.exposed.type configuration option.
ClusterIP: Exposes the service on a cluster-internal IP. The service is only reachable from within the cluster, so to access the JobManager UI or submit a job to an existing session you need to start a local proxy, for example with kubectl port-forward. You can then use localhost:8081 to submit a Flink job to the session or view the dashboard.
NodePort: Exposes the service on each Node’s IP at a static port (the NodePort). <NodeIP>:<NodePort> can be used to contact the JobManager service. NodeIP can also be replaced with the Kubernetes ApiServer address. You can find its address in your kube config file.
LoadBalancer: Exposes the service externally using a cloud provider’s load balancer. Since the cloud provider and Kubernetes need some time to prepare the load balancer, the client log may initially show a NodePort JobManager Web Interface address. You can use kubectl get services/<cluster-id>-rest to get the EXTERNAL-IP and construct the load balancer JobManager Web Interface address manually: http://<EXTERNAL-IP>:8081.
Please refer to the official documentation on publishing services in Kubernetes for more information.
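The access paths above can be sketched as follows, assuming the cluster id my-first-flink-cluster and the <cluster-id>-rest service name mentioned in the text. RUN is a hypothetical helper that prints each command unless RUN=env is set, and <EXTERNAL-IP> stays a placeholder until you read the real value from kubectl.

```shell
# Dry-run sketch: prints the commands by default; set RUN=env to execute.
RUN="${RUN:-echo}"
CLUSTER_ID="my-first-flink-cluster"
REST_SERVICE="${CLUSTER_ID}-rest"

# ClusterIP: forward local port 8081 to the REST service
# (when executed for real, this blocks until interrupted).
$RUN kubectl port-forward "service/${REST_SERVICE}" 8081

# LoadBalancer: look up the EXTERNAL-IP column of the REST service.
$RUN kubectl get "services/${REST_SERVICE}"

# Construct the Web UI address from the external IP (placeholder by default).
EXTERNAL_IP="${EXTERNAL_IP:-<EXTERNAL-IP>}"
WEB_UI_URL="http://${EXTERNAL_IP}:8081"
echo "${WEB_UI_URL}"
```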
The Kubernetes integration exposes conf/log4j-console.properties and conf/logback-console.xml as a ConfigMap to the pods.
Changes to these files will be visible to a newly started cluster.
By default, the JobManager and TaskManager will output the logs to the console and /opt/flink/log in each pod simultaneously.
The STDOUT and STDERR output will only be redirected to the console.
You can access them via kubectl logs <pod-name>.
If the pod is running, you can also use kubectl exec -it <pod-name> -- bash to tunnel in and view the logs or debug the process.
Flink will automatically de-allocate idling TaskManagers in order to not waste resources. This behaviour can make it harder to access the logs of the respective pods. You can increase the time before idling TaskManagers are released by configuring resourcemanager.taskmanager-timeout so that you have more time to inspect the log files.
If you have configured your logger to detect configuration changes automatically, then you can dynamically adapt the log level by editing the respective ConfigMap, for example with kubectl edit (assuming that the cluster id is my-first-flink-cluster).
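A dry-run sketch of that edit is shown below. It assumes the ConfigMap is named flink-config-<cluster-id>, as in the upstream Flink documentation; verify the name with kubectl get configmaps before relying on it. RUN is a hypothetical helper that prints the command unless RUN=env is set.

```shell
# Dry-run sketch: prints the command by default; set RUN=env to execute.
RUN="${RUN:-echo}"
CLUSTER_ID="my-first-flink-cluster"

# Assumed naming scheme: flink-config-<cluster-id>.
CONFIG_MAP="flink-config-${CLUSTER_ID}"

# Open the ConfigMap in an editor to change the log level.
EDIT_CMD="kubectl edit cm ${CONFIG_MAP}"
$RUN $EDIT_CMD
```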
In order to use plugins, you must copy them to the correct location in the Flink JobManager/TaskManager pod. You can use the built-in plugins without mounting a volume or building a custom Docker image. For example, you can enable the S3 plugin for your Flink session cluster by setting the ENABLE_BUILT_IN_PLUGINS environment variable for the JobManager and TaskManager containers when starting the session.
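A dry-run sketch of enabling the S3 plugin this way follows. The jar name pattern flink-s3-fs-hadoop-<flink-version>.jar is an assumption based on the upstream example and must match the jar shipped in your image; FLINK_VERSION is a placeholder you need to set. RUN is a hypothetical helper that prints the command unless RUN=env is set.

```shell
# Dry-run sketch: prints the command by default; set RUN=env to execute.
RUN="${RUN:-echo}"
FLINK_VERSION="${FLINK_VERSION:-<flink-version>}"   # placeholder, set to your version
PLUGIN_JAR="flink-s3-fs-hadoop-${FLINK_VERSION}.jar"

# Pass the plugin jar name to both the JobManager and TaskManager containers.
PLUGIN_CMD="./bin/kubernetes-session.sh"
PLUGIN_CMD="${PLUGIN_CMD} -Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=${PLUGIN_JAR}"
PLUGIN_CMD="${PLUGIN_CMD} -Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=${PLUGIN_JAR}"
$RUN $PLUGIN_CMD
```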
If you want to use a custom Docker image, then you can specify it via the configuration option kubernetes.container.image.
The Flink community provides a rich Flink Docker image which can be a good starting point.
See how to customize Flink’s Docker image for how to enable plugins, add dependencies and other options.
A Kubernetes Secret is an object that contains a small amount of sensitive data such as a password, a token, or a key. Such information might otherwise be put in a pod specification or in an image. Flink on Kubernetes can use Secrets in two ways:
Using Secrets as files from a pod.
Using Secrets as environment variables.
Setting the kubernetes.secrets option to mysecret:/path/to/secret mounts the secret mysecret under the path /path/to/secret in the started pods. The username and password of the secret mysecret can then be found in the files /path/to/secret/username and /path/to/secret/password.
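As a dry-run sketch, mounting the secret when starting a session cluster could look like this. The secret mysecret is assumed to already exist in the cluster, and RUN is a hypothetical helper that prints the command unless RUN=env is set.

```shell
# Dry-run sketch: prints the command by default; set RUN=env to execute.
RUN="${RUN:-echo}"
SECRET_NAME="mysecret"          # must already exist in the namespace
MOUNT_PATH="/path/to/secret"

# Format of the option value: <secret-name>:<mount-path>
SECRET_FILES_CMD="./bin/kubernetes-session.sh -Dkubernetes.secrets=${SECRET_NAME}:${MOUNT_PATH}"
$RUN $SECRET_FILES_CMD
```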
For more details see the official Kubernetes documentation.
The kubernetes.env.secretKeyRef option exposes keys of the secret mysecret as environment variables in the started pods. When configured accordingly, the env variable SECRET_USERNAME contains the username and the env variable SECRET_PASSWORD contains the password of the secret mysecret.
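A dry-run sketch of that mapping is shown below, assuming the secret mysecret has the keys username and password. RUN is a hypothetical helper that prints the command unless RUN=env is set.

```shell
# Dry-run sketch: prints the command by default; set RUN=env to execute.
RUN="${RUN:-echo}"

# One entry per variable, entries separated by ';':
#   env:<variable>,secret:<secret-name>,key:<key-in-secret>
SECRET_ENV="env:SECRET_USERNAME,secret:mysecret,key:username"
SECRET_ENV="${SECRET_ENV};env:SECRET_PASSWORD,secret:mysecret,key:password"

SECRET_ENV_CMD="./bin/kubernetes-session.sh -Dkubernetes.env.secretKeyRef=${SECRET_ENV}"
$RUN $SECRET_ENV_CMD
```

If you type the command directly instead of building it in a variable, quote the option value, because an unquoted semicolon would end the shell command.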
For more details see the official Kubernetes documentation.
For high availability on Kubernetes, you can use the existing high availability services.
Flink uses Kubernetes OwnerReferences to clean up all cluster components.
All resources created by Flink, including ConfigMap, Service, and Pod, have their OwnerReference set to deployment/<cluster-id>.
When the deployment is deleted, all related resources will be deleted automatically.
Currently, all Kubernetes versions >= 1.9 are supported.
Namespaces in Kubernetes divide cluster resources between multiple users via resource quotas. Flink on Kubernetes can use namespaces to launch Flink clusters. The namespace can be configured via kubernetes.namespace.
Role-based access control (RBAC) is a method of regulating access to compute or network resources based on the roles of individual users within an enterprise. Users can configure RBAC roles and service accounts used by JobManager to access the Kubernetes API server within the Kubernetes cluster.
Every namespace has a default service account. However, the default service account may not have the permission to create or delete pods within the Kubernetes cluster.
Users may need to update the permissions of the default service account or specify another service account that has the right role bound.
If you do not want to use the default service account, create a new flink-service-account service account with kubectl and bind a suitable role to it.
Then use the config option -Dkubernetes.service-account=flink-service-account to make the JobManager pod use the flink-service-account service account to create and delete TaskManager pods and leader ConfigMaps.
This also allows the TaskManager to watch leader ConfigMaps to retrieve the address of the JobManager and ResourceManager.
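The service account setup can be sketched as the dry-run below. The binding name flink-role-binding-flink, the edit cluster role, and the default namespace are assumptions taken from the upstream example; adjust them to your own RBAC policy. RUN is a hypothetical helper that prints each command unless RUN=env is set.

```shell
# Dry-run sketch: prints the commands by default; set RUN=env to execute.
RUN="${RUN:-echo}"
NAMESPACE="default"                      # assumed namespace
SERVICE_ACCOUNT="flink-service-account"

# (1) Create the service account.
SA_CMD="kubectl create serviceaccount ${SERVICE_ACCOUNT}"
$RUN $SA_CMD

# (2) Bind a role to it (binding name and 'edit' role are assumptions).
BINDING_CMD="kubectl create clusterrolebinding flink-role-binding-flink"
BINDING_CMD="${BINDING_CMD} --clusterrole=edit"
BINDING_CMD="${BINDING_CMD} --serviceaccount=${NAMESPACE}:${SERVICE_ACCOUNT}"
$RUN $BINDING_CMD

# (3) Start a session cluster whose JobManager uses the new service account.
SESSION_CMD="./bin/kubernetes-session.sh -Dkubernetes.service-account=${SERVICE_ACCOUNT}"
$RUN $SESSION_CMD
```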
Please refer to the official Kubernetes documentation on RBAC Authorization for more information.