Class AbstractFlinkService
java.lang.Object
    org.apache.flink.kubernetes.operator.service.AbstractFlinkService
- All Implemented Interfaces: FlinkService
- Direct Known Subclasses: NativeFlinkService, StandaloneFlinkService
public abstract class AbstractFlinkService extends java.lang.Object implements FlinkService
An abstract FlinkService containing common implementations shared by the native and standalone Flink services.
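The split between this base class and its two subclasses can be pictured as a template-method arrangement: shared logic lives in the base class, and deployment-mode-specific steps are left abstract. The following is a minimal sketch under that assumption, not the operator's actual code. Every name in it (BaseServiceSketch, submit, deploy, NativeSketch, StandaloneSketch) is hypothetical, and the configuration is reduced to a plain String.

```java
import java.util.ArrayList;
import java.util.List;

public class TemplateMethodSketch {

    /** Hypothetical, heavily simplified stand-in for an abstract base service. */
    abstract static class BaseServiceSketch {
        // Records the steps taken, so the call order can be inspected.
        final List<String> log = new ArrayList<>();

        // Shared step: written once in the base class, then delegates to the hook.
        public final void submit(String conf) {
            log.add("prepared " + conf);
            deploy(conf);
        }

        // Mode-specific hook: each subclass decides how deployment happens.
        protected abstract void deploy(String conf);
    }

    /** Hypothetical subclass standing in for a native-mode service. */
    static class NativeSketch extends BaseServiceSketch {
        @Override
        protected void deploy(String conf) {
            log.add("native deploy " + conf);
        }
    }

    /** Hypothetical subclass standing in for a standalone-mode service. */
    static class StandaloneSketch extends BaseServiceSketch {
        @Override
        protected void deploy(String conf) {
            log.add("standalone deploy " + conf);
        }
    }

    public static void main(String[] args) {
        BaseServiceSketch nativeService = new NativeSketch();
        nativeService.submit("conf-a");
        BaseServiceSketch standaloneService = new StandaloneSketch();
        standaloneService.submit("conf-b");
        System.out.println(nativeService.log);
        System.out.println(standaloneService.log);
    }
}
```

In this picture, the abstract methods listed in the summaries (for example deployApplicationCluster, deploySessionCluster and deleteClusterInternal) would play the role of the deploy hook, though the page itself does not state which public method calls which hook.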
-
-
Field Summary
Fields
Modifier and Type, Field
protected ArtifactManager
artifactManager
protected java.util.concurrent.ExecutorService
executorService
static java.lang.String
FIELD_NAME_TOTAL_CPU
static java.lang.String
FIELD_NAME_TOTAL_MEMORY
protected io.fabric8.kubernetes.client.KubernetesClient
kubernetesClient
protected FlinkOperatorConfiguration
operatorConfig
-
Constructor Summary
Constructors
AbstractFlinkService(io.fabric8.kubernetes.client.KubernetesClient kubernetesClient, ArtifactManager artifactManager, java.util.concurrent.ExecutorService executorService, FlinkOperatorConfiguration operatorConfig)
-
Method Summary
Modifier and Type, Method, Description
protected void
cancelJob(FlinkDeployment deployment, UpgradeMode upgradeMode, org.apache.flink.configuration.Configuration conf, boolean deleteClusterAfterSavepoint)
void
cancelSessionJob(FlinkSessionJob sessionJob, UpgradeMode upgradeMode, org.apache.flink.configuration.Configuration conf)
protected static java.time.Duration
deleteBlocking(java.lang.String operation, java.util.concurrent.Callable<io.fabric8.kubernetes.client.dsl.Waitable> delete, java.time.Duration timeout)
Generic blocking delete operation implementation for triggering and waiting for removal of the selected resources.
void
deleteClusterDeployment(io.fabric8.kubernetes.api.model.ObjectMeta meta, FlinkDeploymentStatus status, org.apache.flink.configuration.Configuration conf, boolean deleteHaData)
protected abstract void
deleteClusterInternal(java.lang.String namespace, java.lang.String clusterId, org.apache.flink.configuration.Configuration conf, io.fabric8.kubernetes.api.model.DeletionPropagation deletionPropagation)
Deletes the Flink Kubernetes cluster by deleting the Kubernetes resources directly.
protected java.time.Duration
deleteDeploymentBlocking(java.lang.String name, io.fabric8.kubernetes.client.dsl.Resource<io.fabric8.kubernetes.api.model.apps.Deployment> deployment, io.fabric8.kubernetes.api.model.DeletionPropagation propagation, java.time.Duration timeout)
Waits until the Deployment is removed and returns the remaining timeout.
protected void
deleteHAData(java.lang.String namespace, java.lang.String clusterId, org.apache.flink.configuration.Configuration conf)
protected abstract void
deployApplicationCluster(JobSpec jobSpec, org.apache.flink.configuration.Configuration conf)
protected abstract void
deploySessionCluster(org.apache.flink.configuration.Configuration conf)
void
disposeSavepoint(java.lang.String savepointPath, org.apache.flink.configuration.Configuration conf)
CheckpointFetchResult
fetchCheckpointInfo(java.lang.String triggerId, java.lang.String jobId, org.apache.flink.configuration.Configuration conf)
SavepointFetchResult
fetchSavepointInfo(java.lang.String triggerId, java.lang.String jobId, org.apache.flink.configuration.Configuration conf)
org.apache.flink.api.java.tuple.Tuple2<java.util.Optional<CheckpointHistoryWrapper.CompletedCheckpointInfo>,java.util.Optional<CheckpointHistoryWrapper.PendingCheckpointInfo>>
getCheckpointInfo(org.apache.flink.api.common.JobID jobId, org.apache.flink.configuration.Configuration conf)
org.apache.flink.client.program.rest.RestClusterClient<java.lang.String>
getClusterClient(org.apache.flink.configuration.Configuration conf)
java.util.Map<java.lang.String,java.lang.String>
getClusterInfo(org.apache.flink.configuration.Configuration conf)
protected abstract io.fabric8.kubernetes.api.model.PodList
getJmPodList(java.lang.String namespace, java.lang.String clusterId)
io.fabric8.kubernetes.api.model.PodList
getJmPodList(FlinkDeployment deployment, org.apache.flink.configuration.Configuration conf)
io.fabric8.kubernetes.client.KubernetesClient
getKubernetesClient()
java.util.Optional<Savepoint>
getLastCheckpoint(org.apache.flink.api.common.JobID jobId, org.apache.flink.configuration.Configuration conf)
java.util.Map<java.lang.String,java.lang.String>
getMetrics(org.apache.flink.configuration.Configuration conf, java.lang.String jobId, java.util.List<java.lang.String> metricNames)
protected org.apache.flink.runtime.rest.RestClient
getRestClient(org.apache.flink.configuration.Configuration conf)
protected java.net.SocketAddress
getSocketAddress(org.apache.flink.client.program.rest.RestClusterClient<java.lang.String> clusterClient)
protected abstract io.fabric8.kubernetes.api.model.PodList
getTmPodList(java.lang.String namespace, java.lang.String clusterId)
boolean
isHaMetadataAvailable(org.apache.flink.configuration.Configuration conf)
boolean
isJobManagerPortReady(org.apache.flink.configuration.Configuration config)
java.util.Collection<org.apache.flink.runtime.client.JobStatusMessage>
listJobs(org.apache.flink.configuration.Configuration conf)
protected static org.apache.flink.configuration.Configuration
removeOperatorConfigs(org.apache.flink.configuration.Configuration config)
org.apache.flink.runtime.jobmaster.JobResult
requestJobResult(org.apache.flink.configuration.Configuration conf, org.apache.flink.api.common.JobID jobID)
protected void
runJar(JobSpec job, org.apache.flink.api.common.JobID jobID, org.apache.flink.runtime.webmonitor.handlers.JarUploadResponseBody response, org.apache.flink.configuration.Configuration conf, java.lang.String savepoint)
void
submitApplicationCluster(JobSpec jobSpec, org.apache.flink.configuration.Configuration conf, boolean requireHaMetadata)
org.apache.flink.api.common.JobID
submitJobToSessionCluster(io.fabric8.kubernetes.api.model.ObjectMeta meta, FlinkSessionJobSpec spec, org.apache.flink.api.common.JobID jobID, org.apache.flink.configuration.Configuration conf, java.lang.String savepoint)
void
submitSessionCluster(org.apache.flink.configuration.Configuration conf)
void
triggerCheckpoint(java.lang.String jobId, SnapshotTriggerType triggerType, CheckpointInfo checkpointInfo, org.apache.flink.configuration.Configuration conf)
void
triggerSavepoint(java.lang.String jobId, SnapshotTriggerType triggerType, SavepointInfo savepointInfo, org.apache.flink.configuration.Configuration conf)
protected void
updateStatusAfterClusterDeletion(FlinkDeploymentStatus status)
protected org.apache.flink.runtime.webmonitor.handlers.JarUploadResponseBody
uploadJar(io.fabric8.kubernetes.api.model.ObjectMeta objectMeta, FlinkSessionJobSpec spec, org.apache.flink.configuration.Configuration conf)
-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
-
Methods inherited from interface org.apache.flink.kubernetes.operator.service.FlinkService
cancelJob, scale
-
-
-
-
Field Detail
-
FIELD_NAME_TOTAL_CPU
public static final java.lang.String FIELD_NAME_TOTAL_CPU
- See Also:
- Constant Field Values
-
FIELD_NAME_TOTAL_MEMORY
public static final java.lang.String FIELD_NAME_TOTAL_MEMORY
- See Also:
- Constant Field Values
-
kubernetesClient
protected final io.fabric8.kubernetes.client.KubernetesClient kubernetesClient
-
executorService
protected final java.util.concurrent.ExecutorService executorService
-
operatorConfig
protected final FlinkOperatorConfiguration operatorConfig
-
artifactManager
protected final ArtifactManager artifactManager
-
-
Constructor Detail
-
AbstractFlinkService
public AbstractFlinkService(io.fabric8.kubernetes.client.KubernetesClient kubernetesClient, ArtifactManager artifactManager, java.util.concurrent.ExecutorService executorService, FlinkOperatorConfiguration operatorConfig)
-
-
Method Detail
-
getJmPodList
protected abstract io.fabric8.kubernetes.api.model.PodList getJmPodList(java.lang.String namespace, java.lang.String clusterId)
-
getTmPodList
protected abstract io.fabric8.kubernetes.api.model.PodList getTmPodList(java.lang.String namespace, java.lang.String clusterId)
-
deployApplicationCluster
protected abstract void deployApplicationCluster(JobSpec jobSpec, org.apache.flink.configuration.Configuration conf) throws java.lang.Exception
- Throws:
java.lang.Exception
-
deploySessionCluster
protected abstract void deploySessionCluster(org.apache.flink.configuration.Configuration conf) throws java.lang.Exception
- Throws:
java.lang.Exception
-
getKubernetesClient
public io.fabric8.kubernetes.client.KubernetesClient getKubernetesClient()
- Specified by:
getKubernetesClient
in interface FlinkService
-
submitApplicationCluster
public void submitApplicationCluster(JobSpec jobSpec, org.apache.flink.configuration.Configuration conf, boolean requireHaMetadata) throws java.lang.Exception
- Specified by:
submitApplicationCluster
in interface FlinkService
- Throws:
java.lang.Exception
-
submitSessionCluster
public void submitSessionCluster(org.apache.flink.configuration.Configuration conf) throws java.lang.Exception
- Specified by:
submitSessionCluster
in interface FlinkService
- Throws:
java.lang.Exception
-
isHaMetadataAvailable
public boolean isHaMetadataAvailable(org.apache.flink.configuration.Configuration conf)
- Specified by:
isHaMetadataAvailable
in interface FlinkService
-
submitJobToSessionCluster
public org.apache.flink.api.common.JobID submitJobToSessionCluster(io.fabric8.kubernetes.api.model.ObjectMeta meta, FlinkSessionJobSpec spec, org.apache.flink.api.common.JobID jobID, org.apache.flink.configuration.Configuration conf, @Nullable java.lang.String savepoint) throws java.lang.Exception
- Specified by:
submitJobToSessionCluster
in interface FlinkService
- Throws:
java.lang.Exception
-
isJobManagerPortReady
public boolean isJobManagerPortReady(org.apache.flink.configuration.Configuration config)
- Specified by:
isJobManagerPortReady
in interface FlinkService
-
getSocketAddress
protected java.net.SocketAddress getSocketAddress(org.apache.flink.client.program.rest.RestClusterClient<java.lang.String> clusterClient) throws java.net.MalformedURLException
- Throws:
java.net.MalformedURLException
-
listJobs
public java.util.Collection<org.apache.flink.runtime.client.JobStatusMessage> listJobs(org.apache.flink.configuration.Configuration conf) throws java.lang.Exception
- Specified by:
listJobs
in interface FlinkService
- Throws:
java.lang.Exception
-
requestJobResult
public org.apache.flink.runtime.jobmaster.JobResult requestJobResult(org.apache.flink.configuration.Configuration conf, org.apache.flink.api.common.JobID jobID) throws java.lang.Exception
- Specified by:
requestJobResult
in interface FlinkService
- Throws:
java.lang.Exception
-
cancelJob
protected void cancelJob(FlinkDeployment deployment, UpgradeMode upgradeMode, org.apache.flink.configuration.Configuration conf, boolean deleteClusterAfterSavepoint) throws java.lang.Exception
- Throws:
java.lang.Exception
-
cancelSessionJob
public void cancelSessionJob(FlinkSessionJob sessionJob, UpgradeMode upgradeMode, org.apache.flink.configuration.Configuration conf) throws java.lang.Exception
- Specified by:
cancelSessionJob
in interface FlinkService
- Throws:
java.lang.Exception
-
triggerSavepoint
public void triggerSavepoint(java.lang.String jobId, SnapshotTriggerType triggerType, SavepointInfo savepointInfo, org.apache.flink.configuration.Configuration conf) throws java.lang.Exception
- Specified by:
triggerSavepoint
in interface FlinkService
- Throws:
java.lang.Exception
-
triggerCheckpoint
public void triggerCheckpoint(java.lang.String jobId, SnapshotTriggerType triggerType, CheckpointInfo checkpointInfo, org.apache.flink.configuration.Configuration conf) throws java.lang.Exception
- Specified by:
triggerCheckpoint
in interface FlinkService
- Throws:
java.lang.Exception
-
getLastCheckpoint
public java.util.Optional<Savepoint> getLastCheckpoint(org.apache.flink.api.common.JobID jobId, org.apache.flink.configuration.Configuration conf) throws java.lang.Exception
- Specified by:
getLastCheckpoint
in interface FlinkService
- Throws:
java.lang.Exception
-
getCheckpointInfo
public org.apache.flink.api.java.tuple.Tuple2<java.util.Optional<CheckpointHistoryWrapper.CompletedCheckpointInfo>,java.util.Optional<CheckpointHistoryWrapper.PendingCheckpointInfo>> getCheckpointInfo(org.apache.flink.api.common.JobID jobId, org.apache.flink.configuration.Configuration conf) throws java.lang.Exception
- Specified by:
getCheckpointInfo
in interface FlinkService
- Throws:
java.lang.Exception
-
disposeSavepoint
public void disposeSavepoint(java.lang.String savepointPath, org.apache.flink.configuration.Configuration conf) throws java.lang.Exception
- Specified by:
disposeSavepoint
in interface FlinkService
- Throws:
java.lang.Exception
-
fetchSavepointInfo
public SavepointFetchResult fetchSavepointInfo(java.lang.String triggerId, java.lang.String jobId, org.apache.flink.configuration.Configuration conf)
- Specified by:
fetchSavepointInfo
in interface FlinkService
-
fetchCheckpointInfo
public CheckpointFetchResult fetchCheckpointInfo(java.lang.String triggerId, java.lang.String jobId, org.apache.flink.configuration.Configuration conf)
- Specified by:
fetchCheckpointInfo
in interface FlinkService
-
getClusterInfo
public java.util.Map<java.lang.String,java.lang.String> getClusterInfo(org.apache.flink.configuration.Configuration conf) throws java.lang.Exception
- Specified by:
getClusterInfo
in interface FlinkService
- Throws:
java.lang.Exception
-
getJmPodList
public io.fabric8.kubernetes.api.model.PodList getJmPodList(FlinkDeployment deployment, org.apache.flink.configuration.Configuration conf)
- Specified by:
getJmPodList
in interface FlinkService
-
getClusterClient
public org.apache.flink.client.program.rest.RestClusterClient<java.lang.String> getClusterClient(org.apache.flink.configuration.Configuration conf) throws java.lang.Exception
- Specified by:
getClusterClient
in interface FlinkService
- Throws:
java.lang.Exception
-
runJar
@VisibleForTesting protected void runJar(JobSpec job, org.apache.flink.api.common.JobID jobID, org.apache.flink.runtime.webmonitor.handlers.JarUploadResponseBody response, org.apache.flink.configuration.Configuration conf, java.lang.String savepoint)
-
uploadJar
@VisibleForTesting protected org.apache.flink.runtime.webmonitor.handlers.JarUploadResponseBody uploadJar(io.fabric8.kubernetes.api.model.ObjectMeta objectMeta, FlinkSessionJobSpec spec, org.apache.flink.configuration.Configuration conf) throws java.lang.Exception
- Throws:
java.lang.Exception
-
getRestClient
@VisibleForTesting protected org.apache.flink.runtime.rest.RestClient getRestClient(org.apache.flink.configuration.Configuration conf) throws java.lang.Exception
- Throws:
java.lang.Exception
-
deleteDeploymentBlocking
@VisibleForTesting protected java.time.Duration deleteDeploymentBlocking(java.lang.String name, io.fabric8.kubernetes.client.dsl.Resource<io.fabric8.kubernetes.api.model.apps.Deployment> deployment, io.fabric8.kubernetes.api.model.DeletionPropagation propagation, java.time.Duration timeout)
Waits until the Deployment is removed and returns the remaining timeout.
-
removeOperatorConfigs
@VisibleForTesting protected static org.apache.flink.configuration.Configuration removeOperatorConfigs(org.apache.flink.configuration.Configuration config)
-
getMetrics
public java.util.Map<java.lang.String,java.lang.String> getMetrics(org.apache.flink.configuration.Configuration conf, java.lang.String jobId, java.util.List<java.lang.String> metricNames) throws java.lang.Exception
- Specified by:
getMetrics
in interface FlinkService
- Throws:
java.lang.Exception
-
deleteClusterDeployment
public final void deleteClusterDeployment(io.fabric8.kubernetes.api.model.ObjectMeta meta, FlinkDeploymentStatus status, org.apache.flink.configuration.Configuration conf, boolean deleteHaData)
- Specified by:
deleteClusterDeployment
in interface FlinkService
-
deleteClusterInternal
protected abstract void deleteClusterInternal(java.lang.String namespace, java.lang.String clusterId, org.apache.flink.configuration.Configuration conf, io.fabric8.kubernetes.api.model.DeletionPropagation deletionPropagation)
Deletes the Flink Kubernetes cluster by deleting the Kubernetes resources directly.
- Parameters:
namespace - Namespace
clusterId - ClusterId
conf - Configuration of the Flink application
deletionPropagation - Resource deletion propagation policy
-
deleteHAData
protected void deleteHAData(java.lang.String namespace, java.lang.String clusterId, org.apache.flink.configuration.Configuration conf)
-
updateStatusAfterClusterDeletion
protected void updateStatusAfterClusterDeletion(FlinkDeploymentStatus status)
-
deleteBlocking
protected static java.time.Duration deleteBlocking(java.lang.String operation, java.util.concurrent.Callable<io.fabric8.kubernetes.client.dsl.Waitable> delete, java.time.Duration timeout)
Generic blocking delete operation implementation for triggering and waiting for removal of the selected resources. By returning the remaining timeout we allow chaining multiple delete operations under a single timeout setting easily.
- Parameters:
operation - Name of the operation for logging
delete - Call that should trigger the async deletion and return the resource to be watched
timeout - Timeout for the operation
- Returns:
Remaining timeout after deletion.
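The chaining idea described above can be illustrated without any Kubernetes dependency. The following is a minimal sketch, assuming a hypothetical WaitableStub interface in place of the fabric8 Waitable and a hypothetical deleteBlockingSketch helper; it is not the operator's implementation and only shows how returning the remaining timeout lets several deletions share one budget.

```java
import java.time.Duration;
import java.util.concurrent.Callable;

public class TimeoutChaining {

    /** Hypothetical stand-in for a resource handle that can be waited on until it is gone. */
    interface WaitableStub {
        void waitUntilGone(Duration timeout) throws Exception;
    }

    /**
     * Triggers a deletion, waits for it, and returns whatever is left of the
     * timeout (never negative). Assumption: the wait itself honors the timeout.
     */
    static Duration deleteBlockingSketch(
            String operation, Callable<WaitableStub> delete, Duration timeout) throws Exception {
        long start = System.nanoTime();
        System.out.println("Starting: " + operation);
        WaitableStub resource = delete.call();   // trigger the asynchronous deletion
        resource.waitUntilGone(timeout);         // block until removal (or timeout)
        Duration elapsed = Duration.ofNanos(System.nanoTime() - start);
        Duration remaining = timeout.minus(elapsed);
        return remaining.isNegative() ? Duration.ZERO : remaining;
    }

    public static void main(String[] args) throws Exception {
        // One overall budget shared by two consecutive delete operations.
        Duration budget = Duration.ofSeconds(5);
        budget = deleteBlockingSketch("first delete", () -> t -> Thread.sleep(50), budget);
        budget = deleteBlockingSketch("second delete", () -> t -> Thread.sleep(50), budget);
        System.out.println("Remaining budget: " + budget.toMillis() + " ms");
    }
}
```

Each call consumes part of the budget and passes the rest on, so the caller configures a single timeout rather than one per resource.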
-
-