This Getting Started guide describes how to deploy a Session cluster on Kubernetes.
Introduction
This page describes deploying a standalone Flink cluster on top of Kubernetes, using Flink’s standalone deployment.
We generally recommend that new users deploy Flink on Kubernetes using native Kubernetes deployments.
Preparation
This guide expects a Kubernetes environment to be present. You can ensure that your Kubernetes setup is working by running a command like kubectl get nodes, which lists all connected Kubelets.
If you want to run Kubernetes locally, we recommend using MiniKube.
Note: If you use MiniKube, make sure to execute minikube ssh 'sudo ip link set docker0 promisc on' before deploying a Flink cluster.
Otherwise, Flink components are not able to reference themselves through a Kubernetes Service.
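The preparation steps above can be bundled into a small helper. This is a minimal sketch, assuming minikube and kubectl are installed and on the PATH; the function name prepare_minikube is hypothetical.

```shell
# Hypothetical helper: prepare a local MiniKube cluster for the standalone Flink example.
# Assumes minikube and kubectl are installed and on the PATH.
prepare_minikube() {
  # Start the local single-node Kubernetes cluster.
  minikube start || return 1
  # Allow Flink components to reach themselves through a Kubernetes Service.
  minikube ssh 'sudo ip link set docker0 promisc on' || return 1
  # Check that the Kubelet is connected; the node should report STATUS Ready.
  kubectl get nodes
}
```

Run prepare_minikube once before creating any of the Flink resources below.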
Starting a Kubernetes Cluster (Session Mode)
A Flink Session cluster is executed as a long-running Kubernetes Deployment. You can run multiple Flink jobs on a Session cluster.
Each job needs to be submitted to the cluster after the cluster has been deployed.
A Flink Session cluster deployment in Kubernetes has at least three components:
a Deployment which runs a JobManager
a Deployment for a pool of TaskManagers
a Service exposing the JobManager’s REST and UI ports
Using the file contents provided in the common resource definitions, create the following files, and create the respective components with the kubectl command:
# Configuration and service definition
$ kubectl create -f flink-configuration-configmap.yaml
$ kubectl create -f jobmanager-service.yaml
# Create the deployments for the cluster
$ kubectl create -f jobmanager-session-deployment.yaml
$ kubectl create -f taskmanager-session-deployment.yaml
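For orientation, a jobmanager-service.yaml of the kind created above might look like the following. This is a sketch, not the exact common resource definition: the Service name flink-jobmanager, the port numbers, and the labels app: flink and component: jobmanager are assumptions, and they must agree with jobmanager.rpc.address in the ConfigMap and with the labels in the JobManager pod template.

```yaml
# jobmanager-service.yaml (sketch): a ClusterIP Service that gives the
# JobManager a stable DNS name for RPC, blob server, and web UI traffic.
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager        # assumed to match jobmanager.rpc.address
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123                  # assumed jobmanager.rpc.port
  - name: blob-server
    port: 6124                  # assumed blob.server.port
  - name: webui
    port: 8081                  # REST API and web UI
  selector:                     # assumed labels of the JobManager pods
    app: flink
    component: jobmanager
```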
Next, we set up a port forward to access the Flink UI and submit jobs:
Run kubectl port-forward ${flink-jobmanager-pod} 8081:8081 to forward the JobManager’s web UI port to local port 8081, then navigate to http://localhost:8081 in your browser.
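Instead of copying the pod name by hand, the JobManager pod can be looked up by label. This sketch assumes the pods carry the labels app=flink and component=jobmanager, as in the example resource definitions; the function names are hypothetical.

```shell
# Hypothetical helpers: find the JobManager pod by label and forward its UI port.
# Assumes the JobManager pods are labeled app=flink,component=jobmanager.
jobmanager_pod() {
  # Print the name of the first pod matching the selector.
  kubectl get pods -l app=flink,component=jobmanager \
    -o jsonpath='{.items[0].metadata.name}'
}

forward_flink_ui() {
  pod="$(jobmanager_pod)" || return 1
  # Blocks until interrupted; the UI is then reachable at http://localhost:8081.
  kubectl port-forward "$pod" 8081:8081
}
```

Because kubectl port-forward runs in the foreground, start forward_flink_ui in its own terminal and keep it running while you use the UI or submit jobs.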
The args attribute in jobmanager-job.yaml must specify the main class of the user job.
See also how to specify the JobManager arguments to learn
how to pass other arguments to the Flink image in jobmanager-job.yaml.
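As an illustration, the container section of jobmanager-job.yaml could pass the main class like this. It is a sketch: the image tag and the class name com.example.MyJob are hypothetical, and the standalone-job entry point argument and the listed optional flags should be checked against the JobManager arguments documentation for your Flink version.

```yaml
# Fragment of the JobManager pod template in jobmanager-job.yaml (sketch).
containers:
- name: jobmanager
  image: flink:latest            # hypothetical tag; pin the version you run
  # The first argument selects the standalone job entry point;
  # --job-classname names the main class of the user job (hypothetical here).
  # Optional flags may include --job-id, --fromSavepoint, --allowNonRestoredState.
  args: ["standalone-job", "--job-classname", "com.example.MyJob"]
```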
The job artifacts should be available from the job-artifacts-volume in the resource definition examples.
The definition examples mount the volume as a local directory on the host, assuming that you create the components in a minikube cluster.
If you do not use a minikube cluster, you can use any other type of volume available in your Kubernetes cluster to supply the job artifacts.
Alternatively, you can build a custom image which already contains the artifacts.
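A hostPath mount for job-artifacts-volume could look roughly like this. This is a sketch: the host directory is a hypothetical placeholder, and the mount path /opt/flink/usrlib is an assumption about where the Flink image looks for user JARs.

```yaml
# Pod template fragment (sketch) mounting the job artifacts from the host.
spec:
  containers:
  - name: jobmanager
    volumeMounts:
    - name: job-artifacts-volume
      mountPath: /opt/flink/usrlib            # assumed user JAR directory
  volumes:
  - name: job-artifacts-volume
    hostPath:
      path: /host/path/to/job/artifacts       # hypothetical host directory
```

On a cluster other than minikube, the hostPath entry would be swapped for another volume type, for example a persistentVolumeClaim, while the volumeMounts section stays the same.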
All configuration options are listed on the configuration page. Configuration options can be added to the flink-conf.yaml section of the flink-configuration-configmap.yaml config map.
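A trimmed flink-configuration-configmap.yaml with a few options in its flink-conf.yaml section might look like this. The ConfigMap name and the values are illustrative assumptions; the keys are standard Flink options, and the port values are chosen to line up with the JobManager Service.

```yaml
# flink-configuration-configmap.yaml (sketch, logging configuration omitted).
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config               # assumed name referenced by the deployments
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    jobmanager.rpc.port: 6123
    blob.server.port: 6124
    taskmanager.rpc.port: 6122
    queryable-state.proxy.ports: 6125
    taskmanager.numberOfTaskSlots: 2
    parallelism.default: 2
    jobmanager.memory.process.size: 1600m
    taskmanager.memory.process.size: 1728m
```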
Accessing Flink in Kubernetes
You can then access the Flink UI and submit jobs in different ways:
With the port forward described above in place, you can also submit jobs to the cluster through local port 8081 with the Flink CLI.
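A minimal sketch of such a submission, assuming the port forward is running and the current directory is the root of a Flink distribution; the function name is hypothetical, and TopSpeedWindowing.jar is one of the example jobs shipped with the distribution.

```shell
# Hypothetical helper: submit a job JAR through the forwarded REST port.
# Assumes kubectl port-forward ... 8081:8081 is running and that the
# current directory is the root of a Flink distribution.
submit_job() {
  # Default to a bundled example job when no JAR path is given.
  jar="${1:-./examples/streaming/TopSpeedWindowing.jar}"
  # -m points the CLI at the JobManager REST endpoint.
  ./bin/flink run -m localhost:8081 "$jar"
}
```

Usage: submit_job for the bundled example, or submit_job /path/to/your-job.jar for your own JAR.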
Create a NodePort service for the JobManager’s REST service:
Run kubectl create -f jobmanager-rest-service.yaml to create the NodePort service for the JobManager. An example jobmanager-rest-service.yaml can be found in the appendix.
Run kubectl get svc flink-jobmanager-rest to find the node port of this service, and navigate to http://<public-node-ip>:<node-port> in your browser.
If you use minikube, you can get its public IP by running minikube ip.
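A jobmanager-rest-service.yaml along these lines would expose the REST port on every node. This is a sketch: the fixed nodePort 30081 is an assumed value from the default NodePort range, and the selector labels are assumed to match the JobManager pods.

```yaml
# jobmanager-rest-service.yaml (sketch): expose the JobManager REST port on each node.
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager-rest
spec:
  type: NodePort
  ports:
  - name: rest
    port: 8081
    targetPort: 8081
    nodePort: 30081        # assumed; omit it to let Kubernetes choose a port
  selector:                # assumed JobManager pod labels
    app: flink
    component: jobmanager
```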
Similarly to the port-forward solution, you can also submit jobs to the cluster through this node port with the Flink CLI.
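The node address can be resolved and used for submission as in this sketch. It assumes a minikube cluster, a flink-jobmanager-rest Service whose first port is the REST port, and a Flink distribution in the current directory; the function names are hypothetical.

```shell
# Hypothetical helpers: resolve the NodePort address and submit a job through it.
# Assumes minikube, the flink-jobmanager-rest Service, and a local Flink distribution.
flink_rest_address() {
  node_ip="$(minikube ip)" || return 1
  node_port="$(kubectl get svc flink-jobmanager-rest \
    -o jsonpath='{.spec.ports[0].nodePort}')" || return 1
  # Print host:port, the form expected by the -m option of flink run.
  printf '%s:%s\n' "$node_ip" "$node_port"
}

submit_job_via_nodeport() {
  addr="$(flink_rest_address)" || return 1
  ./bin/flink run -m "$addr" "${1:-./examples/streaming/TopSpeedWindowing.jar}"
}
```

On a cluster other than minikube, replace minikube ip with the external IP of any node.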
Debugging and Log Access
Many common errors are easy to detect by checking Flink’s log files. If you have access to Flink’s web user interface, you can access the JobManager and TaskManager logs from there.
If there are problems starting Flink, you can also use Kubernetes utilities to access the logs. Use kubectl get pods to see all running pods.
For the quickstart example from above, you should see three pods:
$ kubectl get pods
NAME                                 READY   STATUS    RESTARTS   AGE
flink-jobmanager-589967dcfc-m49xv    1/1     Running   3          3m32s
flink-taskmanager-64847444ff-7rdl4   1/1     Running   3          3m28s
flink-taskmanager-64847444ff-nnd6m   1/1     Running   3          3m28s
You can now access the logs by running kubectl logs flink-jobmanager-589967dcfc-m49xv, replacing the pod name with one from your own output.
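To collect logs without looking up individual pod names, kubectl can select pods by label. This sketch assumes the labels app=flink and component=jobmanager or component=taskmanager from the example definitions; the function name is hypothetical. For a pod with restarts, kubectl logs --previous <pod> shows the log of the previously terminated container.

```shell
# Hypothetical helper: inspect all Flink pods of the quickstart example.
# Assumes pods are labeled app=flink plus component=jobmanager|taskmanager.
show_flink_logs() {
  # Pod status, restart counts, and recent events.
  kubectl describe pods -l app=flink
  # With a selector kubectl shows only the last 10 lines by default; --tail=-1 shows all.
  kubectl logs -l app=flink,component=jobmanager --tail=-1
  # --prefix tags every line with the pod and container it came from.
  kubectl logs -l app=flink,component=taskmanager --tail=-1 --prefix
}
```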
Note: If you enable High-Availability, the filesystem which corresponds to the scheme of your configured HA storage directory must be available to the runtime. Refer to custom Flink image and enable plugins for more information.
Moreover, you have to start the JobManager and TaskManager pods with a service account which has the permissions to create, edit, and delete ConfigMaps.
See how to configure service accounts for pods for more information.
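One way to grant these permissions is a namespaced Role bound to a dedicated service account. This is a sketch: the names flink-service-account, flink-configmap-role, and flink-configmap-role-binding are hypothetical, the default namespace is an assumption, and the verb list (including get, list, watch, and patch beyond the create, edit, and delete named above) is an assumed superset for leader election and should be checked against your Flink version.

```yaml
# RBAC sketch: a service account allowed to manage ConfigMaps in one namespace.
apiVersion: v1
kind: ServiceAccount
metadata:
  name: flink-service-account            # hypothetical name
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: flink-configmap-role             # hypothetical name
rules:
- apiGroups: [""]                        # core API group
  resources: ["configmaps"]
  verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: flink-configmap-role-binding     # hypothetical name
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: flink-configmap-role
subjects:
- kind: ServiceAccount
  name: flink-service-account
  namespace: default                     # assumption: resources live in "default"
```

The JobManager and TaskManager pod templates then reference the account with serviceAccountName: flink-service-account.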
When High-Availability is enabled, Flink will use its own HA services for service discovery.
Therefore, JobManager pods should be started with their IP address instead of a Kubernetes Service as their jobmanager.rpc.address.
Refer to the appendix for the full configuration.
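The two changes could look roughly like the following fragments. This sketch assumes a Flink version with Kubernetes HA services (1.12 or later); newer releases configure the same thing under high-availability.type, so check the configuration page for the key your version uses. The cluster id and storage path are hypothetical.

```yaml
# (a) Extra entries for the flink-conf.yaml section of
#     flink-configuration-configmap.yaml (sketch).
data:
  flink-conf.yaml: |+
    kubernetes.cluster-id: my-flink-cluster                  # hypothetical, unique per cluster
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: hdfs:///flink/recovery     # hypothetical path
---
# (b) JobManager container fragment in jobmanager-session-deployment-ha.yaml (sketch).
containers:
- name: jobmanager
  env:
  - name: POD_IP
    valueFrom:
      fieldRef:
        apiVersion: v1
        fieldPath: status.podIP
  # Kubernetes expands $(POD_IP), so the JobManager advertises its pod IP
  # instead of the Service name configured as jobmanager.rpc.address.
  args: ["jobmanager", "$(POD_IP)"]
```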
Standby JobManagers
Usually, it is enough to start only a single JobManager pod, because Kubernetes will restart it once the pod crashes.
If you want to achieve faster recovery, set replicas in jobmanager-session-deployment-ha.yaml or parallelism in jobmanager-application-ha.yaml to a value greater than 1 to start standby JobManagers.
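Only the replica or parallelism field changes; the rest of each manifest stays as it is. In this sketch, the resource names are assumptions and the selector and pod template are elided, so these are fragments rather than complete manifests.

```yaml
# Fragment of jobmanager-session-deployment-ha.yaml (sketch).
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager       # assumed name
spec:
  replicas: 2                  # one leader plus one standby JobManager
  # selector and template unchanged
---
# Fragment of jobmanager-application-ha.yaml (sketch).
apiVersion: batch/v1
kind: Job
metadata:
  name: flink-jobmanager       # assumed name
spec:
  parallelism: 2               # one leader plus one standby JobManager
  # template unchanged
```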
Enabling Queryable State
You can access the queryable state of a TaskManager if you create a NodePort service for it:
Run kubectl create -f taskmanager-query-state-service.yaml to create the NodePort service for the TaskManager pod. An example taskmanager-query-state-service.yaml can be found in the appendix.
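A taskmanager-query-state-service.yaml could look like this. It is a sketch: port 6125 is assumed to match queryable-state.proxy.ports in the configuration, nodePort 30025 is an assumed value from the default NodePort range, and the selector labels are assumed to match the TaskManager pods.

```yaml
# taskmanager-query-state-service.yaml (sketch): expose the queryable state
# proxy of the TaskManagers on each node.
apiVersion: v1
kind: Service
metadata:
  name: flink-taskmanager-query-state
spec:
  type: NodePort
  ports:
  - name: query-state
    port: 6125             # assumed queryable-state.proxy.ports
    targetPort: 6125
    nodePort: 30025        # assumed; omit it to let Kubernetes choose a port
  selector:                # assumed TaskManager pod labels
    app: flink
    component: taskmanager
```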
jobmanager-service.yaml: Optional service, which is only necessary for non-HA mode.
jobmanager-rest-service.yaml: Optional service that exposes the JobManager REST port as a public port on the Kubernetes nodes.
taskmanager-query-state-service.yaml: Optional service that exposes the TaskManager port for accessing the queryable state as a public port on the Kubernetes nodes.