Skip to content

Commit b9a975b

Browse files
committed
[FLINK-XXXXX] Terminal jobs should never be restarted by cluster/job health check
1 parent: 27f66d9 · commit: b9a975b

3 files changed

Lines changed: 96 additions & 7 deletions

File tree

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.flink.kubernetes.operator.observer.ClusterHealthObserver;
2424
import org.apache.flink.kubernetes.operator.observer.JobStatusObserver;
2525
import org.apache.flink.kubernetes.operator.observer.SnapshotObserver;
26+
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
2627
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
2728

2829
import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_ENABLED;
@@ -50,7 +51,8 @@ protected void observeFlinkCluster(FlinkResourceContext<FlinkDeployment> ctx) {
5051
var observeConfig = ctx.getObserveConfig();
5152
savepointObserver.observeSavepointStatus(ctx);
5253
savepointObserver.observeCheckpointStatus(ctx);
53-
if (observeConfig.getBoolean(OPERATOR_CLUSTER_HEALTH_CHECK_ENABLED)) {
54+
if (observeConfig.getBoolean(OPERATOR_CLUSTER_HEALTH_CHECK_ENABLED)
55+
&& !ReconciliationUtils.isJobInTerminalState(ctx.getResource().getStatus())) {
5456
clusterHealthObserver.observe(ctx);
5557
}
5658
}

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,7 @@ public boolean reconcileOtherChanges(FlinkResourceContext<FlinkDeployment> ctx)
279279
ClusterHealthEvaluator.getLastValidClusterHealthInfo(
280280
deployment.getStatus().getClusterInfo());
281281
boolean shouldRestartJobBecauseUnhealthy =
282-
shouldRestartJobBecauseUnhealthy(deployment, observeConfig);
282+
shouldRestartJobBecauseUnhealthy(ctx, observeConfig);
283283
boolean shouldRecoverDeployment = shouldRecoverDeployment(observeConfig, deployment);
284284
if (shouldRestartJobBecauseUnhealthy || shouldRecoverDeployment) {
285285
if (shouldRecoverDeployment) {
@@ -316,7 +316,15 @@ public boolean reconcileOtherChanges(FlinkResourceContext<FlinkDeployment> ctx)
316316
}
317317

318318
private boolean shouldRestartJobBecauseUnhealthy(
319-
FlinkDeployment deployment, Configuration observeConfig) {
319+
FlinkResourceContext<FlinkDeployment> ctx, Configuration observeConfig) {
320+
var deployment = ctx.getResource();
321+
// Terminal jobs (FAILED, FINISHED, CANCELED) must not be restarted via the cluster health
322+
// check. Restarting a FAILED job is controlled exclusively by OPERATOR_JOB_RESTART_FAILED;
323+
// a terminal job also has no HA metadata, so a health-based restart would always fail.
324+
if (ReconciliationUtils.isJobInTerminalState(deployment.getStatus())) {
325+
return false;
326+
}
327+
320328
boolean restartNeeded = false;
321329

322330
if (observeConfig.getBoolean(OPERATOR_CLUSTER_HEALTH_CHECK_ENABLED)) {
@@ -329,13 +337,13 @@ private boolean shouldRestartJobBecauseUnhealthy(
329337
if (deployment.getSpec().getJob().getUpgradeMode() == UpgradeMode.STATELESS) {
330338
LOG.debug("Stateless job, recovering unhealthy jobmanager deployment");
331339
restartNeeded = true;
332-
} else if (HighAvailabilityMode.isHighAvailabilityModeActivated(
333-
observeConfig)) {
334-
LOG.debug("HA is enabled, recovering unhealthy jobmanager deployment");
340+
} else if (ctx.getFlinkService().isHaMetadataAvailable(observeConfig)) {
341+
LOG.debug(
342+
"HA metadata available, recovering unhealthy jobmanager deployment");
335343
restartNeeded = true;
336344
} else {
337345
LOG.warn(
338-
"Could not recover unhealthy jobmanager deployment without HA enabled");
346+
"Could not recover unhealthy jobmanager deployment, HA metadata not available");
339347
}
340348

341349
if (restartNeeded) {

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/UnhealthyDeploymentRestartTest.java

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,12 @@
3232
import io.javaoperatorsdk.operator.api.reconciler.Context;
3333
import org.junit.jupiter.api.BeforeEach;
3434
import org.junit.jupiter.params.ParameterizedTest;
35+
import org.junit.jupiter.params.provider.EnumSource;
3536
import org.junit.jupiter.params.provider.MethodSource;
3637

3738
import java.time.Duration;
3839

40+
import static org.apache.flink.api.common.JobStatus.FAILED;
3941
import static org.apache.flink.api.common.JobStatus.RUNNING;
4042
import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_ENABLED;
4143
import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW;
@@ -156,4 +158,81 @@ public void verifyApplicationNoCompletedCheckpointsJmRecovery(
156158
appCluster.getStatus().getJobManagerDeploymentStatus());
157159
assertEquals(RUNNING, appCluster.getStatus().getJobStatus().getState());
158160
}
161+
162+
@ParameterizedTest
163+
@MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersionsAndUpgradeModes")
164+
public void verifyTerminallyFailedJobNotRestartedByHealthCheck(
165+
FlinkVersion flinkVersion, UpgradeMode upgradeMode) throws Exception {
166+
FlinkDeployment appCluster = TestUtils.buildApplicationCluster(flinkVersion);
167+
appCluster.getSpec().getJob().setUpgradeMode(upgradeMode);
168+
169+
// Start a healthy deployment
170+
flinkService.setMetricValue(NUM_RESTARTS_METRIC_NAME, "0");
171+
flinkService.setMetricValue(NUMBER_OF_COMPLETED_CHECKPOINTS_METRIC_NAME, "1");
172+
testController.reconcile(appCluster, context);
173+
testController.reconcile(appCluster, context);
174+
testController.reconcile(appCluster, context);
175+
assertEquals(
176+
JobManagerDeploymentStatus.READY,
177+
appCluster.getStatus().getJobManagerDeploymentStatus());
178+
assertEquals(RUNNING, appCluster.getStatus().getJobStatus().getState());
179+
180+
// Mark job as terminally FAILED
181+
flinkService.markApplicationJobFailedWithError(
182+
flinkService.listJobs().get(0).f1.getJobId(), "Terminal failure");
183+
184+
// Age the checkpoint health data to simulate an unhealthy evaluation
185+
// (no checkpoint progress within the window), which would normally trigger a restart
186+
var clusterHealthInfo =
187+
ClusterHealthObserver.CLUSTER_HEALTH_INFOS.get(ResourceID.fromResource(appCluster));
188+
clusterHealthInfo.setNumCompletedCheckpointsIncreasedTimeStamp(
189+
clusterHealthInfo.getNumCompletedCheckpointsIncreasedTimeStamp() - 1200000);
190+
testController.getStatusRecorder().patchAndCacheStatus(appCluster, kubernetesClient);
191+
192+
// Reconcile - FAILED terminal job must NOT be restarted via the health-check codepath.
193+
// The health-based restart path requires HA metadata which a terminated job does not have,
194+
// and restarting a terminal job is controlled exclusively by OPERATOR_JOB_RESTART_FAILED.
195+
testController.reconcile(appCluster, context);
196+
197+
assertEquals(FAILED, appCluster.getStatus().getJobStatus().getState());
198+
assertEquals(
199+
JobManagerDeploymentStatus.READY,
200+
appCluster.getStatus().getJobManagerDeploymentStatus());
201+
}
202+
203+
/**
204+
* For stateful (LAST_STATE / SAVEPOINT) upgrade modes, a health-based restart must NOT be
205+
* triggered when HA metadata is absent. Without HA metadata the restart would immediately fail
206+
* with an UpgradeFailureException, so the check must be skipped entirely.
207+
*/
208+
@ParameterizedTest
209+
@EnumSource(
210+
value = UpgradeMode.class,
211+
names = {"LAST_STATE", "SAVEPOINT"})
212+
public void verifyUnhealthyRestartSkippedWhenHaMetadataAbsent(UpgradeMode upgradeMode)
213+
throws Exception {
214+
FlinkDeployment appCluster = TestUtils.buildApplicationCluster();
215+
appCluster.getSpec().getJob().setUpgradeMode(upgradeMode);
216+
217+
// Start a healthy deployment (HA metadata available by default)
218+
testController.reconcile(appCluster, context);
219+
testController.reconcile(appCluster, context);
220+
testController.reconcile(appCluster, context);
221+
assertEquals(
222+
JobManagerDeploymentStatus.READY,
223+
appCluster.getStatus().getJobManagerDeploymentStatus());
224+
assertEquals(RUNNING, appCluster.getStatus().getJobStatus().getState());
225+
226+
// Simulate unhealthy cluster (restart count exceeds threshold) while HA metadata is absent
227+
flinkService.setMetricValue(NUM_RESTARTS_METRIC_NAME, "100");
228+
flinkService.setHaDataAvailable(false);
229+
230+
testController.reconcile(appCluster, context);
231+
232+
// Health-based restart must NOT be triggered when HA metadata is absent
233+
assertEquals(RUNNING, appCluster.getStatus().getJobStatus().getState());
234+
assertEquals(
235+
JobManagerDeploymentStatus.READY,
236+
appCluster.getStatus().getJobManagerDeploymentStatus());
237+
}
159238
}

0 commit comments

Comments (0)