Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/praisonai-agents/multi-agents-api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from praisonaiagents import Agent, Agents, Tools

research_agent = Agent(name="Research", instructions="You are a research agent to search internet about AI 2024", tools=[Tools.internet_search])
summarise_agent = Agent(name="Summarise", instructions="You are a summarize agent to summarise in points")
agents = Agents(agents=[research_agent, summarise_agent])
agents.launch(path="/agents", port=3030)
8 changes: 8 additions & 0 deletions src/praisonai-agents/multi-agents-group-api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from praisonaiagents import Agent, Agents, Tools

research_agent = Agent(name="Research", instructions="You are a research agent to search internet about AI 2024", tools=[Tools.internet_search])
summarise_agent = Agent(name="Summarise", instructions="You are a summarize agent to summarise in points")
agents = Agents(agents=[research_agent, summarise_agent])
agents2 = Agents(agents=[research_agent])
agents.launch(path="/agents", port=3030)
agents2.launch(path="/agents2", port=3030)
233 changes: 232 additions & 1 deletion src/praisonai-agents/praisonaiagents/agents/agents.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@
# Set up logger
logger = logging.getLogger(__name__)

# Global variables for managing the shared servers
_agents_server_started = {} # Dict of port -> started boolean
_agents_registered_endpoints = {} # Dict of port -> Dict of path -> endpoint_id
_agents_shared_apps = {} # Dict of port -> FastAPI app
Comment on lines +21 to +23
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Consider using a more descriptive name for these global variables to improve readability and avoid potential naming conflicts. Also, consider using typing.Dict for type hinting to be more explicit about the types involved.1

Style Guide References

Suggested change
_agents_server_started = {} # Dict of port -> started boolean
_agents_registered_endpoints = {} # Dict of port -> Dict of path -> endpoint_id
_agents_shared_apps = {} # Dict of port -> FastAPI app
agent_servers_started: typing.Dict[int, bool] = {}
agent_registered_endpoints: typing.Dict[int, typing.Dict[str, str]] = {}
agent_shared_apps: typing.Dict[int, FastAPI] = {}

Footnotes

  1. Use descriptive names for variables to improve readability and maintainability. Use typing.Dict for explicit type hinting.


def encode_file_to_base64(file_path: str) -> str:
"""Base64-encode a file."""
import base64
Expand Down Expand Up @@ -878,4 +883,230 @@ def update_state(self, updates: Dict) -> None:

def clear_state(self) -> None:
"""Clear all state values"""
self._state.clear()
self._state.clear()

def launch(self, path: str = '/agents', port: int = 8000, host: str = '0.0.0.0', debug: bool = False):
"""
Launch all agents as a single API endpoint. The endpoint accepts a query and processes it through
all agents in sequence, with the output of each agent feeding into the next.

Args:
path: API endpoint path (default: '/agents')
port: Server port (default: 8000)
host: Server host (default: '0.0.0.0')
debug: Enable debug mode for uvicorn (default: False)

Returns:
None
"""
global _agents_server_started, _agents_registered_endpoints, _agents_shared_apps

if not self.agents:
logging.warning("No agents to launch. Add agents to the Agents instance first.")
return

# Try to import FastAPI dependencies - lazy loading
try:
import uvicorn
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse
from pydantic import BaseModel
import threading
import time

# Define the request model here since we need pydantic
class AgentQuery(BaseModel):
query: str

except ImportError as e:
# Check which specific module is missing
missing_module = str(e).split("No module named '")[-1].rstrip("'")
display_error(f"Missing dependency: {missing_module}. Required for launch() method.")
logging.error(f"Missing dependency: {missing_module}. Required for launch() method.")
print(f"\nTo add API capabilities, install the required dependencies:")
print(f"pip install {missing_module}")
print("\nOr install all API dependencies with:")
print("pip install 'praisonaiagents[api]'")
return None

# Initialize port-specific collections if needed
if port not in _agents_registered_endpoints:
_agents_registered_endpoints[port] = {}

