diff --git a/docs/mcp/sse.mdx b/docs/mcp/sse.mdx new file mode 100644 index 000000000..80ce84a7e --- /dev/null +++ b/docs/mcp/sse.mdx @@ -0,0 +1,248 @@ +--- +title: "MCP SSE Integration" +sidebarTitle: "MCP SSE" +description: "Guide for integrating Server-Sent Events (SSE) with PraisonAI agents using MCP" +icon: "server" +--- + +## Add SSE Tool to AI Agent + +```mermaid +flowchart LR + In[In] --> Agent[AI Agent] + Agent --> Tool[SSE MCP] + Tool --> Agent + Agent --> Out[Out] + + style In fill:#8B0000,color:#fff + style Agent fill:#2E8B57,color:#fff + style Tool fill:#4169E1,color:#fff + style Out fill:#8B0000,color:#fff +``` + +## Quick Start + + + + +```python +from praisonaiagents import Agent, MCP + +search_agent = Agent( + instructions="""You are a weather agent that can provide weather information for a given city.""", + llm="gpt-4o-mini", + tools=MCP("http://localhost:8080/sse") +) + +search_agent.start("What is the weather in London?") +``` + + + + +```python +# python mcp-sse-direct-server.py --host 127.0.0.1 --port 8080 +from typing import Any +import httpx +from mcp.server.fastmcp import FastMCP +from starlette.applications import Starlette +from mcp.server.sse import SseServerTransport +from starlette.requests import Request +from starlette.routing import Mount, Route +from mcp.server import Server +import uvicorn +import argparse +import logging +import os +import inspect + +# Set up logging based on environment variable +log_level = os.environ.get("LOGLEVEL", "info").upper() +logging.basicConfig(level=getattr(logging, log_level)) +logger = logging.getLogger("mcp-server") + +# Initialize FastMCP server for simple tools (SSE) +mcp = FastMCP("simple-tools") + +@mcp.tool() +async def get_greeting(name: str) -> str: + """Get a personalized greeting. + + Args: + name: Name of the person to greet + """ + logger.debug(f"get_greeting called with name: {name}") + return f"Hello, {name}! Welcome to our MCP SSE server." + +@mcp.tool() +async def get_weather(city: str) -> str: + """Get a simulated weather report for a city. + + Args: + city: Name of the city + """ + logger.debug(f"get_weather called with city: {city}") + # This is a mock implementation + weather_data = { + "Paris": "Sunny with a temperature of 22°C", + "London": "Rainy with a temperature of 15°C", + "New York": "Cloudy with a temperature of 18°C", + "Tokyo": "Clear skies with a temperature of 25°C", + "Sydney": "Partly cloudy with a temperature of 20°C" + } + + return weather_data.get(city, f"Weather data not available for {city}") + +def create_starlette_app(mcp_server: Server, *, debug: bool = False) -> Starlette: + """Create a Starlette application that can serve the provided mcp server with SSE.""" + sse = SseServerTransport("/messages/") + + async def handle_sse(request: Request) -> None: + logger.debug(f"SSE connection request received from {request.client}") + async with sse.connect_sse( + request.scope, + request.receive, + request._send, # noqa: SLF001 + ) as (read_stream, write_stream): + await mcp_server.run( + read_stream, + write_stream, + mcp_server.create_initialization_options(), + ) + + return Starlette( + debug=debug, + routes=[ + Route("/sse", endpoint=handle_sse), + Mount("/messages/", app=sse.handle_post_message), + ], + ) + +if __name__ == "__main__": + mcp_server = mcp._mcp_server # noqa: WPS437 + + parser = argparse.ArgumentParser(description='Run MCP SSE-based server') + parser.add_argument('--host', default='localhost', help='Host to bind to') + parser.add_argument('--port', type=int, default=8080, help='Port to listen on') + args = parser.parse_args() + + print(f"Starting MCP SSE server on {args.host}:{args.port}") + + # Hardcode the tool names since we know what they are + tool_names = ["get_greeting", "get_weather"] + print(f"Available tools: {', '.join(tool_names)}") + + # Bind SSE request handling to MCP server + starlette_app = create_starlette_app(mcp_server, debug=True) + + uvicorn.run(starlette_app, host=args.host, port=args.port) +``` + + + + Make sure you have the required packages installed: + ```bash + pip install "praisonaiagents[llm]" mcp starlette uvicorn httpx + ``` + + + ```bash + export OPENAI_API_KEY="your_api_key" + ``` + + + + First, start the SSE server: + ```bash + python mcp-sse-direct-server.py --host 127.0.0.1 --port 8080 + ``` + + Then, in a new terminal, run the agent: + ```bash + python weather_agent.py + ``` + + + + + **Requirements** + - Python 3.10 or higher + - MCP server dependencies + + +## Alternative LLM Integrations + +### Using Groq with SSE + +```python +from praisonaiagents import Agent, MCP + +weather_agent = Agent( + instructions="""You are a weather agent that can provide weather information for a given city.""", + llm="groq/llama-3.2-90b-vision-preview", + tools=MCP("http://localhost:8080/sse") +) + +weather_agent.start("What is the weather in London?") +``` + +### Using Ollama with SSE + +```python +from praisonaiagents import Agent, MCP + +weather_agent = Agent( + instructions="""You are a weather agent that can provide weather information for a given city.""", + llm="ollama/llama3.2", + tools=MCP("http://localhost:8080/sse") +) + +weather_agent.start("What is the weather in London? Use get_weather tool, city is the required parameter.") +``` + +## Gradio UI Integration + +Create a Gradio UI for your weather service: + +```python +from praisonaiagents import Agent, MCP +import gradio as gr + +def get_weather_info(query): + weather_agent = Agent( + instructions="""You are a weather agent that can provide weather information for a given city.""", + llm="gpt-4o-mini", + tools=MCP("http://localhost:8080/sse") + ) + + result = weather_agent.start(query) + return f"## Weather Information\n\n{result}" + +demo = gr.Interface( + fn=get_weather_info, + inputs=gr.Textbox(placeholder="What's the weather in London?"), + outputs=gr.Markdown(), + title="Weather MCP Agent", + description="Ask about the weather in any major city:" +) + +if __name__ == "__main__": + demo.launch() +``` + +## Features + + + + Receive server-sent events in real-time from your AI agent. + + + Combine SSE with other MCP tools for complex workflows. + + + Use with OpenAI, Groq, Ollama, or other supported LLMs. + + + Create user-friendly interfaces for your SSE integrations. + + diff --git a/docs/mint.json b/docs/mint.json index 973b46c99..0e30277f4 100644 --- a/docs/mint.json +++ b/docs/mint.json @@ -238,6 +238,7 @@ "group": "MCP", "pages": [ "mcp/airbnb", + "mcp/sse", "mcp/ollama", "mcp/groq", "mcp/openrouter", diff --git a/src/praisonai-agents/.gitignore b/src/praisonai-agents/.gitignore index 5fbcb8eb4..9049ca748 100644 --- a/src/praisonai-agents/.gitignore +++ b/src/praisonai-agents/.gitignore @@ -84,4 +84,5 @@ build *.mp4 *.png graph.py -chroma_db/ \ No newline at end of file +chroma_db/ +.qodo diff --git a/src/praisonai-agents/README.md b/src/praisonai-agents/README.md new file mode 100644 index 000000000..b53be0187 --- /dev/null +++ b/src/praisonai-agents/README.md @@ -0,0 +1,83 @@ +# MCP SSE Server and Client Implementation + +This project demonstrates a working pattern for SSE-based MCP (Model Context Protocol) servers and clients. It consists of three main components: + +1. **server.py**: An SSE-based MCP server that provides simple tools +2. **client.py**: A standalone client that connects to the server and uses its tools with Claude +3. **mcp-sse.py**: A client using praisonaiagents that connects to the server and uses its tools with OpenAI + +## Tools Provided by the Server + +The server implements two simple tools: + +- **get_greeting**: Returns a personalized greeting for a given name +- **get_weather**: Returns simulated weather data for a given city + +## Setup and Usage + +### Prerequisites + +Make sure you have the required packages installed: + +```bash +pip install praisonaiagents mcp httpx starlette uvicorn anthropic python-dotenv +``` + +### Running the Server + +First, start the MCP SSE server: + +```bash +python server.py +``` + +By default, the server runs on 0.0.0.0:8080, but you can customize the host and port: + +```bash +python server.py --host 127.0.0.1 --port 8081 +``` + +### Running the Standalone Client + +The standalone client uses Claude to interact with the MCP server tools: + +```bash +# Set your Anthropic API key +export ANTHROPIC_API_KEY=your_api_key_here + +# Run the client +python client.py http://0.0.0.0:8080/sse +``` + +You'll see a prompt where you can type queries for Claude to process using the MCP tools. + +### Running the praisonaiagents Client + +The praisonaiagents client uses OpenAI to interact with the MCP server tools: + +```bash +# Set your OpenAI API key +export OPENAI_API_KEY=your_api_key_here + +# Run the client +python mcp-sse.py +``` + +This will automatically send a query about the weather in Paris to the agent. + +## How It Works + +1. The server exposes MCP tools via an SSE endpoint +2. Clients connect to this endpoint and discover available tools +3. When a user makes a query, the client: + - For client.py: Uses Claude to determine which tool to call + - For mcp-sse.py: Uses OpenAI to determine which tool to call +4. The client executes the tool call against the server +5. The result is returned to the user + +This pattern allows for decoupled processes where the MCP server can run independently of clients, making it suitable for cloud-native applications. + +## Customizing + +- To add more tools to the server, define new functions with the `@mcp.tool()` decorator in `server.py` +- To change the client's behavior, update the instructions and query in `mcp-sse.py` \ No newline at end of file diff --git a/src/praisonai-agents/mcp-sse-direct-agent.py b/src/praisonai-agents/mcp-sse-direct-agent.py new file mode 100644 index 000000000..061f694ba --- /dev/null +++ b/src/praisonai-agents/mcp-sse-direct-agent.py @@ -0,0 +1,222 @@ +import os +import logging +import asyncio +import time +import sys +import inspect +import json +from typing import List, Dict, Any, Optional, Callable +from contextlib import AsyncExitStack + +from mcp import ClientSession +from mcp.client.sse import sse_client + +from praisonaiagents import Agent + +# Set up logging based on environment variable +log_level = os.environ.get("LOGLEVEL", "info").upper() +logging.basicConfig(level=getattr(logging, log_level)) +logger = logging.getLogger("mcp-client") + +# Create a custom prompt that explicitly mentions the tools +system_prompt = """You are a helpful assistant that can provide greetings and check weather information. + +You have access to the following tools: +1. get_greeting(name: str) - Get a personalized greeting for a given name +2. get_weather(city: str) - Get weather information for a city (Paris, London, New York, Tokyo, Sydney) + +When asked about weather, always use the get_weather tool with the appropriate city. +When asked for a greeting, always use the get_greeting tool with the appropriate name. +""" + +# Global event loop for async operations +event_loop = None + +def get_event_loop(): + """Get or create a global event loop.""" + global event_loop + if event_loop is None or event_loop.is_closed(): + event_loop = asyncio.new_event_loop() + asyncio.set_event_loop(event_loop) + return event_loop + +class SSEMCPTool: + """A wrapper for an MCP tool that can be used with praisonaiagents.""" + + def __init__(self, name: str, description: str, session: ClientSession, input_schema: Optional[Dict[str, Any]] = None): + self.name = name + self.__name__ = name # Required for Agent to recognize it as a tool + self.__qualname__ = name # Required for Agent to recognize it as a tool + self.__doc__ = description # Required for Agent to recognize it as a tool + self.description = description + self.session = session + self.input_schema = input_schema or {} + + # Create a signature based on input schema + params = [] + if input_schema and 'properties' in input_schema: + for param_name in input_schema['properties']: + params.append( + inspect.Parameter( + name=param_name, + kind=inspect.Parameter.POSITIONAL_OR_KEYWORD, + default=inspect.Parameter.empty if param_name in input_schema.get('required', []) else None, + annotation=str # Default to string + ) + ) + + self.__signature__ = inspect.Signature(params) + + def __call__(self, **kwargs): + """Synchronous wrapper for the async call.""" + logger.debug(f"Tool {self.name} called with args: {kwargs}") + + # Use the global event loop + loop = get_event_loop() + + # Run the async call in the event loop + future = asyncio.run_coroutine_threadsafe(self._async_call(**kwargs), loop) + try: + # Wait for the result with a timeout + return future.result(timeout=30) + except Exception as e: + logger.error(f"Error calling tool {self.name}: {e}") + return f"Error: {str(e)}" + + async def _async_call(self, **kwargs): + """Call the tool with the provided arguments.""" + logger.debug(f"Async calling tool {self.name} with args: {kwargs}") + try: + result = await self.session.call_tool(self.name, kwargs) + + # Extract text from result + if hasattr(result, 'content') and result.content: + if hasattr(result.content[0], 'text'): + return result.content[0].text + return str(result.content[0]) + return str(result) + except Exception as e: + logger.error(f"Error in _async_call for {self.name}: {e}") + raise + + def to_openai_tool(self): + """Convert the tool to OpenAI format.""" + return { + "type": "function", + "function": { + "name": self.name, + "description": self.description, + "parameters": self.input_schema + } + } + + +class SSEMCPClient: + """A client for connecting to an MCP server over SSE.""" + + def __init__(self, server_url: str): + self.server_url = server_url + self.session = None + self.tools = [] + self._initialize() + + def _initialize(self): + """Initialize the connection and tools.""" + # Use the global event loop + loop = get_event_loop() + + # Start a background thread to run the event loop + def run_event_loop(): + asyncio.set_event_loop(loop) + loop.run_forever() + + import threading + self.loop_thread = threading.Thread(target=run_event_loop, daemon=True) + self.loop_thread.start() + + # Run the initialization in the event loop + future = asyncio.run_coroutine_threadsafe(self._async_initialize(), loop) + self.tools = future.result(timeout=30) + + async def _async_initialize(self): + """Asynchronously initialize the connection and tools.""" + logger.debug(f"Connecting to MCP server at {self.server_url}") + + # Create SSE client + self._streams_context = sse_client(url=self.server_url) + streams = await self._streams_context.__aenter__() + + self._session_context = ClientSession(*streams) + self.session = await self._session_context.__aenter__() + + # Initialize + await self.session.initialize() + + # List available tools + logger.debug("Listing tools...") + response = await self.session.list_tools() + tools_data = response.tools + logger.debug(f"Found {len(tools_data)} tools: {[tool.name for tool in tools_data]}") + + # Create tool wrappers + tools = [] + for tool in tools_data: + input_schema = tool.inputSchema if hasattr(tool, 'inputSchema') else None + wrapper = SSEMCPTool( + name=tool.name, + description=tool.description if hasattr(tool, 'description') else f"Call the {tool.name} tool", + session=self.session, + input_schema=input_schema + ) + tools.append(wrapper) + + return tools + + def __iter__(self): + """Return an iterator over the tools.""" + return iter(self.tools) + + +def main(): + # Server URL + server_url = "http://0.0.0.0:8080/sse" + + try: + # Connect to the MCP server + client = SSEMCPClient(server_url) + + if not client.tools: + logger.error("No tools found on the server") + return + + logger.info(f"Connected to server with {len(client.tools)} tools: {[tool.name for tool in client.tools]}") + + # Create OpenAI-compatible tool definitions + openai_tools = [tool.to_openai_tool() for tool in client.tools] + logger.debug(f"OpenAI tools: {json.dumps(openai_tools, indent=2)}") + + # Create an agent with the tools + assistant_agent = Agent( + instructions=system_prompt, + llm="openai/gpt-4o-mini", + tools=client.tools, + verbose=True + ) + + # Start the agent with a query + logger.info("Starting agent with query about weather in Paris") + result = assistant_agent.chat( + "Hello! Can you tell me what the weather is like in Paris today?", + tools=openai_tools + ) + + logger.info(f"Agent response: {result}") + + except Exception as e: + logger.error(f"Error: {e}") + import traceback + traceback.print_exc() + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/src/praisonai-agents/mcp-sse-direct-client.py b/src/praisonai-agents/mcp-sse-direct-client.py new file mode 100644 index 000000000..3d908fb57 --- /dev/null +++ b/src/praisonai-agents/mcp-sse-direct-client.py @@ -0,0 +1,120 @@ +# python mcp-sse-direct-client.py http://0.0.0.0:8080/sse +import asyncio +import json +import os +import sys +from typing import Optional +from contextlib import AsyncExitStack + +from mcp import ClientSession +from mcp.client.sse import sse_client + +from dotenv import load_dotenv + +load_dotenv() # load environment variables from .env + +class MCPClient: + def __init__(self): + # Initialize session and client objects + self.session: Optional[ClientSession] = None + self.exit_stack = AsyncExitStack() + + async def connect_to_sse_server(self, server_url: str): + """Connect to an MCP server running with SSE transport""" + # Store the context managers so they stay alive + self._streams_context = sse_client(url=server_url) + streams = await self._streams_context.__aenter__() + + self._session_context = ClientSession(*streams) + self.session: ClientSession = await self._session_context.__aenter__() + + # Initialize + await self.session.initialize() + + # List available tools to verify connection + print("Initialized SSE client...") + print("Listing tools...") + response = await self.session.list_tools() + tools = response.tools + print("\nConnected to server with tools:", [tool.name for tool in tools]) + + # Print tool descriptions + for tool in tools: + print(f"\n{tool.name}: {tool.description}") + if hasattr(tool, 'inputSchema') and tool.inputSchema: + print(f" Parameters: {json.dumps(tool.inputSchema, indent=2)}") + + async def cleanup(self): + """Properly clean up the session and streams""" + if hasattr(self, '_session_context'): + await self._session_context.__aexit__(None, None, None) + if hasattr(self, '_streams_context'): + await self._streams_context.__aexit__(None, None, None) + + async def process_query(self, query: str) -> str: + """Process a query by directly calling the appropriate tool""" + query = query.strip().lower() + + if query.startswith("hello") or query.startswith("hi"): + # Extract name or use a default + parts = query.split() + name = parts[1] if len(parts) > 1 else "there" + + # Call the greeting tool + print(f"\nCalling get_greeting with name: {name}") + result = await self.session.call_tool("get_greeting", {"name": name}) + return result.content[0].text if hasattr(result, 'content') and result.content else str(result) + + elif "weather" in query: + # Try to extract city name + city = None + for known_city in ["Paris", "London", "New York", "Tokyo", "Sydney"]: + if known_city.lower() in query.lower(): + city = known_city + break + + if not city: + return "I couldn't identify a city in your query. Please mention a city like Paris, London, New York, Tokyo, or Sydney." + + # Call the weather tool + print(f"\nCalling get_weather with city: {city}") + result = await self.session.call_tool("get_weather", {"city": city}) + return result.content[0].text if hasattr(result, 'content') and result.content else str(result) + + else: + return "I can help with greetings or weather information. Try asking something like 'Hello John' or 'What's the weather in Paris?'" + + async def chat_loop(self): + """Run an interactive chat loop""" + print("\nMCP Client Started!") + print("Type your queries or 'quit' to exit.") + + while True: + try: + query = input("\nQuery: ").strip() + + if query.lower() == 'quit': + break + + response = await self.process_query(query) + print("\n" + response) + + except Exception as e: + print(f"\nError: {str(e)}") + + +async def main(): + if len(sys.argv) < 2: + print("Usage: python client.py ") + sys.exit(1) + + client = MCPClient() + try: + await client.connect_to_sse_server(server_url=sys.argv[1]) + await client.chat_loop() + finally: + await client.cleanup() + + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/src/praisonai-agents/mcp-sse-direct-server.py b/src/praisonai-agents/mcp-sse-direct-server.py new file mode 100644 index 000000000..82a7b8aa5 --- /dev/null +++ b/src/praisonai-agents/mcp-sse-direct-server.py @@ -0,0 +1,95 @@ +# python mcp-sse-direct-server.py --host 127.0.0.1 --port 8080 +from typing import Any +import httpx +from mcp.server.fastmcp import FastMCP +from starlette.applications import Starlette +from mcp.server.sse import SseServerTransport +from starlette.requests import Request +from starlette.routing import Mount, Route +from mcp.server import Server +import uvicorn +import argparse +import logging +import os +import inspect + +# Set up logging based on environment variable +log_level = os.environ.get("LOGLEVEL", "info").upper() +logging.basicConfig(level=getattr(logging, log_level)) +logger = logging.getLogger("mcp-server") + +# Initialize FastMCP server for simple tools (SSE) +mcp = FastMCP("simple-tools") + +@mcp.tool() +async def get_greeting(name: str) -> str: + """Get a personalized greeting. + + Args: + name: Name of the person to greet + """ + logger.debug(f"get_greeting called with name: {name}") + return f"Hello, {name}! Welcome to our MCP SSE server." + +@mcp.tool() +async def get_weather(city: str) -> str: + """Get a simulated weather report for a city. + + Args: + city: Name of the city + """ + logger.debug(f"get_weather called with city: {city}") + # This is a mock implementation + weather_data = { + "Paris": "Sunny with a temperature of 22°C", + "London": "Rainy with a temperature of 15°C", + "New York": "Cloudy with a temperature of 18°C", + "Tokyo": "Clear skies with a temperature of 25°C", + "Sydney": "Partly cloudy with a temperature of 20°C" + } + + return weather_data.get(city, f"Weather data not available for {city}") + +def create_starlette_app(mcp_server: Server, *, debug: bool = False) -> Starlette: + """Create a Starlette application that can serve the provided mcp server with SSE.""" + sse = SseServerTransport("/messages/") + + async def handle_sse(request: Request) -> None: + logger.debug(f"SSE connection request received from {request.client}") + async with sse.connect_sse( + request.scope, + request.receive, + request._send, # noqa: SLF001 + ) as (read_stream, write_stream): + await mcp_server.run( + read_stream, + write_stream, + mcp_server.create_initialization_options(), + ) + + return Starlette( + debug=debug, + routes=[ + Route("/sse", endpoint=handle_sse), + Mount("/messages/", app=sse.handle_post_message), + ], + ) + +if __name__ == "__main__": + mcp_server = mcp._mcp_server # noqa: WPS437 + + parser = argparse.ArgumentParser(description='Run MCP SSE-based server') + parser.add_argument('--host', default='localhost', help='Host to bind to') + parser.add_argument('--port', type=int, default=8080, help='Port to listen on') + args = parser.parse_args() + + print(f"Starting MCP SSE server on {args.host}:{args.port}") + + # Hardcode the tool names since we know what they are + tool_names = ["get_greeting", "get_weather"] + print(f"Available tools: {', '.join(tool_names)}") + + # Bind SSE request handling to MCP server + starlette_app = create_starlette_app(mcp_server, debug=True) + + uvicorn.run(starlette_app, host=args.host, port=args.port) \ No newline at end of file diff --git a/src/praisonai-agents/mcp-sse-weather.py b/src/praisonai-agents/mcp-sse-weather.py new file mode 100644 index 000000000..bcd48b77b --- /dev/null +++ b/src/praisonai-agents/mcp-sse-weather.py @@ -0,0 +1,9 @@ +from praisonaiagents import Agent, MCP + +search_agent = Agent( + instructions="""You are a weather agent that can provide weather information for a given city.""", + llm="openai/gpt-4o-mini", + tools=MCP("http://localhost:8080/sse") +) + +search_agent.start("What is the weather in London?") \ No newline at end of file diff --git a/src/praisonai-agents/praisonaiagents/agent/agent.py b/src/praisonai-agents/praisonaiagents/agent/agent.py index 2c6dc4035..922320d86 100644 --- a/src/praisonai-agents/praisonaiagents/agent/agent.py +++ b/src/praisonai-agents/praisonaiagents/agent/agent.py @@ -530,11 +530,21 @@ def execute_tool(self, function_name, arguments): from ..mcp.mcp import MCP if isinstance(self.tools, MCP): logging.debug(f"Looking for MCP tool {function_name}") - # Check if any of the MCP tools match the function name - for mcp_tool in self.tools.runner.tools: - if hasattr(mcp_tool, 'name') and mcp_tool.name == function_name: - logging.debug(f"Found matching MCP tool: {function_name}") - return self.tools.runner.call_tool(function_name, arguments) + + # Handle SSE MCP client + if hasattr(self.tools, 'is_sse') and self.tools.is_sse: + if hasattr(self.tools, 'sse_client'): + for tool in self.tools.sse_client.tools: + if tool.name == function_name: + logging.debug(f"Found matching SSE MCP tool: {function_name}") + return tool(**arguments) + # Handle stdio MCP client + elif hasattr(self.tools, 'runner'): + # Check if any of the MCP tools match the function name + for mcp_tool in self.tools.runner.tools: + if hasattr(mcp_tool, 'name') and mcp_tool.name == function_name: + logging.debug(f"Found matching MCP tool: {function_name}") + return self.tools.runner.call_tool(function_name, arguments) # Try to find the function in the agent's tools list first func = None @@ -815,7 +825,11 @@ def chat(self, prompt, temperature=0.2, tools=None, output_json=None, output_pyd logging.debug("Converting MCP tool to OpenAI format") openai_tool = tool_param.to_openai_tool() if openai_tool: - tool_param = [openai_tool] + # Handle both single tool and list of tools + if isinstance(openai_tool, list): + tool_param = openai_tool + else: + tool_param = [openai_tool] logging.debug(f"Converted MCP tool: {tool_param}") # Pass everything to LLM class diff --git a/src/praisonai-agents/praisonaiagents/llm/llm.py b/src/praisonai-agents/praisonaiagents/llm/llm.py index 64e21752d..ba1671fa6 100644 --- a/src/praisonai-agents/praisonaiagents/llm/llm.py +++ b/src/praisonai-agents/praisonaiagents/llm/llm.py @@ -293,6 +293,12 @@ def get_response( if isinstance(tool, dict) and 'type' in tool and tool['type'] == 'function': logging.debug(f"Using pre-formatted OpenAI tool: {tool['function']['name']}") formatted_tools.append(tool) + # Handle lists of tools (e.g. from MCP.to_openai_tool()) + elif isinstance(tool, list): + for subtool in tool: + if isinstance(subtool, dict) and 'type' in subtool and subtool['type'] == 'function': + logging.debug(f"Using pre-formatted OpenAI tool from list: {subtool['function']['name']}") + formatted_tools.append(subtool) elif callable(tool): tool_def = self._generate_tool_definition(tool.__name__) if tool_def: diff --git a/src/praisonai-agents/praisonaiagents/mcp/__init__.py b/src/praisonai-agents/praisonaiagents/mcp/__init__.py index c8b4f2090..ed7fef1bc 100644 --- a/src/praisonai-agents/praisonaiagents/mcp/__init__.py +++ b/src/praisonai-agents/praisonaiagents/mcp/__init__.py @@ -1,5 +1,8 @@ """ Model Context Protocol (MCP) integration for PraisonAI Agents. + +This package provides classes and utilities for connecting to MCP servers +using different transport methods (stdio, SSE, etc.). """ from .mcp import MCP diff --git a/src/praisonai-agents/praisonaiagents/mcp/mcp.py b/src/praisonai-agents/praisonaiagents/mcp/mcp.py index 6ca53f858..6233e07a4 100644 --- a/src/praisonai-agents/praisonaiagents/mcp/mcp.py +++ b/src/praisonai-agents/praisonaiagents/mcp/mcp.py @@ -6,6 +6,7 @@ import shlex import logging import os +import re from typing import Any, List, Optional, Callable, Iterable, Union from functools import wraps, partial @@ -126,6 +127,13 @@ class MCP: tools=MCP("/path/to/python /path/to/app.py") ) + # Method 3: Using an SSE endpoint + agent = Agent( + instructions="You are a helpful assistant...", + llm="gpt-4o-mini", + tools=MCP("http://localhost:8080/sse") + ) + agent.start("What is the stock price of Tesla?") ``` """ @@ -139,6 +147,7 @@ def __init__(self, command_or_string=None, args=None, *, command=None, timeout=6 - The command to run the MCP server (e.g., Python path) - A complete command string (e.g., "/path/to/python /path/to/app.py") - For NPX: 'npx' command with args for smithery tools + - An SSE URL (e.g., "http://localhost:8080/sse") args: Arguments to pass to the command (when command_or_string is the command) command: Alternative parameter name for backward compatibility timeout: Timeout in seconds for MCP server initialization and tool calls (default: 60) @@ -149,7 +158,44 @@ def __init__(self, command_or_string=None, args=None, *, command=None, timeout=6 if command_or_string is None and command is not None: command_or_string = command - # Handle the single string format + # Set up logging - default to WARNING level to hide INFO messages + if debug: + logging.getLogger("mcp-wrapper").setLevel(logging.DEBUG) + logging.getLogger("mcp-sse").setLevel(logging.DEBUG) + logging.getLogger("mcp.client").setLevel(logging.DEBUG) + logging.getLogger("sse").setLevel(logging.DEBUG) + logging.getLogger("mcp-server").setLevel(logging.DEBUG) + logging.getLogger("mcp-client").setLevel(logging.DEBUG) + logging.getLogger("_client").setLevel(logging.DEBUG) + logging.getLogger("httpx").setLevel(logging.DEBUG) + logging.getLogger("llm").setLevel(logging.DEBUG) + else: + # Set all MCP-related loggers to WARNING level by default + logging.getLogger("mcp-wrapper").setLevel(logging.WARNING) + logging.getLogger("mcp-sse").setLevel(logging.WARNING) + logging.getLogger("mcp.client").setLevel(logging.WARNING) + logging.getLogger("sse").setLevel(logging.WARNING) + logging.getLogger("mcp-server").setLevel(logging.WARNING) + logging.getLogger("mcp-client").setLevel(logging.WARNING) + logging.getLogger("_client").setLevel(logging.WARNING) + logging.getLogger("httpx").setLevel(logging.WARNING) + logging.getLogger("llm").setLevel(logging.WARNING) + + # Store additional parameters + self.timeout = timeout + self.debug = debug + + # Check if this is an SSE URL + if isinstance(command_or_string, str) and re.match(r'^https?://', command_or_string): + # Import the SSE client implementation + from .mcp_sse import SSEMCPClient + self.sse_client = SSEMCPClient(command_or_string, debug=debug) + self._tools = list(self.sse_client.tools) + self.is_sse = True + self.is_npx = False + return + + # Handle the single string format for stdio client if isinstance(command_or_string, str) and args is None: # Split the string into command and args using shell-like parsing parts = shlex.split(command_or_string) @@ -162,7 +208,9 @@ def __init__(self, command_or_string=None, args=None, *, command=None, timeout=6 # Use the original format with separate command and args cmd = command_or_string arguments = args or [] - + + # Set up stdio client + self.is_sse = False self.server_params = StdioServerParameters( command=cmd, args=arguments, @@ -173,13 +221,6 @@ def __init__(self, command_or_string=None, args=None, *, command=None, timeout=6 # Wait for initialization if not self.runner.initialized.wait(timeout=30): print("Warning: MCP initialization timed out") - - # Store additional parameters - self.timeout = timeout - self.debug = debug - - if debug: - logging.getLogger("mcp-wrapper").setLevel(logging.DEBUG) # Automatically detect if this is an NPX command self.is_npx = cmd == 'npx' or (isinstance(cmd, str) and os.path.basename(cmd) == 'npx') @@ -199,6 +240,9 @@ def _generate_tool_functions(self) -> List[Callable]: Returns: List[Callable]: Functions that can be used as tools """ + if self.is_sse: + return list(self.sse_client.tools) + tool_functions = [] for tool in self.runner.tools: @@ -303,8 +347,6 @@ def _initialize_npx_mcp_tools(self, cmd, arguments): logging.error(f"Failed to initialize NPX MCP tools: {e}") raise RuntimeError(f"Failed to initialize NPX MCP tools: {e}") - - def __iter__(self) -> Iterable[Callable]: """ Allow the MCP instance to be used directly as an iterable of tools. @@ -320,37 +362,43 @@ def to_openai_tool(self): provider/model format (e.g., "openai/gpt-4o-mini"). Returns: - dict: OpenAI-compatible tool definition + dict or list: OpenAI-compatible tool definition(s) """ + if self.is_sse and hasattr(self, 'sse_client') and self.sse_client.tools: + # Return all tools from SSE client + return self.sse_client.to_openai_tools() + # For simplicity, we'll convert the first tool only if multiple exist # More complex implementations could handle multiple tools - if not self.runner.tools: + if not hasattr(self, 'runner') or not self.runner.tools: logging.warning("No MCP tools available to convert to OpenAI format") return None - # Get the first tool's schema - tool = self.runner.tools[0] + # Convert all tools to OpenAI format + openai_tools = [] + for tool in self.runner.tools: + # Create OpenAI tool definition + parameters = {} + if hasattr(tool, 'inputSchema') and tool.inputSchema: + parameters = tool.inputSchema + else: + # Create a minimal schema if none exists + parameters = { + "type": "object", + "properties": {}, + "required": [] + } + + openai_tools.append({ + "type": "function", + "function": { + "name": tool.name, + "description": tool.description if hasattr(tool, 'description') else f"Call the {tool.name} tool", + "parameters": parameters + } + }) - # Create OpenAI tool definition - parameters = {} - if hasattr(tool, 'inputSchema') and tool.inputSchema: - parameters = tool.inputSchema - else: - # Create a minimal schema if none exists - parameters = { - "type": "object", - "properties": {}, - "required": [] - } - - return { - "type": "function", - "function": { - "name": tool.name, - "description": tool.description if hasattr(tool, 'description') else f"Call the {tool.name} tool", - "parameters": parameters - } - } + return openai_tools def __del__(self): """Clean up resources when the object is garbage collected.""" diff --git a/src/praisonai-agents/praisonaiagents/mcp/mcp_sse.py b/src/praisonai-agents/praisonaiagents/mcp/mcp_sse.py new file mode 100644 index 000000000..67f0f22c8 --- /dev/null +++ b/src/praisonai-agents/praisonaiagents/mcp/mcp_sse.py @@ -0,0 +1,184 @@ +""" +SSE (Server-Sent Events) client implementation for MCP (Model Context Protocol). +This module provides the necessary classes and functions to connect to an MCP server +over SSE transport. +""" + +import asyncio +import logging +import threading +import inspect +import json +from typing import List, Dict, Any, Optional, Callable, Iterable + +from mcp import ClientSession +from mcp.client.sse import sse_client + +logger = logging.getLogger("mcp-sse") + +# Global event loop for async operations +_event_loop = None + +def get_event_loop(): + """Get or create a global event loop.""" + global _event_loop + if _event_loop is None or _event_loop.is_closed(): + _event_loop = asyncio.new_event_loop() + asyncio.set_event_loop(_event_loop) + return _event_loop + + +class SSEMCPTool: + """A wrapper for an MCP tool that can be used with praisonaiagents.""" + + def __init__(self, name: str, description: str, session: ClientSession, input_schema: Optional[Dict[str, Any]] = None): + self.name = name + self.__name__ = name # Required for Agent to recognize it as a tool + self.__qualname__ = name # Required for Agent to recognize it as a tool + self.__doc__ = description # Required for Agent to recognize it as a tool + self.description = description + self.session = session + self.input_schema = input_schema or {} + + # Create a signature based on input schema + params = [] + if input_schema and 'properties' in input_schema: + for param_name in input_schema['properties']: + params.append( + inspect.Parameter( + name=param_name, + kind=inspect.Parameter.POSITIONAL_OR_KEYWORD, + default=inspect.Parameter.empty if param_name in input_schema.get('required', []) else None, + annotation=str # Default to string + ) + ) + + self.__signature__ = inspect.Signature(params) + + def __call__(self, **kwargs): + """Synchronous wrapper for the async call.""" + logger.debug(f"Tool {self.name} called with args: {kwargs}") + + # Use the global event loop + loop = get_event_loop() + + # Run the async call in the event loop + future = asyncio.run_coroutine_threadsafe(self._async_call(**kwargs), loop) + try: + # Wait for the result with a timeout + return future.result(timeout=30) + except Exception as e: + logger.error(f"Error calling tool {self.name}: {e}") + return f"Error: {str(e)}" + + async def _async_call(self, **kwargs): + """Call the tool with the provided arguments.""" + logger.debug(f"Async calling tool {self.name} with args: {kwargs}") + try: + result = await self.session.call_tool(self.name, kwargs) + + # Extract text from result + if hasattr(result, 'content') and result.content: + if hasattr(result.content[0], 'text'): + return result.content[0].text + return str(result.content[0]) + return str(result) + except Exception as e: + logger.error(f"Error in _async_call for {self.name}: {e}") + raise + + def to_openai_tool(self): + """Convert the tool to OpenAI format.""" + return { + "type": "function", + "function": { + "name": self.name, + "description": self.description, + "parameters": self.input_schema + } + } + + +class SSEMCPClient: + """A client for connecting to an MCP server over SSE.""" + + def __init__(self, server_url: str, debug: bool = False): + """ + Initialize an SSE MCP client. + + Args: + server_url: The URL of the SSE MCP server + debug: Whether to enable debug logging + """ + self.server_url = server_url + self.debug = debug + self.session = None + self.tools = [] + + # Set up logging + if debug: + logger.setLevel(logging.DEBUG) + else: + # Set to WARNING by default to hide INFO messages + logger.setLevel(logging.WARNING) + + self._initialize() + + def _initialize(self): + """Initialize the connection and tools.""" + # Use the global event loop + loop = get_event_loop() + + # Start a background thread to run the event loop + def run_event_loop(): + asyncio.set_event_loop(loop) + loop.run_forever() + + self.loop_thread = threading.Thread(target=run_event_loop, daemon=True) + self.loop_thread.start() + + # Run the initialization in the event loop + future = asyncio.run_coroutine_threadsafe(self._async_initialize(), loop) + self.tools = future.result(timeout=30) + + async def _async_initialize(self): + """Asynchronously initialize the connection and tools.""" + logger.debug(f"Connecting to MCP server at {self.server_url}") + + # Create SSE client + self._streams_context = sse_client(url=self.server_url) + streams = await self._streams_context.__aenter__() + + self._session_context = ClientSession(*streams) + self.session = await self._session_context.__aenter__() + + # Initialize + await self.session.initialize() + + # List available tools + logger.debug("Listing tools...") + response = await self.session.list_tools() + tools_data = response.tools + logger.debug(f"Found {len(tools_data)} tools: {[tool.name for tool in tools_data]}") + + # Create tool wrappers + tools = [] + for tool in tools_data: + input_schema = tool.inputSchema if hasattr(tool, 'inputSchema') else None + wrapper = SSEMCPTool( + name=tool.name, + description=tool.description if hasattr(tool, 'description') else f"Call the {tool.name} tool", + session=self.session, + input_schema=input_schema + ) + tools.append(wrapper) + + return tools + + def __iter__(self): + """Return an iterator over the tools.""" + return iter(self.tools) + + def to_openai_tools(self): + """Convert all tools to OpenAI format.""" + return [tool.to_openai_tool() for tool in self.tools] \ No newline at end of file diff --git a/src/praisonai-agents/pyproject.toml b/src/praisonai-agents/pyproject.toml index 7af918c7d..c025923ad 100644 --- a/src/praisonai-agents/pyproject.toml +++ b/src/praisonai-agents/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "praisonaiagents" -version = "0.0.75" +version = "0.0.77" description = "Praison AI agents for completing complex tasks with Self Reflection Agents" authors = [ { name="Mervin Praison" } @@ -28,7 +28,7 @@ memory = [ knowledge = [ "mem0ai>=0.1.0", "chromadb==0.5.23", - "markitdown", + "markitdown[all]", "chonkie>=1.0.2" ] diff --git a/src/praisonai-agents/uv.lock b/src/praisonai-agents/uv.lock index 35a0ca924..4a00693dc 100644 --- a/src/praisonai-agents/uv.lock +++ b/src/praisonai-agents/uv.lock @@ -1883,7 +1883,7 @@ wheels = [ [[package]] name = "praisonaiagents" -version = "0.0.75" +version = "0.0.77" source = { editable = "." } dependencies = [ { name = "mcp" }, @@ -1925,7 +1925,7 @@ requires-dist = [ { name = "chromadb", marker = "extra == 'knowledge'", specifier = "==0.5.23" }, { name = "chromadb", marker = "extra == 'memory'", specifier = ">=0.5.23" }, { name = "litellm", marker = "extra == 'llm'", specifier = ">=1.50.0" }, - { name = "markitdown", marker = "extra == 'knowledge'" }, + { name = "markitdown", extras = ["all"], marker = "extra == 'knowledge'" }, { name = "mcp", specifier = ">=1.6.0" }, { name = "mcp", marker = "extra == 'mcp'", specifier = ">=1.6.0" }, { name = "mem0ai", marker = "extra == 'knowledge'", specifier = ">=0.1.0" },