Skip to content

Commit 6b85ce1

Browse files
committed
[FLINK-39549][flink-autoscaler] Compute OBSERVED_TPR for non-source vertices behind a feature flag
1 parent 28e3839 commit 6b85ce1

7 files changed

Lines changed: 68 additions & 87 deletions

File tree

docs/layouts/shortcodes/generated/auto_scaler_configuration.html

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@
136136
</tr>
137137
<tr>
138138
<td><h5>job.autoscaler.observed-true-processing-rate.non-source.engagement-threshold</h5></td>
139-
<td style="word-wrap: break-word;">0.95</td>
139+
<td style="word-wrap: break-word;">0.9</td>
140140
<td>Double</td>
141141
<td>Engagement threshold (between 0 and 1) for triggering observed true processing rate computation on non-source vertices. A vertex's engagement is computed as (busyTimeMsPerSecond + backPressuredTimeMsPerSecond) / 1000. The vertex is considered fully engaged (and therefore not input-starved) when its engagement crosses the threshold, which is the non-source analogue of the source 'has lag' condition.</td>
142142
</tr>

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -492,9 +492,6 @@ Map<String, FlinkMetric> getFilteredVertexMetricNames(
492492
m -> filteredMetrics.put(m, FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT));
493493
} else if (conf != null
494494
&& conf.get(AutoScalerOptions.OBSERVED_TRUE_PROCESSING_RATE_NON_SOURCE_ENABLED)) {
495-
// Non-source observed TPR computation requires backpressure and a per-vertex input
496-
// rate. These are best-effort: if either is unavailable for the deployed Flink
497-
// version, we silently skip observed-TPR for this vertex (no exception).
498495
FlinkMetric.BACKPRESSURE_TIME_PER_SEC
499496
.findAny(allMetricNames)
500497
.ifPresent(m -> filteredMetrics.put(m, FlinkMetric.BACKPRESSURE_TIME_PER_SEC));

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,7 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
261261
autoScalerConfig(
262262
"observed-true-processing-rate.non-source.engagement-threshold")
263263
.doubleType()
264-
.defaultValue(0.95)
264+
.defaultValue(0.9)
265265
.withFallbackKeys(
266266
oldOperatorConfigKey(
267267
"observed-true-processing-rate.non-source.engagement-threshold"))

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/FlinkMetric.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
*/
3333
public enum FlinkMetric {
3434
BUSY_TIME_PER_SEC(s -> s.equals("busyTimeMsPerSecond")),
35-
NUM_RECORDS_IN_PER_SEC(s -> s.equals("numRecordsInPerSecond")),
3635
SOURCE_TASK_NUM_RECORDS_IN_PER_SEC(
3736
s -> s.startsWith("Source__") && s.endsWith(".numRecordsInPerSecond")),
3837
SOURCE_TASK_NUM_RECORDS_OUT(s -> s.startsWith("Source__") && s.endsWith(".numRecordsOut")),
@@ -41,6 +40,7 @@ public enum FlinkMetric {
4140
SOURCE_TASK_NUM_RECORDS_IN(s -> s.startsWith("Source__") && s.endsWith(".numRecordsIn")),
4241
PENDING_RECORDS(s -> s.endsWith(".pendingRecords")),
4342
BACKPRESSURE_TIME_PER_SEC(s -> s.equals("backPressuredTimeMsPerSecond")),
43+
NUM_RECORDS_IN_PER_SEC(s -> s.equals("numRecordsInPerSecond")),
4444

4545
HEAP_MEMORY_MAX(s -> s.equals("Status.JVM.Memory.Heap.Max")),
4646
HEAP_MEMORY_USED(s -> s.equals("Status.JVM.Memory.Heap.Used")),

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,13 @@ public enum ScalingMetric {
3434
TRUE_PROCESSING_RATE(true),
3535

3636
/**
37-
* Observed (backpressure-derived) true processing rate. Always populated for source vertices
38-
* when the source is "catching up" (lag-based gate). Optionally populated for non-source
39-
* vertices when {@code job.autoscaler.observed-true-processing-rate.non-source.enabled} is set
40-
* — gated on the vertex being fully engaged (busy + backpressured time crosses the configured
41-
* engagement threshold), which is the per-vertex analogue of the source "has lag" condition.
37+
* Observed true processing rate, derived from backpressure. Always populated for source
38+
* vertices when the source is catching up, based on source lag. Optionally populated for
39+
* non-source vertices when {@code
40+
* job.autoscaler.observed-true-processing-rate.non-source.enabled} is set, in which case it is
41+
* gated on the vertex being fully engaged (busy plus backpressured time crosses the configured
42+
* engagement threshold). That gate is the per-vertex analogue of the source "has lag"
43+
* condition.
4244
*/
4345
OBSERVED_TPR(true),
4446

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -140,10 +140,10 @@ private static Optional<Double> getObservedTpr(
140140
/**
141141
* Compute the observed (backpressure-derived) true processing rate for a non-source vertex.
142142
*
143-
* <p>Triggered only when the vertex is "fully engaged" — i.e. spending nearly all its time
144-
* either busy or backpressured. This is the per-vertex analogue of the source-side "has lag"
145-
* gate in {@link #getObservedTpr}: a fully engaged vertex is not input-starved by upstream, so
146-
* its observed input rate represents its true capacity (modulo backpressure).
143+
* <p>Triggered only when the vertex is "fully engaged", meaning it spends nearly all of its
144+
* time either busy or backpressured. This mirrors the source-side "has lag" gate in {@link
145+
* #getObservedTpr}. A fully engaged vertex is not input-starved by upstream, so its observed
146+
* input rate represents its true capacity (modulo backpressure).
147147
*
148148
* <p>Returns {@link Optional#empty()} when the required metrics are unavailable, the vertex is
149149
* not engaged enough, or the formula degenerates (e.g. {@code backpressure >= 1000ms/s}). In
@@ -152,20 +152,21 @@ private static Optional<Double> getObservedTpr(
152152
private static Optional<Double> getNonSourceObservedTpr(
153153
Map<FlinkMetric, AggregatedMetric> flinkMetrics, Configuration conf) {
154154

155-
var bpMetric = flinkMetrics.get(FlinkMetric.BACKPRESSURE_TIME_PER_SEC);
156-
var rateMetric = flinkMetrics.get(FlinkMetric.NUM_RECORDS_IN_PER_SEC);
157-
var busyMetric = flinkMetrics.get(FlinkMetric.BUSY_TIME_PER_SEC);
158-
if (bpMetric == null || rateMetric == null || busyMetric == null) {
155+
var backpressureMsPerSecond = flinkMetrics.get(FlinkMetric.BACKPRESSURE_TIME_PER_SEC);
156+
var numRecordsInPerSecond = flinkMetrics.get(FlinkMetric.NUM_RECORDS_IN_PER_SEC);
157+
var busyTimePerSecond = flinkMetrics.get(FlinkMetric.BUSY_TIME_PER_SEC);
158+
if (backpressureMsPerSecond == null
159+
|| numRecordsInPerSecond == null
160+
|| busyTimePerSecond == null) {
159161
return Optional.empty();
160162
}
161163

162-
double busyMsPerSec = busyMetric.getAvg();
163-
double bpMsPerSec = bpMetric.getAvg();
164+
double busyMsPerSec = busyTimePerSecond.getAvg();
165+
double bpMsPerSec = backpressureMsPerSecond.getAvg();
164166
if (!Double.isFinite(busyMsPerSec) || !Double.isFinite(bpMsPerSec)) {
165167
return Optional.empty();
166168
}
167-
// Engagement = fraction of time the vertex is either doing work or blocked downstream.
168-
// High engagement means the vertex is not waiting for input -> rate reflects capacity.
169+
169170
double engagement = (Math.max(0, busyMsPerSec) + Math.max(0, bpMsPerSec)) / 1000.;
170171
double engagementThreshold =
171172
conf.get(
@@ -175,17 +176,18 @@ private static Optional<Double> getNonSourceObservedTpr(
175176
return Optional.empty();
176177
}
177178

178-
double numRecordsInPerSecond = rateMetric.getSum();
179-
if (Double.isNaN(numRecordsInPerSecond)) {
179+
double numRecordsInPerSecondSum = numRecordsInPerSecond.getSum();
180+
if (Double.isNaN(numRecordsInPerSecondSum)) {
180181
return Optional.empty();
181182
}
182183
// Mirror source-side semantics: zero input rate while engaged signals no work to do,
183184
// allow scale down.
184-
if (numRecordsInPerSecond == 0) {
185+
if (numRecordsInPerSecondSum == 0) {
185186
return Optional.of(Double.POSITIVE_INFINITY);
186187
}
187188

188-
double observedTpr = computeObservedTprWithBackpressure(numRecordsInPerSecond, bpMsPerSec);
189+
double observedTpr =
190+
computeObservedTprWithBackpressure(numRecordsInPerSecondSum, bpMsPerSec);
189191
return Double.isNaN(observedTpr) ? Optional.empty() : Optional.of(observedTpr);
190192
}
191193

flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/ScalingMetricsTest.java

Lines changed: 40 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -287,50 +287,11 @@ public void testGlobalMetrics() {
287287
conf));
288288
}
289289

290-
private static AggregatedMetric aggSum(double sum) {
291-
return new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, sum, Double.NaN);
292-
}
293-
294-
private static AggregatedMetric aggAvg(double avg) {
295-
return new AggregatedMetric("", Double.NaN, Double.NaN, avg, Double.NaN, Double.NaN);
296-
}
297-
298-
private static double computeNonSourceObservedTpr(
299-
double busyAvg,
300-
double bpAvg,
301-
double inputRatePerSecSum,
302-
Configuration conf,
303-
double prevTpr) {
304-
var source = new JobVertexID();
305-
var sink = new JobVertexID();
306-
var topology =
307-
new JobTopology(
308-
new VertexInfo(
309-
source, Collections.emptyMap(), 1, 1, new IOMetrics(0, 0, 0)),
310-
new VertexInfo(
311-
sink, Map.of(source, REBALANCE), 1, 1, new IOMetrics(0, 0, 0)));
312-
313-
Map<ScalingMetric, Double> scalingMetrics = new HashMap<>();
314-
var flinkMetrics = new HashMap<FlinkMetric, AggregatedMetric>();
315-
flinkMetrics.put(FlinkMetric.BUSY_TIME_PER_SEC, aggAvg(busyAvg));
316-
if (!Double.isNaN(bpAvg)) {
317-
flinkMetrics.put(FlinkMetric.BACKPRESSURE_TIME_PER_SEC, aggAvg(bpAvg));
318-
}
319-
if (!Double.isNaN(inputRatePerSecSum)) {
320-
flinkMetrics.put(FlinkMetric.NUM_RECORDS_IN_PER_SEC, aggSum(inputRatePerSecSum));
321-
}
322-
ScalingMetrics.computeDataRateMetrics(
323-
sink, flinkMetrics, scalingMetrics, topology, conf, () -> prevTpr);
324-
return scalingMetrics.getOrDefault(ScalingMetric.OBSERVED_TPR, Double.NaN);
325-
}
326-
327290
@Test
328291
public void testNonSourceObservedTprDisabledByDefault() {
329-
// Even with all required metrics present and the vertex fully engaged,
330-
// OBSERVED_TPR must NOT be populated for non-sources unless the flag is on.
331-
var conf = new Configuration();
332-
// Flag intentionally NOT set -> default false.
333-
double tpr = computeNonSourceObservedTpr(900., 100., 1000., conf, Double.NaN);
292+
double tpr =
293+
computeNonSourceObservedTpr(900., 100., 1000., new Configuration(), Double.NaN);
294+
334295
assertTrue(Double.isNaN(tpr));
335296
}
336297

@@ -339,25 +300,12 @@ public void testNonSourceObservedTprEnabled() {
339300
var conf = new Configuration();
340301
conf.set(AutoScalerOptions.OBSERVED_TRUE_PROCESSING_RATE_NON_SOURCE_ENABLED, true);
341302

342-
// Fully engaged (busy 900 + bp 100 = 1000ms/s) and 200ms/s backpressure.
343-
// Wait — set bp=200 so it's clearly above zero and the formula is exercised.
344-
// engagement = (busy + bp)/1000 = (700+200)/1000 = 0.9 -> below default 0.95 -> NaN
345303
assertTrue(Double.isNaN(computeNonSourceObservedTpr(700., 200., 1000., conf, Double.NaN)));
346-
347-
// engagement = (800+200)/1000 = 1.0 >= 0.95 -> compute
348-
// observedTpr = rate / (1 - bp/1000) = 1000 / 0.8 = 1250
349304
assertEquals(1000. / 0.8, computeNonSourceObservedTpr(800., 200., 1000., conf, Double.NaN));
350-
351-
// Zero input rate while engaged -> POSITIVE_INFINITY (mirrors source behavior).
352305
assertEquals(
353306
Double.POSITIVE_INFINITY,
354307
computeNonSourceObservedTpr(950., 50., 0., conf, Double.NaN));
355-
356-
// Backpressure >= 1000ms/s -> formula degenerates -> fallback supplier (null here)
357-
// is returned only if non-NaN, otherwise OBSERVED_TPR is not set.
358308
assertTrue(Double.isNaN(computeNonSourceObservedTpr(0., 1000., 1000., conf, Double.NaN)));
359-
360-
// Below engagement threshold -> fallback to historical observedTprAvg (PREV_TPR).
361309
assertEquals(PREV_TPR, computeNonSourceObservedTpr(100., 100., 1000., conf, PREV_TPR));
362310
}
363311

@@ -369,11 +317,8 @@ public void testNonSourceObservedTprCustomEngagementThreshold() {
369317
AutoScalerOptions.OBSERVED_TRUE_PROCESSING_RATE_NON_SOURCE_ENGAGEMENT_THRESHOLD,
370318
0.5);
371319

372-
// engagement = 0.6 >= 0.5 -> compute
373320
assertEquals(
374321
500. / (1 - 0.1), computeNonSourceObservedTpr(500., 100., 500., conf, Double.NaN));
375-
376-
// engagement = 0.4 < 0.5 -> NaN
377322
assertTrue(Double.isNaN(computeNonSourceObservedTpr(300., 100., 500., conf, Double.NaN)));
378323
}
379324

@@ -382,21 +327,56 @@ public void testNonSourceObservedTprMissingMetricsAreSilentlySkipped() {
382327
var conf = new Configuration();
383328
conf.set(AutoScalerOptions.OBSERVED_TRUE_PROCESSING_RATE_NON_SOURCE_ENABLED, true);
384329

385-
// No backpressure metric available (e.g. older Flink / metric not requested) -> NaN
386330
assertTrue(
387331
Double.isNaN(
388332
computeNonSourceObservedTpr(900., Double.NaN, 1000., conf, Double.NaN)));
389-
// No input rate metric available -> NaN
390333
assertTrue(
391334
Double.isNaN(
392335
computeNonSourceObservedTpr(900., 100., Double.NaN, conf, Double.NaN)));
393336
}
394337

338+
private static AggregatedMetric aggSum(double sum) {
339+
return new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, sum, Double.NaN);
340+
}
341+
342+
private static AggregatedMetric aggAvg(double avg) {
343+
return new AggregatedMetric("", Double.NaN, Double.NaN, avg, Double.NaN, Double.NaN);
344+
}
345+
395346
private static AggregatedMetric aggMax(double max) {
396347
return new AggregatedMetric("", Double.NaN, max, Double.NaN, Double.NaN, Double.NaN);
397348
}
398349

399350
private static AggregatedMetric aggAvgMax(double avg, double max) {
400351
return new AggregatedMetric("", Double.NaN, max, avg, Double.NaN, Double.NaN);
401352
}
353+
354+
private static double computeNonSourceObservedTpr(
355+
double busyAvg,
356+
double bpAvg,
357+
double inputRatePerSecSum,
358+
Configuration conf,
359+
double prevTpr) {
360+
var source = new JobVertexID();
361+
var sink = new JobVertexID();
362+
var topology =
363+
new JobTopology(
364+
new VertexInfo(
365+
source, Collections.emptyMap(), 1, 1, new IOMetrics(0, 0, 0)),
366+
new VertexInfo(
367+
sink, Map.of(source, REBALANCE), 1, 1, new IOMetrics(0, 0, 0)));
368+
369+
Map<ScalingMetric, Double> scalingMetrics = new HashMap<>();
370+
var flinkMetrics = new HashMap<FlinkMetric, AggregatedMetric>();
371+
flinkMetrics.put(FlinkMetric.BUSY_TIME_PER_SEC, aggAvg(busyAvg));
372+
if (!Double.isNaN(bpAvg)) {
373+
flinkMetrics.put(FlinkMetric.BACKPRESSURE_TIME_PER_SEC, aggAvg(bpAvg));
374+
}
375+
if (!Double.isNaN(inputRatePerSecSum)) {
376+
flinkMetrics.put(FlinkMetric.NUM_RECORDS_IN_PER_SEC, aggSum(inputRatePerSecSum));
377+
}
378+
ScalingMetrics.computeDataRateMetrics(
379+
sink, flinkMetrics, scalingMetrics, topology, conf, () -> prevTpr);
380+
return scalingMetrics.getOrDefault(ScalingMetric.OBSERVED_TPR, Double.NaN);
381+
}
402382
}

0 commit comments

Comments
 (0)