Kubernetes
This documentation is for an unreleased version of Apache Flink CDC. We recommend you use the latest stable version.

简介 #

Kubernetes(K8s)是一种流行的容器编排系统,用于自动化部署、扩展和管理应用程序。Flink的原生Kubernetes集成允许您直接在正在运行的 Kubernetes 集群上部署 Flink。此外,Flink 能够根据所需资源动态分配和取消分配TaskManager,因为它可以直接与Kubernetes通信。

Apache Flink还提供了Kubernetes Operator,用于管理Kubernetes上的Flink集群。它支持独立部署和原生部署模式,极大简化了Flink在Kubernetes上的部署、配置和生命周期管理。

更多信息请参考:Flink Kubernetes Operator文档

准备 #

假设您正在运行的Kubernetes集群满足以下要求:

  • Kubernetes版本 >= 1.9。
  • KubeConfig,作为列出、创建、删除pods和services权限的入口,可通过~/.kube/config进行配置。 您可以通过运行命令:kubectl auth can-i <list|create|edit|delete> pods 来验证权限。
  • 已启用 Kubernetes DNS。
  • default用户具有创建、删除POD的 RBAC 权限。

如果您在配置Kubernetes集群时遇到问题,请参考:如何配置Kubernetes集群

Session模式 #

Flink可以在所有类UNIX环境上运行,即Linux、Mac OS X和Cygwin(适用于 Windows)。
您可以参考 overview页面,查看支持的Flink版本并下载发行包,然后解压:

tar -xzf flink-*.tgz

设置FLINK_HOME环境变量:

export FLINK_HOME=/path/flink-*

启动Session集群 #

要在k8s上启动Session集群,请运行 Flink 附带的 bash 脚本:

cd /path/flink-*
./bin/kubernetes-session.sh -Dkubernetes.cluster-id=my-first-flink-cluster

成功启动集群后,返回如下信息:

org.apache.flink.kubernetes.utils.KubernetesUtils            [] - Kubernetes deployment requires a fixed port. Configuration blob.server.port will be set to 6124
org.apache.flink.kubernetes.utils.KubernetesUtils            [] - Kubernetes deployment requires a fixed port. Configuration taskmanager.rpc.port will be set to 6122
org.apache.flink.kubernetes.KubernetesClusterDescriptor      [] - Please note that Flink client operations(e.g. cancel, list, stop, savepoint, etc.) won't work from outside the Kubernetes cluster since 'kubernetes.rest-service.exposed.type' has been set to ClusterIP.
org.apache.flink.kubernetes.KubernetesClusterDescriptor      [] - Create flink session cluster my-first-flink-cluster successfully, JobManager Web Interface: http://my-first-flink-cluster-rest.default:8081

请参考Flink文档来暴露Flink Web UI和REST端口。
请确保您提交作业的节点可以访问REST端口。

然后,将以下两个配置添加到flink-conf.yaml中:

rest.bind-port: {{REST_PORT}}
rest.address: {{NODE_IP}}

{{REST_PORT}}和{{NODE_IP}}替换为JobManager Web界面的实际值。

发行页面下载Flink CDC的tar文件并解压:

tar -xzf flink-cdc-*.tar.gz

解压后的flink-cdc包含四个目录: binliblogconf

发行页面下载连接器,并移动到lib路径下。
下载链接仅适用于稳定版本,SNAPSHOT依赖项需要您根据特定分支自行构建。

以下是mysql整库同步到doris的示例配置文件:mysql-to-doris.yaml

################################################################################
# Description: Sync MySQL all tables to Doris
################################################################################
source:
 type: mysql
 hostname: localhost
 port: 3306
 username: root
 password: 123456
 tables: app_db.\.*
 server-id: 5400-5404
 server-time-zone: UTC

sink:
 type: doris
 fenodes: 127.0.0.1:8030
 username: root
 password: ""

pipeline:
 name: Sync MySQL Database to Doris
 parallelism: 2

请参考连接器信息,按需修改配置文件。

最后,通过Cli将作业提交到Flink Standalone集群。

cd /path/flink-cdc-*
./bin/flink-cdc.sh mysql-to-doris.yaml

成功提交作业后,返回如下信息:

Pipeline has been submitted to cluster.
Job ID: ae30f4580f1918bebf16752d4963dc54
Job Description: Sync MySQL Database to Doris

通过Flink Web UI,您可以找到一个名为Sync MySQL Database to Doris的作业正在运行。

Kubernetes Operator模式 #

假设您已经在K8S集群上部署Flink Kubernetes Operator,您只需构建自定义的Flink CDC Docker镜像即可。

