Class AbstractFlinkResourceReconciler<CR extends AbstractFlinkResource<SPEC,STATUS>,SPEC extends AbstractFlinkSpec,STATUS extends CommonStatus<SPEC>>
- java.lang.Object
  - org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler<CR,SPEC,STATUS>
- All Implemented Interfaces:
Reconciler<CR>
- Direct Known Subclasses:
AbstractJobReconciler, SessionReconciler
public abstract class AbstractFlinkResourceReconciler<CR extends AbstractFlinkResource<SPEC,STATUS>,SPEC extends AbstractFlinkSpec,STATUS extends CommonStatus<SPEC>> extends java.lang.Object implements Reconciler<CR>
Base class for all Flink resource reconcilers. It contains the general flow of reconciling Flink-related resources, including initial deployments, upgrades, and rollbacks.
-
-
Field Summary
Fields
Modifier and Type    Field
protected JobAutoScaler<io.javaoperatorsdk.operator.processing.event.ResourceID,KubernetesJobAutoScalerContext>    autoscaler
protected java.time.Clock    clock
protected EventRecorder    eventRecorder
static java.lang.String    MSG_ROLLBACK
static java.lang.String    MSG_SPEC_CHANGED
static java.lang.String    MSG_SUBMIT
static java.lang.String    MSG_SUSPENDED
protected StatusRecorder<CR,STATUS>    statusRecorder
-
Constructor Summary
Constructors
AbstractFlinkResourceReconciler(EventRecorder eventRecorder, StatusRecorder<CR,STATUS> statusRecorder, JobAutoScaler<io.javaoperatorsdk.operator.processing.event.ResourceID,KubernetesJobAutoScalerContext> autoscaler)
-
Method Summary
Modifier and Type    Method
    Description
io.javaoperatorsdk.operator.api.reconciler.DeleteControl    cleanup(FlinkResourceContext<CR> ctx)
    This is called when receiving the delete event of custom resource.
protected abstract io.javaoperatorsdk.operator.api.reconciler.DeleteControl    cleanupInternal(FlinkResourceContext<CR> ctx)
    Shut down and clean up all Flink job/cluster resources.
abstract void    deploy(FlinkResourceContext<CR> ctx, SPEC spec, org.apache.flink.configuration.Configuration deployConfig, java.util.Optional<java.lang.String> savepoint, boolean requireHaMetadata)
    Deploys the target resource spec to Kubernetes.
protected boolean    flinkVersionChanged(SPEC oldSpec, SPEC newSpec)
protected abstract boolean    readyToReconcile(FlinkResourceContext<CR> ctx)
    Check whether the given Flink resource is ready to be reconciled or we are still waiting for any pending operation or condition first.
void    reconcile(FlinkResourceContext<CR> ctx)
    This is called when receiving the create or update event of the custom resource.
protected abstract boolean    reconcileOtherChanges(FlinkResourceContext<CR> ctx)
    Reconcile any other changes required for this resource that are specific to the reconciler implementation.
protected abstract boolean    reconcileSpecChange(DiffType diffType, FlinkResourceContext<CR> ctx, org.apache.flink.configuration.Configuration deployConfig, SPEC lastReconciledSpec)
    Reconcile spec upgrade on the currently deployed/suspended Flink resource and update the status accordingly.
void    setClock(java.time.Clock clock)
protected void    setOwnerReference(CR owner, org.apache.flink.configuration.Configuration deployConfig)
protected boolean    shouldRecoverDeployment(org.apache.flink.configuration.Configuration conf, FlinkDeployment deployment)
    Checks whether the JobManager Kubernetes Deployment recovery logic should be initiated.
-
-
-
Field Detail
-
eventRecorder
protected final EventRecorder eventRecorder
-
statusRecorder
protected final StatusRecorder<CR extends AbstractFlinkResource<SPEC,STATUS>,STATUS extends CommonStatus<SPEC>> statusRecorder
-
autoscaler
protected final JobAutoScaler<io.javaoperatorsdk.operator.processing.event.ResourceID,KubernetesJobAutoScalerContext> autoscaler
-
MSG_SUSPENDED
public static final java.lang.String MSG_SUSPENDED
- See Also:
- Constant Field Values
-
MSG_SPEC_CHANGED
public static final java.lang.String MSG_SPEC_CHANGED
- See Also:
- Constant Field Values
-
MSG_ROLLBACK
public static final java.lang.String MSG_ROLLBACK
- See Also:
- Constant Field Values
-
MSG_SUBMIT
public static final java.lang.String MSG_SUBMIT
- See Also:
- Constant Field Values
-
clock
protected java.time.Clock clock
-
-
Constructor Detail
-
AbstractFlinkResourceReconciler
public AbstractFlinkResourceReconciler(EventRecorder eventRecorder, StatusRecorder<CR,STATUS> statusRecorder, JobAutoScaler<io.javaoperatorsdk.operator.processing.event.ResourceID,KubernetesJobAutoScalerContext> autoscaler)
-
-
Method Detail
-
reconcile
public void reconcile(FlinkResourceContext<CR> ctx) throws java.lang.Exception
Description copied from interface: Reconciler
This is called when receiving the create or update event of the custom resource.
- Specified by:
reconcile in interface Reconciler<CR extends AbstractFlinkResource<SPEC,STATUS>>
- Parameters:
ctx - the context with which the operation is executed
- Throws:
java.lang.Exception - Error during reconciliation.
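The relationship between reconcile and the abstract hooks on this class (readyToReconcile, reconcileSpecChange, reconcileOtherChanges) can be pictured as a template method. The following is a minimal sketch under stated assumptions, not the operator's implementation: ReconcilerFlowSketch, StringSpecReconciler, the simplified hook signatures, and the recorded step names are all hypothetical stand-ins, and the handling of initial deployments and rollbacks mentioned in the class description is omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical stand-in for a reconciler base class; not the operator's real type.
abstract class ReconcilerFlowSketch<SPEC> {
    // Records which branch each reconcile call took, so the flow can be inspected.
    final List<String> steps = new ArrayList<>();

    // Simplified hooks loosely mirroring the abstract methods documented here.
    protected abstract boolean readyToReconcile();
    protected abstract boolean reconcileSpecChange(SPEC lastReconciledSpec, SPEC desiredSpec);
    protected abstract boolean reconcileOtherChanges();

    // Template method: the ordering is fixed here, the steps are supplied by subclasses.
    public final void reconcile(SPEC lastReconciledSpec, SPEC desiredSpec) {
        if (!readyToReconcile()) {
            steps.add("not-ready"); // still waiting for a pending operation
            return;
        }
        boolean specChanged = !Objects.equals(lastReconciledSpec, desiredSpec);
        if (specChanged && reconcileSpecChange(lastReconciledSpec, desiredSpec)) {
            steps.add("spec-change");
            return;
        }
        if (reconcileOtherChanges()) {
            steps.add("other-changes");
            return;
        }
        steps.add("no-op");
    }
}

// Hypothetical subclass that treats a String as the spec.
class StringSpecReconciler extends ReconcilerFlowSketch<String> {
    boolean ready = true;

    @Override protected boolean readyToReconcile() { return ready; }
    @Override protected boolean reconcileSpecChange(String last, String desired) { return true; }
    @Override protected boolean reconcileOtherChanges() { return false; }
}
```

In this sketch the base class owns the ordering of the checks, while each subclass only decides what "ready", "spec change", and "other changes" mean for its resource type.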
-
readyToReconcile
protected abstract boolean readyToReconcile(FlinkResourceContext<CR> ctx)
Check whether the given Flink resource is ready to be reconciled or we are still waiting for any pending operation or condition first.
- Parameters:
ctx - Reconciliation context.
- Returns:
- True if the resource is ready to be reconciled.
-
reconcileSpecChange
protected abstract boolean reconcileSpecChange(DiffType diffType, FlinkResourceContext<CR> ctx, org.apache.flink.configuration.Configuration deployConfig, SPEC lastReconciledSpec) throws java.lang.Exception
Reconcile spec upgrade on the currently deployed/suspended Flink resource and update the status accordingly.
- Parameters:
diffType - SpecChange diff type.
ctx - Reconciliation context.
deployConfig - Deployment configuration.
lastReconciledSpec - Last reconciled spec.
- Returns:
- True if spec change reconciliation was executed.
- Throws:
java.lang.Exception - Error during spec upgrade.
-
reconcileOtherChanges
protected abstract boolean reconcileOtherChanges(FlinkResourceContext<CR> ctx) throws java.lang.Exception
Reconcile any other changes required for this resource that are specific to the reconciler implementation.
- Parameters:
ctx - Reconciliation context.
- Returns:
- True if any further reconciliation action was taken.
- Throws:
java.lang.Exception - Error during reconciliation.
-
cleanup
public io.javaoperatorsdk.operator.api.reconciler.DeleteControl cleanup(FlinkResourceContext<CR> ctx)
Description copied from interface: Reconciler
This is called when receiving the delete event of the custom resource. This method is meant to clean up the associated components, such as the Flink job components.
- Specified by:
cleanup in interface Reconciler<CR extends AbstractFlinkResource<SPEC,STATUS>>
- Parameters:
ctx - the context with which the operation is executed
- Returns:
- DeleteControl to manage the deletion behavior
-
deploy
@VisibleForTesting public abstract void deploy(FlinkResourceContext<CR> ctx, SPEC spec, org.apache.flink.configuration.Configuration deployConfig, java.util.Optional<java.lang.String> savepoint, boolean requireHaMetadata) throws java.lang.Exception
Deploys the target resource spec to Kubernetes.
- Parameters:
ctx - Reconciliation context.
spec - Spec that should be deployed to Kubernetes.
deployConfig - Flink conf for the deployment.
savepoint - Optional savepoint path for applications and session jobs.
requireHaMetadata - Flag used by application deployments to validate HA metadata.
- Throws:
java.lang.Exception - Error during deployment.
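As an illustration of how an optional savepoint argument might be folded into a deployment configuration, here is a minimal sketch. It makes several assumptions: a plain Map stands in for org.apache.flink.configuration.Configuration, DeploySavepointSketch and withSavepoint are hypothetical names, and the key "execution.savepoint.path" is used purely for illustration (check which option your Flink version expects). It does not show what any concrete deploy implementation in the operator actually does.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper; a Map stands in for Flink's Configuration class.
final class DeploySavepointSketch {
    // Assumed key, used here only for illustration.
    static final String SAVEPOINT_PATH_KEY = "execution.savepoint.path";

    private DeploySavepointSketch() {}

    // Returns a copy of deployConfig that carries the savepoint path only when one is given.
    static Map<String, String> withSavepoint(
            Map<String, String> deployConfig, Optional<String> savepoint) {
        Map<String, String> effective = new HashMap<>(deployConfig);
        savepoint.ifPresentOrElse(
                path -> effective.put(SAVEPOINT_PATH_KEY, path),
                // No savepoint requested: make sure a stale path is not reused.
                () -> effective.remove(SAVEPOINT_PATH_KEY));
        return effective;
    }
}
```

Copying the map before modifying it keeps the caller's configuration untouched, which matters if the same configuration object is reused across reconciliation attempts.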
-
cleanupInternal
protected abstract io.javaoperatorsdk.operator.api.reconciler.DeleteControl cleanupInternal(FlinkResourceContext<CR> ctx)
Shut down and clean up all Flink job/cluster resources.
- Parameters:
ctx - Current context.
- Returns:
- DeleteControl object.
-
shouldRecoverDeployment
protected boolean shouldRecoverDeployment(org.apache.flink.configuration.Configuration conf, FlinkDeployment deployment)
Checks whether the JobManager Kubernetes Deployment recovery logic should be initiated. Recovery is triggered only if the JobManager deployment is missing, the recovery config is enabled, and HA is enabled. This logic is only used by the Session and Application reconcilers.
- Parameters:
conf - Flink cluster configuration.
deployment - FlinkDeployment object.
- Returns:
- True if recovery should be executed.
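The three conditions listed for shouldRecoverDeployment combine as a plain conjunction. The sketch below only illustrates that decision; RecoveryCheckSketch and its boolean parameters are hypothetical stand-ins for whatever the real method reads from the configuration and the FlinkDeployment status.

```java
// Hypothetical predicate; the three flags are stand-ins, not operator API.
final class RecoveryCheckSketch {
    private RecoveryCheckSketch() {}

    static boolean shouldRecover(
            boolean jmDeploymentMissing, boolean recoveryEnabled, boolean haEnabled) {
        // Recovery is attempted only when all three conditions hold at once.
        return jmDeploymentMissing && recoveryEnabled && haEnabled;
    }
}
```

Under this reading, a missing JobManager deployment alone is not enough: without HA enabled, or with recovery switched off, the check returns false.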
-
setOwnerReference
protected void setOwnerReference(CR owner, org.apache.flink.configuration.Configuration deployConfig)
-
setClock
@VisibleForTesting public void setClock(java.time.Clock clock)
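The @VisibleForTesting annotation suggests that setClock exists so tests can control time. A minimal sketch of that pattern follows, using only java.time from the standard library. ClockAwareSketch and hasTimedOut are hypothetical and are not part of the operator; they only show how a replaceable Clock makes time-dependent logic deterministic.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;

// Hypothetical component with a replaceable clock, mirroring the setter above.
class ClockAwareSketch {
    private Clock clock = Clock.systemDefaultZone();

    // Lets a test substitute a fixed or offset clock.
    void setClock(Clock clock) {
        this.clock = clock;
    }

    // True if more than `timeout` has elapsed since `start`, according to the current clock.
    boolean hasTimedOut(Instant start, Duration timeout) {
        return Duration.between(start, clock.instant()).compareTo(timeout) > 0;
    }
}
```

A test could call setClock(Clock.fixed(start.plusSeconds(90), ZoneOffset.UTC)) and then check hasTimedOut(start, Duration.ofMinutes(1)) without sleeping.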
-
-