From e50db48c7cffcbc924a81d8177031cdf090eb0ac Mon Sep 17 00:00:00 2001 From: MervinPraison Date: Wed, 14 May 2025 12:49:58 +0100 Subject: [PATCH] Add multi-agent API functionality in `multi-agents-api.py` and `multi-agents-group-api.py`, increment version to 0.0.80 in `pyproject.toml` and `uv.lock`, and enhance agent launch logic in `agents.py` with improved endpoint management and healthcheck capabilities. --- src/praisonai-agents/multi-agents-api.py | 6 + .../multi-agents-group-api.py | 8 + .../praisonaiagents/agents/agents.py | 233 +++++++++++++++++- src/praisonai-agents/pyproject.toml | 2 +- src/praisonai-agents/uv.lock | 2 +- 5 files changed, 248 insertions(+), 3 deletions(-) create mode 100644 src/praisonai-agents/multi-agents-api.py create mode 100644 src/praisonai-agents/multi-agents-group-api.py diff --git a/src/praisonai-agents/multi-agents-api.py b/src/praisonai-agents/multi-agents-api.py new file mode 100644 index 000000000..8f6ac2444 --- /dev/null +++ b/src/praisonai-agents/multi-agents-api.py @@ -0,0 +1,6 @@ +from praisonaiagents import Agent, Agents, Tools + +research_agent = Agent(name="Research", instructions="You are a research agent to search internet about AI 2024", tools=[Tools.internet_search]) +summarise_agent = Agent(name="Summarise", instructions="You are a summarize agent to summarise in points") +agents = Agents(agents=[research_agent, summarise_agent]) +agents.launch(path="/agents", port=3030) \ No newline at end of file diff --git a/src/praisonai-agents/multi-agents-group-api.py b/src/praisonai-agents/multi-agents-group-api.py new file mode 100644 index 000000000..8888cb1cf --- /dev/null +++ b/src/praisonai-agents/multi-agents-group-api.py @@ -0,0 +1,8 @@ +from praisonaiagents import Agent, Agents, Tools + +research_agent = Agent(name="Research", instructions="You are a research agent to search internet about AI 2024", tools=[Tools.internet_search]) +summarise_agent = Agent(name="Summarise", instructions="You are a summarize agent to summarise in points") +agents = Agents(agents=[research_agent, summarise_agent]) +agents2 = Agents(agents=[research_agent]) +agents.launch(path="/agents", port=3030) +agents2.launch(path="/agents2", port=3030) \ No newline at end of file diff --git a/src/praisonai-agents/praisonaiagents/agents/agents.py b/src/praisonai-agents/praisonaiagents/agents/agents.py index 354a90c3e..31b86481d 100644 --- a/src/praisonai-agents/praisonaiagents/agents/agents.py +++ b/src/praisonai-agents/praisonaiagents/agents/agents.py @@ -17,6 +17,11 @@ # Set up logger logger = logging.getLogger(__name__) +# Global variables for managing the shared servers +_agents_server_started = {} # Dict of port -> started boolean +_agents_registered_endpoints = {} # Dict of port -> Dict of path -> endpoint_id +_agents_shared_apps = {} # Dict of port -> FastAPI app + def encode_file_to_base64(file_path: str) -> str: """Base64-encode a file.""" import base64 @@ -878,4 +883,230 @@ def update_state(self, updates: Dict) -> None: def clear_state(self) -> None: """Clear all state values""" - self._state.clear() \ No newline at end of file + self._state.clear() + + def launch(self, path: str = '/agents', port: int = 8000, host: str = '0.0.0.0', debug: bool = False): + """ + Launch all agents as a single API endpoint. The endpoint accepts a query and processes it through + all agents in sequence, with the output of each agent feeding into the next. + + Args: + path: API endpoint path (default: '/agents') + port: Server port (default: 8000) + host: Server host (default: '0.0.0.0') + debug: Enable debug mode for uvicorn (default: False) + + Returns: + None + """ + global _agents_server_started, _agents_registered_endpoints, _agents_shared_apps + + if not self.agents: + logging.warning("No agents to launch. Add agents to the Agents instance first.") + return + + # Try to import FastAPI dependencies - lazy loading + try: + import uvicorn + from fastapi import FastAPI, HTTPException, Request + from fastapi.responses import JSONResponse + from pydantic import BaseModel + import threading + import time + + # Define the request model here since we need pydantic + class AgentQuery(BaseModel): + query: str + + except ImportError as e: + # Check which specific module is missing + missing_module = str(e).split("No module named '")[-1].rstrip("'") + display_error(f"Missing dependency: {missing_module}. Required for launch() method.") + logging.error(f"Missing dependency: {missing_module}. Required for launch() method.") + print(f"\nTo add API capabilities, install the required dependencies:") + print(f"pip install {missing_module}") + print("\nOr install all API dependencies with:") + print("pip install 'praisonaiagents[api]'") + return None + + # Initialize port-specific collections if needed + if port not in _agents_registered_endpoints: + _agents_registered_endpoints[port] = {} + + # Initialize shared FastAPI app if not already created for this port + if _agents_shared_apps.get(port) is None: + _agents_shared_apps[port] = FastAPI( + title=f"PraisonAI Agents API (Port {port})", + description="API for interacting with multiple PraisonAI Agents" + ) + + # Add a root endpoint with a welcome message + @_agents_shared_apps[port].get("/") + async def root(): + return { + "message": f"Welcome to PraisonAI Agents API on port {port}. See /docs for usage.", + "endpoints": list(_agents_registered_endpoints[port].keys()) + } + + # Add healthcheck endpoint + @_agents_shared_apps[port].get("/health") + async def healthcheck(): + return { + "status": "ok", + "endpoints": list(_agents_registered_endpoints[port].keys()) + } + + # Normalize path to ensure it starts with / + if not path.startswith('/'): + path = f'/{path}' + + # Check if path is already registered for this port + if path in _agents_registered_endpoints[port]: + logging.warning(f"Path '{path}' is already registered on port {port}. Please use a different path.") + print(f"⚠️ Warning: Path '{path}' is already registered on port {port}.") + # Use a modified path to avoid conflicts + original_path = path + instance_id = str(uuid.uuid4())[:6] + path = f"{path}_{instance_id}" + logging.warning(f"Using '{path}' instead of '{original_path}'") + print(f"🔄 Using '{path}' instead") + + # Generate a unique ID for this agent group's endpoint + endpoint_id = str(uuid.uuid4()) + _agents_registered_endpoints[port][path] = endpoint_id + + # Define the endpoint handler + @_agents_shared_apps[port].post(path) + async def handle_query(request: Request, query_data: Optional[AgentQuery] = None): + # Handle both direct JSON with query field and form data + if query_data is None: + try: + request_data = await request.json() + if "query" not in request_data: + raise HTTPException(status_code=400, detail="Missing 'query' field in request") + query = request_data["query"] + except: + # Fallback to form data or query params + form_data = await request.form() + if "query" in form_data: + query = form_data["query"] + else: + raise HTTPException(status_code=400, detail="Missing 'query' field in request") + else: + query = query_data.query + + try: + # Process the query sequentially through all agents + current_input = query + results = [] + + for agent in self.agents: + try: + # Use async version if available, otherwise use sync version + if asyncio.iscoroutinefunction(agent.chat): + response = await agent.achat(current_input) + else: + # Run sync function in a thread to avoid blocking + loop = asyncio.get_event_loop() + response = await loop.run_in_executor(None, lambda: agent.chat(current_input)) + + # Store this agent's result + results.append({ + "agent": agent.name, + "response": response + }) + + # Use this response as input to the next agent + current_input = response + except Exception as e: + logging.error(f"Error with agent {agent.name}: {str(e)}", exc_info=True) + results.append({ + "agent": agent.name, + "error": str(e) + }) + # Continue with original input if there's an error + + # Return all results and the final output + return { + "query": query, + "results": results, + "final_response": current_input + } + except Exception as e: + logging.error(f"Error processing query: {str(e)}", exc_info=True) + return JSONResponse( + status_code=500, + content={"error": f"Error processing query: {str(e)}"} + ) + + print(f"🚀 Multi-Agent API available at http://{host}:{port}{path}") + agent_names = ", ".join([agent.name for agent in self.agents]) + print(f"📊 Available agents ({len(self.agents)}): {agent_names}") + + # Start the server if it's not already running for this port + if not _agents_server_started.get(port, False): + # Mark the server as started first to prevent duplicate starts + _agents_server_started[port] = True + + # Start the server in a separate thread + def run_server(): + try: + print(f"✅ FastAPI server started at http://{host}:{port}") + print(f"📚 API documentation available at http://{host}:{port}/docs") + print(f"🔌 Available endpoints: {', '.join(list(_agents_registered_endpoints[port].keys()))}") + uvicorn.run(_agents_shared_apps[port], host=host, port=port, log_level="debug" if debug else "info") + except Exception as e: + logging.error(f"Error starting server: {str(e)}", exc_info=True) + print(f"❌ Error starting server: {str(e)}") + + # Run server in a background thread + server_thread = threading.Thread(target=run_server, daemon=True) + server_thread.start() + + # Wait for a moment to allow the server to start and register endpoints + time.sleep(0.5) + else: + # If server is already running, wait a moment to make sure the endpoint is registered + time.sleep(0.1) + print(f"🔌 Available endpoints on port {port}: {', '.join(list(_agents_registered_endpoints[port].keys()))}") + + # Get the stack frame to check if this is the last launch() call in the script + import inspect + stack = inspect.stack() + + # If this is called from a Python script (not interactive), try to detect if it's the last launch call + if len(stack) > 1 and stack[1].filename.endswith('.py'): + caller_frame = stack[1] + caller_line = caller_frame.lineno + + try: + # Read the file to check if there are more launch calls after this one + with open(caller_frame.filename, 'r') as f: + lines = f.readlines() + + # Check if there are more launch() calls after the current line + has_more_launches = False + for line in lines[caller_line:]: + if '.launch(' in line and not line.strip().startswith('#'): + has_more_launches = True + break + + # If this is the last launch call, block the main thread + if not has_more_launches: + try: + print("\nAll agents registered. Press Ctrl+C to stop the servers.") + while True: + time.sleep(1) + except KeyboardInterrupt: + print("\nServers stopped") + except Exception as e: + # If something goes wrong with detection, block anyway to be safe + logging.error(f"Error in launch detection: {e}") + try: + print("\nKeeping servers alive. Press Ctrl+C to stop.") + while True: + time.sleep(1) + except KeyboardInterrupt: + print("\nServers stopped") + + return None \ No newline at end of file diff --git a/src/praisonai-agents/pyproject.toml b/src/praisonai-agents/pyproject.toml index 89580a9d7..0c2b1a110 100644 --- a/src/praisonai-agents/pyproject.toml +++ b/src/praisonai-agents/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "praisonaiagents" -version = "0.0.79" +version = "0.0.80" description = "Praison AI agents for completing complex tasks with Self Reflection Agents" authors = [ { name="Mervin Praison" } diff --git a/src/praisonai-agents/uv.lock b/src/praisonai-agents/uv.lock index 9ce250061..55b41153f 100644 --- a/src/praisonai-agents/uv.lock +++ b/src/praisonai-agents/uv.lock @@ -1883,7 +1883,7 @@ wheels = [ [[package]] name = "praisonaiagents" -version = "0.0.79" +version = "0.0.80" source = { editable = "." } dependencies = [ { name = "mcp" },