Skip to content

[FLINK-39555] FLIP-575: Scaling Executor Plugin SPI for Flink Autoscaler#1085

Open
Dennis-Mircea wants to merge 1 commit intoapache:mainfrom
Dennis-Mircea:scaling-decision-filter-spi
Open

[FLINK-39555] FLIP-575: Scaling Executor Plugin SPI for Flink Autoscaler#1085
Dennis-Mircea wants to merge 1 commit intoapache:mainfrom
Dennis-Mircea:scaling-decision-filter-spi

Conversation

@Dennis-Mircea
Copy link
Copy Markdown
Contributor

@Dennis-Mircea Dennis-Mircea commented Apr 10, 2026

What is the purpose of the change

This pull request introduces the ScalingExecutorPlugin SPI as defined in FLIP-575: Scaling Executor Plugin SPI for Flink Autoscaler. The SPI allows users to plug in custom logic that intercepts, modifies, or vetoes computed scaling decisions before they are applied by the autoscaler. It is complementary to FLIP-514: Custom Evaluator plugin for Flink Autoscaler, which extends the metric evaluation layer; this contribution extends the scaling decision execution layer.

Unlike FlinkAutoscalerEvaluator (capped at one instance per job), ScalingExecutorPlugin natively supports multiple named instances chained together in a deterministic priority order, enabling composition of orthogonal concerns (resource gating, parallelism alignment, cost capping, audit, …) without coupling them inside a single plugin.

NOTE: This PR ensures the full alignment with the [FLINK-39511] FLIP-514: Custom Evaluator plugin for Flink Autoscaler PR (#1099).

Brief change log

  • New SPI - org.apache.flink.autoscaler.ScalingExecutorPlugin<KEY, CTX> with:

    • priority() (default 0, lower runs first) for deterministic chain ordering.
    • apply(Context, Map<JobVertexID, ScalingSummary>) returning a (possibly modified) summary map; an empty map vetoes the scaling operation.
    • Immutable Context bundling the JobAutoScalerContext, the prefix-stripped per-instance Configuration, the evaluated metrics, and the job topology.
  • Discovery / selection split - mirrors the FlinkAutoscalerEvaluator flow:

    • Operator-startup discovery (cached once) - flink-kubernetes-operator/AutoscalerUtils#discoverCustomScalingExecutors(Configuration) via the Flink PluginUtils mechanism; flink-autoscaler-standalone/AutoscalerUtils#discoverCustomScalingExecutors(Configuration) via Java ServiceLoader.
    • Per-reconciliation selection - ScalingExecutor#resolveCustomScalingExecutors(Configuration) matches the configured chain against the discovered SPI bag, sorts by priority(), and returns an unmodifiable ordered Collection<Map.Entry<String, ScalingExecutorPlugin>> carrying the user-chosen <name> for each resolved instance. This guarantees that per-job overrides via FlinkDeployment.spec.flinkConfiguration take effect on the next reconcile without restarting the operator.
  • Configuration (metric-reporter idiom) in AutoScalerOptions:

    • job.autoscaler.scaling.custom-executors - list of named instances.
    • job.autoscaler.scaling.custom-executor.<name>.class - implementation FQN.
    • job.autoscaler.scaling.custom-executor.<name>.<parameter> - free-form per-instance options, delivered to the plugin via Context.getConfiguration() with the per-instance prefix stripped.
    • All keys honor the legacy kubernetes.operator.-prefixed form as a fallback; canonical wins on overlap.
    • Helpers: customScalingExecutorClassKey, customScalingExecutorClassFallbackKey, customScalingExecutorClassOption, customScalingExecutorConfiguration.
  • ScalingExecutor integration - invokes applyScalingExecutorPlugins after the blocked check, after memory tuning, and after the resource-quota gate, but before persisting parallelism overrides. Approve / veto outcomes are surfaced as ScalingApproved / ScalingVetoed Kubernetes events with messageKey = <reason>:<instanceName> (so chained instances of the same plugin class do not de-duplicate). An INFO log line Custom scaling executors resolved and sorted by priority: [...] is emitted per reconciliation listing each [name, class, priority] entry in execution order.

  • Documentation - new ## Custom Scaling Executors section in docs/content/docs/operations/plugins.md covering the interface, SPI service file, packaging, YAML configuration, operator + standalone deployment with the corresponding discovery log lines, and the per-reconciliation chain log hint.

Verifying this change

This change added tests and can be verified as follows:

  • flink-autoscaler/ScalingExecutorTest - extended with a dedicated nested ScalingExecutorPluginTest suite covering:
    • Approve / veto / modify / empty-map behaviour and chaining (approve→veto, veto-short-circuits, two-modify chain).
    • priority()-driven chain ordering using high / default / low priority instances registered out of order.
    • Approve / veto Kubernetes event emission with the correct <reason>:<instanceName> messageKey, including the no-plugins-registered case.
    • Legacy kubernetes.operator.-prefixed fallback for both the scaling.custom-executors list and the per-instance <name>.class key.
    • Per-instance scoped Configuration delivery (canonical prefix, legacy prefix, and canonical-wins-on-overlap semantics).
  • flink-autoscaler-standalone/AutoscalerUtilsTest - verifies ServiceLoader-based discovery returns the registered ScalingExecutorPlugin instances.
  • flink-kubernetes-operator/AutoscalerUtilsTest - verifies plugin-directory discovery via PluginUtils.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changes to the CustomResourceDescriptors: no
  • Core observer or reconciler logic that is regularly executed: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? docs & JavaDocs

@Dennis-Mircea Dennis-Mircea changed the title Introduce the ScalingDecisionFilter SPI FLIP-XXX: Introduce the ScalingDecisionFilter SPI Apr 10, 2026
@Dennis-Mircea Dennis-Mircea changed the title FLIP-XXX: Introduce the ScalingDecisionFilter SPI FLIP-XXX: Introduce the ScalingExecutorPlugin SPI Apr 13, 2026
@Dennis-Mircea Dennis-Mircea changed the title FLIP-XXX: Introduce the ScalingExecutorPlugin SPI FLIP-575: Scaling Executor Plugin SPI for Flink Autoscaler Apr 21, 2026
@Dennis-Mircea Dennis-Mircea force-pushed the scaling-decision-filter-spi branch from d922cef to e44c647 Compare April 28, 2026 05:55
@Dennis-Mircea Dennis-Mircea changed the title FLIP-575: Scaling Executor Plugin SPI for Flink Autoscaler [FLINK-39555] FLIP-575: Scaling Executor Plugin SPI for Flink Autoscaler Apr 28, 2026
@Dennis-Mircea Dennis-Mircea marked this pull request as ready for review April 28, 2026 06:29
@Dennis-Mircea
Copy link
Copy Markdown
Contributor Author

This FLIP PR is now ready for the final review.

NOTE: If the #1099 PR will be merged before (and most probably it will) there will be some conflicts that will must be resolved afterwards.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant