From d4cb54d632e58a5728934e319fdc91f437617d4d Mon Sep 17 00:00:00 2001 From: Gaodan Fang Date: Wed, 29 Apr 2026 18:56:31 -0400 Subject: [PATCH 1/6] feat(mcp): add user facts tools, SSE transport hardening, and warmup Add store_user_facts/retrieve_user_facts MCP tools for durable user identity. Harden SSE transport by replacing private Starlette API (request._send) with public ASGI-level SseEndpoint class, suppressing CancelledError and teardown assertion races, and bumping graceful shutdown timeout to 3s. Add optional warmup on SSE server boot to reduce first-tool-call latency. Exclude build/ from mypy to fix pre-existing stale artifact errors. Test coverage: warmup disabled/failure paths, auth-enabled SSE route wiring, user facts store/retrieve/validation. Co-Authored-By: Claude Opus 4.6 --- altk_evolve/frontend/mcp/__main__.py | 38 ++++++++-- altk_evolve/frontend/mcp/http_transport.py | 56 +++++++++++--- altk_evolve/frontend/mcp/mcp_server.py | 85 ++++++++++++++++++++++ pyproject.toml | 1 + tests/e2e/test_mcp.py | 31 ++++++++ tests/unit/test_mcp_http_transport.py | 75 ++++++++++++++++++- tests/unit/test_mcp_launcher.py | 44 ++++++++++- tests/unit/test_mcp_server.py | 76 ++++++++++++++++++- 8 files changed, 386 insertions(+), 20 deletions(-) diff --git a/altk_evolve/frontend/mcp/__main__.py b/altk_evolve/frontend/mcp/__main__.py index fcd180f1..5b44fb01 100644 --- a/altk_evolve/frontend/mcp/__main__.py +++ b/altk_evolve/frontend/mcp/__main__.py @@ -1,15 +1,30 @@ import argparse import logging +import os import sys import threading import uvicorn -from altk_evolve.frontend.mcp.mcp_server import mcp, app +from altk_evolve.frontend.mcp.mcp_server import app, get_client, mcp from altk_evolve.frontend.mcp.http_transport import create_resilient_sse_app logger = logging.getLogger("evolve-mcp") +def _is_truthy_env(name: str, default: bool) -> bool: + raw = os.getenv(name) + if raw is None: + return default + return raw.strip().lower() not in {"0", "false", "no", "off"} + 
+ +def warmup_mcp_runtime() -> None: + """Pre-initialize MCP backend state to reduce first-tool-call latency.""" + logger.info("Warming up MCP runtime...") + get_client() + logger.info("MCP runtime warmup complete") + + def _build_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser(description="Run the Evolve MCP server") parser.add_argument( @@ -42,9 +57,22 @@ def run_api_server(): def run_sse_server(host: str, port: int) -> None: - """Run the MCP server over SSE with disconnect-safe transport handling.""" - sse_app = create_resilient_sse_app(mcp) - uvicorn.run(sse_app, host=host, port=port, log_level="warning") + """Run the MCP server over SSE with disconnect-tolerant teardown.""" + if _is_truthy_env("EVOLVE_MCP_WARMUP", True): + try: + warmup_mcp_runtime() + except Exception as exc: + # Keep startup resilient: failed warmup should not block server boot. + logger.warning("MCP warmup failed; continuing without warmup: %s", exc) + + uvicorn.run( + create_resilient_sse_app(mcp), + host=host, + port=port, + lifespan="on", + timeout_graceful_shutdown=3, + ws="websockets-sansio", + ) def main(): @@ -61,7 +89,7 @@ def main(): # Start FastMCP using stdio (which blocks) mcp.run() else: - run_sse_server(host=args.host, port=args.port) + run_sse_server(args.host, args.port) except KeyboardInterrupt: logger.info("MCP server stopped by user (KeyboardInterrupt)") sys.exit(0) diff --git a/altk_evolve/frontend/mcp/http_transport.py b/altk_evolve/frontend/mcp/http_transport.py index 4aace004..011a3887 100644 --- a/altk_evolve/frontend/mcp/http_transport.py +++ b/altk_evolve/frontend/mcp/http_transport.py @@ -1,5 +1,6 @@ from __future__ import annotations +import asyncio import logging from collections.abc import AsyncGenerator from contextlib import asynccontextmanager @@ -10,7 +11,6 @@ from mcp.server.auth.routes import build_resource_metadata_url from mcp.server.lowlevel.server import LifespanResultT from mcp.server.sse import SseServerTransport -from 
starlette.requests import Request from starlette.responses import Response from starlette.routing import BaseRoute, Mount, Route @@ -28,6 +28,13 @@ def _is_benign_disconnect_exception(exc: BaseException) -> bool: """Return True when an exception only represents a dropped SSE client.""" if isinstance(exc, (anyio.ClosedResourceError, anyio.BrokenResourceError)): return True + if isinstance(exc, asyncio.CancelledError): + # Uvicorn cancels outstanding request tasks during Ctrl+C shutdown. + return True + if isinstance(exc, AssertionError) and str(exc) == "Request already responded to": + # MCP low-level request responder can assert during SSE teardown races + # when cancellation/close wins over the normal response path. + return True if isinstance(exc, BaseExceptionGroup): return len(exc.exceptions) > 0 and all(_is_benign_disconnect_exception(child) for child in exc.exceptions) @@ -59,6 +66,23 @@ async def _run_sse_session( return True +async def _handle_sse( + server: FastMCP[LifespanResultT], + sse: SseServerTransport, + scope, + receive, + send, +) -> None: + """ + Serve SSE directly as ASGI and avoid sending any follow-up HTTP response. + + `connect_sse(...)(scope, receive, send)` owns the HTTP response lifecycle. + Returning an additional Response after it exits can race with teardown and + trigger duplicate MCP request completion assertions. 
+ """ + await _run_sse_session(server, sse, scope, receive, send) + + def create_resilient_sse_app( server: FastMCP[LifespanResultT], message_path: str | None = None, @@ -78,9 +102,27 @@ def create_resilient_sse_app( sse = SseServerTransport(message_path) - async def handle_sse(scope, receive, send) -> Response: - await _run_sse_session(server, sse, scope, receive, send) - return Response(status_code=204) + async def handle_sse(scope, receive, send) -> None: + await _handle_sse(server, sse, scope, receive, send) + + class SseEndpoint: + """ASGI app wrapping handle_sse that tracks whether a response was started.""" + + async def __call__(self, scope, receive, send) -> None: + response_started = False + + async def tracked_send(message) -> None: + nonlocal response_started + if message.get("type") == "http.response.start": + response_started = True + await send(message) + + await handle_sse(scope, receive, tracked_send) + if not response_started: + response = Response(status_code=204) + await response(scope, receive, send) + + sse_endpoint = SseEndpoint() if auth: auth_middleware = auth.get_middleware() @@ -95,7 +137,7 @@ async def handle_sse(scope, receive, send) -> Response: Route( sse_path, endpoint=RequireAuthMiddleware( - handle_sse, + sse_endpoint, auth.required_scopes, resource_metadata_url, ), @@ -113,10 +155,6 @@ async def handle_sse(scope, receive, send) -> Response: ) ) else: - - async def sse_endpoint(request: Request) -> Response: - return await handle_sse(request.scope, request.receive, request._send) - server_routes.append(Route(sse_path, endpoint=sse_endpoint, methods=["GET"])) server_routes.append(Mount(message_path, app=sse.handle_post_message)) diff --git a/altk_evolve/frontend/mcp/mcp_server.py b/altk_evolve/frontend/mcp/mcp_server.py index 36442da5..eb017bd2 100644 --- a/altk_evolve/frontend/mcp/mcp_server.py +++ b/altk_evolve/frontend/mcp/mcp_server.py @@ -9,6 +9,7 @@ import threading import uuid import os +from typing import Any from fastmcp 
import FastMCP from fastapi import FastAPI @@ -214,6 +215,22 @@ def get_entities_logic( return "\n".join(response_lines) +def _parse_metadata(metadata: str | None) -> dict[str, Any]: + if not metadata: + return {} + + try: + parsed = json.loads(metadata) + except json.JSONDecodeError as e: + logger.exception(f"Invalid JSON in metadata parameter: {str(e)}") + raise ValueError(f"Failed to parse metadata: {str(e)}") from e + + if not isinstance(parsed, dict): + raise ValueError("Metadata must decode to a JSON object") + + return parsed + + @mcp.tool() def get_entities( task: str, @@ -261,6 +278,74 @@ def get_guidelines( return get_entities_logic(task, "guideline", user_id=user_id, namespace_id=namespace_id, session_id=session_id) +@mcp.tool() +def store_user_facts( + user_id: str, + message: str, + metadata: str | None = None, + enable_conflict_resolution: bool = False, +) -> str: + """Extract and store user facts/preferences for a durable user identity.""" + try: + metadata_dict = _parse_metadata(metadata) + except ValueError as e: + return json.dumps( + { + "error": "Invalid JSON", + "message": str(e), + "invalid_metadata": metadata, + } + ) + + updates = get_client().store_user_facts( + namespace_id=evolve_config.namespace_id, + message=message, + user_id=user_id, + metadata=metadata_dict, + enable_conflict_resolution=enable_conflict_resolution, + ) + + serialized_updates = [ + { + "event": update.event, + "id": update.id, + "type": update.type, + "content": update.content, + "metadata": update.metadata, + } + for update in updates + ] + + return json.dumps( + { + "user_id": user_id, + "stored_count": len(serialized_updates), + "updates": serialized_updates, + } + ) + + +@mcp.tool() +def retrieve_user_facts(user_id: str, query: str | None = None, limit: int = 5) -> str: + """Retrieve categorized user facts/preferences for a durable user identity.""" + categories = get_client().retrieve_user_facts( + namespace_id=evolve_config.namespace_id, + user_id=user_id, + 
query=query, + limit=limit, + ) + matched_count = sum(len(items) for items in categories.values()) + + return json.dumps( + { + "user_id": user_id, + "query": query, + "matched_count": matched_count, + "categories": categories, + } + ) + + @mcp.tool() def save_trajectory( trajectory_data: str, diff --git a/pyproject.toml b/pyproject.toml index a18bc963..00c9efa3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -159,6 +159,7 @@ warn_unused_configs = true disallow_untyped_defs = false explicit_package_bases = true exclude = [ + "build/", "platform-integrations/", "examples/", ] diff --git a/tests/e2e/test_mcp.py b/tests/e2e/test_mcp.py index 51a64492..195f1259 100644 --- a/tests/e2e/test_mcp.py +++ b/tests/e2e/test_mcp.py @@ -219,3 +219,34 @@ async def test_create_entity_with_invalid_json_metadata(mcp): assert result["error"] == "Invalid JSON" assert "message" in result assert "invalid_metadata" in result + + +@pytest.mark.e2e +async def test_store_and_retrieve_user_facts(mcp): + async with Client(transport=mcp) as evolve_mcp: + store_response = await evolve_mcp.call_tool_mcp( + "store_user_facts", + { + "user_id": "user-123", + "message": "I prefer concise answers with bullet points.", + "metadata": json.dumps({"source": "cuga-lite"}), + "enable_conflict_resolution": False, + }, + ) + stored = json.loads(store_response.content[0].text) + assert stored["user_id"] == "user-123" + assert stored["stored_count"] >= 1 + + retrieve_response = await evolve_mcp.call_tool_mcp( + "retrieve_user_facts", + { + "user_id": "user-123", + "query": "How should I format the answer?", + "limit": 5, + }, + ) + retrieved = json.loads(retrieve_response.content[0].text) + + assert retrieved["user_id"] == "user-123" + assert "categories" in retrieved + assert retrieved["matched_count"] >= 0 diff --git a/tests/unit/test_mcp_http_transport.py b/tests/unit/test_mcp_http_transport.py index 7ff4f167..17ada9f9 100644 --- a/tests/unit/test_mcp_http_transport.py +++ 
b/tests/unit/test_mcp_http_transport.py @@ -1,7 +1,14 @@ +from unittest.mock import MagicMock + import anyio import pytest +from starlette.routing import Mount, Route -from altk_evolve.frontend.mcp.http_transport import _is_benign_disconnect_exception +from altk_evolve.frontend.mcp.http_transport import ( + _is_benign_disconnect_exception, + create_resilient_sse_app, +) +from fastmcp.server.auth.middleware import RequireAuthMiddleware pytestmark = pytest.mark.unit @@ -29,3 +36,69 @@ def test_mixed_exception_group_is_not_benign() -> None: ) assert _is_benign_disconnect_exception(exc) is False + + +def _make_mock_server(): + server = MagicMock() + server.auth = None + server._get_additional_http_routes.return_value = [] + server._lifespan_manager = MagicMock() + return server + + +def _make_mock_auth(sse_path: str = "/sse"): + auth = MagicMock() + auth.required_scopes = ["read"] + auth.get_middleware.return_value = [] + auth.get_routes.return_value = [] + auth._get_resource_url.return_value = f"http://localhost:8000{sse_path}" + return auth + + +def test_create_resilient_sse_app_with_auth_wraps_sse_in_require_auth() -> None: + server = _make_mock_server() + auth = _make_mock_auth() + + app = create_resilient_sse_app(server, auth=auth) + + assert app is not None + + sse_routes = [r for r in app.routes if isinstance(r, Route) and r.path == "/sse"] + assert len(sse_routes) == 1, "Expected exactly one SSE Route at /sse" + assert isinstance(sse_routes[0].endpoint, RequireAuthMiddleware), "SSE endpoint should be wrapped in RequireAuthMiddleware" + + +def test_create_resilient_sse_app_with_auth_wraps_message_in_require_auth() -> None: + server = _make_mock_server() + auth = _make_mock_auth() + + app = create_resilient_sse_app(server, auth=auth) + + message_mounts = [r for r in app.routes if isinstance(r, Mount) and r.path == "/messages"] + assert len(message_mounts) == 1, "Expected exactly one Mount at /messages" + assert isinstance(message_mounts[0].app, 
RequireAuthMiddleware), "Message mount should be wrapped in RequireAuthMiddleware" + + +def test_create_resilient_sse_app_with_auth_calls_auth_provider_methods() -> None: + server = _make_mock_server() + auth = _make_mock_auth() + + create_resilient_sse_app(server, auth=auth) + + auth.get_middleware.assert_called_once() + auth.get_routes.assert_called_once_with(mcp_path="/sse") + auth._get_resource_url.assert_called_once_with("/sse") + + +def test_create_resilient_sse_app_without_auth_does_not_wrap_endpoints() -> None: + server = _make_mock_server() + + app = create_resilient_sse_app(server, auth=None) + + sse_routes = [r for r in app.routes if isinstance(r, Route) and r.path == "/sse"] + assert len(sse_routes) == 1 + assert not isinstance(sse_routes[0].endpoint, RequireAuthMiddleware), "Without auth, SSE endpoint should not be wrapped" + + message_mounts = [r for r in app.routes if isinstance(r, Mount) and r.path == "/messages"] + assert len(message_mounts) == 1 + assert not isinstance(message_mounts[0].app, RequireAuthMiddleware), "Without auth, message mount should not be wrapped" diff --git a/tests/unit/test_mcp_launcher.py b/tests/unit/test_mcp_launcher.py index 3aca2aaf..a79daf13 100644 --- a/tests/unit/test_mcp_launcher.py +++ b/tests/unit/test_mcp_launcher.py @@ -62,15 +62,53 @@ def start(self) -> None: def test_run_sse_server_uses_resilient_http_transport(monkeypatch) -> None: resilient_app = object() - captured: list[tuple[object, str, int, str]] = [] + captured: list[dict] = [] + warmup_calls: list[bool] = [] monkeypatch.setattr(launcher, "create_resilient_sse_app", lambda server: resilient_app) + monkeypatch.setattr(launcher, "warmup_mcp_runtime", lambda: warmup_calls.append(True)) + monkeypatch.setenv("EVOLVE_MCP_WARMUP", "true") monkeypatch.setattr( launcher.uvicorn, "run", - lambda app, host, port, log_level: captured.append((app, host, port, log_level)), + lambda app, **kwargs: captured.append({"app": app, **kwargs}), ) 
launcher.run_sse_server(host="127.0.0.1", port=8201) - assert captured == [(resilient_app, "127.0.0.1", 8201, "warning")] + assert len(captured) == 1 + assert captured[0]["app"] is resilient_app + assert captured[0]["host"] == "127.0.0.1" + assert captured[0]["port"] == 8201 + assert captured[0]["lifespan"] == "on" + assert captured[0]["timeout_graceful_shutdown"] == 3 + assert warmup_calls == [True] + + +def test_run_sse_server_skips_warmup_when_disabled(monkeypatch) -> None: + warmup_calls: list[bool] = [] + + monkeypatch.setattr(launcher, "create_resilient_sse_app", lambda server: object()) + monkeypatch.setattr(launcher, "warmup_mcp_runtime", lambda: warmup_calls.append(True)) + monkeypatch.setenv("EVOLVE_MCP_WARMUP", "false") + monkeypatch.setattr(launcher.uvicorn, "run", lambda app, **kwargs: None) + + launcher.run_sse_server(host="127.0.0.1", port=8201) + + assert warmup_calls == [] + + +def test_run_sse_server_boots_despite_warmup_failure(monkeypatch) -> None: + uvicorn_calls: list[bool] = [] + + def failing_warmup() -> None: + raise RuntimeError("warmup exploded") + + monkeypatch.setattr(launcher, "create_resilient_sse_app", lambda server: object()) + monkeypatch.setattr(launcher, "warmup_mcp_runtime", failing_warmup) + monkeypatch.setenv("EVOLVE_MCP_WARMUP", "true") + monkeypatch.setattr(launcher.uvicorn, "run", lambda app, **kwargs: uvicorn_calls.append(True)) + + launcher.run_sse_server(host="127.0.0.1", port=8201) + + assert uvicorn_calls == [True] diff --git a/tests/unit/test_mcp_server.py b/tests/unit/test_mcp_server.py index 15355317..d9dfa8b6 100644 --- a/tests/unit/test_mcp_server.py +++ b/tests/unit/test_mcp_server.py @@ -5,7 +5,12 @@ from unittest.mock import patch, MagicMock import altk_evolve.frontend.mcp.mcp_server as mcp_server_module -from altk_evolve.frontend.mcp.mcp_server import save_trajectory, create_entity +from altk_evolve.frontend.mcp.mcp_server import ( + save_trajectory, + create_entity, + store_user_facts, + retrieve_user_facts, 
+) from altk_evolve.schema.core import Namespace from altk_evolve.schema.conflict_resolution import EntityUpdate @@ -134,7 +139,7 @@ def test_get_client_uses_idempotent_namespace_bootstrap(monkeypatch): # --------------------------------------------------------------------------- -def test_resolve_namespace_falls_back_to_default(): +def test_resolve_namespace_falls_back_to_default(mock_get_client): """When namespace_id is None, _resolve_namespace returns the configured default.""" original_namespaces = mcp_server_module._initialized_namespaces.copy() try: @@ -264,3 +269,70 @@ def test_save_trajectory_backward_compat_no_extra_params(mock_get_client): # user_id and session_id should NOT be in metadata when not provided assert "user_id" not in traj_entity.metadata assert "session_id" not in traj_entity.metadata + + +# --------------------------------------------------------------------------- +# User facts tests +# --------------------------------------------------------------------------- + + +def test_store_user_facts_returns_structured_payload(mock_get_client): + mock_update = EntityUpdate(id="fact-1", type="fact", content="Prefers concise answers", event="ADD", metadata={"category": "style"}) + mock_get_client.store_user_facts.return_value = [mock_update] + + result = json.loads( + store_user_facts( + user_id="user-123", + message="I prefer concise answers.", + metadata=json.dumps({"source": "cuga-lite"}), + ) + ) + + assert result["user_id"] == "user-123" + assert result["stored_count"] == 1 + assert result["updates"][0]["id"] == "fact-1" + assert result["updates"][0]["metadata"]["category"] == "style" + + +def test_store_user_facts_invalid_metadata_json(mock_get_client): + result = json.loads( + store_user_facts( + user_id="user-123", + message="I prefer concise answers.", + metadata="{bad json", + ) + ) + + assert result["error"] == "Invalid JSON" + assert "invalid_metadata" in result + mock_get_client.store_user_facts.assert_not_called() + + +def 
test_store_user_facts_empty_message_returns_zero_updates(mock_get_client): + mock_get_client.store_user_facts.return_value = [] + + result = json.loads(store_user_facts(user_id="user-123", message="")) + + assert result["user_id"] == "user-123" + assert result["stored_count"] == 0 + assert result["updates"] == [] + + +def test_retrieve_user_facts_returns_structured_payload(mock_get_client): + mock_get_client.retrieve_user_facts.return_value = { + "style": [ + { + "id": "fact-1", + "content": "Prefers concise answers", + "key": "response_style", + "value": "concise", + } + ] + } + + result = json.loads(retrieve_user_facts(user_id="user-123", query="How should I answer?", limit=5)) + + assert result["user_id"] == "user-123" + assert result["query"] == "How should I answer?" + assert result["matched_count"] == 1 + assert result["categories"]["style"][0]["value"] == "concise" From e2ff23f244e7335ea36d1b5ea250ada2490a84bb Mon Sep 17 00:00:00 2001 From: Gaodan Fang Date: Wed, 29 Apr 2026 19:05:18 -0400 Subject: [PATCH 2/6] fix(mcp): address CodeRabbit review findings - Use logger.warning instead of logger.exception for JSON validation errors - Add missing ws="websockets-sansio" assertion in launcher test Co-Authored-By: Claude Opus 4.6 --- altk_evolve/frontend/mcp/mcp_server.py | 4 ++-- tests/unit/test_mcp_launcher.py | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/altk_evolve/frontend/mcp/mcp_server.py b/altk_evolve/frontend/mcp/mcp_server.py index eb017bd2..34dfa6f6 100644 --- a/altk_evolve/frontend/mcp/mcp_server.py +++ b/altk_evolve/frontend/mcp/mcp_server.py @@ -222,7 +222,7 @@ def _parse_metadata(metadata: str | None) -> dict[str, Any]: try: parsed = json.loads(metadata) except json.JSONDecodeError as e: - logger.exception(f"Invalid JSON in metadata parameter: {str(e)}") + logger.warning("Invalid JSON in metadata parameter: %s", e) raise ValueError(f"Failed to parse metadata: {str(e)}") from e if not isinstance(parsed, dict): @@ -498,7 +498,7 @@ 
def create_entity( try: metadata_dict = json.loads(metadata) except json.JSONDecodeError as e: - logger.exception(f"Invalid JSON in metadata parameter: {str(e)}") + logger.warning("Invalid JSON in metadata parameter: %s", e) return json.dumps({"error": "Invalid JSON", "message": f"Failed to parse metadata: {str(e)}", "invalid_metadata": metadata}) if not isinstance(metadata_dict, dict): return json.dumps( diff --git a/tests/unit/test_mcp_launcher.py b/tests/unit/test_mcp_launcher.py index a79daf13..13f87c32 100644 --- a/tests/unit/test_mcp_launcher.py +++ b/tests/unit/test_mcp_launcher.py @@ -82,6 +82,7 @@ def test_run_sse_server_uses_resilient_http_transport(monkeypatch) -> None: assert captured[0]["port"] == 8201 assert captured[0]["lifespan"] == "on" assert captured[0]["timeout_graceful_shutdown"] == 3 + assert captured[0]["ws"] == "websockets-sansio" assert warmup_calls == [True] From c80f1e354fd452d18c380db68a11153cba14b11d Mon Sep 17 00:00:00 2001 From: Gaodan Fang Date: Thu, 30 Apr 2026 14:39:22 -0400 Subject: [PATCH 3/6] fix(mcp): address remaining CodeRabbit review comments Skip 204 fallback after suppressed SSE disconnects by propagating the bool return from _run_sse_session through _handle_sse. Use per-test UUID user_id in e2e fact round-trip test and poll until the stored fact is visible, asserting matched_count > 0. Co-Authored-By: Claude Opus 4.6 --- altk_evolve/frontend/mcp/http_transport.py | 14 +++++---- tests/e2e/test_mcp.py | 34 +++++++++++++--------- 2 files changed, 29 insertions(+), 19 deletions(-) diff --git a/altk_evolve/frontend/mcp/http_transport.py b/altk_evolve/frontend/mcp/http_transport.py index 011a3887..74b497b4 100644 --- a/altk_evolve/frontend/mcp/http_transport.py +++ b/altk_evolve/frontend/mcp/http_transport.py @@ -72,15 +72,17 @@ async def _handle_sse( scope, receive, send, -) -> None: +) -> bool: """ Serve SSE directly as ASGI and avoid sending any follow-up HTTP response. 
`connect_sse(...)(scope, receive, send)` owns the HTTP response lifecycle. Returning an additional Response after it exits can race with teardown and trigger duplicate MCP request completion assertions. + + Returns False when the session ended due to a benign client disconnect. """ - await _run_sse_session(server, sse, scope, receive, send) + return await _run_sse_session(server, sse, scope, receive, send) def create_resilient_sse_app( @@ -102,8 +104,8 @@ def create_resilient_sse_app( sse = SseServerTransport(message_path) - async def handle_sse(scope, receive, send) -> None: - await _handle_sse(server, sse, scope, receive, send) + async def handle_sse(scope, receive, send) -> bool: + return await _handle_sse(server, sse, scope, receive, send) class SseEndpoint: """ASGI app wrapping handle_sse that tracks whether a response was started.""" @@ -117,8 +119,8 @@ async def tracked_send(message) -> None: response_started = True await send(message) - await handle_sse(scope, receive, tracked_send) - if not response_started: + completed = await handle_sse(scope, receive, tracked_send) + if completed and not response_started: response = Response(status_code=204) await response(scope, receive, send) diff --git a/tests/e2e/test_mcp.py b/tests/e2e/test_mcp.py index 195f1259..76ad6a36 100644 --- a/tests/e2e/test_mcp.py +++ b/tests/e2e/test_mcp.py @@ -1,3 +1,6 @@ +import asyncio +import uuid + import pytest import json from fastmcp.client import Client @@ -224,29 +227,34 @@ async def test_create_entity_with_invalid_json_metadata(mcp): @pytest.mark.e2e async def test_store_and_retrieve_user_facts(mcp): async with Client(transport=mcp) as evolve_mcp: + user_id = f"user-{uuid.uuid4()}" store_response = await evolve_mcp.call_tool_mcp( "store_user_facts", { - "user_id": "user-123", + "user_id": user_id, "message": "I prefer concise answers with bullet points.", "metadata": json.dumps({"source": "cuga-lite"}), "enable_conflict_resolution": False, }, ) stored = 
json.loads(store_response.content[0].text) - assert stored["user_id"] == "user-123" + assert stored["user_id"] == user_id assert stored["stored_count"] >= 1 - retrieve_response = await evolve_mcp.call_tool_mcp( - "retrieve_user_facts", - { - "user_id": "user-123", - "query": "How should I format the answer?", - "limit": 5, - }, - ) - retrieved = json.loads(retrieve_response.content[0].text) + for _ in range(10): + retrieve_response = await evolve_mcp.call_tool_mcp( + "retrieve_user_facts", + { + "user_id": user_id, + "query": "How should I format the answer?", + "limit": 5, + }, + ) + retrieved = json.loads(retrieve_response.content[0].text) + if retrieved["matched_count"] > 0: + break + await asyncio.sleep(0.25) - assert retrieved["user_id"] == "user-123" + assert retrieved["user_id"] == user_id assert "categories" in retrieved - assert retrieved["matched_count"] >= 0 + assert retrieved["matched_count"] > 0 From f4a6a546a63c92a0b46212b1ba3d3fabc36853ab Mon Sep 17 00:00:00 2001 From: Gaodan Fang Date: Thu, 30 Apr 2026 16:17:30 -0400 Subject: [PATCH 4/6] docs(e2e): note LLM config requirement in e2e conftest Co-Authored-By: Claude Opus 4.6 --- tests/e2e/conftest.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/e2e/conftest.py b/tests/e2e/conftest.py index 89dce2a5..bc8185c8 100644 --- a/tests/e2e/conftest.py +++ b/tests/e2e/conftest.py @@ -1,3 +1,6 @@ +# E2e tests require a working LLM for fact-extraction and guideline-generation. +# See docs/guides/configuration.md for the required EVOLVE_* env vars. + import os import uuid import pytest From c7d6b0320d3a7e4f78d13ea8b79b161c731ba167 Mon Sep 17 00:00:00 2001 From: Gaodan Fang Date: Thu, 30 Apr 2026 16:22:51 -0400 Subject: [PATCH 5/6] fix(e2e): warn when LLM env vars are missing for e2e tests Add pytest_configure hook that emits a UserWarning when neither OPENAI_API_KEY nor EVOLVE_MODEL_NAME is set, pointing to the configuration guide. 
Co-Authored-By: Claude Opus 4.6 --- tests/e2e/conftest.py | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/tests/e2e/conftest.py b/tests/e2e/conftest.py index bc8185c8..e64f8f7b 100644 --- a/tests/e2e/conftest.py +++ b/tests/e2e/conftest.py @@ -1,11 +1,22 @@ -# E2e tests require a working LLM for fact-extraction and guideline-generation. -# See docs/guides/configuration.md for the required EVOLVE_* env vars. - import os import uuid +import warnings import pytest from altk_evolve.config.milvus import milvus_client_settings + +def pytest_configure(config): + """Warn early when LLM env vars needed by e2e tests are missing.""" + has_openai = bool(os.environ.get("OPENAI_API_KEY")) + has_evolve_model = bool(os.environ.get("EVOLVE_MODEL_NAME")) + if not has_openai and not has_evolve_model: + warnings.warn( + "No OPENAI_API_KEY or EVOLVE_MODEL_NAME set — e2e tests that depend on " + "LLM fact-extraction will fail. See docs/guides/configuration.md.", + stacklevel=1, + ) + + _EVOLVE_ENV_KEYS = ("EVOLVE_NAMESPACE_ID", "EVOLVE_BACKEND", "EVOLVE_SQLITE_PATH", "EVOLVE_DATA_DIR") From 15d959281dea5dcccdf74226fd39e63a6fd32e2d Mon Sep 17 00:00:00 2001 From: Gaodan Fang Date: Thu, 30 Apr 2026 17:30:56 -0400 Subject: [PATCH 6/6] fix(mcp): catch BaseException to suppress CancelledError in SSE teardown asyncio.CancelledError inherits from BaseException, not Exception, in Python 3.8+. The except Exception handler never caught it, making the _is_benign_disconnect_exception check for CancelledError dead code.
Co-Authored-By: Claude Opus 4.6 --- altk_evolve/frontend/mcp/http_transport.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/altk_evolve/frontend/mcp/http_transport.py b/altk_evolve/frontend/mcp/http_transport.py index 74b497b4..49d85a33 100644 --- a/altk_evolve/frontend/mcp/http_transport.py +++ b/altk_evolve/frontend/mcp/http_transport.py @@ -57,7 +57,7 @@ async def _run_sse_session( streams[1], server._mcp_server.create_initialization_options(), ) - except Exception as exc: + except BaseException as exc: if _is_benign_disconnect_exception(exc): logger.debug("Suppressing benign SSE disconnect during response flush") return False