Skip to content

Commit 896590b

Browse files
authored
[FLINK-39432] Add option to delete with session running session jobs
1 parent c046a80 commit 896590b

5 files changed

Lines changed: 110 additions & 7 deletions

File tree

docs/layouts/shortcodes/generated/dynamic_section.html

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,13 @@
195195
<td>The interval before a savepoint trigger attempt is marked as unsuccessful.</td>
196196
</tr>
197197
<tr>
198-
<td><h5>kubernetes.operator.session.block-on-unmanaged-jobs</h5></td>
198+
<td><h5>kubernetes.operator.session.deletion.block-on-session-jobs</h5></td>
199+
<td style="word-wrap: break-word;">true</td>
200+
<td>Boolean</td>
201+
<td>Block FlinkDeployment deletion if managed jobs are running in the session cluster.</td>
202+
</tr>
203+
<tr>
204+
<td><h5>kubernetes.operator.session.deletion.block-on-unmanaged-jobs</h5></td>
199205
<td style="word-wrap: break-word;">true</td>
200206
<td>Boolean</td>
201207
<td>Block FlinkDeployment deletion if unmanaged jobs (jobs not managed by FlinkSessionJob resources) are running in the session cluster. Example: Jobs submitted via CLI.</td>

docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -429,7 +429,13 @@
429429
<td>The interval before a savepoint trigger attempt is marked as unsuccessful.</td>
430430
</tr>
431431
<tr>
432-
<td><h5>kubernetes.operator.session.block-on-unmanaged-jobs</h5></td>
432+
<td><h5>kubernetes.operator.session.deletion.block-on-session-jobs</h5></td>
433+
<td style="word-wrap: break-word;">true</td>
434+
<td>Boolean</td>
435+
<td>Block FlinkDeployment deletion if managed jobs are running in the session cluster.</td>
436+
</tr>
437+
<tr>
438+
<td><h5>kubernetes.operator.session.deletion.block-on-unmanaged-jobs</h5></td>
433439
<td style="word-wrap: break-word;">true</td>
434440
<td>Boolean</td>
435441
<td>Block FlinkDeployment deletion if unmanaged jobs (jobs not managed by FlinkSessionJob resources) are running in the session cluster. Example: Jobs submitted via CLI.</td>

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -657,12 +657,21 @@ public static String operatorConfigKey(String key) {
657657

658658
@Documentation.Section(SECTION_DYNAMIC)
659659
public static final ConfigOption<Boolean> BLOCK_ON_UNMANAGED_JOBS =
660-
operatorConfig("session.block-on-unmanaged-jobs")
660+
operatorConfig("session.deletion.block-on-unmanaged-jobs")
661661
.booleanType()
662662
.defaultValue(true)
663+
.withDeprecatedKeys(operatorConfigKey("session.block-on-unmanaged-jobs"))
663664
.withDescription(
664665
"Block FlinkDeployment deletion if unmanaged jobs (jobs not managed by FlinkSessionJob resources) are running in the session cluster. Example: Jobs submitted via CLI.");
665666

667+
@Documentation.Section(SECTION_DYNAMIC)
668+
public static final ConfigOption<Boolean> BLOCK_ON_SESSION_JOBS =
669+
operatorConfig("session.deletion.block-on-session-jobs")
670+
.booleanType()
671+
.defaultValue(true)
672+
.withDescription(
673+
"Block FlinkDeployment deletion if managed jobs are running in the session cluster.");
674+
666675
@Documentation.Section(SECTION_ADVANCED)
667676
public static final ConfigOption<Duration> REFRESH_CLUSTER_RESOURCE_VIEW =
668677
operatorConfig("cluster.resource-view.refresh-interval")

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -160,10 +160,14 @@ Set<JobID> getNonTerminalJobs(FlinkResourceContext<FlinkDeployment> ctx) {
160160

161161
@Override
162162
public DeleteControl cleanupInternal(FlinkResourceContext<FlinkDeployment> ctx) {
163-
Set<FlinkSessionJob> sessionJobs =
164-
ctx.getJosdkContext().getSecondaryResources(FlinkSessionJob.class);
163+
var sessionJobs = ctx.getJosdkContext().getSecondaryResources(FlinkSessionJob.class);
165164
var deployment = ctx.getResource();
166-
if (!sessionJobs.isEmpty()) {
165+
166+
boolean blockOnSessionJobs =
167+
ctx.getObserveConfig()
168+
.getBoolean(KubernetesOperatorConfigOptions.BLOCK_ON_SESSION_JOBS);
169+
170+
if (blockOnSessionJobs && !sessionJobs.isEmpty()) {
167171
var error =
168172
String.format(
169173
"The session jobs %s should be deleted first",
@@ -188,7 +192,7 @@ public DeleteControl cleanupInternal(FlinkResourceContext<FlinkDeployment> ctx)
188192
boolean blockOnUnmanagedJobs =
189193
ctx.getObserveConfig()
190194
.getBoolean(KubernetesOperatorConfigOptions.BLOCK_ON_UNMANAGED_JOBS);
191-
if (blockOnUnmanagedJobs) {
195+
if (blockOnSessionJobs && blockOnUnmanagedJobs) {
192196
Set<JobID> nonTerminalJobs = getNonTerminalJobs(ctx);
193197
if (!nonTerminalJobs.isEmpty()) {
194198
var error =

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import io.fabric8.kubernetes.api.model.ObjectMeta;
3939
import io.fabric8.kubernetes.client.KubernetesClient;
4040
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
41+
import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
4142
import lombok.Getter;
4243
import org.junit.jupiter.api.Assertions;
4344
import org.junit.jupiter.api.Test;
@@ -305,4 +306,81 @@ public void testGetNonTerminalJobs() throws Exception {
305306
nonTerminalJobsAfterRemoval.size(),
306307
"Should have no non-terminal jobs when only terminated jobs exist");
307308
}
309+
310+
@Test
311+
public void testDeleteSessionWithBlockOnSessionJobsFalse() throws Exception {
312+
FlinkDeployment deployment = TestUtils.buildSessionCluster();
313+
deployment
314+
.getSpec()
315+
.getFlinkConfiguration()
316+
.put(KubernetesOperatorConfigOptions.BLOCK_ON_SESSION_JOBS.key(), "false");
317+
318+
reconciler.reconcile(deployment, flinkService.getContext());
319+
320+
assertEquals(
321+
ReconciliationState.DEPLOYED,
322+
deployment.getStatus().getReconciliationStatus().getState());
323+
324+
// Create some running jobs
325+
JobID managedJobId1 = new JobID();
326+
JobID managedJobId2 = new JobID();
327+
JobID unmanagedRunningJobId = new JobID();
328+
329+
flinkService
330+
.listJobs()
331+
.add(
332+
Tuple3.of(
333+
null,
334+
new JobStatusMessage(
335+
managedJobId1,
336+
"managed-job-1",
337+
JobStatus.RUNNING,
338+
System.currentTimeMillis()),
339+
new Configuration()));
340+
flinkService
341+
.listJobs()
342+
.add(
343+
Tuple3.of(
344+
null,
345+
new JobStatusMessage(
346+
managedJobId2,
347+
"managed-job-2",
348+
JobStatus.RUNNING,
349+
System.currentTimeMillis()),
350+
new Configuration()));
351+
flinkService
352+
.listJobs()
353+
.add(
354+
Tuple3.of(
355+
null,
356+
new JobStatusMessage(
357+
unmanagedRunningJobId,
358+
"unmanaged-running-job",
359+
JobStatus.RUNNING,
360+
System.currentTimeMillis()),
361+
new Configuration()));
362+
363+
// Create FlinkSessionJob resources for the managed jobs
364+
FlinkSessionJob managedSessionJob1 = TestUtils.buildSessionJob();
365+
managedSessionJob1.getMetadata().setName("managed-session-job-1");
366+
managedSessionJob1.getStatus().getJobStatus().setJobId(managedJobId1.toHexString());
367+
kubernetesClient.resource(managedSessionJob1).createOrReplace();
368+
369+
FlinkSessionJob managedSessionJob2 = TestUtils.buildSessionJob();
370+
managedSessionJob2.getMetadata().setName("managed-session-job-2");
371+
managedSessionJob2.getStatus().getJobStatus().setJobId(managedJobId2.toHexString());
372+
kubernetesClient.resource(managedSessionJob2).createOrReplace();
373+
374+
// Test cleanup with BLOCK_ON_SESSION_JOBS=false
375+
var context = TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient);
376+
var resourceContext = getResourceContext(deployment, context);
377+
378+
var sessionReconciler = (SessionReconciler) reconciler.getReconciler();
379+
DeleteControl deleteControl = sessionReconciler.cleanupInternal(resourceContext);
380+
381+
// Verify that deletion proceeds immediately despite running jobs
382+
assertTrue(
383+
deleteControl.isRemoveFinalizer(),
384+
"Session should be deleted immediately when BLOCK_ON_SESSION_JOBS is false");
385+
}
308386
}

0 commit comments

Comments
 (0)