[FLINK-39549][flink-autoscaler] Compute OBSERVED_TPR for non-source vertices behind a feature flag #1103

Open
Dennis-Mircea wants to merge 1 commit into apache:main from Dennis-Mircea:FLINK-39549

Dennis-Mircea (Contributor) commented Apr 26, 2026

What is the purpose of the change

Merge order: this PR should be merged after #1078 (FLINK-39306). The two touch the same per-second metric plumbing (both introduce FlinkMetric.NUM_RECORDS_IN_PER_SEC and extend non-source metric collection), so they conflict and #1078 should land first. They are also complementary. #1078 makes the busy-time TRUE_PROCESSING_RATE ratio internally consistent for every vertex by aligning its numerator estimator with the busy-time denominator. The extra benefit of that alignment, comparability against OBSERVED_TPR inside selectTprMetric, only applies to non-source vertices once this PR populates OBSERVED_TPR for them. Together they make the non-source capacity decision coherent.

This pull request extends the autoscaler's observed (backpressure-derived) true processing rate computation, currently scoped to source vertices only, to non-source vertices as well, behind an opt-in feature flag.

Today, OBSERVED_TPR is populated only for sources, and every downstream vertex falls back exclusively to the busy-time based TRUE_PROCESSING_RATE. This creates an alignment asymmetry across the job graph: sources may be evaluated using a backpressure-aware capacity model while their downstream operators are evaluated purely from busy-time, which is known to be unreliable under sustained backpressure. When the real bottleneck of the pipeline is downstream, this asymmetry can produce inconsistent or suboptimal scaling decisions on the very vertex that matters most.

This change generalizes the observed-TPR path so that any vertex in the graph can, when locally appropriate conditions are met, be evaluated using the same capacity model as sources. The behavior is fully opt-in, defaults to off, and preserves existing behavior for all current deployments.

This improvement is especially well suited for backpressured downstream tasks, for example IO-dependent operators whose external dependency is the limiter. For a heavily backpressured vertex the busy-time estimate becomes hypersensitive, because dividing the rate by the small busy fraction amplifies any idle time and busy-time measurement noise, so it tends to overstate capacity. The backpressure-derived OBSERVED_TPR is anchored directly to the backpressure signal and stays stable in that regime, and selectTprMetric already prefers it precisely when the busy-time estimate runs too high. The benefit grows with the backpressure level.
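The amplification effect described above can be illustrated with a small numeric sketch. It assumes the busy-time estimate has the simple form `rate / (busyMsPerSecond / 1000)`; the class and method names are hypothetical and are not part of this PR or the autoscaler code.

```java
final class BusyTimeSensitivity {

    // Assumed form of the busy-time estimate: records per second divided by the busy fraction.
    static double busyTimeTpr(double recordsPerSec, double busyMsPerSec) {
        return recordsPerSec / (busyMsPerSec / 1000.0);
    }

    // Relative change of the estimate when the measured busy time drops by noiseMs.
    static double relativeSwing(double recordsPerSec, double busyMsPerSec, double noiseMs) {
        double base = busyTimeTpr(recordsPerSec, busyMsPerSec);
        double shifted = busyTimeTpr(recordsPerSec, busyMsPerSec - noiseMs);
        return (shifted - base) / base;
    }

    public static void main(String[] args) {
        // Same 10 ms/s measurement error, very different effect on the estimate.
        System.out.printf("mostly busy (900 ms/s):        %.4f%n", relativeSwing(1000, 900, 10));
        System.out.printf("mostly backpressured (50 ms/s): %.4f%n", relativeSwing(1000, 50, 10));
    }
}
```

With these illustrative numbers, a 10 ms/s error moves the estimate by roughly 1% when the vertex is busy 900 ms/s, but by 25% when it is busy only 50 ms/s, which is the regime a heavily backpressured vertex sits in.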

Brief change log

  • A new FlinkMetric.NUM_RECORDS_IN_PER_SEC is introduced for the task-level numRecordsInPerSecond metric used by non-source vertices.
  • ScalingMetricCollector#getFilteredVertexMetricNames now takes a Configuration and, when the new flag is on, requests BACKPRESSURE_TIME_PER_SEC and NUM_RECORDS_IN_PER_SEC for non-source vertices on a best-effort basis. Missing metrics are silently skipped (no MetricNotFoundException is thrown), so older Flink versions are not broken.
  • ScalingMetrics#computeDataRateMetrics is split into the existing source path (unchanged) and a new non-source path that computes OBSERVED_TPR via a small helper getNonSourceObservedTpr. The helper reuses the existing computeObservedTprWithBackpressure formula and is gated on a per-vertex engagement signal, (busyTimeMsPerSecond + backPressuredTimeMsPerSecond) / 1000 ≥ engagement-threshold, which is the per-vertex semantic dual of the source-side "has lag" gate (a vertex that is fully engaged is, by definition, not input-starved by upstream).
  • Two new opt-in AutoScalerOptions:
    • job.autoscaler.observed-true-processing-rate.non-source.enabled (default false)
    • job.autoscaler.observed-true-processing-rate.non-source.engagement-threshold (default 0.95)
  • ScalingMetricEvaluator is left unchanged: selectTprMetric is already vertex-agnostic and naturally picks up OBSERVED_TPR for non-sources once it is populated, without any additional wiring or risk.
  • Javadoc on ScalingMetric.OBSERVED_TPR is updated to reflect the new generality.
  • Generated config table (auto_scaler_configuration.html) updated with the two new options.
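
The gating and formula described in the change log can be sketched roughly as follows. This is a standalone illustration, not the code in this PR: the class name, method signatures, the use of `OptionalDouble`, and the use of NaN to represent a missing metric are all assumptions. Returning "no value" for the degenerate `bp ≥ 1000` case and the order of the checks are also assumptions, since the description does not state them.

```java
import java.util.OptionalDouble;

final class NonSourceObservedTprSketch {

    // Fraction of each second the vertex is either busy or backpressured.
    static double engagement(double busyMsPerSec, double backPressuredMsPerSec) {
        return (busyMsPerSec + backPressuredMsPerSec) / 1000.0;
    }

    // Hypothetical stand-in for the non-source observed-TPR helper.
    static OptionalDouble observedTpr(
            boolean enabled,
            double engagementThreshold,
            double recordsInPerSec,
            double busyMsPerSec,
            double backPressuredMsPerSec) {
        if (!enabled) {
            return OptionalDouble.empty(); // flag off: behavior unchanged
        }
        // Assumption: a missing metric is represented as NaN and is skipped silently.
        if (Double.isNaN(recordsInPerSec)
                || Double.isNaN(busyMsPerSec)
                || Double.isNaN(backPressuredMsPerSec)) {
            return OptionalDouble.empty();
        }
        if (engagement(busyMsPerSec, backPressuredMsPerSec) < engagementThreshold) {
            return OptionalDouble.empty(); // vertex may be input-starved by upstream
        }
        if (recordsInPerSec == 0) {
            return OptionalDouble.of(Double.POSITIVE_INFINITY); // mirrors source semantics
        }
        if (backPressuredMsPerSec >= 1000) {
            return OptionalDouble.empty(); // assumption: degenerate case yields no observation
        }
        // rate / (1 - bp/1000)
        return OptionalDouble.of(recordsInPerSec / (1.0 - backPressuredMsPerSec / 1000.0));
    }
}
```

For example, with the default threshold of 0.95, a vertex reading 500 records/s with 200 ms/s busy and 800 ms/s backpressured is fully engaged and gets an observed TPR of about 2500 records/s, while the same vertex at 200 ms/s busy and 300 ms/s backpressured (engagement 0.5) gets no observed value.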

The feature is intentionally tightly scoped: gates remain local and independent per vertex, with no cross-vertex coupling between source and non-source decisions. Independence is the right primitive here because the local engagement gate already implicitly suppresses non-source observed-TPR when the pipeline is idle, while still allowing it to fire in the most important target case: a downstream bottleneck while sources have no lag.

Verifying this change

This change added tests and can be verified as follows:

  • New unit tests in ScalingMetricsTest covering the non-source path:
    • testNonSourceObservedTprDisabledByDefault - flag off ⇒ OBSERVED_TPR is never populated for non-sources, even with all metrics present and the vertex fully engaged.
    • testNonSourceObservedTprEnabled - covers below-threshold (no OBSERVED_TPR / fallback to historical avg), engaged with backpressure (formula yields rate / (1 - bp/1000)), zero input rate while engaged (+Infinity, mirroring source semantics), and the degenerate bp ≥ 1000ms/s case.
    • testNonSourceObservedTprCustomEngagementThreshold - verifies the engagement threshold option is honored.
    • testNonSourceObservedTprMissingMetricsAreSilentlySkipped - verifies graceful behavior when the deployed Flink version does not expose the per-vertex metrics.
  • Existing ScalingMetricCollectorTest updated for the new Configuration parameter on getFilteredVertexMetricNames; existing required-metrics coverage preserved.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., are there any changes to the CustomResourceDescriptors: no
  • Core observer or reconciler logic that is regularly executed: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? docs & JavaDocs
