Skip to content

Commit 5350465

Browse files
authored
[FLINK-39967] Fix backoffLimit=-1 to mean unlimited retries in FlinkStateSnapshot
1 parent d131b70 commit 5350465

2 files changed

Lines changed: 23 additions & 1 deletion

File tree

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotController.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,8 @@ public ErrorStatusUpdateControl<FlinkStateSnapshot> updateErrorStatus(
128128
resource.getStatus().getError(),
129129
ctx.getKubernetesClient());
130130

131-
if (resource.getStatus().getFailures() > resource.getSpec().getBackoffLimit()) {
131+
var backoffLimit = resource.getSpec().getBackoffLimit();
132+
if (backoffLimit >= 0 && resource.getStatus().getFailures() > backoffLimit) {
132133
LOG.info(
133134
"Snapshot {} failed and won't be retried as failure count exceeded the backoff limit",
134135
resource.getMetadata().getName());

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotControllerTest.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,27 @@ public void testReconcileBackoff(int backoffLimit) {
154154
assertThat(snapshot.getStatus().getState()).isEqualTo(FAILED);
155155
}
156156

157+
@Test
158+
public void testReconcileBackoffUnlimited() {
159+
var deployment = createDeployment();
160+
context = TestUtils.createSnapshotContext(client, deployment);
161+
// Default backoffLimit is -1, meaning unlimited retries
162+
var snapshot = createSavepoint(deployment, false, -1);
163+
snapshot.setStatus(new FlinkStateSnapshotStatus());
164+
165+
flinkService.setTriggerSavepointFailure(true);
166+
167+
// With unlimited retries, the snapshot should never transition to FAILED
168+
for (int i = 0; i < 10; i++) {
169+
controller.updateErrorStatus(snapshot, context, new Exception());
170+
assertThat(snapshot.getStatus().getState())
171+
.as(
172+
"Snapshot with backoffLimit=-1 should retry indefinitely, but failed after attempt %d",
173+
i + 1)
174+
.isEqualTo(TRIGGER_PENDING);
175+
}
176+
}
177+
157178
@ParameterizedTest
158179
@ValueSource(booleans = {true, false})
159180
public void testReconcileSavepointAlreadyExists(boolean jobReferenced) {

0 commit comments

Comments
 (0)