Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,12 @@
<td>Boolean</td>
<td>Indicate whether a savepoint must be taken when deleting a FlinkDeployment or FlinkSessionJob.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.job.submission.timeout</h5></td>
<td style="word-wrap: break-word;">10 min</td>
<td>Duration</td>
<td>The timeout for session job submissions.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.job.upgrade.ignore-pending-savepoint</h5></td>
<td style="word-wrap: break-word;">false</td>
Expand Down
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/system_section.html
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@
<td>Duration</td>
<td>The timeout for the observer to wait the flink rest client to return.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.job.submission.timeout</h5></td>
<td style="word-wrap: break-word;">10 min</td>
<td>Duration</td>
<td>The timeout for session job submissions.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.leader-election.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ public class FlinkOperatorConfiguration {
int reportedExceptionEventsMaxCount;
int reportedExceptionEventsMaxStackTraceLength;
boolean manageIngress;
Duration jobSubmissionTimeout;

public static FlinkOperatorConfiguration fromConfiguration(Configuration operatorConfig) {
Duration reconcileInterval =
Expand Down Expand Up @@ -207,6 +208,9 @@ public static FlinkOperatorConfiguration fromConfiguration(Configuration operato
boolean manageIngress =
operatorConfig.get(KubernetesOperatorConfigOptions.OPERATOR_MANAGE_INGRESS);

Duration jobSubmissionTimeout =
operatorConfig.get(KubernetesOperatorConfigOptions.OPERATOR_JOB_SUBMISSION_TIMEOUT);

return new FlinkOperatorConfiguration(
reconcileInterval,
reconcilerMaxParallelism,
Expand Down Expand Up @@ -239,7 +243,8 @@ public static FlinkOperatorConfiguration fromConfiguration(Configuration operato
slowRequestThreshold,
reportedExceptionEventsMaxCount,
reportedExceptionEventsMaxStackTraceLength,
manageIngress);
manageIngress,
jobSubmissionTimeout);
}

private static GenericRetry getRetryConfig(Configuration conf) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,13 @@ public static String operatorConfigKey(String key) {
.withDescription(
"The timeout for the resource clean up to wait for flink to shutdown cluster.");

/**
 * Timeout applied to session job submissions; defaults to 10 minutes.
 *
 * <p>Rendered in the generated docs as {@code kubernetes.operator.job.submission.timeout}. The
 * operator waits at most this long for the jar-run REST request to complete when submitting a
 * job to a session cluster.
 */
@Documentation.Section(SECTION_SYSTEM)
public static final ConfigOption<Duration> OPERATOR_JOB_SUBMISSION_TIMEOUT =
operatorConfig("job.submission.timeout")
.durationType()
.defaultValue(Duration.ofMinutes(10))
.withDescription("The timeout for session job submissions.");

@Documentation.Section(SECTION_DYNAMIC)
public static final ConfigOption<Boolean> DEPLOYMENT_ROLLBACK_ENABLED =
operatorConfig("deployment.rollback.enabled")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.flink.kubernetes.operator.reconciler.sessionjob;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.autoscaler.JobAutoScaler;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
Expand Down Expand Up @@ -81,10 +82,22 @@ public void deploy(
MSG_SUBMIT,
ctx.getKubernetesClient());

// Generate job id and record in status for durability
var jobId = JobID.generate();
ctx.getResource().getStatus().getJobStatus().setJobId(jobId.toHexString());
statusRecorder.patchAndCacheStatus(ctx.getResource(), ctx.getKubernetesClient());
var jobStatus = ctx.getResource().getStatus().getJobStatus();

String existingJobIdStr = jobStatus.getJobId();
JobID jobId;
var jobState = jobStatus.getState();
var reuseJobId = jobState == JobStatus.RECONCILING;
if (existingJobIdStr != null && reuseJobId) {
jobId = JobID.fromHexString(existingJobIdStr);
LOG.info("Reusing existing job ID {} for deployment retry", jobId);
} else {
jobId = JobID.generate();
LOG.info("Generated new job ID {} for deployment", jobId);
jobStatus.setJobId(jobId.toHexString());
jobStatus.setState(org.apache.flink.api.common.JobStatus.RECONCILING);
statusRecorder.patchAndCacheStatus(ctx.getResource(), ctx.getKubernetesClient());
}

ctx.getFlinkService()
.submitJobToSessionCluster(
Expand All @@ -93,9 +106,6 @@ public void deploy(
jobId,
deployConfig,
savepoint.orElse(null));

var status = ctx.getResource().getStatus();
status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.RECONCILING);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -908,7 +908,7 @@ protected void runJar(
LOG.info("Submitting job: {} to session cluster.", jobID);
clusterClient
.sendRequest(headers, parameters, runRequestBody)
.get(operatorConfig.getFlinkClientTimeout().toSeconds(), TimeUnit.SECONDS);
.get(operatorConfig.getJobSubmissionTimeout().toSeconds(), TimeUnit.SECONDS);
} catch (Exception e) {
LOG.error("Failed to submit job to session cluster.", e);
throw new FlinkRuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.flink.kubernetes.operator.service.CheckpointHistoryWrapper;
import org.apache.flink.kubernetes.operator.service.SuspendMode;
import org.apache.flink.kubernetes.operator.standalone.StandaloneKubernetesConfigOptionsInternal;
import org.apache.flink.runtime.client.DuplicateJobSubmissionException;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.execution.ExecutionState;
Expand Down Expand Up @@ -292,6 +293,10 @@ public JobID submitJobToSessionCluster(
if (deployFailure) {
throw new Exception("Deployment failure");
}
if (jobs.stream().anyMatch(job -> job.f1.getJobId().equals(jobID))) {
throw DuplicateJobSubmissionException.of(jobID);
}

JobStatusMessage jobStatusMessage =
new JobStatusMessage(
jobID,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public void before() {
}

@Test
public void testSubmitJobButException() {
public void testSubmitJobButException() throws Exception {
flinkService.setDeployFailure(true);

try {
Expand All @@ -97,6 +97,10 @@ public void testSubmitJobButException() {
// Ignore
}

assertEquals(sessionJob.getStatus().getJobStatus().getState(), RECONCILING);
String jobId = sessionJob.getStatus().getJobStatus().getJobId();
assertNotNull(jobId);

Assertions.assertEquals(2, testController.events().size());
// Discard submit event
testController.events().remove();
Expand All @@ -105,6 +109,12 @@ public void testSubmitJobButException() {
Assertions.assertEquals(EventRecorder.Type.Warning.toString(), event.getType());
Assertions.assertEquals("Error", event.getReason());

flinkService.setDeployFailure(false);
testController.reconcile(sessionJob, context);

// Make sure we reused the original failed job id
assertEquals(jobId, flinkService.listJobs().get(0).f1.getJobId().toHexString());

testController.cleanup(sessionJob, context);
}

Expand Down