Class ReconciliationUtils
- java.lang.Object
-
- org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils
-
public class ReconciliationUtils extends java.lang.Object
Reconciliation utilities.
-
-
Constructor Summary
Constructors Constructor Description ReconciliationUtils()
-
Method Summary
All Methods Static Methods Concrete Methods Modifier and Type Method Description static <SPEC extends AbstractFlinkSpec>
booleanapplyValidationErrorAndResetSpec(AbstractFlinkResource<SPEC,?> deployment, java.lang.String validationError, FlinkOperatorConfiguration conf)
In case of validation errors we need to (temporarily) reset the old spec so that we can reconcile other outstanding changes, instead of simply blocking.static void
checkAndUpdateStableSpec(CommonStatus<?> status)
Checks the status and if the corresponding Flink job/application is in stable running state, it updates the last stable spec.static void
clearLastReconciledSpecIfFirstDeploy(AbstractFlinkResource<?,?> resource)
Clear last reconciled spec if that corresponds to the first deployment.static <T> T
clone(T object)
static <SPEC extends AbstractFlinkSpec>
SPECgetDeployedSpec(AbstractFlinkResource<SPEC,?> deployment)
static java.lang.Long
getUpgradeTargetGeneration(AbstractFlinkResource<?,?> resource)
Get spec generation for the current in progress upgrade.static boolean
isJobInTerminalState(CommonStatus<?> status)
static boolean
isJobRunning(CommonStatus<?> status)
static boolean
isUpgradeModeChangedToLastStateAndHADisabledPreviously(AbstractFlinkResource<?,?> flinkApp, org.apache.flink.configuration.Configuration observeConfig)
static java.time.Duration
rescheduleAfter(JobManagerDeploymentStatus status, FlinkDeployment flinkDeployment, FlinkOperatorConfiguration operatorConfiguration)
static <STATUS extends CommonStatus<?>,R extends AbstractFlinkResource<?,STATUS>>
io.javaoperatorsdk.operator.api.reconciler.ErrorStatusUpdateControl<R>toErrorStatusUpdateControl(R resource, java.util.Optional<io.javaoperatorsdk.operator.api.reconciler.RetryInfo> retryInfo, java.lang.Exception e, StatusRecorder<R,STATUS> statusRecorder, FlinkOperatorConfiguration operatorConfiguration)
Update the resource error status and metrics when the operator encountered an exception during reconciliation.static <SPEC extends AbstractFlinkSpec,STATUS extends CommonStatus<SPEC>,R extends io.fabric8.kubernetes.client.CustomResource<SPEC,STATUS>>
io.javaoperatorsdk.operator.api.reconciler.UpdateControl<R>toUpdateControl(FlinkOperatorConfiguration operatorConfiguration, R current, R previous, boolean reschedule)
static void
updateForReconciliationError(AbstractFlinkResource<?,?> target, java.lang.Throwable error, FlinkOperatorConfiguration conf)
static <SPEC extends AbstractFlinkSpec>
voidupdateLastReconciledSavepointTriggerNonce(SavepointInfo savepointInfo, AbstractFlinkResource<SPEC,?> target)
static <SPEC extends AbstractFlinkSpec>
voidupdateStatusBeforeDeploymentAttempt(AbstractFlinkResource<SPEC,?> target, org.apache.flink.configuration.Configuration conf)
Update status before deployment attempt of a new resource spec.static <SPEC extends AbstractFlinkSpec>
voidupdateStatusForAlreadyUpgraded(AbstractFlinkResource<SPEC,?> resource)
Updates status in cases where a previously successful deployment wasn't recorded for any reason.static <SPEC extends AbstractFlinkSpec>
voidupdateStatusForDeployedSpec(AbstractFlinkResource<SPEC,?> target, org.apache.flink.configuration.Configuration conf)
Update status after successful deployment of a new resource spec.
-
-
-
Method Detail
-
updateStatusForDeployedSpec
public static <SPEC extends AbstractFlinkSpec> void updateStatusForDeployedSpec(AbstractFlinkResource<SPEC,?> target, org.apache.flink.configuration.Configuration conf)
Update status after successful deployment of a new resource spec. Existing reconciliation errors will be cleared, lastReconciled spec will be updated and for suspended jobs it will also be marked stable.For Application deployments TaskManager info will also be updated.
- Type Parameters:
SPEC
- Spec type.- Parameters:
target
- Target Flink resource.conf
- Deployment configuration.
-
updateStatusBeforeDeploymentAttempt
public static <SPEC extends AbstractFlinkSpec> void updateStatusBeforeDeploymentAttempt(AbstractFlinkResource<SPEC,?> target, org.apache.flink.configuration.Configuration conf)
Update status before deployment attempt of a new resource spec. Existing reconciliation errors will be cleared, lastReconciled spec will be updated and reconciliation status marked UPGRADING.For Application deployments TaskManager info will also be updated.
- Type Parameters:
SPEC
- Spec type.- Parameters:
target
- Target Flink resource.conf
- Deployment configuration.
-
updateLastReconciledSavepointTriggerNonce
public static <SPEC extends AbstractFlinkSpec> void updateLastReconciledSavepointTriggerNonce(SavepointInfo savepointInfo, AbstractFlinkResource<SPEC,?> target)
-
updateForReconciliationError
public static void updateForReconciliationError(AbstractFlinkResource<?,?> target, java.lang.Throwable error, FlinkOperatorConfiguration conf)
-
clone
public static <T> T clone(T object)
-
toUpdateControl
public static <SPEC extends AbstractFlinkSpec,STATUS extends CommonStatus<SPEC>,R extends io.fabric8.kubernetes.client.CustomResource<SPEC,STATUS>> io.javaoperatorsdk.operator.api.reconciler.UpdateControl<R> toUpdateControl(FlinkOperatorConfiguration operatorConfiguration, R current, R previous, boolean reschedule)
-
rescheduleAfter
public static java.time.Duration rescheduleAfter(JobManagerDeploymentStatus status, FlinkDeployment flinkDeployment, FlinkOperatorConfiguration operatorConfiguration)
-
isUpgradeModeChangedToLastStateAndHADisabledPreviously
public static boolean isUpgradeModeChangedToLastStateAndHADisabledPreviously(AbstractFlinkResource<?,?> flinkApp, org.apache.flink.configuration.Configuration observeConfig)
-
getDeployedSpec
public static <SPEC extends AbstractFlinkSpec> SPEC getDeployedSpec(AbstractFlinkResource<SPEC,?> deployment)
-
isJobInTerminalState
public static boolean isJobInTerminalState(CommonStatus<?> status)
-
isJobRunning
public static boolean isJobRunning(CommonStatus<?> status)
-
applyValidationErrorAndResetSpec
public static <SPEC extends AbstractFlinkSpec> boolean applyValidationErrorAndResetSpec(AbstractFlinkResource<SPEC,?> deployment, java.lang.String validationError, FlinkOperatorConfiguration conf)
In case of validation errors we need to (temporarily) reset the old spec so that we can reconcile other outstanding changes, instead of simply blocking.This is only possible if we have a previously reconciled spec.
For in-flight application upgrades we need extra logic to set the desired job state to running
- Type Parameters:
SPEC
- Spec type.- Parameters:
deployment
- The current deployment to be reconciledvalidationError
- Validation error encountered for the current spec- Returns:
- True if the spec was reset and reconciliation can continue. False if nothing to reconcile.
-
toErrorStatusUpdateControl
public static <STATUS extends CommonStatus<?>,R extends AbstractFlinkResource<?,STATUS>> io.javaoperatorsdk.operator.api.reconciler.ErrorStatusUpdateControl<R> toErrorStatusUpdateControl(R resource, java.util.Optional<io.javaoperatorsdk.operator.api.reconciler.RetryInfo> retryInfo, java.lang.Exception e, StatusRecorder<R,STATUS> statusRecorder, FlinkOperatorConfiguration operatorConfiguration)
Update the resource error status and metrics when the operator encountered an exception during reconciliation.- Type Parameters:
STATUS
- Status type.R
- Resource type.- Parameters:
resource
- Flink Resource to be updatedretryInfo
- Current RetryInformatione
- Exception that caused the retrystatusRecorder
- statusRecorder object for patching statusoperatorConfiguration
-- Returns:
- This always returns Empty optional currently, due to the status update logic
-
getUpgradeTargetGeneration
public static java.lang.Long getUpgradeTargetGeneration(AbstractFlinkResource<?,?> resource)
Get spec generation for the current in progress upgrade.- Parameters:
resource
- Flink resource.- Returns:
- The spec generation for the upgrade.
-
clearLastReconciledSpecIfFirstDeploy
public static void clearLastReconciledSpecIfFirstDeploy(AbstractFlinkResource<?,?> resource)
Clear last reconciled spec if that corresponds to the first deployment. This is important in cases where the first deployment fails.- Parameters:
resource
- Flink resource.
-
checkAndUpdateStableSpec
public static void checkAndUpdateStableSpec(CommonStatus<?> status)
Checks the status and if the corresponding Flink job/application is in stable running state, it updates the last stable spec.- Parameters:
status
- Status to be updated.
-
updateStatusForAlreadyUpgraded
public static <SPEC extends AbstractFlinkSpec> void updateStatusForAlreadyUpgraded(AbstractFlinkResource<SPEC,?> resource)
Updates status in cases where a previously successful deployment wasn't recorded for any reason. We simply change the job status from SUSPENDED to RUNNING and ReconciliationState to DEPLOYED while keeping the metadata.- Type Parameters:
SPEC
- Spec type.- Parameters:
resource
- Flink resource to be updated.
-
-