This documentation is for an unreleased version of Apache Flink CDC. We recommend you use the latest stable version.
介绍 #
Apache Hadoop YARN 是众多数据处理框架所青睐的资源提供者。 Flink 服务被提交至 YARN 的 ResourceManager,后者会在由 YARN NodeManager 管理的机器上生成 container。Flink 将其 JobManager 和 TaskManager 实例部署到这些 container 中。
Flink 可以根据在 JobManager 上运行的作业处理所需的 slot 数量,动态分配和释放 TaskManager 资源。
准备工作 #
此“入门指南”部分假定从 2.10.2 版本起具备功能可用的 YARN 环境。YARN 环境可以通过像亚马逊 EMR、谷歌云 DataProc 等服务或 Cloudera 等产品来搭建。不建议在本地或集群上手动设置 YARN 环境来完成本入门教程。
- 通过运行
yarn top
无错误信息显示以确保你的 YARN 集群准备好接受 Flink 应用程序的提交。 - 从下载页面下载最新的 Flink 发行版并解压缩。
- 一定要确保设置了
HADOOP_CLASSPATH
环境变量(可以通过运行echo $HADOOP_CLASSPATH
来检查)。如果没有,请使用以下命令进行设置。
export HADOOP_CLASSPATH=`hadoop classpath`
Session 模式 #
Flink 在所有类 UNIX 的环境中运行,即在 Linux、Mac OS X 以及(针对 Windows 的)Cygwin 上运行。 你可以参考概览来检查支持的版本并下载Flink二进制版本, 然后解压文件:
tar -xzf flink-*.tgz
你需要设置 FLINK_HOME
环境变量,比如:
export FLINK_HOME=/path/flink-*
在 YARN 启动一个Flink Session #
确保已设置 HADOOP_CLASSPATH
环境变量,即可在 YARN 会话启动一个 Flink 任务:
# we assume to be in the root directory of
# the unzipped Flink distribution
# export HADOOP_CLASSPATH
export HADOOP_CLASSPATH=`hadoop classpath`
# Start YARN session
./bin/yarn-session.sh --detached
# Stop YARN session (replace the application id based
# on the output of the yarn-session.sh command)
echo "stop" | ./bin/yarn-session.sh -id application_XXXXX_XXX
启动 Yarn 会话之后,即可通过命令输出最后一行打印的 URL 或者 YARN ResourceManager Web UI 访问 Flink Web UI。
然后,需要向 flink-conf.yaml 添加一些配置:
rest.bind-port: {{REST_PORT}}
rest.address: {{NODE_IP}}
execution.target: yarn-session
yarn.application.id: {{YARN_APPLICATION_ID}}
{{REST_PORT}} 和 {{NODE_IP}} 需要替换为 JobManager Web 接口的实际值,{{YARN_APPLICATION_ID}} 则需替换为 Flink 实际的 Yarn 应用 ID。
配置 Flink CDC #
从发布页面下载 Flink CDC 的 tar 文件,然后提取该归档文件:
tar -xzf flink-cdc-*.tar.gz
提取后的 flink-cdc
包含四个目录: bin
,lib
,log
和 conf
。
从发布页面下载连接器 jar,并将其移动至 lib
目录中。
下载链接仅适用于稳定版本,SNAPSHOT 依赖项需要自己基于特定分支进行构建。
提交 Flink CDC 任务 #
下面是一个用于整库同步的示例文件 mysql-to-doris.yaml
:
################################################################################
# Description: Sync MySQL all tables to Doris
################################################################################
source:
type: mysql
hostname: localhost
port: 3306
username: root
password: 123456
tables: app_db.\.*
server-id: 5400-5404
server-time-zone: UTC
sink:
type: doris
fenodes: 127.0.0.1:8030
username: root
password: ""
pipeline:
name: Sync MySQL Database to Doris
parallelism: 2
你可以按需修改配置文件。 最后,通过 Cli 将作业提交至 Flink Standalone 集群。
cd /path/flink-cdc-*
./bin/flink-cdc.sh mysql-to-doris.yaml
提交成功将返回如下信息:
Pipeline has been submitted to cluster.
Job ID: ae30f4580f1918bebf16752d4963dc54
Job Description: Sync MySQL Database to Doris
你可以通过 Flink Web UI 找到一个名为 Sync MySQL Database to Doris
的作业。
请注意,目前还不支持提交至 application 模式集群和 per-job 模式集群。