Skip to content

Commit 90edc9c

Browse files
authored
[FLINK-38859] Do not apply minimum job vertex parallelism for whole job in native mode
1 parent eb99c5a commit 90edc9c

2 files changed

Lines changed: 37 additions & 3 deletions

File tree

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -363,9 +363,11 @@ private int getParallelism() {
363363
* effectiveConfig.get(TaskManagerOptions.NUM_TASK_SLOTS);
364364
}
365365

366-
Optional<Integer> maxOverrideParallelism = getMaxParallelismFromOverrideConfig();
367-
if (maxOverrideParallelism.isPresent() && maxOverrideParallelism.get() > 0) {
368-
return maxOverrideParallelism.get();
366+
if (KubernetesDeploymentMode.STANDALONE.equals(spec.getMode())) {
367+
Optional<Integer> maxOverrideParallelism = getMaxParallelismFromOverrideConfig();
368+
if (maxOverrideParallelism.isPresent() && maxOverrideParallelism.get() > 0) {
369+
return maxOverrideParallelism.get();
370+
}
369371
}
370372

371373
return spec.getJob().getParallelism();

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -874,6 +874,38 @@ public void testApplyStandaloneSessionSpec() throws URISyntaxException, IOExcept
874874
StandaloneKubernetesConfigOptionsInternal.KUBERNETES_TASKMANAGER_REPLICAS));
875875
}
876876

877+
@Test
878+
public void testParallelismOverridesOnlyAppliedForStandaloneMode()
879+
throws URISyntaxException, IOException {
880+
FlinkDeployment dep = ReconciliationUtils.clone(flinkDeployment);
881+
dep.getSpec().setTaskManager(new TaskManagerSpec());
882+
dep.getSpec().getJob().setParallelism(5);
883+
dep.getSpec()
884+
.getFlinkConfiguration()
885+
.put(PipelineOptions.PARALLELISM_OVERRIDES.key(), "vertex1:10,vertex2:20");
886+
887+
// Test STANDALONE mode - parallelism overrides should be used
888+
dep.getSpec().setMode(KubernetesDeploymentMode.STANDALONE);
889+
Configuration configuration =
890+
new FlinkConfigBuilder(dep, new Configuration())
891+
.applyFlinkConfiguration()
892+
.applyTaskManagerSpec()
893+
.applyJobOrSessionSpec()
894+
.build();
895+
assertEquals(20, configuration.get(CoreOptions.DEFAULT_PARALLELISM));
896+
897+
// Test NATIVE mode - parallelism overrides should NOT be used, fall back to job
898+
// parallelism
899+
dep.getSpec().setMode(KubernetesDeploymentMode.NATIVE);
900+
configuration =
901+
new FlinkConfigBuilder(dep, new Configuration())
902+
.applyFlinkConfiguration()
903+
.applyTaskManagerSpec()
904+
.applyJobOrSessionSpec()
905+
.build();
906+
assertEquals(5, configuration.get(CoreOptions.DEFAULT_PARALLELISM));
907+
}
908+
877909
@Test
878910
public void testBuildFrom() throws Exception {
879911
final Configuration configuration =

0 commit comments

Comments
 (0)