Skip to content

Commit 5a448da

Browse files
committed
[FLINK-39270] Fix savepoint/last-state upgrade state loss under slow JM
1 parent 0a4d841 commit 5a448da

5 files changed

Lines changed: 101 additions & 5 deletions

File tree

flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/CommonStatus.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,4 +133,7 @@ public boolean isJobCancellable() {
133133
}
134134
return RECONCILING != jobState && !jobState.isGloballyTerminalState();
135135
}
136+
137+
/**
 * Reports whether the JobManager is considered accessible for this resource.
 *
 * <p>Each concrete status type decides how accessibility is determined. The value is derived
 * rather than stored, so it is excluded from JSON serialization via {@code @JsonIgnore}.
 *
 * @return {@code true} if the JobManager is considered accessible, {@code false} otherwise
 */
@JsonIgnore
138+
public abstract boolean isJmAccessible();
136139
}

flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatus.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,4 +107,10 @@ public List<Condition> setRunningConditionIfRequired() {
107107
setConditions(newConditions);
108108
return newConditions;
109109
}
110+
111+
@JsonIgnore
112+
@Override
113+
public boolean isJmAccessible() {
114+
return jobManagerDeploymentStatus.isRestApiAvailable();
115+
}
110116
}

flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkSessionJobStatus.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.flink.annotation.Experimental;
2121
import org.apache.flink.kubernetes.operator.api.spec.FlinkSessionJobSpec;
2222

