diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotController.java index 8c36dc0efd..97e9075d02 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotController.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotController.java @@ -128,7 +128,8 @@ public ErrorStatusUpdateControl updateErrorStatus( resource.getStatus().getError(), ctx.getKubernetesClient()); - if (resource.getStatus().getFailures() > resource.getSpec().getBackoffLimit()) { + var backoffLimit = resource.getSpec().getBackoffLimit(); + if (backoffLimit >= 0 && resource.getStatus().getFailures() > backoffLimit) { LOG.info( "Snapshot {} failed and won't be retried as failure count exceeded the backoff limit", resource.getMetadata().getName()); diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotControllerTest.java index ce36f42680..c23478ad41 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotControllerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotControllerTest.java @@ -154,6 +154,27 @@ public void testReconcileBackoff(int backoffLimit) { assertThat(snapshot.getStatus().getState()).isEqualTo(FAILED); } + @Test + public void testReconcileBackoffUnlimited() { + var deployment = createDeployment(); + context = TestUtils.createSnapshotContext(client, deployment); + // Default backoffLimit is -1; a negative limit disables the check (unlimited retries) + var snapshot = 
createSavepoint(deployment, false, -1); + snapshot.setStatus(new FlinkStateSnapshotStatus()); + + flinkService.setTriggerSavepointFailure(true); + + // With unlimited retries, each failure must keep the state TRIGGER_PENDING, never FAILED + for (int i = 0; i < 10; i++) { + controller.updateErrorStatus(snapshot, context, new Exception()); + assertThat(snapshot.getStatus().getState()) + .as( + "Snapshot with backoffLimit=-1 should retry indefinitely, but failed after attempt %d", + i + 1) + .isEqualTo(TRIGGER_PENDING); + } + } + @ParameterizedTest @ValueSource(booleans = {true, false}) public void testReconcileSavepointAlreadyExists(boolean jobReferenced) {