[FLINK-39549][flink-autoscaler] Compute OBSERVED_TPR for non-source vertices behind a feature flag #1103
Open
Dennis-Mircea wants to merge 1 commit into
What is the purpose of the change
This pull request extends the autoscaler's observed (backpressure-derived) true processing rate computation, currently scoped to source vertices only, to non-source vertices as well, behind an opt-in feature flag.
Today, OBSERVED_TPR is populated only for sources, and every downstream vertex falls back exclusively to the busy-time based TRUE_PROCESSING_RATE. This creates an alignment asymmetry across the job graph: sources may be evaluated using a backpressure-aware capacity model while their downstream operators are evaluated purely from busy-time, which is known to be unreliable under sustained backpressure. When the real bottleneck of the pipeline is downstream, this asymmetry can produce inconsistent or suboptimal scaling decisions on the very vertex that matters most.
This change generalizes the observed-TPR path so that any vertex in the graph can, when locally appropriate conditions are met, be evaluated using the same capacity model as sources. The behavior is fully opt-in, defaults to off, and preserves existing behavior for all current deployments.
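
To make the two capacity models concrete, here is a minimal sketch of both estimators for a single vertex. It assumes the busy-time estimate is the input rate divided by the busy fraction and the backpressure-derived estimate is rate / (1 - bp/1000). The class name, method names, and all numbers are hypothetical and are not taken from the autoscaler code base.

```java
/** Hypothetical illustration class; not part of the Flink autoscaler code base. */
final class TprEstimatorsSketch {

    /** Busy-time estimate: input rate divided by the fraction of each second spent busy. */
    static double busyTimeTpr(double recordsInPerSec, double busyMsPerSec) {
        return recordsInPerSec / (busyMsPerSec / 1000.0);
    }

    /** Backpressure-derived estimate: input rate divided by the non-backpressured fraction. */
    static double observedTpr(double recordsInPerSec, double backPressuredMsPerSec) {
        return recordsInPerSec / (1.0 - backPressuredMsPerSec / 1000.0);
    }

    public static void main(String[] args) {
        double rate = 1_000.0; // records/s reaching the vertex (made-up number)

        // A heavily backpressured vertex: 900 ms/s backpressured, 50 ms/s busy.
        System.out.printf("busy-time TPR: %.0f%n", busyTimeTpr(rate, 50.0));
        System.out.printf("observed TPR:  %.0f%n", observedTpr(rate, 900.0));

        // The same 10 ms/s measurement error applied to each input metric.
        System.out.printf("busy-time TPR at 40 ms/s busy: %.0f%n", busyTimeTpr(rate, 40.0));
        System.out.printf("observed TPR at 910 ms/s bp:   %.0f%n", observedTpr(rate, 910.0));
    }
}
```

With these made-up inputs the two estimators disagree about the capacity of the same vertex, which is the kind of mismatch that arises when sources and downstream vertices are evaluated with different models.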
This improvement is especially well suited for backpressured downstream tasks, for example IO-dependent operators whose external dependency is the limiter. For a heavily backpressured vertex the busy-time estimate becomes hypersensitive, because dividing the rate by the small busy fraction amplifies any idle time and busy-time measurement noise, so it tends to overstate capacity. The backpressure-derived OBSERVED_TPR is anchored directly to the backpressure signal and stays stable in that regime, and selectTprMetric already prefers it precisely when the busy-time estimate runs too high. The benefit grows with the backpressure level.
Brief change log
- FlinkMetric.NUM_RECORDS_IN_PER_SEC is introduced for the task-level numRecordsInPerSecond metric used by non-source vertices.
- ScalingMetricCollector#getFilteredVertexMetricNames is extended to thread Configuration through and, when the new flag is on, to request BACKPRESSURE_TIME_PER_SEC and NUM_RECORDS_IN_PER_SEC for non-source vertices on a best-effort basis. Missing metrics are silently skipped (a MetricNotFoundException is never thrown) so older Flink versions are not broken.
- ScalingMetrics#computeDataRateMetrics is split into the existing source path (unchanged) and a new non-source path that computes OBSERVED_TPR via a small helper, getNonSourceObservedTpr. The helper reuses the existing computeObservedTprWithBackpressure formula and is gated on a per-vertex engagement signal, (busyTimeMsPerSecond + backPressuredTimeMsPerSecond) / 1000 ≥ engagement-threshold, which is the per-vertex semantic dual of the source-side "has lag" gate (a vertex that is fully engaged is, by definition, not input-starved by upstream).
- AutoScalerOptions:
  - job.autoscaler.observed-true-processing-rate.non-source.enabled (default false)
  - job.autoscaler.observed-true-processing-rate.non-source.engagement-threshold (default 0.95)
- ScalingMetricEvaluator is left unchanged: selectTprMetric is already vertex-agnostic and naturally picks up OBSERVED_TPR for non-sources once it is populated, without any additional wiring or risk.
- ScalingMetric.OBSERVED_TPR is updated to reflect the new generality.
- Documentation (auto_scaler_configuration.html) updated with the two new options.
The feature is intentionally scoped tightly: gates remain local and independent per vertex (no cross-vertex coupling between source and non-source decisions). Independence is the right primitive here, as the local engagement gate already implicitly suppresses non-source observed-TPR when the pipeline is idle, while still allowing it to fire in the most important target case (downstream bottleneck while sources have no lag).
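
The gating described in the change log can be sketched roughly as follows. This is an illustration under stated assumptions, not the code in this pull request: the class, method signature, and parameter names are hypothetical, missing metrics are modelled as NaN, and the return value for the degenerate bp ≥ 1000 ms/s case (NaN here) is an assumption because the description does not say what is returned.

```java
/** Hypothetical sketch of the gating logic; names and signatures are illustrative only. */
final class NonSourceObservedTprSketch {

    /**
     * Returns the backpressure-derived TPR for a non-source vertex, or NaN when
     * the metric should not be populated.
     */
    static double nonSourceObservedTpr(
            boolean enabled,
            double engagementThreshold,
            double recordsInPerSec,
            double busyMsPerSec,
            double backPressuredMsPerSec) {
        // Feature flag off, or an input metric missing (modelled here as NaN): skip silently.
        if (!enabled
                || Double.isNaN(recordsInPerSec)
                || Double.isNaN(busyMsPerSec)
                || Double.isNaN(backPressuredMsPerSec)) {
            return Double.NaN;
        }
        // Engagement gate: fraction of each second spent busy or backpressured.
        double engagement = (busyMsPerSec + backPressuredMsPerSec) / 1000.0;
        if (engagement < engagementThreshold) {
            return Double.NaN;
        }
        // Degenerate case. ASSUMPTION: the PR text does not state the result, NaN is a placeholder.
        if (backPressuredMsPerSec >= 1000.0) {
            return Double.NaN;
        }
        // Zero input rate while engaged maps to +Infinity, as the test description states.
        if (recordsInPerSec == 0.0) {
            return Double.POSITIVE_INFINITY;
        }
        // Same shape as the source-side formula: rate / (1 - bp/1000).
        return recordsInPerSec / (1.0 - backPressuredMsPerSec / 1000.0);
    }

    public static void main(String[] args) {
        // Engaged: (100 + 900) / 1000 = 1.0, which meets the 0.95 threshold.
        System.out.println(nonSourceObservedTpr(true, 0.95, 500.0, 100.0, 900.0));
        // Not engaged: (100 + 500) / 1000 = 0.6, below the threshold.
        System.out.println(nonSourceObservedTpr(true, 0.95, 500.0, 100.0, 500.0));
        // Flag off: never populated, even for a fully engaged vertex.
        System.out.println(nonSourceObservedTpr(false, 0.95, 500.0, 100.0, 900.0));
    }
}
```

Because the gate depends only on the vertex's own busy and backpressure times, each vertex can be decided independently, which matches the "local and independent per vertex" scoping described above.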
Verifying this change
This change added tests and can be verified as follows:
- ScalingMetricsTest covering the non-source path:
  - testNonSourceObservedTprDisabledByDefault - flag off ⇒ OBSERVED_TPR is never populated for non-sources, even with all metrics present and the vertex fully engaged.
  - testNonSourceObservedTprEnabled - covers below-threshold (no OBSERVED_TPR / fallback to historical avg), engaged with backpressure (formula yields rate / (1 - bp/1000)), zero input rate while engaged (+Infinity, mirroring source semantics), and the degenerate bp ≥ 1000 ms/s case.
  - testNonSourceObservedTprCustomEngagementThreshold - verifies the engagement threshold option is honored.
  - testNonSourceObservedTprMissingMetricsAreSilentlySkipped - verifies graceful behavior when the deployed Flink version does not expose the per-vertex metrics.
- ScalingMetricCollectorTest updated for the new Configuration parameter on getFilteredVertexMetricNames; existing required-metrics coverage preserved.
Does this pull request potentially affect one of the following parts:
- CustomResourceDescriptors: no
Documentation