public class KubernetesUtils extends Object
Modifier and Type | Method and Description |
---|---|
static void |
checkAndUpdatePortConfigOption(Configuration flinkConfig,
ConfigOption<String> port,
int fallbackPort)
Check whether the port config option is a fixed port.
|
static List<URI> |
checkJarFileForApplicationMode(Configuration configuration) |
static CompletedCheckpointStore |
createCompletedCheckpointStore(Configuration configuration,
FlinkKubeClient kubeClient,
Executor executor,
String configMapName,
String lockIdentity,
int maxNumberOfCheckpointsToRetain,
SharedStateRegistryFactory sharedStateRegistryFactory,
Executor ioExecutor,
RestoreMode restoreMode)
Create a
DefaultCompletedCheckpointStore with KubernetesStateHandleStore . |
static void |
createConfigMapIfItDoesNotExist(FlinkKubeClient flinkKubeClient,
String configMapName,
String clusterId)
Creates a config map with the given name if it does not exist.
|
static KubernetesStateHandleStore<JobGraph> |
createJobGraphStateHandleStore(Configuration configuration,
FlinkKubeClient flinkKubeClient,
String configMapName,
String lockIdentity)
Create a
KubernetesStateHandleStore which storing JobGraph . |
static JobGraphStore |
createJobGraphStore(Configuration configuration,
FlinkKubeClient flinkKubeClient,
String configMapName,
String lockIdentity)
Create a
DefaultJobGraphStore with NoOpJobGraphStoreWatcher . |
static String |
createSingleLeaderKey(String componentId) |
static String |
encodeLeaderInformation(LeaderInformation leaderInformation) |
static String |
extractLeaderName(String key) |
static Map<String,String> |
getCommonLabels(String clusterId)
Get the common labels for Flink native clusters.
|
static Map<String,String> |
getConfigMapLabels(String clusterId,
String type)
Get ConfigMap labels for the current Flink cluster.
|
static String |
getDeploymentName(String clusterId)
Generate name of the Deployment.
|
static Map<String,String> |
getJobManagerSelectors(String clusterId)
Get job manager selectors for the current Flink cluster.
|
static LeaderInformation |
getLeaderInformationFromConfigMap(KubernetesConfigMap configMap)
Get the
LeaderInformation from ConfigMap. |
static String |
getNamespacedServiceName(io.fabric8.kubernetes.api.model.Service service)
Generate namespaced name of the service.
|
static KubernetesConfigMap |
getOnlyConfigMap(List<KubernetesConfigMap> configMaps,
String expectedConfigMapName)
Check the ConfigMap list should only contain the expected one.
|
static io.fabric8.kubernetes.api.model.ResourceRequirements |
getResourceRequirements(io.fabric8.kubernetes.api.model.ResourceRequirements resourceRequirements,
int mem,
double memoryLimitFactor,
double cpu,
double cpuLimitFactor,
Map<String,ExternalResource> externalResources,
Map<String,String> externalResourceConfigKeys)
Get resource requirements from memory and cpu.
|
static String |
getServiceAccount(FlinkPod flinkPod)
Get the service account from the input pod first, if not specified, the service account name
will be used.
|
static List<String> |
getStartCommandWithBashWrapper(String command) |
static File |
getTaskManagerPodTemplateFileInPod() |
static Map<String,String> |
getTaskManagerSelectors(String clusterId)
Get task manager selectors for the current Flink cluster.
|
static boolean |
isHostNetwork(Configuration configuration)
Checks if hostNetwork is enabled.
|
static boolean |
isSingleLeaderKey(String key) |
static FlinkPod |
loadPodFromTemplateFile(FlinkKubeClient kubeClient,
File podTemplateFile,
String mainContainerName) |
static Optional<LeaderInformation> |
parseLeaderInformationSafely(String value) |
static Integer |
parsePort(Configuration flinkConfig,
ConfigOption<String> port)
Parse a valid port for the config option.
|
static String |
resolveDNSPolicy(String dnsPolicy,
boolean hostNetworkEnabled)
Resolve the DNS policy defined value.
|
static <T> String |
resolveUserDefinedValue(Configuration flinkConfig,
ConfigOption<T> configOption,
String valueOfConfigOptionOrDefault,
String valueOfPodTemplate,
String fieldDescription)
Resolve the user defined value with the precedence.
|
static String |
tryToGetPrettyPrintYaml(io.fabric8.kubernetes.api.model.KubernetesResource kubernetesResource)
Try to get the pretty print yaml for Kubernetes resource.
|
public static void checkAndUpdatePortConfigOption(Configuration flinkConfig, ConfigOption<String> port, int fallbackPort)
flinkConfig
- flink configurationport
- config option need to be checkedfallbackPort
- the fallback port that will be set to the configurationpublic static Integer parsePort(Configuration flinkConfig, ConfigOption<String> port)
flinkConfig
- flink configport
- port config optionpublic static String getDeploymentName(String clusterId)
public static Map<String,String> getTaskManagerSelectors(String clusterId)
public static Map<String,String> getJobManagerSelectors(String clusterId)
public static Map<String,String> getCommonLabels(String clusterId)
clusterId
- cluster idpublic static Map<String,String> getConfigMapLabels(String clusterId, String type)
clusterId
- cluster idtype
- the config map use case. It could only be Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY
now.public static KubernetesConfigMap getOnlyConfigMap(List<KubernetesConfigMap> configMaps, String expectedConfigMapName)
configMaps
- ConfigMap list to checkexpectedConfigMapName
- expected ConfigMap Namepublic static LeaderInformation getLeaderInformationFromConfigMap(KubernetesConfigMap configMap)
LeaderInformation
from ConfigMap.configMap
- ConfigMap contains the leader informationLeaderInformation.empty()
if there is
no corresponding data in the ConfigMap.public static JobGraphStore createJobGraphStore(Configuration configuration, FlinkKubeClient flinkKubeClient, String configMapName, String lockIdentity) throws Exception
DefaultJobGraphStore
with NoOpJobGraphStoreWatcher
.configuration
- configuration to build a RetrievableStateStorageHelperflinkKubeClient
- flink kubernetes clientconfigMapName
- ConfigMap namelockIdentity
- lock identity to check the leadershipDefaultJobGraphStore
with NoOpJobGraphStoreWatcher
Exception
- when create the storage helperpublic static KubernetesStateHandleStore<JobGraph> createJobGraphStateHandleStore(Configuration configuration, FlinkKubeClient flinkKubeClient, String configMapName, String lockIdentity) throws Exception
KubernetesStateHandleStore
which storing JobGraph
.configuration
- configuration to build a RetrievableStateStorageHelperflinkKubeClient
- flink kubernetes clientconfigMapName
- ConfigMap namelockIdentity
- lock identity to check the leadershipKubernetesStateHandleStore
which storing JobGraph
.Exception
- when create the storage helperpublic static CompletedCheckpointStore createCompletedCheckpointStore(Configuration configuration, FlinkKubeClient kubeClient, Executor executor, String configMapName, @Nullable String lockIdentity, int maxNumberOfCheckpointsToRetain, SharedStateRegistryFactory sharedStateRegistryFactory, Executor ioExecutor, RestoreMode restoreMode) throws Exception
DefaultCompletedCheckpointStore
with KubernetesStateHandleStore
.configuration
- configuration to build a RetrievableStateStorageHelperkubeClient
- flink kubernetes clientconfigMapName
- ConfigMap nameexecutor
- executor to run blocking callslockIdentity
- lock identity to check the leadershipmaxNumberOfCheckpointsToRetain
- max number of checkpoints to retain on state store
handlerestoreMode
- the mode in which the job is restoringDefaultCompletedCheckpointStore
with KubernetesStateHandleStore
.Exception
- when create the storage helper failedpublic static io.fabric8.kubernetes.api.model.ResourceRequirements getResourceRequirements(io.fabric8.kubernetes.api.model.ResourceRequirements resourceRequirements, int mem, double memoryLimitFactor, double cpu, double cpuLimitFactor, Map<String,ExternalResource> externalResources, Map<String,String> externalResourceConfigKeys)
resourceRequirements
- resource requirements in pod templatemem
- Memory in mb.memoryLimitFactor
- limit factor for the memory, used to set the limit resources.cpu
- cpu.cpuLimitFactor
- limit factor for the cpu, used to set the limit resources.externalResources
- external resourcesexternalResourceConfigKeys
- config keys of external resourcespublic static List<String> getStartCommandWithBashWrapper(String command)
public static List<URI> checkJarFileForApplicationMode(Configuration configuration)
public static FlinkPod loadPodFromTemplateFile(FlinkKubeClient kubeClient, File podTemplateFile, String mainContainerName)
public static File getTaskManagerPodTemplateFileInPod()
public static <T> String resolveUserDefinedValue(Configuration flinkConfig, ConfigOption<T> configOption, String valueOfConfigOptionOrDefault, @Nullable String valueOfPodTemplate, String fieldDescription)
T
- The type of value associated with the configuration option.flinkConfig
- flink configurationconfigOption
- the config option to define the Kubernetes fieldsvalueOfConfigOptionOrDefault
- the value defined by explicit config option or defaultvalueOfPodTemplate
- the value defined in the pod templatefieldDescription
- Kubernetes fields descriptionpublic static String resolveDNSPolicy(String dnsPolicy, boolean hostNetworkEnabled)
dnsPolicy
- DNS policy defined in pod template spechostNetworkEnabled
- Host network enabled or not@Nullable public static String getServiceAccount(FlinkPod flinkPod)
flinkPod
- the Flink pod to parse the service accountpublic static String tryToGetPrettyPrintYaml(io.fabric8.kubernetes.api.model.KubernetesResource kubernetesResource)
kubernetesResource
- kubernetes resourceKubernetesResource#toString()
if parse failed.public static boolean isHostNetwork(Configuration configuration)
public static void createConfigMapIfItDoesNotExist(FlinkKubeClient flinkKubeClient, String configMapName, String clusterId) throws FlinkException
flinkKubeClient
- to use for creating the config mapconfigMapName
- name of the config mapclusterId
- clusterId to which the map belongsFlinkException
- if the config map could not be createdpublic static String encodeLeaderInformation(LeaderInformation leaderInformation)
public static Optional<LeaderInformation> parseLeaderInformationSafely(String value)
public static boolean isSingleLeaderKey(String key)
public static String getNamespacedServiceName(io.fabric8.kubernetes.api.model.Service service)
Copyright © 2014–2024 The Apache Software Foundation. All rights reserved.