23+
import com.fasterxml.jackson.annotation.JsonIgnore;
2324
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
2425
import lombok.AllArgsConstructor;
2526
import lombok.Data;
@@ -42,4 +43,10 @@ public class FlinkSessionJobStatus extends CommonStatus<FlinkSessionJobSpec> {
4243
/** Status of the last reconcile operation. */
4344
private FlinkSessionJobReconciliationStatus reconciliationStatus =
4445
new FlinkSessionJobReconciliationStatus();
46+
47+
@Override
48+
@JsonIgnore
49+
public boolean isJmAccessible() {
50+
return true;
51+
}
4552
}

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -216,16 +216,13 @@ protected JobUpgrade getJobUpgrade(FlinkResourceContext<CR> ctx, Configuration d
216216
var resource = ctx.getResource();
217217
var status = resource.getStatus();
218218
var upgradeMode = resource.getSpec().getJob().getUpgradeMode();
219-
boolean terminal = ReconciliationUtils.isJobInTerminalState(status);
220219

221220
if (upgradeMode == UpgradeMode.STATELESS) {
222221
LOG.info("Stateless job, ready for upgrade");
223-
return JobUpgrade.stateless(terminal);
222+
return JobUpgrade.stateless(ReconciliationUtils.isJobInTerminalState(status));
224223
}
225224

226-
var flinkService = ctx.getFlinkService();
227-
if (ReconciliationUtils.isJobCancelled(status)
228-
|| (terminal && !flinkService.isHaMetadataAvailable(ctx.getObserveConfig()))) {
225+
if (useStatusSavepoint(ctx, status)) {
229226

230227
if (!SnapshotUtils.lastSavepointKnown(status)) {
231228
throw new UpgradeFailureException(
@@ -303,6 +300,38 @@ protected JobUpgrade getJobUpgrade(FlinkResourceContext<CR> ctx, Configuration d
303300
return JobUpgrade.unavailable();
304301
}
305302

303+
/**
304+
* Core logic to determine whether the savepoint recorded in the status is safe to use for
305+
* upgrades: no other savepoint/checkpoint may be in progress and what's there is the latest.
306+
*/
307+
@VisibleForTesting
308+
static boolean useStatusSavepoint(FlinkResourceContext<?> ctx, CommonStatus<?> status) {
309+
310+
// Only terminal job state is fixed/guaranteed
311+
if (!ReconciliationUtils.isJobInTerminalState(status)) {
312+
return false;
313+
}
314+
315+
// Observed to be fully canceled with state information
316+
if (ReconciliationUtils.isJobCancelled(status)) {
317+
return true;
318+
}
319+
320+
var service = ctx.getFlinkService();
321+
var conf = ctx.getObserveConfig();
322+
323+
// At this point our job is in terminal state (failed or stopped). Normally the jobmanager
324+
// removes any HA state that would affect failover/restarts but this is not fully guaranteed
325+
// (the JM may be slow due to CPU or memory pressure).
326+
// We accept the recorded savepoint in cases where the HA metadata doesn't exist (it was a
327+
// clean job shutdown) or the JM is still accessible which guarantees that the observer
328+
// updated the savepoint information.
329+
// This still guards against cases where the job + JM fails after restart in a way that the
330+
// operator cannot access the new state (in that case HA metadata would be available and JM
331+
// inaccessible)
332+
return !service.isHaMetadataAvailable(conf) || status.isJmAccessible();
333+
}
334+
306335
@VisibleForTesting
307336
protected JobUpgrade getUpgradeModeBasedOnStateAge(
308337
FlinkResourceContext<CR> ctx, Configuration deployConfig, boolean cancellable)

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -313,6 +313,57 @@ void testUpgradeUsesLatestSnapshot(boolean useLegacyFields) throws Exception {
313313
assertEquals(savepointPath, flinkService.listJobs().get(0).f0);
314314
}
315315

316+
/**
 * Exercises an upgrade of a LAST_STATE application whose job is FINISHED while the JobManager
 * deployment status is READY and HA metadata is still reported as available.
 *
 * <p>Two scenarios are covered in sequence on the same deployment object:
 *
 * <ol>
 *   <li>a real savepoint path is recorded in the status: the job is redeployed from it;
 *   <li>only {@code LAST_STATE_DUMMY_SP_PATH} is recorded: reconciliation fails with an
 *       {@code UpgradeFailureException}.
 * </ol>
 */
@Test
317+
public void testTerminalJobWithHaMetaAvailable() throws Exception {
318+
// Bootstrap a running deployment with LAST_STATE upgrade mode (HA config is required)
319+
var deployment = buildApplicationCluster(FlinkVersion.v1_19, UpgradeMode.LAST_STATE);
320+
321+
reconciler.reconcile(deployment, context);
322+
verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());
323+
324+
// Simulate: job reached a terminal state while JM is still READY and HA metadata is
325+
// available (e.g. job manager cannot delete HA metadata promptly, such as under memory or
326+
// CPU pressure)
327+
flinkService.clear();
328+
flinkService.setHaDataAvailable(true);
329+
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
330+
deployment
331+
.getStatus()
332+
.getJobStatus()
333+
.setState(org.apache.flink.api.common.JobStatus.FINISHED);
334+
335+
// Happy path: a valid savepoint path is recorded in status
336+
// Expectation: redeploy from that recorded savepoint
337+
final String recordedSavepointPath = "ha_recorded_savepoint";
338+
deployment.getStatus().getJobStatus().setUpgradeSavepointPath(recordedSavepointPath);
339+
// Changing the restart nonce alters the spec so that the reconciler sees a pending upgrade.
340+
deployment.getSpec().setRestartNonce(100L);
341+
// NOTE(review): two reconcile passes are issued; presumably the first tears the job down and
342+
// the second redeploys it. Confirm against the reconciler's upgrade flow.
reconciler.reconcile(deployment, context);
343+
reconciler.reconcile(deployment, context);
344+
345+
// Exactly one job must be running again, restored from the recorded savepoint path.
assertEquals(1, flinkService.getRunningCount());
346+
assertEquals(recordedSavepointPath, flinkService.listJobs().get(0).f0);
347+
348+
// Error path: only the LAST_STATE dummy marker is in the status, meaning the job was
349+
// previously upgraded via HA last-state without recording an actual savepoint path.
350+
// When the job is now terminal with JM accessible, there is nothing to restore from.
351+
flinkService.clear();
352+
deployment.getSpec().setRestartNonce(200L);
353+
// Re-apply the READY + FINISHED state, since the previous redeploy may have changed it.
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
354+
deployment
355+
.getStatus()
356+
.getJobStatus()
357+
.setState(org.apache.flink.api.common.JobStatus.FINISHED);
358+
deployment
359+
.getStatus()
360+
.getJobStatus()
361+
.setUpgradeSavepointPath(ApplicationReconciler.LAST_STATE_DUMMY_SP_PATH);
362+
363+
// The reconciler must refuse the upgrade rather than restart without state.
assertThatThrownBy(() -> reconciler.reconcile(deployment, context))
364+
.isInstanceOf(UpgradeFailureException.class)
365+
.hasMessageContaining("Job is in terminal state but last checkpoint is unknown");
366+
}
366+
316367
private FlinkDeployment cloneDeploymentWithUpgradeMode(
317368
FlinkDeployment deployment, UpgradeMode upgradeMode) {
318369
FlinkDeployment result = ReconciliationUtils.clone(deployment);

0 commit comments

Comments
 (0)