Skip to content

Commit e9eec8f

Browse files
huyuanfenghuyuanfeng2018
authored and committed
[FLINK-37126] Add Validator for Autoscaler
1 parent d9fa479 commit e9eec8f

5 files changed

Lines changed: 256 additions & 62 deletions

File tree

flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import org.apache.flink.autoscaler.JobAutoScaler;
2222
import org.apache.flink.autoscaler.JobAutoScalerContext;
2323
import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
24+
import org.apache.flink.autoscaler.validation.AutoscalerValidator;
25+
import org.apache.flink.autoscaler.validation.DefaultAutoscalerValidator;
2426
import org.apache.flink.configuration.Configuration;
2527
import org.apache.flink.configuration.UnmodifiableConfiguration;
2628
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
@@ -40,6 +42,7 @@
4042
import java.util.HashSet;
4143
import java.util.List;
4244
import java.util.Map;
45+
import java.util.Optional;
4346
import java.util.Set;
4447
import java.util.concurrent.CompletableFuture;
4548
import java.util.concurrent.ConcurrentHashMap;
@@ -50,6 +53,7 @@
5053
import java.util.function.Function;
5154
import java.util.stream.Collectors;
5255

56+
import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_EVENT_INTERVAL;
5357
import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.CONTROL_LOOP_INTERVAL;
5458
import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.CONTROL_LOOP_PARALLELISM;
5559

@@ -68,6 +72,7 @@ public class StandaloneAutoscalerExecutor<KEY, Context extends JobAutoScalerCont
6872
private final ScheduledExecutorService scheduledExecutorService;
6973
private final ExecutorService scalingThreadPool;
7074
private final UnmodifiableConfiguration baseConf;
75+
private final AutoscalerValidator autoscalerValidator;
7176

