Skip to content

Commit 92cb783

Browse files
Merge branch 'main' into worktree-dashboard-autoreload-954
2 parents e4bbb79 + f7f273e commit 92cb783

3 files changed

Lines changed: 113 additions & 20 deletions

File tree

src/ess/livedata/core/message_batcher.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,18 @@ def _split_messages(
189189

190190
ESCALATION_OVERLOAD_THRESHOLD = 2
191191
ESCALATION_HALF_STEPS = 2
192-
DEESCALATION_HEADROOM_RATIO = 0.75
192+
# A batch is "underloaded" (de-escalation candidate) when its processing time is
193+
# below this fraction of the window; the stable "dead zone" is therefore
194+
# [ratio, 1.0] in utilisation. De-escalating one half-step shrinks the window by
195+
# 1/sqrt(2), i.e. raises utilisation by the consecutive-level ratio. For every
196+
# workload to have a stable level (utilisation landing in the dead zone at *some*
197+
# level) the dead zone must span one such step: ratio <= 1 / (level ratio). The
198+
# nominal ratio is sqrt(2) ~ 1.414, but pulse-quantization rounds windows so the
199+
# widest consecutive ratio is ~1.43 (e.g. round(14*sqrt(2))/14 = 20/14). Hence
200+
# ratio <= 1/1.43 ~ 0.70. A larger value (e.g. 0.75) leaves a gap where a
201+
# workload is underloaded at every escalated level yet overloaded one step down,
202+
# causing steady oscillation.
203+
DEESCALATION_HEADROOM_RATIO = 0.70
193204
DEESCALATION_UNDERLOAD_THRESHOLD = 3
194205
DEESCALATION_IDLE_WINDOWS = 3
195206

tests/core/adaptive_batching_scenarios_test.py

Lines changed: 71 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,15 @@
117117
"min_level": 1,
118118
"min_final_level": 1,
119119
},
120+
# -- No-fixed-point load (issue #877, Finding 1) ----------------------
121+
# A flat ~1.05s processing time overloads at level 0 (1.0s) but fits with
122+
# headroom at every escalated level. The dead zone must be wide enough to
123+
# give it a stable level; otherwise it oscillates 0->2->1->0 forever.
124+
"overhead_dominated_no_fixed_point": {
125+
"min_level_reached": 1,
126+
"max_oscillations": 1,
127+
"min_final_level": 1,
128+
},
120129
# -- Creeping overload ------------------------------------------------
121130
"creeping_overload": {
122131
"min_level_reached": 4,
@@ -714,7 +723,7 @@ def test_no_escalation_with_gc_jitter(self, seed):
714723
715724
Processing is fast on average (0.3s) but with high jitter
716725
(std = 1.2 * mean = 0.36s) that regularly sends individual batches
717-
into the dead zone (75-100% of window) and occasionally past the
726+
into the dead zone (70-100% of window) and occasionally past the
718727
window entirely (~4 overloaded cycles per 120s run).
719728
720729
The batcher must tolerate these isolated spikes because its
@@ -782,8 +791,8 @@ def test_boundary_jitter_escalates_and_sticks(self):
782791
P(2 consecutive overloaded) ~ 25%, so escalation is very likely.
783792
784793
At level 2 (2s window): 0.5 + 1.0 = 1.5s mean (75% utilization).
785-
At the dead-zone boundary (>= 75%), so de-escalation never triggers.
786-
Documents limitation: once escalated, stays stuck due to dead zone.
794+
Comfortably within the dead zone (>= 70%), so de-escalation never
795+
triggers. Documents limitation: once escalated, stays stuck.
787796
"""
788797
lim = LIMITS["boundary_jitter"]
789798
batcher, clock = make_default_batcher()
@@ -810,6 +819,51 @@ def test_boundary_jitter_escalates_and_sticks(self):
810819
)
811820

812821

822+
class TestNoFixedPointLoad:
823+
"""Overhead-dominated load with no naturally stable level (issue #877).
824+
825+
A flat processing time that overloads at the base window but fits with
826+
headroom one or more steps up has no level where utilization lands in the
827+
dead zone — unless the dead zone is wide enough to cover a de-escalation
828+
step (a factor of ~sqrt(2) in utilization). With too small a dead zone the
829+
controller escalates, finds every escalated level "underloaded",
830+
de-escalates back until it overloads again, and oscillates indefinitely.
831+
"""
832+
833+
def test_settles_instead_of_oscillating(self):
834+
"""A flat 1.05s load must converge to a stable level, not oscillate.
835+
836+
Regardless of window:
837+
Level 0 (1.0s): 105% -> overloaded.
838+
Level 1 (1.43s): 74% -> within the dead zone (stable).
839+
Level 2 (2.0s): 53% -> underloaded.
840+
"""
841+
lim = LIMITS["overhead_dominated_no_fixed_point"]
842+
batcher, clock = make_default_batcher()
843+
844+
cost = constant_overhead_cost(overhead_s=1.05, per_second_cost=0.0)
845+
846+
result = run_scenario(batcher, 120.0, cost, clock)
847+
assert result.max_level >= lim["min_level_reached"], (
848+
f"Precondition: load must trigger escalation "
849+
f"(reached level {result.max_level})"
850+
)
851+
assert result.oscillation_count() <= lim["max_oscillations"], (
852+
f"Oscillated {result.oscillation_count()} times — no stable level "
853+
f"found (limit: {lim['max_oscillations']})"
854+
)
855+
late = result.cycles_after(60.0)
856+
assert late, "Simulation too short for stabilization check"
857+
late_levels = {c.level for c in late}
858+
assert len(late_levels) == 1, (
859+
f"Not stabilized: levels {sorted(late_levels)} in second half"
860+
)
861+
assert next(iter(late_levels)) >= lim["min_final_level"], (
862+
f"Stabilized at level {next(iter(late_levels))} below the expected "
863+
f"escalated level (>= {lim['min_final_level']})"
864+
)
865+
866+
813867
class TestCreepingOverload:
814868
"""Load that gradually increases past processing capacity."""
815869

@@ -985,8 +1039,8 @@ def test_partial_deescalation(self):
9851039
De-escalates to level 3 (2.83s): 1.8 + 0.57 = 2.37s (84%, dead zone).
9861040
9871041
Moderate phase (de-escalates from level 3 to level 2):
988-
Level 3 (2.83s): 0.6 + 1.41 = 2.01s (71%, underloaded).
989-
Level 2 (2.0s): 0.6 + 1.0 = 1.6s (80%, dead zone — stuck).
1042+
Level 3 (2.83s): 0.45 + 1.43 = 1.88s (66%, underloaded).
1043+
Level 2 (2.0s): 0.45 + 1.0 = 1.45s (72.5%, dead zone — stuck).
9901044
"""
9911045
lim = LIMITS["partial_deescalation"]
9921046
batcher, clock = make_default_batcher()
@@ -997,7 +1051,7 @@ def test_partial_deescalation(self):
9971051
after=step_function_cost(
9981052
step_time_s=60.0,
9991053
before=constant_overhead_cost(overhead_s=1.8, per_second_cost=0.2),
1000-
after=constant_overhead_cost(overhead_s=0.6, per_second_cost=0.5),
1054+
after=constant_overhead_cost(overhead_s=0.45, per_second_cost=0.5),
10011055
),
10021056
)
10031057

@@ -1122,7 +1176,7 @@ def test_severe_overload_to_cosmic_background(self):
11221176
Level 4 (4.0s): 0.2 + 0.04 = 0.24s (6% utilization).
11231177
Level 2 (2.0s): 0.2 + 0.02 = 0.22s (11% utilization).
11241178
Level 0 (1.0s): 0.2 + 0.01 = 0.21s (21% utilization).
1125-
All levels are well below the 75% headroom threshold.
1179+
All levels are well below the 70% headroom threshold.
11261180
"""
11271181
lim = LIMITS["severe_to_cosmic_background"]
11281182
batcher, clock = make_default_batcher()
@@ -1171,11 +1225,11 @@ def test_fast_escalation_on_clear_overload(self):
11711225

11721226

11731227
class TestDeescalationDeadZone:
1174-
"""The 75-100% utilization dead zone where de-escalation cannot trigger.
1228+
"""The 70-100% utilization dead zone where de-escalation cannot trigger.
11751229
1176-
When processing fills 75-100% of the escalated window, it falls in the
1230+
When processing fills 70-100% of the escalated window, it falls in the
11771231
"in between" zone: not overloaded (processing < window) and not
1178-
underloaded (processing >= 0.75 * window). Both consecutive counters
1232+
underloaded (processing >= 0.70 * window). Both consecutive counters
11791233
are reset every cycle, so neither escalation nor de-escalation can
11801234
trigger — even if a lower level would handle the load fine.
11811235
"""
@@ -1191,10 +1245,10 @@ def test_stuck_in_dead_zone_after_load_drop(self):
11911245
Level 4 (4.0s): 2.0 + 1.2 = 3.2s (80%, dead zone).
11921246
11931247
Moderate phase (de-escalates from level 4 to level 3, then stuck):
1194-
Level 4 (4.0s): 0.5 + 2.4 = 2.9s (72.5%, underloaded < 75%).
1195-
Level 3 (2.83s): 0.5 + 1.7 = 2.2s (78%, dead zone — stuck).
1196-
Level 2 (2.0s): 0.5 + 1.2 = 1.7s (would fit at 85%).
1197-
Level 0 (1.0s): 0.5 + 0.6 = 1.1s (would be overloaded).
1248+
Level 4 (4.0s): 0.5 + 2.2 = 2.7s (67.5%, underloaded).
1249+
Level 3 (2.83s): 0.5 + 1.57 = 2.07s (72.5%, dead zone — stuck).
1250+
Level 2 (2.0s): 0.5 + 1.1 = 1.6s (would fit at 80%).
1251+
Level 0 (1.0s): 0.5 + 0.55 = 1.05s (would be overloaded).
11981252
"""
11991253
lim = LIMITS["dead_zone_stuck"]
12001254
batcher, clock = make_default_batcher()
@@ -1205,7 +1259,7 @@ def test_stuck_in_dead_zone_after_load_drop(self):
12051259
after=step_function_cost(
12061260
step_time_s=60.0,
12071261
before=constant_overhead_cost(overhead_s=2.0, per_second_cost=0.3),
1208-
after=constant_overhead_cost(overhead_s=0.5, per_second_cost=0.6),
1262+
after=constant_overhead_cost(overhead_s=0.5, per_second_cost=0.55),
12091263
),
12101264
)
12111265

@@ -1215,9 +1269,9 @@ def test_stuck_in_dead_zone_after_load_drop(self):
12151269
f"Precondition: must reach level {lim['min_level_during_load']}+ "
12161270
f"during severe phase (reached {result.max_level})"
12171271
)
1218-
# Documents the limitation: batcher stays at level 2 despite level 1
1272+
# Documents the limitation: batcher stays at level 3 despite level 1 or 2
12191273
# being sufficient. If the strategy is improved to probe lower levels,
1220-
# this assertion should change to max_final_level: 1.
1274+
# this assertion should change to max_final_level: 2.
12211275
assert result.final_level >= lim["min_final_level"], (
12221276
f"Final level {result.final_level} — expected to stay stuck "
12231277
f"at level {lim['min_final_level']}+ (dead zone)"

tests/core/message_batcher_test.py

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -565,21 +565,49 @@ def test_deescalates_under_sustained_light_load(self):
565565
_escalate_to_level(batcher, 2)
566566
assert batcher.state.level == 2
567567

568-
# Report underloaded batches (processing < 75% of 4s window)
568+
# Report underloaded batches (processing < 70% of 4s window)
569569
underloaded_time = batcher.batch_length_s * DEESCALATION_HEADROOM_RATIO - 0.1
570570
for _ in range(DEESCALATION_UNDERLOAD_THRESHOLD):
571571
batcher.report_batch(100, processing_time_s=underloaded_time)
572572

573573
assert batcher.state.level == 1
574574

575+
def test_transition_report_classified_against_new_window(self):
576+
"""Documents issue #877, Finding 2: the first report after a level
577+
change is classified against the *new* window, although the inner
578+
batcher's active batch was built with the *old* one.
579+
580+
After escalating 1s->2s, a batch that was overloaded at its old 1s
581+
window (processing 1.1s) is counted as underloaded against the new 2s
582+
threshold (1.1 < 0.70 * 2.0), so it contributes a spurious de-escalation
583+
vote: two further genuine underloads then suffice to de-escalate, rather
584+
than the three an old-window-aware classifier would require.
585+
586+
In isolation this only wastes one report — the next steady cycle resets
587+
the counter — so it is left as a documented limitation. The faithful
588+
fix is to classify each report against the window its batch ran under.
589+
"""
590+
batcher = AdaptiveMessageBatcher(base_batch_length_s=1.0, max_level=3)
591+
for _ in range(ESCALATION_OVERLOAD_THRESHOLD):
592+
batcher.report_batch(100, processing_time_s=1.1)
593+
assert batcher.state.level == 2
594+
595+
# Transition batch: 1.1s was an overload at the old 1s window, but is
596+
# mis-counted as underload #1 here.
597+
batcher.report_batch(100, processing_time_s=1.1)
598+
batcher.report_batch(100, processing_time_s=0.2)
599+
assert batcher.state.level == 2, "precondition: not yet de-escalated"
600+
batcher.report_batch(100, processing_time_s=0.2)
601+
assert batcher.state.level == 1
602+
575603
def test_does_not_deescalate_without_enough_headroom(self):
576604
"""No de-escalation when processing uses most of the window."""
577605
batcher = AdaptiveMessageBatcher(base_batch_length_s=1.0, max_level=2)
578606
_escalate_to_level(batcher, 2)
579607
assert batcher.state.level == 2
580608
window = batcher.batch_length_s
581609

582-
# Processing at 80% of window — above headroom threshold (75%)
610+
# Processing at 80% of window — above headroom threshold (70%)
583611
for _ in range(DEESCALATION_UNDERLOAD_THRESHOLD * 3):
584612
batcher.report_batch(100, processing_time_s=window * 0.8)
585613

0 commit comments

Comments
 (0)