This page describes how to deploy Flink Job and Session clusters on Kubernetes.

Info: This page describes deploying a standalone Flink cluster on top of Kubernetes. You can find more information on native Kubernetes deployments here.
Please follow Kubernetes' setup guide in order to deploy a Kubernetes cluster. If you want to run Kubernetes locally, we recommend using MiniKube.

Note: If using MiniKube, please make sure to execute

minikube ssh 'sudo ip link set docker0 promisc on'

before deploying a Flink cluster. Otherwise Flink components are not able to self-reference themselves through a Kubernetes service.
Before deploying the Flink Kubernetes components, please read the Flink Docker image documentation, its tags, and how to customize the Flink Docker image and enable plugins, in order to use the image in the Kubernetes definition files.
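For example, here is a minimal sketch of a customized image that enables one of the bundled plugins by copying it from the opt/ directory to plugins/ (the flink-s3-fs-hadoop plugin and the 1.11.6 jar name are assumptions; pick the plugin and version you actually need):

FROM flink:1.11.6-scala_2.11
# Plugins are enabled by placing their jar in a dedicated folder under plugins/
RUN mkdir -p /opt/flink/plugins/flink-s3-fs-hadoop && \
    cp /opt/flink/opt/flink-s3-fs-hadoop-1.11.6.jar /opt/flink/plugins/flink-s3-fs-hadoop/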
Using the common resource definitions, launch the common cluster components with the kubectl command:
kubectl create -f flink-configuration-configmap.yaml
kubectl create -f jobmanager-service.yaml
Note that you can define your own customized options of flink-conf.yaml within flink-configuration-configmap.yaml.
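As an illustration, you could extend the flink-conf.yaml block of the ConfigMap with additional options; the state backend settings below are purely illustrative additions, not part of the shipped definitions:

data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 2
    # illustrative additions -- any flink-conf.yaml option can go here
    state.backend: filesystem
    state.checkpoints.dir: file:///checkpoints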
Then launch the specific components depending on whether you want to deploy a Session or Job cluster.
You can then access the Flink UI in several ways:

kubectl proxy:
1. Run kubectl proxy in a terminal.
2. Navigate to http://localhost:8001/api/v1/namespaces/default/services/flink-jobmanager:webui/proxy in your browser.

kubectl port-forward:
1. Run kubectl port-forward ${flink-jobmanager-pod} 8081:8081 to forward your jobmanager's web ui port to local 8081.
2. Navigate to http://localhost:8081 in your browser.
3. Moreover, you can use the following command to submit jobs to the cluster:
./bin/flink run -m localhost:8081 ./examples/streaming/WordCount.jar

NodePort service on the rest service of the jobmanager:
1. Run kubectl create -f jobmanager-rest-service.yaml to create the NodePort service on the jobmanager. The example of jobmanager-rest-service.yaml can be found in the appendix.
2. Run kubectl get svc flink-jobmanager-rest to know the node-port of this service and navigate to http://<public-node-ip>:<node-port> in your browser.
3. If you use minikube, you can get its public ip by running minikube ip.
4. Similarly to the port-forward solution, you can use the following command to submit jobs to the cluster:
./bin/flink run -m <public-node-ip>:<node-port> ./examples/streaming/WordCount.jar
You can also access the queryable state of the TaskManager if you create a NodePort service for it:
1. Run kubectl create -f taskmanager-query-state-service.yaml to create the NodePort service on the taskmanager. The example of taskmanager-query-state-service.yaml can be found in the appendix.
2. Run kubectl get svc flink-taskmanager-query-state to know the node-port of this service. Then you can create the QueryableStateClient(<public-node-ip>, <node-port>) to submit the state queries.
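As an illustration, here is a minimal sketch of such a state query in Java, assuming the job registered a queryable ValueState<Long> under the name my-queryable-state and keys its stream by String (all names and the key below are hypothetical; 30025 is the nodePort from taskmanager-query-state-service.yaml):

import java.util.concurrent.CompletableFuture;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.queryablestate.client.QueryableStateClient;

public class QueryStateExample {
    public static void main(String[] args) throws Exception {
        // args[0]: <public-node-ip>, args[1]: the JobID of the running job
        QueryableStateClient client = new QueryableStateClient(args[0], 30025);

        // Must match the descriptor used when the state was made queryable in the job
        ValueStateDescriptor<Long> descriptor =
                new ValueStateDescriptor<>("my-state", Types.LONG);

        CompletableFuture<ValueState<Long>> future = client.getKvState(
                JobID.fromHexString(args[1]),
                "my-queryable-state", // hypothetical name registered in the job
                "some-key",           // hypothetical key to look up
                Types.STRING,
                descriptor);

        System.out.println("value = " + future.get().value());
        client.shutdownAndWait();
    }
}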
In order to terminate the Flink cluster, delete the specific Session or Job cluster components and use kubectl to terminate the common components:
kubectl delete -f jobmanager-service.yaml
kubectl delete -f flink-configuration-configmap.yaml
# if created then also the rest service
kubectl delete -f jobmanager-rest-service.yaml
# if created then also the queryable state service
kubectl delete -f taskmanager-query-state-service.yaml
A Flink Session cluster is executed as a long-running Kubernetes Deployment. Note that you can run multiple Flink jobs on a Session cluster. Each job needs to be submitted to the cluster after the cluster has been deployed.
A Flink Session cluster deployment in Kubernetes has at least three components:

- a Deployment which runs a JobManager
- a Deployment for a pool of TaskManagers
- a Service exposing the JobManager's REST and UI ports
After creating the common cluster components, use the Session specific resource definitions to launch the Session cluster with the kubectl command:
kubectl create -f jobmanager-session-deployment.yaml
kubectl create -f taskmanager-session-deployment.yaml
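A quick way to verify that the Session cluster has come up is to list its pods; both deployments carry the app: flink label used by the definitions in the appendix:

kubectl get pods -l app=flink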
To terminate the Session cluster, these components can be deleted along with the common ones with the kubectl command:
kubectl delete -f taskmanager-session-deployment.yaml
kubectl delete -f jobmanager-session-deployment.yaml
A Flink Job cluster is a dedicated cluster which runs a single job. You can find more details here.
A basic Flink Job cluster deployment in Kubernetes has three components:

- a Job which runs a JobManager
- a Deployment for a pool of TaskManagers
- a Service exposing the JobManager's REST and UI ports
Check the Job cluster specific resource definitions and adjust them accordingly.
The args attribute in the jobmanager-job.yaml has to specify the main class of the user job. See also how to specify the JobManager arguments to understand how to pass other args to the Flink image in the jobmanager-job.yaml.
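For example, if the job's entry point were the packaged WordCount streaming example (the class name and output path here are purely illustrative), the attribute could look like this:

args: ["standalone-job", "--job-classname", "org.apache.flink.streaming.examples.wordcount.WordCount", "--output", "/tmp/wordcount-result"]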
The job artifacts should be available from the job-artifacts-volume
in the resource definition examples.
The definition examples mount the volume as a local directory of the host assuming that you create the components in a minikube cluster.
If you do not use a minikube cluster, you can use any other type of volume, available in your Kubernetes cluster, to supply the job artifacts.
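For instance, here is a sketch that swaps the hostPath volume for a PersistentVolumeClaim (the claim name flink-job-artifacts is hypothetical and would have to exist in your cluster):

volumes:
  - name: job-artifacts-volume
    persistentVolumeClaim:
      claimName: flink-job-artifacts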
Alternatively, you can build a custom image which already contains the artifacts instead.
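A minimal sketch of such a custom image, assuming your job is packaged as job.jar next to the Dockerfile (the file name is illustrative); with this approach the job-artifacts-volume in the resource definitions is no longer needed:

FROM flink:1.11.6-scala_2.11
# Bake the job artifacts into the image instead of mounting them
COPY ./job.jar /opt/flink/usrlib/job.jar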
After creating the common cluster components, use the Job cluster specific resource definitions to launch the cluster with the kubectl command:
kubectl create -f jobmanager-job.yaml
kubectl create -f taskmanager-job-deployment.yaml
To terminate the single job cluster, these components can be deleted along with the common ones with the kubectl command:
kubectl delete -f taskmanager-job-deployment.yaml
kubectl delete -f jobmanager-job.yaml
flink-configuration-configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: flink-config
labels:
app: flink
data:
flink-conf.yaml: |+
jobmanager.rpc.address: flink-jobmanager
taskmanager.numberOfTaskSlots: 2
blob.server.port: 6124
jobmanager.rpc.port: 6123
taskmanager.rpc.port: 6122
queryable-state.proxy.ports: 6125
jobmanager.memory.process.size: 1600m
taskmanager.memory.process.size: 1728m
parallelism.default: 2
log4j-console.properties: |+
# This affects logging for both user code and Flink
rootLogger.level = INFO
rootLogger.appenderRef.console.ref = ConsoleAppender
rootLogger.appenderRef.rolling.ref = RollingFileAppender
# Uncomment this if you want to _only_ change Flink's logging
#logger.flink.name = org.apache.flink
#logger.flink.level = INFO
# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
logger.akka.name = akka
logger.akka.level = INFO
logger.kafka.name= org.apache.kafka
logger.kafka.level = INFO
logger.hadoop.name = org.apache.hadoop
logger.hadoop.level = INFO
logger.zookeeper.name = org.apache.zookeeper
logger.zookeeper.level = INFO
# Log all infos to the console
appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
# Log all infos in the given rolling file
appender.rolling.name = RollingFileAppender
appender.rolling.type = RollingFile
appender.rolling.append = false
appender.rolling.fileName = ${sys:log.file}
appender.rolling.filePattern = ${sys:log.file}.%i
appender.rolling.layout.type = PatternLayout
appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
appender.rolling.policies.type = Policies
appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
appender.rolling.policies.size.size=100MB
appender.rolling.strategy.type = DefaultRolloverStrategy
appender.rolling.strategy.max = 10
# Suppress the irrelevant (wrong) warnings from the Netty channel handler
logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
logger.netty.level = OFF
jobmanager-service.yaml
apiVersion: v1
kind: Service
metadata:
name: flink-jobmanager
spec:
type: ClusterIP
ports:
- name: rpc
port: 6123
- name: blob-server
port: 6124
- name: webui
port: 8081
selector:
app: flink
component: jobmanager
jobmanager-rest-service.yaml
Optional service that exposes the jobmanager REST port as a public Kubernetes node's port.
apiVersion: v1
kind: Service
metadata:
name: flink-jobmanager-rest
spec:
type: NodePort
ports:
- name: rest
port: 8081
targetPort: 8081
nodePort: 30081
selector:
app: flink
component: jobmanager
taskmanager-query-state-service.yaml
Optional service that exposes the TaskManager port used to access the queryable state as a public Kubernetes node's port.
apiVersion: v1
kind: Service
metadata:
name: flink-taskmanager-query-state
spec:
type: NodePort
ports:
- name: query-state
port: 6125
targetPort: 6125
nodePort: 30025
selector:
app: flink
component: taskmanager
jobmanager-session-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: flink-jobmanager
spec:
replicas: 1
selector:
matchLabels:
app: flink
component: jobmanager
template:
metadata:
labels:
app: flink
component: jobmanager
spec:
containers:
- name: jobmanager
image: flink:1.11.6-scala_2.11
args: ["jobmanager"]
ports:
- containerPort: 6123
name: rpc
- containerPort: 6124
name: blob-server
- containerPort: 8081
name: webui
livenessProbe:
tcpSocket:
port: 6123
initialDelaySeconds: 30
periodSeconds: 60
volumeMounts:
- name: flink-config-volume
mountPath: /opt/flink/conf
securityContext:
runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary
volumes:
- name: flink-config-volume
configMap:
name: flink-config
items:
- key: flink-conf.yaml
path: flink-conf.yaml
- key: log4j-console.properties
path: log4j-console.properties
taskmanager-session-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: flink-taskmanager
spec:
replicas: 2
selector:
matchLabels:
app: flink
component: taskmanager
template:
metadata:
labels:
app: flink
component: taskmanager
spec:
containers:
- name: taskmanager
image: flink:1.11.6-scala_2.11
args: ["taskmanager"]
ports:
- containerPort: 6122
name: rpc
- containerPort: 6125
name: query-state
livenessProbe:
tcpSocket:
port: 6122
initialDelaySeconds: 30
periodSeconds: 60
volumeMounts:
- name: flink-config-volume
mountPath: /opt/flink/conf/
securityContext:
runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary
volumes:
- name: flink-config-volume
configMap:
name: flink-config
items:
- key: flink-conf.yaml
path: flink-conf.yaml
- key: log4j-console.properties
path: log4j-console.properties
jobmanager-job.yaml
apiVersion: batch/v1
kind: Job
metadata:
name: flink-jobmanager
spec:
template:
metadata:
labels:
app: flink
component: jobmanager
spec:
restartPolicy: OnFailure
containers:
- name: jobmanager
image: flink:1.11.6-scala_2.11
args: ["standalone-job", "--job-classname", "com.job.ClassName", <optional arguments>, <job arguments>] # optional arguments: ["--job-id", "<job id>", "--fromSavepoint", "/path/to/savepoint", "--allowNonRestoredState"]
ports:
- containerPort: 6123
name: rpc
- containerPort: 6124
name: blob-server
- containerPort: 8081
name: webui
livenessProbe:
tcpSocket:
port: 6123
initialDelaySeconds: 30
periodSeconds: 60
volumeMounts:
- name: flink-config-volume
mountPath: /opt/flink/conf
- name: job-artifacts-volume
mountPath: /opt/flink/usrlib
securityContext:
runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary
volumes:
- name: flink-config-volume
configMap:
name: flink-config
items:
- key: flink-conf.yaml
path: flink-conf.yaml
- key: log4j-console.properties
path: log4j-console.properties
- name: job-artifacts-volume
hostPath:
path: /host/path/to/job/artifacts
taskmanager-job-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: flink-taskmanager
spec:
replicas: 2
selector:
matchLabels:
app: flink
component: taskmanager
template:
metadata:
labels:
app: flink
component: taskmanager
spec:
containers:
- name: taskmanager
image: flink:1.11.6-scala_2.11
args: ["taskmanager"]
ports:
- containerPort: 6122
name: rpc
- containerPort: 6125
name: query-state
livenessProbe:
tcpSocket:
port: 6122
initialDelaySeconds: 30
periodSeconds: 60
volumeMounts:
- name: flink-config-volume
mountPath: /opt/flink/conf/
- name: job-artifacts-volume
mountPath: /opt/flink/usrlib
securityContext:
runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary
volumes:
- name: flink-config-volume
configMap:
name: flink-config
items:
- key: flink-conf.yaml
path: flink-conf.yaml
- key: log4j-console.properties
path: log4j-console.properties
- name: job-artifacts-volume
hostPath:
path: /host/path/to/job/artifacts