Skip to content

Commit 90a7faf

Browse files
authored
[FLINK-37766] FlinkSessionJob deletion blocked by finalizer
1 parent a00bf27 commit 90a7faf

2 files changed

Lines changed: 138 additions & 19 deletions

File tree

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java

Lines changed: 41 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -345,9 +345,11 @@ protected CancelResult cancelJob(
345345
}
346346
break;
347347
case CANCEL:
348-
cancelJobOrError(clusterClient, status, false);
349-
// This is async we need to return
350-
return CancelResult.pending();
348+
if (!cancelJobOrError(clusterClient, status, false)) {
349+
// This is async we need to return
350+
return CancelResult.pending();
351+
}
352+
break;
351353
}
352354
}
353355
if (suspendMode.deleteCluster() || deleteCluster) {
@@ -370,9 +372,12 @@ public CancelResult cancelSessionJob(
370372
switch (suspendMode) {
371373
case STATELESS:
372374
case CANCEL:
373-
cancelJobOrError(clusterClient, status, suspendMode == SuspendMode.STATELESS);
374-
// This is async we need to return and re-observe
375-
return CancelResult.pending();
375+
if (!cancelJobOrError(
376+
clusterClient, status, suspendMode == SuspendMode.STATELESS)) {
377+
// This is async we need to return and re-observe
378+
return CancelResult.pending();
379+
}
380+
break;
376381
case SAVEPOINT:
377382
savepointPath = savepointJobOrError(clusterClient, status, conf);
378383
break;
@@ -383,14 +388,30 @@ public CancelResult cancelSessionJob(
383388
return CancelResult.completed(savepointPath);
384389
}
385390

386-
public void cancelJobOrError(
391+
/**
392+
* Attempts to cancel a Flink job given its current status. If the job is already in the process
393+
* of being cancelled or has been terminated, the method handles these cases accordingly.
394+
*
395+
* @param clusterClient the {@code RestClusterClient} instance used to interact with the Flink
396+
* cluster.
397+
* @param status the current status of the job, encapsulated in a {@code CommonStatus} object.
398+
* @param ignoreMissing a flag indicating whether the absence of the job should be ignored. If
399+
* {@code true}, the method will return {@code true} when the job is missing. If {@code
400+
* false}, an exception will be thrown when the job cannot be found.
401+
* @return {@code true} if the job was already missing or terminated, and no further action is
402+
* needed. {@code false} if cancellation is pending, either because this call initiated it
403+
* or because the job was already cancelling (the caller should await completion).
404+
* @throws UpgradeFailureException if the job cannot be cancelled due to an unexpected error, or
405+
* if the job is missing and {@code ignoreMissing} is set to {@code false}.
406+
*/
407+
public boolean cancelJobOrError(
387408
RestClusterClient<String> clusterClient,
388409
CommonStatus<?> status,
389410
boolean ignoreMissing) {
390411
var jobID = JobID.fromHexString(status.getJobStatus().getJobId());
391412
if (ReconciliationUtils.isJobCancelling(status)) {
392413
LOG.info("Job already cancelling");
393-
return;
414+
return false;
394415
}
395416
LOG.info("Cancelling job");
396417
try {
@@ -402,6 +423,7 @@ public void cancelJobOrError(
402423
if (isJobMissing(e)) {
403424
if (ignoreMissing) {
404425
LOG.info("Job already missing");
426+
return true;
405427
} else {
406428
throw new UpgradeFailureException(
407429
"Cannot find job when trying to cancel",
@@ -410,13 +432,15 @@ public void cancelJobOrError(
410432
}
411433
} else if (isJobTerminated(e)) {
412434
LOG.info("Job already terminated");
435+
return true;
413436
} else {
414437
LOG.warn("Error while cancelling job", e);
415438
throw new UpgradeFailureException(
416439
"Cancellation Error", EventRecorder.Reason.CleanupFailed.name(), e);
417440
}
418441
}
419442
status.getJobStatus().setState(JobStatus.CANCELLING);
443+
return false;
420444
}
421445

422446
public String savepointJobOrError(
@@ -487,9 +511,16 @@ public static boolean isJobTerminated(Exception e) {
487511
return true;
488512
}
489513

490-
return findThrowable(e, RestClientException.class)
514+
if (findThrowable(e, RestClientException.class)
491515
.map(RestClientException::getHttpResponseStatus)
492516
.map(respCode -> HttpResponseStatus.CONFLICT == respCode)
517+
.orElse(false)) {
518+
return true;
519+
}
520+
521+
return Optional.ofNullable(ExceptionUtils.getExceptionMessage(e))
522+
.map(String::toLowerCase)
523+
.map(msg -> msg.contains("already reached another terminal state"))
493524
.orElse(false);
494525
}
495526

@@ -1051,6 +1082,7 @@ private static String createEmptyJar() {
10511082
}
10521083
}
10531084

1085+
@Override
10541086
public Map<String, String> getMetrics(
10551087
Configuration conf, String jobId, List<String> metricNames) throws Exception {
10561088
try (var clusterClient = getClusterClient(conf)) {

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java

Lines changed: 97 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@
113113
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
114114
import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
115115
import lombok.SneakyThrows;
116+
import org.jetbrains.annotations.NotNull;
116117
import org.junit.jupiter.api.Assertions;
117118
import org.junit.jupiter.api.BeforeEach;
118119
import org.junit.jupiter.api.Test;
@@ -334,15 +335,7 @@ public void cancelJobWithStatelessUpgradeModeTest() throws Exception {
334335
@ValueSource(ints = {404, 409, 500})
335336
public void cancelErrorHandling(int statusCode) throws Exception {
336337

337-
var testingClusterClient =
338-
new TestingClusterClient<>(configuration, TestUtils.TEST_DEPLOYMENT_NAME);
339-
testingClusterClient.setCancelFunction(
340-
jobID ->
341-
CompletableFuture.failedFuture(
342-
new RuntimeException(
343-
new RestClientException(
344-
"errrr", HttpResponseStatus.valueOf(statusCode)))));
345-
var flinkService = new TestingService(testingClusterClient);
338+
var flinkService = getTestingService("errrr", HttpResponseStatus.valueOf(statusCode));
346339

347340
JobID jobID = JobID.generate();
348341
var job = TestUtils.buildSessionJob();
@@ -360,10 +353,104 @@ public void cancelErrorHandling(int statusCode) throws Exception {
360353
assertEquals(RUNNING, jobStatus.getState());
361354
} else {
362355
flinkService.cancelSessionJob(job, SuspendMode.STATELESS, new Configuration());
363-
assertEquals(CANCELLING, jobStatus.getState());
356+
assertEquals(FINISHED, jobStatus.getState());
357+
assertNull(jobStatus.getJobId());
364358
}
365359
}
366360

361+
@Test
362+
public void cancelErrorHandlingWithTerminalStateMessage() throws Exception {
363+
var flinkService =
364+
getTestingService(
365+
"Job cancellation failed because the job has already reached another terminal state (FAILED).",
366+
HttpResponseStatus.BAD_REQUEST);
367+
368+
JobID jobID = JobID.generate();
369+
var job = TestUtils.buildSessionJob();
370+
var jobStatus = job.getStatus().getJobStatus();
371+
jobStatus.setJobId(jobID.toHexString());
372+
jobStatus.setState(RUNNING);
373+
ReconciliationUtils.updateStatusForDeployedSpec(job, new Configuration());
374+
375+
flinkService.cancelSessionJob(job, SuspendMode.STATELESS, new Configuration());
376+
assertEquals(FINISHED, jobStatus.getState());
377+
assertNull(jobStatus.getJobId());
378+
}
379+
380+
/**
381+
* Reproduces the operator-upgrade scenario for Session Mode with CANCEL suspend mode: when a
382+
* running session job's JobManager has already moved the job into a terminal state (e.g.
383+
* FAILED) and the operator (after a restart/upgrade) tries to cancel it, the cancellation
384+
* request comes back with "already reached another terminal state". Previously this caused the
385+
* finalizer to never be removed, leaving the CR stuck in Terminating.
386+
*/
387+
@Test
388+
public void cancelSessionJobWithCancelModeAndTerminalStateMessage() throws Exception {
389+
var flinkService =
390+
getTestingService(
391+
"Job cancellation failed because the job has already reached another terminal state (FAILED).",
392+
HttpResponseStatus.BAD_REQUEST);
393+
394+
JobID jobID = JobID.generate();
395+
var job = TestUtils.buildSessionJob();
396+
var jobStatus = job.getStatus().getJobStatus();
397+
jobStatus.setJobId(jobID.toHexString());
398+
jobStatus.setState(RUNNING);
399+
ReconciliationUtils.updateStatusForDeployedSpec(job, new Configuration());
400+
401+
var result = flinkService.cancelSessionJob(job, SuspendMode.CANCEL, new Configuration());
402+
// Must NOT be pending — the CR would otherwise be stuck in Terminating indefinitely
403+
assertFalse(result.isPending());
404+
assertEquals(FINISHED, jobStatus.getState());
405+
assertNull(jobStatus.getJobId());
406+
}
407+
408+
@NotNull
409+
private TestingService getTestingService(String message, HttpResponseStatus badRequest)
410+
throws Exception {
411+
final var testingClusterClient =
412+
new TestingClusterClient<>(configuration, TestUtils.TEST_DEPLOYMENT_NAME);
413+
testingClusterClient.setCancelFunction(
414+
jobID ->
415+
CompletableFuture.failedFuture(
416+
new RuntimeException(
417+
new RestClientException(message, badRequest))));
418+
return new TestingService(testingClusterClient);
419+
}
420+
421+
/**
422+
* Reproduces FLINK-37766 for Application Mode: when a running application job's JobManager has
423+
* moved the job to a terminal state (e.g. FAILED due to HA desync) and the operator tries to
424+
* cancel the job with CANCEL suspend mode (used for last-state upgrades), the "already reached
425+
* another terminal state" response previously caused the operator to always return
426+
* CancelResult.pending(), looping forever without completing the upgrade/deletion.
427+
*/
428+
@Test
429+
public void cancelApplicationJobWithCancelModeAndTerminalStateMessage() throws Exception {
430+
var flinkService =
431+
getTestingService(
432+
"Job cancellation failed because the job has already reached another terminal state (FAILED).",
433+
HttpResponseStatus.BAD_REQUEST);
434+
435+
JobID jobID = JobID.generate();
436+
FlinkDeployment deployment = TestUtils.buildApplicationCluster();
437+
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
438+
JobStatus jobStatus = deployment.getStatus().getJobStatus();
439+
jobStatus.setJobId(jobID.toHexString());
440+
jobStatus.setState(RUNNING);
441+
ReconciliationUtils.updateStatusForDeployedSpec(deployment, new Configuration());
442+
443+
var result =
444+
flinkService.cancelJob(
445+
deployment,
446+
SuspendMode.CANCEL,
447+
configManager.getObserveConfig(deployment),
448+
false);
449+
// Must NOT be pending — the operator would otherwise loop forever on the upgrade
450+
assertFalse(result.isPending());
451+
assertEquals(FINISHED, jobStatus.getState());
452+
}
453+
367454
@ParameterizedTest
368455
@ValueSource(booleans = {true, false})
369456
public void cancelJobWithSavepointUpgradeModeTest(boolean deleteAfterSavepoint)

0 commit comments

Comments
 (0)