Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,10 @@
import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.JobVertexResourceRequirements;
import org.apache.flink.runtime.rest.messages.ConfigurationInfoEntry;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobMessageParameters;
import org.apache.flink.runtime.rest.messages.job.JobManagerJobConfigurationHeaders;
import org.apache.flink.runtime.rest.messages.job.JobResourceRequirementsBody;
import org.apache.flink.runtime.rest.messages.job.JobResourceRequirementsHeaders;
import org.apache.flink.runtime.rest.messages.job.JobResourcesRequirementsUpdateHeaders;
Expand All @@ -66,9 +68,11 @@
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

import static org.apache.flink.configuration.JobManagerOptions.SCHEDULER;
import static org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder.FLINK_VERSION;

/**
Expand Down Expand Up @@ -240,7 +244,7 @@ public boolean scale(FlinkResourceContext<?> ctx, Configuration deployConfig) th
}
}

private static boolean supportsInPlaceScaling(
private boolean supportsInPlaceScaling(
AbstractFlinkResource<?, ?> resource, Configuration observeConfig) {
if (resource.getSpec().getJob() == null
|| !observeConfig.get(
Expand All @@ -253,11 +257,43 @@ private static boolean supportsInPlaceScaling(
return false;
}

if (!observeConfig
.get(JobManagerOptions.SCHEDULER)
.equals(JobManagerOptions.SchedulerType.Adaptive)) {
LOG.debug("In-place rescaling is only available with the adaptive scheduler");
return false;
if (!observeConfig.get(SCHEDULER).equals(JobManagerOptions.SchedulerType.Adaptive)) {
// fallback to the JM running configuration to check if the scheduler is adaptive
boolean adaptiveSupported = false;
try (var client = getClusterClient(observeConfig)) {
var jobParameters = new JobMessageParameters();
jobParameters.jobPathParameter.resolve(
JobID.fromHexString(resource.getStatus().getJobStatus().getJobId()));

var jmConfig =
client.sendRequest(
JobManagerJobConfigurationHeaders.getInstance(),
jobParameters,
EmptyRequestBody.getInstance())
.get(
operatorConfig.getFlinkClientTimeout().toSeconds(),
TimeUnit.SECONDS);

Optional<String> scheduler =
jmConfig.stream()
.filter(e -> SCHEDULER.key().equals(e.getKey()))
.map(ConfigurationInfoEntry::getValue)
.findFirst();

String value = scheduler.orElse("");
if (value.equals(JobManagerOptions.SchedulerType.Adaptive.toString())) {
adaptiveSupported = true;
}
} catch (Exception e) {
LOG.debug(
"Failed to query JM configuration for scheduler type, "
+ "assuming in-place scaling is not supported",
e);
}
if (!adaptiveSupported) {
LOG.debug("In-place rescaling is only available with the adaptive scheduler");
return false;
}
}

var status = resource.getStatus();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,11 @@
import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.JobVertexResourceRequirements;
import org.apache.flink.runtime.rest.messages.ConfigurationInfo;
import org.apache.flink.runtime.rest.messages.ConfigurationInfoEntry;
import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
import org.apache.flink.runtime.rest.messages.JobMessageParameters;
import org.apache.flink.runtime.rest.messages.job.JobManagerJobConfigurationHeaders;
import org.apache.flink.runtime.rest.messages.job.JobResourceRequirementsBody;
import org.apache.flink.runtime.rest.messages.job.JobResourceRequirementsHeaders;
import org.apache.flink.util.concurrent.Executors;
Expand Down Expand Up @@ -254,6 +258,23 @@ public void testScaling() throws Exception {

var current = new AtomicReference<Map<JobVertexID, JobVertexResourceRequirements>>();
var updated = new AtomicReference<Map<JobVertexID, JobVertexResourceRequirements>>();
var testingClusterClient =
new TestingClusterClient<>(configuration, TestUtils.TEST_DEPLOYMENT_NAME);

// Default JM config fallback: return Default scheduler (no adaptive)
var jmConfigResponse = new ConfigurationInfo();
jmConfigResponse.add(
new ConfigurationInfoEntry(
JobManagerOptions.SCHEDULER.key(),
JobManagerOptions.SchedulerType.Default.toString()));
testingClusterClient.setRequestProcessor(
(h, p, r) -> {
if (h instanceof JobManagerJobConfigurationHeaders) {
return CompletableFuture.completedFuture(jmConfigResponse);
}
return CompletableFuture.completedFuture(EmptyResponseBody.getInstance());
});

var service =
new NativeFlinkService(
client, null, executorService, operatorConfig, eventRecorder) {
Expand All @@ -271,6 +292,11 @@ protected void updateVertexResources(
Map<JobVertexID, JobVertexResourceRequirements> newReqs) {
updated.set(newReqs);
}

@Override
public RestClusterClient<String> getClusterClient(Configuration config) {
return testingClusterClient;
}
};

var flinkDep = TestUtils.buildApplicationCluster();
Expand Down Expand Up @@ -479,6 +505,131 @@ protected void updateVertexResources(
testScaleConditionDep(flinkDep, service, d -> {}, false);
}

/**
 * Verifies the JobManager-configuration REST fallback used by in-place scaling: when the
 * deployment spec does not declare the adaptive scheduler, the service queries the running
 * JM's configuration (jobs/:jobid/config) to decide whether in-place rescaling is possible.
 *
 * <p>Covered cases:
 * <ol>
 *   <li>JM reports the Adaptive scheduler &rarr; in-place scaling succeeds
 *   <li>JM reports the Default scheduler &rarr; no in-place scaling
 *   <li>REST query fails &rarr; no in-place scaling (fail-safe behavior)
 *   <li>Adaptive reported but job is in a terminal state (FAILED) &rarr; no scaling
 *   <li>JM config lacks the scheduler key entirely &rarr; no in-place scaling
 * </ol>
 */
