This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.
自定义故障丰富器(Failure Enricher) #
Flink 提供了一个可插拔的接口,供用户注册他们自定义的逻辑,并使用额外的元数据标签(字符串类型的 key-value 对)来丰富故障(Failure)信息。 这使得用户可以实现自定义的故障丰富器插件,对作业故障进行分类、对外暴露自定义指标,或者调用外部通知系统。
每当 JobManager 在运行时收到异常时,都会触发 FailureEnrichers。 每个 FailureEnricher 可以异步返回故障对应的标签(labels),这些标签可以通过 JobManager 的 REST API 来查询(例如:“type:System” 的标签意味着该故障被分类为系统错误)。
实现一个自定义的故障丰富器插件 #
要实现自定义的 FailureEnricher 插件,需要按照下面的步骤:
-
通过实现 FailureEnricher 接口,添加自定义的 FailureEnricher ;
-
通过实现 FailureEnricherFactory 接口,添加自定义的 FailureEnricherFactory ;
-
添加一个 Service 入口,创建一个文件
META-INF/services/org.apache.flink.core.failure.FailureEnricherFactory
,文件中包含自定义 FailureEnricherFactory 的类名(有关详细信息,请参阅 Java Service Loader 文档)。
然后,创建一个包含 FailureEnricher
,FailureEnricherFactory
,META-INF/services/
和所有外部依赖项的 jar 文件。
在 Flink Deploy 目录中的 plugins/
目录中创建一个任意名称的目录,例如 “failure-enrichment”,并将 jar 文件放入此目录中。
使用方式与其它 Plugin 是类似的,有关的详细信息,请参考 Flink Plugins。
请注意,每个 FailureEnricher 都应该定义一组与对应 values 关联的 output keys 。这组 keys 必须是唯一的,否则将忽略具有重复 key 的所有丰富器(enrichers)。
FailureEnricherFactory 示例:
package org.apache.flink.test.plugin.jar.failure;
public class TestFailureEnricherFactory implements FailureEnricherFactory {
@Override
public FailureEnricher createFailureEnricher(Configuration conf) {
return new CustomEnricher();
}
}
FailureEnricher 示例:
package org.apache.flink.test.plugin.jar.failure;
public class CustomEnricher implements FailureEnricher {
private final Set<String> outputKeys;
public CustomEnricher() {
this.outputKeys = Collections.singleton("labelKey");
}
@Override
public Set<String> getOutputKeys() {
return outputKeys;
}
@Override
public CompletableFuture<Map<String, String>> processFailure(
Throwable cause, Context context) {
return CompletableFuture.completedFuture(Collections.singletonMap("labelKey", "labelValue"));
}
}
配置 #
JobManager 在启动时加载 FailureEnricher 插件。 为了确保您的 FailureEnrichers 已加载,所有类名都应该定义在 jobmanager.failure-enrichers 配置 中。 如果此配置为空,则不会启动任何 Enricher。 示例:
jobmanager.failure-enrichers = org.apache.flink.test.plugin.jar.failure.CustomEnricher
验证 #
要验证 FailureEnricher 是否已加载,您可以检查 JobManager 日志中是否有下面的内容:
Found failure enricher org.apache.flink.test.plugin.jar.failure.CustomEnricher at jar:file:/path/to/flink/plugins/failure-enrichment/flink-test-plugin.jar!/org/apache/flink/test/plugin/jar/failure/CustomEnricher.class
此外,还可以查询 JobManager 的 REST API 来查找 failureLabels 字段:
"failureLabels": {
"labelKey": "labelValue"
}