From e42fbbaa6e4279e4f7a10d482e2dca74b92d33a3 Mon Sep 17 00:00:00 2001 From: Santwana Verma Date: Wed, 8 Apr 2026 15:22:58 +0530 Subject: [PATCH 1/3] Add test proving backoffLimit=-1 (unlimited retries) bug in FlinkStateSnapshot The default backoffLimit of -1 (documented as "unlimited retries") causes snapshots to immediately fail after the first error. The check `failures > backoffLimit` evaluates to `1 > -1` which is always true, so no retries ever happen with the default configuration. This test is expected to fail until the bug is fixed. --- .../FlinkStateSnapshotControllerTest.java | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotControllerTest.java index ce36f42680..25fed1718b 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotControllerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotControllerTest.java @@ -154,6 +154,25 @@ public void testReconcileBackoff(int backoffLimit) { assertThat(snapshot.getStatus().getState()).isEqualTo(FAILED); } + @Test + public void testReconcileBackoffUnlimited() { + var deployment = createDeployment(); + context = TestUtils.createSnapshotContext(client, deployment); + // Default backoffLimit is -1, meaning unlimited retries + var snapshot = createSavepoint(deployment, false, -1); + snapshot.setStatus(new FlinkStateSnapshotStatus()); + + flinkService.setTriggerSavepointFailure(true); + + // With unlimited retries, the snapshot should never transition to FAILED + for (int i = 0; i < 10; i++) { + controller.updateErrorStatus(snapshot, context, new Exception()); + assertThat(snapshot.getStatus().getState()) + .as("Snapshot with backoffLimit=-1 should retry indefinitely, but failed after attempt %d", i + 1) + .isEqualTo(TRIGGER_PENDING); + } + } + @ParameterizedTest @ValueSource(booleans = {true, false}) public void testReconcileSavepointAlreadyExists(boolean jobReferenced) { From 2445919ea67382400fc9fcf3f2a0e78e694daa59 Mon Sep 17 00:00:00 2001 From: Santwana Verma Date: Wed, 8 Apr 2026 17:21:58 +0530 Subject: [PATCH 2/3] Fixes spotless failures --- .../operator/controller/FlinkStateSnapshotControllerTest.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotControllerTest.java index 25fed1718b..c23478ad41 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotControllerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotControllerTest.java @@ -168,7 +168,9 @@ public void testReconcileBackoffUnlimited() { for (int i = 0; i < 10; i++) { controller.updateErrorStatus(snapshot, context, new Exception()); assertThat(snapshot.getStatus().getState()) - .as("Snapshot with backoffLimit=-1 should retry indefinitely, but failed after attempt %d", i + 1) + .as( + "Snapshot with backoffLimit=-1 should retry indefinitely, but failed after attempt %d", + i + 1) .isEqualTo(TRIGGER_PENDING); } } From 91654f37c2a27b5ee011291fd400d6684ea12553 Mon Sep 17 00:00:00 2001 From: Santwana Verma Date: Mon, 22 Jun 2026 14:46:49 +0530 Subject: [PATCH 3/3] Fix backoffLimit=-1 to mean unlimited retries in FlinkStateSnapshot The spec documents backoffLimit=-1 (the default) as "unlimited retries", but the controller compared `failures > backoffLimit` without handling the -1 sentinel. After the first failure (failures=1), `1 > -1` was true, so the snapshot was marked FAILED with withNoRetry() -- the opposite of unlimited. Guard the no-retry branch with `backoffLimit >= 0` so: -1 -> retries indefinitely (default) 0 -> no retries N -> retries up to N times Co-Authored-By: Claude Opus 4.8 (1M context) --- .../operator/controller/FlinkStateSnapshotController.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotController.java index 8c36dc0efd..97e9075d02 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotController.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotController.java @@ -128,7 +128,8 @@ public ErrorStatusUpdateControl updateErrorStatus( resource.getStatus().getError(), ctx.getKubernetesClient()); - if (resource.getStatus().getFailures() > resource.getSpec().getBackoffLimit()) { + var backoffLimit = resource.getSpec().getBackoffLimit(); + if (backoffLimit >= 0 && resource.getStatus().getFailures() > backoffLimit) { LOG.info( "Snapshot {} failed and won't be retried as failure count exceeded the backoff limit", resource.getMetadata().getName());