Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion src/ess/livedata/core/message_batcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,18 @@ def _split_messages(

ESCALATION_OVERLOAD_THRESHOLD = 2
ESCALATION_HALF_STEPS = 2
DEESCALATION_HEADROOM_RATIO = 0.75
# A batch is "underloaded" (de-escalation candidate) when its processing time is
# below this fraction of the window; the stable "dead zone" is therefore
# [ratio, 1.0] in utilisation. De-escalating one half-step shrinks the window by
# a factor of sqrt(2), i.e. raises utilisation by the consecutive-level ratio. For every
# workload to have a stable level (utilisation landing in the dead zone at *some*
# level) the dead zone must span one such step: ratio <= 1 / (level ratio). The
# nominal ratio is sqrt(2) ~ 1.414, but pulse-quantization rounds windows so the
# widest consecutive ratio is ~1.43 (e.g. round(14*sqrt(2))/14 = 20/14). Hence
# ratio <= 1/1.43 ~ 0.70. A larger value (e.g. 0.75) leaves a gap where a
# workload is underloaded at every escalated level yet overloaded one step down,
# causing steady oscillation.
DEESCALATION_HEADROOM_RATIO = 0.70
DEESCALATION_UNDERLOAD_THRESHOLD = 3
DEESCALATION_IDLE_WINDOWS = 3

Expand Down
88 changes: 71 additions & 17 deletions tests/core/adaptive_batching_scenarios_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,15 @@
"min_level": 1,
"min_final_level": 1,
},
# -- No-fixed-point load (issue #877, Finding 1) ----------------------
# A flat ~1.05s processing time overloads at level 0 (1.0s) but fits with
# headroom at every escalated level. The dead zone must be wide enough to
# give it a stable level; otherwise it oscillates 0->2->1->0 forever.
"overhead_dominated_no_fixed_point": {
"min_level_reached": 1,
"max_oscillations": 1,
"min_final_level": 1,
},
# -- Creeping overload ------------------------------------------------
"creeping_overload": {
"min_level_reached": 4,
Expand Down Expand Up @@ -714,7 +723,7 @@ def test_no_escalation_with_gc_jitter(self, seed):

Processing is fast on average (0.3s) but with high jitter
(std = 1.2 * mean = 0.36s) that regularly sends individual batches
into the dead zone (75-100% of window) and occasionally past the
into the dead zone (70-100% of window) and occasionally past the
window entirely (~4 overloaded cycles per 120s run).

The batcher must tolerate these isolated spikes because its
Expand Down Expand Up @@ -782,8 +791,8 @@ def test_boundary_jitter_escalates_and_sticks(self):
P(2 consecutive overloaded) ~ 25%, so escalation is very likely.

At level 2 (2s window): 0.5 + 1.0 = 1.5s mean (75% utilization).
At the dead-zone boundary (>= 75%), so de-escalation never triggers.
Documents limitation: once escalated, stays stuck due to dead zone.
Comfortably within the dead zone (>= 70%), so de-escalation never
triggers. Documents limitation: once escalated, stays stuck.
"""
lim = LIMITS["boundary_jitter"]
batcher, clock = make_default_batcher()
Expand All @@ -810,6 +819,51 @@ def test_boundary_jitter_escalates_and_sticks(self):
)


class TestNoFixedPointLoad:
    """Flat-cost load that has no inherently stable level (issue #877).

    The processing time is constant: too long for the base window, yet short
    enough to leave headroom once the window has grown by one or more steps.
    Such a load only has a resting level if the dead zone spans a full
    de-escalation step (roughly a factor of sqrt(2) in utilization). If the
    dead zone is narrower, every escalated level looks "underloaded", the
    controller steps back down until it overloads again, and the cycle
    repeats without end.
    """

    def test_settles_instead_of_oscillating(self):
        """A flat 1.05s load must settle on one level rather than oscillate.

        The processing time is the same 1.05s whatever the window is:
            Level 0 (1.0s):  105% -> overloaded.
            Level 1 (1.43s): 74%  -> within the dead zone (stable).
            Level 2 (2.0s):  53%  -> underloaded.
        """
        limits = LIMITS["overhead_dominated_no_fixed_point"]
        batcher, clock = make_default_batcher()

        # Fixed overhead only; the per-second component is zero.
        flat_cost = constant_overhead_cost(overhead_s=1.05, per_second_cost=0.0)
        result = run_scenario(batcher, 120.0, flat_cost, clock)

        # The scenario is only meaningful if the load escalated at all.
        peak_level = result.max_level
        assert peak_level >= limits["min_level_reached"], (
            f"Precondition: load must trigger escalation "
            f"(reached level {peak_level})"
        )

        oscillations = result.oscillation_count()
        assert oscillations <= limits["max_oscillations"], (
            f"Oscillated {oscillations} times — no stable level "
            f"found (limit: {limits['max_oscillations']})"
        )

        # From t=60s onwards every cycle must sit on one and the same level.
        settled_cycles = result.cycles_after(60.0)
        assert settled_cycles, "Simulation too short for stabilization check"
        settled_levels = {cycle.level for cycle in settled_cycles}
        assert len(settled_levels) == 1, (
            f"Not stabilized: levels {sorted(settled_levels)} in second half"
        )

        # Exactly one level remains; it has to be an escalated one.
        (settled_level,) = settled_levels
        assert settled_level >= limits["min_final_level"], (
            f"Stabilized at level {settled_level} below the expected "
            f"escalated level (>= {limits['min_final_level']})"
        )


class TestCreepingOverload:
"""Load that gradually increases past processing capacity."""

Expand Down Expand Up @@ -985,8 +1039,8 @@ def test_partial_deescalation(self):
De-escalates to level 3 (2.83s): 1.8 + 0.57 = 2.37s (84%, dead zone).

Moderate phase (de-escalates from level 3 to level 2):
Level 3 (2.83s): 0.6 + 1.41 = 2.01s (71%, underloaded).
Level 2 (2.0s): 0.6 + 1.0 = 1.6s (80%, dead zone — stuck).
Level 3 (2.83s): 0.45 + 1.43 = 1.88s (66%, underloaded).
Level 2 (2.0s): 0.45 + 1.0 = 1.45s (72.5%, dead zone — stuck).
"""
lim = LIMITS["partial_deescalation"]
batcher, clock = make_default_batcher()
Expand All @@ -997,7 +1051,7 @@ def test_partial_deescalation(self):
after=step_function_cost(
step_time_s=60.0,
before=constant_overhead_cost(overhead_s=1.8, per_second_cost=0.2),
after=constant_overhead_cost(overhead_s=0.6, per_second_cost=0.5),
after=constant_overhead_cost(overhead_s=0.45, per_second_cost=0.5),
),
)

Expand Down Expand Up @@ -1122,7 +1176,7 @@ def test_severe_overload_to_cosmic_background(self):
Level 4 (4.0s): 0.2 + 0.04 = 0.24s (6% utilization).
Level 2 (2.0s): 0.2 + 0.02 = 0.22s (11% utilization).
Level 0 (1.0s): 0.2 + 0.01 = 0.21s (21% utilization).
All levels are well below the 75% headroom threshold.
All levels are well below the 70% headroom threshold.
"""
lim = LIMITS["severe_to_cosmic_background"]
batcher, clock = make_default_batcher()
Expand Down Expand Up @@ -1171,11 +1225,11 @@ def test_fast_escalation_on_clear_overload(self):


class TestDeescalationDeadZone:
"""The 75-100% utilization dead zone where de-escalation cannot trigger.
"""The 70-100% utilization dead zone where de-escalation cannot trigger.

When processing fills 75-100% of the escalated window, it falls in the
When processing fills 70-100% of the escalated window, it falls in the
"in between" zone: not overloaded (processing < window) and not
underloaded (processing >= 0.75 * window). Both consecutive counters
underloaded (processing >= 0.70 * window). Both consecutive counters
are reset every cycle, so neither escalation nor de-escalation can
trigger — even if a lower level would handle the load fine.
"""
Expand All @@ -1191,10 +1245,10 @@ def test_stuck_in_dead_zone_after_load_drop(self):
Level 4 (4.0s): 2.0 + 1.2 = 3.2s (80%, dead zone).

Moderate phase (de-escalates from level 4 to level 3, then stuck):
Level 4 (4.0s): 0.5 + 2.4 = 2.9s (72.5%, underloaded < 75%).
Level 3 (2.83s): 0.5 + 1.7 = 2.2s (78%, dead zone — stuck).
Level 2 (2.0s): 0.5 + 1.2 = 1.7s (would fit at 85%).
Level 0 (1.0s): 0.5 + 0.6 = 1.1s (would be overloaded).
Level 4 (4.0s): 0.5 + 2.2 = 2.7s (67.5%, underloaded).
Level 3 (2.83s): 0.5 + 1.57 = 2.07s (72.5%, dead zone — stuck).
Level 2 (2.0s): 0.5 + 1.1 = 1.6s (would fit at 80%).
Level 0 (1.0s): 0.5 + 0.55 = 1.05s (would be overloaded).
"""
lim = LIMITS["dead_zone_stuck"]
batcher, clock = make_default_batcher()
Expand All @@ -1205,7 +1259,7 @@ def test_stuck_in_dead_zone_after_load_drop(self):
after=step_function_cost(
step_time_s=60.0,
before=constant_overhead_cost(overhead_s=2.0, per_second_cost=0.3),
after=constant_overhead_cost(overhead_s=0.5, per_second_cost=0.6),
after=constant_overhead_cost(overhead_s=0.5, per_second_cost=0.55),
),
)

Expand All @@ -1215,9 +1269,9 @@ def test_stuck_in_dead_zone_after_load_drop(self):
f"Precondition: must reach level {lim['min_level_during_load']}+ "
f"during severe phase (reached {result.max_level})"
)
# Documents the limitation: batcher stays at level 2 despite level 1
# Documents the limitation: batcher stays at level 3 despite level 1 or 2
# being sufficient. If the strategy is improved to probe lower levels,
# this assertion should change to max_final_level: 1.
# this assertion should change to max_final_level: 2.
assert result.final_level >= lim["min_final_level"], (
f"Final level {result.final_level} — expected to stay stuck "
f"at level {lim['min_final_level']}+ (dead zone)"
Expand Down
32 changes: 30 additions & 2 deletions tests/core/message_batcher_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -565,21 +565,49 @@ def test_deescalates_under_sustained_light_load(self):
_escalate_to_level(batcher, 2)
assert batcher.state.level == 2

# Report underloaded batches (processing < 75% of 4s window)
# Report underloaded batches (processing < 70% of 4s window)
underloaded_time = batcher.batch_length_s * DEESCALATION_HEADROOM_RATIO - 0.1
for _ in range(DEESCALATION_UNDERLOAD_THRESHOLD):
batcher.report_batch(100, processing_time_s=underloaded_time)

assert batcher.state.level == 1

def test_transition_report_classified_against_new_window(self):
    """Pin the mis-classified first report after a level change.

    Issue #877, Finding 2: the report that immediately follows a level
    change is judged against the *new* window, even though the inner
    batcher built the batch it describes with the *old* window.

    Escalating 1s->2s makes a batch that overloaded its old 1s window
    (processing 1.1s) look underloaded under the new 2s threshold
    (1.1 < 0.70 * 2.0). That is one spurious de-escalation vote, so only
    two genuine underloads are needed afterwards, not the three that a
    classifier aware of the old window would demand.

    On its own the effect costs a single report, because the next steady
    cycle resets the counter; it is therefore kept as a documented
    limitation. A faithful fix would classify each report against the
    window its batch actually ran under.
    """
    overloaded_at_base_s = 1.1
    light_load_s = 0.2
    adaptive = AdaptiveMessageBatcher(base_batch_length_s=1.0, max_level=3)

    for _ in range(ESCALATION_OVERLOAD_THRESHOLD):
        adaptive.report_batch(100, processing_time_s=overloaded_at_base_s)
    assert adaptive.state.level == 2

    # Transition batch: an overload at the old 1s window that is wrongly
    # tallied here as underload #1.
    adaptive.report_batch(100, processing_time_s=overloaded_at_base_s)

    # Underload #2 (genuine): still one short of the threshold.
    adaptive.report_batch(100, processing_time_s=light_load_s)
    assert adaptive.state.level == 2, "precondition: not yet de-escalated"

    # Underload #3 (genuine): tips the counter and triggers de-escalation.
    adaptive.report_batch(100, processing_time_s=light_load_s)
    assert adaptive.state.level == 1

def test_does_not_deescalate_without_enough_headroom(self):
"""No de-escalation when processing uses most of the window."""
batcher = AdaptiveMessageBatcher(base_batch_length_s=1.0, max_level=2)
_escalate_to_level(batcher, 2)
assert batcher.state.level == 2
window = batcher.batch_length_s

# Processing at 80% of window — above headroom threshold (75%)
# Processing at 80% of window — above headroom threshold (70%)
for _ in range(DEESCALATION_UNDERLOAD_THRESHOLD * 3):
batcher.report_batch(100, processing_time_s=window * 0.8)

Expand Down
Loading