Skip to content

Commit bd7fa38

Browse files
authored
fix: replace asyncio.sleep FAF guards with deterministic awaits (#919)
* fix: replace asyncio.sleep FAF guards with deterministic awaits Bumps cpex to 0.1.0.dev12, which exposes PluginResult.background_tasks and wait_for_background_tasks(). Uses these to eliminate all racy asyncio.sleep() calls that were guarding FIRE_AND_FORGET plugin completion in tests. - Add drain_background_tasks() to mellea/plugins/manager.py: accumulates PluginResult objects with background tasks in invoke_hook(), drains via wait_for_background_tasks() in tests - test/plugins/test_execution_modes.py: capture invoke_hook result and call result.wait_for_background_tasks() directly - test/telemetry/test_metrics_backend.py: replace 7x asyncio.sleep(0.05) with drain_background_tasks() Closes #691 Assisted-by: Claude Code Signed-off-by: Alex Bozarth <ajbozart@us.ibm.com> * fix: isolate FAF plugin results per test to prevent cross-loop errors Add discard_background_tasks() to clear accumulated FIRE_AND_FORGET results without awaiting them, and call it in the enable_metrics fixture to discard stale results from previous test event loops before each test. Also clears _pending_background_results in shutdown_plugins() for completeness. Assisted-by: Claude Code Signed-off-by: Alex Bozarth <ajbozart@us.ibm.com> * fix: gate background result collection behind opt-in flag Adds `_collect_background_results` boolean (default False) so `_pending_background_results` is never populated in production. Collection is enabled/disabled explicitly by test fixtures via the new `enable_background_collection()` / `disable_background_collection()` helpers, eliminating the unbounded growth on metrics-enabled servers. Also replaces "FAF" abbreviation in docstrings with "fire-and-forget" for clarity. Assisted-by: Claude Code Signed-off-by: Alex Bozarth <ajbozart@us.ibm.com> --------- Signed-off-by: Alex Bozarth <ajbozart@us.ibm.com>
1 parent 790c319 commit bd7fa38

5 files changed

Lines changed: 68 additions & 28 deletions

File tree

mellea/plugins/manager.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,47 @@
2626
_plugin_manager: Any | None = None
2727
_plugins_enabled: bool = False
2828
_session_tags: dict[str, set[str]] = {} # session_id -> set of plugin names
29+
_pending_background_results: list[Any] = []
30+
_collect_background_results: bool = False # opt-in; only tests enable this
2931

3032
DEFAULT_PLUGIN_TIMEOUT: int = 5 # seconds
3133
DEFAULT_HOOK_POLICY: Literal["allow"] | Literal["deny"] = "deny"
3234

3335

36+
def enable_background_collection() -> None:
37+
"""Enable fire-and-forget result collection. Call in test fixtures before each test."""
38+
global _collect_background_results
39+
_collect_background_results = True
40+
41+
42+
def disable_background_collection() -> None:
43+
"""Disable fire-and-forget result collection and clear any accumulated results."""
44+
global _collect_background_results, _pending_background_results
45+
_collect_background_results = False
46+
_pending_background_results = []
47+
48+
49+
async def drain_background_tasks() -> None:
50+
"""Await all accumulated FIRE_AND_FORGET tasks and clear the pending list.
51+
52+
Call this in tests after any operation that may have triggered fire-and-forget plugins,
53+
to ensure side effects (metrics recording, etc.) complete before assertions.
54+
"""
55+
global _pending_background_results
56+
pending, _pending_background_results = _pending_background_results, []
57+
for result in pending:
58+
await result.wait_for_background_tasks()
59+
60+
61+
def discard_background_tasks() -> None:
62+
"""Discard all accumulated FIRE_AND_FORGET tasks without awaiting them.
63+
64+
Call this in test fixtures to clear stale results from a previous event
65+
loop before running the next test.
66+
"""
67+
_pending_background_results.clear()
68+
69+
3470
def has_plugins(hook_type: HookType | None = None) -> bool:
3571
"""Fast check: are plugins configured and available for the given hook type.
3672
@@ -143,6 +179,7 @@ async def shutdown_plugins() -> None:
143179
_plugin_manager = None
144180
_plugins_enabled = False
145181
_session_tags.clear()
182+
_pending_background_results.clear()
146183

147184

148185
def track_session_plugin(session_id: str, plugin_name: str) -> None:
@@ -229,6 +266,9 @@ async def invoke_hook(
229266
violations_as_exceptions=False,
230267
)
231268

269+
if _collect_background_results and result and result.background_tasks:
270+
_pending_background_results.append(result)
271+
232272
if result and not result.continue_processing and result.violation:
233273
v = result.violation
234274
logger.warning(

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ sandbox = [
105105
backends = ["mellea[watsonx,hf,litellm]"]
106106

107107
hooks = [
108-
"cpex>=0.1.0.dev10; python_version >= '3.11'",
108+
"cpex>=0.1.0.dev12; python_version >= '3.11'",
109109
"grpcio>=1.78.0",
110110
]
111111

test/plugins/test_execution_modes.py

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@
2121

2222
from __future__ import annotations
2323

24-
import asyncio
25-
2624
import pytest
2725

2826
pytest.importorskip("cpex.framework")
@@ -344,11 +342,10 @@ async def faf_observer(payload, ctx):
344342

345343
register(faf_observer)
346344

347-
await invoke_hook(HookType.SESSION_PRE_INIT, _session_payload())
345+
result, _ = await invoke_hook(HookType.SESSION_PRE_INIT, _session_payload())
348346

349-
# The hook runs as a background asyncio task; yield to the event loop to
350-
# allow it to complete before asserting.
351-
await asyncio.sleep(0.05)
347+
assert result is not None
348+
await result.wait_for_background_tasks()
352349
assert invocations == ["fired"]
353350

354351
@pytest.mark.asyncio
@@ -413,9 +410,10 @@ async def enforce_second(payload, ctx):
413410
register(faf_first)
414411
register(enforce_second)
415412

416-
await invoke_hook(HookType.SESSION_PRE_INIT, _session_payload())
413+
result, _ = await invoke_hook(HookType.SESSION_PRE_INIT, _session_payload())
417414

418-
await asyncio.sleep(0.05)
415+
assert result is not None
416+
await result.wait_for_background_tasks()
419417
assert order == ["enforce", "faf"]
420418

421419
@pytest.mark.asyncio

test/telemetry/test_metrics_backend.py

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
Tests that backends correctly record token metrics through the telemetry system.
44
"""
55

6-
import asyncio
76
import os
87

98
import pytest
@@ -12,6 +11,12 @@
1211
IBM_GRANITE_4_HYBRID_MICRO,
1312
IBM_GRANITE_4_HYBRID_SMALL,
1413
)
14+
from mellea.plugins.manager import (
15+
disable_background_collection,
16+
discard_background_tasks,
17+
drain_background_tasks,
18+
enable_background_collection,
19+
)
1520
from mellea.stdlib.components import Message
1621
from mellea.stdlib.context import SimpleContext
1722
from test.predicates import require_api_key, require_gpu
@@ -41,6 +46,8 @@ def metric_reader():
4146
@pytest.fixture
4247
def enable_metrics(monkeypatch):
4348
"""Enable metrics for tests."""
49+
enable_background_collection()
50+
discard_background_tasks()
4451
monkeypatch.setenv("MELLEA_METRICS_ENABLED", "true")
4552
# Force reload of metrics module to pick up env vars
4653
import importlib
@@ -52,6 +59,7 @@ def enable_metrics(monkeypatch):
5259
# Reset after test
5360
monkeypatch.setenv("MELLEA_METRICS_ENABLED", "false")
5461
importlib.reload(mellea.telemetry.metrics)
62+
disable_background_collection()
5563

