Skip to content

[FLINK-39511] FLIP-514: Custom Evaluator plugin for Flink Autoscaler#1099

Open
Dennis-Mircea wants to merge 4 commits intoapache:mainfrom
Dennis-Mircea:FLINK-39511
Open

[FLINK-39511] FLIP-514: Custom Evaluator plugin for Flink Autoscaler#1099
Dennis-Mircea wants to merge 4 commits intoapache:mainfrom
Dennis-Mircea:FLINK-39511

Conversation

@Dennis-Mircea
Copy link
Copy Markdown
Contributor

@Dennis-Mircea Dennis-Mircea commented Apr 22, 2026

What is the purpose of the change

This PR implements the FLIP-514 and is a continuation of the work done as part of #953 PR.

It introduces a pluggable FlinkAutoscalerEvaluator SPI that lets users provide custom scaling-metric evaluation logic on top of the metrics evaluated internally by the autoscaler, both in the Flink Kubernetes Operator (via the Flink Plugins mechanism) and in flink-autoscaler-standalone (via the standard Java ServiceLoader). The evaluator returned map is merged on top of the internally-evaluated metrics for each job vertex, so users can override or augment specific ScalingMetric values (e.g. TARGET_DATA_RATE, TRUE_PROCESSING_RATE, CATCH_UP_DATA_RATE) without forking the autoscaler.

