Skip to content

Commit e480eb7

Browse files
authored
[FLINK-38858] Reuse jobid after failed submissions to guard against job duplication
1 parent 90edc9c commit e480eb7

8 files changed

Lines changed: 59 additions & 10 deletions

File tree

docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,12 @@
212212
<td>Boolean</td>
213213
<td>Indicate whether a savepoint must be taken when deleting a FlinkDeployment or FlinkSessionJob.</td>
214214
</tr>
215+
<tr>
216+
<td><h5>kubernetes.operator.job.submission.timeout</h5></td>
217+
<td style="word-wrap: break-word;">10 min</td>
218+
<td>Duration</td>
219+
<td>The timeout for session job submissions.</td>
220+
</tr>
215221
<tr>
216222
<td><h5>kubernetes.operator.job.upgrade.ignore-pending-savepoint</h5></td>
217223
<td style="word-wrap: break-word;">false</td>

docs/layouts/shortcodes/generated/system_section.html

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,12 @@
5050
<td>Duration</td>
5151
<td>The timeout for the observer to wait the flink rest client to return.</td>
5252
</tr>
53+
<tr>
54+
<td><h5>kubernetes.operator.job.submission.timeout</h5></td>
55+
<td style="word-wrap: break-word;">10 min</td>
56+
<td>Duration</td>
57+
<td>The timeout for session job submissions.</td>
58+
</tr>
5359
<tr>
5460
<td><h5>kubernetes.operator.leader-election.enabled</h5></td>
5561
<td style="word-wrap: break-word;">false</td>

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ public class FlinkOperatorConfiguration {
8080
int reportedExceptionEventsMaxCount;
8181
int reportedExceptionEventsMaxStackTraceLength;
8282
boolean manageIngress;
83+
Duration jobSubmissionTimeout;
8384

8485
public static FlinkOperatorConfiguration fromConfiguration(Configuration operatorConfig) {
8586
Duration reconcileInterval =
@@ -207,6 +208,9 @@ public static FlinkOperatorConfiguration fromConfiguration(Configuration operato
207208
boolean manageIngress =
208209
operatorConfig.get(KubernetesOperatorConfigOptions.OPERATOR_MANAGE_INGRESS);
209210

211+
Duration jobSubmissionTimeout =
212+
operatorConfig.get(KubernetesOperatorConfigOptions.OPERATOR_JOB_SUBMISSION_TIMEOUT);
213+
210214
return new FlinkOperatorConfiguration(
211215
reconcileInterval,
212216
reconcilerMaxParallelism,
@@ -239,7 +243,8 @@ public static FlinkOperatorConfiguration fromConfiguration(Configuration operato
239243
slowRequestThreshold,
240244
reportedExceptionEventsMaxCount,
241245
reportedExceptionEventsMaxStackTraceLength,
242-
manageIngress);
246+
manageIngress,
247+
jobSubmissionTimeout);
243248
}
244249

245250
private static GenericRetry getRetryConfig(Configuration conf) {

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,13 @@ public static String operatorConfigKey(String key) {
134134
.withDescription(
135135
"The timeout for the resource clean up to wait for flink to shutdown cluster.");
136136

137+
@Documentation.Section(SECTION_SYSTEM)
138+
public static final ConfigOption<Duration> OPERATOR_JOB_SUBMISSION_TIMEOUT =
139+
operatorConfig("job.submission.timeout")
140+
.durationType()
141+
.defaultValue(Duration.ofMinutes(10))
142+
.withDescription("The timeout for session job submissions.");
143+
137144
@Documentation.Section(SECTION_DYNAMIC)
138145
public static final ConfigOption<Boolean> DEPLOYMENT_ROLLBACK_ENABLED =
139146
operatorConfig("deployment.rollback.enabled")

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.flink.kubernetes.operator.reconciler.sessionjob;
1919

2020
import org.apache.flink.api.common.JobID;
21+
import org.apache.flink.api.common.JobStatus;
2122
import org.apache.flink.autoscaler.JobAutoScaler;
2223
import org.apache.flink.configuration.Configuration;
2324
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
@@ -81,10 +82,22 @@ public void deploy(
8182
MSG_SUBMIT,
8283
ctx.getKubernetesClient());
8384

84-
// Generate job id and record in status for durability
85-
var jobId = JobID.generate();
86-
ctx.getResource().getStatus().getJobStatus().setJobId(jobId.toHexString());
87-
statusRecorder.patchAndCacheStatus(ctx.getResource(), ctx.getKubernetesClient());
85+
var jobStatus = ctx.getResource().getStatus().getJobStatus();
86+
87+
String existingJobIdStr = jobStatus.getJobId();
88+
JobID jobId;
89+
var jobState = jobStatus.getState();
90+
var reuseJobId = jobState == JobStatus.RECONCILING;
91+
if (existingJobIdStr != null && reuseJobId) {
92+
jobId = JobID.fromHexString(existingJobIdStr);
93+
LOG.info("Reusing existing job ID {} for deployment retry", jobId);
94+
} else {
95+
jobId = JobID.generate();
96+
LOG.info("Generated new job ID {} for deployment", jobId);
97+
jobStatus.setJobId(jobId.toHexString());
98+
jobStatus.setState(org.apache.flink.api.common.JobStatus.RECONCILING);
99+
statusRecorder.patchAndCacheStatus(ctx.getResource(), ctx.getKubernetesClient());
100+
}
88101

89102
ctx.getFlinkService()
90103
.submitJobToSessionCluster(
@@ -93,9 +106,6 @@ public void deploy(
93106
jobId,
94107
deployConfig,
95108
savepoint.orElse(null));
96-
97-
var status = ctx.getResource().getStatus();
98-
status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.RECONCILING);
99109
}
100110

101111
@Override

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -908,7 +908,7 @@ protected void runJar(
908908
LOG.info("Submitting job: {} to session cluster.", jobID);
909909
clusterClient
910910
.sendRequest(headers, parameters, runRequestBody)
911-
.get(operatorConfig.getFlinkClientTimeout().toSeconds(), TimeUnit.SECONDS);
911+
.get(operatorConfig.getJobSubmissionTimeout().toSeconds(), TimeUnit.SECONDS);
912912
} catch (Exception e) {
913913
LOG.error("Failed to submit job to session cluster.", e);
914914
throw new FlinkRuntimeException(e);

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import org.apache.flink.kubernetes.operator.service.CheckpointHistoryWrapper;
5252
import org.apache.flink.kubernetes.operator.service.SuspendMode;
5353
import org.apache.flink.kubernetes.operator.standalone.StandaloneKubernetesConfigOptionsInternal;
54+
import org.apache.flink.runtime.client.DuplicateJobSubmissionException;
5455
import org.apache.flink.runtime.client.JobStatusMessage;
5556
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
5657
import org.apache.flink.runtime.execution.ExecutionState;
@@ -292,6 +293,10 @@ public JobID submitJobToSessionCluster(
292293
if (deployFailure) {
293294
throw new Exception("Deployment failure");
294295
}
296+
if (jobs.stream().anyMatch(job -> job.f1.getJobId().equals(jobID))) {
297+
throw DuplicateJobSubmissionException.of(jobID);
298+
}
299+
295300
JobStatusMessage jobStatusMessage =
296301
new JobStatusMessage(
297302
jobID,

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ public void before() {
8888
}
8989

9090
@Test
91-
public void testSubmitJobButException() {
91+
public void testSubmitJobButException() throws Exception {
9292
flinkService.setDeployFailure(true);
9393

9494
try {
@@ -97,6 +97,10 @@ public void testSubmitJobButException() {
9797
// Ignore
9898
}
9999

100+
assertEquals(sessionJob.getStatus().getJobStatus().getState(), RECONCILING);
101+
String jobId = sessionJob.getStatus().getJobStatus().getJobId();
102+
assertNotNull(jobId);
103+
100104
Assertions.assertEquals(2, testController.events().size());
101105
// Discard submit event
102106
testController.events().remove();
@@ -105,6 +109,12 @@ public void testSubmitJobButException() {
105109
Assertions.assertEquals(EventRecorder.Type.Warning.toString(), event.getType());
106110
Assertions.assertEquals("Error", event.getReason());
107111

112+
flinkService.setDeployFailure(false);
113+
testController.reconcile(sessionJob, context);
114+
115+
// Make sure we reused the original failed job id
116+
assertEquals(jobId, flinkService.listJobs().get(0).f1.getJobId().toHexString());
117+
108118
testController.cleanup(sessionJob, context);
109119
}
110120

0 commit comments

Comments
 (0)