Class AbstractFlinkResourceReconciler<CR extends AbstractFlinkResource<SPEC,STATUS>,SPEC extends AbstractFlinkSpec,STATUS extends CommonStatus<SPEC>>
- java.lang.Object
  - org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler<CR,SPEC,STATUS>
- All Implemented Interfaces: Reconciler<CR>
- Direct Known Subclasses: AbstractJobReconciler, SessionReconciler
public abstract class AbstractFlinkResourceReconciler<CR extends AbstractFlinkResource<SPEC,STATUS>,SPEC extends AbstractFlinkSpec,STATUS extends CommonStatus<SPEC>> extends java.lang.Object implements Reconciler<CR>
Base class for all Flink resource reconcilers. It contains the general flow for reconciling Flink-related resources, including initial deployments, upgrades, and rollbacks.
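The "general flow" described above follows the template-method pattern: the base class fixes the order of steps and subclasses supply the behavior. The sketch below illustrates that idea only. The order of the checks, the `Action` enum, the helper predicates (`isFirstDeployment`, `specChanged`, `shouldRollBack`), and the toy classes are assumptions made for illustration and are not taken from this page. Signatures are simplified and checked exceptions are omitted.

```java
/** Hypothetical outcome labels, used only by this sketch. */
enum Action { WAITING, INITIAL_DEPLOY, SPEC_CHANGE, ROLLBACK, OTHER_CHANGES, NOTHING }

/** Simplified stand-in for a reconciler base class; not the operator's real types. */
abstract class ReconcileFlowSketch<R> {
    // Hooks named after the abstract methods on this page (simplified signatures).
    protected abstract boolean readyToReconcile(R resource);
    protected abstract void deploy(R resource);
    protected abstract boolean reconcileSpecChange(R resource);
    protected abstract void rollback(R resource);
    protected abstract boolean reconcileOtherChanges(R resource);

    // Hypothetical predicates; the real decision logic is not documented here.
    protected abstract boolean isFirstDeployment(R resource);
    protected abstract boolean specChanged(R resource);
    protected abstract boolean shouldRollBack(R resource);

    /** Template method: one assumed ordering of the reconciliation steps. */
    public final Action reconcile(R resource) {
        if (!readyToReconcile(resource)) {
            return Action.WAITING;
        }
        if (isFirstDeployment(resource)) {
            deploy(resource);
            return Action.INITIAL_DEPLOY;
        }
        if (specChanged(resource) && reconcileSpecChange(resource)) {
            return Action.SPEC_CHANGE;
        }
        if (shouldRollBack(resource)) {
            rollback(resource);
            return Action.ROLLBACK;
        }
        return reconcileOtherChanges(resource) ? Action.OTHER_CHANGES : Action.NOTHING;
    }
}

/** Toy resource whose flags drive each decision. */
final class ToyResource {
    boolean ready = true;
    boolean deployed = false;
    boolean specDirty = false;
    boolean unstable = false;
}

/** Toy subclass that flips the flags as each step runs. */
final class ToyReconciler extends ReconcileFlowSketch<ToyResource> {
    @Override protected boolean readyToReconcile(ToyResource r) { return r.ready; }
    @Override protected void deploy(ToyResource r) { r.deployed = true; }
    @Override protected boolean reconcileSpecChange(ToyResource r) { r.specDirty = false; return true; }
    @Override protected void rollback(ToyResource r) { r.unstable = false; }
    @Override protected boolean reconcileOtherChanges(ToyResource r) { return false; }
    @Override protected boolean isFirstDeployment(ToyResource r) { return !r.deployed; }
    @Override protected boolean specChanged(ToyResource r) { return r.specDirty; }
    @Override protected boolean shouldRollBack(ToyResource r) { return r.unstable; }
}
```

Calling `reconcile` on a fresh `ToyResource` performs the initial deployment. Later calls react to whichever flag is set, and do nothing when the resource is already in the desired state.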
-
-
Field Summary
Fields (Modifier and Type, Field):
- protected java.time.Clock clock
- protected FlinkConfigManager configManager
- protected EventRecorder eventRecorder
- protected io.fabric8.kubernetes.client.KubernetesClient kubernetesClient
- static java.lang.String MSG_ROLLBACK
- static java.lang.String MSG_SPEC_CHANGED
- static java.lang.String MSG_SUBMIT
- static java.lang.String MSG_SUSPENDED
- protected StatusRecorder<CR,STATUS> statusRecorder
-
Constructor Summary
Constructors:
- AbstractFlinkResourceReconciler(io.fabric8.kubernetes.client.KubernetesClient kubernetesClient, FlinkConfigManager configManager, EventRecorder eventRecorder, StatusRecorder<CR,STATUS> statusRecorder)
-
Method Summary
Methods (Modifier and Type, Method, Description):
- io.javaoperatorsdk.operator.api.reconciler.DeleteControl cleanup(CR resource, io.javaoperatorsdk.operator.api.reconciler.Context<?> context)
  This is called when receiving the delete event of the custom resource.
- protected abstract io.javaoperatorsdk.operator.api.reconciler.DeleteControl cleanupInternal(CR resource, io.javaoperatorsdk.operator.api.reconciler.Context<?> context)
  Shut down and clean up all Flink job/cluster resources.
- protected abstract void deploy(CR relatedResource, SPEC spec, STATUS status, io.javaoperatorsdk.operator.api.reconciler.Context<?> ctx, org.apache.flink.configuration.Configuration deployConfig, java.util.Optional<java.lang.String> savepoint, boolean requireHaMetadata)
  Deploys the target resource spec to Kubernetes.
- protected boolean flinkVersionChanged(SPEC oldSpec, SPEC newSpec)
- protected abstract org.apache.flink.configuration.Configuration getDeployConfig(io.fabric8.kubernetes.api.model.ObjectMeta meta, SPEC spec, io.javaoperatorsdk.operator.api.reconciler.Context<?> ctx)
  Get the Flink configuration object for deploying the given spec using deploy(CR, SPEC, STATUS, io.javaoperatorsdk.operator.api.reconciler.Context<?>, org.apache.flink.configuration.Configuration, java.util.Optional<java.lang.String>, boolean).
- protected abstract FlinkService getFlinkService(CR resource, io.javaoperatorsdk.operator.api.reconciler.Context<?> context)
  Get the Flink service related to the resource and context.
- protected abstract org.apache.flink.configuration.Configuration getObserveConfig(CR resource, io.javaoperatorsdk.operator.api.reconciler.Context<?> context)
  Get the Flink configuration for client interactions with the running Flink deployment/session job.
- protected abstract boolean readyToReconcile(CR cr, io.javaoperatorsdk.operator.api.reconciler.Context<?> ctx, org.apache.flink.configuration.Configuration deployConfig)
  Check whether the given Flink resource is ready to be reconciled, or whether we are still waiting for a pending operation or condition.
- void reconcile(CR cr, io.javaoperatorsdk.operator.api.reconciler.Context<?> ctx)
  This is called when receiving the create or update event of the custom resource.
- protected abstract boolean reconcileOtherChanges(CR cr, io.javaoperatorsdk.operator.api.reconciler.Context<?> context, org.apache.flink.configuration.Configuration observeConfig)
  Reconcile any other changes required for this resource that are specific to the reconciler implementation.
- protected abstract boolean reconcileSpecChange(CR cr, io.javaoperatorsdk.operator.api.reconciler.Context<?> ctx, org.apache.flink.configuration.Configuration observeConfig, org.apache.flink.configuration.Configuration deployConfig, DiffType diffType)
  Reconcile a spec upgrade on the currently deployed/suspended Flink resource and update the status accordingly.
- protected abstract void rollback(CR cr, io.javaoperatorsdk.operator.api.reconciler.Context<?> ctx, org.apache.flink.configuration.Configuration observeConfig)
  Roll back the deployed resource to the last stable spec.
- protected void setClock(java.time.Clock clock)
- protected void setOwnerReference(CR owner, org.apache.flink.configuration.Configuration deployConfig)
- protected boolean shouldRecoverDeployment(org.apache.flink.configuration.Configuration conf, FlinkDeployment deployment)
  Checks whether the JobManager Kubernetes Deployment recovery logic should be initiated.
-
-
-
Field Detail
-
configManager
protected final FlinkConfigManager configManager
-
eventRecorder
protected final EventRecorder eventRecorder
-
statusRecorder
protected final StatusRecorder<CR,STATUS> statusRecorder
-
kubernetesClient
protected final io.fabric8.kubernetes.client.KubernetesClient kubernetesClient
-
MSG_SUSPENDED
public static final java.lang.String MSG_SUSPENDED
- See Also:
- Constant Field Values
-
MSG_SPEC_CHANGED
public static final java.lang.String MSG_SPEC_CHANGED
- See Also:
- Constant Field Values
-
MSG_ROLLBACK
public static final java.lang.String MSG_ROLLBACK
- See Also:
- Constant Field Values
-
MSG_SUBMIT
public static final java.lang.String MSG_SUBMIT
- See Also:
- Constant Field Values
-
clock
protected java.time.Clock clock
-
-
Constructor Detail
-
AbstractFlinkResourceReconciler
public AbstractFlinkResourceReconciler(io.fabric8.kubernetes.client.KubernetesClient kubernetesClient, FlinkConfigManager configManager, EventRecorder eventRecorder, StatusRecorder<CR,STATUS> statusRecorder)
-
-
Method Detail
-
reconcile
public void reconcile(CR cr, io.javaoperatorsdk.operator.api.reconciler.Context<?> ctx) throws java.lang.Exception
Description copied from interface: Reconciler
This is called when receiving the create or update event of the custom resource.
- Specified by: reconcile in interface Reconciler<CR extends AbstractFlinkResource<SPEC,STATUS>>
- Parameters:
  cr - the custom resource that has been created or updated
  ctx - the context with which the operation is executed
- Throws:
  java.lang.Exception - Error during reconciliation.
-
getDeployConfig
protected abstract org.apache.flink.configuration.Configuration getDeployConfig(io.fabric8.kubernetes.api.model.ObjectMeta meta, SPEC spec, io.javaoperatorsdk.operator.api.reconciler.Context<?> ctx)
Get the Flink configuration object for deploying the given spec using deploy(CR, SPEC, STATUS, io.javaoperatorsdk.operator.api.reconciler.Context<?>, org.apache.flink.configuration.Configuration, java.util.Optional<java.lang.String>, boolean).
- Parameters:
  meta - ObjectMeta of the related resource.
  spec - Spec for which the config should be created.
  ctx - Reconciliation context.
- Returns:
  Deployment configuration.
-
getObserveConfig
protected abstract org.apache.flink.configuration.Configuration getObserveConfig(CR resource, io.javaoperatorsdk.operator.api.reconciler.Context<?> context)
Get the Flink configuration for client interactions with the running Flink deployment/session job.
- Parameters:
  resource - Related Flink resource.
  context - Reconciliation context.
- Returns:
  Observe configuration.
-
readyToReconcile
protected abstract boolean readyToReconcile(CR cr, io.javaoperatorsdk.operator.api.reconciler.Context<?> ctx, org.apache.flink.configuration.Configuration deployConfig)
Check whether the given Flink resource is ready to be reconciled, or whether we are still waiting for a pending operation or condition.
- Parameters:
  cr - Related Flink resource.
  ctx - Reconciliation context.
  deployConfig - Deployment configuration.
- Returns:
  True if the resource is ready to be reconciled.
-
reconcileSpecChange
protected abstract boolean reconcileSpecChange(CR cr, io.javaoperatorsdk.operator.api.reconciler.Context<?> ctx, org.apache.flink.configuration.Configuration observeConfig, org.apache.flink.configuration.Configuration deployConfig, DiffType diffType) throws java.lang.Exception
Reconcile a spec upgrade on the currently deployed/suspended Flink resource and update the status accordingly.
- Parameters:
  cr - Related Flink resource.
  observeConfig - Observe configuration.
  deployConfig - Deployment configuration.
- Returns:
  True if spec change reconciliation was executed.
- Throws:
  java.lang.Exception - Error during spec upgrade.
-
rollback
protected abstract void rollback(CR cr, io.javaoperatorsdk.operator.api.reconciler.Context<?> ctx, org.apache.flink.configuration.Configuration observeConfig) throws java.lang.Exception
Roll back the deployed resource to the last stable spec.
- Parameters:
  cr - Related Flink resource.
  ctx - Reconciliation context.
  observeConfig - Observe configuration.
- Throws:
  java.lang.Exception - Error during rollback.
-
reconcileOtherChanges
protected abstract boolean reconcileOtherChanges(CR cr, io.javaoperatorsdk.operator.api.reconciler.Context<?> context, org.apache.flink.configuration.Configuration observeConfig) throws java.lang.Exception
Reconcile any other changes required for this resource that are specific to the reconciler implementation.
- Parameters:
  cr - Related Flink resource.
  observeConfig - Observe configuration.
- Returns:
  True if any further reconciliation action was taken.
- Throws:
  java.lang.Exception - Error during reconciliation.
-
cleanup
public io.javaoperatorsdk.operator.api.reconciler.DeleteControl cleanup(CR resource, io.javaoperatorsdk.operator.api.reconciler.Context<?> context)
Description copied from interface: Reconciler
This is called when receiving the delete event of the custom resource. This method is meant to clean up the associated components, such as the Flink job components.
- Specified by: cleanup in interface Reconciler<CR extends AbstractFlinkResource<SPEC,STATUS>>
- Parameters:
  resource - the custom resource that has been deleted
  context - the context with which the operation is executed
- Returns:
  DeleteControl to manage the deletion behavior
-
deploy
protected abstract void deploy(CR relatedResource, SPEC spec, STATUS status, io.javaoperatorsdk.operator.api.reconciler.Context<?> ctx, org.apache.flink.configuration.Configuration deployConfig, java.util.Optional<java.lang.String> savepoint, boolean requireHaMetadata) throws java.lang.Exception
Deploys the target resource spec to Kubernetes.
- Parameters:
  relatedResource - Related resource.
  spec - Spec that should be deployed to Kubernetes.
  status - Status object of the resource.
  deployConfig - Flink conf for the deployment.
  ctx - Reconciliation context.
  savepoint - Optional savepoint path for applications and session jobs.
  requireHaMetadata - Flag used by application deployments to validate HA metadata.
- Throws:
  java.lang.Exception - Error during deployment.
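Because savepoint is a java.util.Optional, an implementation of deploy has to handle both the "restore from this path" and the "no savepoint" case. The sketch below shows one way that could look. It is an illustration, not the operator's implementation: a plain Map stands in for org.apache.flink.configuration.Configuration, SavepointConfigSketch and withSavepoint are hypothetical names, and the key "execution.savepoint.path" is assumed to be the Flink option that selects the savepoint to restore from.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical helper; a Map stands in for Flink's Configuration class. */
final class SavepointConfigSketch {
    // Assumed Flink option key for the savepoint to restore from.
    static final String SAVEPOINT_PATH_KEY = "execution.savepoint.path";

    private SavepointConfigSketch() {}

    /**
     * Returns a copy of deployConfig in which the savepoint path is set when
     * a savepoint is given and removed when it is not, so that a stale path
     * is never reused. The input map is left unmodified.
     */
    static Map<String, String> withSavepoint(
            Map<String, String> deployConfig, Optional<String> savepoint) {
        Map<String, String> effective = new HashMap<>(deployConfig);
        savepoint.ifPresentOrElse(
                path -> effective.put(SAVEPOINT_PATH_KEY, path),
                () -> effective.remove(SAVEPOINT_PATH_KEY));
        return effective;
    }
}
```

Copying the map instead of mutating it keeps the caller's deployment configuration reusable across reconciliation attempts. Whether the real reconcilers copy or mutate their configuration is not stated on this page.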
-
cleanupInternal
protected abstract io.javaoperatorsdk.operator.api.reconciler.DeleteControl cleanupInternal(CR resource, io.javaoperatorsdk.operator.api.reconciler.Context<?> context)
Shut down and clean up all Flink job/cluster resources.
- Parameters:
  resource - Resource being reconciled.
  context - Current context.
- Returns:
  DeleteControl object.
-
getFlinkService
protected abstract FlinkService getFlinkService(CR resource, io.javaoperatorsdk.operator.api.reconciler.Context<?> context)
Get the Flink service related to the resource and context.
- Parameters:
  resource - Resource being reconciled.
  context - Current context.
- Returns:
  Flink service implementation.
-
shouldRecoverDeployment
protected boolean shouldRecoverDeployment(org.apache.flink.configuration.Configuration conf, FlinkDeployment deployment)
Checks whether the JobManager Kubernetes Deployment recovery logic should be initiated. Recovery is triggered only if the JobManager deployment is missing, the recovery config is enabled, and HA is enabled. This logic is only used by the Session and Application reconcilers.
- Parameters:
  conf - Flink cluster configuration.
  deployment - FlinkDeployment object.
- Returns:
  True if recovery should be executed.
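The recovery condition is a conjunction of three checks. The sketch below states it as a pure predicate. How each flag is derived from conf and deployment is not described on this page, so the three boolean parameters and the class name RecoveryCheckSketch are hypothetical.

```java
/** Hypothetical restatement of the documented recovery condition. */
final class RecoveryCheckSketch {
    private RecoveryCheckSketch() {}

    /**
     * Recovery is warranted only when all three conditions hold:
     * the JobManager deployment is missing, recovery is enabled in the
     * configuration, and high availability (HA) is enabled.
     */
    static boolean shouldRecover(
            boolean jmDeploymentMissing, boolean recoveryEnabled, boolean haEnabled) {
        return jmDeploymentMissing && recoveryEnabled && haEnabled;
    }
}
```

If any one of the three flags is false, the predicate returns false. A missing JobManager deployment without HA enabled therefore does not trigger recovery.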
-
setOwnerReference
protected void setOwnerReference(CR owner, org.apache.flink.configuration.Configuration deployConfig)
-
setClock
@VisibleForTesting protected void setClock(java.time.Clock clock)
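A setter such as setClock, marked @VisibleForTesting, typically exists so that tests can replace the system clock with a fixed one and make time-dependent logic deterministic. The sketch below shows that pattern using only java.time. ClockedComponentSketch and deadlinePassed are hypothetical names, and this page does not say which time-dependent decisions the reconciler actually makes.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;

/** Hypothetical component with an injectable clock, mirroring the setClock hook. */
class ClockedComponentSketch {
    // Defaults to the real system clock; tests can swap in a fixed one.
    private Clock clock = Clock.systemDefaultZone();

    void setClock(Clock clock) {
        this.clock = clock;
    }

    /** True once at least {@code timeout} has elapsed since {@code start}, per the injected clock. */
    boolean deadlinePassed(Instant start, Duration timeout) {
        return !clock.instant().isBefore(start.plus(timeout));
    }
}
```

In a test, passing `Clock.fixed(someInstant, ZoneOffset.UTC)` to `setClock` pins "now" to a known instant, so a deadline check can be exercised on both sides of the boundary without sleeping.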
-
-