From c396d58725ea0a8703bf602e6126c5e36a3604f2 Mon Sep 17 00:00:00 2001 From: Alex Bozarth Date: Thu, 23 Apr 2026 16:57:21 -0500 Subject: [PATCH 1/3] fix: replace asyncio.sleep FAF guards with deterministic awaits Bumps cpex to 0.1.0.dev12, which exposes PluginResult.background_tasks and wait_for_background_tasks(). Uses these to eliminate all racy asyncio.sleep() calls that were guarding FIRE_AND_FORGET plugin completion in tests. - Add drain_background_tasks() to mellea/plugins/manager.py: accumulates PluginResult objects with background tasks in invoke_hook(), drains via wait_for_background_tasks() in tests - test/plugins/test_execution_modes.py: capture invoke_hook result and call result.wait_for_background_tasks() directly - test/telemetry/test_metrics_backend.py: replace 7x asyncio.sleep(0.05) with drain_background_tasks() Closes #691 Assisted-by: Claude Code Signed-off-by: Alex Bozarth --- mellea/plugins/manager.py | 16 ++++++++++++++++ pyproject.toml | 2 +- test/plugins/test_execution_modes.py | 14 ++++++-------- test/telemetry/test_metrics_backend.py | 23 ++++++++--------------- uv.lock | 9 +++++---- 5 files changed, 36 insertions(+), 28 deletions(-) diff --git a/mellea/plugins/manager.py b/mellea/plugins/manager.py index f29b7eb2a..bb7a17b72 100644 --- a/mellea/plugins/manager.py +++ b/mellea/plugins/manager.py @@ -26,11 +26,24 @@ _plugin_manager: Any | None = None _plugins_enabled: bool = False _session_tags: dict[str, set[str]] = {} # session_id -> set of plugin names +_pending_background_results: list[Any] = [] DEFAULT_PLUGIN_TIMEOUT: int = 5 # seconds DEFAULT_HOOK_POLICY: Literal["allow"] | Literal["deny"] = "deny" +async def drain_background_tasks() -> None: + """Await all accumulated FIRE_AND_FORGET tasks and clear the pending list. + + Call this in tests after any operation that may have triggered FAF plugins, + to ensure side effects (metrics recording, etc.) complete before assertions. + """ + global _pending_background_results + pending, _pending_background_results = _pending_background_results, [] + for result in pending: + await result.wait_for_background_tasks() + + def has_plugins(hook_type: HookType | None = None) -> bool: """Fast check: are plugins configured and available for the given hook type. @@ -229,6 +242,9 @@ async def invoke_hook( violations_as_exceptions=False, ) + if result and result.background_tasks: + _pending_background_results.append(result) + if result and not result.continue_processing and result.violation: v = result.violation logger.warning( diff --git a/pyproject.toml b/pyproject.toml index 0b6d3c9ff..5e30a2436 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -105,7 +105,7 @@ sandbox = [ backends = ["mellea[watsonx,hf,litellm]"] hooks = [ - "cpex>=0.1.0.dev10; python_version >= '3.11'", + "cpex>=0.1.0.dev12; python_version >= '3.11'", "grpcio>=1.78.0", ] diff --git a/test/plugins/test_execution_modes.py b/test/plugins/test_execution_modes.py index 5167943f4..6083c3b36 100644 --- a/test/plugins/test_execution_modes.py +++ b/test/plugins/test_execution_modes.py @@ -21,8 +21,6 @@ from __future__ import annotations -import asyncio - import pytest pytest.importorskip("cpex.framework") @@ -344,11 +342,10 @@ async def faf_observer(payload, ctx): register(faf_observer) - await invoke_hook(HookType.SESSION_PRE_INIT, _session_payload()) + result, _ = await invoke_hook(HookType.SESSION_PRE_INIT, _session_payload()) - # The hook runs as a background asyncio task; yield to the event loop to - # allow it to complete before asserting. - await asyncio.sleep(0.05) + assert result is not None + await result.wait_for_background_tasks() assert invocations == ["fired"] @pytest.mark.asyncio @@ -413,9 +410,10 @@ async def enforce_second(payload, ctx): register(faf_first) register(enforce_second) - await invoke_hook(HookType.SESSION_PRE_INIT, _session_payload()) + result, _ = await invoke_hook(HookType.SESSION_PRE_INIT, _session_payload()) - await asyncio.sleep(0.05) + assert result is not None + await result.wait_for_background_tasks() assert order == ["enforce", "faf"] @pytest.mark.asyncio diff --git a/test/telemetry/test_metrics_backend.py b/test/telemetry/test_metrics_backend.py index 95a40b5f7..b519cbe09 100644 --- a/test/telemetry/test_metrics_backend.py +++ b/test/telemetry/test_metrics_backend.py @@ -3,7 +3,6 @@ Tests that backends correctly record token metrics through the telemetry system. """ -import asyncio import os import pytest @@ -12,6 +11,7 @@ IBM_GRANITE_4_HYBRID_MICRO, IBM_GRANITE_4_HYBRID_SMALL, ) +from mellea.plugins.manager import drain_background_tasks from mellea.stdlib.components import Message from mellea.stdlib.context import SimpleContext from test.predicates import require_api_key, require_gpu @@ -169,8 +169,7 @@ async def test_ollama_token_metrics_integration(enable_metrics, metric_reader, s await mot.avalue() # Force metrics export and collection - # Yield to event loop so FIRE_AND_FORGET plugin tasks complete - await asyncio.sleep(0.05) + await drain_background_tasks() provider.force_flush() metrics_data = metric_reader.get_metrics_data() @@ -235,8 +234,7 @@ async def test_openai_token_metrics_integration(enable_metrics, metric_reader, s await mot.astream() await mot.avalue() - # Yield to event loop so FIRE_AND_FORGET plugin tasks complete - await asyncio.sleep(0.05) + await drain_background_tasks() provider.force_flush() metrics_data = metric_reader.get_metrics_data() @@ -290,8 +288,7 @@ async def test_watsonx_token_metrics_integration(enable_metrics, metric_reader): ) await mot.avalue() - # Yield to event loop so FIRE_AND_FORGET plugin tasks complete - await asyncio.sleep(0.05) + await drain_background_tasks() provider.force_flush() metrics_data = metric_reader.get_metrics_data() @@ -354,8 +351,7 @@ async def test_litellm_token_metrics_integration( await mot.astream() await mot.avalue() - # Yield to event loop so FIRE_AND_FORGET plugin tasks complete - await asyncio.sleep(0.05) + await drain_background_tasks() provider.force_flush() metrics_data = metric_reader.get_metrics_data() @@ -413,8 +409,7 @@ async def test_huggingface_token_metrics_integration( await mot.astream() await mot.avalue() - # Yield to event loop so FIRE_AND_FORGET plugin tasks complete - await asyncio.sleep(0.05) + await drain_background_tasks() provider.force_flush() metrics_data = metric_reader.get_metrics_data() @@ -470,8 +465,7 @@ async def test_error_metrics_on_backend_failure(enable_metrics, metric_reader): with pytest.raises(Exception): await mot.avalue() - # Yield to event loop so FIRE_AND_FORGET plugin task completes - await asyncio.sleep(0.05) + await drain_background_tasks() provider.force_flush() metrics_data = metric_reader.get_metrics_data() @@ -508,8 +502,7 @@ async def test_ollama_sampling_metrics_integration(enable_metrics, metric_reader action=Instruction("Say hello"), context=ctx, backend=backend, requirements=None ) - # Yield to event loop so FIRE_AND_FORGET plugin tasks complete - await asyncio.sleep(0.05) + await drain_background_tasks() provider.force_flush() metrics_data = metric_reader.get_metrics_data() diff --git a/uv.lock b/uv.lock index 18e87d7cb..ef9d2c29f 100644 --- a/uv.lock +++ b/uv.lock @@ -749,7 +749,7 @@ toml = [ [[package]] name = "cpex" -version = "0.1.0.dev11" +version = "0.1.0.dev12" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "fastapi" }, @@ -757,15 +757,16 @@ dependencies = [ { name = "jinja2" }, { name = "mcp" }, { name = "orjson" }, + { name = "packaging" }, { name = "prometheus-client" }, { name = "prometheus-fastapi-instrumentator" }, { name = "pydantic" }, { name = "pydantic-settings" }, { name = "pyyaml" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/84/4c/c462d23f98b7b388dd6642d71702f5f47d120c0df471a22add22ad99fe69/cpex-0.1.0.dev11.tar.gz", hash = "sha256:c7c30650fd49fdae7ec67f46c1d57486db090ae79ec39a6ed2f8ed990b78f6e5", size = 754645, upload-time = "2026-04-13T21:02:26.901Z" } +sdist = { url = "https://files.pythonhosted.org/packages/0d/f6/d5a194b338b3d55b1b9b8619baafa504ae8146168cf4b91fcefa95811a16/cpex-0.1.0.dev12.tar.gz", hash = "sha256:9fb08e0fa27236747c26c841260951a83252029c0e55a7550c65a060473f200c", size = 3475629, upload-time = "2026-04-23T17:34:14.434Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/1f/5e/d051f25b6bb2241f61f710ef0a127cbdd4b8ca2612d8b777d4164347e1f6/cpex-0.1.0.dev11-py3-none-any.whl", hash = "sha256:c34d23f16191744e98415cdf850d981e377399a2d802abda565c276f217ae380", size = 167516, upload-time = "2026-04-13T21:02:25.687Z" }, + { url = "https://files.pythonhosted.org/packages/39/ed/f70537bd8adbf1f847a703e02c3abd2cdc53dfa87e44977aa25dd163774b/cpex-0.1.0.dev12-py3-none-any.whl", hash = "sha256:5c10688b6f7ca8c3673fce9dfd94d0b3a348e0e63566546ced5068574a38403e", size = 236654, upload-time = "2026-04-23T17:34:12.592Z" }, ] [[package]] @@ -3334,7 +3335,7 @@ typecheck = [ requires-dist = [ { name = "accelerate", marker = "extra == 'hf'", specifier = ">=1.9.0" }, { name = "boto3", marker = "extra == 'litellm'" }, - { name = "cpex", marker = "python_full_version >= '3.11' and extra == 'hooks'", specifier = ">=0.1.0.dev10" }, + { name = "cpex", marker = "python_full_version >= '3.11' and extra == 'hooks'", specifier = ">=0.1.0.dev12" }, { name = "datasets", marker = "extra == 'hf'", specifier = ">=4.0.0" }, { name = "docling", marker = "extra == 'docling'", specifier = ">=2.45.0" }, { name = "elasticsearch", marker = "extra == 'granite-retriever'", specifier = ">=8.0.0,<9.0.0" }, From e755cccc945f6bfe5b8d468abab69a14fba9bd99 Mon Sep 17 00:00:00 2001 From: Alex Bozarth Date: Fri, 24 Apr 2026 11:29:42 -0500 Subject: [PATCH 2/3] fix: isolate FAF plugin results per test to prevent cross-loop errors Add discard_background_tasks() to clear accumulated FIRE_AND_FORGET results without awaiting them, and call it in the enable_metrics fixture to discard stale results from previous test event loops before each test. Also clears _pending_background_results in shutdown_plugins() for completeness. Assisted-by: Claude Code Signed-off-by: Alex Bozarth --- mellea/plugins/manager.py | 10 ++++++++++ test/telemetry/test_metrics_backend.py | 4 +++- 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/mellea/plugins/manager.py b/mellea/plugins/manager.py index bb7a17b72..a7b294a3a 100644 --- a/mellea/plugins/manager.py +++ b/mellea/plugins/manager.py @@ -44,6 +44,15 @@ async def drain_background_tasks() -> None: await result.wait_for_background_tasks() +def discard_background_tasks() -> None: + """Discard all accumulated FIRE_AND_FORGET tasks without awaiting them. + + Call this in test fixtures to clear stale results from a previous event + loop before running the next test. + """ + _pending_background_results.clear() + + def has_plugins(hook_type: HookType | None = None) -> bool: """Fast check: are plugins configured and available for the given hook type. @@ -156,6 +165,7 @@ async def shutdown_plugins() -> None: _plugin_manager = None _plugins_enabled = False _session_tags.clear() + _pending_background_results.clear() def track_session_plugin(session_id: str, plugin_name: str) -> None: diff --git a/test/telemetry/test_metrics_backend.py b/test/telemetry/test_metrics_backend.py index b519cbe09..4bf489022 100644 --- a/test/telemetry/test_metrics_backend.py +++ b/test/telemetry/test_metrics_backend.py @@ -11,7 +11,7 @@ IBM_GRANITE_4_HYBRID_MICRO, IBM_GRANITE_4_HYBRID_SMALL, ) -from mellea.plugins.manager import drain_background_tasks +from mellea.plugins.manager import discard_background_tasks, drain_background_tasks from mellea.stdlib.components import Message from mellea.stdlib.context import SimpleContext from test.predicates import require_api_key, require_gpu @@ -41,6 +41,7 @@ def metric_reader(): @pytest.fixture def enable_metrics(monkeypatch): """Enable metrics for tests.""" + discard_background_tasks() monkeypatch.setenv("MELLEA_METRICS_ENABLED", "true") # Force reload of metrics module to pick up env vars import importlib @@ -52,6 +53,7 @@ def enable_metrics(monkeypatch): # Reset after test monkeypatch.setenv("MELLEA_METRICS_ENABLED", "false") importlib.reload(mellea.telemetry.metrics) + discard_background_tasks() @pytest.fixture(scope="module") From 31fd5255492d1b14b1a39191050cfd1ce4bec96e Mon Sep 17 00:00:00 2001 From: Alex Bozarth Date: Mon, 27 Apr 2026 12:01:28 -0500 Subject: [PATCH 3/3] fix: gate background result collection behind opt-in flag Adds `_collect_background_results` boolean (default False) so `_pending_background_results` is never populated in production. Collection is enabled/disabled explicitly by test fixtures via the new `enable_background_collection()` / `disable_background_collection()` helpers, eliminating the unbounded growth on metrics-enabled servers. Also replaces "FAF" abbreviation in docstrings with "fire-and-forget" for clarity. Assisted-by: Claude Code Signed-off-by: Alex Bozarth --- mellea/plugins/manager.py | 18 ++++++++++++++++-- test/telemetry/test_metrics_backend.py | 10 ++++++++-- 2 files changed, 24 insertions(+), 4 deletions(-) diff --git a/mellea/plugins/manager.py b/mellea/plugins/manager.py index a7b294a3a..e196a0a25 100644 --- a/mellea/plugins/manager.py +++ b/mellea/plugins/manager.py @@ -27,15 +27,29 @@ _plugins_enabled: bool = False _session_tags: dict[str, set[str]] = {} # session_id -> set of plugin names _pending_background_results: list[Any] = [] +_collect_background_results: bool = False # opt-in; only tests enable this DEFAULT_PLUGIN_TIMEOUT: int = 5 # seconds DEFAULT_HOOK_POLICY: Literal["allow"] | Literal["deny"] = "deny" +def enable_background_collection() -> None: + """Enable fire-and-forget result collection. Call in test fixtures before each test.""" + global _collect_background_results + _collect_background_results = True + + +def disable_background_collection() -> None: + """Disable fire-and-forget result collection and clear any accumulated results.""" + global _collect_background_results, _pending_background_results + _collect_background_results = False + _pending_background_results = [] + + async def drain_background_tasks() -> None: """Await all accumulated FIRE_AND_FORGET tasks and clear the pending list. - Call this in tests after any operation that may have triggered FAF plugins, + Call this in tests after any operation that may have triggered fire-and-forget plugins, to ensure side effects (metrics recording, etc.) complete before assertions. """ global _pending_background_results @@ -252,7 +266,7 @@ async def invoke_hook( violations_as_exceptions=False, ) - if result and result.background_tasks: + if _collect_background_results and result and result.background_tasks: _pending_background_results.append(result) if result and not result.continue_processing and result.violation: diff --git a/test/telemetry/test_metrics_backend.py b/test/telemetry/test_metrics_backend.py index 4bf489022..5ce1db66b 100644 --- a/test/telemetry/test_metrics_backend.py +++ b/test/telemetry/test_metrics_backend.py @@ -11,7 +11,12 @@ IBM_GRANITE_4_HYBRID_MICRO, IBM_GRANITE_4_HYBRID_SMALL, ) -from mellea.plugins.manager import discard_background_tasks, drain_background_tasks +from mellea.plugins.manager import ( + disable_background_collection, + discard_background_tasks, + drain_background_tasks, + enable_background_collection, +) from mellea.stdlib.components import Message from mellea.stdlib.context import SimpleContext from test.predicates import require_api_key, require_gpu @@ -41,6 +46,7 @@ def metric_reader(): @pytest.fixture def enable_metrics(monkeypatch): """Enable metrics for tests.""" + enable_background_collection() discard_background_tasks() monkeypatch.setenv("MELLEA_METRICS_ENABLED", "true") # Force reload of metrics module to pick up env vars @@ -53,7 +59,7 @@ def enable_metrics(monkeypatch): # Reset after test monkeypatch.setenv("MELLEA_METRICS_ENABLED", "false") importlib.reload(mellea.telemetry.metrics) - discard_background_tasks() + disable_background_collection() @pytest.fixture(scope="module")