Class AbstractFlinkService
- java.lang.Object
  - org.apache.flink.kubernetes.operator.service.AbstractFlinkService
- All Implemented Interfaces:
FlinkService
- Direct Known Subclasses:
NativeFlinkService, StandaloneFlinkService
public abstract class AbstractFlinkService extends java.lang.Object implements FlinkService
An abstract FlinkService containing some common implementations for the native and standalone Flink services.
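The split between public entry points (such as submitApplicationCluster) and protected abstract hooks (such as deployApplicationCluster) suggests a template-method layout. The following is a minimal, stdlib-only sketch of that pattern. Every type and method name in it is hypothetical, and the assumption that the shared entry point validates its input and then calls the hook is for illustration only; it is not taken from the operator's source.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical miniature of the pattern; none of these types are operator classes. */
final class TemplateSketch {

    /** Base class holding the logic that both deployment modes share. */
    abstract static class BaseService {
        private final List<String> events = new ArrayList<>();

        /** Shared entry point: common validation first, then the mode-specific hook. */
        public final void submitCluster(String clusterId) {
            if (clusterId == null || clusterId.isEmpty()) {
                throw new IllegalArgumentException("clusterId must not be empty");
            }
            record("validated " + clusterId);
            deployCluster(clusterId);
        }

        /** Hook that each deployment mode implements (assumed shape). */
        protected abstract void deployCluster(String clusterId);

        protected void record(String event) {
            events.add(event);
        }

        public List<String> events() {
            return events;
        }
    }

    /** Stand-in for a "native" flavour of the service. */
    static final class NativeService extends BaseService {
        @Override
        protected void deployCluster(String clusterId) {
            record("native deploy of " + clusterId);
        }
    }

    /** Stand-in for a "standalone" flavour of the service. */
    static final class StandaloneService extends BaseService {
        @Override
        protected void deployCluster(String clusterId) {
            record("standalone deploy of " + clusterId);
        }
    }

    public static void main(String[] args) {
        BaseService service = new StandaloneService();
        service.submitCluster("demo");
        System.out.println(service.events());
    }
}
```

Callers depend only on the base type, so the same calling code can work against either flavour.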
-
-
Field Summary
Fields
Modifier and Type, Field, Description
protected ArtifactManager
artifactManager
protected java.util.concurrent.ExecutorService
executorService
static java.lang.String
FIELD_NAME_TOTAL_CPU
static java.lang.String
FIELD_NAME_TOTAL_MEMORY
protected io.fabric8.kubernetes.client.KubernetesClient
kubernetesClient
protected FlinkOperatorConfiguration
operatorConfig
-
Constructor Summary
Constructors
Constructor, Description
AbstractFlinkService(io.fabric8.kubernetes.client.KubernetesClient kubernetesClient, ArtifactManager artifactManager, java.util.concurrent.ExecutorService executorService, FlinkOperatorConfiguration operatorConfig)
-
Method Summary
Modifier and Type, Method, Description
boolean
atLeastOneCheckpoint(org.apache.flink.configuration.Configuration conf)
protected java.util.Optional<java.lang.String>
cancelJob(FlinkDeployment deployment, UpgradeMode upgradeMode, org.apache.flink.configuration.Configuration conf, boolean deleteClusterAfterSavepoint)
java.util.Optional<java.lang.String>
cancelSessionJob(FlinkSessionJob sessionJob, UpgradeMode upgradeMode, org.apache.flink.configuration.Configuration conf)
protected static java.time.Duration
deleteBlocking(java.lang.String operation, java.util.concurrent.Callable<io.fabric8.kubernetes.client.dsl.Waitable> delete, java.time.Duration timeout)
Generic blocking delete operation implementation for triggering and waiting for removal of the selected resources.
void
deleteClusterDeployment(io.fabric8.kubernetes.api.model.ObjectMeta meta, FlinkDeploymentStatus status, org.apache.flink.configuration.Configuration conf, boolean deleteHaData)
protected abstract void
deleteClusterInternal(java.lang.String namespace, java.lang.String clusterId, org.apache.flink.configuration.Configuration conf, io.fabric8.kubernetes.api.model.DeletionPropagation deletionPropagation)
Delete Flink Kubernetes cluster by deleting the Kubernetes resources directly.
protected java.time.Duration
deleteDeploymentBlocking(java.lang.String name, io.fabric8.kubernetes.client.dsl.Resource<io.fabric8.kubernetes.api.model.apps.Deployment> deployment, io.fabric8.kubernetes.api.model.DeletionPropagation propagation, java.time.Duration timeout)
Wait until Deployment is removed, return remaining timeout.
protected void
deleteHAData(java.lang.String namespace, java.lang.String clusterId, org.apache.flink.configuration.Configuration conf)
protected abstract void
deployApplicationCluster(JobSpec jobSpec, org.apache.flink.configuration.Configuration conf)
protected abstract void
deploySessionCluster(org.apache.flink.configuration.Configuration conf)
void
disposeSavepoint(java.lang.String savepointPath, org.apache.flink.configuration.Configuration conf)
CheckpointFetchResult
fetchCheckpointInfo(java.lang.String triggerId, java.lang.String jobId, org.apache.flink.configuration.Configuration conf)
CheckpointStatsResult
fetchCheckpointStats(java.lang.String jobId, java.lang.Long checkpointId, org.apache.flink.configuration.Configuration conf)
SavepointFetchResult
fetchSavepointInfo(java.lang.String triggerId, java.lang.String jobId, org.apache.flink.configuration.Configuration conf)
org.apache.flink.api.java.tuple.Tuple2<java.util.Optional<CheckpointHistoryWrapper.CompletedCheckpointInfo>,java.util.Optional<CheckpointHistoryWrapper.PendingCheckpointInfo>>
getCheckpointInfo(org.apache.flink.api.common.JobID jobId, org.apache.flink.configuration.Configuration conf)
org.apache.flink.client.program.rest.RestClusterClient<java.lang.String>
getClusterClient(org.apache.flink.configuration.Configuration conf)
java.util.Map<java.lang.String,java.lang.String>
getClusterInfo(org.apache.flink.configuration.Configuration conf)
protected abstract io.fabric8.kubernetes.api.model.PodList
getJmPodList(java.lang.String namespace, java.lang.String clusterId)
io.fabric8.kubernetes.api.model.PodList
getJmPodList(FlinkDeployment deployment, org.apache.flink.configuration.Configuration conf)
java.util.Optional<org.apache.flink.runtime.client.JobStatusMessage>
getJobStatus(org.apache.flink.configuration.Configuration conf, org.apache.flink.api.common.JobID jobId)
io.fabric8.kubernetes.client.KubernetesClient
getKubernetesClient()
java.util.Optional<Savepoint>
getLastCheckpoint(org.apache.flink.api.common.JobID jobId, org.apache.flink.configuration.Configuration conf)
java.util.Map<java.lang.String,java.lang.String>
getMetrics(org.apache.flink.configuration.Configuration conf, java.lang.String jobId, java.util.List<java.lang.String> metricNames)
protected org.apache.flink.runtime.rest.RestClient
getRestClient(org.apache.flink.configuration.Configuration conf)
protected java.net.SocketAddress
getSocketAddress(org.apache.flink.client.program.rest.RestClusterClient<java.lang.String> clusterClient)
protected abstract io.fabric8.kubernetes.api.model.PodList
getTmPodList(java.lang.String namespace, java.lang.String clusterId)
boolean
isHaMetadataAvailable(org.apache.flink.configuration.Configuration conf)
boolean
isJobManagerPortReady(org.apache.flink.configuration.Configuration config)
static boolean
isJobMissingOrTerminated(java.lang.Exception e)
protected static org.apache.flink.configuration.Configuration
removeOperatorConfigs(org.apache.flink.configuration.Configuration config)
org.apache.flink.runtime.jobmaster.JobResult
requestJobResult(org.apache.flink.configuration.Configuration conf, org.apache.flink.api.common.JobID jobID)
protected void
runJar(JobSpec job, org.apache.flink.api.common.JobID jobID, org.apache.flink.runtime.webmonitor.handlers.JarUploadResponseBody response, org.apache.flink.configuration.Configuration conf, java.lang.String savepoint)
void
submitApplicationCluster(JobSpec jobSpec, org.apache.flink.configuration.Configuration conf, boolean requireHaMetadata)
org.apache.flink.api.common.JobID
submitJobToSessionCluster(io.fabric8.kubernetes.api.model.ObjectMeta meta, FlinkSessionJobSpec spec, org.apache.flink.api.common.JobID jobID, org.apache.flink.configuration.Configuration conf, java.lang.String savepoint)
void
submitSessionCluster(org.apache.flink.configuration.Configuration conf)
java.lang.String
triggerCheckpoint(java.lang.String jobId, org.apache.flink.core.execution.CheckpointType checkpointType, org.apache.flink.configuration.Configuration conf)
java.lang.String
triggerSavepoint(java.lang.String jobId, org.apache.flink.core.execution.SavepointFormatType savepointFormatType, java.lang.String savepointDirectory, org.apache.flink.configuration.Configuration conf)
protected void
updateStatusAfterClusterDeletion(FlinkDeploymentStatus status)
protected org.apache.flink.runtime.webmonitor.handlers.JarUploadResponseBody
uploadJar(io.fabric8.kubernetes.api.model.ObjectMeta objectMeta, FlinkSessionJobSpec spec, org.apache.flink.configuration.Configuration conf)
-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
-
Methods inherited from interface org.apache.flink.kubernetes.operator.service.FlinkService
cancelJob, scale
-
-
-
-
Field Detail
-
FIELD_NAME_TOTAL_CPU
public static final java.lang.String FIELD_NAME_TOTAL_CPU
- See Also:
- Constant Field Values
-
FIELD_NAME_TOTAL_MEMORY
public static final java.lang.String FIELD_NAME_TOTAL_MEMORY
- See Also:
- Constant Field Values
-
kubernetesClient
protected final io.fabric8.kubernetes.client.KubernetesClient kubernetesClient
-
executorService
protected final java.util.concurrent.ExecutorService executorService
-
operatorConfig
protected final FlinkOperatorConfiguration operatorConfig
-
artifactManager
protected final ArtifactManager artifactManager
-
-
Constructor Detail
-
AbstractFlinkService
public AbstractFlinkService(io.fabric8.kubernetes.client.KubernetesClient kubernetesClient, ArtifactManager artifactManager, java.util.concurrent.ExecutorService executorService, FlinkOperatorConfiguration operatorConfig)
-
-
Method Detail
-
getJmPodList
protected abstract io.fabric8.kubernetes.api.model.PodList getJmPodList(java.lang.String namespace, java.lang.String clusterId)
-
getTmPodList
protected abstract io.fabric8.kubernetes.api.model.PodList getTmPodList(java.lang.String namespace, java.lang.String clusterId)
-
deployApplicationCluster
protected abstract void deployApplicationCluster(JobSpec jobSpec, org.apache.flink.configuration.Configuration conf) throws java.lang.Exception
- Throws:
java.lang.Exception
-
deploySessionCluster
protected abstract void deploySessionCluster(org.apache.flink.configuration.Configuration conf) throws java.lang.Exception
- Throws:
java.lang.Exception
-
getKubernetesClient
public io.fabric8.kubernetes.client.KubernetesClient getKubernetesClient()
- Specified by:
getKubernetesClient
in interface FlinkService
-
submitApplicationCluster
public void submitApplicationCluster(JobSpec jobSpec, org.apache.flink.configuration.Configuration conf, boolean requireHaMetadata) throws java.lang.Exception
- Specified by:
submitApplicationCluster
in interface FlinkService
- Throws:
java.lang.Exception
-
submitSessionCluster
public void submitSessionCluster(org.apache.flink.configuration.Configuration conf) throws java.lang.Exception
- Specified by:
submitSessionCluster
in interface FlinkService
- Throws:
java.lang.Exception
-
isHaMetadataAvailable
public boolean isHaMetadataAvailable(org.apache.flink.configuration.Configuration conf)
- Specified by:
isHaMetadataAvailable
in interface FlinkService
-
atLeastOneCheckpoint
public boolean atLeastOneCheckpoint(org.apache.flink.configuration.Configuration conf)
- Specified by:
atLeastOneCheckpoint
in interface FlinkService
-
submitJobToSessionCluster
public org.apache.flink.api.common.JobID submitJobToSessionCluster(io.fabric8.kubernetes.api.model.ObjectMeta meta, FlinkSessionJobSpec spec, org.apache.flink.api.common.JobID jobID, org.apache.flink.configuration.Configuration conf, @Nullable java.lang.String savepoint) throws java.lang.Exception
- Specified by:
submitJobToSessionCluster
in interface FlinkService
- Throws:
java.lang.Exception
-
isJobManagerPortReady
public boolean isJobManagerPortReady(org.apache.flink.configuration.Configuration config)
- Specified by:
isJobManagerPortReady
in interface FlinkService
-
getSocketAddress
protected java.net.SocketAddress getSocketAddress(org.apache.flink.client.program.rest.RestClusterClient<java.lang.String> clusterClient) throws java.net.MalformedURLException
- Throws:
java.net.MalformedURLException
-
getJobStatus
public java.util.Optional<org.apache.flink.runtime.client.JobStatusMessage> getJobStatus(org.apache.flink.configuration.Configuration conf, org.apache.flink.api.common.JobID jobId) throws java.lang.Exception
- Specified by:
getJobStatus
in interface FlinkService
- Throws:
java.lang.Exception
-
requestJobResult
public org.apache.flink.runtime.jobmaster.JobResult requestJobResult(org.apache.flink.configuration.Configuration conf, org.apache.flink.api.common.JobID jobID) throws java.lang.Exception
- Specified by:
requestJobResult
in interface FlinkService
- Throws:
java.lang.Exception
-
cancelJob
protected java.util.Optional<java.lang.String> cancelJob(FlinkDeployment deployment, UpgradeMode upgradeMode, org.apache.flink.configuration.Configuration conf, boolean deleteClusterAfterSavepoint) throws java.lang.Exception
- Throws:
java.lang.Exception
-
cancelSessionJob
public java.util.Optional<java.lang.String> cancelSessionJob(FlinkSessionJob sessionJob, UpgradeMode upgradeMode, org.apache.flink.configuration.Configuration conf) throws java.lang.Exception
- Specified by:
cancelSessionJob
in interface FlinkService
- Throws:
java.lang.Exception
-
isJobMissingOrTerminated
public static boolean isJobMissingOrTerminated(java.lang.Exception e)
-
triggerSavepoint
public java.lang.String triggerSavepoint(java.lang.String jobId, org.apache.flink.core.execution.SavepointFormatType savepointFormatType, java.lang.String savepointDirectory, org.apache.flink.configuration.Configuration conf) throws java.lang.Exception
- Specified by:
triggerSavepoint
in interface FlinkService
- Throws:
java.lang.Exception
-
triggerCheckpoint
public java.lang.String triggerCheckpoint(java.lang.String jobId, org.apache.flink.core.execution.CheckpointType checkpointType, org.apache.flink.configuration.Configuration conf) throws java.lang.Exception
- Specified by:
triggerCheckpoint
in interface FlinkService
- Throws:
java.lang.Exception
-
getLastCheckpoint
public java.util.Optional<Savepoint> getLastCheckpoint(org.apache.flink.api.common.JobID jobId, org.apache.flink.configuration.Configuration conf) throws java.lang.Exception
- Specified by:
getLastCheckpoint
in interface FlinkService
- Throws:
java.lang.Exception
-
getCheckpointInfo
public org.apache.flink.api.java.tuple.Tuple2<java.util.Optional<CheckpointHistoryWrapper.CompletedCheckpointInfo>,java.util.Optional<CheckpointHistoryWrapper.PendingCheckpointInfo>> getCheckpointInfo(org.apache.flink.api.common.JobID jobId, org.apache.flink.configuration.Configuration conf) throws java.lang.Exception
- Specified by:
getCheckpointInfo
in interface FlinkService
- Throws:
java.lang.Exception
-
disposeSavepoint
public void disposeSavepoint(java.lang.String savepointPath, org.apache.flink.configuration.Configuration conf) throws java.lang.Exception
- Specified by:
disposeSavepoint
in interface FlinkService
- Throws:
java.lang.Exception
-
fetchSavepointInfo
public SavepointFetchResult fetchSavepointInfo(java.lang.String triggerId, java.lang.String jobId, org.apache.flink.configuration.Configuration conf)
- Specified by:
fetchSavepointInfo
in interface FlinkService
-
fetchCheckpointInfo
public CheckpointFetchResult fetchCheckpointInfo(java.lang.String triggerId, java.lang.String jobId, org.apache.flink.configuration.Configuration conf)
- Specified by:
fetchCheckpointInfo
in interface FlinkService
-
fetchCheckpointStats
public CheckpointStatsResult fetchCheckpointStats(java.lang.String jobId, java.lang.Long checkpointId, org.apache.flink.configuration.Configuration conf)
- Specified by:
fetchCheckpointStats
in interface FlinkService
-
getClusterInfo
public java.util.Map<java.lang.String,java.lang.String> getClusterInfo(org.apache.flink.configuration.Configuration conf) throws java.lang.Exception
- Specified by:
getClusterInfo
in interface FlinkService
- Throws:
java.lang.Exception
-
getJmPodList
public io.fabric8.kubernetes.api.model.PodList getJmPodList(FlinkDeployment deployment, org.apache.flink.configuration.Configuration conf)
- Specified by:
getJmPodList
in interface FlinkService
-
getClusterClient
public org.apache.flink.client.program.rest.RestClusterClient<java.lang.String> getClusterClient(org.apache.flink.configuration.Configuration conf) throws java.lang.Exception
- Specified by:
getClusterClient
in interface FlinkService
- Throws:
java.lang.Exception
-
runJar
@VisibleForTesting protected void runJar(JobSpec job, org.apache.flink.api.common.JobID jobID, org.apache.flink.runtime.webmonitor.handlers.JarUploadResponseBody response, org.apache.flink.configuration.Configuration conf, java.lang.String savepoint)
-
uploadJar
@VisibleForTesting protected org.apache.flink.runtime.webmonitor.handlers.JarUploadResponseBody uploadJar(io.fabric8.kubernetes.api.model.ObjectMeta objectMeta, FlinkSessionJobSpec spec, org.apache.flink.configuration.Configuration conf) throws java.lang.Exception
- Throws:
java.lang.Exception
-
getRestClient
@VisibleForTesting protected org.apache.flink.runtime.rest.RestClient getRestClient(org.apache.flink.configuration.Configuration conf) throws java.lang.Exception
- Throws:
java.lang.Exception
-
deleteDeploymentBlocking
@VisibleForTesting protected java.time.Duration deleteDeploymentBlocking(java.lang.String name, io.fabric8.kubernetes.client.dsl.Resource<io.fabric8.kubernetes.api.model.apps.Deployment> deployment, io.fabric8.kubernetes.api.model.DeletionPropagation propagation, java.time.Duration timeout)
Waits until the Deployment is removed and returns the remaining timeout.
-
removeOperatorConfigs
@VisibleForTesting protected static org.apache.flink.configuration.Configuration removeOperatorConfigs(org.apache.flink.configuration.Configuration config)
-
getMetrics
public java.util.Map<java.lang.String,java.lang.String> getMetrics(org.apache.flink.configuration.Configuration conf, java.lang.String jobId, java.util.List<java.lang.String> metricNames) throws java.lang.Exception
- Specified by:
getMetrics
in interface FlinkService
- Throws:
java.lang.Exception
-
deleteClusterDeployment
public final void deleteClusterDeployment(io.fabric8.kubernetes.api.model.ObjectMeta meta, FlinkDeploymentStatus status, org.apache.flink.configuration.Configuration conf, boolean deleteHaData)
- Specified by:
deleteClusterDeployment
in interface FlinkService
-
deleteClusterInternal
protected abstract void deleteClusterInternal(java.lang.String namespace, java.lang.String clusterId, org.apache.flink.configuration.Configuration conf, io.fabric8.kubernetes.api.model.DeletionPropagation deletionPropagation)
Delete Flink Kubernetes cluster by deleting the Kubernetes resources directly.
- Parameters:
namespace - Namespace
clusterId - ClusterId
conf - Configuration of the Flink application
deletionPropagation - Resource deletion propagation policy
-
deleteHAData
protected void deleteHAData(java.lang.String namespace, java.lang.String clusterId, org.apache.flink.configuration.Configuration conf)
-
updateStatusAfterClusterDeletion
protected void updateStatusAfterClusterDeletion(FlinkDeploymentStatus status)
-
deleteBlocking
protected static java.time.Duration deleteBlocking(java.lang.String operation, java.util.concurrent.Callable<io.fabric8.kubernetes.client.dsl.Waitable> delete, java.time.Duration timeout)
Generic blocking delete operation implementation for triggering and waiting for removal of the selected resources. Returning the remaining timeout makes it easy to chain multiple delete operations under a single timeout setting.
- Parameters:
operation - Name of the operation for logging
delete - Call that should trigger the async deletion and return the resource to be watched
timeout - Timeout for the operation
- Returns:
Remaining timeout after deletion.
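The idea of returning the remaining timeout so that several deletions share one budget can be sketched with the standard library alone. The class and method below (ChainedTimeoutSketch, runBlocking) are hypothetical stand-ins, not the operator's code. As a simplification, the sketch only measures how long each step took and subtracts it from the budget; it does not wait on a Kubernetes resource or abort a step that overruns.

```java
import java.time.Duration;
import java.util.concurrent.Callable;

/** Hypothetical stand-in for illustration; not the operator's implementation. */
final class ChainedTimeoutSketch {

    /**
     * Runs one blocking step and returns the part of the timeout that is left,
     * clamped at zero, so the caller can pass it on to the next step.
     */
    static Duration runBlocking(String operation, Callable<Void> step, Duration timeout) {
        long start = System.nanoTime();
        try {
            step.call();
        } catch (Exception e) {
            throw new IllegalStateException("Step failed: " + operation, e);
        }
        Duration elapsed = Duration.ofNanos(System.nanoTime() - start);
        System.out.println(operation + " took " + elapsed.toMillis() + " ms");
        Duration remaining = timeout.minus(elapsed);
        return remaining.isNegative() ? Duration.ZERO : remaining;
    }

    public static void main(String[] args) {
        // One budget shared by two consecutive steps.
        Duration budget = Duration.ofSeconds(5);
        budget = runBlocking("delete JobManager", () -> { Thread.sleep(50); return null; }, budget);
        budget = runBlocking("delete TaskManagers", () -> { Thread.sleep(50); return null; }, budget);
        System.out.println("Remaining budget: " + budget.toMillis() + " ms");
    }
}
```

Because each call returns what is left of the budget, the caller never has to track elapsed time itself; it simply passes the returned value to the next call.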
-
-