# Initialize shared FastAPI app if not already created for this port
if _agents_shared_apps.get(port) is None:
_agents_shared_apps[port] = FastAPI(
title=f"PraisonAI Agents API (Port {port})",
description="API for interacting with multiple PraisonAI Agents"
)

# Add a root endpoint with a welcome message
@_agents_shared_apps[port].get("/")
async def root():
return {
"message": f"Welcome to PraisonAI Agents API on port {port}. See /docs for usage.",
"endpoints": list(_agents_registered_endpoints[port].keys())
}

# Add healthcheck endpoint
@_agents_shared_apps[port].get("/health")
async def healthcheck():
return {
"status": "ok",
"endpoints": list(_agents_registered_endpoints[port].keys())
}

# Normalize path to ensure it starts with /
if not path.startswith('/'):
path = f'/{path}'

# Check if path is already registered for this port
if path in _agents_registered_endpoints[port]:
logging.warning(f"Path '{path}' is already registered on port {port}. Please use a different path.")
print(f"⚠️ Warning: Path '{path}' is already registered on port {port}.")
# Use a modified path to avoid conflicts
original_path = path
instance_id = str(uuid.uuid4())[:6]
path = f"{path}_{instance_id}"
logging.warning(f"Using '{path}' instead of '{original_path}'")
print(f"🔄 Using '{path}' instead")
Comment on lines +963 to +972
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The logic for handling path conflicts could be improved. Instead of simply appending a UUID, consider implementing a more user-friendly approach, such as suggesting available paths or providing a configuration option to specify a different path.1

# Suggest available paths or provide a configuration option
logging.warning(f"Path '{path}' is already registered on port {port}. Please use a different path or configure a unique path.")
print(f"⚠️ Warning: Path '{path}' is already registered on port {port}. Please use a different path or configure a unique path.")
return  # Stop further execution to avoid conflicts

Style Guide References

Footnotes

  1. Implement user-friendly conflict resolution strategies.


# Generate a unique ID for this agent group's endpoint
endpoint_id = str(uuid.uuid4())
_agents_registered_endpoints[port][path] = endpoint_id

# Define the endpoint handler
@_agents_shared_apps[port].post(path)
async def handle_query(request: Request, query_data: Optional[AgentQuery] = None):
# Handle both direct JSON with query field and form data
if query_data is None:
try:
request_data = await request.json()
if "query" not in request_data:
raise HTTPException(status_code=400, detail="Missing 'query' field in request")
query = request_data["query"]
except:
# Fallback to form data or query params
form_data = await request.form()
if "query" in form_data:
query = form_data["query"]
else:
raise HTTPException(status_code=400, detail="Missing 'query' field in request")
Comment on lines +988 to +994
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The except block is too broad. It should catch specific exceptions like json.JSONDecodeError and KeyError to handle different error scenarios more precisely. This will help in debugging and prevent masking unexpected errors.1

except json.JSONDecodeError:
    logging.error("Invalid JSON format in request", exc_info=True)
    raise HTTPException(status_code=400, detail="Invalid JSON format in request")
except KeyError:
    logging.error("Missing 'query' field in request", exc_info=True)
    raise HTTPException(status_code=400, detail="Missing 'query' field in request")

Style Guide References

Footnotes

  1. Catch specific exceptions to handle different error scenarios more precisely.

else:
query = query_data.query

try:
# Process the query sequentially through all agents
current_input = query
results = []

for agent in self.agents:
try:
# Use async version if available, otherwise use sync version
if asyncio.iscoroutinefunction(agent.chat):
response = await agent.achat(current_input)
else:
# Run sync function in a thread to avoid blocking
loop = asyncio.get_event_loop()
response = await loop.run_in_executor(None, lambda: agent.chat(current_input))

# Store this agent's result
results.append({
"agent": agent.name,
"response": response
})

# Use this response as input to the next agent
current_input = response
except Exception as e:
logging.error(f"Error with agent {agent.name}: {str(e)}", exc_info=True)
results.append({
"agent": agent.name,
"error": str(e)
})
# Continue with original input if there's an error
Comment on lines +1021 to +1027
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Instead of continuing with the original input when an agent encounters an error, consider adding a mechanism to halt the sequence or retry the agent with a modified input. This could prevent cascading errors and improve the overall reliability of the multi-agent system.1

