From 2c2515c29eaa3929f7466324b39fd9b5f69803b1 Mon Sep 17 00:00:00 2001 From: nishita-pattanayak Date: Thu, 10 Jul 2025 23:47:58 +0530 Subject: [PATCH 1/8] [FLINK-28648][Flink-Kubernetes-Operator] Fix Cleanup Process for Session Cluster --- .../KubernetesOperatorConfigOptions.java | 8 ++ .../deployment/SessionReconciler.java | 103 ++++++++++++++++-- 2 files changed, 104 insertions(+), 7 deletions(-) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java index 6f5891cc5d..bd5e0a46e8 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java @@ -647,6 +647,14 @@ public static String operatorConfigKey(String key) { .withDescription( "Indicate whether the job should be drained when stopping with savepoint."); + @Documentation.Section(SECTION_DYNAMIC) + public static final ConfigOption BLOCK_ON_UNMANAGED_JOBS = + operatorConfig("session.block-on-unmanaged-jobs") + .booleanType() + .defaultValue(true) + .withDescription( + "Block FlinkDeployment deletion if unmanaged jobs (jobs not managed by FlinkSessionJob resources) are running in the session cluster. Example: Jobs submitted via CLI."); + @Documentation.Section(SECTION_ADVANCED) public static final ConfigOption REFRESH_CLUSTER_RESOURCE_VIEW = operatorConfig("cluster.resource-view.refresh-interval") diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java index c628bf940e..983c381d8f 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java @@ -17,6 +17,7 @@ package org.apache.flink.kubernetes.operator.reconciler.deployment; +import org.apache.flink.api.common.JobID; import org.apache.flink.autoscaler.NoopJobAutoscaler; import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.operator.api.FlinkDeployment; @@ -25,11 +26,16 @@ import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec; import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus; import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus; +import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions; import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext; import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils; import org.apache.flink.kubernetes.operator.utils.EventRecorder; import org.apache.flink.kubernetes.operator.utils.IngressUtils; import org.apache.flink.kubernetes.operator.utils.StatusRecorder; +import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders; import io.javaoperatorsdk.operator.api.reconciler.DeleteControl; import org.slf4j.Logger; @@ -120,6 +126,61 @@ private void recoverSession(FlinkResourceContext ctx) throws Ex .setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING); } + /** + * Detects unmanaged jobs running in the session cluster. Unmanaged jobs are jobs that exist in + * the Flink cluster but are not managed by FlinkSessionJob resources. + */ + private Set getUnmanagedJobs( + FlinkResourceContext ctx, Set sessionJobs) { + LOG.info( + "Starting unmanaged job detection for session cluster: {}", + ctx.getResource().getMetadata().getName()); + try { + // Get all jobs running in the Flink cluster + var flinkService = ctx.getFlinkService(); + var clusterClient = flinkService.getClusterClient(ctx.getObserveConfig()); + var allJobs = + clusterClient + .sendRequest( + JobsOverviewHeaders.getInstance(), + EmptyMessageParameters.getInstance(), + EmptyRequestBody.getInstance()) + .get() + .getJobs(); + + // Get job IDs managed by FlinkSessionJob resources + Set managedJobIds = + sessionJobs.stream() + .map(job -> job.getStatus().getJobStatus().getJobId()) + .filter(jobId -> jobId != null) + .map(JobID::fromHexString) + .collect(Collectors.toSet()); + + // Get job IDs that are not managed by FlinkSessionJob resources and are currently + // running + // FAILED(TerminalState.GLOBALLY) + // CANCELED(TerminalState.GLOBALLY) + // FINISHED(TerminalState.GLOBALLY) + // Above terminal states are ignored + Set unmanagedJobIds = + allJobs.stream() + .filter( + job -> + !job.getStatus() + .isGloballyTerminalState()) // Only consider + // running jobs + .map(JobDetails::getJobId) + .filter(jobId -> !managedJobIds.contains(jobId)) + .collect(Collectors.toSet()); + + LOG.info("Detected {} unmanaged job IDs: {}", unmanagedJobIds.size(), unmanagedJobIds); + return unmanagedJobIds; + } catch (Exception e) { + LOG.warn("Failed to detect unmanaged jobs in session cluster", e); + return Set.of(); + } + } + @Override public DeleteControl cleanupInternal(FlinkResourceContext ctx) { Set sessionJobs = @@ -143,13 +204,41 @@ public DeleteControl cleanupInternal(FlinkResourceContext ctx) } return DeleteControl.noFinalizerRemoval() .rescheduleAfter(ctx.getOperatorConfig().getReconcileInterval().toMillis()); - } else { - LOG.info("Stopping session cluster"); - var conf = ctx.getDeployConfig(ctx.getResource().getSpec()); - ctx.getFlinkService() - .deleteClusterDeployment( - deployment.getMetadata(), deployment.getStatus(), conf, true); - return DeleteControl.defaultDelete(); } + + // Check for unmanaged jobs if the option is enabled (Enabled by default) + boolean blockOnUnmanagedJobs = + ctx.getObserveConfig() + .getBoolean(KubernetesOperatorConfigOptions.BLOCK_ON_UNMANAGED_JOBS); + if (blockOnUnmanagedJobs) { + Set unmanagedJobs = getUnmanagedJobs(ctx, sessionJobs); + if (!unmanagedJobs.isEmpty()) { + var error = + String.format( + "The session cluster has unmanaged jobs %s that should be cancelled first", + unmanagedJobs.stream() + .map(JobID::toHexString) + .collect(Collectors.toList())); + LOG.warn(error); + if (eventRecorder.triggerEvent( + deployment, + EventRecorder.Type.Warning, + EventRecorder.Reason.CleanupFailed, + EventRecorder.Component.Operator, + error, + ctx.getKubernetesClient())) { + LOG.warn(error); + } + return DeleteControl.noFinalizerRemoval() + .rescheduleAfter(ctx.getOperatorConfig().getReconcileInterval().toMillis()); + } + } + + LOG.info("Stopping session cluster"); + var conf = ctx.getDeployConfig(ctx.getResource().getSpec()); + ctx.getFlinkService() + .deleteClusterDeployment( + deployment.getMetadata(), deployment.getStatus(), conf, true); + return DeleteControl.defaultDelete(); } } From 9fb1bb66f5c03fe0b9556f4191e607fac11a39eb Mon Sep 17 00:00:00 2001 From: nishita-pattanayak Date: Fri, 11 Jul 2025 16:02:11 +0530 Subject: [PATCH 2/8] [FLINK-28648][Flink-Kubernetes-Operator] Fix Cleanup Process for Session Cluster --- .../deployment/SessionReconciler.java | 2 +- .../deployment/SessionReconcilerTest.java | 170 ++++++++++++++++++ 2 files changed, 171 insertions(+), 1 deletion(-) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java index 983c381d8f..22d3990b7b 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java @@ -161,7 +161,7 @@ private Set getUnmanagedJobs( // FAILED(TerminalState.GLOBALLY) // CANCELED(TerminalState.GLOBALLY) // FINISHED(TerminalState.GLOBALLY) - // Above terminal states are ignored + // Above terminal states are not considered since they are no longer active jobs Set unmanagedJobIds = allJobs.stream() .filter( diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java index 54484065fa..4032ab949c 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java @@ -17,17 +17,24 @@ package org.apache.flink.kubernetes.operator.reconciler.deployment; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; import org.apache.flink.kubernetes.operator.OperatorTestBase; import org.apache.flink.kubernetes.operator.TestUtils; import org.apache.flink.kubernetes.operator.TestingFlinkService; import org.apache.flink.kubernetes.operator.api.FlinkDeployment; +import org.apache.flink.kubernetes.operator.api.FlinkSessionJob; import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec; import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus; import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus; import org.apache.flink.kubernetes.operator.api.status.ReconciliationState; +import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions; +import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext; import org.apache.flink.kubernetes.operator.reconciler.TestReconcilerAdapter; +import org.apache.flink.runtime.client.JobStatusMessage; import io.fabric8.kubernetes.api.model.ObjectMeta; import io.fabric8.kubernetes.client.KubernetesClient; @@ -36,13 +43,16 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; @@ -139,4 +149,164 @@ public void testSetOwnerReference() throws Exception { deployConfig.get(KubernetesConfigOptions.JOB_MANAGER_OWNER_REFERENCE); Assertions.assertEquals(expectedOwnerReferences, or); } + + @Test + public void testUnmanagedJobsIdentification() throws Exception { + // Create a session cluster deployment with blocking enabled + FlinkDeployment deployment = TestUtils.buildSessionCluster(); + deployment + .getSpec() + .getFlinkConfiguration() + .put(KubernetesOperatorConfigOptions.BLOCK_ON_UNMANAGED_JOBS.key(), "true"); + + assertEquals( + "true", + deployment + .getSpec() + .getFlinkConfiguration() + .get(KubernetesOperatorConfigOptions.BLOCK_ON_UNMANAGED_JOBS.key())); + + // First, deploy the session cluster to set up proper state + reconciler.reconcile(deployment, flinkService.getContext()); + + // Verify deployment is in DEPLOYED state + assertEquals( + ReconciliationState.DEPLOYED, + deployment.getStatus().getReconciliationStatus().getState()); + + // Create different types of jobs + JobID managedJobId1 = new JobID(); + JobID managedJobId2 = new JobID(); + JobID unmanagedRunningJobId1 = new JobID(); + JobID unmanagedTerminatedJobId = new JobID(); + JobID unmanagedRunningJobId2 = new JobID(); + + // Add a mix of managed and unmanaged jobs to the testing service + flinkService + .listJobs() + .add( + Tuple3.of( + null, + new JobStatusMessage( + managedJobId1, + "managed-job-1", + JobStatus.RUNNING, + System.currentTimeMillis()), + new Configuration())); + flinkService + .listJobs() + .add( + Tuple3.of( + null, + new JobStatusMessage( + managedJobId2, + "managed-job-2", + JobStatus.RUNNING, + System.currentTimeMillis()), + new Configuration())); + flinkService + .listJobs() + .add( + Tuple3.of( + null, + new JobStatusMessage( + unmanagedRunningJobId1, + "unmanaged-running-job-1", + JobStatus.RUNNING, + System.currentTimeMillis()), + new Configuration())); + flinkService + .listJobs() + .add( + Tuple3.of( + null, + new JobStatusMessage( + unmanagedTerminatedJobId, + "unmanaged-terminated-job", + JobStatus.CANCELED, + System.currentTimeMillis()), + new Configuration())); + flinkService + .listJobs() + .add( + Tuple3.of( + null, + new JobStatusMessage( + unmanagedRunningJobId2, + "unmanaged-running-job-2", + JobStatus.RUNNING, + System.currentTimeMillis()), + new Configuration())); + + // Create FlinkSessionJob resources for the managed jobs + FlinkSessionJob managedSessionJob1 = TestUtils.buildSessionJob(); + managedSessionJob1.getMetadata().setName("managed-session-job-1"); + managedSessionJob1.getStatus().getJobStatus().setJobId(managedJobId1.toHexString()); + kubernetesClient.resource(managedSessionJob1).createOrReplace(); + + FlinkSessionJob managedSessionJob2 = TestUtils.buildSessionJob(); + managedSessionJob2.getMetadata().setName("managed-session-job-2"); + managedSessionJob2.getStatus().getJobStatus().setJobId(managedJobId2.toHexString()); + kubernetesClient.resource(managedSessionJob2).createOrReplace(); + + Set sessionJobs = new HashSet<>(); + sessionJobs.add(managedSessionJob1); + sessionJobs.add(managedSessionJob2); + + // Test with blocking enabled - should identify running unmanaged jobs correctly + var context = TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient); + var resourceContext = getResourceContext(deployment, context); + + // Use reflection to access the private getUnmanagedJobs method + var sessionReconciler = reconciler.getReconciler(); + var getUnmanagedJobsMethod = + sessionReconciler + .getClass() + .getDeclaredMethod( + "getUnmanagedJobs", FlinkResourceContext.class, Set.class); + getUnmanagedJobsMethod.setAccessible(true); + + @SuppressWarnings("unchecked") + var unmanagedJobs = + (java.util.Set) + getUnmanagedJobsMethod.invoke( + sessionReconciler, resourceContext, sessionJobs); + + // Verify only RUNNING unmanaged jobs are identified - should be 2 + assertEquals(2, unmanagedJobs.size(), "Should identify exactly 2 running unmanaged jobs"); + + // Verify that none of the managed job IDs are in the unmanaged list + Set managedJobIds = + sessionJobs.stream() + .map(job -> job.getStatus().getJobStatus().getJobId()) + .collect(Collectors.toSet()); + + for (String managedJobId : managedJobIds) { + assertFalse( + unmanagedJobs.contains(managedJobId), + "Managed job " + managedJobId + " should NOT be in unmanaged jobs"); + } + + // Verify managed jobs are properly set up (we should have 2 managed jobs) + assertEquals(2, managedJobIds.size(), "Should have 2 managed jobs configured"); + + // Test scenario with no running unmanaged jobs + flinkService + .listJobs() + .removeIf( + job -> + job.f1.getJobId().equals(unmanagedRunningJobId1) + || job.f1.getJobId().equals(unmanagedRunningJobId2)); + + @SuppressWarnings("unchecked") + var unmanagedJobsAfterRemoval = + (java.util.Set) + getUnmanagedJobsMethod.invoke( + sessionReconciler, resourceContext, sessionJobs); + + assertEquals( + 0, + unmanagedJobsAfterRemoval.size(), + "Should have no unmanaged jobs when only managed and terminated jobs exist"); + } } From 8a1235271e1026b9d24f1b9f5f94f2976e309df0 Mon Sep 17 00:00:00 2001 From: nishita-pattanayak Date: Fri, 11 Jul 2025 16:07:57 +0530 Subject: [PATCH 3/8] [FLINK-28648][Flink-Kubernetes-Operator] Fix Cleanup Process for Session Cluster --- .../operator/reconciler/deployment/SessionReconcilerTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java index 4032ab949c..29e6e23503 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java @@ -151,7 +151,7 @@ public void testSetOwnerReference() throws Exception { } @Test - public void testUnmanagedJobsIdentification() throws Exception { + public void testGetUnmanagedJobs() throws Exception { // Create a session cluster deployment with blocking enabled FlinkDeployment deployment = TestUtils.buildSessionCluster(); deployment From 8ed2ea48c4bc5c8bcbca1a4423ab5b0734762780 Mon Sep 17 00:00:00 2001 From: nishita-pattanayak Date: Mon, 14 Jul 2025 01:45:52 +0530 Subject: [PATCH 4/8] [FLINK-28648][Flink-Kubernetes-Operator] Add java docs for the added configuration --- docs/layouts/shortcodes/generated/dynamic_section.html | 6 ++++++ .../generated/kubernetes_operator_config_configuration.html | 6 ++++++ 2 files changed, 12 insertions(+) diff --git a/docs/layouts/shortcodes/generated/dynamic_section.html b/docs/layouts/shortcodes/generated/dynamic_section.html index 54c1eb6946..f81afa4f2a 100644 --- a/docs/layouts/shortcodes/generated/dynamic_section.html +++ b/docs/layouts/shortcodes/generated/dynamic_section.html @@ -194,6 +194,12 @@ Duration The interval before a savepoint trigger attempt is marked as unsuccessful. + +
kubernetes.operator.session.block-on-unmanaged-jobs
+ true + Boolean + Block FlinkDeployment deletion if unmanaged jobs (jobs not managed by FlinkSessionJob resources) are running in the session cluster. Example: Jobs submitted via CLI. +
kubernetes.operator.snapshot.resource.enabled
true diff --git a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html index 4a72d8b766..2ba0de9ed5 100644 --- a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html +++ b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html @@ -416,6 +416,12 @@ Duration The interval before a savepoint trigger attempt is marked as unsuccessful. + +
kubernetes.operator.session.block-on-unmanaged-jobs
+ true + Boolean + Block FlinkDeployment deletion if unmanaged jobs (jobs not managed by FlinkSessionJob resources) are running in the session cluster. Example: Jobs submitted via CLI. +
kubernetes.operator.snapshot.resource.enabled
true From ff26986c7827cd62ef8c2253a54193428fb92f86 Mon Sep 17 00:00:00 2001 From: nishita-pattanayak Date: Mon, 14 Jul 2025 14:27:02 +0530 Subject: [PATCH 5/8] [FLINK-28648][Flink-Kubernetes-Operator] Changed getUnManagedJobs -> getNonTerminated + annotated @VisibleForTesting --- .../deployment/SessionReconciler.java | 46 +++------- .../deployment/SessionReconcilerTest.java | 87 +++++++++---------- 2 files changed, 54 insertions(+), 79 deletions(-) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java index 22d3990b7b..40b039d0df 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java @@ -17,6 +17,7 @@ package org.apache.flink.kubernetes.operator.reconciler.deployment; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; import org.apache.flink.autoscaler.NoopJobAutoscaler; import org.apache.flink.configuration.Configuration; @@ -130,11 +131,9 @@ private void recoverSession(FlinkResourceContext ctx) throws Ex * Detects unmanaged jobs running in the session cluster. Unmanaged jobs are jobs that exist in * the Flink cluster but are not managed by FlinkSessionJob resources. */ - private Set getUnmanagedJobs( - FlinkResourceContext ctx, Set sessionJobs) { - LOG.info( - "Starting unmanaged job detection for session cluster: {}", - ctx.getResource().getMetadata().getName()); + @VisibleForTesting + Set getNonTerminalJobs(FlinkResourceContext ctx) { + LOG.debug("Starting nonTerminal jobs detection for session cluster"); try { // Get all jobs running in the Flink cluster var flinkService = ctx.getFlinkService(); @@ -148,35 +147,17 @@ private Set getUnmanagedJobs( .get() .getJobs(); - // Get job IDs managed by FlinkSessionJob resources - Set managedJobIds = - sessionJobs.stream() - .map(job -> job.getStatus().getJobStatus().getJobId()) - .filter(jobId -> jobId != null) - .map(JobID::fromHexString) - .collect(Collectors.toSet()); - - // Get job IDs that are not managed by FlinkSessionJob resources and are currently // running - // FAILED(TerminalState.GLOBALLY) - // CANCELED(TerminalState.GLOBALLY) - // FINISHED(TerminalState.GLOBALLY) // Above terminal states are not considered since they are no longer active jobs - Set unmanagedJobIds = + Set nonTerminalJobIds = allJobs.stream() - .filter( - job -> - !job.getStatus() - .isGloballyTerminalState()) // Only consider - // running jobs + .filter(job -> !job.getStatus().isGloballyTerminalState()) .map(JobDetails::getJobId) - .filter(jobId -> !managedJobIds.contains(jobId)) .collect(Collectors.toSet()); - LOG.info("Detected {} unmanaged job IDs: {}", unmanagedJobIds.size(), unmanagedJobIds); - return unmanagedJobIds; + return nonTerminalJobIds; } catch (Exception e) { - LOG.warn("Failed to detect unmanaged jobs in session cluster", e); + LOG.warn("Failed to detect nonTerminal jobs in session cluster", e); return Set.of(); } } @@ -206,20 +187,19 @@ public DeleteControl cleanupInternal(FlinkResourceContext ctx) .rescheduleAfter(ctx.getOperatorConfig().getReconcileInterval().toMillis()); } - // Check for unmanaged jobs if the option is enabled (Enabled by default) + // Check for unmanaged non-terminated jobs if the option is enabled (Enabled by default) boolean blockOnUnmanagedJobs = ctx.getObserveConfig() .getBoolean(KubernetesOperatorConfigOptions.BLOCK_ON_UNMANAGED_JOBS); if (blockOnUnmanagedJobs) { - Set unmanagedJobs = getUnmanagedJobs(ctx, sessionJobs); - if (!unmanagedJobs.isEmpty()) { + Set nonTerminalJobs = getNonTerminalJobs(ctx); + if (!nonTerminalJobs.isEmpty()) { var error = String.format( - "The session cluster has unmanaged jobs %s that should be cancelled first", - unmanagedJobs.stream() + "The session cluster has non terminated jobs %s that should be cancelled first", + nonTerminalJobs.stream() .map(JobID::toHexString) .collect(Collectors.toList())); - LOG.warn(error); if (eventRecorder.triggerEvent( deployment, EventRecorder.Type.Warning, diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java index 29e6e23503..9f84f2c113 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java @@ -32,7 +32,6 @@ import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus; import org.apache.flink.kubernetes.operator.api.status.ReconciliationState; import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions; -import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext; import org.apache.flink.kubernetes.operator.reconciler.TestReconcilerAdapter; import org.apache.flink.runtime.client.JobStatusMessage; @@ -49,7 +48,6 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Collectors; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -151,8 +149,7 @@ public void testSetOwnerReference() throws Exception { } @Test - public void testGetUnmanagedJobs() throws Exception { - // Create a session cluster deployment with blocking enabled + public void testGetNonTerminalJobs() throws Exception { FlinkDeployment deployment = TestUtils.buildSessionCluster(); deployment .getSpec() @@ -166,7 +163,6 @@ public void testGetUnmanagedJobs() throws Exception { .getFlinkConfiguration() .get(KubernetesOperatorConfigOptions.BLOCK_ON_UNMANAGED_JOBS.key())); - // First, deploy the session cluster to set up proper state reconciler.reconcile(deployment, flinkService.getContext()); // Verify deployment is in DEPLOYED state @@ -181,7 +177,7 @@ public void testGetUnmanagedJobs() throws Exception { JobID unmanagedTerminatedJobId = new JobID(); JobID unmanagedRunningJobId2 = new JobID(); - // Add a mix of managed and unmanaged jobs to the testing service + // Add jobs to the testing service flinkService .listJobs() .add( @@ -253,44 +249,46 @@ public void testGetUnmanagedJobs() throws Exception { sessionJobs.add(managedSessionJob1); sessionJobs.add(managedSessionJob2); - // Test with blocking enabled - should identify running unmanaged jobs correctly + // Test with blocking enabled - should identify all non-terminal jobs var context = TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient); var resourceContext = getResourceContext(deployment, context); - // Use reflection to access the private getUnmanagedJobs method - var sessionReconciler = reconciler.getReconciler(); - var getUnmanagedJobsMethod = - sessionReconciler - .getClass() - .getDeclaredMethod( - "getUnmanagedJobs", FlinkResourceContext.class, Set.class); - getUnmanagedJobsMethod.setAccessible(true); - - @SuppressWarnings("unchecked") - var unmanagedJobs = - (java.util.Set) - getUnmanagedJobsMethod.invoke( - sessionReconciler, resourceContext, sessionJobs); - - // Verify only RUNNING unmanaged jobs are identified - should be 2 - assertEquals(2, unmanagedJobs.size(), "Should identify exactly 2 running unmanaged jobs"); - - // Verify that none of the managed job IDs are in the unmanaged list - Set managedJobIds = - sessionJobs.stream() - .map(job -> job.getStatus().getJobStatus().getJobId()) - .collect(Collectors.toSet()); - - for (String managedJobId : managedJobIds) { - assertFalse( - unmanagedJobs.contains(managedJobId), - "Managed job " + managedJobId + " should NOT be in unmanaged jobs"); - } + var sessionReconciler = (SessionReconciler) reconciler.getReconciler(); + Set nonTerminalJobs = sessionReconciler.getNonTerminalJobs(resourceContext); + + // Verify all non-terminal jobs are identified - should be 4 (2 managed + 2 unmanaged + // running) + assertEquals(4, nonTerminalJobs.size(), "Should identify exactly 4 non-terminal jobs"); + + assertTrue( + nonTerminalJobs.contains(unmanagedRunningJobId1), + "Should contain unmanagedRunningJobId1"); + assertTrue( + nonTerminalJobs.contains(unmanagedRunningJobId2), + "Should contain unmanagedRunningJobId2"); - // Verify managed jobs are properly set up (we should have 2 managed jobs) - assertEquals(2, managedJobIds.size(), "Should have 2 managed jobs configured"); + // Verify terminated job is not included + assertFalse( + nonTerminalJobs.contains(unmanagedTerminatedJobId), + "Should not contain terminated job"); + + // Test scenario with only unmanaged jobs + flinkService + .listJobs() + .removeIf( + job -> + job.f1.getJobId().equals(managedJobId1) + || job.f1.getJobId().equals(managedJobId2)); + + Set nonTerminalJobsAfterSessionJobsRemoval = + sessionReconciler.getNonTerminalJobs(resourceContext); + + assertEquals( + 2, + nonTerminalJobsAfterSessionJobsRemoval.size(), + "Should have 2 non-terminal jobs when sessionjobs are deleted"); - // Test scenario with no running unmanaged jobs + // Test scenario with no running jobs flinkService .listJobs() .removeIf( @@ -298,15 +296,12 @@ public void testGetUnmanagedJobs() throws Exception { job.f1.getJobId().equals(unmanagedRunningJobId1) || job.f1.getJobId().equals(unmanagedRunningJobId2)); - @SuppressWarnings("unchecked") - var unmanagedJobsAfterRemoval = - (java.util.Set) - getUnmanagedJobsMethod.invoke( - sessionReconciler, resourceContext, sessionJobs); + Set nonTerminalJobsAfterRemoval = + sessionReconciler.getNonTerminalJobs(resourceContext); assertEquals( 0, - unmanagedJobsAfterRemoval.size(), - "Should have no unmanaged jobs when only managed and terminated jobs exist"); + nonTerminalJobsAfterRemoval.size(), + "Should have no non-terminal jobs when only terminated jobs exist"); } } From 07d5c1a9ae03d3d7958ef56fec0a06186cbb0471 Mon Sep 17 00:00:00 2001 From: nishita-pattanayak Date: Mon, 14 Jul 2025 14:30:49 +0530 Subject: [PATCH 6/8] [FLINK-28648][Flink-Kubernetes-Operator] Changed getUnManagedJobs -> getNonTerminated + annotated @VisibleForTesting --- .../operator/reconciler/deployment/SessionReconciler.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java index 40b039d0df..5506449ba5 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java @@ -147,8 +147,7 @@ Set getNonTerminalJobs(FlinkResourceContext ctx) { .get() .getJobs(); - // running - // Above terminal states are not considered since they are no longer active jobs + // running job Ids Set nonTerminalJobIds = allJobs.stream() .filter(job -> !job.getStatus().isGloballyTerminalState()) From 7175b2bd40db9d08f181c2f58d6027c1f01cf6b4 Mon Sep 17 00:00:00 2001 From: nishita-pattanayak Date: Mon, 14 Jul 2025 20:19:54 +0530 Subject: [PATCH 7/8] [FLINK-28648][Flink-Kubernetes-Operator] Cleanup Comments --- .../operator/reconciler/deployment/SessionReconciler.java | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java index 5506449ba5..46a5eb4a7b 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java @@ -127,10 +127,7 @@ private void recoverSession(FlinkResourceContext ctx) throws Ex .setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING); } - /** - * Detects unmanaged jobs running in the session cluster. Unmanaged jobs are jobs that exist in - * the Flink cluster but are not managed by FlinkSessionJob resources. - */ + // Detects jobs which are not in globally terminated states @VisibleForTesting Set getNonTerminalJobs(FlinkResourceContext ctx) { LOG.debug("Starting nonTerminal jobs detection for session cluster"); @@ -186,7 +183,8 @@ public DeleteControl cleanupInternal(FlinkResourceContext ctx) .rescheduleAfter(ctx.getOperatorConfig().getReconcileInterval().toMillis()); } - // Check for unmanaged non-terminated jobs if the option is enabled (Enabled by default) + // Check for non-terminated jobs if the option is enabled (Enabled by default) , after + // sessionJobs are deleted boolean blockOnUnmanagedJobs = ctx.getObserveConfig() .getBoolean(KubernetesOperatorConfigOptions.BLOCK_ON_UNMANAGED_JOBS); From fb9776ea5c30adce3a8f80f202d5823faf878f87 Mon Sep 17 00:00:00 2001 From: nishita-pattanayak Date: Mon, 21 Jul 2025 15:39:11 +0530 Subject: [PATCH 8/8] [FLINK-28648][Kubernetes Operator] Addressed minor comments --- .../operator/reconciler/deployment/SessionReconciler.java | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java index 46a5eb4a7b..809fbfbd12 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java @@ -197,22 +197,20 @@ public DeleteControl cleanupInternal(FlinkResourceContext ctx) nonTerminalJobs.stream() .map(JobID::toHexString) .collect(Collectors.toList())); - if (eventRecorder.triggerEvent( + eventRecorder.triggerEvent( deployment, EventRecorder.Type.Warning, EventRecorder.Reason.CleanupFailed, EventRecorder.Component.Operator, error, - ctx.getKubernetesClient())) { - LOG.warn(error); - } + ctx.getKubernetesClient()); return DeleteControl.noFinalizerRemoval() .rescheduleAfter(ctx.getOperatorConfig().getReconcileInterval().toMillis()); } } LOG.info("Stopping session cluster"); - var conf = ctx.getDeployConfig(ctx.getResource().getSpec()); + var conf = ctx.getObserveConfig(); ctx.getFlinkService() .deleteClusterDeployment( deployment.getMetadata(), deployment.getStatus(), conf, true);