DataStream API 教程
This documentation is for an out-of-date version of Apache Flink. We recommend you use the latest stable version.

DataStream API 教程 #

Apache Flink 提供了 DataStream API,用于构建健壮的、有状态的流式应用程序。它提供了对状态和时间细粒度控制,从而允许实现高级事件驱动系统。 在这篇教程中,你将学习如何使用 PyFlink 和 DataStream API 构建一个简单的流式应用程序。

你要搭建一个什么系统 #

在本教程中,你将学习如何编写一个简单的 Python DataStream 作业。 例子是从非空集合中读取数据,并将结果写入本地文件系统。

准备条件 #

本教程假设你对 Python 有一定的熟悉,但是即使你使用的是不同编程语言,你也应该能够学会。

困难求助 #

如果你有疑惑,可以查阅 社区支持资源。 特别是,Apache Flink 用户邮件列表 一直被评为所有Apache项目中最活跃的一个,也是快速获得帮助的好方法。

怎样跟着教程练习 #

首先,你需要在你的电脑上准备以下环境:

  • Java 8 or 11
  • Python 3.6, 3.7 or 3.8

使用 Python DataStream API 需要安装 PyFlink,PyFlink 发布在 PyPI上,可以通过 pip 快速安装。

$ python -m pip install apache-flink

一旦 PyFlink 安装完成之后,你就可以开始编写 Python DataStream 作业了。

DataStream API 应用程序首先需要声明一个执行环境(StreamExecutionEnvironment),这是流式程序执行的上下文。你将通过它来设置作业的属性(例如默认并发度、重启策略等)、创建源、并最终触发作业的执行。

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)

一旦创建了 StreamExecutionEnvironment 之后,你可以使用它来声明数据源。数据源从外部系统(如 Apache Kafka、Rabbit MQ 或 Apache Pulsar)拉取数据到 Flink 作业里。

为了简单起见,本教程使用元素集合作为数据源。

ds = env.from_collection(
    collection=[(1, 'aaa'), (2, 'bbb')],
    type_info=Types.ROW([Types.INT(), Types.STRING()]))

这里从相同类型数据集合中创建数据流(一个带有 INT 和 STRING 类型字段的 ROW 类型)。

你现在可以在这个数据流上执行转换操作,或者使用 sink 将数据写入外部系统。本教程使用 StreamingFileSink 将数据写入 /tmp/output 文件目录中。

ds.add_sink(StreamingFileSink
    .for_row_format('/tmp/output', Encoder.simple_string_encoder())
    .build())

最后一步是执行真实的 PyFlink DataStream API 作业。PyFlink applications 是懒加载的,并且只有在完全构建之后才会提交给集群上执行。要执行一个应用程序,你只需简单地调用 env.execute(job_name)

env.execute("tutorial_job")

完整的代码如下:

from pyflink.common.serialization import Encoder
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import StreamingFileSink


def tutorial():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    ds = env.from_collection(
        collection=[(1, 'aaa'), (2, 'bbb')],
        type_info=Types.ROW([Types.INT(), Types.STRING()]))
    ds.add_sink(StreamingFileSink
                .for_row_format('/tmp/output', Encoder.simple_string_encoder())
                .build())
    env.execute("tutorial_job")


if __name__ == '__main__':
    tutorial()

现在你已经编写好 PyFlink 程序,可以运行它了!首先,需要确保输出目录不存在:

rm -rf /tmp/output

接下来,可以使用如下命令运行刚刚创建的示例:

$ python datastream_tutorial.py

这个命令会在本地集群中构建并运行 PyFlink 程序。你也可以使用 Job Submission Examples 中描述的命令将其提交到远程集群。

最后,你可以在命令行上看到执行结果:

$ find /tmp/output -type f -exec cat {} \;
1,aaa
2,bbb

本教程为你开始编写自己的 PyFlink DataStream API 程序提供了基础。如果需要了解更多关于 Python DataStream API 的使用,请查阅 Flink Python API 文档