Kubernetes

Introduction #

Kubernetes is a popular container-orchestration system for automating computer application deployment, scaling, and management. Flink’s native Kubernetes integration allows you to directly deploy Flink on a running Kubernetes cluster. Moreover, Flink is able to dynamically allocate and de-allocate TaskManagers depending on the required resources because it can directly talk to Kubernetes.

Apache Flink also provides a Kubernetes operator for managing Flink clusters on Kubernetes. It supports both standalone and native deployment mode and greatly simplifies deployment, configuration and the life cycle management of Flink resources on Kubernetes.

For more information, please refer to the Flink Kubernetes Operator documentation.

Preparation #

The doc assumes a running Kubernetes cluster fulfilling the following requirements:

  • Kubernetes >= 1.9.
  • KubeConfig, which has access to list, create, delete pods and services, configurable via ~/.kube/config. You can verify permissions by running kubectl auth can-i <list|create|edit|delete> pods.
  • Enabled Kubernetes DNS.
  • default service account with RBAC permissions to create, delete pods.

If you have problems setting up a Kubernetes cluster, please take a look at how to setup a Kubernetes cluster.

Session Mode #

Flink runs on all UNIX-like environments, i.e. Linux, Mac OS X, and Cygwin (for Windows).
You can refer overview to check supported versions and download the binary release of Flink, then extract the archive:

tar -xzf flink-*.tgz

You should set FLINK_HOME environment variables like:

export FLINK_HOME=/path/flink-*

Start a session cluster #

To start a session cluster on k8s, run the bash script that comes with Flink:

cd /path/flink-*
./bin/kubernetes-session.sh -Dkubernetes.cluster-id=my-first-flink-cluster

After successful startup, the return information is as follows:

org.apache.flink.kubernetes.utils.KubernetesUtils            [] - Kubernetes deployment requires a fixed port. Configuration blob.server.port will be set to 6124
org.apache.flink.kubernetes.utils.KubernetesUtils            [] - Kubernetes deployment requires a fixed port. Configuration taskmanager.rpc.port will be set to 6122
org.apache.flink.kubernetes.KubernetesClusterDescriptor      [] - Please note that Flink client operations(e.g. cancel, list, stop, savepoint, etc.) won't work from outside the Kubernetes cluster since 'kubernetes.rest-service.exposed.type' has been set to ClusterIP.
org.apache.flink.kubernetes.KubernetesClusterDescriptor      [] - Create flink session cluster my-first-flink-cluster successfully, JobManager Web Interface: http://my-first-flink-cluster-rest.default:8081

please refer to Flink documentation to expose Flink’s Web UI and REST endpoint.
You should ensure that REST endpoint can be accessed by the node of your submission.
Then, you need to add these two config to your flink-conf.yaml:

rest.bind-port: {{REST_PORT}}
rest.address: {{NODE_IP}}

{{REST_PORT}} and {{NODE_IP}} should be replaced by the actual values of your JobManager Web Interface.

Download the tar file of Flink CDC from release page, then extract the archive:

tar -xzf flink-cdc-*.tar.gz

Extracted flink-cdc contains four directories: bin,lib,log and conf.

Download the connector jars from release page, and move it to the lib directory.
Download links are available only for stable releases, SNAPSHOT dependencies need to be built based on specific branch by yourself.

Here is an example file for synchronizing the entire database mysql-to-doris.yaml

################################################################################
# Description: Sync MySQL all tables to Doris
################################################################################
source:
 type: mysql
 hostname: localhost
 port: 3306
 username: root
 password: 123456
 tables: app_db.\.*
 server-id: 5400-5404
 server-time-zone: UTC

sink:
 type: doris
 fenodes: 127.0.0.1:8030
 username: root
 password: ""

pipeline:
 name: Sync MySQL Database to Doris
 parallelism: 2

You need to modify the configuration file according to your needs, refer to connectors more information.

Finally, submit job to Flink Standalone cluster using Cli.

cd /path/flink-cdc-*
./bin/flink-cdc.sh mysql-to-doris.yaml

After successful submission, the return information is as follows:

Pipeline has been submitted to cluster.
Job ID: ae30f4580f1918bebf16752d4963dc54
Job Description: Sync MySQL Database to Doris

Then you can find a job named Sync MySQL Database to Doris running through Flink Web UI.

Please note that submitting with native application mode and Flink Kubernetes operator are not supported for now.