public class BootstrapTools extends Object
Modifier and Type | Method and Description |
---|---|
static Configuration | cloneConfiguration(Configuration configuration): Clones the given configuration and resets instance specific config options. |
static Configuration | generateTaskManagerConfiguration(Configuration baseConfig, String jobManagerHostname, int jobManagerPort, int numSlots, scala.concurrent.duration.FiniteDuration registrationTimeout): Generate a task manager configuration. |
static String | getStartCommand(String template, Map<String,String> startCommandValues): Replaces placeholders in the template start command with values from startCommandValues. |
static String | getTaskManagerShellCommand(Configuration flinkConfig, ContaineredTaskManagerParameters tmParams, String configDirectory, String logDirectory, boolean hasLogback, boolean hasLog4j, boolean hasKrb5, Class<?> mainClass): Generates the shell command to start a task manager. |
static org.apache.commons.cli.Option | newDynamicPropertiesOption(): Get an instance of the dynamic properties option. |
static Configuration | parseDynamicProperties(org.apache.commons.cli.CommandLine cmd): Parse the dynamic properties (passed on the command line). |
static akka.actor.ActorSystem | startActorSystem(Configuration configuration, String listeningAddress, int listeningPort, org.slf4j.Logger logger): Starts an Actor System at a specific port. |
static akka.actor.ActorSystem | startActorSystem(Configuration configuration, String listeningAddress, String portRangeDefinition, org.slf4j.Logger logger): Starts an ActorSystem with the given configuration listening at the address/ports. |
static WebMonitor | startWebMonitorIfConfigured(Configuration config, HighAvailabilityServices highAvailabilityServices, LeaderGatewayRetriever<JobManagerGateway> jobManagerRetriever, MetricQueryServiceRetriever queryServiceRetriever, Time timeout, ScheduledExecutor scheduledExecutor, org.slf4j.Logger logger): Starts the web frontend. |
static void | substituteDeprecatedConfigKey(Configuration config, String deprecated, String designated): Sets the value of a new config key to the value of a deprecated config key. |
static void | substituteDeprecatedConfigPrefix(Configuration config, String deprecatedPrefix, String designatedPrefix): Sets the value of a new config key to the value of a deprecated config key. |
static void | updateTmpDirectoriesInConfiguration(Configuration configuration, String defaultDirs): Set temporary configuration directories if necessary. |
static void | writeConfiguration(Configuration cfg, File file): Writes a Flink YAML config file from a Flink Configuration object. |
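
To illustrate the getStartCommand entry in the table above, here is a minimal sketch of placeholder substitution in plain Java. It is not Flink's implementation. The `%key%` placeholder syntax, the class name `StartCommandSketch`, the method name `fillTemplate`, and the template and keys in `main` are all assumptions made for this example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative stand-in for template substitution; not Flink's implementation. */
class StartCommandSketch {

    /**
     * Replaces every placeholder that has an entry in the map.
     * ASSUMPTION: placeholders are written as %key% inside the template.
     */
    static String fillTemplate(String template, Map<String, String> values) {
        String result = template;
        for (Map.Entry<String, String> entry : values.entrySet()) {
            // String.replace substitutes literally, so no regex escaping is needed.
            result = result.replace("%" + entry.getKey() + "%", entry.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical template and keys, chosen only for this example.
        Map<String, String> values = new LinkedHashMap<>();
        values.put("java", "/usr/bin/java");
        values.put("class", "com.example.Main");

        // "jvmopts" has no entry in the map, so its placeholder stays in the result.
        System.out.println(fillTemplate("%java% %jvmopts% %class%", values));
    }
}
```

The sketch shows why a missing key matters: a placeholder without a matching map entry is left in the command unchanged.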
public static akka.actor.ActorSystem startActorSystem(Configuration configuration, String listeningAddress, String portRangeDefinition, org.slf4j.Logger logger) throws Exception
Parameters:
configuration - The Flink configuration.
listeningAddress - The address to listen at.
portRangeDefinition - The port range to choose a port from.
logger - The logger to output log information.
Throws:
Exception
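Choosing a port from a range can be sketched as follows. This is a plain-Java illustration, not the code behind startActorSystem. It assumes that a port range definition is a comma-separated list of single ports and inclusive `low-high` ranges; the class `PortRangeSketch`, its methods, and the `tryBind` callback are hypothetical names introduced for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalInt;
import java.util.function.IntPredicate;

/** Illustrative port-range handling; not Flink's implementation. */
class PortRangeSketch {

    /**
     * Expands a definition such as "50100-50102,50110" into candidate ports.
     * ASSUMPTION: comma-separated single ports and inclusive low-high ranges.
     */
    static List<Integer> parsePortRange(String definition) {
        List<Integer> ports = new ArrayList<>();
        for (String part : definition.split(",")) {
            String trimmed = part.trim();
            int dash = trimmed.indexOf('-');
            if (dash < 0) {
                ports.add(checkPort(Integer.parseInt(trimmed)));
            } else {
                int low = checkPort(Integer.parseInt(trimmed.substring(0, dash).trim()));
                int high = checkPort(Integer.parseInt(trimmed.substring(dash + 1).trim()));
                if (low > high) {
                    throw new IllegalArgumentException("Empty range: " + trimmed);
                }
                for (int port = low; port <= high; port++) {
                    ports.add(port);
                }
            }
        }
        return ports;
    }

    /** Rejects values outside the valid TCP port range. */
    static int checkPort(int port) {
        if (port < 0 || port > 65535) {
            throw new IllegalArgumentException("Invalid port: " + port);
        }
        return port;
    }

    /** Returns the first candidate for which tryBind reports success, if any. */
    static OptionalInt firstUsablePort(String definition, IntPredicate tryBind) {
        for (int port : parsePortRange(definition)) {
            if (tryBind.test(port)) {
                return OptionalInt.of(port);
            }
        }
        return OptionalInt.empty();
    }

    public static void main(String[] args) {
        // Pretend that every port below 50102 is already taken.
        OptionalInt port = firstUsablePort("50100-50105", p -> p >= 50102);
        System.out.println(port.isPresent() ? "chosen port " + port.getAsInt() : "no free port");
    }
}
```

Passing the bind attempt in as a callback keeps the sketch independent of any actor system or socket API.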
public static akka.actor.ActorSystem startActorSystem(Configuration configuration, String listeningAddress, int listeningPort, org.slf4j.Logger logger) throws Exception
Parameters:
configuration - The Flink configuration.
listeningAddress - The address to listen at.
listeningPort - The port to listen at.
logger - The logger to output log information.
Throws:
Exception
public static WebMonitor startWebMonitorIfConfigured(Configuration config, HighAvailabilityServices highAvailabilityServices, LeaderGatewayRetriever<JobManagerGateway> jobManagerRetriever, MetricQueryServiceRetriever queryServiceRetriever, Time timeout, ScheduledExecutor scheduledExecutor, org.slf4j.Logger logger) throws Exception
Parameters:
config - The Flink config.
highAvailabilityServices - Service factory for high availability services.
jobManagerRetriever - Retrieves the leading JobManagerGateway.
queryServiceRetriever - Resolves a query service.
timeout - Timeout for asynchronous operations.
scheduledExecutor - Executor to run asynchronous operations.
logger - Logger for log output.
Throws:
Exception
public static Configuration generateTaskManagerConfiguration(Configuration baseConfig, String jobManagerHostname, int jobManagerPort, int numSlots, scala.concurrent.duration.FiniteDuration registrationTimeout)
Parameters:
baseConfig - Config to start from.
jobManagerHostname - Job manager host name.
jobManagerPort - Port of the job manager.
numSlots - Number of slots to configure.
registrationTimeout - Timeout for registration.
public static void writeConfiguration(Configuration cfg, File file) throws IOException
Parameters:
cfg - The Flink config.
file - The File to write to.
Throws:
IOException
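The general shape of a flat YAML config file can be sketched as below. This is not the code behind writeConfiguration. It assumes one `key: value` line per entry with no quoting or nesting, and it uses a `Map<String, String>` as a stand-in for a Flink Configuration object. The class `YamlConfigWriterSketch` and the method `toFlatYaml` are hypothetical; the keys in `main` are only sample values.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative flat YAML rendering; not Flink's implementation. */
class YamlConfigWriterSketch {

    /**
     * Renders each entry as a "key: value" line.
     * ASSUMPTION: values need no quoting or escaping, and keys are not nested.
     */
    static String toFlatYaml(Map<String, String> entries) {
        StringBuilder yaml = new StringBuilder();
        for (Map.Entry<String, String> entry : entries.entrySet()) {
            yaml.append(entry.getKey()).append(": ").append(entry.getValue()).append('\n');
        }
        return yaml.toString();
    }

    public static void main(String[] args) {
        // A LinkedHashMap keeps insertion order, so the output order is predictable.
        Map<String, String> config = new LinkedHashMap<>();
        config.put("jobmanager.rpc.port", "6123");
        config.put("taskmanager.numberOfTaskSlots", "4");

        // Writing the string to a File is left to the caller in this sketch.
        System.out.print(toFlatYaml(config));
    }
}
```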
public static void substituteDeprecatedConfigKey(Configuration config, String deprecated, String designated)
Parameters:
config - Config to write.
deprecated - The old config key.
designated - The new config key.
public static void substituteDeprecatedConfigPrefix(Configuration config, String deprecatedPrefix, String designatedPrefix)
Parameters:
config - Config to write.
deprecatedPrefix - Old prefix of key.
designatedPrefix - New prefix of key.
public static org.apache.commons.cli.Option newDynamicPropertiesOption()
Dynamic properties allow the user to specify additional configuration values with -D, such as -Dfs.overwrite-files=true -Dtaskmanager.network.memory.min=536346624
public static Configuration parseDynamicProperties(org.apache.commons.cli.CommandLine cmd)
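How `-D` arguments map to configuration keys and values can be sketched in plain Java. This does not use Commons CLI and is not the code behind parseDynamicProperties. The class `DynamicPropertiesSketch`, the method `parse`, and the decision to skip tokens without a key or an `=` sign are assumptions made for this example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative -Dkey=value parsing; not Flink's implementation. */
class DynamicPropertiesSketch {

    /** Collects every argument of the form -Dkey=value into a map. */
    static Map<String, String> parse(String[] args) {
        Map<String, String> properties = new LinkedHashMap<>();
        for (String arg : args) {
            if (!arg.startsWith("-D")) {
                continue; // not a dynamic property
            }
            String body = arg.substring(2);
            int separator = body.indexOf('=');
            if (separator <= 0) {
                continue; // ASSUMPTION: tokens without a key or '=' are skipped
            }
            // Split at the first '=' only, so values may contain '=' themselves.
            properties.put(body.substring(0, separator), body.substring(separator + 1));
        }
        return properties;
    }

    public static void main(String[] args) {
        String[] commandLine = {
            "-Dfs.overwrite-files=true",
            "-Dtaskmanager.network.memory.min=536346624"
        };
        parse(commandLine).forEach((key, value) -> System.out.println(key + " -> " + value));
    }
}
```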
public static String getTaskManagerShellCommand(Configuration flinkConfig, ContaineredTaskManagerParameters tmParams, String configDirectory, String logDirectory, boolean hasLogback, boolean hasLog4j, boolean hasKrb5, Class<?> mainClass)
Parameters:
flinkConfig - The Flink configuration.
tmParams - Parameters for the task manager.
configDirectory - The configuration directory containing flink-conf.yaml.
logDirectory - The log directory.
hasLogback - Whether logback is used.
hasLog4j - Whether log4j is used.
mainClass - The main class to start with.
public static String getStartCommand(String template, Map<String,String> startCommandValues)
If the default template ConfigConstants.DEFAULT_YARN_CONTAINER_START_COMMAND_TEMPLATE
is used, the following keys must be present in the map or the resulting
command will still contain placeholders:
Parameters:
template - A template start command with placeholders.
startCommandValues - A replacement map from placeholder to value.
public static void updateTmpDirectoriesInConfiguration(Configuration configuration, @Nullable String defaultDirs)
Parameters:
configuration - Flink config to patch.
defaultDirs - Directories to apply if no tmp directories are set.
public static Configuration cloneConfiguration(Configuration configuration)
Parameters:
configuration - The configuration to clone.
Copyright © 2014–2019 The Apache Software Foundation. All rights reserved.