Commit d7bc703

[FLINK-39511][flink-autoscaler] FLIP-514: Custom Evaluator plugin for Flink Autoscaler
1 parent 068a1f3 commit d7bc703

18 files changed

Lines changed: 389 additions & 53 deletions

docs/content.zh/docs/operations/plugins.md

Lines changed: 93 additions & 1 deletion
@@ -189,4 +189,96 @@ The following steps demonstrate how to develop and use a custom mutator.
 2023-12-12 06:26:56,667 o.a.f.k.o.u.MutatorUtils [INFO ] Discovered mutator from plugin directory[/opt/flink/plugins]: org.apache.flink.mutator.CustomFlinkMutator.
 ```
 
-## Flink Autoscaler Custom Evaluator
+## Custom Flink Autoscaler Evaluator
+
+`FlinkAutoscalerEvaluator` is a pluggable component that allows users to provide custom scaling-metric evaluation logic on top of the metrics evaluated internally by the autoscaler. Custom evaluators are discovered through the [Plugins](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/plugins) mechanism when running inside the Kubernetes operator, and through the standard Java `ServiceLoader` mechanism when running with `flink-autoscaler-standalone`. In both cases, the implementation class must be registered in `META-INF/services`.
+
+For each evaluation cycle, the autoscaler invokes the custom evaluator selected via the `job.autoscaler.metrics.custom-evaluator.name` configuration option once per job vertex. The metrics returned by the custom evaluator are merged on top of the internally evaluated metrics, allowing users to override or augment specific `ScalingMetric` values (e.g. `TARGET_DATA_RATE`, `TRUE_PROCESSING_RATE`, `CATCH_UP_DATA_RATE`).
+
+The following steps demonstrate how to develop and use a custom evaluator.
+
+1. Implement the `FlinkAutoscalerEvaluator` interface:
+```java
+package org.apache.flink.autoscaler.custom;
+
+import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.autoscaler.metrics.FlinkAutoscalerEvaluator;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Custom evaluator implementation of {@link FlinkAutoscalerEvaluator}. */
+public class CustomEvaluator implements FlinkAutoscalerEvaluator {
+
+    @Override
+    public String getName() {
+        return "custom-evaluator";
+    }
+
+    @Override
+    public Map<ScalingMetric, EvaluatedScalingMetric> evaluateVertexMetrics(
+            JobVertexID vertex,
+            Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
+            Context context) {
+        Map<ScalingMetric, EvaluatedScalingMetric> overrides = new HashMap<>();
+        // Example: override the target data rate for source vertices based
+        // on a value read from the evaluator-specific configuration.
+        double target = context.getCustomEvaluatorConf().getDouble("target-data-rate", 0.0);
+        if (target > 0 && context.getTopology().isSource(vertex)) {
+            overrides.put(
+                    ScalingMetric.TARGET_DATA_RATE,
+                    EvaluatedScalingMetric.avg(target));
+        }
+        return overrides;
+    }
+}
+```
+
+The `Context` object exposes an unmodifiable view of the job configuration, the metrics history, previously evaluated vertex metrics (vertices are evaluated in topological order), the job topology, backlog status, max restart time, and the evaluator-specific configuration.
+
+2. Create the service definition file `org.apache.flink.autoscaler.metrics.FlinkAutoscalerEvaluator` in `META-INF/services` with the fully-qualified class name of your implementation:
+```text
+org.apache.flink.autoscaler.custom.CustomEvaluator
+```
+
+3. Package the project with Maven to generate the custom evaluator JAR.
+
+4. Select the custom evaluator via configuration. The evaluator whose `getName()` matches the configured name will be invoked; any `job.autoscaler.metrics.custom-evaluator.<name>.*` entries are surfaced to the evaluator via `Context#getCustomEvaluatorConf()` (with the `job.autoscaler.metrics.custom-evaluator.<name>.` prefix stripped):
+```yaml
+job.autoscaler.metrics.custom-evaluator.name: custom-evaluator
+job.autoscaler.metrics.custom-evaluator.custom-evaluator.target-data-rate: 100000.0
+```
+{{< hint warning >}}
+**Only one custom evaluator per pipeline is supported.** The `job.autoscaler.metrics.custom-evaluator.name` option is single-valued, and the autoscaler resolves and invokes exactly one evaluator per evaluation cycle. Registering multiple implementations via `META-INF/services` is fine: they form a registry from which different jobs can select by name, but a single job cannot chain or compose more than one evaluator.
+{{< /hint >}}
+
+5. Deploy the evaluator.
+
+- **Operator deployment** – create a Dockerfile to build a custom image from the official `apache/flink-kubernetes-operator` image and copy the generated JAR into a custom evaluator plugin directory under `/opt/flink/plugins` (the value of the `FLINK_PLUGINS_DIR` environment variable in the flink-kubernetes-operator helm chart). The structure of the custom evaluator directory under `/opt/flink/plugins` is as follows:
+```text
+/opt/flink/plugins
+├── custom-evaluator
+│   ├── custom-evaluator.jar
+└── ...
+```
+
+With the custom evaluator directory location, the Dockerfile is defined as follows:
+```dockerfile
+FROM apache/flink-kubernetes-operator
+ENV FLINK_PLUGINS_DIR=/opt/flink/plugins
+ENV CUSTOM_EVALUATOR_DIR=custom-evaluator
+RUN mkdir $FLINK_PLUGINS_DIR/$CUSTOM_EVALUATOR_DIR
+COPY custom-evaluator.jar $FLINK_PLUGINS_DIR/$CUSTOM_EVALUATOR_DIR/
+```
+
+Install the flink-kubernetes-operator helm chart with the custom image and verify that the `deploy/flink-kubernetes-operator` log contains:
+```text
+o.a.f.k.o.a.AutoscalerUtils [INFO ] Discovered custom evaluator from plugin directory[/opt/flink/plugins]: org.apache.flink.autoscaler.custom.CustomEvaluator.
+```
+
+- **Standalone autoscaler** – place the custom evaluator JAR on the classpath of the `flink-autoscaler-standalone` process. It will be picked up automatically via Java's `ServiceLoader`, and discovery will be logged:
+```text
+o.a.f.a.s.AutoscalerUtils [INFO ] Discovered custom evaluator via ServiceLoader: org.apache.flink.autoscaler.custom.CustomEvaluator (name=custom-evaluator).
+```
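
A note on the merge semantics described in the documentation above: the text says that metrics returned by the custom evaluator are "merged on top of" the internally evaluated metrics. The following is a minimal sketch of what that could mean, assuming a plain map overlay in which keys returned by the evaluator win and every other internal entry passes through unchanged. `MergeOnTopSketch`, `mergeOnTop`, and the `String`/`Double` stand-ins for `ScalingMetric`/`EvaluatedScalingMetric` are hypothetical names used only for illustration; the actual merge code is not shown in this part of the diff.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Illustrative only. String keys and Double values stand in for ScalingMetric and
 * EvaluatedScalingMetric; this is not the autoscaler's own implementation.
 */
final class MergeOnTopSketch {

    /** Returns a new map in which entries from overrides replace or extend the internal ones. */
    static Map<String, Double> mergeOnTop(
            Map<String, Double> internal, Map<String, Double> overrides) {
        // Copy first so the internally evaluated metrics are never mutated.
        Map<String, Double> merged = new LinkedHashMap<>(internal);
        if (overrides != null) {
            // Keys present in overrides win; all other keys keep their internal value.
            merged.putAll(overrides);
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Double> internal = new LinkedHashMap<>();
        internal.put("TARGET_DATA_RATE", 50_000.0);
        internal.put("TRUE_PROCESSING_RATE", 80_000.0);

        // Assumed evaluator output: only the target data rate is overridden.
        Map<String, Double> overrides = Map.of("TARGET_DATA_RATE", 100_000.0);

        System.out.println(mergeOnTop(internal, overrides));
    }
}
```

Under this assumption, an evaluator that returns an empty map leaves the internal evaluation untouched, which matches the documented example where non-source vertices receive no overrides.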

docs/content/docs/operations/plugins.md

Lines changed: 94 additions & 0 deletions
@@ -188,3 +188,97 @@ The following steps demonstrate how to develop and use a custom mutator.
 ```text
 2023-12-12 06:26:56,667 o.a.f.k.o.u.MutatorUtils [INFO ] Discovered mutator from plugin directory[/opt/flink/plugins]: org.apache.flink.mutator.CustomFlinkMutator.
 ```
+
+## Custom Flink Autoscaler Evaluator
+
+`FlinkAutoscalerEvaluator` is a pluggable component that allows users to provide custom scaling-metric evaluation logic on top of the metrics evaluated internally by the autoscaler. Custom evaluators are discovered through the [Plugins](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/plugins) mechanism when running inside the Kubernetes operator, and through the standard Java `ServiceLoader` mechanism when running with `flink-autoscaler-standalone`. In both cases, the implementation class must be registered in `META-INF/services`.
+
+For each evaluation cycle, the autoscaler invokes the custom evaluator selected via the `job.autoscaler.metrics.custom-evaluator.name` configuration option once per job vertex. The metrics returned by the custom evaluator are merged on top of the internally evaluated metrics, allowing users to override or augment specific `ScalingMetric` values (e.g. `TARGET_DATA_RATE`, `TRUE_PROCESSING_RATE`, `CATCH_UP_DATA_RATE`).
+
+The following steps demonstrate how to develop and use a custom evaluator.
+
+1. Implement the `FlinkAutoscalerEvaluator` interface:
+```java
+package org.apache.flink.autoscaler.custom;
+
+import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.autoscaler.metrics.FlinkAutoscalerEvaluator;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Custom evaluator implementation of {@link FlinkAutoscalerEvaluator}. */
+public class CustomEvaluator implements FlinkAutoscalerEvaluator {
+
+    @Override
+    public String getName() {
+        return "custom-evaluator";
+    }
+
+    @Override
+    public Map<ScalingMetric, EvaluatedScalingMetric> evaluateVertexMetrics(
+            JobVertexID vertex,
+            Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
+            Context context) {
+        Map<ScalingMetric, EvaluatedScalingMetric> overrides = new HashMap<>();
+        // Example: override the target data rate for source vertices based
+        // on a value read from the evaluator-specific configuration.
+        double target = context.getCustomEvaluatorConf().getDouble("target-data-rate", 0.0);
+        if (target > 0 && context.getTopology().isSource(vertex)) {
+            overrides.put(
+                    ScalingMetric.TARGET_DATA_RATE,
+                    EvaluatedScalingMetric.avg(target));
+        }
+        return overrides;
+    }
+}
+```
+
+The `Context` object exposes an unmodifiable view of the job configuration, the metrics history, previously evaluated vertex metrics (vertices are evaluated in topological order), the job topology, backlog status, max restart time, and the evaluator-specific configuration.
+
+2. Create the service definition file `org.apache.flink.autoscaler.metrics.FlinkAutoscalerEvaluator` in `META-INF/services` with the fully-qualified class name of your implementation:
+```text
+org.apache.flink.autoscaler.custom.CustomEvaluator
+```
+
+3. Package the project with Maven to generate the custom evaluator JAR.
+
+4. Select the custom evaluator via configuration. The evaluator whose `getName()` matches the configured name will be invoked; any `job.autoscaler.metrics.custom-evaluator.<name>.*` entries are surfaced to the evaluator via `Context#getCustomEvaluatorConf()` (with the `job.autoscaler.metrics.custom-evaluator.<name>.` prefix stripped):
+```yaml
+job.autoscaler.metrics.custom-evaluator.name: custom-evaluator
+job.autoscaler.metrics.custom-evaluator.custom-evaluator.target-data-rate: 100000.0
+```
+{{< hint warning >}}
+**Only one custom evaluator per pipeline is supported.** The `job.autoscaler.metrics.custom-evaluator.name` option is single-valued, and the autoscaler resolves and invokes exactly one evaluator per evaluation cycle. Registering multiple implementations via `META-INF/services` is fine: they form a registry from which different jobs can select by name, but a single job cannot chain or compose more than one evaluator.
+{{< /hint >}}
+
+5. Deploy the evaluator.
+
+- **Operator deployment** - create a Dockerfile to build a custom image from the official `apache/flink-kubernetes-operator` image and copy the generated JAR into a custom evaluator plugin directory under `/opt/flink/plugins` (the value of the `FLINK_PLUGINS_DIR` environment variable in the flink-kubernetes-operator helm chart). The structure of the custom evaluator directory under `/opt/flink/plugins` is as follows:
+```text
+/opt/flink/plugins
+├── custom-evaluator
+│   ├── custom-evaluator.jar
+└── ...
+```
+
+With the custom evaluator directory location, the Dockerfile is defined as follows:
+```dockerfile
+FROM apache/flink-kubernetes-operator
+ENV FLINK_PLUGINS_DIR=/opt/flink/plugins
+ENV CUSTOM_EVALUATOR_DIR=custom-evaluator
+RUN mkdir $FLINK_PLUGINS_DIR/$CUSTOM_EVALUATOR_DIR
+COPY custom-evaluator.jar $FLINK_PLUGINS_DIR/$CUSTOM_EVALUATOR_DIR/
+```
+
+Install the flink-kubernetes-operator helm chart with the custom image and verify that the `deploy/flink-kubernetes-operator` log contains:
+```text
+o.a.f.k.o.a.AutoscalerUtils [INFO ] Discovered custom evaluator from plugin directory[/opt/flink/plugins]: org.apache.flink.autoscaler.custom.CustomEvaluator.
+```
+
+- **Standalone autoscaler** - place the custom evaluator JAR on the classpath of the `flink-autoscaler-standalone` process. It will be picked up automatically via Java's `ServiceLoader`, and discovery will be logged:
+```text
+o.a.f.a.s.AutoscalerUtils [INFO ] Discovered custom evaluator via ServiceLoader: org.apache.flink.autoscaler.custom.CustomEvaluator (name=custom-evaluator).
+```
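
A note on step 4 of the documentation above: entries under `job.autoscaler.metrics.custom-evaluator.<name>.` are said to reach the evaluator with that prefix stripped. The sketch below illustrates the described key mapping on a plain `Map<String, String>`. It is an assumption-laden illustration, not the operator's code: `EvaluatorConfSketch` and `evaluatorConf` are hypothetical names, and the real implementation presumably works on Flink's `Configuration` rather than a raw map.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

/** Illustrative only: shows the documented prefix stripping on a plain string map. */
final class EvaluatorConfSketch {

    private static final String BASE = "job.autoscaler.metrics.custom-evaluator.";

    /** Collects the keys scoped to the given evaluator name and removes the scoping prefix. */
    static Map<String, String> evaluatorConf(Map<String, String> jobConf, String evaluatorName) {
        String prefix = BASE + evaluatorName + ".";
        Map<String, String> scoped = new TreeMap<>();
        for (Map.Entry<String, String> entry : jobConf.entrySet()) {
            if (entry.getKey().startsWith(prefix)) {
                scoped.put(entry.getKey().substring(prefix.length()), entry.getValue());
            }
        }
        return scoped;
    }

    public static void main(String[] args) {
        Map<String, String> jobConf = new LinkedHashMap<>();
        jobConf.put("job.autoscaler.metrics.custom-evaluator.name", "custom-evaluator");
        jobConf.put(
                "job.autoscaler.metrics.custom-evaluator.custom-evaluator.target-data-rate",
                "100000.0");
        jobConf.put("job.autoscaler.metrics.window", "15 min");

        // Only the evaluator-scoped key survives, now addressed as "target-data-rate".
        System.out.println(evaluatorConf(jobConf, "custom-evaluator"));
    }
}
```

This is why the documented `CustomEvaluator` example reads `target-data-rate` rather than the fully qualified key.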

docs/layouts/shortcodes/generated/auto_scaler_configuration.html

Lines changed: 6 additions & 0 deletions
@@ -92,6 +92,12 @@
             <td><p>Enum</p></td>
             <td>Metric aggregator to use for busyTime metrics. This affects how true processing/output rate will be computed. Using max allows us to handle jobs with data skew more robustly, while avg may provide better stability when we know that the load distribution is even.<br /><br />Possible values:<ul><li>"AVG"</li><li>"MAX"</li><li>"MIN"</li></ul></td>
         </tr>
+        <tr>
+            <td><h5>job.autoscaler.metrics.custom-evaluator.name</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>Name of the custom evaluator to be used.</td>
+        </tr>
         <tr>
             <td><h5>job.autoscaler.metrics.window</h5></td>
             <td style="word-wrap: break-word;">15 min</td>
Lines changed: 68 additions & 0 deletions
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.standalone;
+
+import org.apache.flink.autoscaler.metrics.FlinkAutoscalerEvaluator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.ServiceLoader;
+
+/** Autoscaler related utility methods for the standalone autoscaler. */
+public class AutoscalerUtils {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AutoscalerUtils.class);
+
+    /**
+     * Discovers custom evaluators for the standalone autoscaler via Java's {@link ServiceLoader}
+     * mechanism. Implementations must be registered under {@code
+     * META-INF/services/org.apache.flink.autoscaler.metrics.FlinkAutoscalerEvaluator} on the
+     * classpath.
+     *
+     * @return A map of discovered custom evaluators keyed by the name returned by {@link
+     *     FlinkAutoscalerEvaluator#getName()}. If two implementations report the same name, the
+     *     later one overwrites the earlier one and a warning is logged.
+     */
+    public static Map<String, FlinkAutoscalerEvaluator> discoverCustomEvaluators() {
+        Map<String, FlinkAutoscalerEvaluator> customEvaluators = new HashMap<>();
+        ServiceLoader.load(FlinkAutoscalerEvaluator.class)
+                .forEach(
+                        customEvaluator -> {
+                            String customEvaluatorName = customEvaluator.getName();
+                            LOG.info(
+                                    "Discovered custom evaluator via ServiceLoader: {} (name={}).",
+                                    customEvaluator.getClass().getName(),
+                                    customEvaluatorName);
+                            if (customEvaluators.containsKey(customEvaluatorName)) {
+                                LOG.warn(
+                                        "Duplicate custom evaluator name [{}] detected. Overwriting existing [{}] with [{}].",
+                                        customEvaluatorName,
+                                        customEvaluators
+                                                .get(customEvaluatorName)
+                                                .getClass()
+                                                .getName(),
+                                        customEvaluator.getClass().getName());
+                            }
+                            customEvaluators.put(customEvaluatorName, customEvaluator);
+                        });
+        return customEvaluators;
+    }
+}

flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerEntrypoint.java

Lines changed: 6 additions & 1 deletion
@@ -27,6 +27,7 @@
 import org.apache.flink.autoscaler.ScalingExecutor;
 import org.apache.flink.autoscaler.ScalingMetricEvaluator;
 import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
+import org.apache.flink.autoscaler.metrics.FlinkAutoscalerEvaluator;
 import org.apache.flink.autoscaler.standalone.flinkcluster.FlinkClusterJobListFetcher;
 import org.apache.flink.autoscaler.standalone.realizer.RescaleApiScalingRealizer;
 import org.apache.flink.autoscaler.state.AutoScalerStateStore;
@@ -38,6 +39,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Map;
+
 import static org.apache.flink.autoscaler.config.AutoScalerOptions.FLINK_CLIENT_TIMEOUT;
 import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.FETCHER_FLINK_CLUSTER_HOST;
 import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.FETCHER_FLINK_CLUSTER_PORT;
@@ -88,14 +91,16 @@ JobListFetcher<KEY, Context> createJobListFetcher(Configuration conf) {
             JobAutoScaler<KEY, Context> createJobAutoscaler(
                     AutoScalerEventHandler<KEY, Context> eventHandler,
                     AutoScalerStateStore<KEY, Context> stateStore) {
+        Map<String, FlinkAutoscalerEvaluator> customEvaluators =
+                AutoscalerUtils.discoverCustomEvaluators();
         return new JobAutoScalerImpl<>(
                 new RestApiMetricsCollector<>(),
                 new ScalingMetricEvaluator(),
                 new ScalingExecutor<>(eventHandler, stateStore),
                 eventHandler,
                 new RescaleApiScalingRealizer<>(eventHandler),
                 stateStore,
-                null);
+                customEvaluators);
     }
 
     @VisibleForTesting
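
A note on the change above: the entrypoint now hands the discovered name-to-evaluator map to `JobAutoScalerImpl` instead of `null`. How the map is consumed is not shown in this part of the diff. Going by the documentation's statement that exactly one evaluator is resolved by its configured name, the lookup might resemble the sketch below. `EvaluatorSelectionSketch`, `NamedEvaluator`, and `select` are hypothetical names, and treating an unset or unknown name as "no custom evaluator" is an assumption rather than confirmed behaviour.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Illustrative only: name-based selection of a single evaluator from a registry map. */
final class EvaluatorSelectionSketch {

    /** Hypothetical stand-in for FlinkAutoscalerEvaluator; only getName() matters here. */
    interface NamedEvaluator {
        String getName();
    }

    /** Resolves at most one evaluator; an unset or unknown name yields an empty result. */
    static Optional<NamedEvaluator> select(
            Map<String, NamedEvaluator> registry, String configuredName) {
        if (registry == null || configuredName == null || configuredName.isEmpty()) {
            return Optional.empty();
        }
        return Optional.ofNullable(registry.get(configuredName));
    }

    public static void main(String[] args) {
        NamedEvaluator custom = () -> "custom-evaluator";
        NamedEvaluator other = () -> "other-evaluator";

        // Several implementations may be registered; each job selects one by name.
        Map<String, NamedEvaluator> registry = new HashMap<>();
        registry.put(custom.getName(), custom);
        registry.put(other.getName(), other);

        System.out.println(select(registry, "custom-evaluator").isPresent());
        System.out.println(select(registry, "missing").isPresent());
        System.out.println(select(registry, null).isPresent());
    }
}
```

Accepting a `null` registry in `select` mirrors the previous call site, which passed `null` where the map is now supplied.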
