22"""
MCP Data Collector Module

This module provides a Python interface to collect release data from the MCP server.

Usage:
    # Use MCP server (local default: http://localhost:8000/mcp)
    collector = MCPDataCollector()

    # Use MCP server (remote)
    collector = MCPDataCollector(server_url="http://server:8000/mcp")

    # Fetch release data
    status = collector.get_release_status("4.19.1")
    metadata = collector.get_release_metadata("4.19.1")
1717"""
2121import os
2222import sys
2323import asyncio
24+ import concurrent .futures
2425from typing import Dict , Any , Optional
25- from mcp .client .sse import sse_client
26+ import httpx
27+ from mcp .client .streamable_http import streamable_http_client
2628from mcp import ClientSession
2729
2830logger = logging .getLogger (__name__ )
@@ -41,76 +43,137 @@ class MCPDataCollector:
4143 - Release status (task completion, overall status)
4244 - Release metadata (advisories, builds, dates)
4345 - Shipment status (shipped, flow type)
46+
47+ Note: Maintains a persistent event loop to avoid "Task group is not initialized"
48+ errors with FastMCP's streamable-http transport.
4449 """
4550
def __init__(self, server_url: Optional[str] = None):
    """
    Initialize the MCP data collector.

    Args:
        server_url: MCP server URL. When omitted (or empty), falls back to
            the MCP_SERVER_URL environment variable, and then to
            http://localhost:8000/mcp.
    """
    # Chain with `or` instead of using a .get() default so that an
    # MCP_SERVER_URL that is set but empty is treated as unset rather than
    # producing an empty server URL.
    self.server_url = (
        server_url
        or os.environ.get('MCP_SERVER_URL')
        or 'http://localhost:8000/mcp'
    )
    self.timeout = 120  # HTTP connect/write/pool timeout in seconds
    self.read_timeout = 600  # HTTP read timeout in seconds (10 min for slow operations)

    # Record whether an event loop is already running in the constructing
    # thread (e.g. Streamlit); _run_async reads this flag to decide whether
    # the dedicated loop must be driven from a helper thread.
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        self._loop_is_external = False
        logger.info("No external event loop detected")
    else:
        self._loop_is_external = True
        logger.info("Detected external event loop (e.g., Streamlit)")

    # Always create a dedicated, persistent loop (never reuse an external
    # one): asyncio.run() would create a new loop per call, which triggers
    # "Task group is not initialized" errors with the streamable-http
    # transport when making multiple calls.
    self._loop = asyncio.new_event_loop()
    logger.info("Created dedicated event loop for MCP data collector")

    logger.info(f"Initialized MCP data collector with server: {self.server_url}")
5881
async def _call_mcp_tool_async(self, tool_name: str, max_retries: int = 5, **kwargs) -> Dict[str, Any]:
    """
    Call an MCP tool via the streamable HTTP transport with retry logic (async).

    Every attempt opens a fresh HTTP client and MCP session, initializes the
    session and invokes the tool. Two kinds of failure are retried with
    exponential backoff (2s, 4s, 8s, ...): a timeout of the tool call, and
    any error whose message mentions both "Task" and "group" (FastMCP's
    transient "Task group is not initialized"). Every other error fails
    immediately.

    Args:
        tool_name: Name of the MCP tool to call
        max_retries: Maximum number of attempts (default: 5). Values below 1
            are treated as 1 so the tool is always called at least once.
        **kwargs: Tool parameters

    Returns:
        Parsed JSON response from the tool, or an empty dict when the tool
        returns no content

    Raises:
        RuntimeError: If the tool reports an error, a non-retryable error
            occurs, or all attempts are exhausted
    """
    # Guard against max_retries <= 0, which would otherwise skip the loop
    # entirely and report a failure without ever contacting the server.
    attempts = max(1, max_retries)
    initial_delay = 2.0  # seconds; doubled after every failed attempt
    last_error = None

    # Loop-invariant: the same timeout configuration is used for every attempt.
    http_timeout = httpx.Timeout(
        connect=self.timeout,    # Connection timeout
        read=self.read_timeout,  # Read timeout for long operations
        write=self.timeout,      # Write timeout
        pool=self.timeout,       # Pool timeout
    )

    for attempt in range(attempts):
        progress = f"attempt {attempt + 1}/{attempts}"
        try:
            logger.debug(f"Calling MCP tool: {tool_name} with args: {kwargs} ({progress})")

            async with httpx.AsyncClient(timeout=http_timeout) as http_client:
                async with streamable_http_client(
                    self.server_url,
                    http_client=http_client
                ) as (read, write, _):
                    async with ClientSession(read, write) as session:
                        await session.initialize()

                        # Bound the tool call itself, independently of the
                        # per-request HTTP timeouts configured above.
                        result = await asyncio.wait_for(
                            session.call_tool(tool_name, arguments=kwargs),
                            timeout=self.read_timeout
                        )

                        if result.isError:
                            error_msg = result.content[0].text if result.content else "Unknown error"
                            logger.error(f"MCP tool {tool_name} returned error: {error_msg}")
                            # Caught by the generic handler below and re-raised
                            # wrapped as "MCP tool call failed: ...".
                            raise RuntimeError(f"Tool returned error: {error_msg}")

                        if result.content:
                            # The first content item carries the JSON payload.
                            return json.loads(result.content[0].text)

                        logger.warning(f"Empty response from tool {tool_name}")
                        return {}

        except asyncio.TimeoutError as e:
            last_error = e
            logger.warning(f"Timeout calling MCP tool {tool_name} after {self.read_timeout}s ({progress})")
        except Exception as e:
            error_str = str(e)
            is_task_group_error = "Task" in error_str and "group" in error_str
            if not is_task_group_error:
                # Non-retryable error, fail immediately (keep the cause chained).
                logger.error(f"Failed to call MCP tool {tool_name}: {error_str}")
                raise RuntimeError(f"MCP tool call failed: {error_str}") from e
            last_error = e
            logger.warning(f"Task group error calling MCP tool {tool_name}: {error_str} ({progress})")

        # Only reached after a retryable failure; back off unless this was
        # the final attempt.
        if attempt < attempts - 1:
            delay = initial_delay * (2 ** attempt)  # Exponential backoff
            logger.info(f"Retrying in {delay:.1f}s...")
            await asyncio.sleep(delay)

    # All attempts exhausted
    error_msg = f"MCP tool call failed after {attempts} attempts"
    if last_error is not None:
        error_msg += f": {str(last_error)}"
    logger.error(error_msg)
    raise RuntimeError(error_msg) from last_error
114177
115178 async def get_release_status_async (self , release : str ) -> Dict [str , Any ]:
116179 """
@@ -224,22 +287,50 @@ async def get_all_release_data_async(self, release: str) -> Dict[str, Any]:
224287 'shipped' : shipped
225288 }
226289
290+ def _run_async (self , coro ):
291+ """
292+ Run async coroutine using the persistent event loop.
293+
294+ This method ensures we reuse the same event loop across calls,
295+ which is required for FastMCP's streamable-http transport.
296+
297+ When running in environments with existing event loops (like Streamlit),
298+ we use our own dedicated loop in a way that doesn't conflict.
299+ """
300+ if self ._loop_is_external :
301+ # We're in an environment with a running loop (e.g., Streamlit)
302+ # We can't use loop.run_until_complete() directly because another loop is running
303+ # Instead, we need to run the coroutine in our persistent loop using threading
304+ # This avoids "This event loop is already running" error
305+ def run_in_thread ():
306+ # Set our loop as the event loop for this thread
307+ asyncio .set_event_loop (self ._loop )
308+ # Run the coroutine to completion
309+ return self ._loop .run_until_complete (coro )
310+
311+ with concurrent .futures .ThreadPoolExecutor () as executor :
312+ future = executor .submit (run_in_thread )
313+ return future .result ()
314+ else :
315+ # Use our persistent loop directly
316+ return self ._loop .run_until_complete (coro )
317+
227318 # Synchronous wrappers for backward compatibility
def get_release_status(self, release: str) -> Dict[str, Any]:
    """Get release status (sync wrapper).

    Creates the coroutine from get_release_status_async(release) and runs it
    through self._run_async, returning that call's result.
    """
    return self._run_async(self.get_release_status_async(release))
231322
def get_release_metadata(self, release: str) -> Dict[str, Any]:
    """Get release metadata (sync wrapper).

    Creates the coroutine from get_release_metadata_async(release) and runs
    it through self._run_async, returning that call's result.
    """
    return self._run_async(self.get_release_metadata_async(release))
235326
def is_release_shipped(self, release: str) -> Dict[str, Any]:
    """Check if release is shipped (sync wrapper).

    Creates the coroutine from is_release_shipped_async(release) and runs it
    through self._run_async, returning that call's result.
    """
    return self._run_async(self.is_release_shipped_async(release))
239330
def get_all_release_data(self, release: str) -> Dict[str, Any]:
    """Get all release data (sync wrapper).

    Creates the coroutine from get_all_release_data_async(release) and runs
    it through self._run_async, returning that call's result.
    """
    return self._run_async(self.get_all_release_data_async(release))
243334
244335
245336if __name__ == "__main__" :
0 commit comments