Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,17 @@ public enum BlueGreenDiffType {
* Full redeploy from user-specified savepoint. Triggered when savepointRedeployNonce changes.
* Uses the initialSavepointPath from spec instead of taking a new savepoint.
*/
SAVEPOINT_REDEPLOY
SAVEPOINT_REDEPLOY,

/**
* In-place suspension. Triggered when job.state changes from RUNNING to SUSPENDED. Suspends the
* currently active child without creating a new deployment.
*/
SUSPEND,

/**
* Resume from suspension. Triggered when job.state changes from SUSPENDED to RUNNING. Spins up
* the child with the current (potentially updated) spec.
*/
RESUME
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.bluegreen.BlueGreenDeploymentType;
import org.apache.flink.kubernetes.operator.api.bluegreen.BlueGreenDiffType;
import org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState;
import org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState;
import org.apache.flink.kubernetes.operator.api.status.Savepoint;
import org.apache.flink.kubernetes.operator.api.status.SavepointFormatType;
Expand Down Expand Up @@ -110,13 +111,38 @@ public UpdateControl<FlinkBlueGreenDeployment> initiateDeployment(
*/
public UpdateControl<FlinkBlueGreenDeployment> checkAndInitiateDeployment(
BlueGreenContext context, BlueGreenDeploymentType currentBlueGreenDeploymentType) {

BlueGreenDiffType specDiff = getSpecDiff(context);

if (specDiff != BlueGreenDiffType.IGNORE) {
FlinkDeployment currentFlinkDeployment =
context.getDeploymentByType(currentBlueGreenDeploymentType);

if (isFlinkDeploymentReady(currentFlinkDeployment)) {
if (specDiff == BlueGreenDiffType.SUSPEND && currentFlinkDeployment != null) {
setLastReconciledSpec(context);
LOG.info(
"In-place suspension for '{}'",
currentFlinkDeployment.getMetadata().getName());
return patchFlinkDeployment(context, currentBlueGreenDeploymentType);
}

if (specDiff == BlueGreenDiffType.RESUME && currentFlinkDeployment != null) {
setLastReconciledSpec(context);
LOG.info(
"In-place resume for '{}'", currentFlinkDeployment.getMetadata().getName());
return patchFlinkDeployment(context, currentBlueGreenDeploymentType);
}

// Check if child is currently suspended - if so, just patch specs without restart
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any benefit to patching the spec while the deployment is suspended? A RESUME command will override these changes anyway, correct?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi! Yes, a RESUME will reconcile all changes made during suspension. However, without this patch, the FlinkDeployment and FlinkBlueGreenDeployment will be out of sync during that entire time.

We think keeping them in sync provides a better user experience: users can inspect the child FlinkDeployment at any time and see the exact spec that will be executed, rather than having to track changes across resources. We're hoping to eventually make the FlinkBlueGreenDeployment the single source of truth, with the child always reflecting the parent's current desired state.

That said, we're open to other perspectives! If you think the extra patch call isn't worth it, we'd be happy to discuss!

if (isChildSuspended(currentFlinkDeployment)) {
setLastReconciledSpec(context);
LOG.info(
"Spec change while suspended for '{}'",
currentFlinkDeployment.getMetadata().getName());
return patchFlinkDeployment(context, currentBlueGreenDeploymentType);
}

if (currentFlinkDeployment != null && isFlinkDeploymentReady(currentFlinkDeployment)) {
if (specDiff == BlueGreenDiffType.TRANSITION) {
boolean savepointTriggered = false;
try {
Expand Down Expand Up @@ -173,12 +199,16 @@ public UpdateControl<FlinkBlueGreenDeployment> checkAndInitiateDeployment(
} else {
if (context.getDeploymentStatus().getJobStatus().getState() != JobStatus.FAILING) {
setLastReconciledSpec(context);
var childName =
currentFlinkDeployment != null
? currentFlinkDeployment.getMetadata().getName()
: "null";
var error =
String.format(
"Transition to %s not possible, current Flink Deployment '%s' is not READY. FAILING '%s'",
calculateTransition(currentBlueGreenDeploymentType)
.nextBlueGreenDeploymentType,
currentFlinkDeployment.getMetadata().getName(),
childName,
context.getBgDeployment().getMetadata().getName());
return markDeploymentFailing(context, error);
}
Expand All @@ -188,6 +218,16 @@ public UpdateControl<FlinkBlueGreenDeployment> checkAndInitiateDeployment(
return UpdateControl.noUpdate();
}

/**
 * Tells whether the spec of the given child deployment asks for a suspended job.
 *
 * @param deployment the child FlinkDeployment to inspect; may be null
 * @return true only when the deployment, its spec and its job spec are all present and the job
 *     state equals SUSPENDED; false in every other case
 */
private boolean isChildSuspended(FlinkDeployment deployment) {
    if (deployment == null) {
        return false;
    }
    var childSpec = deployment.getSpec();
    if (childSpec == null) {
        return false;
    }
    var jobSpec = childSpec.getJob();
    if (jobSpec == null) {
        return false;
    }
    // Fully-qualified on purpose: the simple name JobState is not relied upon in this file.
    return jobSpec.getState() == org.apache.flink.kubernetes.operator.api.spec.JobState.SUSPENDED;
}

private UpdateControl<FlinkBlueGreenDeployment> patchFlinkDeployment(
BlueGreenContext context, BlueGreenDeploymentType blueGreenDeploymentTypeToPatch) {
return patchFlinkDeployment(context, blueGreenDeploymentTypeToPatch, true);
Expand Down Expand Up @@ -435,6 +475,16 @@ public UpdateControl<FlinkBlueGreenDeployment> monitorTransition(
TransitionState transitionState =
determineTransitionState(context, currentBlueGreenDeploymentType);

if (isChildSuspended(transitionState.nextDeployment)) {
if (transitionState.nextDeployment.getStatus().getLifecycleState()
== ResourceLifecycleState.SUSPENDED) {
return finalizeSuspendedDeployment(context, transitionState.nextState);
} else {
return shouldWeAbort(
context, transitionState.nextDeployment, transitionState.nextState);
}
}

if (isFlinkDeploymentReady(transitionState.nextDeployment)) {
return shouldWeDelete(
context,
Expand All @@ -447,11 +497,36 @@ public UpdateControl<FlinkBlueGreenDeployment> monitorTransition(
}
}

/**
 * Finalizes a deployment whose next child is suspended.
 *
 * <p>Resets the deployment-ready and abort timestamps to the value produced by
 * {@code millisToInstantStr(0)}, clears the savepoint trigger id, and then patches the status
 * to {@code nextState} with a job status of SUSPENDED, asking for a reschedule with a delay of
 * 0.
 *
 * @param context the blue/green context whose deployment status is updated
 * @param nextState the blue/green state to record in the patched status
 * @return the update control returned by the status patch, with a reschedule delay of 0
 */
private UpdateControl<FlinkBlueGreenDeployment> finalizeSuspendedDeployment(
        BlueGreenContext context, FlinkBlueGreenDeploymentState nextState) {

    LOG.info(
            "Finalizing suspended deployment '{}' to {} state",
            context.getDeploymentName(),
            nextState);

    // Clear the transition bookkeeping before the status is patched.
    var status = context.getDeploymentStatus();
    var resetTimestamp = millisToInstantStr(0);
    status.setDeploymentReadyTimestamp(resetTimestamp);
    status.setAbortTimestamp(resetTimestamp);
    status.setSavepointTriggerId(null);

    var statusPatch = patchStatusUpdateControl(context, nextState, JobStatus.SUSPENDED, null);
    return statusPatch.rescheduleAfter(0);
}

private UpdateControl<FlinkBlueGreenDeployment> handleSpecChangesDuringTransition(
BlueGreenContext context, BlueGreenDeploymentType currentBlueGreenDeploymentType) {
if (hasSpecChanged(context)) {
BlueGreenDiffType diffType = getSpecDiff(context);

// Block SUSPEND during transition - wait for transition to complete first
if (diffType == BlueGreenDiffType.SUSPEND) {
LOG.info(
"Suspend requested during transition for '{}'. "
+ "Waiting for transition to complete before processing suspend.",
context.getBgDeployment().getMetadata().getName());
return null;
}

if (diffType != BlueGreenDiffType.IGNORE) {
setLastReconciledSpec(context);
var oppositeDeploymentType =
Expand Down Expand Up @@ -658,7 +733,10 @@ public UpdateControl<FlinkBlueGreenDeployment> finalizeBlueGreenDeployment(
context.getDeploymentStatus().setAbortTimestamp(millisToInstantStr(0));
context.getDeploymentStatus().setSavepointTriggerId(null);

return patchStatusUpdateControl(context, nextState, JobStatus.RUNNING, null);
// Finalize status and reschedule immediately so any pending spec changes
// (e.g., suspend requested during transition) are picked up on next reconcile
return patchStatusUpdateControl(context, nextState, JobStatus.RUNNING, null)
.rescheduleAfter(0);
}

// ==================== Common Utility Methods ====================
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.kubernetes.operator.api.FlinkBlueGreenDeployment;
import org.apache.flink.kubernetes.operator.api.bluegreen.BlueGreenDeploymentType;
import org.apache.flink.kubernetes.operator.api.spec.JobState;
import org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState;
import org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentStatus;
import org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenContext;
import org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenDeploymentService;

import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;

import static org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenDeploymentService.patchStatusUpdateControl;
import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenUtils.hasSpecChanged;
import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenUtils.setLastReconciledSpec;

Expand All @@ -41,6 +43,15 @@ public InitializingBlueStateHandler(BlueGreenDeploymentService deploymentService
public UpdateControl<FlinkBlueGreenDeployment> handle(BlueGreenContext context) {
FlinkBlueGreenDeploymentStatus deploymentStatus = context.getDeploymentStatus();

// Block initial deployment if job.state is SUSPENDED - user must start with RUNNING
var jobSpec = context.getBgDeployment().getSpec().getTemplate().getSpec().getJob();
if (jobSpec != null && jobSpec.getState() == JobState.SUSPENDED) {
LOG.info(
"Blocking initial deployment '{}' - job.state is SUSPENDED, waiting for RUNNING",
context.getBgDeployment().getMetadata().getName());
return patchStatusUpdateControl(context, null, JobStatus.SUSPENDED, null);
}

// Deploy only if this is the initial deployment (no previous spec exists)
// or if we're recovering from a failure and the spec has changed since the last attempt
if (deploymentStatus.getLastReconciledSpec() == null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.kubernetes.operator.api.diff.DiffType;
import org.apache.flink.kubernetes.operator.api.spec.FlinkBlueGreenDeploymentSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.api.spec.JobState;
import org.apache.flink.kubernetes.operator.api.spec.KubernetesDeploymentMode;

import lombok.NonNull;
Expand Down Expand Up @@ -59,6 +60,19 @@ public BlueGreenDiffType compare() {
FlinkDeploymentSpec leftSpec = left.getTemplate().getSpec();
FlinkDeploymentSpec rightSpec = right.getTemplate().getSpec();

// Check for suspend/resume state changes first - these take highest precedence
// for in-place suspension handling
JobState leftJobState = getJobState(leftSpec);
JobState rightJobState = getJobState(rightSpec);

if (leftJobState != rightJobState) {
if (rightJobState == JobState.SUSPENDED) {
return BlueGreenDiffType.SUSPEND;
} else if (leftJobState == JobState.SUSPENDED && rightJobState == JobState.RUNNING) {
return BlueGreenDiffType.RESUME;
}
}

// Used in Case 2, 3 & 4: Delegate to ReflectiveDiffBuilder for nested spec comparison
// Calculate diffResult before comparison to apply in-place removal of ignored fields
DiffResult<FlinkDeploymentSpec> diffResult =
Expand Down Expand Up @@ -88,6 +102,19 @@ public BlueGreenDiffType compare() {
}
}

/**
 * Resolves the effective job state of a deployment spec, treating a missing value as RUNNING.
 *
 * @param spec the FlinkDeploymentSpec to read; it is dereferenced without a null check
 * @return the job state declared in the spec, or RUNNING when the job spec or its state is null
 */
private JobState getJobState(FlinkDeploymentSpec spec) {
    JobState declared = spec.getJob() == null ? null : spec.getJob().getState();
    return declared == null ? JobState.RUNNING : declared;
}

/**
* Validates that the specs and their nested components are not null. Throws
* IllegalArgumentException if any required component is null.
Expand Down
Loading