This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.
Standalone #
本页面提供了关于如何在静态(但可能异构)集群上以完全分布式方式运行 Flink 的说明。
需求 #
软件需求 #
Flink 运行在所有类 UNIX 环境下,例如 Linux,Mac OS X 和 Cygwin (Windows),集群由一个 master 节点以及一个或多个 worker 节点构成。在配置系统之前,请确保在每个节点上安装有以下软件:
- Java 1.8.x 或更高版本,
- ssh (必须运行 sshd 以执行用于管理 Flink 各组件的脚本)
如果集群不满足软件要求,那么你需要安装/更新这些软件。
使集群中所有节点使用免密码 SSH 以及拥有相同的目录结构可以让你使用脚本来控制一切。
JAVA_HOME
配置
#
Flink 需要 master 和所有 worker 节点设置 JAVA_HOME
环境变量,并指向你的 Java 安装目录。
你可以在 Flink 配置文件中通过 env.java.home
配置项来设置此变量。 需要注意的是,该配置项必须以 flattened 的格式(即一行键值对格式)在配置文件中。
Flink 设置 #
前往 下载页面 获取可运行的软件包。
在下载完最新的发布版本后,复制压缩文件到 master 节点并解压:
tar xzf flink-*.tgz
cd flink-*
配置 Flink #
在解压完文件后,你需要编辑 Flink 配置文件来为集群配置 Flink。
设置 jobmanager.rpc.address
配置项指向 master 节点。你也应该通过设置 jobmanager.memory.process.size
和 taskmanager.memory.process.size
配置项来定义 Flink 允许在每个节点上分配的最大内存值。
这些值的单位是 MB。如果一些 worker 节点上有你想分配到 Flink 系统的多余内存,你可以在这些特定节点的 Flink 配置文件 中重写 taskmanager.memory.process.size
或 taskmanager.memory.flink.size
的默认值。
最后,你必须提供集群上会被用作为 worker 节点的所有节点列表,也就是运行 TaskManager 的节点。编辑文件 conf/workers 并输入每个 worker 节点的 IP 或主机名。
以下例子展示了三个节点(IP 地址从 10.0.0.1 到 10.0.0.3,主机名为 master、worker1、 woker2)的设置,以及配置文件(在所有机器上都需要在相同路径访问)的内容:
/path/to/flink/conf/
config.yaml
jobmanager.rpc.address: 10.0.0.1
/path/to/flink/
conf/workers
10.0.0.2 10.0.0.3
Flink 目录必须放在所有 worker 节点的相同目录下。你可以使用共享的 NFS 目录,或将 Flink 目录复制到每个 worker 节点上。
请参考 配置参数页面 获取更多细节以及额外的配置项。
特别地,
- 每个 JobManager 的可用内存值(
jobmanager.memory.process.size
), - 每个 TaskManager 的可用内存值 (
taskmanager.memory.process.size
,并检查 内存调优指南), - 每台机器的可用 CPU 数(
taskmanager.numberOfTaskSlots
), - 集群中所有 CPU 数(
parallelism.default
)和 - 临时目录(
io.tmp.dirs
)
的值都是非常重要的配置项。
启动 Flink #
下面的脚本在本地节点启动了一个 JobManager 并通过 SSH 连接到 workers 文件中所有的 worker 节点,在每个节点上启动 TaskManager。现在你的 Flink 系统已经启动并运行着。可以通过配置的 RPC 端口向本地节点上的 JobManager 提交作业。
假定你在 master 节点并且在 Flink 目录下:
bin/start-cluster.sh
为了关闭 Flink,这里同样有一个 stop-cluster.sh
脚本。
为集群添加 JobManager/TaskManager 实例 #
你可以使用 bin/jobmanager.sh
和 bin/taskmanager.sh
脚本为正在运行的集群添加 JobManager 和 TaskManager 实例。
添加 JobManager #
bin/jobmanager.sh ((start|start-foreground) [args])|stop|stop-all
添加 TaskManager #
bin/taskmanager.sh start|start-foreground|stop|stop-all
确保在你想启动/关闭相应实例的主机上执行这些脚本。
High-Availability with Standalone #
In order to enable HA for a standalone cluster, you have to use the ZooKeeper HA services.
Additionally, you have to configure your cluster to start multiple JobManagers.
Masters File (masters) #
In order to start an HA-cluster configure the masters file in conf/masters
:
-
masters file: The masters file contains all hosts, on which JobManagers are started, and the ports to which the web user interface binds.
jobManagerAddress1:webUIPort1 [...] jobManagerAddressX:webUIPortX
By default, the job manager will pick a random port for inter process communication. You can change this via the high-availability.jobmanager.port key. This key accepts single ports (e.g. 50010
), ranges (50000-50025
), or a combination of both (50010,50011,50020-50025,50050-50075
).
Example: Standalone Cluster with 2 JobManagers #
-
Configure high availability mode and ZooKeeper quorum in Flink configuration file:
high-availability.type: zookeeper high-availability.zookeeper.quorum: localhost:2181 high-availability.zookeeper.path.root: /flink high-availability.cluster-id: /cluster_one # important: customize per cluster high-availability.storageDir: hdfs:///flink/recovery
-
Configure masters in
conf/masters
:localhost:8081 localhost:8082
-
Configure ZooKeeper server in
conf/zoo.cfg
(currently it’s only possible to run a single ZooKeeper server per machine):server.0=localhost:2888:3888
-
Start ZooKeeper quorum:
$ bin/start-zookeeper-quorum.sh Starting zookeeper daemon on host localhost.
-
Start an HA-cluster:
$ bin/start-cluster.sh Starting HA cluster with 2 masters and 1 peers in ZooKeeper quorum. Starting standalonesession daemon on host localhost. Starting standalonesession daemon on host localhost. Starting taskexecutor daemon on host localhost.
-
Stop ZooKeeper quorum and cluster:
$ bin/stop-cluster.sh Stopping taskexecutor daemon (pid: 7647) on localhost. Stopping standalonesession daemon (pid: 7495) on host localhost. Stopping standalonesession daemon (pid: 7349) on host localhost. $ bin/stop-zookeeper-quorum.sh Stopping zookeeper daemon (pid: 7101) on host localhost.
User jars & Classpath #
In Standalone mode, the following jars will be recognized as user-jars and included into user classpath:
- Session Mode: The JAR file specified in startup command.
- Application Mode: The JAR file specified in startup command and all JAR files in Flink’s
usrlib
folder.
Please refer to the Debugging Classloading Docs for details.