7277
/**
7378
* Maintain a set of job keys that during scaling, it should be accessed at {@link
@@ -103,6 +108,7 @@ public StandaloneAutoscalerExecutor(
103108
parallelism, new ExecutorThreadFactory("autoscaler-standalone-scaling"));
104109
this.scalingJobKeys = new HashSet<>();
105110
this.baseConf = new UnmodifiableConfiguration(conf);
111+
this.autoscalerValidator = new DefaultAutoscalerValidator();
106112
}
107113

108114
public void start() {
@@ -188,7 +194,19 @@ private void cleanupStoppedJob(Collection<Context> jobList) {
188194
protected void scalingSingleJob(Context jobContext) {
189195
try {
190196
MDC.put("job.key", jobContext.getJobKey().toString());
191-
autoScaler.scale(jobContext);
197+
Optional<String> validationError =
198+
autoscalerValidator.validateAutoscalerOptions(jobContext.getConfiguration());
199+
if (validationError.isPresent()) {
200+
eventHandler.handleEvent(
201+
jobContext,
202+
AutoScalerEventHandler.Type.Warning,
203+
"AutoScaler Options Validation",
204+
validationError.get(),
205+
null,
206+
baseConf.get(SCALING_EVENT_INTERVAL));
207+
} else {
208+
autoScaler.scale(jobContext);
209+
}
192210
} catch (Throwable e) {
193211
LOG.error("Error while scaling job", e);
194212
eventHandler.handleException(jobContext, AUTOSCALER_ERROR, e);
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.autoscaler.standalone;
19+
20+
import org.apache.flink.api.common.JobID;
21+
import org.apache.flink.api.common.JobStatus;
22+
import org.apache.flink.autoscaler.JobAutoScaler;
23+
import org.apache.flink.autoscaler.JobAutoScalerContext;
24+
import org.apache.flink.autoscaler.config.AutoScalerOptions;
25+
import org.apache.flink.autoscaler.event.TestingEventCollector;
26+
import org.apache.flink.configuration.Configuration;
27+
28+
import org.junit.jupiter.api.BeforeEach;
29+
import org.junit.jupiter.api.Test;
30+
31+
import java.util.ArrayList;
32+
import java.util.List;
33+
import java.util.concurrent.CompletableFuture;
34+
import java.util.concurrent.ConcurrentHashMap;
35+
36+
import static org.assertj.core.api.Assertions.assertThat;
37+
38+
class StandaloneAutoscalerValidatorTest {
39+
private List<JobAutoScalerContext<JobID>> jobList;
40+
private TestingEventCollector<JobID, JobAutoScalerContext<JobID>> eventCollector;
41+
private ConcurrentHashMap<JobID, Integer> scaleCounter;
42+
private Configuration correctConfiguration;
43+
private Configuration invalidConfiguration;
44+
45+
@BeforeEach
46+
void setUp() {
47+
jobList = new ArrayList<>();
48+
eventCollector = new TestingEventCollector<>();
49+
scaleCounter = new ConcurrentHashMap<>();
50+
51+
correctConfiguration = new Configuration();
52+
correctConfiguration.set(AutoScalerOptions.AUTOSCALER_ENABLED, true);
53+
54+
invalidConfiguration = new Configuration();
55+
invalidConfiguration.set(AutoScalerOptions.AUTOSCALER_ENABLED, true);
56+
invalidConfiguration.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, -1.0);
57+
}
58+
59+
@Test
60+
void testAutoScalerWithInvalidConfig() throws Exception {
61+
JobAutoScalerContext<JobID> validJob = createJobAutoScalerContext(correctConfiguration);
62+
JobAutoScalerContext<JobID> invalidJob = createJobAutoScalerContext(invalidConfiguration);
63+
64+
jobList.add(validJob);
65+
jobList.add(invalidJob);
66+
67+
final var jobAutoScaler =
68+
new JobAutoScaler<JobID, JobAutoScalerContext<JobID>>() {
69+
@Override
70+
public void scale(JobAutoScalerContext<JobID> context) {
71+
scaleCounter.merge(context.getJobKey(), 1, Integer::sum);
72+
}
73+
74+
@Override
75+
public void cleanup(JobAutoScalerContext<JobID> context) {
76+
// No cleanup required for the test
77+
}
78+
};
79+
80+
try (var autoscalerExecutor =
81+
new StandaloneAutoscalerExecutor<>(
82+
new Configuration(), baseConf -> jobList, eventCollector, jobAutoScaler)) {
83+
84+
List<CompletableFuture<Void>> scaledFutures = autoscalerExecutor.scaling();
85+
86+
// Verification triggers two scaling tasks
87+
assertThat(scaledFutures).hasSize(2);
88+
89+
// Only legally configured tasks are scaled
90+
assertThat(scaleCounter).hasSize(1).containsKey(validJob.getJobKey());
91+
92+
// Verification Event Collector captures an event
93+
assertThat(eventCollector.events).hasSize(1);
94+
assertThat(eventCollector.events)
95+
.allMatch(event -> event.getContext().equals(invalidJob));
96+
}
97+
}
98+
99+
private JobAutoScalerContext<JobID> createJobAutoScalerContext(Configuration configuration) {
100+
JobID jobID = new JobID();
101+
return new JobAutoScalerContext<>(
102+
jobID, jobID, JobStatus.RUNNING, configuration, null, null);
103+
}
104+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.autoscaler.validation;
20+
21+
import org.apache.flink.configuration.Configuration;
22+
23+
import java.util.Optional;
24+
25+
/** Validator for Autoscaler. */
26+
public interface AutoscalerValidator {
27+
28+
/**
29+
* Validate autoscaler config and return optional error.
30+
*
31+
* @param flinkConf autoscaler config
32+
* @return Optional error string, should be present iff validation resulted in an error
33+
*/
34+
Optional<String> validateAutoscalerOptions(Configuration flinkConf);
35+
}
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.autoscaler.validation;
20+
21+
import org.apache.flink.autoscaler.config.AutoScalerOptions;
22+
import org.apache.flink.autoscaler.utils.CalendarUtils;
23+
import org.apache.flink.configuration.ConfigOption;
24+
import org.apache.flink.configuration.Configuration;
25+
26+
import java.util.Optional;
27+
28+
import static org.apache.flink.autoscaler.config.AutoScalerOptions.OBSERVED_SCALABILITY_COEFFICIENT_MIN;
29+
import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_MAX;
30+
import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_MIN;
31+
import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_TARGET;
32+
33+
/** Default implementation of {@link AutoscalerValidator}. */
34+
public class DefaultAutoscalerValidator implements AutoscalerValidator {
35+
36+
public Optional<String> validateAutoscalerOptions(Configuration flinkConf) {
37+
38+
if (!flinkConf.getBoolean(AutoScalerOptions.AUTOSCALER_ENABLED)) {
39+
return Optional.empty();
40+
}
41+
return firstPresent(
42+
validateNumber(flinkConf, AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.0d),
43+
validateNumber(flinkConf, AutoScalerOptions.MAX_SCALE_UP_FACTOR, 0.0d),
44+
validateNumber(flinkConf, UTILIZATION_TARGET, 0.0d, 1.0d),
45+
validateNumber(flinkConf, AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.0d),
46+
validateNumber(flinkConf, UTILIZATION_MAX, flinkConf.get(UTILIZATION_TARGET), 1.0d),
47+
validateNumber(flinkConf, UTILIZATION_MIN, 0.0d, flinkConf.get(UTILIZATION_TARGET)),
48+
validateNumber(flinkConf, OBSERVED_SCALABILITY_COEFFICIENT_MIN, 0.01d, 1d),
49+
CalendarUtils.validateExcludedPeriods(flinkConf));
50+
}
51+
52+
@SafeVarargs
53+
private static Optional<String> firstPresent(Optional<String>... errOpts) {
54+
for (Optional<String> opt : errOpts) {
55+
if (opt.isPresent()) {
56+
return opt;
57+
}
58+
}
59+
return Optional.empty();
60+
}
61+
62+
private static <T extends Number> Optional<String> validateNumber(
63+
Configuration flinkConfiguration,
64+
ConfigOption<T> autoScalerConfig,
65+
Double min,
66+
Double max) {
67+
try {
68+
var configValue = flinkConfiguration.get(autoScalerConfig);
69+
if (configValue != null) {
70+
double value = configValue.doubleValue();
71+
if ((min != null && value < min) || (max != null && value > max)) {
72+
return Optional.of(
73+
String.format(
74+
"The AutoScalerOption %s is invalid, it should be a value within the range [%s, %s]",
75+
autoScalerConfig.key(),
76+
min != null ? min.toString() : "-Infinity",
77+
max != null ? max.toString() : "+Infinity"));
78+
}
79+
}
80+
return Optional.empty();
81+
} catch (IllegalArgumentException e) {
82+
return Optional.of(
83+
String.format(
84+
"Invalid value in the autoscaler config %s", autoScalerConfig.key()));
85+
}
86+
}
87+
88+
private static <T extends Number> Optional<String> validateNumber(
89+
Configuration flinkConfiguration, ConfigOption<T> autoScalerConfig, Double min) {
90+
return validateNumber(flinkConfiguration, autoScalerConfig, min, null);
91+
}
92+
}

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java