@Test
public void testScalingWithJmConfigFallback() throws Exception {
    var v1 = new JobVertexID();

    // Current vertex requirements as the (stubbed) JM reports them: parallelism pinned to 1.
    // "updated" captures whatever requirements the service attempts to write back.
    var current = new AtomicReference<Map<JobVertexID, JobVertexResourceRequirements>>();
    current.set(
            Map.of(
                    v1,
                    new JobVertexResourceRequirements(
                            new JobVertexResourceRequirements.Parallelism(1, 1))));
    var updated = new AtomicReference<Map<JobVertexID, JobVertexResourceRequirements>>();
    var testingClusterClient =
            new TestingClusterClient<>(configuration, TestUtils.TEST_DEPLOYMENT_NAME);

    // Service under test: vertex-resource REST calls and cluster-client creation are stubbed
    // so only the scheduler-detection fallback path is exercised against the testing client.
    var service =
            new NativeFlinkService(
                    client, null, executorService, operatorConfig, eventRecorder) {
                @Override
                protected Map<JobVertexID, JobVertexResourceRequirements> getVertexResources(
                        RestClusterClient<String> client,
                        AbstractFlinkResource<?, ?> resource) {
                    return current.get();
                }

                @Override
                protected void updateVertexResources(
                        RestClusterClient<String> client,
                        AbstractFlinkResource<?, ?> resource,
                        Map<JobVertexID, JobVertexResourceRequirements> newReqs) {
                    updated.set(newReqs);
                }

                @Override
                public RestClusterClient<String> getClusterClient(Configuration config) {
                    return testingClusterClient;
                }
            };

    // Build deployment WITHOUT adaptive scheduler in K8s spec (uses Default)
    var flinkDep = TestUtils.buildApplicationCluster();
    var spec = flinkDep.getSpec();
    spec.setFlinkVersion(FlinkVersion.v1_18);

    // Default scheduler in the observed config forces the JM REST fallback to be consulted;
    // the parallelism override is what an in-place scale operation would apply.
    var appConfig = spec.getFlinkConfiguration().asConfiguration();
    appConfig.set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Default);
    appConfig.set(PipelineOptions.PARALLELISM_OVERRIDES, Map.of(v1.toHexString(), "4"));
    spec.setFlinkConfiguration(appConfig);

    // Mark the spec as reconciled and the job as RUNNING with a valid job id so the
    // scale path reaches the scheduler check instead of bailing out earlier.
    var reconStatus = flinkDep.getStatus().getReconciliationStatus();
    reconStatus.serializeAndSetLastReconciledSpec(spec, flinkDep);
    flinkDep.getStatus().getJobStatus().setState(JobStatus.RUNNING);
    flinkDep.getStatus().getJobStatus().setJobId(new JobID().toHexString());

    // --- Case 1: JM REST returns Adaptive scheduler → in-place scaling should succeed ---
    var adaptiveConfig = new ConfigurationInfo();
    adaptiveConfig.add(
            new ConfigurationInfoEntry(
                    JobManagerOptions.SCHEDULER.key(),
                    JobManagerOptions.SchedulerType.Adaptive.toString()));
    testingClusterClient.setRequestProcessor(
            (h, p, r) -> {
                if (h instanceof JobManagerJobConfigurationHeaders) {
                    return CompletableFuture.completedFuture(adaptiveConfig);
                }
                return CompletableFuture.completedFuture(EmptyResponseBody.getInstance());
            });

    testScaleConditionDep(flinkDep, service, d -> {}, true);

    // --- Case 2: JM REST returns Default scheduler → in-place scaling should fail ---
    var defaultConfig = new ConfigurationInfo();
    defaultConfig.add(
            new ConfigurationInfoEntry(
                    JobManagerOptions.SCHEDULER.key(),
                    JobManagerOptions.SchedulerType.Default.toString()));
    testingClusterClient.setRequestProcessor(
            (h, p, r) -> {
                if (h instanceof JobManagerJobConfigurationHeaders) {
                    return CompletableFuture.completedFuture(defaultConfig);
                }
                return CompletableFuture.completedFuture(EmptyResponseBody.getInstance());
            });

    testScaleConditionDep(flinkDep, service, d -> {}, false);

    // --- Case 3: JM REST throws exception → in-place scaling should fail ---
    // A failed future exercises the catch branch in the fallback, which must treat an
    // unreachable/erroring JM as "adaptive not supported" rather than propagate.
    testingClusterClient.setRequestProcessor(
            (h, p, r) -> {
                if (h instanceof JobManagerJobConfigurationHeaders) {
                    return CompletableFuture.failedFuture(
                            new RuntimeException("Connection refused"));
                }
                return CompletableFuture.completedFuture(EmptyResponseBody.getInstance());
            });

    testScaleConditionDep(flinkDep, service, d -> {}, false);

    // --- Case 4: JM REST returns Adaptive but job is in terminal state → should NOT scale ---
    testingClusterClient.setRequestProcessor(
            (h, p, r) -> {
                if (h instanceof JobManagerJobConfigurationHeaders) {
                    return CompletableFuture.completedFuture(adaptiveConfig);
                }
                return CompletableFuture.completedFuture(EmptyResponseBody.getInstance());
            });

    testScaleConditionDep(
            flinkDep,
            service,
            d -> d.getStatus().getJobStatus().setState(JobStatus.FAILED),
            false);

    // --- Case 5: JM REST returns empty config (no scheduler key) → should NOT scale ---
    // Missing key maps to Optional.empty() → "" in the production code, which never
    // equals "Adaptive", so scaling must be refused.
    var emptyConfig = new ConfigurationInfo();
    testingClusterClient.setRequestProcessor(
            (h, p, r) -> {
                if (h instanceof JobManagerJobConfigurationHeaders) {
                    return CompletableFuture.completedFuture(emptyConfig);
                }
                return CompletableFuture.completedFuture(EmptyResponseBody.getInstance());
            });

    testScaleConditionDep(flinkDep, service, d -> {}, false);
}

private void testScaleConditionDep(
FlinkDeployment dep,
NativeFlinkService service,
Expand Down