Handling Application Parameters

应用程序参数处理 #

应用程序参数处理 #

几乎所有的批和流的 Flink 应用程序,都依赖于外部配置参数。这些配置参数可以用于指定输入和输出源(如路径或地址)、系统参数(并行度,运行时配置)和特定的应用程序参数(通常使用在用户自定义函数)。

为解决以上问题,Flink 提供一个名为 Parametertool 的简单公共类,其中包含了一些基本的工具。请注意,这里说的 Parametertool 并不是必须使用的。Commons CLIargparse4j 等其他框架也可以非常好地兼容 Flink。

ParameterTool 读取配置值 #

ParameterTool 定义了一组静态方法,用于读取配置信息。该工具类内部使用了 Map<string,string> 类型,这样使得它可以很容易地与你的配置集成在一起。

配置值来自 .properties 文件 #

以下方法可以读取 Properties 文件并解析出键/值对:

String propertiesFilePath = "/home/sam/flink/myjob.properties";
ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFilePath);

File propertiesFile = new File(propertiesFilePath);
ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFile);

InputStream propertiesFileInputStream = new FileInputStream(file);
ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFileInputStream);

配置值来自命令行 #

以下方法可以从命令行中获取参数,如 --input hdfs:///mydata --elements 42

public static void main(String[] args) {
    ParameterTool parameter = ParameterTool.fromArgs(args);
    // .. regular code ..

配置值来自系统属性 #

启动 JVM 时,可以将系统属性传递给 JVM:-Dinput=hdfs:///mydata。你也可以从这些系统属性初始化 ParameterTool

ParameterTool parameter = ParameterTool.fromSystemProperties();

现在我们已经从某处获取了参数(见上文),可以以各种不同的方式使用它们。

直接从 ParameterTool 获取

ParameterTool 本身具有访问配置值的方法。

ParameterTool parameters = // ...
parameter.getRequired("input");
parameter.get("output", "myDefaultValue");
parameter.getLong("expectedCount", -1L);
parameter.getNumberOfParameters();
// .. there are more methods available.

你可以在提交应用程序时直接在客户端的 main() 方法中使用这些方法的返回值。例如,你可以这样设置算子的并行度:

ParameterTool parameters = ParameterTool.fromArgs(args);
int parallelism = parameters.get("mapParallelism", 2);
DataStream<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).setParallelism(parallelism);

由于 ParameterTool 是序列化的,你可以将其传递给函数本身:

ParameterTool parameters = ParameterTool.fromArgs(args);
DataStream<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer(parameters));

然后在函数内使用它以获取命令行的传递的参数。

注册全局参数 #

从 JobManager web 界面和用户定义的所有函数中可以以配置值的方式访问在 ExecutionConfig 中注册的全局作业参数。

注册全局参数:

ParameterTool parameters = ParameterTool.fromArgs(args);

// set up the execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(parameters);

在任意富函数中访问参数:

public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        ParameterTool parameters = ParameterTool.fromMap(getRuntimeContext().getGlobalJobParameters());
        parameters.getRequired("input");
        // .. do more ..

Back to top