构建自定义Docker镜像 #

  1. 发行页面下载Flink CDC的tar文件和需要的连接器,并移动到Docker镜像构建目录。
    假设您的Docker构建目录为/opt/docker/flink-cdc,此时该目录下的文件结构如下:
    /opt/docker/flink-cdc
        ├── flink-cdc-3.3-SNAPSHOT-bin.tar.gz
        ├── flink-cdc-pipeline-connector-doris-3.3-SNAPSHOT.jar
        ├── flink-cdc-pipeline-connector-mysql-3.3-SNAPSHOT.jar
        ├── mysql-connector-java-8.0.27.jar
        └── ...
    
  2. 创建Dockerfile文件,从官方flink镜像构建出自定义镜像并添加Flink CDC的依赖。
    FROM flink:1.18.0-java8
    ADD *.jar $FLINK_HOME/lib/
    ADD flink-cdc*.tar.gz $FLINK_HOME/
    RUN mv $FLINK_HOME/flink-cdc-3.3-SNAPSHOT/lib/flink-cdc-dist-3.3-SNAPSHOT.jar $FLINK_HOME/lib/
    
    Docker镜像构建目录最终如下:
    /opt/docker/flink-cdc
        ├── Dockerfile
        ├── flink-cdc-3.3-SNAPSHOT-bin.tar.gz
        ├── flink-cdc-pipeline-connector-doris-3.3-SNAPSHOT.jar
        ├── flink-cdc-pipeline-connector-mysql-3.3-SNAPSHOT.jar
        ├── mysql-connector-java-8.0.27.jar
        └── ...
    
  3. 构建自定义镜像并推送至仓库
    docker build -t flink-cdc-pipeline:3.3-SNAPSHOT .
    
    docker push flink-cdc-pipeline:3.3-SNAPSHOT
    

以下是一个示例文件,请修改其中对应的连接参数为实际值:

---
apiVersion: v1
data:
  flink-cdc.yaml: |-
      parallelism: 4
      schema.change.behavior: EVOLVE      
  mysql-to-doris.yaml: |-
    source:
      type: mysql
      hostname: localhost
      port: 3306
      username: root
      password: 123456
      tables: app_db.\.*
      server-id: 5400-5404
      server-time-zone: UTC
    
    sink:
      type: doris
      fenodes: 127.0.0.1:8030
      username: root
      password: ""
    
    pipeline:
      name: Sync MySQL Database to Doris
      parallelism: 2    
kind: ConfigMap
metadata:
  name: flink-cdc-pipeline-configmap

创建FlinkDeployment YAML文件 #

以下是示例文件flink-cdc-pipeline-job.yaml

---
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: flink-cdc-pipeline-job
spec:
  flinkConfiguration:
    classloader.resolve-order: parent-first
    state.checkpoints.dir: 'file:///tmp/checkpoints'
    state.savepoints.dir: 'file:///tmp/savepoints'
  flinkVersion: v1_18
  image: 'flink-cdc-pipeline:3.3-SNAPSHOT'
  imagePullPolicy: Always
  job:
    args:
      - '--use-mini-cluster'
      - /opt/flink/flink-cdc-3.3-SNAPSHOT/conf/mysql-to-doris.yaml
    entryClass: org.apache.flink.cdc.cli.CliFrontend
    jarURI: 'local:///opt/flink/flink-cdc-3.3-SNAPSHOT/lib/flink-cdc-dist-3.3-SNAPSHOT.jar'
    parallelism: 1
    state: running
    upgradeMode: savepoint
  jobManager:
    replicas: 1
    resource:
      cpu: 1
      memory: 1024m
  podTemplate:
    apiVersion: v1
    kind: Pod
    spec:
      containers:
        # don't modify this name
        - name: flink-main-container
          volumeMounts:
            - mountPath: /opt/flink/flink-cdc-3.3-SNAPSHOT/conf
              name: flink-cdc-pipeline-config
      volumes:
        - configMap:
            name: flink-cdc-pipeline-configmap
          name: flink-cdc-pipeline-config
  restartNonce: 0
  serviceAccount: flink
  taskManager:
    resource:
      cpu: 1
      memory: 1024m
  1. 由于Flink的类加载机制,参数classloader.resolve-order必须设置为parent-first
  2. Flink CDC默认提交作业到远程Flink集群,在Operator模式下,您需要通过指定--use-mini-cluster参数在pod内部启动一个Standalone Flink集群。

ConfigMap和FlinkDeployment YAML文件创建完成后,即可通过kubectl提交作业到Operator:

kubectl apply -f flink-cdc-pipeline-job.yaml

成功提交作业后,返回信息如下:

flinkdeployment.flink.apache.org/flink-cdc-pipeline-job created

如您需要查看日志、暴露Flink Web UI等,请参考:Flink Kubernetes Operator文档

请注意,目前不支持使用native application mode提交作业。