diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java index a5a63f8b4f..4078e65379 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java @@ -45,8 +45,10 @@ import org.apache.flink.runtime.jobgraph.JobResourceRequirements; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.JobVertexResourceRequirements; +import org.apache.flink.runtime.rest.messages.ConfigurationInfoEntry; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; import org.apache.flink.runtime.rest.messages.JobMessageParameters; +import org.apache.flink.runtime.rest.messages.job.JobManagerJobConfigurationHeaders; import org.apache.flink.runtime.rest.messages.job.JobResourceRequirementsBody; import org.apache.flink.runtime.rest.messages.job.JobResourceRequirementsHeaders; import org.apache.flink.runtime.rest.messages.job.JobResourcesRequirementsUpdateHeaders; @@ -66,9 +68,11 @@ import java.time.Duration; import java.util.HashMap; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import static org.apache.flink.configuration.JobManagerOptions.SCHEDULER; import static org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder.FLINK_VERSION; /** @@ -240,7 +244,7 @@ public boolean scale(FlinkResourceContext ctx, Configuration deployConfig) th } } - private static boolean supportsInPlaceScaling( + private boolean supportsInPlaceScaling( AbstractFlinkResource resource, Configuration observeConfig) { if (resource.getSpec().getJob() == null || !observeConfig.get( @@ -253,11 +257,43 @@ private static boolean 
supportsInPlaceScaling( return false; } - if (!observeConfig - .get(JobManagerOptions.SCHEDULER) - .equals(JobManagerOptions.SchedulerType.Adaptive)) { - LOG.debug("In-place rescaling is only available with the adaptive scheduler"); - return false; + if (!observeConfig.get(SCHEDULER).equals(JobManagerOptions.SchedulerType.Adaptive)) { + // fall back to the JM running configuration to check if the scheduler is adaptive + boolean adaptiveSupported = false; + try (var client = getClusterClient(observeConfig)) { + var jobParameters = new JobMessageParameters(); + jobParameters.jobPathParameter.resolve( + JobID.fromHexString(resource.getStatus().getJobStatus().getJobId())); + + var jmConfig = + client.sendRequest( + JobManagerJobConfigurationHeaders.getInstance(), + jobParameters, + EmptyRequestBody.getInstance()) + .get( + operatorConfig.getFlinkClientTimeout().toSeconds(), + TimeUnit.SECONDS); + + Optional<String> scheduler = + jmConfig.stream() + .filter(e -> SCHEDULER.key().equals(e.getKey())) + .map(ConfigurationInfoEntry::getValue) + .findFirst(); + + String value = scheduler.orElse(""); + if (value.equals(JobManagerOptions.SchedulerType.Adaptive.toString())) { + adaptiveSupported = true; + } + } catch (Exception e) { + LOG.debug( + "Failed to query JM configuration for scheduler type, " + + "assuming in-place scaling is not supported", + e); + } + if (!adaptiveSupported) { + LOG.debug("In-place rescaling is only available with the adaptive scheduler"); + return false; + } } var status = resource.getStatus(); diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java index 666f24da81..ba79947787 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java +++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java @@ -43,7 +43,11 @@ import org.apache.flink.runtime.jobgraph.JobResourceRequirements; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.JobVertexResourceRequirements; +import org.apache.flink.runtime.rest.messages.ConfigurationInfo; +import org.apache.flink.runtime.rest.messages.ConfigurationInfoEntry; +import org.apache.flink.runtime.rest.messages.EmptyResponseBody; import org.apache.flink.runtime.rest.messages.JobMessageParameters; +import org.apache.flink.runtime.rest.messages.job.JobManagerJobConfigurationHeaders; import org.apache.flink.runtime.rest.messages.job.JobResourceRequirementsBody; import org.apache.flink.runtime.rest.messages.job.JobResourceRequirementsHeaders; import org.apache.flink.util.concurrent.Executors; @@ -254,6 +258,23 @@ public void testScaling() throws Exception { var current = new AtomicReference<Map<JobVertexID, JobVertexResourceRequirements>>(); var updated = new AtomicReference<Map<JobVertexID, JobVertexResourceRequirements>>(); + var testingClusterClient = + new TestingClusterClient<>(configuration, TestUtils.TEST_DEPLOYMENT_NAME); + + // Default JM config fallback: return Default scheduler (not adaptive) + var jmConfigResponse = new ConfigurationInfo(); + jmConfigResponse.add( + new ConfigurationInfoEntry( + JobManagerOptions.SCHEDULER.key(), + JobManagerOptions.SchedulerType.Default.toString())); + testingClusterClient.setRequestProcessor( + (h, p, r) -> { + if (h instanceof JobManagerJobConfigurationHeaders) { + return CompletableFuture.completedFuture(jmConfigResponse); + } + return CompletableFuture.completedFuture(EmptyResponseBody.getInstance()); + }); + var service = new NativeFlinkService( client, null, executorService, operatorConfig, eventRecorder) { @@ -271,6 +292,11 @@ protected void updateVertexResources( Map newReqs) { updated.set(newReqs); } + + @Override + public RestClusterClient getClusterClient(Configuration config) { + return testingClusterClient; + } 
}; var flinkDep = TestUtils.buildApplicationCluster(); @@ -479,6 +505,131 @@ protected void updateVertexResources( testScaleConditionDep(flinkDep, service, d -> {}, false); } + @Test + public void testScalingWithJmConfigFallback() throws Exception { + var v1 = new JobVertexID(); + + var current = new AtomicReference<Map<JobVertexID, JobVertexResourceRequirements>>(); + current.set( + Map.of( + v1, + new JobVertexResourceRequirements( + new JobVertexResourceRequirements.Parallelism(1, 1)))); + var updated = new AtomicReference<Map<JobVertexID, JobVertexResourceRequirements>>(); + var testingClusterClient = + new TestingClusterClient<>(configuration, TestUtils.TEST_DEPLOYMENT_NAME); + + var service = + new NativeFlinkService( + client, null, executorService, operatorConfig, eventRecorder) { + @Override + protected Map getVertexResources( + RestClusterClient client, + AbstractFlinkResource resource) { + return current.get(); + } + + @Override + protected void updateVertexResources( + RestClusterClient client, + AbstractFlinkResource resource, + Map newReqs) { + updated.set(newReqs); + } + + @Override + public RestClusterClient getClusterClient(Configuration config) { + return testingClusterClient; + } + }; + + // Build deployment WITHOUT adaptive scheduler in K8s spec (uses Default) + var flinkDep = TestUtils.buildApplicationCluster(); + var spec = flinkDep.getSpec(); + spec.setFlinkVersion(FlinkVersion.v1_18); + + var appConfig = spec.getFlinkConfiguration().asConfiguration(); + appConfig.set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Default); + appConfig.set(PipelineOptions.PARALLELISM_OVERRIDES, Map.of(v1.toHexString(), "4")); + spec.setFlinkConfiguration(appConfig); + + var reconStatus = flinkDep.getStatus().getReconciliationStatus(); + reconStatus.serializeAndSetLastReconciledSpec(spec, flinkDep); + flinkDep.getStatus().getJobStatus().setState(JobStatus.RUNNING); + flinkDep.getStatus().getJobStatus().setJobId(new JobID().toHexString()); + + // --- Case 1: JM REST returns Adaptive scheduler → in-place scaling should succeed --- + var 
adaptiveConfig = new ConfigurationInfo(); + adaptiveConfig.add( + new ConfigurationInfoEntry( + JobManagerOptions.SCHEDULER.key(), + JobManagerOptions.SchedulerType.Adaptive.toString())); + testingClusterClient.setRequestProcessor( + (h, p, r) -> { + if (h instanceof JobManagerJobConfigurationHeaders) { + return CompletableFuture.completedFuture(adaptiveConfig); + } + return CompletableFuture.completedFuture(EmptyResponseBody.getInstance()); + }); + + testScaleConditionDep(flinkDep, service, d -> {}, true); + + // --- Case 2: JM REST returns Default scheduler → in-place scaling should fail --- + var defaultConfig = new ConfigurationInfo(); + defaultConfig.add( + new ConfigurationInfoEntry( + JobManagerOptions.SCHEDULER.key(), + JobManagerOptions.SchedulerType.Default.toString())); + testingClusterClient.setRequestProcessor( + (h, p, r) -> { + if (h instanceof JobManagerJobConfigurationHeaders) { + return CompletableFuture.completedFuture(defaultConfig); + } + return CompletableFuture.completedFuture(EmptyResponseBody.getInstance()); + }); + + testScaleConditionDep(flinkDep, service, d -> {}, false); + + // --- Case 3: JM REST throws exception → in-place scaling should fail --- + testingClusterClient.setRequestProcessor( + (h, p, r) -> { + if (h instanceof JobManagerJobConfigurationHeaders) { + return CompletableFuture.failedFuture( + new RuntimeException("Connection refused")); + } + return CompletableFuture.completedFuture(EmptyResponseBody.getInstance()); + }); + + testScaleConditionDep(flinkDep, service, d -> {}, false); + + // --- Case 4: JM REST returns Adaptive but job is in terminal state → should NOT scale --- + testingClusterClient.setRequestProcessor( + (h, p, r) -> { + if (h instanceof JobManagerJobConfigurationHeaders) { + return CompletableFuture.completedFuture(adaptiveConfig); + } + return CompletableFuture.completedFuture(EmptyResponseBody.getInstance()); + }); + + testScaleConditionDep( + flinkDep, + service, + d -> 
d.getStatus().getJobStatus().setState(JobStatus.FAILED), + false); + + // --- Case 5: JM REST returns empty config (no scheduler key) → should NOT scale --- + var emptyConfig = new ConfigurationInfo(); + testingClusterClient.setRequestProcessor( + (h, p, r) -> { + if (h instanceof JobManagerJobConfigurationHeaders) { + return CompletableFuture.completedFuture(emptyConfig); + } + return CompletableFuture.completedFuture(EmptyResponseBody.getInstance()); + }); + + testScaleConditionDep(flinkDep, service, d -> {}, false); + } + private void testScaleConditionDep( FlinkDeployment dep, NativeFlinkService service,