5664

5765
@pytest.fixture(scope="module")
@@ -169,8 +177,7 @@ async def test_ollama_token_metrics_integration(enable_metrics, metric_reader, s
169177
await mot.avalue()
170178

171179
# Force metrics export and collection
172-
# Yield to event loop so FIRE_AND_FORGET plugin tasks complete
173-
await asyncio.sleep(0.05)
180+
await drain_background_tasks()
174181
provider.force_flush()
175182
metrics_data = metric_reader.get_metrics_data()
176183

@@ -235,8 +242,7 @@ async def test_openai_token_metrics_integration(enable_metrics, metric_reader, s
235242
await mot.astream()
236243
await mot.avalue()
237244

238-
# Yield to event loop so FIRE_AND_FORGET plugin tasks complete
239-
await asyncio.sleep(0.05)
245+
await drain_background_tasks()
240246
provider.force_flush()
241247
metrics_data = metric_reader.get_metrics_data()
242248

@@ -290,8 +296,7 @@ async def test_watsonx_token_metrics_integration(enable_metrics, metric_reader):
290296
)
291297
await mot.avalue()
292298

293-
# Yield to event loop so FIRE_AND_FORGET plugin tasks complete
294-
await asyncio.sleep(0.05)
299+
await drain_background_tasks()
295300
provider.force_flush()
296301
metrics_data = metric_reader.get_metrics_data()
297302

@@ -354,8 +359,7 @@ async def test_litellm_token_metrics_integration(
354359
await mot.astream()
355360
await mot.avalue()
356361

357-
# Yield to event loop so FIRE_AND_FORGET plugin tasks complete
358-
await asyncio.sleep(0.05)
362+
await drain_background_tasks()
359363
provider.force_flush()
360364
metrics_data = metric_reader.get_metrics_data()
361365

@@ -413,8 +417,7 @@ async def test_huggingface_token_metrics_integration(
413417
await mot.astream()
414418
await mot.avalue()
415419

416-
# Yield to event loop so FIRE_AND_FORGET plugin tasks complete
417-
await asyncio.sleep(0.05)
420+
await drain_background_tasks()
418421
provider.force_flush()
419422
metrics_data = metric_reader.get_metrics_data()
420423

@@ -470,8 +473,7 @@ async def test_error_metrics_on_backend_failure(enable_metrics, metric_reader):
470473
with pytest.raises(Exception):
471474
await mot.avalue()
472475

473-
# Yield to event loop so FIRE_AND_FORGET plugin task completes
474-
await asyncio.sleep(0.05)
476+
await drain_background_tasks()
475477
provider.force_flush()
476478
metrics_data = metric_reader.get_metrics_data()
477479

@@ -508,8 +510,7 @@ async def test_ollama_sampling_metrics_integration(enable_metrics, metric_reader
508510
action=Instruction("Say hello"), context=ctx, backend=backend, requirements=None
509511
)
510512

511-
# Yield to event loop so FIRE_AND_FORGET plugin tasks complete
512-
await asyncio.sleep(0.05)
513+
await drain_background_tasks()
513514
provider.force_flush()
514515
metrics_data = metric_reader.get_metrics_data()
515516

uv.lock

Lines changed: 5 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)