Lines changed: 6 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,9 @@
1717

1818
package org.apache.flink.kubernetes.operator.validation;
1919

20-
import org.apache.flink.autoscaler.config.AutoScalerOptions;
21-
import org.apache.flink.autoscaler.utils.CalendarUtils;
20+
import org.apache.flink.autoscaler.validation.AutoscalerValidator;
21+
import org.apache.flink.autoscaler.validation.DefaultAutoscalerValidator;
2222
import org.apache.flink.configuration.CheckpointingOptions;
23-
import org.apache.flink.configuration.ConfigOption;
2423
import org.apache.flink.configuration.Configuration;
2524
import org.apache.flink.configuration.HighAvailabilityOptions;
2625
import org.apache.flink.configuration.JobManagerOptions;
@@ -65,11 +64,6 @@
6564
import java.util.regex.Matcher;
6665
import java.util.regex.Pattern;
6766

68-
import static org.apache.flink.autoscaler.config.AutoScalerOptions.OBSERVED_SCALABILITY_COEFFICIENT_MIN;
69-
import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_MAX;
70-
import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_MIN;
71-
import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_TARGET;
72-
7367
/** Default validator implementation for {@link FlinkDeployment}. */
7468
public class DefaultValidator implements FlinkResourceValidator {
7569

@@ -88,9 +82,11 @@ public class DefaultValidator implements FlinkResourceValidator {
8882
Set.of(Constants.CONFIG_FILE_LOG4J_NAME, Constants.CONFIG_FILE_LOGBACK_NAME);
8983

9084
private final FlinkConfigManager configManager;
85+
private final AutoscalerValidator autoscalerValidator;
9186

9287
public DefaultValidator(FlinkConfigManager configManager) {
9388
this.configManager = configManager;
89+
this.autoscalerValidator = new DefaultAutoscalerValidator();
9490
}
9591

9692
@Override
@@ -598,63 +594,12 @@ private Optional<String> validateServiceAccount(String serviceAccount) {
598594
return Optional.empty();
599595
}
600596

601-
public static Optional<String> validateAutoScalerFlinkConfiguration(
597+
public Optional<String> validateAutoScalerFlinkConfiguration(
602598
Map<String, String> effectiveConfig) {
603599
if (effectiveConfig == null) {
604600
return Optional.empty();
605601
}
606602
Configuration flinkConfiguration = Configuration.fromMap(effectiveConfig);
607-
if (!flinkConfiguration.getBoolean(AutoScalerOptions.AUTOSCALER_ENABLED)) {
608-
return Optional.empty();
609-
}
610-
return firstPresent(
611-
validateNumber(flinkConfiguration, AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.0d),
612-
validateNumber(flinkConfiguration, AutoScalerOptions.MAX_SCALE_UP_FACTOR, 0.0d),
613-
validateNumber(flinkConfiguration, UTILIZATION_TARGET, 0.0d, 1.0d),
614-
validateNumber(
615-
flinkConfiguration, AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.0d),
616-
validateNumber(
617-
flinkConfiguration,
618-
UTILIZATION_MAX,
619-
flinkConfiguration.get(UTILIZATION_TARGET),
620-
1.0d),
621-
validateNumber(
622-
flinkConfiguration,
623-
UTILIZATION_MIN,
624-
0.0d,
625-
flinkConfiguration.get(UTILIZATION_TARGET)),
626-
validateNumber(flinkConfiguration, OBSERVED_SCALABILITY_COEFFICIENT_MIN, 0.01d, 1d),
627-
CalendarUtils.validateExcludedPeriods(flinkConfiguration));
628-
}
629-
630-
private static <T extends Number> Optional<String> validateNumber(
631-
Configuration flinkConfiguration,
632-
ConfigOption<T> autoScalerConfig,
633-
Double min,
634-
Double max) {
635-
try {
636-
var configValue = flinkConfiguration.get(autoScalerConfig);
637-
if (configValue != null) {
638-
double value = configValue.doubleValue();
639-
if ((min != null && value < min) || (max != null && value > max)) {
640-
return Optional.of(
641-
String.format(
642-
"The AutoScalerOption %s is invalid, it should be a value within the range [%s, %s]",
643-
autoScalerConfig.key(),
644-
min != null ? min.toString() : "-Infinity",
645-
max != null ? max.toString() : "+Infinity"));
646-
}
647-
}
648-
return Optional.empty();
649-
} catch (IllegalArgumentException e) {
650-
return Optional.of(
651-
String.format(
652-
"Invalid value in the autoscaler config %s", autoScalerConfig.key()));
653-
}
654-
}
655-
656-
private static <T extends Number> Optional<String> validateNumber(
657-
Configuration flinkConfiguration, ConfigOption<T> autoScalerConfig, Double min) {
658-
return validateNumber(flinkConfiguration, autoScalerConfig, min, null);
603+
return autoscalerValidator.validateAutoscalerOptions(flinkConfiguration);
659604
}
660605
}

0 commit comments

Comments
 (0)