Skip to content

Commit 04065a0

Browse files
[FLINK-39439] Fix savepoint history entry removed from status before dispose succeeds
1 parent cdc568c, commit 04065a0

2 files changed

Lines changed: 150 additions & 9 deletions

File tree

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java

Lines changed: 19 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -368,10 +368,14 @@ void cleanupSavepointHistoryLegacy(
368368
var lastSavepoint = savepointHistory.get(savepointHistory.size() - 1);
369369

370370
while (savepointHistory.size() > maxCount) {
371-
// remove oldest entries
372-
var sp = savepointHistory.remove(0);
373-
if (savepointCleanupEnabled) {
374-
disposeSavepointQuietly(ctx, sp.getLocation());
371+
// Remove oldest entry only after successful dispose to avoid orphaning files.
372+
// Break on failure — entries are ordered oldest-first so we cannot skip this one
373+
// and remove a newer entry instead, as that would leave the oldest orphaned.
374+
var sp = savepointHistory.get(0);
375+
if (!savepointCleanupEnabled || disposeSavepointQuietly(ctx, sp.getLocation())) {
376+
savepointHistory.remove(0);
377+
} else {
378+
break;
375379
}
376380
}
377381

@@ -382,21 +386,27 @@ void cleanupSavepointHistoryLegacy(
382386
continue;
383387
}
384388
if (sp.getTimeStamp() < maxTms) {
385-
it.remove();
386-
if (savepointCleanupEnabled) {
387-
disposeSavepointQuietly(ctx, sp.getLocation());
389+
// Remove entry only after successful dispose to avoid orphaning files.
390+
// Each entry is independent here so we continue on failure rather than breaking.
391+
if (!savepointCleanupEnabled || disposeSavepointQuietly(ctx, sp.getLocation())) {
392+
it.remove();
388393
}
389394
}
390395
}
391396
}
392397

393-
private void disposeSavepointQuietly(FlinkResourceContext<CR> ctx, String path) {
398+
private boolean disposeSavepointQuietly(FlinkResourceContext<CR> ctx, String path) {
394399
try {
395400
LOG.info("Disposing savepoint {}", path);
396401
ctx.getFlinkService().disposeSavepoint(path, ctx.getObserveConfig());
402+
return true;
397403
} catch (Exception e) {
398404
// savepoint dispose error should not affect the deployment
399-
LOG.error("Exception while disposing savepoint {}", path, e);
405+
LOG.error(
406+
"Exception while disposing savepoint {}, will retry on next reconcile",
407+
path,
408+
e);
409+
return false;
400410
}
401411
}
402412

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserverLegacyTest.java

Lines changed: 131 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -318,6 +318,137 @@ public void testDisabledDispose() {
318318
Collections.emptyList(), flinkService.getDisposedSavepoints());
319319
}
320320

321+
@Test
322+
public void testCountBasedDisposeRetainsEntryOnFailure() {
323+
var deployment = TestUtils.buildApplicationCluster();
324+
deployment
325+
.getStatus()
326+
.getReconciliationStatus()
327+
.serializeAndSetLastReconciledSpec(deployment.getSpec(), deployment);
328+
Configuration conf = new Configuration();
329+
conf.set(KubernetesOperatorConfigOptions.SNAPSHOT_RESOURCE_ENABLED, false);
330+
conf.set(KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_MAX_COUNT, 2);
331+
configManager.updateDefaultConfig(conf);
332+
333+
var spInfo = deployment.getStatus().getJobStatus().getSavepointInfo();
334+
long futureTs = System.currentTimeMillis() * 2;
335+
var sp0 =
336+
new Savepoint(
337+
futureTs,
338+
"sp0",
339+
SnapshotTriggerType.MANUAL,
340+
SavepointFormatType.CANONICAL,
341+
0L);
342+
var sp1 =
343+
new Savepoint(
344+
futureTs + 1,
345+
"sp1",
346+
SnapshotTriggerType.MANUAL,
347+
SavepointFormatType.CANONICAL,
348+
1L);
349+
var sp2 =
350+
new Savepoint(
351+
futureTs + 2,
352+
"sp2",
353+
SnapshotTriggerType.MANUAL,
354+
SavepointFormatType.CANONICAL,
355+
2L);
356+
spInfo.updateLastSavepoint(sp0);
357+
spInfo.updateLastSavepoint(sp1);
358+
spInfo.updateLastSavepoint(sp2);
359+
360+
flinkService.setDisposeSavepointFailure(true);
361+
observer.cleanupSavepointHistoryLegacy(getResourceContext(deployment), Set.of());
362+
363+
// sp0 must still be in history because dispose failed — removing it would orphan the files
364+
assertThat(spInfo.getSavepointHistory()).containsExactly(sp0, sp1, sp2);
365+
assertThat(flinkService.getDisposedSavepoints()).isEmpty();
366+
}
367+
368+
@Test
369+
public void testAgeBasedDisposeRetainsEntryOnFailure() {
370+
var deployment = TestUtils.buildApplicationCluster();
371+
deployment
372+
.getStatus()
373+
.getReconciliationStatus()
374+
.serializeAndSetLastReconciledSpec(deployment.getSpec(), deployment);
375+
Configuration conf = new Configuration();
376+
conf.set(KubernetesOperatorConfigOptions.SNAPSHOT_RESOURCE_ENABLED, false);
377+
conf.set(
378+
KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_MAX_AGE,
379+
Duration.ofMillis(5));
380+
configManager.updateDefaultConfig(conf);
381+
382+
var spInfo = deployment.getStatus().getJobStatus().getSavepointInfo();
383+
var sp1 =
384+
new Savepoint(
385+
1, "sp1", SnapshotTriggerType.MANUAL, SavepointFormatType.CANONICAL, 1L);
386+
var sp2 =
387+
new Savepoint(
388+
2, "sp2", SnapshotTriggerType.MANUAL, SavepointFormatType.CANONICAL, 2L);
389+
spInfo.updateLastSavepoint(sp1);
390+
spInfo.updateLastSavepoint(sp2);
391+
392+
flinkService.setDisposeSavepointFailure(true);
393+
observer.cleanupSavepointHistoryLegacy(getResourceContext(deployment), Set.of());
394+
395+
// sp1 must still be in history because dispose failed — removing it would orphan the files
396+
assertThat(spInfo.getSavepointHistory()).containsExactly(sp1, sp2);
397+
assertThat(flinkService.getDisposedSavepoints()).isEmpty();
398+
}
399+
400+
@Test
401+
public void testDisposeRetryOnSubsequentReconcile() {
402+
var deployment = TestUtils.buildApplicationCluster();
403+
deployment
404+
.getStatus()
405+
.getReconciliationStatus()
406+
.serializeAndSetLastReconciledSpec(deployment.getSpec(), deployment);
407+
Configuration conf = new Configuration();
408+
conf.set(KubernetesOperatorConfigOptions.SNAPSHOT_RESOURCE_ENABLED, false);
409+
conf.set(KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_MAX_COUNT, 2);
410+
configManager.updateDefaultConfig(conf);
411+
412+
var spInfo = deployment.getStatus().getJobStatus().getSavepointInfo();
413+
long futureTs = System.currentTimeMillis() * 2;
414+
var sp0 =
415+
new Savepoint(
416+
futureTs,
417+
"sp0",
418+
SnapshotTriggerType.MANUAL,
419+
SavepointFormatType.CANONICAL,
420+
0L);
421+
var sp1 =
422+
new Savepoint(
423+
futureTs + 1,
424+
"sp1",
425+
SnapshotTriggerType.MANUAL,
426+
SavepointFormatType.CANONICAL,
427+
1L);
428+
var sp2 =
429+
new Savepoint(
430+
futureTs + 2,
431+
"sp2",
432+
SnapshotTriggerType.MANUAL,
433+
SavepointFormatType.CANONICAL,
434+
2L);
435+
spInfo.updateLastSavepoint(sp0);
436+
spInfo.updateLastSavepoint(sp1);
437+
spInfo.updateLastSavepoint(sp2);
438+
439+
// First reconcile: dispose fails (job is down), entry must be retained
440+
flinkService.setDisposeSavepointFailure(true);
441+
observer.cleanupSavepointHistoryLegacy(getResourceContext(deployment), Set.of());
442+
assertThat(spInfo.getSavepointHistory()).containsExactly(sp0, sp1, sp2);
443+
assertThat(flinkService.getDisposedSavepoints()).isEmpty();
444+
445+
// Second reconcile: dispose succeeds, entry must now be removed
446+
flinkService.setDisposeSavepointFailure(false);
447+
observer.cleanupSavepointHistoryLegacy(getResourceContext(deployment), Set.of());
448+
assertThat(spInfo.getSavepointHistory()).containsExactly(sp1, sp2);
449+
assertThat(flinkService.getDisposedSavepoints()).containsExactly(sp0.getLocation());
450+
}
451+
321452
@Test
322453
public void testPeriodicSavepoint() throws Exception {
323454
var conf = new Configuration();

0 commit comments

Comments (0)