This documentation is for an unreleased version of Apache Flink CDC. We recommend you use the latest stable version.
简介 #
Kubernetes(K8s)是一种流行的容器编排系统,用于自动化部署、扩展和管理应用程序。Flink的原生Kubernetes集成允许您直接在正在运行的 Kubernetes 集群上部署 Flink。此外,Flink 能够根据所需资源动态分配和取消分配TaskManager,因为它可以直接与Kubernetes通信。
Apache Flink还提供了Kubernetes Operator,用于管理Kubernetes上的Flink集群。它支持独立部署和原生部署模式,极大简化了Flink在Kubernetes上的部署、配置和生命周期管理。
更多信息请参考:Flink Kubernetes Operator文档。
准备 #
假设您正在运行的Kubernetes集群满足以下要求:
- Kubernetes版本 >= 1.9。
- KubeConfig,作为列出、创建、删除pods和services权限的入口,可通过
~/.kube/config
进行配置。 您可以通过运行命令:kubectl auth can-i <list|create|edit|delete> pods
来验证权限。 - 已启用 Kubernetes DNS。
default
用户具有创建、删除POD的 RBAC 权限。
如果您在配置Kubernetes集群时遇到问题,请参考:如何配置Kubernetes集群。
Session模式 #
Flink可以在所有类UNIX环境上运行,即Linux、Mac OS X和Cygwin(适用于 Windows)。
您可以参考 overview页面,查看支持的Flink版本并下载发行包,然后解压:
tar -xzf flink-*.tgz
设置FLINK_HOME
环境变量:
export FLINK_HOME=/path/flink-*
启动Session集群 #
要在k8s上启动Session集群,请运行 Flink 附带的 bash 脚本:
cd /path/flink-*
./bin/kubernetes-session.sh -Dkubernetes.cluster-id=my-first-flink-cluster
成功启动集群后,返回如下信息:
org.apache.flink.kubernetes.utils.KubernetesUtils [] - Kubernetes deployment requires a fixed port. Configuration blob.server.port will be set to 6124
org.apache.flink.kubernetes.utils.KubernetesUtils [] - Kubernetes deployment requires a fixed port. Configuration taskmanager.rpc.port will be set to 6122
org.apache.flink.kubernetes.KubernetesClusterDescriptor [] - Please note that Flink client operations(e.g. cancel, list, stop, savepoint, etc.) won't work from outside the Kubernetes cluster since 'kubernetes.rest-service.exposed.type' has been set to ClusterIP.
org.apache.flink.kubernetes.KubernetesClusterDescriptor [] - Create flink session cluster my-first-flink-cluster successfully, JobManager Web Interface: http://my-first-flink-cluster-rest.default:8081
请参考Flink文档来暴露Flink Web UI和REST端口。
请确保您提交作业的节点可以访问REST端口。
然后,将以下两个配置添加到flink-conf.yaml中:
rest.bind-port: {{REST_PORT}}
rest.address: {{NODE_IP}}
{{REST_PORT}}和{{NODE_IP}}替换为JobManager Web界面的实际值。
配置Flink CDC #
从发行页面下载Flink CDC的tar文件并解压:
tar -xzf flink-cdc-*.tar.gz
解压后的flink-cdc
包含四个目录: bin
,lib
,log
和conf
。
从发行页面下载连接器,并移动到lib
路径下。
下载链接仅适用于稳定版本,SNAPSHOT依赖项需要您根据特定分支自行构建。
提交Flink CDC作业 #
以下是mysql整库同步到doris的示例配置文件:mysql-to-doris.yaml
################################################################################
# Description: Sync MySQL all tables to Doris
################################################################################
source:
type: mysql
hostname: localhost
port: 3306
username: root
password: 123456
tables: app_db.\.*
server-id: 5400-5404
server-time-zone: UTC
sink:
type: doris
fenodes: 127.0.0.1:8030
username: root
password: ""
pipeline:
name: Sync MySQL Database to Doris
parallelism: 2
请参考连接器信息,按需修改配置文件。
最后,通过Cli将作业提交到Flink Standalone集群。
cd /path/flink-cdc-*
./bin/flink-cdc.sh mysql-to-doris.yaml
成功提交作业后,返回如下信息:
Pipeline has been submitted to cluster.
Job ID: ae30f4580f1918bebf16752d4963dc54
Job Description: Sync MySQL Database to Doris
通过Flink Web UI,您可以找到一个名为Sync MySQL Database to Doris
的作业正在运行。
Kubernetes Operator模式 #
假设您已经在K8S集群上部署Flink Kubernetes Operator,您只需构建自定义的Flink CDC Docker镜像即可。
构建自定义Docker镜像 #
- 从发行页面下载Flink CDC的tar文件和需要的连接器,并移动到Docker镜像构建目录。
假设您的Docker构建目录为/opt/docker/flink-cdc
,此时该目录下的文件结构如下:/opt/docker/flink-cdc ├── flink-cdc-3.3-SNAPSHOT-bin.tar.gz ├── flink-cdc-pipeline-connector-doris-3.3-SNAPSHOT.jar ├── flink-cdc-pipeline-connector-mysql-3.3-SNAPSHOT.jar ├── mysql-connector-java-8.0.27.jar └── ...
- 创建Dockerfile文件,从官方
flink
镜像构建出自定义镜像并添加Flink CDC的依赖。Docker镜像构建目录最终如下:FROM flink:1.18.0-java8 ADD *.jar $FLINK_HOME/lib/ ADD flink-cdc*.tar.gz $FLINK_HOME/ RUN mv $FLINK_HOME/flink-cdc-3.3-SNAPSHOT/lib/flink-cdc-dist-3.3-SNAPSHOT.jar $FLINK_HOME/lib/
/opt/docker/flink-cdc ├── Dockerfile ├── flink-cdc-3.3-SNAPSHOT-bin.tar.gz ├── flink-cdc-pipeline-connector-doris-3.3-SNAPSHOT.jar ├── flink-cdc-pipeline-connector-mysql-3.3-SNAPSHOT.jar ├── mysql-connector-java-8.0.27.jar └── ...
- 构建自定义镜像并推送至仓库
docker build -t flink-cdc-pipeline:3.3-SNAPSHOT . docker push flink-cdc-pipeline:3.3-SNAPSHOT
创建ConfigMap用于挂载Flink CDC配置文件 #
以下是一个示例文件,请修改其中对应的连接参数为实际值:
---
apiVersion: v1
data:
flink-cdc.yaml: |-
parallelism: 4
schema.change.behavior: EVOLVE
mysql-to-doris.yaml: |-
source:
type: mysql
hostname: localhost
port: 3306
username: root
password: 123456
tables: app_db.\.*
server-id: 5400-5404
server-time-zone: UTC
sink:
type: doris
fenodes: 127.0.0.1:8030
username: root
password: ""
pipeline:
name: Sync MySQL Database to Doris
parallelism: 2
kind: ConfigMap
metadata:
name: flink-cdc-pipeline-configmap
创建FlinkDeployment YAML文件 #
以下是示例文件flink-cdc-pipeline-job.yaml
:
---
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
name: flink-cdc-pipeline-job
spec:
flinkConfiguration:
classloader.resolve-order: parent-first
state.checkpoints.dir: 'file:///tmp/checkpoints'
state.savepoints.dir: 'file:///tmp/savepoints'
flinkVersion: v1_18
image: 'flink-cdc-pipeline:3.3-SNAPSHOT'
imagePullPolicy: Always
job:
args:
- '--use-mini-cluster'
- /opt/flink/flink-cdc-3.3-SNAPSHOT/conf/mysql-to-doris.yaml
entryClass: org.apache.flink.cdc.cli.CliFrontend
jarURI: 'local:///opt/flink/flink-cdc-3.3-SNAPSHOT/lib/flink-cdc-dist-3.3-SNAPSHOT.jar'
parallelism: 1
state: running
upgradeMode: savepoint
jobManager:
replicas: 1
resource:
cpu: 1
memory: 1024m
podTemplate:
apiVersion: v1
kind: Pod
spec:
containers:
# don't modify this name
- name: flink-main-container
volumeMounts:
- mountPath: /opt/flink/flink-cdc-3.3-SNAPSHOT/conf
name: flink-cdc-pipeline-config
volumes:
- configMap:
name: flink-cdc-pipeline-configmap
name: flink-cdc-pipeline-config
restartNonce: 0
serviceAccount: flink
taskManager:
resource:
cpu: 1
memory: 1024m
- 由于Flink的类加载机制,参数
classloader.resolve-order
必须设置为parent-first
。- Flink CDC默认提交作业到远程Flink集群,在Operator模式下,您需要通过指定
--use-mini-cluster
参数在pod内部启动一个Standalone Flink集群。
提交Flink CDC作业 #
ConfigMap和FlinkDeployment YAML文件创建完成后,即可通过kubectl提交作业到Operator:
kubectl apply -f flink-cdc-pipeline-job.yaml
成功提交作业后,返回信息如下:
flinkdeployment.flink.apache.org/flink-cdc-pipeline-job created
如您需要查看日志、暴露Flink Web UI等,请参考:Flink Kubernetes Operator文档。
请注意,目前不支持使用native application mode提交作业。