This documentation is for an unreleased version of Apache Flink Stateful Functions. We recommend you use the latest stable version.
Deployment and Operations #
The Stateful Functions runtime, which manages state and messaging for an application, is built on top of Apache Flink and therefore inherits Flink's deployment and operations model. Read the official Apache Flink documentation to learn how to run and maintain an application in production. The following pages outline the concepts, configurations, and metrics specific to Stateful Functions.
Images #
The recommended way to deploy the Stateful Functions runtime is to use the official Docker image. This way, user code does not need to package any Apache Flink components, and teams can quickly package their applications with all the necessary runtime dependencies.
The community provides images containing the entire Stateful Functions runtime: apache/flink-statefun:3.4-SNAPSHOT.
All you need to do is attach your module configurations to the container.
The Flink community does not publish images for snapshot versions.
You can build this version locally by cloning the repo and following the instructions in tools/docker/README.md.
Deployment #
The packaged image can then be deployed as a standalone Flink cluster on Kubernetes.
After creating the cluster components described below, the application can be launched using the kubectl command.
# Configuration and service definition
$ kubectl create -f application-module.yaml
$ kubectl create -f flink-config.yaml
$ kubectl create -f jobmanager-service.yaml
# Deploy the StateFun runtime
$ kubectl create -f jobmanager-job.yaml
$ kubectl create -f taskmanager-job-deployment.yaml
To terminate the cluster, delete the resources in reverse order.
$ kubectl delete -f taskmanager-job-deployment.yaml
$ kubectl delete -f jobmanager-job.yaml
$ kubectl delete -f jobmanager-service.yaml
$ kubectl delete -f flink-config.yaml
$ kubectl delete -f application-module.yaml
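The create and delete sequences above mirror each other, so they can be driven from a single list. The following is a minimal sketch, not part of the official tooling: the function names statefun_up and statefun_down and the KUBECTL override are hypothetical, and the file names are the ones used in the commands above. Setting KUBECTL=echo prints the commands instead of running them.

```shell
#!/bin/sh
# Manifests in the order they are created; teardown walks the list backwards.
# KUBECTL can be overridden (for example KUBECTL=echo) to preview the commands.
KUBECTL="${KUBECTL:-kubectl}"
MANIFESTS="application-module.yaml flink-config.yaml jobmanager-service.yaml jobmanager-job.yaml taskmanager-job-deployment.yaml"

# Create configuration and service first, then the runtime deployments.
statefun_up() {
  for f in $MANIFESTS; do
    $KUBECTL create -f "$f" || return 1
  done
}

# Delete everything in the reverse of the creation order.
statefun_down() {
  reversed=""
  for f in $MANIFESTS; do
    reversed="$f $reversed"
  done
  for f in $reversed; do
    $KUBECTL delete -f "$f" || return 1
  done
}
```

Calling statefun_up in the directory that holds the manifests launches the cluster, and statefun_down removes it again.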
Cluster Components #
application-module.yaml
Contains the application's function and I/O module configuration. See the full documentation for more details.
apiVersion: v1
kind: ConfigMap
metadata:
  name: application-module
  namespace: statefun
  labels:
    app: statefun
data:
  module.yaml: |+
    kind: ...
    spec:
      ...
    ---
    kind: ...
    spec:
      ...
    ---
    ...
flink-config.yaml
flink-config.yaml contains all Flink runtime configurations for the cluster, including both Apache Flink configurations and StateFun-specific configurations.
In the Apache Flink documentation, you will often see this referred to as the flink-conf.yaml file.
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: statefun
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: statefun-master
    taskmanager.numberOfTaskSlots: 1
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    classloader.parent-first-patterns.additional: org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
    state.checkpoints.dir: s3://my-checkpoint-bucket
    state.backend: rocksdb
    state.backend.rocksdb.timer-service.factory: ROCKSDB
    state.backend.incremental: true
    execution.checkpointing.interval: 10sec
    execution.checkpointing.mode: EXACTLY_ONCE
    restart-strategy: fixed-delay
    restart-strategy.fixed-delay.attempts: 2147483647
    restart-strategy.fixed-delay.delay: 1sec
    jobmanager.memory.process.size: 1g
    taskmanager.memory.process.size: 1g
    parallelism.default: 3
  log4j-console.properties: |+
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    rootLogger.appenderRef.rolling.ref = RollingFileAppender
    logger.akka.name = akka
    logger.akka.level = INFO
    logger.kafka.name = org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.rolling.name = RollingFileAppender
    appender.rolling.type = RollingFile
    appender.rolling.append = false
    appender.rolling.fileName = ${sys:log.file}
    appender.rolling.filePattern = ${sys:log.file}.%i
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size = 100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 10
    logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF
jobmanager-service.yaml
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: ClusterIP
  ports:
    - name: rpc
      port: 6123
    - name: blob-server
      port: 6124
    - name: webui
      port: 8081
  selector:
    app: statefun
    component: jobmanager
jobmanager-rest-service.yaml
An optional service that exposes the JobManager REST port as a public Kubernetes NodePort.
apiVersion: v1
kind: Service
metadata:
  name: statefun-jobmanager-rest
spec:
  type: NodePort
  ports:
    - name: rest
      port: 8081
      targetPort: 8081
      nodePort: 30081
  selector:
    app: statefun
    component: jobmanager
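With this service in place, the Flink REST API should be reachable on port 30081 of any cluster node. The sketch below assumes the node address is known and routable from where the command runs; the function names jobs_overview_url and list_jobs are hypothetical helpers, while /jobs/overview is Flink's REST endpoint for listing jobs and their states.

```shell
#!/bin/sh
# Build the URL of Flink's job overview endpoint behind the NodePort service.
# $1: address of any cluster node (placeholder, supplied by the caller)
# $2: node port, defaulting to the nodePort declared in the service above
jobs_overview_url() {
  node_host="$1"
  node_port="${2:-30081}"
  printf 'http://%s:%s/jobs/overview\n' "$node_host" "$node_port"
}

# Fetch the overview as JSON; fails with a non-zero exit code on HTTP errors.
list_jobs() {
  curl --silent --fail "$(jobs_overview_url "$@")"
}
```

For example, list_jobs 10.0.0.5 would query a node at the illustrative address 10.0.0.5 and print the JSON job overview returned by the JobManager.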
jobmanager-application.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: statefun-jobmanager
spec:
  replicas: 1
  selector:
    matchLabels:
      app: statefun
      component: jobmanager
  template:
    metadata:
      labels:
        app: statefun
        # Must match spec.selector.matchLabels and the service selectors.
        component: jobmanager
    spec:
      containers:
        - name: master
          image: apache/flink-statefun:3.4-SNAPSHOT
          imagePullPolicy: Always
          env:
            - name: ROLE
              value: master
            - name: MASTER_HOST
              value: statefun-jobmanager
          resources:
            requests:
              memory: "1.5Gi"
          ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 6124
              name: blob
            - containerPort: 8081
              name: ui
          livenessProbe:
            tcpSocket:
              port: 6123
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf
            - name: application-module
              mountPath: /opt/statefun/modules/application-module
      volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
        - name: application-module
          configMap:
            name: application-module
            items:
              - key: module.yaml
                path: module.yaml
taskmanager-job-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: statefun-worker
  namespace: statefun
spec:
  replicas: 3
  selector:
    matchLabels:
      app: statefun
      component: worker
  template:
    metadata:
      labels:
        app: statefun
        component: worker
    spec:
      containers:
        - name: worker
          image: apache/flink-statefun:3.4-SNAPSHOT
          imagePullPolicy: Always
          env:
            - name: ROLE
              value: worker
            - name: MASTER_HOST
              value: statefun-jobmanager
          resources:
            requests:
              memory: "1.5Gi"
          ports:
            - containerPort: 6122
              name: rpc
            - containerPort: 6124
              name: blob
            - containerPort: 8081
              name: ui
          livenessProbe:
            tcpSocket:
              port: 6122
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf
            # The mount name must refer to a volume declared under "volumes" below.
            - name: application-module
              mountPath: /opt/statefun/modules/application-module
      volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
        - name: application-module
          configMap:
            name: application-module
            items:
              - key: module.yaml
                path: module.yaml
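The worker replica count interacts with two settings in flink-config.yaml: each worker offers taskmanager.numberOfTaskSlots slots, and the job needs at least parallelism.default slots in total to be scheduled. A minimal sketch of that arithmetic follows, assuming default slot sharing; check_slots is a hypothetical helper, and the values passed to it are the ones from the manifests on this page.

```shell
#!/bin/sh
# check_slots REPLICAS SLOTS_PER_TASKMANAGER PARALLELISM
# Compares the slots offered by the workers with the slots the job requires.
check_slots() {
  replicas="$1"
  slots_per_taskmanager="$2"
  parallelism="$3"
  total_slots=$((replicas * slots_per_taskmanager))
  if [ "$parallelism" -le "$total_slots" ]; then
    echo "ok: parallelism $parallelism fits in $total_slots slots"
  else
    echo "short: parallelism $parallelism needs $((parallelism - total_slots)) more slots"
    return 1
  fi
}

# 3 worker replicas, 1 slot per TaskManager, parallelism.default of 3.
check_slots 3 1 3
```

With the values shown here the cluster offers exactly as many slots as the job requires, so raising parallelism.default without also raising replicas or taskmanager.numberOfTaskSlots would leave the job short of slots.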