- 应用开发
- DataStream API
- 用户自定义 Functions
用户自定义 Functions
本文档是 Apache Flink 的旧版本。建议访问 最新的稳定版本。
大多数操作都需要用户自定义 function。本节列出了实现用户自定义 function 的不同方式。还会介绍 Accumulators
(累加器),可用于深入了解你的 Flink 应用程序。
实现接口
最基本的方法是实现提供的接口:
匿名类
你可以将 function 当做匿名类传递:
Java 8 Lambdas
Flink 在 Java API 中还支持 Java 8 Lambdas 表达式。
Rich functions
所有需要用户自定义 function 的转化操作都可以将 rich function 作为参数。例如,你可以将下面代码
替换成
并将 function 照常传递给 map
transformation:
Rich functions 也可以定义成匿名类:
Lambda Functions
正如你在上面的例子中看到的,所有的操作同可以通过 lambda 表达式来描述:
Rich functions
所有将 lambda 表达式作为参数的转化操作都可以用 rich function 来代替。例如,你可以将下面代码
替换成
并将 function 传递给 map
transformation:
Rich functions 也可以定义成匿名类:
除了用户自定义的 function(map,reduce 等),Rich functions 还提供了四个方法:open
、close
、getRuntimeContext
和
setRuntimeContext
。这些方法对于参数化 function
(参阅 给 function 传递参数),
创建和最终确定本地状态,访问广播变量(参阅
广播变量),以及访问运行时信息,例如累加器和计数器(参阅
累加器和计数器),以及迭代器的相关信息(参阅 迭代器)
有很大作用。
Back to top
累加器和计数器
累加器是具有加法运算和最终累加结果的一种简单结构,可在作业结束后使用。
最简单的累加器就是计数器: 你可以使用
Accumulator.add(V value)
方法将其递增。在作业结束时,Flink 会汇总(合并)所有部分的结果并将其发送给客户端。
在调试过程中或在你想快速了解有关数据更多信息时,累加器作用很大。
Flink 目前有如下内置累加器。每个都实现了
累加器
接口。
如何使用累加器:
首先,在需要使用累加器的用户自定义的转换 function 中创建一个累加器对象(此处是计数器)。
其次,你必须在 rich function 的 open()
方法中注册累加器对象。也可以在此处定义名称。
现在你可以在操作 function 中的任何位置(包括 open()
和 close()
方法中)使用累加器。
最终整体结果会存储在由执行环境的 execute()
方法返回的 JobExecutionResult
对象中(当前只有等待作业完成后执行才起作用)。
单个作业的所有累加器共享一个命名空间。因此你可以在不同的操作 function 里面使用同一个累加器。Flink 会在内部将所有具有相同名称的累加器合并起来。
关于累加器和迭代的注意事项:当前累加器的结果只有在整个作业结束后才可用。我们还计划在下一次迭代中提供上一次的迭代结果。你可以使用
聚合器
来计算每次迭代的统计信息,并基于此类统计信息来终止迭代。
定制累加器:
要实现自己的累加器,你只需要实现累加器接口即可。如果你认为自定义累加器应随 Flink 一起提供,请尽管创建 pull request。
你可以选择实现
Accumulator
或 SimpleAccumulator。
Accumulator<V,R>
的实现十分灵活: 它定义了将要添加的值类型 V
,并定义了最终的结果类型 R
。例如,对于直方图,V
是一个数字且 R
是一个直方图。
SimpleAccumulator
适用于两种类型都相同的情况,例如计数器。
Back to top