
Fix event loop starvation and enable RPC bridge for MCP server bridging#9

Open
nikhilaii93 wants to merge 1 commit into elusznik:main from nikhilaii93:nverma-event-loop-starvation-rpc-bridge

Conversation

@nikhilaii93

Summary

Fixes event loop starvation caused by PersistentMCPClient._forward_read tasks when bridged MCP servers are loaded, and enables end-to-end RPC bridging so sandbox code can call tools on bridged servers (e.g., JIRA).

Problem

When bridged MCP servers are loaded, each server spawns an asyncio task running async for item in raw_read_stream: in a tight loop. These tasks never yield long enough for Docker subprocess I/O (stdin/stdout pipes) to be scheduled on the same event loop, causing complete starvation. Every approach that relied on the main event loop for Docker pipe I/O failed — asyncio.StreamReader.readline(), os.read() on raw fd, loop.run_in_executor(), and asyncio.sleep() polling were all starved.

Changes

  1. Isolated Docker I/O thread — Run the entire Docker stdin/stdout exchange on a dedicated thread with its own asyncio.new_event_loop(). Signal completion via threading.Event. Detect completion from the main loop using anyio.to_thread.run_sync(done_event.wait).

  2. Fresh container per call — Always terminate and recreate containers instead of reusing running ones. Prior containers reference stale IPC temp directories cleaned up by SandboxInvocation.__aexit__.

  3. Container death detection — After the init sleep, check popen.poll() and raise SandboxError with collected stderr if the container died during init.

  4. None vs [] server semantics — servers=None means auto-discover all configured servers; servers=[] (explicit empty list) means no servers. Previously [] was falsy and triggered auto-discovery, loading unwanted servers whose failures corrupted the cancel scope stack.

  5. Environment inheritance — Merge os.environ with server-specific env so bridged servers inherit auth credentials (JIRA_API_TOKEN, GITHUB_TOKEN, etc.) from the parent process.

  6. Proper client lifecycle — Keep bridged MCP clients alive during the entire sandbox execution. Call client.stop() after execution in the same task context that called start(). Previously, cancelling _forward_task without calling stop() left anyio cancel scopes and context managers un-exited, causing RuntimeError: Attempted to exit cancel scope in a different task on the next request.

  7. Enable RPC handler — Changed rpc_handler=None to rpc_handler=invocation.handle_rpc so sandbox code can actually call bridged tools, not just see their metadata.

  8. Fix MCP_AVAILABLE_SERVERS env var — Update the container environment variable after populating invocation.server_metadata, not during SandboxInvocation.__aenter__ when it's still empty.
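The isolated-thread pattern in change 1 can be sketched roughly as follows. This is a minimal sketch, not the PR's actual code: `docker_exchange`, `run_exchange`, and the payload handling are illustrative stand-ins, and the stdlib `asyncio.to_thread` is substituted for the PR's `anyio.to_thread.run_sync(done_event.wait)` to keep the example self-contained:

```python
import asyncio
import threading


async def docker_exchange(payload: bytes) -> bytes:
    # Stand-in for the real Docker stdin/stdout exchange.
    await asyncio.sleep(0)
    return payload.upper()


def _io_thread(payload: bytes, done: threading.Event, result: dict) -> None:
    # Dedicated thread with its own event loop: even if the main loop
    # is starved by _forward_read tasks, this loop keeps servicing I/O.
    loop = asyncio.new_event_loop()
    try:
        result["out"] = loop.run_until_complete(docker_exchange(payload))
    finally:
        loop.close()
        done.set()  # signal completion to the main loop


async def run_exchange(payload: bytes) -> bytes:
    done = threading.Event()
    result: dict = {}
    threading.Thread(
        target=_io_thread, args=(payload, done, result), daemon=True
    ).start()
    # Block on the threading.Event in a worker thread, not on the event
    # loop itself (the PR uses anyio.to_thread.run_sync(done.wait) here).
    await asyncio.to_thread(done.wait)
    return result["out"]
```

The key property is that the Docker exchange never depends on the starved main loop for scheduling; the main loop only waits on a thread-safe event.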

Verified

  • Basic sandbox: print(42) → 42
  • JIRA RPC bridge: await mcp_jira.get_jira_issue_summary(issue_key="SPARK-1") → returns issue title from live JIRA
  • MCP_AVAILABLE_SERVERS correctly populated with server metadata and tool lists inside the container

Test plan

  • Run basic sandbox execution: run_python(code='print(42)')
  • Run with explicit empty servers: run_python(code='print(1)', servers=[])
  • Run with bridged JIRA server: run_python(code='r = await mcp_jira.get_jira_issue_summary(issue_key="SPARK-1")\nprint(r)', servers=["jira"])
  • Verify no cancel scope errors on subsequent calls after a bridged execution
  • Verify container is fresh on each call (no stale IPC directory errors)

The main asyncio event loop is starved by PersistentMCPClient._forward_read
tasks when bridged MCP servers are loaded. Each server creates an asyncio task
running `async for item in raw_read_stream:` in a tight loop, preventing Docker
subprocess I/O from ever being scheduled.

Changes:
- Run Docker stdin/stdout exchange on an isolated thread with its own event loop,
  signalling completion via threading.Event and anyio.to_thread.run_sync
- Always create fresh containers per call to avoid stale IPC directories
- Detect container death during init and surface stderr diagnostics
- Distinguish servers=None (auto-discover) from servers=[] (no servers)
- Inherit parent process env vars so bridged servers get auth credentials
- Keep bridged MCP clients alive during execution and call client.stop()
  in the same task context that called start() to prevent cancel scope corruption
- Enable RPC handler so sandbox code can call bridged tools (e.g. JIRA)
- Fix MCP_AVAILABLE_SERVERS env var population timing

@gemini-code-assist bot left a comment


Code Review

This pull request refactors the code execution sandbox to address event loop starvation by moving Docker stdin/stdout exchange to a dedicated thread with its own event loop. It also improves server metadata prefetching and cleanup logic. I have identified several issues: the assignment of Popen to self._process will cause runtime errors because Popen.wait is not awaitable, the blocking wait call in _ensure_started should be offloaded to a thread, the use of a magic number for initialization sleep is unreliable, and the local import of subprocess should be moved to the top of the file. Additionally, there is a naming conflict with _stop_runtime that should be resolved.

Comment on lines +1668 to +1669
# Also set the asyncio _process so returncode check works
self._process = self._popen

high

Assigning the subprocess.Popen object to self._process will cause a runtime error in _stop_runtime (line 1924), as it attempts to await self._process.wait(). subprocess.Popen.wait() is a blocking method and not an awaitable. Since self._popen is now used to track the container process and is handled separately in _stop_runtime, this assignment is redundant and harmful.

Suggested change:
- # Also set the asyncio _process so returncode check works
- self._process = self._popen
+ # Track the container process via self._popen

)
popen.stdin.flush()

async def _stop_runtime(self) -> None:

medium

This method name shadows the one defined at line 1500. Having two methods with the same name in the same class is confusing and error-prone. You should merge the logic (stopping the container process and stopping the podman machine) into a single cleanup method or use distinct names.

if self._popen is not None:
try:
self._popen.terminate()
self._popen.wait(timeout=5)

medium

self._popen.wait(timeout=5) is a blocking call. Executing it directly within an async def method will block the entire event loop for up to 5 seconds. You should use await anyio.to_thread.run_sync(self._popen.wait, 5) instead. Additionally, if the process does not terminate within the timeout, you should call self._popen.kill() to ensure resources are released.
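A non-blocking shutdown along the lines the reviewer suggests might look like this — a sketch assuming a `subprocess.Popen` handle, with the stdlib `asyncio.to_thread` standing in for `anyio.to_thread.run_sync`:

```python
import asyncio
import subprocess


async def stop_container(popen: subprocess.Popen) -> None:
    popen.terminate()
    try:
        # Offload the blocking wait so the event loop stays responsive.
        await asyncio.to_thread(popen.wait, 5)
    except subprocess.TimeoutExpired:
        popen.kill()  # escalate if SIGTERM was ignored
        await asyncio.to_thread(popen.wait)
```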


# Give the container time to initialize (parse entrypoint,
# set up asyncio, register stdin reader).
await asyncio.sleep(2)

medium

The 2-second sleep for container initialization is a magic number that may be unreliable across different environments. It can cause unnecessary latency on fast systems or fail if the container takes longer to start. Consider implementing a more robust readiness check, such as waiting for a specific signal on stdout or polling the process status with a shorter interval.
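One possible readiness check along those lines — a sketch only: the `READY` sentinel and the 10-second ceiling are assumptions for illustration, not something the container entrypoint currently emits:

```python
import asyncio
import subprocess


async def wait_until_ready(popen: subprocess.Popen, timeout: float = 10.0) -> None:
    """Wait for a readiness line on stdout instead of sleeping a fixed 2s."""

    async def _read_ready() -> None:
        while True:
            # readline blocks, so offload it to a worker thread.
            line = await asyncio.to_thread(popen.stdout.readline)
            if not line:
                raise RuntimeError("container exited before signalling readiness")
            if line.strip() == b"READY":
                return

    try:
        await asyncio.wait_for(_read_ready(), timeout)
    except asyncio.TimeoutError:
        raise RuntimeError(f"container not ready after {timeout}s")
```

Fast containers proceed immediately, slow ones get up to the full ceiling, and a dead container surfaces an error instead of a silent hang.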

stdin=aio_subprocess.PIPE,
stdout=aio_subprocess.PIPE,
stderr=aio_subprocess.PIPE,
import subprocess as _sp

medium

Importing subprocess inside a method is generally discouraged. It should be moved to the top of the file to follow standard Python practices and make it available for other methods and exception handling (e.g., subprocess.TimeoutExpired).