except Exception as e:
    logging.error(f"Error with agent {agent.name}: {str(e)}", exc_info=True)
    results.append({
        "agent": agent.name,
        "error": str(e)
    })
    # Option to halt or retry instead of continuing
    break  # Example: Halt on error

Style Guide References

Footnotes

  1. Implement mechanisms to handle agent errors and prevent cascading failures.


# Return all results and the final output
return {
"query": query,
"results": results,
"final_response": current_input
}
except Exception as e:
logging.error(f"Error processing query: {str(e)}", exc_info=True)
return JSONResponse(
status_code=500,
content={"error": f"Error processing query: {str(e)}"}
)

print(f"🚀 Multi-Agent API available at http://{host}:{port}{path}")
agent_names = ", ".join([agent.name for agent in self.agents])
print(f"📊 Available agents ({len(self.agents)}): {agent_names}")

# Start the server if it's not already running for this port
if not _agents_server_started.get(port, False):
# Mark the server as started first to prevent duplicate starts
_agents_server_started[port] = True

# Start the server in a separate thread
def run_server():
try:
print(f"✅ FastAPI server started at http://{host}:{port}")
print(f"📚 API documentation available at http://{host}:{port}/docs")
print(f"🔌 Available endpoints: {', '.join(list(_agents_registered_endpoints[port].keys()))}")
uvicorn.run(_agents_shared_apps[port], host=host, port=port, log_level="debug" if debug else "info")
except Exception as e:
logging.error(f"Error starting server: {str(e)}", exc_info=True)
print(f"❌ Error starting server: {str(e)}")

# Run server in a background thread
server_thread = threading.Thread(target=run_server, daemon=True)
server_thread.start()

# Wait for a moment to allow the server to start and register endpoints
time.sleep(0.5)
else:
# If server is already running, wait a moment to make sure the endpoint is registered
time.sleep(0.1)
print(f"🔌 Available endpoints on port {port}: {', '.join(list(_agents_registered_endpoints[port].keys()))}")

# Get the stack frame to check if this is the last launch() call in the script
import inspect
stack = inspect.stack()

# If this is called from a Python script (not interactive), try to detect if it's the last launch call
if len(stack) > 1 and stack[1].filename.endswith('.py'):
caller_frame = stack[1]
caller_line = caller_frame.lineno

try:
# Read the file to check if there are more launch calls after this one
with open(caller_frame.filename, 'r') as f:
lines = f.readlines()

# Check if there are more launch() calls after the current line
has_more_launches = False
for line in lines[caller_line:]:
if '.launch(' in line and not line.strip().startswith('#'):
has_more_launches = True
break

# If this is the last launch call, block the main thread
if not has_more_launches:
try:
print("\nAll agents registered. Press Ctrl+C to stop the servers.")
while True:
time.sleep(1)
except KeyboardInterrupt:
print("\nServers stopped")
except Exception as e:
# If something goes wrong with detection, block anyway to be safe
logging.error(f"Error in launch detection: {e}")
try:
print("\nKeeping servers alive. Press Ctrl+C to stop.")
while True:
time.sleep(1)
except KeyboardInterrupt:
print("\nServers stopped")
Comment on lines +1077 to +1110
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The logic for detecting the last launch() call is complex and might not be reliable in all scenarios. Consider providing a more explicit way for the user to signal that all agents have been registered, such as a dedicated function call or a configuration option. This would make the code more robust and easier to understand.1

# Provide a dedicated function or configuration option
print("\nAll agents registered. Press Ctrl+C to stop the servers.")
while True:
    time.sleep(1)

Style Guide References

Footnotes

  1. Provide explicit mechanisms for signaling the completion of agent registration.


return None
2 changes: 1 addition & 1 deletion src/praisonai-agents/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "praisonaiagents"
version = "0.0.79"
version = "0.0.80"
description = "Praison AI agents for completing complex tasks with Self Reflection Agents"
authors = [
{ name="Mervin Praison" }
Expand Down
2 changes: 1 addition & 1 deletion src/praisonai-agents/uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading