Conversation
…hance agent launch functionality in `agents.py` to support both HTTP and MCP protocols, including improved endpoint management, healthcheck capabilities, and error handling for agent execution.
Walkthrough

This update introduces new MCP (Model Context Protocol) server features and examples, enhances agent and multi-agent server launching to support both HTTP and MCP protocols, and adds comprehensive documentation for deploying and building MCP servers. The PraisonAI and praisonaiagents package versions are incremented, and all Docker and documentation references are updated accordingly.
Sequence Diagram(s)

Launching an MCP Multi-Agent Server

```mermaid
sequenceDiagram
    participant UserScript
    participant PraisonAIAgents
    participant Agent1
    participant Agent2
    participant FastMCP
    participant StarletteApp
    UserScript->>PraisonAIAgents: Initialize with Agent1, Agent2, name
    UserScript->>PraisonAIAgents: launch(port=8080, protocol="mcp")
    PraisonAIAgents->>FastMCP: Create FastMCP instance
    PraisonAIAgents->>FastMCP: Register tool (named after agents group)
    FastMCP->>StarletteApp: Setup SSE and message endpoints
    PraisonAIAgents->>StarletteApp: Start server in background thread
    Note over StarletteApp: Server listens for MCP requests
```

Agent Launch with Protocol Selection

```mermaid
sequenceDiagram
    participant UserScript
    participant Agent
    participant FastAPI/Starlette
    participant FastMCP
    UserScript->>Agent: launch(port, protocol)
    alt protocol == "http"
        Agent->>FastAPI/Starlette: Setup HTTP endpoints
        FastAPI/Starlette->>UserScript: Start server in thread
    else protocol == "mcp"
        Agent->>FastMCP: Setup MCP tool and endpoints
        FastMCP->>UserScript: Start server in thread
    end
```
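The second diagram reduces to a single dispatch on the `protocol` argument. A minimal stand-in sketch of that branch (function and return values are hypothetical labels, not the library's internals):

```python
def select_launch_mode(protocol: str) -> str:
    """Mirror the branch in launch(): pick a server stack per protocol."""
    if protocol == "http":
        # FastAPI/Starlette endpoints, uvicorn run in a background thread
        return "fastapi"
    elif protocol == "mcp":
        # FastMCP tool + SSE/message endpoints, uvicorn run in a background thread
        return "fastmcp"
    raise ValueError(f"Unsupported protocol: {protocol}")
```

Anything other than the two supported protocols fails fast, which matches the diagram's exhaustive `alt`/`else` structure.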
✅ Deploy Preview for praisonai ready!
Hello @MervinPraison, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!
Summary of Changes
Hello team, Gemini here! I've analyzed this pull request titled "2.1.6". Based on the title, description, and the changes included, this PR primarily focuses on introducing support for launching PraisonAI agents and agent collections (workflows) as Model Context Protocol (MCP) servers. It includes significant code modifications to the `launch` methods within the `Agent` and `Agents` classes to handle the new `mcp` protocol. Alongside the core implementation, the PR adds comprehensive documentation and examples demonstrating how to create, deploy (including Docker and cloud platforms), and connect to these new MCP servers. Finally, it updates the project's version to 2.1.6 and bumps the `praisonaiagents` dependency to 0.0.82.
Highlights
- MCP Server Support: Introduces the ability to launch individual `Agent` instances and `Agents` collections (workflows) as Model Context Protocol (MCP) servers, enabling standardized tool access and communication.
- Updated `launch` Method: The `launch` method in both `Agent` and `Agents` classes has been refactored to accept a `protocol` parameter (`http` or `mcp`) to switch between traditional FastAPI endpoints and new MCP server endpoints.
- New Documentation: Adds two new comprehensive documentation pages: one on creating MCP servers (`docs/mcp/mcp-server.mdx`) and another on deploying them to production environments, including Docker, AWS ECS, and Google Cloud Run (`docs/deploy/mcp-server-deploy.mdx`). These are also added to the documentation navigation (`docs/mint.json`).
- New Examples: Includes new example scripts demonstrating how to set up simple single-agent and multi-agent MCP servers, as well as an example of an MCP client using SSE.
- Version Bumps: Updates the main `PraisonAI` package version to 2.1.6 and the `praisonaiagents` dependency version to 0.0.82 across `pyproject.toml`, `uv.lock`, and Dockerfiles.
- Minor Agent Improvement: Adds a small fix in the `Agent.achat` method to ensure that if no specific `tools` are provided in the call, the agent defaults to using its own configured `self.tools`.
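The `achat` tools fallback described in the last highlight can be illustrated with a toy stand-in (class and method names here are hypothetical; the actual fix lives inside `Agent.achat`):

```python
class AgentSketch:
    """Toy stand-in for the tools fallback added to Agent.achat."""

    def __init__(self, tools):
        self.tools = tools  # the agent's own configured tools

    def resolve_tools(self, tools=None):
        # If the caller passed no tools, fall back to the agent's configured ones.
        return tools if tools is not None else self.tools
```

Note the `is not None` check: an explicitly passed empty list still overrides the configured tools, so the fallback only kicks in when the argument is omitted entirely.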
Changelog
- docker/Dockerfile
- Updated praisonai version from 2.1.5 to 2.1.6.
- docker/Dockerfile.chat
- Updated praisonai version from 2.1.5 to 2.1.6.
- docker/Dockerfile.dev
- Updated praisonai version from 2.1.5 to 2.1.6.
- Updated praisonaiagents version from 0.0.81 to 0.0.82.
- docker/Dockerfile.ui
- Updated praisonai version from 2.1.5 to 2.1.6.
- docs/api/praisonai/deploy.html
- Updated praisonai version in Dockerfile snippet within documentation.
- docs/deploy/mcp-server-deploy.mdx
- Added new documentation page for deploying MCP servers.
- docs/developers/local-development.mdx
- Updated praisonai version in Dockerfile snippet.
- docs/mcp/mcp-server.mdx
- Added new documentation page for creating MCP servers.
- docs/mint.json
- Added new documentation pages for MCP server creation and deployment to navigation.
- docs/ui/chat.mdx
- Updated praisonai version in Dockerfile snippet.
- docs/ui/code.mdx
- Updated praisonai version in Dockerfile snippet.
- examples/api/mcp-sse.py
- Added new example script for an MCP client using SSE.
- examples/api/simple-mcp-multi-agents-server.py
- Added new example script for a multi-agent MCP server.
- examples/api/simple-mcp-server.py
- Added new example script for a single-agent MCP server.
- praisonai/deploy.py
- Updated praisonai version in the generated Dockerfile content.
- pyproject.toml
- Updated praisonai version from 2.1.5 to 2.1.6.
- Updated praisonaiagents dependency constraint from >=0.0.81 to >=0.0.82.
- src/praisonai-agents/mcp-sse.py
- Added new script (likely source for example) for an MCP client using SSE.
- src/praisonai-agents/praisonaiagents/agent/agent.py
- Refactored `launch` method to support 'mcp' protocol for launching agents as MCP servers.
- Integrated FastMCP, SseServerTransport, and Starlette for MCP server functionality.
- Added an MCP tool wrapper for agent execution.
- Added dependency checks for MCP mode.
- Ensured `achat` method defaults to `self.tools` if no tools are provided.
- src/praisonai-agents/praisonaiagents/agents/agents.py
- Refactored `launch` method to support 'mcp' protocol for launching agent collections as MCP workflow servers.
- Integrated FastMCP, SseServerTransport, and Starlette for MCP server functionality for workflows.
- Added an MCP tool wrapper for executing the agent workflow.
- Added dependency checks for MCP mode.
- Added optional `name` parameter to `PraisonAIAgents` constructor.
- src/praisonai-agents/pyproject.toml
- Updated praisonaiagents version from 0.0.81 to 0.0.82.
- src/praisonai-agents/simple-mcp-multi-agents-server.py
- Added new script (likely source for example) for a multi-agent MCP server.
- src/praisonai-agents/simple-mcp-server.py
- Added new script (likely source for example) for a single-agent MCP server.
- src/praisonai-agents/uv.lock
- Updated locked praisonaiagents version to 0.0.82.
- uv.lock
- Updated locked praisonai version to 2.1.6.
- Updated locked praisonaiagents version to 0.0.82.
Agent takes a task,
Launches server, answers ask,
Protocol shifts now.
Code Review
This pull request introduces version 2.1.6, primarily focusing on adding Model Context Protocol (MCP) server capabilities to agents and agent groups. This includes significant refactoring of the launch methods in agent.py and agents.py, along with new documentation and examples for MCP. The version numbers have also been consistently updated across various files.
Overall, the changes are well-implemented and the new MCP functionality is a valuable addition. The documentation for MCP is comprehensive. I have a few suggestions for improving maintainability and ensuring consistency in the documentation.
Summary of Findings
- Dependency Version Consistency: The `praisonaiagents` version in the new MCP documentation files (`docs/deploy/mcp-server-deploy.mdx`, `docs/mcp/mcp-server.mdx`) should be updated from `>=0.0.81` to `>=0.0.82` to match the version specified in `pyproject.toml`.
- Code Maintainability: The `launch` methods in both `src/praisonai-agents/praisonaiagents/agent/agent.py` and `src/praisonai-agents/praisonaiagents/agents/agents.py` have become quite large due to the addition of MCP support. Refactoring these methods by extracting protocol-specific logic (HTTP and MCP) into separate helper functions would improve readability and maintainability.
- MCP Dependency Installation Instructions: The `pip install` instructions for MCP dependencies in `agent.py` and `agents.py` (and consequently in the documentation derived from them) mention a `praison-mcp` package. It's unclear if this is a publicly available package. Clarifying this or pointing to the correct package (e.g., `mcp-model-context-protocol`) would be beneficial for users.
- MCP Server Instance Naming: In `src/praisonai-agents/praisonaiagents/agents/agents.py`, the MCP server instance name is hardcoded. Making this name dynamic using `self.name` (if available) would improve consistency and identifiability, especially if multiple agent groups are launched.
- Fragile Server Blocking Logic (Low Severity - Not Commented): The logic in `launch` methods to detect if it's the last call in a script (to keep the server alive by blocking) relies on stack inspection and file reading. This approach can be fragile. While it might work for many common use cases, a more robust server lifecycle management (e.g., an explicit `serve()` call) could be considered for future enhancements.
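For reference, the detection heuristic the last finding calls fragile boils down to scanning the caller's source for further `.launch(` calls after the current line. A minimal stand-alone sketch of that check (function name hypothetical):

```python
def has_more_launches(source_lines, current_line):
    """Return True if a non-comment '.launch(' call appears after current_line.

    source_lines: the caller's file as a list of lines (e.g. from readlines()).
    current_line: 1-based line number of the launch() call just executed.
    """
    for line in source_lines[current_line:]:
        if '.launch(' in line and not line.strip().startswith('#'):
            return True
    return False
```

The fragility is visible even in this sketch: a `.launch(` inside a string literal, a trailing inline comment, or a call made from an exec'd or interactive context would all confuse it, which is why the review suggests an explicit serve-style call instead.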
Merge Readiness
This pull request is a significant step forward, introducing MCP server capabilities and updating to version 2.1.6. The core changes are well-implemented, and the new documentation is a great addition.
I've identified a few medium-severity issues primarily related to maintainability of the launch methods and consistency in documentation regarding dependency versions. Addressing these would further improve the codebase and user experience.
While I cannot approve pull requests, I believe these changes are in good shape after considering the suggested improvements. The author should review the feedback, particularly the points on refactoring and documentation consistency, before this PR is merged by the maintainers.
Documentation excerpts under review:

````mdx
<Step title="Install Dependencies">
  Make sure you have the required packages installed:
  ```bash
  pip install "praisonaiagents[mcp]>=0.0.81"
  ```

  For the multi-agent example with search capabilities:
  ```bash
  pip install "praisonaiagents[mcp]>=0.0.81" duckduckgo-search
  ```

  Create a `requirements.txt` file:
  ```
  praisonaiagents[mcp]>=0.0.81
  ```
````

````mdx
<Step title="Install Dependencies">
  Make sure you have the required packages installed:
  ```bash
  pip install "praisonaiagents[mcp]>=0.0.81"
  ```
````

````mdx
<Steps>
  <Step title="Install Additional Dependencies">
    ```bash
    pip install "praisonaiagents[mcp]>=0.0.81" duckduckgo-search
    ```
````
| def launch(self, path: str = '/', port: int = 8000, host: str = '0.0.0.0', debug: bool = False, protocol: str = "http"): | ||
| """ | ||
| Launch the agent as an HTTP API endpoint. | ||
| Launch the agent as an HTTP API endpoint or an MCP server. | ||
|
|
||
| Args: | ||
| path: API endpoint path (default: '/') | ||
| path: API endpoint path (default: '/') for HTTP, or base path for MCP. | ||
| port: Server port (default: 8000) | ||
| host: Server host (default: '0.0.0.0') | ||
| debug: Enable debug mode for uvicorn (default: False) | ||
| protocol: "http" to launch as FastAPI, "mcp" to launch as MCP server. | ||
|
|
||
| Returns: | ||
| None | ||
| """ | ||
| global _server_started, _registered_agents, _shared_apps | ||
|
|
||
| # Try to import FastAPI dependencies - lazy loading | ||
| try: | ||
| import uvicorn | ||
| from fastapi import FastAPI, HTTPException, Request | ||
| from fastapi.responses import JSONResponse | ||
| from pydantic import BaseModel | ||
| import threading | ||
| import time | ||
| if protocol == "http": | ||
| global _server_started, _registered_agents, _shared_apps | ||
|
|
||
| # Define the request model here since we need pydantic | ||
| class AgentQuery(BaseModel): | ||
| query: str | ||
| # Try to import FastAPI dependencies - lazy loading | ||
| try: | ||
| import uvicorn | ||
| from fastapi import FastAPI, HTTPException, Request | ||
| from fastapi.responses import JSONResponse | ||
| from pydantic import BaseModel | ||
| import threading | ||
| import time | ||
|
|
||
| except ImportError as e: | ||
| # Check which specific module is missing | ||
| missing_module = str(e).split("No module named '")[-1].rstrip("'") | ||
| display_error(f"Missing dependency: {missing_module}. Required for launch() method.") | ||
| logging.error(f"Missing dependency: {missing_module}. Required for launch() method.") | ||
| print(f"\nTo add API capabilities, install the required dependencies:") | ||
| print(f"pip install {missing_module}") | ||
| print("\nOr install all API dependencies with:") | ||
| print("pip install 'praisonaiagents[api]'") | ||
| return None | ||
|
|
||
| # Initialize port-specific collections if needed | ||
| if port not in _registered_agents: | ||
| _registered_agents[port] = {} | ||
|
|
||
| # Initialize shared FastAPI app if not already created for this port | ||
| if _shared_apps.get(port) is None: | ||
| _shared_apps[port] = FastAPI( | ||
| title=f"PraisonAI Agents API (Port {port})", | ||
| description="API for interacting with PraisonAI Agents" | ||
| ) | ||
| # Define the request model here since we need pydantic | ||
| class AgentQuery(BaseModel): | ||
| query: str | ||
|
|
||
| except ImportError as e: | ||
| # Check which specific module is missing | ||
| missing_module = str(e).split("No module named '")[-1].rstrip("'") | ||
| display_error(f"Missing dependency: {missing_module}. Required for launch() method with HTTP mode.") | ||
| logging.error(f"Missing dependency: {missing_module}. Required for launch() method with HTTP mode.") | ||
| print(f"\nTo add API capabilities, install the required dependencies:") | ||
| print(f"pip install {missing_module}") | ||
| print("\nOr install all API dependencies with:") | ||
| print("pip install 'praisonaiagents[api]'") | ||
| return None | ||
|
|
||
| # Initialize port-specific collections if needed | ||
| if port not in _registered_agents: | ||
| _registered_agents[port] = {} | ||
|
|
||
| # Initialize shared FastAPI app if not already created for this port | ||
| if _shared_apps.get(port) is None: | ||
| _shared_apps[port] = FastAPI( | ||
| title=f"PraisonAI Agents API (Port {port})", | ||
| description="API for interacting with PraisonAI Agents" | ||
| ) | ||
|
|
||
| # Add a root endpoint with a welcome message | ||
| @_shared_apps[port].get("/") | ||
| async def root(): | ||
| return { | ||
| "message": f"Welcome to PraisonAI Agents API on port {port}. See /docs for usage.", | ||
| "endpoints": list(_registered_agents[port].keys()) | ||
| } | ||
|
|
||
| # Add healthcheck endpoint | ||
| @_shared_apps[port].get("/health") | ||
| async def healthcheck(): | ||
| return { | ||
| "status": "ok", | ||
| "endpoints": list(_registered_agents[port].keys()) | ||
| } | ||
|
|
||
| # Add a root endpoint with a welcome message | ||
| @_shared_apps[port].get("/") | ||
| async def root(): | ||
| return { | ||
| "message": f"Welcome to PraisonAI Agents API on port {port}. See /docs for usage.", | ||
| "endpoints": list(_registered_agents[port].keys()) | ||
| } | ||
| # Normalize path to ensure it starts with / | ||
| if not path.startswith('/'): | ||
| path = f'/{path}' | ||
|
|
||
| # Check if path is already registered for this port | ||
| if path in _registered_agents[port]: | ||
| logging.warning(f"Path '{path}' is already registered on port {port}. Please use a different path.") | ||
| print(f"⚠️ Warning: Path '{path}' is already registered on port {port}.") | ||
| # Use a modified path to avoid conflicts | ||
| original_path = path | ||
| path = f"{path}_{self.agent_id[:6]}" | ||
| logging.warning(f"Using '{path}' instead of '{original_path}'") | ||
| print(f"🔄 Using '{path}' instead") | ||
|
|
||
| # Add healthcheck endpoint | ||
| @_shared_apps[port].get("/health") | ||
| async def healthcheck(): | ||
| return { | ||
| "status": "ok", | ||
| "endpoints": list(_registered_agents[port].keys()) | ||
| } | ||
|
|
||
| # Normalize path to ensure it starts with / | ||
| if not path.startswith('/'): | ||
| path = f'/{path}' | ||
| # Register the agent to this path | ||
| _registered_agents[port][path] = self.agent_id | ||
|
|
||
| # Check if path is already registered for this port | ||
| if path in _registered_agents[port]: | ||
| logging.warning(f"Path '{path}' is already registered on port {port}. Please use a different path.") | ||
| print(f"⚠️ Warning: Path '{path}' is already registered on port {port}.") | ||
| # Use a modified path to avoid conflicts | ||
| original_path = path | ||
| path = f"{path}_{self.agent_id[:6]}" | ||
| logging.warning(f"Using '{path}' instead of '{original_path}'") | ||
| print(f"🔄 Using '{path}' instead") | ||
|
|
||
| # Register the agent to this path | ||
| _registered_agents[port][path] = self.agent_id | ||
|
|
||
| # Define the endpoint handler | ||
| @_shared_apps[port].post(path) | ||
| async def handle_agent_query(request: Request, query_data: Optional[AgentQuery] = None): | ||
| # Handle both direct JSON with query field and form data | ||
| if query_data is None: | ||
| # Define the endpoint handler | ||
| @_shared_apps[port].post(path) | ||
| async def handle_agent_query(request: Request, query_data: Optional[AgentQuery] = None): | ||
| # Handle both direct JSON with query field and form data | ||
| if query_data is None: | ||
| try: | ||
| request_data = await request.json() | ||
| if "query" not in request_data: | ||
| raise HTTPException(status_code=400, detail="Missing 'query' field in request") | ||
| query = request_data["query"] | ||
| except: | ||
| # Fallback to form data or query params | ||
| form_data = await request.form() | ||
| if "query" in form_data: | ||
| query = form_data["query"] | ||
| else: | ||
| raise HTTPException(status_code=400, detail="Missing 'query' field in request") | ||
| else: | ||
| query = query_data.query | ||
|
|
||
| try: | ||
| request_data = await request.json() | ||
| if "query" not in request_data: | ||
| raise HTTPException(status_code=400, detail="Missing 'query' field in request") | ||
| query = request_data["query"] | ||
| except: | ||
| # Fallback to form data or query params | ||
| form_data = await request.form() | ||
| if "query" in form_data: | ||
| query = form_data["query"] | ||
| # Use async version if available, otherwise use sync version | ||
| if asyncio.iscoroutinefunction(self.chat): | ||
| response = await self.achat(query) | ||
| else: | ||
| raise HTTPException(status_code=400, detail="Missing 'query' field in request") | ||
| else: | ||
| query = query_data.query | ||
| # Run sync function in a thread to avoid blocking | ||
| loop = asyncio.get_event_loop() | ||
| response = await loop.run_in_executor(None, lambda p=query: self.chat(p)) | ||
|
|
||
| return {"response": response} | ||
| except Exception as e: | ||
| logging.error(f"Error processing query: {str(e)}", exc_info=True) | ||
| return JSONResponse( | ||
| status_code=500, | ||
| content={"error": f"Error processing query: {str(e)}"} | ||
| ) | ||
|
|
||
| print(f"🚀 Agent '{self.name}' available at http://{host}:{port}") | ||
|
|
||
| # Start the server if it's not already running for this port | ||
| if not _server_started.get(port, False): | ||
| # Mark the server as started first to prevent duplicate starts | ||
| _server_started[port] = True | ||
|
|
||
| try: | ||
| # Use async version if available, otherwise use sync version | ||
| if asyncio.iscoroutinefunction(self.chat): | ||
| response = await self.achat(query) | ||
| else: | ||
| # Run sync function in a thread to avoid blocking | ||
| loop = asyncio.get_event_loop() | ||
| response = await loop.run_in_executor(None, lambda: self.chat(query)) | ||
| # Start the server in a separate thread | ||
| def run_server(): | ||
| try: | ||
| print(f"✅ FastAPI server started at http://{host}:{port}") | ||
| print(f"📚 API documentation available at http://{host}:{port}/docs") | ||
| print(f"🔌 Available endpoints: {', '.join(list(_registered_agents[port].keys()))}") | ||
| uvicorn.run(_shared_apps[port], host=host, port=port, log_level="debug" if debug else "info") | ||
| except Exception as e: | ||
| logging.error(f"Error starting server: {str(e)}", exc_info=True) | ||
| print(f"❌ Error starting server: {str(e)}") | ||
|
|
||
| return {"response": response} | ||
| except Exception as e: | ||
| logging.error(f"Error processing query: {str(e)}", exc_info=True) | ||
| return JSONResponse( | ||
| status_code=500, | ||
| content={"error": f"Error processing query: {str(e)}"} | ||
| ) | ||
|
|
||
| print(f"🚀 Agent '{self.name}' available at http://{host}:{port}{path}") | ||
|
|
||
| # Start the server if it's not already running for this port | ||
| if not _server_started.get(port, False): | ||
| # Mark the server as started first to prevent duplicate starts | ||
| _server_started[port] = True | ||
| # Run server in a background thread | ||
| server_thread = threading.Thread(target=run_server, daemon=True) | ||
| server_thread.start() | ||
|
|
||
| # Wait for a moment to allow the server to start and register endpoints | ||
| time.sleep(0.5) | ||
| else: | ||
| # If server is already running, wait a moment to make sure the endpoint is registered | ||
| time.sleep(0.1) | ||
| print(f"🔌 Available endpoints on port {port}: {', '.join(list(_registered_agents[port].keys()))}") | ||
| # Get the stack frame to check if this is the last launch() call in the script | ||
| import inspect | ||
| stack = inspect.stack() | ||
|
|
||
| # Start the server in a separate thread | ||
| def run_server(): | ||
| # If this is called from a Python script (not interactive), try to detect if it's the last launch call | ||
| if len(stack) > 1 and stack[1].filename.endswith('.py'): | ||
| caller_frame = stack[1] | ||
| caller_line = caller_frame.lineno | ||
|
|
||
| try: | ||
| print(f"✅ FastAPI server started at http://{host}:{port}") | ||
| print(f"📚 API documentation available at http://{host}:{port}/docs") | ||
| print(f"🔌 Available endpoints: {', '.join(list(_registered_agents[port].keys()))}") | ||
| uvicorn.run(_shared_apps[port], host=host, port=port, log_level="debug" if debug else "info") | ||
| # Read the file to check if there are more launch calls after this one | ||
| with open(caller_frame.filename, 'r') as f: | ||
| lines = f.readlines() | ||
|
|
||
| # Check if there are more launch() calls after the current line | ||
| has_more_launches = False | ||
| for line_content in lines[caller_line:]: # renamed line to line_content | ||
| if '.launch(' in line_content and not line_content.strip().startswith('#'): | ||
| has_more_launches = True | ||
| break | ||
|
|
||
| # If this is the last launch call, block the main thread | ||
| if not has_more_launches: | ||
| try: | ||
| print("\nAll agents registered for HTTP mode. Press Ctrl+C to stop the servers.") | ||
| while True: | ||
| time.sleep(1) | ||
| except KeyboardInterrupt: | ||
| print("\nServers stopped") | ||
| except Exception as e: | ||
| logging.error(f"Error starting server: {str(e)}", exc_info=True) | ||
| print(f"❌ Error starting server: {str(e)}") | ||
|
|
||
| # Run server in a background thread | ||
| server_thread = threading.Thread(target=run_server, daemon=True) | ||
| server_thread.start() | ||
|
|
||
| # Wait for a moment to allow the server to start and register endpoints | ||
| time.sleep(0.5) | ||
| else: | ||
| # If server is already running, wait a moment to make sure the endpoint is registered | ||
| time.sleep(0.1) | ||
| print(f"🔌 Available endpoints on port {port}: {', '.join(list(_registered_agents[port].keys()))}") | ||
|
|
||
| # Get the stack frame to check if this is the last launch() call in the script | ||
| import inspect | ||
| stack = inspect.stack() | ||
|
|
||
| # If this is called from a Python script (not interactive), try to detect if it's the last launch call | ||
| if len(stack) > 1 and stack[1].filename.endswith('.py'): | ||
| caller_frame = stack[1] | ||
| caller_line = caller_frame.lineno | ||
| # If something goes wrong with detection, block anyway to be safe | ||
| logging.error(f"Error in launch detection: {e}") | ||
| try: | ||
| print("\nKeeping HTTP servers alive. Press Ctrl+C to stop.") | ||
| while True: | ||
| time.sleep(1) | ||
| except KeyboardInterrupt: | ||
| print("\nServers stopped") | ||
| return None | ||
|
|
||
| elif protocol == "mcp": | ||
| try: | ||
| # Read the file to check if there are more launch calls after this one | ||
| with open(caller_frame.filename, 'r') as f: | ||
| lines = f.readlines() | ||
| import uvicorn | ||
| from mcp.server.fastmcp import FastMCP | ||
| from mcp.server.sse import SseServerTransport | ||
| from starlette.applications import Starlette | ||
| from starlette.requests import Request | ||
| from starlette.routing import Mount, Route | ||
| from mcp.server import Server as MCPServer # Alias to avoid conflict | ||
| import threading | ||
| import time | ||
| import inspect | ||
| import asyncio # Ensure asyncio is imported | ||
| # logging is already imported at the module level | ||
|
|
||
| # Check if there are more launch() calls after the current line | ||
| has_more_launches = False | ||
| for line in lines[caller_line:]: | ||
| if '.launch(' in line and not line.strip().startswith('#'): | ||
| has_more_launches = True | ||
| break | ||
|
|
||
| # If this is the last launch call, block the main thread | ||
| if not has_more_launches: | ||
| except ImportError as e: | ||
| missing_module = str(e).split("No module named '")[-1].rstrip("'") | ||
| display_error(f"Missing dependency: {missing_module}. Required for launch() method with MCP mode.") | ||
| logging.error(f"Missing dependency: {missing_module}. Required for launch() method with MCP mode.") | ||
| print(f"\nTo add MCP capabilities, install the required dependencies:") | ||
| print(f"pip install {missing_module} mcp praison-mcp starlette uvicorn") # Added mcp, praison-mcp, starlette, uvicorn | ||
| print("\nOr install all MCP dependencies with relevant packages.") | ||
| return None | ||
|
|
||
| mcp_server_instance_name = f"{self.name}_mcp_server" if self.name else "agent_mcp_server" | ||
| mcp = FastMCP(mcp_server_instance_name) | ||
|
|
||
| # Determine the MCP tool name based on self.name | ||
| actual_mcp_tool_name = f"execute_{self.name.lower().replace(' ', '_').replace('-', '_')}_task" if self.name \ | ||
| else "execute_task" | ||
|
|
||
| @mcp.tool(name=actual_mcp_tool_name) | ||
| async def execute_agent_task(prompt: str) -> str: | ||
| """Executes the agent's primary task with the given prompt.""" | ||
| logging.info(f"MCP tool '{actual_mcp_tool_name}' called with prompt: {prompt}") | ||
| try: | ||
| # Ensure self.achat is used as it's the async version and pass its tools | ||
| if hasattr(self, 'achat') and asyncio.iscoroutinefunction(self.achat): | ||
| response = await self.achat(prompt, tools=self.tools) | ||
| elif hasattr(self, 'chat'): # Fallback for synchronous chat | ||
| loop = asyncio.get_event_loop() | ||
| response = await loop.run_in_executor(None, lambda p=prompt: self.chat(p, tools=self.tools)) | ||
| else: | ||
| logging.error(f"Agent {self.name} has no suitable chat or achat method for MCP tool.") | ||
| return f"Error: Agent {self.name} misconfigured for MCP." | ||
| return response if response is not None else "Agent returned no response." | ||
| except Exception as e: | ||
| logging.error(f"Error in MCP tool '{actual_mcp_tool_name}': {e}", exc_info=True) | ||
| return f"Error executing task: {str(e)}" | ||
|
|
||
| # Normalize base_path for MCP routes | ||
| base_path = path.rstrip('/') | ||
| sse_path = f"{base_path}/sse" | ||
| messages_path_prefix = f"{base_path}/messages" # Prefix for message posting | ||
|
|
||
| # Ensure messages_path ends with a slash for Mount | ||
| if not messages_path_prefix.endswith('/'): | ||
| messages_path_prefix += '/' | ||
|
|
||
|
|
||
| sse_transport = SseServerTransport(messages_path_prefix) # Pass the full prefix | ||
|
|
||
| async def handle_sse_connection(request: Request) -> None: | ||
| logging.debug(f"SSE connection request received from {request.client} for path {request.url.path}") | ||
| async with sse_transport.connect_sse( | ||
| request.scope, | ||
| request.receive, | ||
| request._send, # noqa: SLF001 | ||
| ) as (read_stream, write_stream): | ||
| await mcp._mcp_server.run( # Use the underlying server from FastMCP | ||
| read_stream, | ||
| write_stream, | ||
| mcp._mcp_server.create_initialization_options(), | ||
| ) | ||
|
|
||
| starlette_app = Starlette( | ||
| debug=debug, | ||
| routes=[ | ||
| Route(sse_path, endpoint=handle_sse_connection), | ||
| Mount(messages_path_prefix, app=sse_transport.handle_post_message), | ||
| ], | ||
| ) | ||
|
|
||
| print(f"🚀 Agent '{self.name}' MCP server starting on http://{host}:{port}") | ||
| print(f"📡 MCP SSE endpoint available at {sse_path}") | ||
| print(f"📢 MCP messages post to {messages_path_prefix}") | ||
| # Instead of trying to extract tool names, hardcode the known tool name | ||
| tool_names = [actual_mcp_tool_name] # Use the determined dynamic tool name | ||
| print(f"🛠️ Available MCP tools: {', '.join(tool_names)}") | ||
|
|
||
| # Uvicorn server running logic (similar to HTTP mode but standalone for MCP) | ||
| def run_mcp_server(): | ||
| try: | ||
| uvicorn.run(starlette_app, host=host, port=port, log_level="debug" if debug else "info") | ||
| except Exception as e: | ||
| logging.error(f"Error starting MCP server: {str(e)}", exc_info=True) | ||
| print(f"❌ Error starting MCP server: {str(e)}") | ||
|
|
||
| server_thread = threading.Thread(target=run_mcp_server, daemon=True) | ||
| server_thread.start() | ||
| time.sleep(0.5) # Allow server to start | ||
|
|
||
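The run-in-a-daemon-thread-then-sleep pattern used here can be sketched with a stdlib server in place of uvicorn (the 0.2s pause stands in for the diff's 0.5s):

```python
import threading
import time
import http.server
import socketserver

# Serve on an ephemeral localhost port in a daemon thread, as launch() does with uvicorn.
httpd = socketserver.TCPServer(("127.0.0.1", 0), http.server.SimpleHTTPRequestHandler)
thread = threading.Thread(target=httpd.serve_forever, daemon=True)
thread.start()
time.sleep(0.2)  # brief pause so the server loop is running before we return

alive_during = thread.is_alive()
print(alive_during)  # True

httpd.shutdown()
httpd.server_close()
```

Because the thread is a daemon, it will not keep the process alive on its own, which is why the diff follows this with an explicit blocking loop.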
| # Blocking logic for MCP mode | ||
| import inspect # Already imported but good for clarity | ||
| stack = inspect.stack() | ||
| if len(stack) > 1 and stack[1].filename.endswith('.py'): | ||
| caller_frame = stack[1] | ||
| caller_line = caller_frame.lineno | ||
| try: | ||
| with open(caller_frame.filename, 'r') as f: | ||
| lines = f.readlines() | ||
| has_more_launches = False | ||
| for line_content in lines[caller_line:]: # renamed line to line_content | ||
| if '.launch(' in line_content and not line_content.strip().startswith('#'): | ||
| has_more_launches = True | ||
| break | ||
| if not has_more_launches: | ||
| try: | ||
| print("\nAgent MCP server running. Press Ctrl+C to stop.") | ||
| while True: | ||
| time.sleep(1) | ||
| except KeyboardInterrupt: | ||
| print("\nMCP Server stopped") | ||
| except Exception as e: | ||
| logging.error(f"Error in MCP launch detection: {e}") | ||
| try: | ||
| print("\nAll agents registered. Press Ctrl+C to stop the servers.") | ||
| print("\nKeeping MCP server alive. Press Ctrl+C to stop.") | ||
| while True: | ||
| time.sleep(1) | ||
| except KeyboardInterrupt: | ||
| print("\nServers stopped") | ||
| except Exception as e: | ||
| # If something goes wrong with detection, block anyway to be safe | ||
| logging.error(f"Error in launch detection: {e}") | ||
| try: | ||
| print("\nKeeping servers alive. Press Ctrl+C to stop.") | ||
| while True: | ||
| time.sleep(1) | ||
| except KeyboardInterrupt: | ||
| print("\nServers stopped") | ||
|
|
||
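The blocking heuristic above (scan the caller's source for a later, uncommented `.launch(` call) can be isolated and tested; a sketch under the same assumptions:

```python
import os
import tempfile

def has_more_launches(filename, after_line):
    # Mirrors the diff's detection: any uncommented ".launch(" after the
    # caller's line means another server will be started, so don't block yet.
    with open(filename) as f:
        lines = f.readlines()
    for line in lines[after_line:]:
        if ".launch(" in line and not line.strip().startswith("#"):
            return True
    return False

src = "a.launch(port=8000)\n# b.launch(port=8001)\nb.launch(port=8001)\n"
with tempfile.NamedTemporaryFile("w", suffix=".py", delete=False) as f:
    f.write(src)
    path = f.name

later = has_more_launches(path, 1)   # line 3 has an uncommented .launch
at_end = has_more_launches(path, 3)  # nothing after the last call
os.remove(path)
print(later, at_end)  # True False
```

Note the heuristic is line-based: it misses calls split across lines and is skipped entirely in interactive sessions, which is why the except-path falls back to blocking unconditionally.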
| return None No newline at end of file | ||
| print("\nMCP Server stopped") | ||
| return None | ||
| else: | ||
| display_error(f"Invalid protocol: {protocol}. Choose 'http' or 'mcp'.") | ||
| return None No newline at end of file |
The `launch` method has grown quite large and handles logic for both HTTP and MCP protocols, including server setup, path management, and request handling. For better maintainability and readability, would you consider refactoring the core logic for each protocol into separate private helper methods (e.g., `_launch_http_server(...)` and `_launch_mcp_server(...)`)? This would make the main `launch` method a cleaner dispatcher.
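A minimal sketch of the dispatcher shape this comment proposes (the helper bodies are placeholders, not the real server logic):

```python
class Agent:
    def launch(self, port=8000, host="0.0.0.0", protocol="http", **kwargs):
        # Thin dispatcher: each protocol gets its own private helper.
        if protocol == "http":
            return self._launch_http_server(port=port, host=host, **kwargs)
        if protocol == "mcp":
            return self._launch_mcp_server(port=port, host=host, **kwargs)
        raise ValueError(f"Invalid protocol: {protocol}. Choose 'http' or 'mcp'.")

    def _launch_http_server(self, **kwargs):
        return ("http", kwargs)  # placeholder for the FastAPI setup

    def _launch_mcp_server(self, **kwargs):
        return ("mcp", kwargs)   # placeholder for the FastMCP/SSE setup

print(Agent().launch(protocol="mcp", port=8080)[0])  # mcp
```

The shared bits (dependency imports, blocking loop, launch detection) could then live in further helpers used by both paths.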
| display_error(f"Missing dependency: {missing_module}. Required for launch() method with MCP mode.") | ||
| logging.error(f"Missing dependency: {missing_module}. Required for launch() method with MCP mode.") | ||
| print(f"\nTo add MCP capabilities, install the required dependencies:") | ||
| print(f"pip install {missing_module} mcp praison-mcp starlette uvicorn") # Added mcp, praison-mcp, starlette, uvicorn |
The installation instruction for MCP dependencies suggests `pip install {missing_module} mcp praison-mcp starlette uvicorn`. Could you clarify whether `praison-mcp` is a publicly available package or an internal one? If it is not public, or if `mcp` (which appears to refer to the Model Context Protocol SDK) is the primary package, updating this instruction would prevent confusion for users setting up MCP mode.
| def launch(self, path: str = '/agents', port: int = 8000, host: str = '0.0.0.0', debug: bool = False, protocol: str = "http"): | ||
| """ | ||
| Launch all agents as a single API endpoint. The endpoint accepts a query and processes it through | ||
| all agents in sequence, with the output of each agent feeding into the next. | ||
| Launch all agents as a single API endpoint (HTTP) or an MCP server. | ||
| In HTTP mode, the endpoint accepts a query and processes it through all agents in sequence. | ||
| In MCP mode, an MCP server is started, exposing a tool to run the agent workflow. | ||
|
|
||
| Args: | ||
| path: API endpoint path (default: '/agents') | ||
| path: API endpoint path (default: '/agents') for HTTP, or base path for MCP. | ||
| port: Server port (default: 8000) | ||
| host: Server host (default: '0.0.0.0') | ||
| debug: Enable debug mode for uvicorn (default: False) | ||
| protocol: "http" to launch as FastAPI, "mcp" to launch as MCP server. | ||
|
|
||
| Returns: | ||
| None | ||
| """ | ||
| global _agents_server_started, _agents_registered_endpoints, _agents_shared_apps | ||
|
|
||
| if not self.agents: | ||
| logging.warning("No agents to launch. Add agents to the Agents instance first.") | ||
| return | ||
| if protocol == "http": | ||
| global _agents_server_started, _agents_registered_endpoints, _agents_shared_apps | ||
|
|
||
| # Try to import FastAPI dependencies - lazy loading | ||
| try: | ||
| import uvicorn | ||
| from fastapi import FastAPI, HTTPException, Request | ||
| from fastapi.responses import JSONResponse | ||
| from pydantic import BaseModel | ||
| import threading | ||
| import time | ||
|
|
||
| # Define the request model here since we need pydantic | ||
| class AgentQuery(BaseModel): | ||
| query: str | ||
| if not self.agents: | ||
| logging.warning("No agents to launch for HTTP mode. Add agents to the Agents instance first.") | ||
| return | ||
|
|
||
| except ImportError as e: | ||
| # Check which specific module is missing | ||
| missing_module = str(e).split("No module named '")[-1].rstrip("'") | ||
| display_error(f"Missing dependency: {missing_module}. Required for launch() method.") | ||
| logging.error(f"Missing dependency: {missing_module}. Required for launch() method.") | ||
| print(f"\nTo add API capabilities, install the required dependencies:") | ||
| print(f"pip install {missing_module}") | ||
| print("\nOr install all API dependencies with:") | ||
| print("pip install 'praisonaiagents[api]'") | ||
| return None | ||
|
|
||
| # Initialize port-specific collections if needed | ||
| if port not in _agents_registered_endpoints: | ||
| _agents_registered_endpoints[port] = {} | ||
| # Try to import FastAPI dependencies - lazy loading | ||
| try: | ||
| import uvicorn | ||
| from fastapi import FastAPI, HTTPException, Request | ||
| from fastapi.responses import JSONResponse | ||
| from pydantic import BaseModel | ||
| import threading | ||
| import time | ||
| import asyncio # Ensure asyncio is imported for HTTP mode too | ||
|
|
||
| # Define the request model here since we need pydantic | ||
| class AgentQuery(BaseModel): | ||
| query: str | ||
|
|
||
| except ImportError as e: | ||
| # Check which specific module is missing | ||
| missing_module = str(e).split("No module named '")[-1].rstrip("'") | ||
| display_error(f"Missing dependency: {missing_module}. Required for launch() method with HTTP mode.") | ||
| logging.error(f"Missing dependency: {missing_module}. Required for launch() method with HTTP mode.") | ||
| print(f"\nTo add API capabilities, install the required dependencies:") | ||
| print(f"pip install {missing_module}") | ||
| print("\nOr install all API dependencies with:") | ||
| print("pip install 'praisonaiagents[api]'") | ||
| return None | ||
|
|
||
| # Initialize shared FastAPI app if not already created for this port | ||
| if _agents_shared_apps.get(port) is None: | ||
| _agents_shared_apps[port] = FastAPI( | ||
| title=f"PraisonAI Agents API (Port {port})", | ||
| description="API for interacting with multiple PraisonAI Agents" | ||
| ) | ||
| # Initialize port-specific collections if needed | ||
| if port not in _agents_registered_endpoints: | ||
| _agents_registered_endpoints[port] = {} | ||
|
|
||
| # Initialize shared FastAPI app if not already created for this port | ||
| if _agents_shared_apps.get(port) is None: | ||
| _agents_shared_apps[port] = FastAPI( | ||
| title=f"PraisonAI Agents API (Port {port})", | ||
| description="API for interacting with multiple PraisonAI Agents" | ||
| ) | ||
|
|
||
| # Add a root endpoint with a welcome message | ||
| @_agents_shared_apps[port].get("/") | ||
| async def root(): | ||
| return { | ||
| "message": f"Welcome to PraisonAI Agents API on port {port}. See /docs for usage.", | ||
| "endpoints": list(_agents_registered_endpoints[port].keys()) | ||
| } | ||
|
|
||
| # Add healthcheck endpoint | ||
| @_agents_shared_apps[port].get("/health") | ||
| async def healthcheck(): | ||
| return { | ||
| "status": "ok", | ||
| "endpoints": list(_agents_registered_endpoints[port].keys()) | ||
| } | ||
|
|
||
| # Add a root endpoint with a welcome message | ||
| @_agents_shared_apps[port].get("/") | ||
| async def root(): | ||
| return { | ||
| "message": f"Welcome to PraisonAI Agents API on port {port}. See /docs for usage.", | ||
| "endpoints": list(_agents_registered_endpoints[port].keys()) | ||
| } | ||
| # Normalize path to ensure it starts with / | ||
| if not path.startswith('/'): | ||
| path = f'/{path}' | ||
|
|
||
| # Check if path is already registered for this port | ||
| if path in _agents_registered_endpoints[port]: | ||
| logging.warning(f"Path '{path}' is already registered on port {port}. Please use a different path.") | ||
| print(f"⚠️ Warning: Path '{path}' is already registered on port {port}.") | ||
| # Use a modified path to avoid conflicts | ||
| original_path = path | ||
| instance_id = str(uuid.uuid4())[:6] | ||
| path = f"{path}_{instance_id}" | ||
| logging.warning(f"Using '{path}' instead of '{original_path}'") | ||
| print(f"🔄 Using '{path}' instead") | ||
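The collision handling above (suffix a short UUID onto an already-registered path) can be sketched as a standalone helper (`dedupe_path` is an illustrative name):

```python
import uuid

def dedupe_path(path, registered):
    # Mirrors the diff: on a collision, append a 6-character uuid suffix.
    if path not in registered:
        return path
    return f"{path}_{str(uuid.uuid4())[:6]}"

registered = {"/agents": "endpoint-id"}
print(dedupe_path("/chat", registered))  # /chat
renamed = dedupe_path("/agents", registered)
print(renamed.startswith("/agents_"))    # True
print(len(renamed))                      # 14  (len("/agents_") + 6)
```

A 6-character suffix makes repeat collisions unlikely but not impossible; a loop that retries until the path is free would be stricter.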
|
|
||
| # Add healthcheck endpoint | ||
| @_agents_shared_apps[port].get("/health") | ||
| async def healthcheck(): | ||
| return { | ||
| "status": "ok", | ||
| "endpoints": list(_agents_registered_endpoints[port].keys()) | ||
| } | ||
|
|
||
| # Normalize path to ensure it starts with / | ||
| if not path.startswith('/'): | ||
| path = f'/{path}' | ||
| # Generate a unique ID for this agent group's endpoint | ||
| endpoint_id = str(uuid.uuid4()) | ||
| _agents_registered_endpoints[port][path] = endpoint_id | ||
|
|
||
| # Check if path is already registered for this port | ||
| if path in _agents_registered_endpoints[port]: | ||
| logging.warning(f"Path '{path}' is already registered on port {port}. Please use a different path.") | ||
| print(f"⚠️ Warning: Path '{path}' is already registered on port {port}.") | ||
| # Use a modified path to avoid conflicts | ||
| original_path = path | ||
| instance_id = str(uuid.uuid4())[:6] | ||
| path = f"{path}_{instance_id}" | ||
| logging.warning(f"Using '{path}' instead of '{original_path}'") | ||
| print(f"🔄 Using '{path}' instead") | ||
|
|
||
| # Generate a unique ID for this agent group's endpoint | ||
| endpoint_id = str(uuid.uuid4()) | ||
| _agents_registered_endpoints[port][path] = endpoint_id | ||
|
|
||
| # Define the endpoint handler | ||
| @_agents_shared_apps[port].post(path) | ||
| async def handle_query(request: Request, query_data: Optional[AgentQuery] = None): | ||
| # Handle both direct JSON with query field and form data | ||
| if query_data is None: | ||
| # Define the endpoint handler | ||
| @_agents_shared_apps[port].post(path) | ||
| async def handle_query(request: Request, query_data: Optional[AgentQuery] = None): | ||
| # Handle both direct JSON with query field and form data | ||
| if query_data is None: | ||
| try: | ||
| request_data = await request.json() | ||
| if "query" not in request_data: | ||
| raise HTTPException(status_code=400, detail="Missing 'query' field in request") | ||
| query = request_data["query"] | ||
| except: | ||
| # Fallback to form data or query params | ||
| form_data = await request.form() | ||
| if "query" in form_data: | ||
| query = form_data["query"] | ||
| else: | ||
| raise HTTPException(status_code=400, detail="Missing 'query' field in request") | ||
| else: | ||
| query = query_data.query | ||
| try: | ||
| request_data = await request.json() | ||
| if "query" not in request_data: | ||
| raise HTTPException(status_code=400, detail="Missing 'query' field in request") | ||
| query = request_data["query"] | ||
| except: | ||
| # Fallback to form data or query params | ||
| form_data = await request.form() | ||
| if "query" in form_data: | ||
| query = form_data["query"] | ||
| else: | ||
| raise HTTPException(status_code=400, detail="Missing 'query' field in request") | ||
| # Process the query sequentially through all agents | ||
| current_input = query | ||
| results = [] | ||
|
|
||
| for agent_instance in self.agents: # Corrected variable name to agent_instance | ||
| try: | ||
| # Use async version if available, otherwise use sync version | ||
| if asyncio.iscoroutinefunction(agent_instance.chat): | ||
| response = await agent_instance.achat(current_input) | ||
| else: | ||
| # Run sync function in a thread to avoid blocking | ||
| loop = asyncio.get_event_loop() | ||
| # Correctly pass current_input to the lambda for closure | ||
| response = await loop.run_in_executor(None, lambda ci=current_input: agent_instance.chat(ci)) | ||
|
|
||
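The default-argument binding above (`lambda ci=current_input: ...`) exists because a plain lambda closes over the variable itself, which may have moved on by the time the executor runs it. A quick demonstration of the pitfall:

```python
# Late binding: all three lambdas see the final value of i.
funcs_late = [lambda: i for i in range(3)]
# Default-argument binding captures the value at definition time.
funcs_bound = [lambda i=i: i for i in range(3)]

print([f() for f in funcs_late])   # [2, 2, 2]
print([f() for f in funcs_bound])  # [0, 1, 2]
```

`functools.partial(agent_instance.chat, current_input)` is an equivalent, arguably clearer alternative.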
| # Store this agent's result | ||
| results.append({ | ||
| "agent": agent_instance.name, | ||
| "response": response | ||
| }) | ||
|
|
||
| # Use this response as input to the next agent | ||
| current_input = response | ||
| except Exception as e: | ||
| logging.error(f"Error with agent {agent_instance.name}: {str(e)}", exc_info=True) | ||
| results.append({ | ||
| "agent": agent_instance.name, | ||
| "error": str(e) | ||
| }) | ||
| # Decide error handling: continue with original input, last good input, or stop? | ||
| # For now, let's continue with the last successful 'current_input' or original 'query' if first agent fails | ||
| # This part might need refinement based on desired behavior. | ||
| # If an agent fails, its 'response' might be None or an error string. | ||
| # current_input will carry that forward. Or, we could choose to halt or use last good input. | ||
|
|
||
| # Return all results and the final output | ||
| return { | ||
| "query": query, | ||
| "results": results, | ||
| "final_response": current_input | ||
| } | ||
| except Exception as e: | ||
| logging.error(f"Error processing query: {str(e)}", exc_info=True) | ||
| return JSONResponse( | ||
| status_code=500, | ||
| content={"error": f"Error processing query: {str(e)}"} | ||
| ) | ||
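The sequential handler above, with each agent's output feeding the next, reduces to a small async loop. A self-contained sketch with dummy agents (`EchoAgent` and `run_pipeline` are invented for illustration):

```python
import asyncio

class EchoAgent:
    def __init__(self, name):
        self.name = name
    async def achat(self, prompt):
        return f"{self.name}:{prompt}"

async def run_pipeline(agents, query):
    # Each agent's response becomes the next agent's input, as in handle_query.
    current = query
    results = []
    for agent in agents:
        response = await agent.achat(current)
        results.append({"agent": agent.name, "response": response})
        current = response
    return {"query": query, "results": results, "final_response": current}

out = asyncio.run(run_pipeline([EchoAgent("a"), EchoAgent("b")], "hi"))
print(out["final_response"])  # b:a:hi
```

The open question flagged in the diff's comments, whether a failing agent should halt the chain or pass its error string forward, applies equally to this loop.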
|
|
||
| print(f"🚀 Multi-Agent HTTP API available at http://{host}:{port}{path}") | ||
| agent_names = ", ".join([agent.name for agent in self.agents]) | ||
| print(f"📊 Available agents for this endpoint ({len(self.agents)}): {agent_names}") | ||
|
|
||
| # Start the server if it's not already running for this port | ||
| if not _agents_server_started.get(port, False): | ||
| # Mark the server as started first to prevent duplicate starts | ||
| _agents_server_started[port] = True | ||
|
|
||
| # Start the server in a separate thread | ||
| def run_server(): | ||
| try: | ||
| print(f"✅ FastAPI server started at http://{host}:{port}") | ||
| print(f"📚 API documentation available at http://{host}:{port}/docs") | ||
| print(f"🔌 Registered HTTP endpoints on port {port}: {', '.join(list(_agents_registered_endpoints[port].keys()))}") | ||
| uvicorn.run(_agents_shared_apps[port], host=host, port=port, log_level="debug" if debug else "info") | ||
| except Exception as e: | ||
| logging.error(f"Error starting server: {str(e)}", exc_info=True) | ||
| print(f"❌ Error starting server: {str(e)}") | ||
|
|
||
| # Run server in a background thread | ||
| server_thread = threading.Thread(target=run_server, daemon=True) | ||
| server_thread.start() | ||
|
|
||
| # Wait for a moment to allow the server to start and register endpoints | ||
| time.sleep(0.5) | ||
| else: | ||
| query = query_data.query | ||
| # If server is already running, wait a moment to make sure the endpoint is registered | ||
| time.sleep(0.1) | ||
| print(f"🔌 Registered HTTP endpoints on port {port}: {', '.join(list(_agents_registered_endpoints[port].keys()))}") | ||
|
|
||
| # Get the stack frame to check if this is the last launch() call in the script | ||
| import inspect | ||
| stack = inspect.stack() | ||
|
|
||
| # If this is called from a Python script (not interactive), try to detect if it's the last launch call | ||
| if len(stack) > 1 and stack[1].filename.endswith('.py'): | ||
| caller_frame = stack[1] | ||
| caller_line = caller_frame.lineno | ||
|
|
||
| try: | ||
| # Read the file to check if there are more launch calls after this one | ||
| with open(caller_frame.filename, 'r') as f: | ||
| lines = f.readlines() | ||
|
|
||
| # Check if there are more launch() calls after the current line | ||
| has_more_launches = False | ||
| for line_content in lines[caller_line:]: # Renamed variable | ||
| if '.launch(' in line_content and not line_content.strip().startswith('#'): | ||
| has_more_launches = True | ||
| break | ||
|
|
||
| # If this is the last launch call, block the main thread | ||
| if not has_more_launches: | ||
| try: | ||
| print("\nAll agent groups registered for HTTP mode. Press Ctrl+C to stop the servers.") | ||
| while True: | ||
| time.sleep(1) | ||
| except KeyboardInterrupt: | ||
| print("\nServers stopped") | ||
| except Exception as e: | ||
| # If something goes wrong with detection, block anyway to be safe | ||
| logging.error(f"Error in HTTP launch detection: {e}") | ||
| try: | ||
| print("\nKeeping HTTP servers alive. Press Ctrl+C to stop.") | ||
| while True: | ||
| time.sleep(1) | ||
| except KeyboardInterrupt: | ||
| print("\nServers stopped") | ||
| return None | ||
|
|
||
| elif protocol == "mcp": | ||
| if not self.agents: | ||
| logging.warning("No agents to launch for MCP mode. Add agents to the Agents instance first.") | ||
| return | ||
|
|
||
| try: | ||
| # Process the query sequentially through all agents | ||
| current_input = query | ||
| results = [] | ||
| import uvicorn | ||
| from mcp.server.fastmcp import FastMCP | ||
| from mcp.server.sse import SseServerTransport | ||
| from starlette.applications import Starlette | ||
| from starlette.requests import Request | ||
| from starlette.routing import Mount, Route | ||
| # from mcp.server import Server as MCPServer # Not directly needed if using FastMCP's server | ||
| import threading | ||
| import time | ||
| import inspect | ||
| import asyncio | ||
| # logging is already imported at the module level | ||
|
|
||
| for agent in self.agents: | ||
| except ImportError as e: | ||
| missing_module = str(e).split("No module named '")[-1].rstrip("'") | ||
| display_error(f"Missing dependency: {missing_module}. Required for launch() method with MCP mode.") | ||
| logging.error(f"Missing dependency: {missing_module}. Required for launch() method with MCP mode.") | ||
| print(f"\nTo add MCP capabilities, install the required dependencies:") | ||
| print(f"pip install {missing_module} mcp praison-mcp starlette uvicorn") | ||
| print("\nOr install all MCP dependencies with relevant packages.") | ||
| return None | ||
|
|
||
| mcp_instance = FastMCP("praisonai_workflow_mcp_server") | ||
|
|
||
| # Determine the MCP tool name for the workflow based on self.name | ||
| actual_mcp_tool_name = (f"execute_{self.name.lower().replace(' ', '_').replace('-', '_')}_workflow" if self.name | ||
| else "execute_workflow") | ||
|
|
||
| @mcp_instance.tool(name=actual_mcp_tool_name) | ||
| async def execute_workflow_tool(query: str) -> str: # Renamed for clarity | ||
| """Executes the defined agent workflow with the given query.""" | ||
| logging.info(f"MCP tool '{actual_mcp_tool_name}' called with query: {query}") | ||
| current_input = query | ||
| final_response = "No agents in workflow or workflow did not produce a final response." | ||
|
|
||
| for agent_instance in self.agents: | ||
| try: | ||
| # Use async version if available, otherwise use sync version | ||
| if asyncio.iscoroutinefunction(agent.chat): | ||
| response = await agent.achat(current_input) | ||
| else: | ||
| # Run sync function in a thread to avoid blocking | ||
| logging.debug(f"Processing with agent: {agent_instance.name}") | ||
| if hasattr(agent_instance, 'achat') and asyncio.iscoroutinefunction(agent_instance.achat): | ||
| response = await agent_instance.achat(current_input, tools=agent_instance.tools) | ||
| elif hasattr(agent_instance, 'chat'): # Fallback to sync chat if achat not suitable | ||
| loop = asyncio.get_event_loop() | ||
| response = await loop.run_in_executor(None, lambda: agent.chat(current_input)) | ||
|
|
||
| # Store this agent's result | ||
| results.append({ | ||
| "agent": agent.name, | ||
| "response": response | ||
| }) | ||
| response = await loop.run_in_executor(None, lambda ci=current_input: agent_instance.chat(ci, tools=agent_instance.tools)) | ||
| else: | ||
| logging.warning(f"Agent {agent_instance.name} has no suitable chat or achat method.") | ||
| response = f"Error: Agent {agent_instance.name} has no callable chat method." | ||
|
|
||
| # Use this response as input to the next agent | ||
| current_input = response | ||
| current_input = response if response is not None else "Agent returned no response." | ||
| final_response = current_input # Keep track of the last valid response | ||
| logging.debug(f"Agent {agent_instance.name} responded. Current intermediate output: {current_input}") | ||
|
|
||
| except Exception as e: | ||
| logging.error(f"Error with agent {agent.name}: {str(e)}", exc_info=True) | ||
| results.append({ | ||
| "agent": agent.name, | ||
| "error": str(e) | ||
| }) | ||
| # Continue with original input if there's an error | ||
| logging.error(f"Error during agent {agent_instance.name} execution in MCP workflow: {str(e)}", exc_info=True) | ||
| current_input = f"Error from agent {agent_instance.name}: {str(e)}" | ||
| final_response = current_input # Update final response to show error | ||
| # Optionally break or continue based on desired error handling for the workflow | ||
| # For now, we continue, and the error is passed to the next agent or returned. | ||
|
|
||
| # Return all results and the final output | ||
| return { | ||
| "query": query, | ||
| "results": results, | ||
| "final_response": current_input | ||
| } | ||
| except Exception as e: | ||
| logging.error(f"Error processing query: {str(e)}", exc_info=True) | ||
| return JSONResponse( | ||
| status_code=500, | ||
| content={"error": f"Error processing query: {str(e)}"} | ||
| ) | ||
|
|
||
| print(f"🚀 Multi-Agent API available at http://{host}:{port}{path}") | ||
| agent_names = ", ".join([agent.name for agent in self.agents]) | ||
| print(f"📊 Available agents ({len(self.agents)}): {agent_names}") | ||
|
|
||
| # Start the server if it's not already running for this port | ||
| if not _agents_server_started.get(port, False): | ||
| # Mark the server as started first to prevent duplicate starts | ||
| _agents_server_started[port] = True | ||
| logging.info(f"MCP tool '{actual_mcp_tool_name}' completed. Final response: {final_response}") | ||
| return final_response | ||
|
|
||
| base_mcp_path = path.rstrip('/') | ||
| sse_mcp_path = f"{base_mcp_path}/sse" | ||
| messages_mcp_path_prefix = f"{base_mcp_path}/messages" | ||
| if not messages_mcp_path_prefix.endswith('/'): | ||
| messages_mcp_path_prefix += '/' | ||
|
|
||
| sse_transport_mcp = SseServerTransport(messages_mcp_path_prefix) | ||
|
|
||
| async def handle_mcp_sse_connection(request: Request) -> None: | ||
| logging.debug(f"MCP SSE connection request from {request.client} for path {request.url.path}") | ||
| async with sse_transport_mcp.connect_sse( | ||
| request.scope, request.receive, request._send, | ||
| ) as (read_stream, write_stream): | ||
| await mcp_instance._mcp_server.run( | ||
| read_stream, write_stream, mcp_instance._mcp_server.create_initialization_options(), | ||
| ) | ||
|
|
||
| # Start the server in a separate thread | ||
| def run_server(): | ||
| starlette_mcp_app = Starlette( | ||
| debug=debug, | ||
| routes=[ | ||
| Route(sse_mcp_path, endpoint=handle_mcp_sse_connection), | ||
| Mount(messages_mcp_path_prefix, app=sse_transport_mcp.handle_post_message), | ||
| ], | ||
| ) | ||
|
|
||
| print(f"🚀 PraisonAIAgents MCP Workflow server starting on http://{host}:{port}") | ||
| print(f"📡 MCP SSE endpoint available at {sse_mcp_path}") | ||
| print(f"📢 MCP messages post to {messages_mcp_path_prefix}") | ||
| # Instead of trying to extract tool names, hardcode the known tool name | ||
| mcp_tool_names = [actual_mcp_tool_name] # Use the determined dynamic tool name | ||
| print(f"🛠️ Available MCP tools: {', '.join(mcp_tool_names)}") | ||
| agent_names_in_workflow = ", ".join([a.name for a in self.agents]) | ||
| print(f"🔄 Agents in MCP workflow: {agent_names_in_workflow}") | ||
|
|
||
| def run_praison_mcp_server(): | ||
| try: | ||
| print(f"✅ FastAPI server started at http://{host}:{port}") | ||
| print(f"📚 API documentation available at http://{host}:{port}/docs") | ||
| print(f"🔌 Available endpoints: {', '.join(list(_agents_registered_endpoints[port].keys()))}") | ||
| uvicorn.run(_agents_shared_apps[port], host=host, port=port, log_level="debug" if debug else "info") | ||
| uvicorn.run(starlette_mcp_app, host=host, port=port, log_level="debug" if debug else "info") | ||
| except Exception as e: | ||
| logging.error(f"Error starting server: {str(e)}", exc_info=True) | ||
| print(f"❌ Error starting server: {str(e)}") | ||
|
|
||
| # Run server in a background thread | ||
| server_thread = threading.Thread(target=run_server, daemon=True) | ||
| server_thread.start() | ||
|
|
||
| # Wait for a moment to allow the server to start and register endpoints | ||
| time.sleep(0.5) | ||
| else: | ||
| # If server is already running, wait a moment to make sure the endpoint is registered | ||
| time.sleep(0.1) | ||
| print(f"🔌 Available endpoints on port {port}: {', '.join(list(_agents_registered_endpoints[port].keys()))}") | ||
|
|
||
| # Get the stack frame to check if this is the last launch() call in the script | ||
| import inspect | ||
| stack = inspect.stack() | ||
|
|
||
| # If this is called from a Python script (not interactive), try to detect if it's the last launch call | ||
| if len(stack) > 1 and stack[1].filename.endswith('.py'): | ||
| caller_frame = stack[1] | ||
| caller_line = caller_frame.lineno | ||
|
|
||
| try: | ||
| # Read the file to check if there are more launch calls after this one | ||
| with open(caller_frame.filename, 'r') as f: | ||
| lines = f.readlines() | ||
|
|
||
| # Check if there are more launch() calls after the current line | ||
| has_more_launches = False | ||
| for line in lines[caller_line:]: | ||
| if '.launch(' in line and not line.strip().startswith('#'): | ||
| has_more_launches = True | ||
| break | ||
|
|
||
| # If this is the last launch call, block the main thread | ||
| if not has_more_launches: | ||
| logging.error(f"Error starting PraisonAIAgents MCP server: {str(e)}", exc_info=True) | ||
| print(f"❌ Error starting PraisonAIAgents MCP server: {str(e)}") | ||
|
|
||
| mcp_server_thread = threading.Thread(target=run_praison_mcp_server, daemon=True) | ||
| mcp_server_thread.start() | ||
| time.sleep(0.5) | ||
|
|
||
| import inspect | ||
| stack = inspect.stack() | ||
| if len(stack) > 1 and stack[1].filename.endswith('.py'): | ||
| caller_frame = stack[1] | ||
| caller_line = caller_frame.lineno | ||
| try: | ||
| with open(caller_frame.filename, 'r') as f: | ||
| lines = f.readlines() | ||
| has_more_launches = False | ||
| for line_content in lines[caller_line:]: | ||
| if '.launch(' in line_content and not line_content.strip().startswith('#'): | ||
| has_more_launches = True | ||
| break | ||
| if not has_more_launches: | ||
| try: | ||
| print("\nPraisonAIAgents MCP server running. Press Ctrl+C to stop.") | ||
| while True: | ||
| time.sleep(1) | ||
| except KeyboardInterrupt: | ||
| print("\nPraisonAIAgents MCP Server stopped") | ||
| except Exception as e: | ||
| logging.error(f"Error in PraisonAIAgents MCP launch detection: {e}") | ||
| try: | ||
| print("\nAll agents registered. Press Ctrl+C to stop the servers.") | ||
| print("\nKeeping PraisonAIAgents MCP server alive. Press Ctrl+C to stop.") | ||
| while True: | ||
| time.sleep(1) | ||
| except KeyboardInterrupt: | ||
| print("\nServers stopped") | ||
| except Exception as e: | ||
| # If something goes wrong with detection, block anyway to be safe | ||
| logging.error(f"Error in launch detection: {e}") | ||
| try: | ||
| print("\nKeeping servers alive. Press Ctrl+C to stop.") | ||
| while True: | ||
| time.sleep(1) | ||
| except KeyboardInterrupt: | ||
| print("\nServers stopped") | ||
|
|
||
| return None No newline at end of file | ||
| print("\nPraisonAIAgents MCP Server stopped") | ||
| return None | ||
| else: | ||
| display_error(f"Invalid protocol: {protocol}. Choose 'http' or 'mcp'.") | ||
| return None No newline at end of file |
| display_error(f"Missing dependency: {missing_module}. Required for launch() method with MCP mode.") | ||
| logging.error(f"Missing dependency: {missing_module}. Required for launch() method with MCP mode.") | ||
| print(f"\nTo add MCP capabilities, install the required dependencies:") | ||
| print(f"pip install {missing_module} mcp praison-mcp starlette uvicorn") |
The pip install suggestion for MCP dependencies includes `praison-mcp`. As with the similar comment in `agent.py`, it would be helpful to clarify whether this is a public package or whether the instruction should point to a different package such as `mcp-model-context-protocol`, to avoid potential installation issues for users.
| print("\nOr install all MCP dependencies with relevant packages.") | ||
| return None | ||
|
|
||
| mcp_instance = FastMCP("praisonai_workflow_mcp_server") |
The MCP server instance name is hardcoded as `"praisonai_workflow_mcp_server"`. For consistency and easier identification when multiple `PraisonAIAgents` instances are launched, would it be better to make this name dynamic, perhaps incorporating `self.name` when it is provided? For example: `mcp_instance_name = f"{self.name}_mcp_workflow_server" if self.name else "praisonai_workflow_mcp_server"`.
| mcp_instance = FastMCP("praisonai_workflow_mcp_server") | |
| mcp_instance_name = f"{self.name}_mcp_workflow_server" if self.name else "praisonai_workflow_mcp_server" | |
| mcp_instance = FastMCP(mcp_instance_name) |
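The suggested change reduces to a one-line naming function; a sketch that keeps the current hardcoded value as the fallback:

```python
def mcp_instance_name(name):
    # Dynamic server name when the agents group is named, generic otherwise.
    return f"{name}_mcp_workflow_server" if name else "praisonai_workflow_mcp_server"

print(mcp_instance_name("research_team"))  # research_team_mcp_workflow_server
print(mcp_instance_name(None))             # praisonai_workflow_mcp_server
```

If `self.name` can contain spaces or hyphens, running it through the same slug normalization used for the tool name would keep the two identifiers consistent.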
2.1.6