ZooKeeper 高可用服务

ZooKeeper 高可用服务 #

Flink 的 ZooKeeper 高可用模式使用 ZooKeeper 提供高可用服务。

Flink 利用 ZooKeeper 在所有运行的 JobManager 实例之间进行 分布式协调。ZooKeeper 是一个独立于 Flink 的服务,它通过 leader 选举和轻量级的一致性状态存储来提供高可靠的分布式协调。查看 ZooKeeper入门指南,了解更多关于 ZooKeeper 的信息。Flink 包含 启动一个简单的ZooKeeper 的安装脚本。

配置 #

为了启用高可用集群(HA-cluster),你必须设置以下配置项:

  • high-availability.type (必要的): high-availability.type 配置项必须设置为 zookeeper

    high-availability.type: zookeeper
  • high-availability.storageDir (必要的): JobManager 元数据持久化到文件系统 high-availability.storageDir 配置的路径中,并且在 ZooKeeper 中只能有一个目录指向此位置。

    high-availability.storageDir: hdfs:///flink/recovery

    storageDir 存储要从 JobManager 失败恢复时所需的所有元数据。

  • high-availability.zookeeper.quorum (必要的): ZooKeeper quorum 是一个提供分布式协调服务的复制组。

    high-availability.zookeeper.quorum: address1:2181[,...],addressX:2181

    每个 addressX:port 指的是一个 ZooKeeper 服务器,它可以被 Flink 在给定的地址和端口上访问。

  • high-availability.zookeeper.path.root (推荐的): ZooKeeper 根节点,集群的所有节点都放在该节点下。

    high-availability.zookeeper.path.root: /flink
  • high-availability.cluster-id (推荐的): ZooKeeper cluster-id 节点,在该节点下放置集群所需的协调数据。

    high-availability.cluster-id: /default_ns # important: customize per cluster

    重要: 在 YARN、原生 Kubernetes 或其他集群管理器上运行时,不应该手动设置此值。在这些情况下,将自动生成一个集群 ID。如果在未使用集群管理器的机器上运行多个 Flink 高可用集群,则必须为每个集群手动配置单独的集群 ID(cluster-ids)。

配置示例 #

Flink 配置文件 中配置高可用模式和 ZooKeeper 复制组(quorum):

high-availability.type: zookeeper
high-availability.zookeeper.quorum: localhost:2181
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: /cluster_one # 重要: 每个集群自定义
high-availability.storageDir: hdfs:///flink/recovery

Back to top

ZooKeeper 安全配置 #

如果 ZooKeeper 使用 Kerberos 以安全模式运行,必要时可以在 Flink 配置文件 中覆盖以下配置:

# 默认配置为 "zookeeper". 如果 ZooKeeper quorum 配置了不同的服务名称,
# 那么可以替换到这里。

zookeeper.sasl.service-name: zookeeper 

# 默认配置为 "Client". 该值必须为 "security.kerberos.login.contexts" 项中配置的某一个值。
zookeeper.sasl.login-context-name: Client  

有关用于 Kerberos 安全性的 Flink 配置的更多信息,请参阅 Flink 配置页面的安全性部分。你还可以找到关于 Flink 如何在内部设置基于 kerberos 的安全性 的详细信息。

Back to top

Advanced Configuration #

Tolerating Suspended ZooKeeper Connections #

Per default, Flink’s ZooKeeper client treats suspended ZooKeeper connections as an error. This means that Flink will invalidate all leaderships of its components and thereby triggering a failover if a connection is suspended.

This behaviour might be too disruptive in some cases (e.g., unstable network environment). If you are willing to take a more aggressive approach, then you can tolerate suspended ZooKeeper connections and only treat lost connections as an error via high-availability.zookeeper.client.tolerate-suspended-connections. Enabling this feature will make Flink more resilient against temporary connection problems but also increase the risk of running into ZooKeeper timing problems.

For more information take a look at Curator’s error handling.

启动 ZooKeeper #

如果你没有安装 ZooKeeper,可以使用 Flink 附带的帮助脚本。

conf/zoo.cfg 文件中有 ZooKeeper 的配置模板。你可以在 server.X 配置项中配置主机来运行 ZooKeeper。其中 X 是每个服务器的唯一 ID:

server.X=addressX:peerPort:leaderPort
[...]
server.Y=addressY:peerPort:leaderPort

脚本 bin/start-zookeeper-quorum.sh 将在每个配置的主机上启动一个 ZooKeeper 服务。该进程是通过 Flink 包装器来启动的,该包装器从 conf/zoo.cfg 读取配置,并确保设置一些必要的配置值以方便使用。

在生产环境中,建议你自行管理 ZooKeeper 的安装与部署。

Back to top