public class BootstrapTools extends Object
Modifier and Type | Class and Description |
---|---|
static interface |
BootstrapTools.ActorSystemExecutorConfiguration
Configuration interface for
ActorSystem underlying executor. |
static class |
BootstrapTools.FixedThreadPoolExecutorConfiguration
Configuration for a fixed thread pool executor.
|
static class |
BootstrapTools.ForkJoinExecutorConfiguration
Configuration for a fork join executor.
|
Modifier and Type | Method and Description |
---|---|
static int |
calculateHeapSize(int memory,
Configuration conf)
Calculate heap size after cut-off.
|
static Configuration |
cloneConfiguration(Configuration configuration)
Clones the given configuration and resets instance specific config options.
|
static String |
escapeForDifferentOS(String value)
Escape all the dynamic property values.
|
static String |
escapeWithDoubleQuote(String value) |
static String |
escapeWithSingleQuote(String value) |
static String |
getDynamicPropertiesAsString(Configuration baseConfig,
Configuration targetConfig)
Get dynamic properties based on two Flink configurations.
|
static Map<String,String> |
getEnvironmentVariables(String envPrefix,
Configuration flinkConfiguration)
Method to extract environment variables from the flinkConfiguration based on the given prefix String.
|
static String |
getStartCommand(String template,
Map<String,String> startCommandValues)
Replaces placeholders in the template start command with values from startCommandValues.
|
static String |
getTaskManagerShellCommand(Configuration flinkConfig,
ContaineredTaskManagerParameters tmParams,
String configDirectory,
String logDirectory,
boolean hasLogback,
boolean hasLog4j,
boolean hasKrb5,
Class<?> mainClass,
String mainArgs)
Generates the shell command to start a task manager.
|
static org.apache.commons.cli.Option |
newDynamicPropertiesOption()
Get an instance of the dynamic properties option.
|
static Configuration |
parseDynamicProperties(org.apache.commons.cli.CommandLine cmd)
Parse the dynamic properties (passed on the command line).
|
static akka.actor.ActorSystem |
startActorSystem(Configuration configuration,
String listeningAddress,
int listeningPort,
org.slf4j.Logger logger)
Starts an Actor System at a specific port.
|
static akka.actor.ActorSystem |
startActorSystem(Configuration configuration,
String listeningAddress,
int listeningPort,
org.slf4j.Logger logger,
BootstrapTools.ActorSystemExecutorConfiguration actorSystemExecutorConfiguration)
Starts an Actor System at a specific port.
|
static akka.actor.ActorSystem |
startActorSystem(Configuration configuration,
String actorSystemName,
String listeningAddress,
int listeningPort,
org.slf4j.Logger logger,
BootstrapTools.ActorSystemExecutorConfiguration actorSystemExecutorConfiguration)
Starts an Actor System at a specific port.
|
static akka.actor.ActorSystem |
startActorSystem(Configuration configuration,
String listeningAddress,
String portRangeDefinition,
org.slf4j.Logger logger)
Starts an ActorSystem with the given configuration listening at the address/ports.
|
static akka.actor.ActorSystem |
startActorSystem(Configuration configuration,
String listeningAddress,
String portRangeDefinition,
org.slf4j.Logger logger,
BootstrapTools.ActorSystemExecutorConfiguration actorSystemExecutorConfiguration)
Starts an ActorSystem with the given configuration listening at the address/ports.
|
static akka.actor.ActorSystem |
startActorSystem(Configuration configuration,
String actorSystemName,
String listeningAddress,
String portRangeDefinition,
org.slf4j.Logger logger,
BootstrapTools.ActorSystemExecutorConfiguration actorSystemExecutorConfiguration)
Starts an ActorSystem with the given configuration listening at the address/ports.
|
static void |
substituteDeprecatedConfigKey(Configuration config,
String deprecated,
String designated)
Sets the value of a new config key to the value of a deprecated config key.
|
static void |
substituteDeprecatedConfigPrefix(Configuration config,
String deprecatedPrefix,
String designatedPrefix)
Sets the value of a new config key to the value of a deprecated config key.
|
static void |
updateTmpDirectoriesInConfiguration(Configuration configuration,
String defaultDirs)
Set temporary configuration directories if necessary.
|
static void |
writeConfiguration(Configuration cfg,
File file)
Writes a Flink YAML config file from a Flink Configuration object.
|
public static akka.actor.ActorSystem startActorSystem(Configuration configuration, String listeningAddress, String portRangeDefinition, org.slf4j.Logger logger) throws Exception
configuration
- The Flink configurationlisteningAddress
- The address to listen at.portRangeDefinition
- The port range to choose a port from.logger
- The logger to output log information.Exception
- Thrown when actor system cannot be started in specified port rangepublic static akka.actor.ActorSystem startActorSystem(Configuration configuration, String listeningAddress, String portRangeDefinition, org.slf4j.Logger logger, @Nonnull BootstrapTools.ActorSystemExecutorConfiguration actorSystemExecutorConfiguration) throws Exception
configuration
- The Flink configurationlisteningAddress
- The address to listen at.portRangeDefinition
- The port range to choose a port from.logger
- The logger to output log information.actorSystemExecutorConfiguration
- configuration for the ActorSystem's underlying executorException
- Thrown when actor system cannot be started in specified port rangepublic static akka.actor.ActorSystem startActorSystem(Configuration configuration, String actorSystemName, String listeningAddress, String portRangeDefinition, org.slf4j.Logger logger, @Nonnull BootstrapTools.ActorSystemExecutorConfiguration actorSystemExecutorConfiguration) throws Exception
configuration
- The Flink configurationactorSystemName
- Name of the started ActorSystem
listeningAddress
- The address to listen at.portRangeDefinition
- The port range to choose a port from.logger
- The logger to output log information.actorSystemExecutorConfiguration
- configuration for the ActorSystem's underlying executorException
- Thrown when actor system cannot be started in specified port rangepublic static akka.actor.ActorSystem startActorSystem(Configuration configuration, String listeningAddress, int listeningPort, org.slf4j.Logger logger) throws Exception
configuration
- The Flink configuration.listeningAddress
- The address to listen at.listeningPort
- The port to listen at.logger
- the logger to output log information.Exception
public static akka.actor.ActorSystem startActorSystem(Configuration configuration, String listeningAddress, int listeningPort, org.slf4j.Logger logger, BootstrapTools.ActorSystemExecutorConfiguration actorSystemExecutorConfiguration) throws Exception
configuration
- The Flink configuration.listeningAddress
- The address to listen at.listeningPort
- The port to listen at.logger
- the logger to output log information.actorSystemExecutorConfiguration
- configuration for the ActorSystem's underlying executorException
public static akka.actor.ActorSystem startActorSystem(Configuration configuration, String actorSystemName, String listeningAddress, int listeningPort, org.slf4j.Logger logger, BootstrapTools.ActorSystemExecutorConfiguration actorSystemExecutorConfiguration) throws Exception
configuration
- The Flink configuration.actorSystemName
- Name of the started ActorSystem
listeningAddress
- The address to listen at.listeningPort
- The port to listen at.logger
- the logger to output log information.actorSystemExecutorConfiguration
- configuration for the ActorSystem's underlying executorException
public static void writeConfiguration(Configuration cfg, File file) throws IOException
cfg
- The Flink configfile
- The File to write toIOException
public static void substituteDeprecatedConfigKey(Configuration config, String deprecated, String designated)
config
- Config to writedeprecated
- The old config keydesignated
- The new config keypublic static void substituteDeprecatedConfigPrefix(Configuration config, String deprecatedPrefix, String designatedPrefix)
config
- Config to writedeprecatedPrefix
- Old prefix of keydesignatedPrefix
- New prefix of keypublic static org.apache.commons.cli.Option newDynamicPropertiesOption()
Dynamic properties allow the user to specify additional configuration values with -D, such as -Dfs.overwrite-files=true -Dtaskmanager.memory.network.min=536346624
public static Configuration parseDynamicProperties(org.apache.commons.cli.CommandLine cmd)
public static String getTaskManagerShellCommand(Configuration flinkConfig, ContaineredTaskManagerParameters tmParams, String configDirectory, String logDirectory, boolean hasLogback, boolean hasLog4j, boolean hasKrb5, Class<?> mainClass, String mainArgs)
flinkConfig
- The Flink configuration.tmParams
- Parameters for the task manager.configDirectory
- The configuration directory for the flink-conf.yamllogDirectory
- The log directory.hasLogback
- Uses logback?hasLog4j
- Uses log4j?mainClass
- The main class to start with.public static String getStartCommand(String template, Map<String,String> startCommandValues)
If the default template ConfigConstants.DEFAULT_YARN_CONTAINER_START_COMMAND_TEMPLATE
is used, the following keys must be present in the map or the resulting
command will still contain placeholders:
template
- a template start command with placeholdersstartCommandValues
- a replacement map placeholder -> valuepublic static void updateTmpDirectoriesInConfiguration(Configuration configuration, @Nullable String defaultDirs)
configuration
- flink config to patchdefaultDirs
- in case no tmp directories is set, next directories will be appliedpublic static Configuration cloneConfiguration(Configuration configuration)
configuration
- to clonepublic static String getDynamicPropertiesAsString(Configuration baseConfig, Configuration targetConfig)
baseConfig
- The base configuration.targetConfig
- The target configuration.public static String escapeForDifferentOS(String value)
For Windows OS, each value will be surrounded with double quotes. The double quote itself needs to be escaped with back slash. Also the caret symbol need to be escaped with double carets since Windows uses it to escape characters. See https://en.wikibooks.org/wiki/Windows_Batch_Scripting for more information about Windows escaping.
value
- value to be escapedpublic static int calculateHeapSize(int memory, Configuration conf)
public static Map<String,String> getEnvironmentVariables(String envPrefix, Configuration flinkConfiguration)
envPrefix
- Prefix for the environment variables keyflinkConfiguration
- The Flink config to get the environment variable definition fromCopyright © 2014–2020 The Apache Software Foundation. All rights reserved.