NOTE: This PR ensures the full alignment with the [FLINK-39555] FLIP-575: Scaling Executor Plugin SPI for Flink Autoscaler PR (#1085).

Brief change log

  • Introduced the FlinkAutoscalerEvaluator SPI (flink-autoscaler) with getName(), evaluateVertexMetrics(vertex, evaluatedMetrics, Context) and a Context exposing an un-modifiable view of jobConf, metricsHistory, previously evaluated vertex metrics, topology, processingBacklog, restartTime and the evaluator-specific customEvaluatorConf.
  • Added the job.autoscaler.metrics.custom-evaluator.name config option and AutoScalerOptions#forCustomEvaluator(conf, name) helper, which builds a DelegatingConfiguration scoped to job.autoscaler.metrics.custom-evaluator.<name>. for the evaluator.
  • Wired custom-evaluator resolution into JobAutoScalerImpl#getCustomEvaluatorIfRequired and passed the resolved Tuple2<FlinkAutoscalerEvaluator, Configuration> down to ScalingMetricEvaluator, which now calls the evaluator per vertex in topological order and merges the returned metrics on top of the internally evaluated ones.
  • Added operator-side discovery in org.apache.flink.kubernetes.operator.autoscaler.AutoscalerUtils#discoverCustomEvaluators(Configuration) using the Flink PluginManager (/opt/flink/plugins). The discovered evaluators are injected into JobAutoScalerImpl from FlinkOperator.
  • Added standalone-side discovery in org.apache.flink.autoscaler.standalone.AutoscalerUtils#discoverCustomEvaluators() using ServiceLoader.load(FlinkAutoscalerEvaluator.class) and wired it into StandaloneAutoscalerEntrypoint#createJobAutoscaler.
  • Documented the new plugin in both language variants of docs/.../operations/plugins.md under a new Custom Flink Autoscaler Evaluator section, including a warning hint that a single custom evaluator is supported per pipeline today.

Verifying this change

This change added tests and can be verified as follows:

  • Evaluator resolution and override semantics (flink-autoscaler):
    • JobAutoScalerImplTest#testGetCustomEvaluatorIfRequired covers the registry lookup (configured name present / absent / unknown) and the per-evaluator DelegatingConfiguration built by AutoScalerOptions.forCustomEvaluator.
    • MetricsCollectionAndEvaluationTest exercises end-to-end evaluation with a TestCustomEvaluator override on TARGET_DATA_RATE for source vertices and asserts that the custom value wins over the internally computed one while the rest of the metrics remain untouched.
  • Operator-side plugin discovery (flink-kubernetes-operator):
    • TestingFlinkDeploymentController / FlinkOperator wiring verified by existing controller tests loading the TestCustomEvaluator via META-INF/services.
  • Standalone-side SPI discovery (flink-autoscaler-standalone):
    • New AutoscalerUtilsTest#testDiscoverCustomEvaluators registers TestCustomEvaluator through META-INF/services/org.apache.flink.autoscaler.metrics.FlinkAutoscalerEvaluator and asserts that AutoscalerUtils.discoverCustomEvaluators() finds it keyed by getName().

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changes to the CustomResourceDescriptors: no
  • Core observer or reconciler logic that is regularly executed: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? docs & JavaDocs

@Dennis-Mircea
Copy link
Copy Markdown
Contributor Author

Question & Follow-up: single-evaluator limitation vs. config namespace shape

As it stands, JobAutoScalerImpl#getCustomEvaluatorIfRequired reads a single ConfigOption<String> CUSTOM_EVALUATOR_NAME (job.autoscaler.metrics.custom-evaluator.name), looks up one FlinkAutoscalerEvaluator from the registry (keyed by FlinkAutoscalerEvaluator#getName()), and ScalingMetricEvaluator only knows how to invoke one evaluator per evaluation cycle. So at runtime only one custom evaluator can be active per pipeline, even though several may be registered via META-INF/services.

At the same time, AutoScalerOptions.forCustomEvaluator(conf, name) builds the evaluator-specific view as a DelegatingConfiguration over job.autoscaler.metrics.custom-evaluator.<name>., which means users have to write keys like:

job.autoscaler.metrics.custom-evaluator.name: custom-evaluator
job.autoscaler.metrics.custom-evaluator.custom-evaluator.target-data-rate: 100000.0

That repeated <name> segment is future-proofing for a multi-evaluator world, but today it's pure ceremony: there is never more than one evaluator reading from that namespace. It's confusing to document, easy to mis-type, and likely to trip users up.

I see two coherent directions and would like the maintainers' preference before pushing further commits:

Option A - keep the single-evaluator contract, simplify the namespace.

Drop the <name> segment from forCustomEvaluator so Context#getCustomEvaluatorConf() is a delegating view over job.autoscaler.metrics.custom-evaluator. directly. To avoid the name key leaking into that view the selector would move out of that prefix (e.g. renamed to job.autoscaler.metrics.custom-evaluator-name, with a fallback key for the old one). User-facing config becomes:

job.autoscaler.metrics.custom-evaluator-name: custom-evaluator
job.autoscaler.metrics.custom-evaluator.target-data-rate: 100000.0

The Option A has the following pro & cons:

  • Pros: cleanest UX, matches the actual runtime semantics.
  • Cons: represents a big limitation in the custom evaluator flexibility.

Option B - lift the limitation, align the config shape with Flink metric reporters and with FLIP-514.

Flink's metric-reporter config is the established idiom for "list of named instances, each with its own class and its own bag of options":

metrics.reporters: my_jmx_reporter,my_other_reporter

metrics.reporter.my_jmx_reporter.factory.class: org.apache.flink.metrics.jmx.JMXReporterFactory
metrics.reporter.my_jmx_reporter.port: 9020-9040

metrics.reporter.my_other_reporter.factory.class: org.apache.flink.metrics.graphite.GraphiteReporterFactory
metrics.reporter.my_other_reporter.host: 192.168.1.1
metrics.reporter.my_other_reporter.port: 10000

Applied to custom evaluators, the mapping is:

Reporter concept Evaluator equivalent
metrics.reporters: r1,r2 job.autoscaler.metrics.custom-evaluators: custom-evaluator-1,custom-evaluator-2
metrics.reporter.<r>.factory.class: FQN job.autoscaler.metrics.custom-evaluator.<instance>.class: FQN
metrics.reporter.<r>.<opt>: v job.autoscaler.metrics.custom-evaluator.<instance>.<opt>: v

Concretely:

job.autoscaler.metrics.custom-evaluators: custom-evaluator-1,custom-evaluator-2

job.autoscaler.metrics.custom-evaluator.custom-evaluator-1.class: org.apache.flink.autoscaler.custom.MyFirstEvaluator
job.autoscaler.metrics.custom-evaluator.custom-evaluator-1.target-data-rate: 100000.0

job.autoscaler.metrics.custom-evaluator.custom-evaluator-2.class: org.apache.flink.autoscaler.custom.MySecondEvaluator
job.autoscaler.metrics.custom-evaluator.custom-evaluator-2.some-threshold: 0.8

The consequence is that the getName() goes away from the SPI. The instance name (custom-evaluator-1) is purely a configuration-level handle. Evaluator resolution stops going through the META-INF/services -> getName() registry and instead becomes: for each entry in custom-evaluators, read its ...<instance>.class property and instantiate the fully qualified name (via Class#forName, or via a factory SPI if we want DI/customization, analogous to *ReporterFactory). This is exactly the shape FLIP-514 proposes in its Job Level Config For Custom Evaluator section, and it removes the oddity where two evaluators couldn't share the same getName() even though they were distinct implementations.

To keep the ordering contract between multiple custom evaluators explicit and self-describing rather than implicit in the declaration order of job.autoscaler.metrics.custom-evaluators, we can adopt the same design already approved in FLIP-575: Scaling Executor Plugin SPI for Flink Autoscaler and implemented for ScalingExecutorPlugin.java‎: a default int priority() method on the SPI, with evaluators executed in ascending priority and the returned metrics merged left-to-right (lower priority first, higher priority wins on conflicts). This gives us:

  • a single, well-defined ordering contract that doesn't depend on the user's config-key ordering or on ServiceLoader/PluginManager discovery order,
  • parity and composability with ScalingExecutorPlugin, so both autoscaler extension points share the same mental model
  • a natural extension path for Option B above as priority() simply becomes the tie-breaker once we allow more than one evaluator per pipeline.

The Option B has the following pro & cons:

  • Pros: consistent with an idiom Flink users already know, aligns with FLIP-514, expressive (multiple evaluators per job), and drops the getName() coupling that is awkward today.
  • Cons: bigger change than A.

My preference would be to land Option B so we match FLIP-514 directly and don't ship a config surface we know we'll have to migrate away from.

Thoughts?

@Dennis-Mircea Dennis-Mircea changed the title [FLINK-39511][flink-autoscaler] FLIP-514: Custom Evaluator plugin for Flink Autoscaler [FLINK-39511] FLIP-514: Custom Evaluator plugin for Flink Autoscaler Apr 22, 2026
@gyfora
Copy link
Copy Markdown
Contributor

gyfora commented Apr 27, 2026

Question & Follow-up: single-evaluator limitation vs. config namespace shape

As it stands, JobAutoScalerImpl#getCustomEvaluatorIfRequired reads a single ConfigOption<String> CUSTOM_EVALUATOR_NAME (job.autoscaler.metrics.custom-evaluator.name), looks up one FlinkAutoscalerEvaluator from the registry (keyed by FlinkAutoscalerEvaluator#getName()), and ScalingMetricEvaluator only knows how to invoke one evaluator per evaluation cycle. So at runtime only one custom evaluator can be active per pipeline, even though several may be registered via META-INF/services.

At the same time, AutoScalerOptions.forCustomEvaluator(conf, name) builds the evaluator-specific view as a DelegatingConfiguration over job.autoscaler.metrics.custom-evaluator.<name>., which means users have to write keys like:

job.autoscaler.metrics.custom-evaluator.name: custom-evaluator
job.autoscaler.metrics.custom-evaluator.custom-evaluator.target-data-rate: 100000.0

That repeated <name> segment is future-proofing for a multi-evaluator world, but today it's pure ceremony: there is never more than one evaluator reading from that namespace. It's confusing to document, easy to mis-type, and likely to trip users up.

I see two coherent directions and would like the maintainers' preference before pushing further commits:

Option A - keep the single-evaluator contract, simplify the namespace.

Drop the <name> segment from forCustomEvaluator so Context#getCustomEvaluatorConf() is a delegating view over job.autoscaler.metrics.custom-evaluator. directly. To avoid the name key leaking into that view the selector would move out of that prefix (e.g. renamed to job.autoscaler.metrics.custom-evaluator-name, with a fallback key for the old one). User-facing config becomes:

job.autoscaler.metrics.custom-evaluator-name: custom-evaluator
job.autoscaler.metrics.custom-evaluator.target-data-rate: 100000.0

The Option A has the following pro & cons:

  • Pros: cleanest UX, matches the actual runtime semantics.
  • Cons: represents a big limitation in the custom evaluator flexibility.

Option B - lift the limitation, align the config shape with Flink metric reporters and with FLIP-514.

Flink's metric-reporter config is the established idiom for "list of named instances, each with its own class and its own bag of options":

metrics.reporters: my_jmx_reporter,my_other_reporter

metrics.reporter.my_jmx_reporter.factory.class: org.apache.flink.metrics.jmx.JMXReporterFactory
metrics.reporter.my_jmx_reporter.port: 9020-9040

metrics.reporter.my_other_reporter.factory.class: org.apache.flink.metrics.graphite.GraphiteReporterFactory
metrics.reporter.my_other_reporter.host: 192.168.1.1
metrics.reporter.my_other_reporter.port: 10000

Applied to custom evaluators, the mapping is:

Reporter concept Evaluator equivalent
metrics.reporters: r1,r2 job.autoscaler.metrics.custom-evaluators: custom-evaluator-1,custom-evaluator-2
metrics.reporter.<r>.factory.class: FQN job.autoscaler.metrics.custom-evaluator.<instance>.class: FQN
metrics.reporter.<r>.<opt>: v job.autoscaler.metrics.custom-evaluator.<instance>.<opt>: v
Concretely:

job.autoscaler.metrics.custom-evaluators: custom-evaluator-1,custom-evaluator-2

job.autoscaler.metrics.custom-evaluator.custom-evaluator-1.class: org.apache.flink.autoscaler.custom.MyFirstEvaluator
job.autoscaler.metrics.custom-evaluator.custom-evaluator-1.target-data-rate: 100000.0

job.autoscaler.metrics.custom-evaluator.custom-evaluator-2.class: org.apache.flink.autoscaler.custom.MySecondEvaluator
job.autoscaler.metrics.custom-evaluator.custom-evaluator-2.some-threshold: 0.8

The consequence is that the getName() goes away from the SPI. The instance name (custom-evaluator-1) is purely a configuration-level handle. Evaluator resolution stops going through the META-INF/services -> getName() registry and instead becomes: for each entry in custom-evaluators, read its ...<instance>.class property and instantiate the fully qualified name (via Class#forName, or via a factory SPI if we want DI/customization, analogous to *ReporterFactory). This is exactly the shape FLIP-514 proposes in its Job Level Config For Custom Evaluator section, and it removes the oddity where two evaluators couldn't share the same getName() even though they were distinct implementations.

To keep the ordering contract between multiple custom evaluators explicit and self-describing rather than implicit in the declaration order of job.autoscaler.metrics.custom-evaluators, we can adopt the same design already approved in FLIP-575: Scaling Executor Plugin SPI for Flink Autoscaler and implemented for ScalingExecutorPlugin.java‎: a default int priority() method on the SPI, with evaluators executed in ascending priority and the returned metrics merged left-to-right (lower priority first, higher priority wins on conflicts). This gives us:

  • a single, well-defined ordering contract that doesn't depend on the user's config-key ordering or on ServiceLoader/PluginManager discovery order,
  • parity and composability with ScalingExecutorPlugin, so both autoscaler extension points share the same mental model
  • a natural extension path for Option B above as priority() simply becomes the tie-breaker once we allow more than one evaluator per pipeline.

The Option B has the following pro & cons:

  • Pros: consistent with an idiom Flink users already know, aligns with FLIP-514, expressive (multiple evaluators per job), and drops the getName() coupling that is awkward today.
  • Cons: bigger change than A.

My preference would be to land Option B so we match FLIP-514 directly and don't ship a config surface we know we'll have to migrate away from.

Thoughts?

@Dennis-Mircea let's go for option 2 but to limit the change/complexity we can also start with supporting only a single evaluator plugin and add the priority / ordering logic as a follow up jira

pchoudhury22 and others added 3 commits April 27, 2026 15:40
Squashed from PR apache#953 (apache#953).

Original commits by Pradeepta Choudhury <pchoudhury22@apple.com>:
- Draft PR for plugin based approach for custom evaluator for scaling metric evaluation
- Add sample custom evaluator for simple trend adjustor and fallback key support with delegating configuration
- Add getName method to CustomEvaluator and javadocs
- Refactor tests for the plugin and pass null for custom evaluators in standalone autoscaler entrypoint
- Rename evaluator plugin to match naming convention for plugins

Co-authored-by: Pradeepta Choudhury <pchoudhury22@apple.com>
@Dennis-Mircea
Copy link
Copy Markdown
Contributor Author

Question & Follow-up: single-evaluator limitation vs. config namespace shape
As it stands, JobAutoScalerImpl#getCustomEvaluatorIfRequired reads a single ConfigOption<String> CUSTOM_EVALUATOR_NAME (job.autoscaler.metrics.custom-evaluator.name), looks up one FlinkAutoscalerEvaluator from the registry (keyed by FlinkAutoscalerEvaluator#getName()), and ScalingMetricEvaluator only knows how to invoke one evaluator per evaluation cycle. So at runtime only one custom evaluator can be active per pipeline, even though several may be registered via META-INF/services.
At the same time, AutoScalerOptions.forCustomEvaluator(conf, name) builds the evaluator-specific view as a DelegatingConfiguration over job.autoscaler.metrics.custom-evaluator.<name>., which means users have to write keys like:

job.autoscaler.metrics.custom-evaluator.name: custom-evaluator
job.autoscaler.metrics.custom-evaluator.custom-evaluator.target-data-rate: 100000.0

That repeated <name> segment is future-proofing for a multi-evaluator world, but today it's pure ceremony: there is never more than one evaluator reading from that namespace. It's confusing to document, easy to mis-type, and likely to trip users up.
I see two coherent directions and would like the maintainers' preference before pushing further commits:
Option A - keep the single-evaluator contract, simplify the namespace.
Drop the <name> segment from forCustomEvaluator so Context#getCustomEvaluatorConf() is a delegating view over job.autoscaler.metrics.custom-evaluator. directly. To avoid the name key leaking into that view the selector would move out of that prefix (e.g. renamed to job.autoscaler.metrics.custom-evaluator-name, with a fallback key for the old one). User-facing config becomes:

job.autoscaler.metrics.custom-evaluator-name: custom-evaluator
job.autoscaler.metrics.custom-evaluator.target-data-rate: 100000.0

The Option A has the following pro & cons:

  • Pros: cleanest UX, matches the actual runtime semantics.
  • Cons: represents a big limitation in the custom evaluator flexibility.

Option B - lift the limitation, align the config shape with Flink metric reporters and with FLIP-514.
Flink's metric-reporter config is the established idiom for "list of named instances, each with its own class and its own bag of options":

metrics.reporters: my_jmx_reporter,my_other_reporter

metrics.reporter.my_jmx_reporter.factory.class: org.apache.flink.metrics.jmx.JMXReporterFactory
metrics.reporter.my_jmx_reporter.port: 9020-9040

metrics.reporter.my_other_reporter.factory.class: org.apache.flink.metrics.graphite.GraphiteReporterFactory
metrics.reporter.my_other_reporter.host: 192.168.1.1
metrics.reporter.my_other_reporter.port: 10000

Applied to custom evaluators, the mapping is:
Reporter concept Evaluator equivalent
metrics.reporters: r1,r2 job.autoscaler.metrics.custom-evaluators: custom-evaluator-1,custom-evaluator-2
metrics.reporter.<r>.factory.class: FQN job.autoscaler.metrics.custom-evaluator.<instance>.class: FQN
metrics.reporter.<r>.<opt>: v job.autoscaler.metrics.custom-evaluator.<instance>.<opt>: v
Concretely:

job.autoscaler.metrics.custom-evaluators: custom-evaluator-1,custom-evaluator-2

job.autoscaler.metrics.custom-evaluator.custom-evaluator-1.class: org.apache.flink.autoscaler.custom.MyFirstEvaluator
job.autoscaler.metrics.custom-evaluator.custom-evaluator-1.target-data-rate: 100000.0

job.autoscaler.metrics.custom-evaluator.custom-evaluator-2.class: org.apache.flink.autoscaler.custom.MySecondEvaluator
job.autoscaler.metrics.custom-evaluator.custom-evaluator-2.some-threshold: 0.8

The consequence is that the getName() goes away from the SPI. The instance name (custom-evaluator-1) is purely a configuration-level handle. Evaluator resolution stops going through the META-INF/services -> getName() registry and instead becomes: for each entry in custom-evaluators, read its ...<instance>.class property and instantiate the fully qualified name (via Class#forName, or via a factory SPI if we want DI/customization, analogous to *ReporterFactory). This is exactly the shape FLIP-514 proposes in its Job Level Config For Custom Evaluator section, and it removes the oddity where two evaluators couldn't share the same getName() even though they were distinct implementations.
To keep the ordering contract between multiple custom evaluators explicit and self-describing rather than implicit in the declaration order of job.autoscaler.metrics.custom-evaluators, we can adopt the same design already approved in FLIP-575: Scaling Executor Plugin SPI for Flink Autoscaler and implemented for ScalingExecutorPlugin.java‎: a default int priority() method on the SPI, with evaluators executed in ascending priority and the returned metrics merged left-to-right (lower priority first, higher priority wins on conflicts). This gives us:

  • a single, well-defined ordering contract that doesn't depend on the user's config-key ordering or on ServiceLoader/PluginManager discovery order,
  • parity and composability with ScalingExecutorPlugin, so both autoscaler extension points share the same mental model
  • a natural extension path for Option B above as priority() simply becomes the tie-breaker once we allow more than one evaluator per pipeline.

The Option B has the following pro & cons:

  • Pros: consistent with an idiom Flink users already know, aligns with FLIP-514, expressive (multiple evaluators per job), and drops the getName() coupling that is awkward today.
  • Cons: bigger change than A.

My preference would be to land Option B so we match FLIP-514 directly and don't ship a config surface we know we'll have to migrate away from.
Thoughts?

@Dennis-Mircea let's go for option 2 but to limit the change/complexity we can also start with supporting only a single evaluator plugin and add the priority / ordering logic as a follow up jira

Question & Follow-up: single-evaluator limitation vs. config namespace shape
As it stands, JobAutoScalerImpl#getCustomEvaluatorIfRequired reads a single ConfigOption<String> CUSTOM_EVALUATOR_NAME (job.autoscaler.metrics.custom-evaluator.name), looks up one FlinkAutoscalerEvaluator from the registry (keyed by FlinkAutoscalerEvaluator#getName()), and ScalingMetricEvaluator only knows how to invoke one evaluator per evaluation cycle. So at runtime only one custom evaluator can be active per pipeline, even though several may be registered via META-INF/services.
At the same time, AutoScalerOptions.forCustomEvaluator(conf, name) builds the evaluator-specific view as a DelegatingConfiguration over job.autoscaler.metrics.custom-evaluator.<name>., which means users have to write keys like:

job.autoscaler.metrics.custom-evaluator.name: custom-evaluator
job.autoscaler.metrics.custom-evaluator.custom-evaluator.target-data-rate: 100000.0

That repeated <name> segment is future-proofing for a multi-evaluator world, but today it's pure ceremony: there is never more than one evaluator reading from that namespace. It's confusing to document, easy to mis-type, and likely to trip users up.
I see two coherent directions and would like the maintainers' preference before pushing further commits:
Option A - keep the single-evaluator contract, simplify the namespace.
Drop the <name> segment from forCustomEvaluator so Context#getCustomEvaluatorConf() is a delegating view over job.autoscaler.metrics.custom-evaluator. directly. To avoid the name key leaking into that view the selector would move out of that prefix (e.g. renamed to job.autoscaler.metrics.custom-evaluator-name, with a fallback key for the old one). User-facing config becomes:

job.autoscaler.metrics.custom-evaluator-name: custom-evaluator
job.autoscaler.metrics.custom-evaluator.target-data-rate: 100000.0

The Option A has the following pro & cons:

  • Pros: cleanest UX, matches the actual runtime semantics.
  • Cons: represents a big limitation in the custom evaluator flexibility.

Option B - lift the limitation, align the config shape with Flink metric reporters and with FLIP-514.
Flink's metric-reporter config is the established idiom for "list of named instances, each with its own class and its own bag of options":

metrics.reporters: my_jmx_reporter,my_other_reporter

metrics.reporter.my_jmx_reporter.factory.class: org.apache.flink.metrics.jmx.JMXReporterFactory
metrics.reporter.my_jmx_reporter.port: 9020-9040

metrics.reporter.my_other_reporter.factory.class: org.apache.flink.metrics.graphite.GraphiteReporterFactory
metrics.reporter.my_other_reporter.host: 192.168.1.1
metrics.reporter.my_other_reporter.port: 10000

Applied to custom evaluators, the mapping is:
Reporter concept Evaluator equivalent
metrics.reporters: r1,r2 job.autoscaler.metrics.custom-evaluators: custom-evaluator-1,custom-evaluator-2
metrics.reporter.<r>.factory.class: FQN job.autoscaler.metrics.custom-evaluator.<instance>.class: FQN
metrics.reporter.<r>.<opt>: v job.autoscaler.metrics.custom-evaluator.<instance>.<opt>: v
Concretely:

job.autoscaler.metrics.custom-evaluators: custom-evaluator-1,custom-evaluator-2

job.autoscaler.metrics.custom-evaluator.custom-evaluator-1.class: org.apache.flink.autoscaler.custom.MyFirstEvaluator
job.autoscaler.metrics.custom-evaluator.custom-evaluator-1.target-data-rate: 100000.0

job.autoscaler.metrics.custom-evaluator.custom-evaluator-2.class: org.apache.flink.autoscaler.custom.MySecondEvaluator
job.autoscaler.metrics.custom-evaluator.custom-evaluator-2.some-threshold: 0.8

The consequence is that the getName() goes away from the SPI. The instance name (custom-evaluator-1) is purely a configuration-level handle. Evaluator resolution stops going through the META-INF/services -> getName() registry and instead becomes: for each entry in custom-evaluators, read its ...<instance>.class property and instantiate the fully qualified name (via Class#forName, or via a factory SPI if we want DI/customization, analogous to *ReporterFactory). This is exactly the shape FLIP-514 proposes in its Job Level Config For Custom Evaluator section, and it removes the oddity where two evaluators couldn't share the same getName() even though they were distinct implementations.
To keep the ordering contract between multiple custom evaluators explicit and self-describing rather than implicit in the declaration order of job.autoscaler.metrics.custom-evaluators, we can adopt the same design already approved in FLIP-575: Scaling Executor Plugin SPI for Flink Autoscaler and implemented for ScalingExecutorPlugin.java‎: a default int priority() method on the SPI, with evaluators executed in ascending priority and the returned metrics merged left-to-right (lower priority first, higher priority wins on conflicts). This gives us:

  • a single, well-defined ordering contract that doesn't depend on the user's config-key ordering or on ServiceLoader/PluginManager discovery order,
  • parity and composability with ScalingExecutorPlugin, so both autoscaler extension points share the same mental model
  • a natural extension path for Option B above as priority() simply becomes the tie-breaker once we allow more than one evaluator per pipeline.

The Option B has the following pro & cons:

  • Pros: consistent with an idiom Flink users already know, aligns with FLIP-514, expressive (multiple evaluators per job), and drops the getName() coupling that is awkward today.
  • Cons: bigger change than A.

My preference would be to land Option B so we match FLIP-514 directly and don't ship a config surface we know we'll have to migrate away from.
Thoughts?

@Dennis-Mircea let's go for option 2 but to limit the change/complexity we can also start with supporting only a single evaluator plugin and add the priority / ordering logic as a follow up jira

The option 2 is now fully implemented and the FLINK-39554 JIRA was created as follow up. The PR is now ready for final review.

One last question and clarification about the FlinkAutoscalerEvaluator: In order to keep the alignment between the autoscaler plugins, as the FLIP-575 will introduce the ScalingExecutorPlugin, should we batter rename this SPI ScalingMetricsEvaluatorPlugin or AutoscalerMetricsEvaluatorPlugin? I'd lean toward ScalingMetricsEvaluatorPlugin for symmetry, but happy to defer to reviewers.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants