Class AbstractFlinkService
- java.lang.Object
-
- org.apache.flink.kubernetes.operator.service.AbstractFlinkService
-
- All Implemented Interfaces:
FlinkService
- Direct Known Subclasses:
NativeFlinkService, StandaloneFlinkService
public abstract class AbstractFlinkService extends java.lang.Object implements FlinkService
An abstract FlinkService
containing some common implementations for the native and standalone Flink Services.
-
-
Field Summary
Fields Modifier and Type Field Description protected ArtifactManager
artifactManager
protected FlinkConfigManager
configManager
protected io.fabric8.kubernetes.client.KubernetesClient
kubernetesClient
-
Constructor Summary
Constructors Constructor Description AbstractFlinkService(io.fabric8.kubernetes.client.KubernetesClient kubernetesClient, FlinkConfigManager configManager)
-
Method Summary
All Methods Static Methods Instance Methods Abstract Methods Concrete Methods Modifier and Type Method Description protected void
cancelJob(FlinkDeployment deployment, UpgradeMode upgradeMode, org.apache.flink.configuration.Configuration conf, boolean deleteClusterAfterSavepoint)
void
cancelSessionJob(FlinkSessionJob sessionJob, UpgradeMode upgradeMode, org.apache.flink.configuration.Configuration conf)
void
deleteClusterDeployment(io.fabric8.kubernetes.api.model.ObjectMeta meta, FlinkDeploymentStatus status, boolean deleteHaData)
protected abstract void
deleteClusterInternal(io.fabric8.kubernetes.api.model.ObjectMeta meta, boolean deleteHaConfigmaps)
Delete Flink kubernetes cluster by deleting the kubernetes resources directly.
protected abstract void
deployApplicationCluster(JobSpec jobSpec, org.apache.flink.configuration.Configuration conf)
void
disposeSavepoint(java.lang.String savepointPath, org.apache.flink.configuration.Configuration conf)
SavepointFetchResult
fetchSavepointInfo(java.lang.String triggerId, java.lang.String jobId, org.apache.flink.configuration.Configuration conf)
protected org.apache.flink.client.program.ClusterClient<java.lang.String>
getClusterClient(org.apache.flink.configuration.Configuration conf)
java.util.Map<java.lang.String,java.lang.String>
getClusterInfo(org.apache.flink.configuration.Configuration conf)
protected static org.apache.flink.api.common.JobStatus
getEffectiveStatus(org.apache.flink.runtime.messages.webmonitor.JobDetails details)
protected abstract io.fabric8.kubernetes.api.model.PodList
getJmPodList(java.lang.String namespace, java.lang.String clusterId)
io.fabric8.kubernetes.api.model.PodList
getJmPodList(FlinkDeployment deployment, org.apache.flink.configuration.Configuration conf)
io.fabric8.kubernetes.client.KubernetesClient
getKubernetesClient()
java.util.Optional<Savepoint>
getLastCheckpoint(org.apache.flink.api.common.JobID jobId, org.apache.flink.configuration.Configuration conf)
java.util.Map<java.lang.String,java.lang.String>
getMetrics(org.apache.flink.configuration.Configuration conf, java.lang.String jobId, java.util.List<java.lang.String> metricNames)
boolean
isHaMetadataAvailable(org.apache.flink.configuration.Configuration conf)
boolean
isJobManagerPortReady(org.apache.flink.configuration.Configuration config)
java.util.Collection<org.apache.flink.runtime.client.JobStatusMessage>
listJobs(org.apache.flink.configuration.Configuration conf)
protected static org.apache.flink.configuration.Configuration
removeOperatorConfigs(org.apache.flink.configuration.Configuration config)
org.apache.flink.runtime.jobmaster.JobResult
requestJobResult(org.apache.flink.configuration.Configuration conf, org.apache.flink.api.common.JobID jobID)
void
submitApplicationCluster(JobSpec jobSpec, org.apache.flink.configuration.Configuration conf, boolean requireHaMetadata)
org.apache.flink.api.common.JobID
submitJobToSessionCluster(io.fabric8.kubernetes.api.model.ObjectMeta meta, FlinkSessionJobSpec spec, org.apache.flink.configuration.Configuration conf, java.lang.String savepoint)
void
triggerSavepoint(java.lang.String jobId, SavepointTriggerType triggerType, SavepointInfo savepointInfo, org.apache.flink.configuration.Configuration conf)
protected void
updateStatusAfterClusterDeletion(FlinkDeploymentStatus status)
void
waitForClusterShutdown(org.apache.flink.configuration.Configuration conf)
-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
-
Methods inherited from interface org.apache.flink.kubernetes.operator.service.FlinkService
cancelJob, scale, submitSessionCluster
-
-
-
-
Field Detail
-
kubernetesClient
protected final io.fabric8.kubernetes.client.KubernetesClient kubernetesClient
-
configManager
protected final FlinkConfigManager configManager
-
artifactManager
protected final ArtifactManager artifactManager
-
-
Constructor Detail
-
AbstractFlinkService
public AbstractFlinkService(io.fabric8.kubernetes.client.KubernetesClient kubernetesClient, FlinkConfigManager configManager)
-
-
Method Detail
-
getJmPodList
protected abstract io.fabric8.kubernetes.api.model.PodList getJmPodList(java.lang.String namespace, java.lang.String clusterId)
-
deployApplicationCluster
protected abstract void deployApplicationCluster(JobSpec jobSpec, org.apache.flink.configuration.Configuration conf) throws java.lang.Exception
- Throws:
java.lang.Exception
-
getKubernetesClient
public io.fabric8.kubernetes.client.KubernetesClient getKubernetesClient()
- Specified by:
getKubernetesClient
in interface FlinkService
-
submitApplicationCluster
public void submitApplicationCluster(JobSpec jobSpec, org.apache.flink.configuration.Configuration conf, boolean requireHaMetadata) throws java.lang.Exception
- Specified by:
submitApplicationCluster
in interface FlinkService
- Throws:
java.lang.Exception
-
isHaMetadataAvailable
public boolean isHaMetadataAvailable(org.apache.flink.configuration.Configuration conf)
- Specified by:
isHaMetadataAvailable
in interface FlinkService
-
submitJobToSessionCluster
public org.apache.flink.api.common.JobID submitJobToSessionCluster(io.fabric8.kubernetes.api.model.ObjectMeta meta, FlinkSessionJobSpec spec, org.apache.flink.configuration.Configuration conf, @Nullable java.lang.String savepoint) throws java.lang.Exception
- Specified by:
submitJobToSessionCluster
in interface FlinkService
- Throws:
java.lang.Exception
-
isJobManagerPortReady
public boolean isJobManagerPortReady(org.apache.flink.configuration.Configuration config)
- Specified by:
isJobManagerPortReady
in interface FlinkService
-
listJobs
public java.util.Collection<org.apache.flink.runtime.client.JobStatusMessage> listJobs(org.apache.flink.configuration.Configuration conf) throws java.lang.Exception
- Specified by:
listJobs
in interface FlinkService
- Throws:
java.lang.Exception
-
requestJobResult
public org.apache.flink.runtime.jobmaster.JobResult requestJobResult(org.apache.flink.configuration.Configuration conf, org.apache.flink.api.common.JobID jobID) throws java.lang.Exception
- Specified by:
requestJobResult
in interface FlinkService
- Throws:
java.lang.Exception
-
cancelJob
protected void cancelJob(FlinkDeployment deployment, UpgradeMode upgradeMode, org.apache.flink.configuration.Configuration conf, boolean deleteClusterAfterSavepoint) throws java.lang.Exception
- Throws:
java.lang.Exception
-
cancelSessionJob
public void cancelSessionJob(FlinkSessionJob sessionJob, UpgradeMode upgradeMode, org.apache.flink.configuration.Configuration conf) throws java.lang.Exception
- Specified by:
cancelSessionJob
in interface FlinkService
- Throws:
java.lang.Exception
-
triggerSavepoint
public void triggerSavepoint(java.lang.String jobId, SavepointTriggerType triggerType, SavepointInfo savepointInfo, org.apache.flink.configuration.Configuration conf) throws java.lang.Exception
- Specified by:
triggerSavepoint
in interface FlinkService
- Throws:
java.lang.Exception
-
getLastCheckpoint
public java.util.Optional<Savepoint> getLastCheckpoint(org.apache.flink.api.common.JobID jobId, org.apache.flink.configuration.Configuration conf) throws java.lang.Exception
- Specified by:
getLastCheckpoint
in interface FlinkService
- Throws:
java.lang.Exception
-
disposeSavepoint
public void disposeSavepoint(java.lang.String savepointPath, org.apache.flink.configuration.Configuration conf) throws java.lang.Exception
- Specified by:
disposeSavepoint
in interface FlinkService
- Throws:
java.lang.Exception
-
fetchSavepointInfo
public SavepointFetchResult fetchSavepointInfo(java.lang.String triggerId, java.lang.String jobId, org.apache.flink.configuration.Configuration conf)
- Specified by:
fetchSavepointInfo
in interface FlinkService
-
getClusterInfo
public java.util.Map<java.lang.String,java.lang.String> getClusterInfo(org.apache.flink.configuration.Configuration conf) throws java.lang.Exception
- Specified by:
getClusterInfo
in interface FlinkService
- Throws:
java.lang.Exception
-
getJmPodList
public io.fabric8.kubernetes.api.model.PodList getJmPodList(FlinkDeployment deployment, org.apache.flink.configuration.Configuration conf)
- Specified by:
getJmPodList
in interface FlinkService
-
waitForClusterShutdown
public void waitForClusterShutdown(org.apache.flink.configuration.Configuration conf)
- Specified by:
waitForClusterShutdown
in interface FlinkService
-
getClusterClient
@VisibleForTesting protected org.apache.flink.client.program.ClusterClient<java.lang.String> getClusterClient(org.apache.flink.configuration.Configuration conf) throws java.lang.Exception
- Throws:
java.lang.Exception
-
removeOperatorConfigs
@VisibleForTesting protected static org.apache.flink.configuration.Configuration removeOperatorConfigs(org.apache.flink.configuration.Configuration config)
-
getEffectiveStatus
@VisibleForTesting protected static org.apache.flink.api.common.JobStatus getEffectiveStatus(org.apache.flink.runtime.messages.webmonitor.JobDetails details)
-
getMetrics
public java.util.Map<java.lang.String,java.lang.String> getMetrics(org.apache.flink.configuration.Configuration conf, java.lang.String jobId, java.util.List<java.lang.String> metricNames) throws java.lang.Exception
- Specified by:
getMetrics
in interface FlinkService
- Throws:
java.lang.Exception
-
deleteClusterDeployment
public final void deleteClusterDeployment(io.fabric8.kubernetes.api.model.ObjectMeta meta, FlinkDeploymentStatus status, boolean deleteHaData)
- Specified by:
deleteClusterDeployment
in interface FlinkService
-
deleteClusterInternal
protected abstract void deleteClusterInternal(io.fabric8.kubernetes.api.model.ObjectMeta meta, boolean deleteHaConfigmaps)
Delete Flink kubernetes cluster by deleting the kubernetes resources directly. Optionally allows deleting the native kubernetes HA resources as well.
- Parameters:
meta
- ObjectMeta of the deployment
deleteHaConfigmaps
- Flag to indicate whether k8s HA metadata should be removed as well
-
updateStatusAfterClusterDeletion
protected void updateStatusAfterClusterDeletion(FlinkDeploymentStatus status)
-
-