Skip to content

Commit ff954a6

Browse files
committed
Sync child status.upgradeSavepointPath on transition deploy
AbstractJobReconciler.restoreJob() reads status.jobStatus.upgradeSavepointPath, not spec.initialSavepointPath, in the terminal+no-HA upgrade branch. When a suspended child carries a stale value there, the createOrReplace issued by deployCluster on the next transition does not touch the status subresource, so the stale value silently shadows the fresh path written by the FlinkBlueGreenDeployment (FBGD) controller and the JobManager restores from an arbitrarily old savepoint. After deployCluster, initiateDeployment now re-fetches the child and updates its status.jobStatus.upgradeSavepointPath to match the fresh spec.initialSavepointPath via a new BlueGreenKubernetesService.syncUpgradeSavepointPath helper that writes the /status subresource. The sync is skipped for first deployments and when no fresh savepoint was prepared. Adds the test verifyFreshSavepointSyncedToChildStatusOnRetry, which pollutes a suspended GREEN's status.jobStatus.upgradeSavepointPath with a stale value, triggers a second transition with a fresh savepoint, and asserts that the fresh value is mirrored into status and the stale value is overwritten.
1 parent fd36457 commit ff954a6

3 files changed

Lines changed: 148 additions & 0 deletions

File tree

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenDeploymentService.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import static org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenKubernetesService.deployCluster;
4848
import static org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenKubernetesService.isFlinkDeploymentReady;
4949
import static org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenKubernetesService.suspendFlinkDeployment;
50+
import static org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenKubernetesService.syncUpgradeSavepointPath;
5051
import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenUtils.fetchSavepointInfo;
5152
import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenUtils.getReconciliationReschedInterval;
5253
import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenUtils.getSpecDiff;
@@ -94,6 +95,28 @@ public UpdateControl<FlinkBlueGreenDeployment> initiateDeployment(
9495

9596
deployCluster(context, flinkDeployment);
9697

98+
// Mirror the fresh initialSavepointPath into the child's status.upgradeSavepointPath.
99+
// AbstractJobReconciler.restoreJob() reads from status, not spec, in the terminal+no-HA
100+
// upgrade branch; if a prior failed transition left a stale value there, deployCluster's
101+
// createOrReplace above does not overwrite it (status is a subresource) and the operator
102+
// would silently restore from the stale savepoint. Re-fetch is required: deployCluster
103+
// bumps resourceVersion, so the context's snapshot would 409 on updateStatus.
104+
String freshSavepointPath = flinkDeployment.getSpec().getJob().getInitialSavepointPath();
105+
if (!isFirstDeployment && freshSavepointPath != null && !freshSavepointPath.isEmpty()) {
106+
FlinkDeployment reloadedChild =
107+
context.getJosdkContext()
108+
.getClient()
109+
.resources(FlinkDeployment.class)
110+
.inNamespace(flinkDeployment.getMetadata().getNamespace())
111+
.withName(flinkDeployment.getMetadata().getName())
112+
.get();
113+
if (reloadedChild != null
114+
&& reloadedChild.getStatus() != null
115+
&& reloadedChild.getStatus().getJobStatus() != null) {
116+
syncUpgradeSavepointPath(context, reloadedChild, freshSavepointPath);
117+
}
118+
}
119+
97120
BlueGreenUtils.setAbortTimestamp(context);
98121

99122
return patchStatusUpdateControl(context, nextState, JobStatus.RECONCILING, null)

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenKubernetesService.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,23 @@ public static void deployCluster(BlueGreenContext context, FlinkDeployment flink
5959
context.getJosdkContext().getClient().resource(flinkDeployment).createOrReplace();
6060
}
6161

62+
/**
63+
* Patches status.upgradeSavepointPath on the given child FlinkDeployment. Used by the B/G
64+
* controller to keep status in sync with a freshly written spec.initialSavepointPath, since
65+
* AbstractJobReconciler.restoreJob() reads from status (not spec) in the terminal+no-HA upgrade
66+
* path.
67+
*/
68+
public static void syncUpgradeSavepointPath(
69+
BlueGreenContext context, FlinkDeployment flinkDeployment, String savepointPath) {
70+
String namespace = flinkDeployment.getMetadata().getNamespace();
71+
flinkDeployment.getStatus().getJobStatus().setUpgradeSavepointPath(savepointPath);
72+
context.getJosdkContext()
73+
.getClient()
74+
.resource(flinkDeployment)
75+
.inNamespace(namespace)
76+
.updateStatus();
77+
}
78+
6279
/**
6380
* Checks if a FlinkDeployment is ready (STABLE lifecycle state and RUNNING job status).
6481
*

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentControllerTest.java

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,114 @@ public void verifyFailureDuringTransition(FlinkVersion flinkVersion) throws Exce
328328
testTransitionToGreen(rs, customValue, null);
329329
}
330330

331+
/**
332+
* A suspended child left over from a prior failed transition can carry a stale
333+
* status.upgradeSavepointPath, which AbstractJobReconciler.restoreJob() reads (instead of
334+
* spec.initialSavepointPath) in the terminal+no-HA upgrade branch — silently shadowing the
335+
* fresh path FBGD writes on the next transition.
336+
*
337+
* <p>Asserts that initiateDeployment mirrors the freshly set spec.initialSavepointPath onto the
338+
* existing child's status.upgradeSavepointPath so the FD reconciler picks up the new savepoint.
339+
*/
340+
@ParameterizedTest
341+
@MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
342+
public void verifyFreshSavepointSyncedToChildStatusOnRetry(FlinkVersion flinkVersion)
343+
throws Exception {
344+
var blueGreenDeployment =
345+
buildSessionCluster(
346+
TEST_DEPLOYMENT_NAME,
347+
TEST_NAMESPACE,
348+
flinkVersion,
349+
null,
350+
UpgradeMode.SAVEPOINT);
351+
352+
var abortGracePeriodMs = 1200;
353+
var reschedulingIntervalMs = 3000;
354+
blueGreenDeployment
355+
.getSpec()
356+
.getConfiguration()
357+
.put(ABORT_GRACE_PERIOD.key(), String.valueOf(abortGracePeriodMs));
358+
blueGreenDeployment
359+
.getSpec()
360+
.getConfiguration()
361+
.put(
362+
RECONCILIATION_RESCHEDULING_INTERVAL.key(),
363+
String.valueOf(reschedulingIntervalMs));
364+
365+
// 1. Initial deploy → ACTIVE_BLUE
366+
var rs = executeBasicDeployment(flinkVersion, blueGreenDeployment, false, null);
367+
368+
// 2. Spec change → savepoint → start transition to GREEN
369+
simulateChangeInSpec(rs.deployment, UUID.randomUUID().toString(), 0, null);
370+
rs = handleSavepoint(rs);
371+
rs = reconcile(rs.deployment);
372+
assertEquals(
373+
FlinkBlueGreenDeploymentState.TRANSITIONING_TO_GREEN,
374+
rs.reconciledStatus.getBlueGreenState());
375+
376+
// 3. GREEN never becomes ready → abort → ACTIVE_BLUE + GREEN SUSPENDED
377+
Long reschedDelayMs = 0L;
378+
for (int i = 0; i < 2; i++) {
379+
rs = reconcile(rs.deployment);
380+
reschedDelayMs = rs.updateControl.getScheduleDelay().get();
381+
}
382+
Thread.sleep(reschedDelayMs);
383+
rs = reconcile(rs.deployment);
384+
assertFailingJobStatus(rs);
385+
assertEquals(
386+
FlinkBlueGreenDeploymentState.ACTIVE_BLUE, rs.reconciledStatus.getBlueGreenState());
387+
388+
// 4. Pollute SUSPENDED GREEN's status with a stale upgradeSavepointPath
389+
// (simulates an older successful savepoint left in status from a prior lifecycle)
390+
var suspendedGreen =
391+
getFlinkDeployments().stream()
392+
.filter(d -> JobState.SUSPENDED.equals(d.getSpec().getJob().getState()))
393+
.findFirst()
394+
.orElseThrow(() -> new AssertionError("No SUSPENDED deployment found"));
395+
String stalePath = "s3://stale-from-prior-failed-deploy";
396+
suspendedGreen.getStatus().getJobStatus().setUpgradeSavepointPath(stalePath);
397+
kubernetesClient.resource(suspendedGreen).update();
398+
assertEquals(
399+
stalePath,
400+
reloadFlinkDeployment(suspendedGreen)
401+
.getStatus()
402+
.getJobStatus()
403+
.getUpgradeSavepointPath(),
404+
"precondition: stale value was persisted to GREEN status");
405+
406+
// 5. Spec change → fresh savepoint → start second transition
407+
simulateChangeInSpec(rs.deployment, UUID.randomUUID().toString(), 0, null);
408+
rs = handleSavepoint(rs);
409+
rs = reconcile(rs.deployment);
410+
assertEquals(
411+
FlinkBlueGreenDeploymentState.TRANSITIONING_TO_GREEN,
412+
rs.reconciledStatus.getBlueGreenState());
413+
414+
// 6. GREEN's status.upgradeSavepointPath must now mirror the fresh
415+
// spec.initialSavepointPath written by initiateDeployment, NOT the stale value.
416+
var freshGreen = reloadFlinkDeployment(suspendedGreen);
417+
String freshSpecPath = freshGreen.getSpec().getJob().getInitialSavepointPath();
418+
String freshStatusPath = freshGreen.getStatus().getJobStatus().getUpgradeSavepointPath();
419+
assertNotNull(freshSpecPath, "transition should have set a fresh initialSavepointPath");
420+
assertNotEquals(
421+
stalePath,
422+
freshStatusPath,
423+
"stale status.upgradeSavepointPath must be overwritten by initiateDeployment");
424+
assertEquals(
425+
freshSpecPath,
426+
freshStatusPath,
427+
"status.upgradeSavepointPath must mirror spec.initialSavepointPath so "
428+
+ "AbstractJobReconciler.restoreJob() picks up the fresh savepoint");
429+
}
430+
431+
private FlinkDeployment reloadFlinkDeployment(FlinkDeployment deployment) {
432+
return kubernetesClient
433+
.resources(FlinkDeployment.class)
434+
.inNamespace(deployment.getMetadata().getNamespace())
435+
.withName(deployment.getMetadata().getName())
436+
.get();
437+
}
438+
331439
private static String getFlinkConfigurationValue(
332440
FlinkDeploymentSpec flinkDeploymentSpec, String propertyName) {
333441
return flinkDeploymentSpec.getFlinkConfiguration().get(propertyName).asText();

0 commit comments

Comments
 (0)