-
-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Add multi-agent API functionality in multi-agents-api.py and `multi…
#476
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,6 @@ | ||
| from praisonaiagents import Agent, Agents, Tools | ||
|
|
||
| research_agent = Agent(name="Research", instructions="You are a research agent to search internet about AI 2024", tools=[Tools.internet_search]) | ||
| summarise_agent = Agent(name="Summarise", instructions="You are a summarize agent to summarise in points") | ||
| agents = Agents(agents=[research_agent, summarise_agent]) | ||
| agents.launch(path="/agents", port=3030) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,8 @@ | ||
| from praisonaiagents import Agent, Agents, Tools | ||
|
|
||
| research_agent = Agent(name="Research", instructions="You are a research agent to search internet about AI 2024", tools=[Tools.internet_search]) | ||
| summarise_agent = Agent(name="Summarise", instructions="You are a summarize agent to summarise in points") | ||
| agents = Agents(agents=[research_agent, summarise_agent]) | ||
| agents2 = Agents(agents=[research_agent]) | ||
| agents.launch(path="/agents", port=3030) | ||
| agents2.launch(path="/agents2", port=3030) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,6 +17,11 @@ | |
| # Set up logger | ||
| logger = logging.getLogger(__name__) | ||
|
|
||
| # Global variables for managing the shared servers | ||
| _agents_server_started = {} # Dict of port -> started boolean | ||
| _agents_registered_endpoints = {} # Dict of port -> Dict of path -> endpoint_id | ||
| _agents_shared_apps = {} # Dict of port -> FastAPI app | ||
|
|
||
| def encode_file_to_base64(file_path: str) -> str: | ||
| """Base64-encode a file.""" | ||
| import base64 | ||
|
|
@@ -878,4 +883,230 @@ def update_state(self, updates: Dict) -> None: | |
|
|
||
| def clear_state(self) -> None: | ||
| """Clear all state values""" | ||
| self._state.clear() | ||
| self._state.clear() | ||
|
|
||
| def launch(self, path: str = '/agents', port: int = 8000, host: str = '0.0.0.0', debug: bool = False): | ||
| """ | ||
| Launch all agents as a single API endpoint. The endpoint accepts a query and processes it through | ||
| all agents in sequence, with the output of each agent feeding into the next. | ||
|
|
||
| Args: | ||
| path: API endpoint path (default: '/agents') | ||
| port: Server port (default: 8000) | ||
| host: Server host (default: '0.0.0.0') | ||
| debug: Enable debug mode for uvicorn (default: False) | ||
|
|
||
| Returns: | ||
| None | ||
| """ | ||
| global _agents_server_started, _agents_registered_endpoints, _agents_shared_apps | ||
|
|
||
| if not self.agents: | ||
| logging.warning("No agents to launch. Add agents to the Agents instance first.") | ||
| return | ||
|
|
||
| # Try to import FastAPI dependencies - lazy loading | ||
| try: | ||
| import uvicorn | ||
| from fastapi import FastAPI, HTTPException, Request | ||
| from fastapi.responses import JSONResponse | ||
| from pydantic import BaseModel | ||
| import threading | ||
| import time | ||
|
|
||
| # Define the request model here since we need pydantic | ||
| class AgentQuery(BaseModel): | ||
| query: str | ||
|
|
||
| except ImportError as e: | ||
| # Check which specific module is missing | ||
| missing_module = str(e).split("No module named '")[-1].rstrip("'") | ||
| display_error(f"Missing dependency: {missing_module}. Required for launch() method.") | ||
| logging.error(f"Missing dependency: {missing_module}. Required for launch() method.") | ||
| print(f"\nTo add API capabilities, install the required dependencies:") | ||
| print(f"pip install {missing_module}") | ||
| print("\nOr install all API dependencies with:") | ||
| print("pip install 'praisonaiagents[api]'") | ||
| return None | ||
|
|
||
| # Initialize port-specific collections if needed | ||
| if port not in _agents_registered_endpoints: | ||
| _agents_registered_endpoints[port] = {} | ||
|
|
||
| # Initialize shared FastAPI app if not already created for this port | ||
| if _agents_shared_apps.get(port) is None: | ||
| _agents_shared_apps[port] = FastAPI( | ||
| title=f"PraisonAI Agents API (Port {port})", | ||
| description="API for interacting with multiple PraisonAI Agents" | ||
| ) | ||
|
|
||
| # Add a root endpoint with a welcome message | ||
| @_agents_shared_apps[port].get("/") | ||
| async def root(): | ||
| return { | ||
| "message": f"Welcome to PraisonAI Agents API on port {port}. See /docs for usage.", | ||
| "endpoints": list(_agents_registered_endpoints[port].keys()) | ||
| } | ||
|
|
||
| # Add healthcheck endpoint | ||
| @_agents_shared_apps[port].get("/health") | ||
| async def healthcheck(): | ||
| return { | ||
| "status": "ok", | ||
| "endpoints": list(_agents_registered_endpoints[port].keys()) | ||
| } | ||
|
|
||
| # Normalize path to ensure it starts with / | ||
| if not path.startswith('/'): | ||
| path = f'/{path}' | ||
|
|
||
| # Check if path is already registered for this port | ||
| if path in _agents_registered_endpoints[port]: | ||
| logging.warning(f"Path '{path}' is already registered on port {port}. Please use a different path.") | ||
| print(f"⚠️ Warning: Path '{path}' is already registered on port {port}.") | ||
| # Use a modified path to avoid conflicts | ||
| original_path = path | ||
| instance_id = str(uuid.uuid4())[:6] | ||
| path = f"{path}_{instance_id}" | ||
| logging.warning(f"Using '{path}' instead of '{original_path}'") | ||
| print(f"🔄 Using '{path}' instead") | ||
|
Comment on lines
+963
to
+972
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The logic for handling path conflicts could be improved. Instead of simply appending a UUID, consider implementing a more user-friendly approach, such as suggesting available paths or providing a configuration option to specify a different path.1 # Suggest available paths or provide a configuration option
logging.warning(f"Path '{path}' is already registered on port {port}. Please use a different path or configure a unique path.")
print(f"⚠️ Warning: Path '{path}' is already registered on port {port}. Please use a different path or configure a unique path.")
return # Stop further execution to avoid conflictsStyle Guide ReferencesFootnotes
|
||
|
|
||
| # Generate a unique ID for this agent group's endpoint | ||
| endpoint_id = str(uuid.uuid4()) | ||
| _agents_registered_endpoints[port][path] = endpoint_id | ||
|
|
||
| # Define the endpoint handler | ||
| @_agents_shared_apps[port].post(path) | ||
| async def handle_query(request: Request, query_data: Optional[AgentQuery] = None): | ||
| # Handle both direct JSON with query field and form data | ||
| if query_data is None: | ||
| try: | ||
| request_data = await request.json() | ||
| if "query" not in request_data: | ||
| raise HTTPException(status_code=400, detail="Missing 'query' field in request") | ||
| query = request_data["query"] | ||
| except: | ||
| # Fallback to form data or query params | ||
| form_data = await request.form() | ||
| if "query" in form_data: | ||
| query = form_data["query"] | ||
| else: | ||
| raise HTTPException(status_code=400, detail="Missing 'query' field in request") | ||
|
Comment on lines
+988
to
+994
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The except json.JSONDecodeError:
logging.error("Invalid JSON format in request", exc_info=True)
raise HTTPException(status_code=400, detail="Invalid JSON format in request")
except KeyError:
logging.error("Missing 'query' field in request", exc_info=True)
raise HTTPException(status_code=400, detail="Missing 'query' field in request")Style Guide ReferencesFootnotes
|
||
| else: | ||
| query = query_data.query | ||
|
|
||
| try: | ||
| # Process the query sequentially through all agents | ||
| current_input = query | ||
| results = [] | ||
|
|
||
| for agent in self.agents: | ||
| try: | ||
| # Use async version if available, otherwise use sync version | ||
| if asyncio.iscoroutinefunction(agent.chat): | ||
| response = await agent.achat(current_input) | ||
| else: | ||
| # Run sync function in a thread to avoid blocking | ||
| loop = asyncio.get_event_loop() | ||
| response = await loop.run_in_executor(None, lambda: agent.chat(current_input)) | ||
|
|
||
| # Store this agent's result | ||
| results.append({ | ||
| "agent": agent.name, | ||
| "response": response | ||
| }) | ||
|
|
||
| # Use this response as input to the next agent | ||
| current_input = response | ||
| except Exception as e: | ||
| logging.error(f"Error with agent {agent.name}: {str(e)}", exc_info=True) | ||
| results.append({ | ||
| "agent": agent.name, | ||
| "error": str(e) | ||
| }) | ||
| # Continue with original input if there's an error | ||
|
Comment on lines
+1021
to
+1027
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Instead of continuing with the original input when an agent encounters an error, consider adding a mechanism to halt the sequence or retry the agent with a modified input. This could prevent cascading errors and improve the overall reliability of the multi-agent system.1 except Exception as e:
logging.error(f"Error with agent {agent.name}: {str(e)}", exc_info=True)
results.append({
"agent": agent.name,
"error": str(e)
})
# Option to halt or retry instead of continuing
break # Example: Halt on errorStyle Guide ReferencesFootnotes
|
||
|
|
||
| # Return all results and the final output | ||
| return { | ||
| "query": query, | ||
| "results": results, | ||
| "final_response": current_input | ||
| } | ||
| except Exception as e: | ||
| logging.error(f"Error processing query: {str(e)}", exc_info=True) | ||
| return JSONResponse( | ||
| status_code=500, | ||
| content={"error": f"Error processing query: {str(e)}"} | ||
| ) | ||
|
|
||
| print(f"🚀 Multi-Agent API available at http://{host}:{port}{path}") | ||
| agent_names = ", ".join([agent.name for agent in self.agents]) | ||
| print(f"📊 Available agents ({len(self.agents)}): {agent_names}") | ||
|
|
||
| # Start the server if it's not already running for this port | ||
| if not _agents_server_started.get(port, False): | ||
| # Mark the server as started first to prevent duplicate starts | ||
| _agents_server_started[port] = True | ||
|
|
||
| # Start the server in a separate thread | ||
| def run_server(): | ||
| try: | ||
| print(f"✅ FastAPI server started at http://{host}:{port}") | ||
| print(f"📚 API documentation available at http://{host}:{port}/docs") | ||
| print(f"🔌 Available endpoints: {', '.join(list(_agents_registered_endpoints[port].keys()))}") | ||
| uvicorn.run(_agents_shared_apps[port], host=host, port=port, log_level="debug" if debug else "info") | ||
| except Exception as e: | ||
| logging.error(f"Error starting server: {str(e)}", exc_info=True) | ||
| print(f"❌ Error starting server: {str(e)}") | ||
|
|
||
| # Run server in a background thread | ||
| server_thread = threading.Thread(target=run_server, daemon=True) | ||
| server_thread.start() | ||
|
|
||
| # Wait for a moment to allow the server to start and register endpoints | ||
| time.sleep(0.5) | ||
| else: | ||
| # If server is already running, wait a moment to make sure the endpoint is registered | ||
| time.sleep(0.1) | ||
| print(f"🔌 Available endpoints on port {port}: {', '.join(list(_agents_registered_endpoints[port].keys()))}") | ||
|
|
||
| # Get the stack frame to check if this is the last launch() call in the script | ||
| import inspect | ||
| stack = inspect.stack() | ||
|
|
||
| # If this is called from a Python script (not interactive), try to detect if it's the last launch call | ||
| if len(stack) > 1 and stack[1].filename.endswith('.py'): | ||
| caller_frame = stack[1] | ||
| caller_line = caller_frame.lineno | ||
|
|
||
| try: | ||
| # Read the file to check if there are more launch calls after this one | ||
| with open(caller_frame.filename, 'r') as f: | ||
| lines = f.readlines() | ||
|
|
||
| # Check if there are more launch() calls after the current line | ||
| has_more_launches = False | ||
| for line in lines[caller_line:]: | ||
| if '.launch(' in line and not line.strip().startswith('#'): | ||
| has_more_launches = True | ||
| break | ||
|
|
||
| # If this is the last launch call, block the main thread | ||
| if not has_more_launches: | ||
| try: | ||
| print("\nAll agents registered. Press Ctrl+C to stop the servers.") | ||
| while True: | ||
| time.sleep(1) | ||
| except KeyboardInterrupt: | ||
| print("\nServers stopped") | ||
| except Exception as e: | ||
| # If something goes wrong with detection, block anyway to be safe | ||
| logging.error(f"Error in launch detection: {e}") | ||
| try: | ||
| print("\nKeeping servers alive. Press Ctrl+C to stop.") | ||
| while True: | ||
| time.sleep(1) | ||
| except KeyboardInterrupt: | ||
| print("\nServers stopped") | ||
|
Comment on lines
+1077
to
+1110
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The logic for detecting the last # Provide a dedicated function or configuration option
print("\nAll agents registered. Press Ctrl+C to stop the servers.")
while True:
time.sleep(1)Style Guide ReferencesFootnotes
|
||
|
|
||
| return None | ||
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider using a more descriptive name for these global variables to improve readability and avoid potential naming conflicts. Also, consider using
typing.Dictfor type hinting to be more explicit about the types involved.1Style Guide References
Footnotes
Use descriptive names for variables to improve readability and maintainability. Use
typing.Dictfor explicit type hinting. ↩