diff --git a/examples/python/n8n_integration_example.py b/examples/python/n8n_integration_example.py new file mode 100644 index 000000000..f1a45c2fa --- /dev/null +++ b/examples/python/n8n_integration_example.py @@ -0,0 +1,337 @@ +"""Complete n8n Integration Example for PraisonAI + +This example demonstrates the complete bidirectional n8n ↔ PraisonAI integration: + +1. PraisonAI → n8n: Agents executing n8n workflows (using PraisonAI-Tools) +2. n8n → PraisonAI: n8n workflows invoking PraisonAI agents (using API endpoint) + +Prerequisites: +1. Install dependencies: + pip install "praisonai-tools[n8n]" fastapi uvicorn + +2. Set up n8n instance: + docker run -it --rm --name n8n -p 5678:5678 -v n8n_data:/home/node/.n8n docker.n8n.io/n8nio/n8n + +3. Environment variables: + export N8N_URL="http://localhost:5678" + export N8N_API_KEY="your-api-key" # optional for local testing + +Usage: + python examples/python/n8n_integration_example.py +""" + +import os +import asyncio +import uvicorn +from fastapi import FastAPI +from praisonaiagents import Agent + + +def setup_praisonai_agents(): + """Set up PraisonAI agents with n8n tools.""" + print("šŸš€ Setting up PraisonAI agents with n8n capabilities...") + + try: + from praisonai_tools.n8n import n8n_workflow, n8n_list_workflows + + # Create automation agent with n8n tools + automation_agent = Agent( + name="automation-agent", + instructions=""" + You are an automation specialist with access to n8n's 400+ integrations. + You can help users automate workflows across platforms like: + + - Communication: Slack, Discord, Telegram, Gmail, Teams + - Productivity: Notion, Google Sheets, Airtable, Trello + - Databases: PostgreSQL, MongoDB, MySQL, Redis + - APIs: REST, GraphQL, webhooks + + When asked to perform automation tasks: + 1. First list available n8n workflows if needed + 2. Execute the appropriate workflow with the provided data + 3. Explain what the workflow accomplished + + Always be helpful and provide clear explanations. + """, + tools=[n8n_workflow, n8n_list_workflows], + llm="gpt-4o-mini" + ) + + # Create notification agent focused on messaging + notification_agent = Agent( + name="notification-agent", + instructions=""" + You specialize in sending notifications and messages across platforms. + You have access to n8n workflows for various messaging platforms. + + When asked to send notifications: + 1. Determine the best platform for the message + 2. Use appropriate n8n workflow for that platform + 3. Include relevant context and formatting + + Be efficient and reliable with notifications. + """, + tools=[n8n_workflow], + llm="gpt-4o-mini" + ) + + return { + "automation": automation_agent, + "notification": notification_agent + } + + except ImportError as e: + print(f"āŒ Error importing n8n tools: {e}") + print("šŸ’” Install with: pip install 'praisonai-tools[n8n]'") + return {} + + +def setup_api_server_with_agents(agents): + """Set up FastAPI server with agent invoke endpoints.""" + print("🌐 Setting up API server for n8n → PraisonAI integration...") + + from praisonai.api.agent_invoke import register_agent, router + + # Create FastAPI app + app = FastAPI( + title="PraisonAI n8n Integration Server", + description="API server for n8n to invoke PraisonAI agents", + version="1.0.0" + ) + + # Include agent invoke router + app.include_router(router) + + # Register agents + for agent_id, agent in agents.items(): + register_agent(agent_id, agent) + print(f"šŸ“‹ Registered agent: {agent_id}") + + @app.get("/") + async def root(): + """Root endpoint with integration information.""" + return { + "message": "PraisonAI n8n Integration Server", + "available_agents": list(agents.keys()), + "endpoints": { + "invoke_agent": "/api/v1/agents/{agent_id}/invoke", + "list_agents": "/api/v1/agents", + "agent_info": "/api/v1/agents/{agent_id}" + }, + "n8n_integration": { + "description": "Use HTTP Request node in n8n to invoke agents", + "example_url": "http://localhost:8000/api/v1/agents/automation/invoke", + "example_body": { + "message": "Send a Slack message to #general saying 'Hello from n8n!'", + "session_id": "n8n-workflow-123" + } + } + } + + return app + + +async def demo_praisonai_to_n8n(agents): + """Demonstrate PraisonAI agents calling n8n workflows.""" + print("\nšŸ”„ Demo: PraisonAI → n8n Integration") + print("=" * 50) + + if not agents: + print("āŒ No agents available for demo") + return + + automation_agent = agents.get("automation") + if not automation_agent: + print("āŒ Automation agent not available") + return + + try: + print("šŸ“‹ Testing agent with n8n workflow listing...") + response = automation_agent.start( + "Can you list the available n8n workflows?" + ) + print(f"šŸ¤– Agent: {response}") + + print("\nšŸš€ Testing workflow execution...") + response = automation_agent.start( + "I need to send a notification that our deployment was successful. " + "Can you help me send this via Slack to the #deployments channel?" + ) + print(f"šŸ¤– Agent: {response}") + + except Exception as e: + print(f"āŒ Demo error: {e}") + + +async def demo_n8n_to_praisonai(): + """Demonstrate n8n invoking PraisonAI agents via HTTP.""" + print("\nšŸ”„ Demo: n8n → PraisonAI Integration") + print("=" * 50) + + try: + import httpx + + # Simulate n8n HTTP Request node call + print("šŸ“” Simulating n8n HTTP Request node call...") + + async with httpx.AsyncClient() as client: + response = await client.post( + "http://localhost:8000/api/v1/agents/notification/invoke", + json={ + "message": "A new user has signed up: john@example.com. Please send a welcome email.", + "session_id": "n8n-user-signup-workflow" + }, + timeout=30.0 + ) + + if response.status_code == 200: + data = response.json() + print(f"āœ… Success: {data['result']}") + print(f"šŸ“Š Session: {data['session_id']}") + else: + print(f"āŒ Error: {response.status_code} - {response.text}") + + except httpx.ConnectError: + print("āŒ Connection error: Make sure the API server is running") + print("šŸ’” Start server in another terminal: python examples/python/n8n_integration_example.py --server") + except Exception as e: + print(f"āŒ Demo error: {e}") + + +def create_n8n_workflow_examples(): + """Show examples of n8n workflow configurations.""" + print("\nšŸ“ n8n Workflow Configuration Examples") + print("=" * 50) + + print(""" +šŸ”¹ HTTP Request Node (n8n → PraisonAI): + Method: POST + URL: http://localhost:8000/api/v1/agents/automation/invoke + Body: + { + "message": "{{ $json.user_message }}", + "session_id": "{{ $json.session_id || workflow.id }}" + } + +šŸ”¹ Webhook Trigger for PraisonAI response: + URL: http://localhost:8000/webhook/praisonai-response + Method: POST + +šŸ”¹ Slack notification after agent response: + Channel: {{ $json.channel || '#general' }} + Message: Agent response: {{ $json.agent_result }} +""") + + +async def run_interactive_demo(): + """Run interactive demo of the integration.""" + print("\nšŸŽ® Interactive Demo") + print("=" * 20) + + # Set up agents + agents = setup_praisonai_agents() + if not agents: + print("āŒ Cannot run interactive demo without agents") + return + + automation_agent = agents.get("automation") + if not automation_agent: + print("āŒ Automation agent not available") + return + + print("šŸ’¬ Chat with the automation agent (type 'quit' to exit)") + print("šŸ’” Try: 'List available n8n workflows'") + print("šŸ’” Try: 'Send a Slack message to #general'") + + while True: + try: + user_input = input("\nšŸ‘¤ You: ").strip() + if user_input.lower() in ['quit', 'exit', 'q']: + break + + if not user_input: + continue + + print("šŸ¤– Agent: ", end="", flush=True) + response = automation_agent.start(user_input) + print(response) + + except KeyboardInterrupt: + print("\nšŸ‘‹ Goodbye!") + break + except Exception as e: + print(f"\nāŒ Error: {e}") + + +def run_server(port=8000): + """Run the API server for n8n integration.""" + print(f"\nšŸš€ Starting PraisonAI n8n Integration Server on port {port}...") + + # Set up agents + agents = setup_praisonai_agents() + + # Create app + app = setup_api_server_with_agents(agents) + + print(f"āœ… Server ready at http://localhost:{port}") + print(f"šŸ“Š Available agents: {list(agents.keys())}") + print("\nšŸ”— n8n HTTP Request Node Configuration:") + print(f" URL: http://localhost:{port}/api/v1/agents/{{agent_id}}/invoke") + print(" Method: POST") + print(' Body: {"message": "Your message here", "session_id": "optional"}') + + # Run server + uvicorn.run(app, host="0.0.0.0", port=port) + + +async def main(): + """Main function to run demos.""" + import argparse + + parser = argparse.ArgumentParser(description="PraisonAI n8n Integration Demo") + parser.add_argument("--server", action="store_true", help="Run API server") + parser.add_argument("--demo", action="store_true", help="Run demos") + parser.add_argument("--interactive", action="store_true", help="Run interactive demo") + parser.add_argument("--port", type=int, default=8000, help="Server port") + + args = parser.parse_args() + + if args.server: + run_server(args.port) + elif args.interactive: + await run_interactive_demo() + elif args.demo: + # Set up agents for demos + agents = setup_praisonai_agents() + + # Run demos + await demo_praisonai_to_n8n(agents) + create_n8n_workflow_examples() + print("\nšŸ’” To test n8n → PraisonAI, start the server:") + print(" python examples/python/n8n_integration_example.py --server") + print(" Then run: python examples/python/n8n_integration_example.py --test-api") + + else: + # Default: show info and run basic demo + print("šŸ”— PraisonAI ↔ n8n Bidirectional Integration") + print("=" * 50) + + print("\nšŸ“‹ Integration Components:") + print("1. šŸ”§ PraisonAI-Tools: n8n workflow execution tools") + print("2. 🌐 API Endpoint: Agent invoke endpoint for n8n") + print("3. šŸ¤– Agents: PraisonAI agents with n8n capabilities") + + print("\nšŸš€ Available Commands:") + print(" --demo Run integration demos") + print(" --server Start API server for n8n") + print(" --interactive Run interactive agent chat") + + # Quick demo + agents = setup_praisonai_agents() + if agents: + await demo_praisonai_to_n8n(agents) + create_n8n_workflow_examples() + + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/src/praisonai/praisonai/api/agent_invoke.py b/src/praisonai/praisonai/api/agent_invoke.py new file mode 100644 index 000000000..4dd211e46 --- /dev/null +++ b/src/praisonai/praisonai/api/agent_invoke.py @@ -0,0 +1,427 @@ +""" +Agent Invoke API for n8n Integration + +Provides an endpoint for n8n workflows to invoke PraisonAI agents, +enabling bidirectional n8n ↔ PraisonAI integration. +""" + +from typing import Any, Dict, Optional, Union +import inspect +import logging + +try: + from fastapi import APIRouter, HTTPException, Depends, Header, Request + from pydantic import BaseModel, Field + FASTAPI_AVAILABLE = True +except ImportError: + # Fallback for environments without FastAPI + APIRouter = None + HTTPException = None + BaseModel = object + Field = lambda *args, **kwargs: None + Depends = lambda x: x + Header = lambda *args, **kwargs: None + Request = object + FASTAPI_AVAILABLE = False + +logger = logging.getLogger(__name__) + +# Authentication +import os +CALL_SERVER_TOKEN = os.getenv('CALL_SERVER_TOKEN') + +async def verify_token( + request: Request, + authorization: Optional[str] = Header(None) +) -> None: + """Verify API token for authentication.""" + if not FASTAPI_AVAILABLE or not CALL_SERVER_TOKEN: + return # No authentication if FastAPI unavailable or no token set + + token = None + + # Check Authorization header first (Bearer or Basic) + if authorization: + if authorization.startswith("Bearer "): + token = authorization.split(" ")[1] + elif authorization.startswith("Basic "): + try: + import base64 + decoded = base64.b64decode(authorization[6:]).decode("utf-8") + if ":" in decoded: + token = decoded.split(":", 1)[1] # Use password as token + else: + token = decoded + except Exception: + pass + + # Check query param as fallback + if not token: + token = request.query_params.get("token") + + if token != CALL_SERVER_TOKEN: + raise HTTPException(status_code=401, detail="Unauthorized") + +# Request/Response Models +if FASTAPI_AVAILABLE: + class AgentInvokeRequest(BaseModel): + """Request model for agent invocation.""" + message: str = Field(..., description="Message to send to the agent") + session_id: Optional[str] = Field(None, description="Optional session ID for conversation continuity") + agent_config: Optional[Dict[str, Any]] = Field(None, description="Optional agent configuration overrides") + + class AgentInvokeResponse(BaseModel): + """Response model for agent invocation.""" + result: str = Field(..., description="Agent response") + session_id: str = Field(..., description="Session ID used for this conversation") + status: str = Field(default="success", description="Response status") + metadata: Optional[Dict[str, Any]] = Field(None, description="Optional response metadata") + + class ErrorResponse(BaseModel): + """Error response model.""" + error: str = Field(..., description="Error message") + status: str = Field(default="error", description="Error status") + code: Optional[str] = Field(None, description="Error code") +else: + # Simple dict-based fallbacks + class AgentInvokeRequest: + def __init__(self, message: str, session_id: Optional[str] = None, agent_config: Optional[Dict[str, Any]] = None): + self.message = message + self.session_id = session_id + self.agent_config = agent_config + + class AgentInvokeResponse: + def __init__(self, result: str, session_id: str, status: str = "success", metadata: Optional[Dict[str, Any]] = None): + self.result = result + self.session_id = session_id + self.status = status + self.metadata = metadata + + class ErrorResponse: + def __init__(self, error: str, status: str = "error", code: Optional[str] = None): + self.error = error + self.status = status + self.code = code + + +# Agent Registry +_agent_registry: Dict[str, Any] = {} + + +def register_agent(agent_id: str, agent: Any) -> None: + """Register an agent for invocation via API.""" + _agent_registry[agent_id] = agent + logger.debug(f"Registered agent: {agent_id}") + + +def unregister_agent(agent_id: str) -> bool: + """Unregister an agent.""" + if agent_id in _agent_registry: + del _agent_registry[agent_id] + logger.debug(f"Unregistered agent: {agent_id}") + return True + return False + + +def get_agent(agent_id: str) -> Optional[Any]: + """Get a registered agent by ID.""" + return _agent_registry.get(agent_id) + + +def list_registered_agents() -> list: + """List all registered agent IDs.""" + return list(_agent_registry.keys()) + + +def _supports_async_start(agent: Any) -> bool: + """Return True if agent.astart is a coroutine function.""" + astart = getattr(agent, "astart", None) + return inspect.iscoroutinefunction(astart) + + +def _supports_sync_start(agent: Any) -> bool: + """Return True when agent exposes a callable sync start method.""" + start = getattr(agent, "start", None) + return callable(start) + + +# FastAPI Router (if FastAPI is available) +if FASTAPI_AVAILABLE and APIRouter is not None: + router = APIRouter(prefix="/api/v1", tags=["agents"]) + + @router.post("/agents/{agent_id}/invoke") + async def invoke_agent( + agent_id: str, + request: AgentInvokeRequest, + _: None = Depends(verify_token) + ) -> Union[AgentInvokeResponse, ErrorResponse]: + """ + Invoke a PraisonAI agent with a message. + + This endpoint is designed for n8n workflows to call PraisonAI agents, + enabling bidirectional integration between n8n and PraisonAI. + + Args: + agent_id: The ID of the agent to invoke + request: The invocation request containing message and optional session_id + + Returns: + Agent response with result and session information + + Raises: + HTTPException: If agent not found or execution fails + + Example n8n HTTP Request Node configuration: + ```json + { + "method": "POST", + "url": "http://praisonai:8000/api/v1/agents/my-agent/invoke", + "body": { + "message": "{{ $json.user_input }}", + "session_id": "{{ $json.session_id }}" + } + } + ``` + """ + # Get agent from registry + agent = get_agent(agent_id) + if not agent: + logger.error(f"Agent not found: {agent_id}") + raise HTTPException( + status_code=404, + detail=f"Agent '{agent_id}' not found" + ) + + try: + # Set session ID if provided + session_id = request.session_id or "default" + + # Apply agent config overrides if provided + if request.agent_config: + # This would depend on the specific agent implementation + # For now, we'll just log it + logger.debug(f"Agent config overrides provided: {request.agent_config}") + + # Invoke agent (handle both sync and async agents) + if _supports_async_start(agent): + # Async agent + result = await agent.astart(request.message) + elif _supports_sync_start(agent): + # Sync agent - run in thread pool to avoid blocking the event loop + import asyncio + loop = asyncio.get_event_loop() + result = await loop.run_in_executor(None, agent.start, request.message) + else: + raise AttributeError(f"Agent {agent_id} must provide start() or async astart()") + + logger.info(f"Agent {agent_id} invoked successfully") + + return AgentInvokeResponse( + result=str(result), + session_id=session_id, + status="success", + metadata={ + "agent_id": agent_id, + "message_length": len(request.message), + "response_length": len(str(result)) + } + ) + + except Exception as e: + logger.error(f"Agent {agent_id} invocation failed: {e}") + raise HTTPException( + status_code=500, + detail=f"Agent execution failed: {str(e)}" + ) + + @router.get("/agents") + async def list_agents(_: None = Depends(verify_token)) -> Dict[str, Any]: + """ + List all registered agents. + + Returns: + Dictionary containing list of available agents + """ + agents = list_registered_agents() + return { + "agents": agents, + "count": len(agents), + "status": "success" + } + + @router.post("/agents/{agent_id}/register") + async def register_agent_endpoint(agent_id: str, _: None = Depends(verify_token)) -> Dict[str, Any]: + """ + Register an agent for API access. + + Note: This is a placeholder endpoint. In practice, agents would be + registered programmatically when they are created. + """ + # In a real implementation, this might load an agent from a configuration + # or database. For now, return information about registration. + return { + "message": f"Agent registration endpoint for '{agent_id}'", + "note": "Agents are typically registered programmatically", + "status": "info" + } + + @router.delete("/agents/{agent_id}") + async def unregister_agent_endpoint(agent_id: str, _: None = Depends(verify_token)) -> Dict[str, Any]: + """ + Unregister an agent from API access. + """ + success = unregister_agent(agent_id) + if success: + return { + "message": f"Agent '{agent_id}' unregistered successfully", + "status": "success" + } + else: + raise HTTPException( + status_code=404, + detail=f"Agent '{agent_id}' not found" + ) + + @router.get("/agents/{agent_id}") + async def get_agent_info(agent_id: str, _: None = Depends(verify_token)) -> Dict[str, Any]: + """ + Get information about a registered agent. + """ + agent = get_agent(agent_id) + if not agent: + raise HTTPException( + status_code=404, + detail=f"Agent '{agent_id}' not found" + ) + + # Extract basic agent information + info = { + "agent_id": agent_id, + "status": "registered", + "type": type(agent).__name__, + } + + # Add agent-specific information if available + if hasattr(agent, 'name'): + info["name"] = agent.name + if hasattr(agent, 'instructions'): + info["instructions"] = agent.instructions[:200] + "..." if len(agent.instructions) > 200 else agent.instructions + if hasattr(agent, 'tools') and agent.tools: + info["tools"] = [getattr(tool, 'name', str(tool)) for tool in agent.tools[:5]] # First 5 tools + if len(agent.tools) > 5: + info["tools"].append(f"... and {len(agent.tools) - 5} more") + + return info + + +# Standalone function for non-FastAPI environments +async def invoke_agent_standalone( + agent_id: str, + message: str, + session_id: Optional[str] = None, + agent_config: Optional[Dict[str, Any]] = None +) -> Dict[str, Any]: + """ + Standalone function to invoke an agent without FastAPI. + + This can be used in environments where FastAPI is not available + or when integrating with other web frameworks. + """ + agent = get_agent(agent_id) + if not agent: + return { + "error": f"Agent '{agent_id}' not found", + "status": "error", + "available_agents": list_registered_agents() + } + + try: + # Apply config if provided + if agent_config: + logger.debug(f"Agent config provided: {agent_config}") + + # Invoke agent + session_id = session_id or "default" + + if _supports_async_start(agent): + result = await agent.astart(message) + elif _supports_sync_start(agent): + # Sync agent - run in thread pool to avoid blocking the event loop + import asyncio + loop = asyncio.get_event_loop() + result = await loop.run_in_executor(None, agent.start, message) + else: + raise AttributeError(f"Agent {agent_id} must provide start() or async astart()") + + return { + "result": str(result), + "session_id": session_id, + "status": "success", + "agent_id": agent_id + } + + except Exception as e: + logger.error(f"Agent {agent_id} invocation failed: {e}") + return { + "error": str(e), + "status": "error", + "agent_id": agent_id + } + + +# Example usage and helper functions +def create_example_agents(): + """Create some example agents for testing.""" + try: + from praisonaiagents import Agent + + # Create a simple assistant agent + assistant = Agent( + name="assistant", + instructions="You are a helpful assistant that provides concise, helpful responses." + ) + register_agent("assistant", assistant) + + # Create a coding agent + coder = Agent( + name="coder", + instructions="You are a coding assistant that helps with programming questions and code review." + ) + register_agent("coder", coder) + + logger.info("Example agents created and registered") + + except ImportError: + logger.warning("praisonaiagents not available, skipping example agents") + + +# Auto-registration helper +def auto_register_agents_from_config(config_file: Optional[str] = None): + """ + Auto-register agents from a configuration file. + + This is a placeholder for a more complete implementation that + could load agents from YAML, JSON, or other configuration formats. + """ + logger.info(f"Auto-registering agents from config: {config_file or 'default'}") + # In a real implementation, this would parse the config and create agents + create_example_agents() + + +if __name__ == "__main__": + # For testing without FastAPI + import asyncio + + async def test_standalone(): + """Test the standalone functionality.""" + create_example_agents() + + # Test invocation + result = await invoke_agent_standalone( + agent_id="assistant", + message="Hello, can you help me with n8n integration?" + ) + print(f"Agent response: {result}") + + # asyncio.run(test_standalone()) + print("Agent Invoke API module loaded successfully") diff --git a/src/praisonai/praisonai/api/call.py b/src/praisonai/praisonai/api/call.py index 691b6ef4c..6323b5849 100644 --- a/src/praisonai/praisonai/api/call.py +++ b/src/praisonai/praisonai/api/call.py @@ -53,7 +53,15 @@ # Set up logging logger = logging.getLogger(__name__) log_level = os.getenv("LOGLEVEL", "INFO").upper() -logger.handlers = [] +logger.handlers.clear() + +# Include agent invoke router for n8n integration +try: + from .agent_invoke import router as agent_invoke_router + app.include_router(agent_invoke_router) + logger.debug("Agent invoke router added for n8n integration") +except ImportError as e: + logger.warning(f"Could not load agent invoke router: {e}") # Try to import tools from the root directory tools = [] diff --git a/src/praisonai/tests/test_n8n_agent_invoke.py b/src/praisonai/tests/test_n8n_agent_invoke.py new file mode 100644 index 000000000..dd9ba51e8 --- /dev/null +++ b/src/praisonai/tests/test_n8n_agent_invoke.py @@ -0,0 +1,366 @@ +"""Unit tests for n8n agent invoke API endpoint.""" + +import pytest +import json +from unittest.mock import Mock, patch, AsyncMock + + +class TestAgentInvokeAPI: + """Test the agent invoke API endpoint for n8n integration.""" + + def test_import_agent_invoke(self): + """Test that agent invoke module can be imported.""" + try: + from praisonai.api.agent_invoke import ( + register_agent, + get_agent, + unregister_agent, + list_registered_agents, + invoke_agent_standalone + ) + assert callable(register_agent) + assert callable(get_agent) + assert callable(unregister_agent) + assert callable(list_registered_agents) + assert callable(invoke_agent_standalone) + except ImportError as e: + pytest.fail(f"Failed to import agent invoke module: {e}") + + def test_agent_registry_operations(self): + """Test agent registration and retrieval.""" + from praisonai.api.agent_invoke import ( + register_agent, + get_agent, + unregister_agent, + list_registered_agents + ) + + # Create mock agent + mock_agent = Mock() + mock_agent.name = "test-agent" + mock_agent.start.return_value = "Test response" + + # Test registration + register_agent("test-agent", mock_agent) + assert get_agent("test-agent") == mock_agent + assert "test-agent" in list_registered_agents() + + # Test retrieval + retrieved_agent = get_agent("test-agent") + assert retrieved_agent == mock_agent + + # Test unregistration + success = unregister_agent("test-agent") + assert success is True + assert get_agent("test-agent") is None + assert "test-agent" not in list_registered_agents() + + # Test unregistering non-existent agent + success = unregister_agent("non-existent") + assert success is False + + @pytest.mark.asyncio + async def test_invoke_agent_standalone_sync(self): + """Test standalone agent invocation with sync agent.""" + from praisonai.api.agent_invoke import ( + register_agent, + invoke_agent_standalone, + unregister_agent + ) + + # Create mock sync agent + mock_agent = Mock() + mock_agent.start.return_value = "Hello from sync agent!" + + # Register agent + register_agent("sync-agent", mock_agent) + + try: + # Test invocation + result = await invoke_agent_standalone( + agent_id="sync-agent", + message="Hello agent", + session_id="test-session" + ) + + assert result["status"] == "success" + assert result["result"] == "Hello from sync agent!" + assert result["session_id"] == "test-session" + assert result["agent_id"] == "sync-agent" + + # Verify agent was called correctly + mock_agent.start.assert_called_once_with("Hello agent") + + finally: + unregister_agent("sync-agent") + + @pytest.mark.asyncio + async def test_invoke_agent_standalone_async(self): + """Test standalone agent invocation with async agent.""" + from praisonai.api.agent_invoke import ( + register_agent, + invoke_agent_standalone, + unregister_agent + ) + + # Create mock async agent + mock_agent = Mock() + mock_agent.astart = AsyncMock(return_value="Hello from async agent!") + + # Register agent + register_agent("async-agent", mock_agent) + + try: + # Test invocation + result = await invoke_agent_standalone( + agent_id="async-agent", + message="Hello async agent", + session_id="async-session" + ) + + assert result["status"] == "success" + assert result["result"] == "Hello from async agent!" + assert result["session_id"] == "async-session" + assert result["agent_id"] == "async-agent" + + # Verify agent was called correctly + mock_agent.astart.assert_called_once_with("Hello async agent") + + finally: + unregister_agent("async-agent") + + @pytest.mark.asyncio + async def test_invoke_agent_standalone_not_found(self): + """Test standalone invocation with non-existent agent.""" + from praisonai.api.agent_invoke import invoke_agent_standalone + + result = await invoke_agent_standalone( + agent_id="non-existent", + message="Hello" + ) + + assert result["status"] == "error" + assert "not found" in result["error"] + assert "available_agents" in result + + @pytest.mark.asyncio + async def test_invoke_agent_standalone_error(self): + """Test standalone invocation with agent that raises an exception.""" + from praisonai.api.agent_invoke import ( + register_agent, + invoke_agent_standalone, + unregister_agent + ) + + # Create mock agent that raises an exception + mock_agent = Mock() + mock_agent.start.side_effect = Exception("Agent error") + + # Register agent + register_agent("error-agent", mock_agent) + + try: + # Test invocation + result = await invoke_agent_standalone( + agent_id="error-agent", + message="Hello" + ) + + assert result["status"] == "error" + assert "Agent error" in result["error"] + assert result["agent_id"] == "error-agent" + + finally: + unregister_agent("error-agent") + + +@pytest.mark.skipif(not pytest.importorskip("fastapi", minversion="0.68.0"), reason="FastAPI not available") +class TestAgentInvokeFastAPI: + """Test the FastAPI agent invoke endpoints.""" + + @pytest.fixture + def client(self): + """Create a test client for the agent invoke API.""" + try: + from fastapi.testclient import TestClient + from praisonai.api.agent_invoke import router + from fastapi import FastAPI + + app = FastAPI() + app.include_router(router) + return TestClient(app) + except (ImportError, RuntimeError): + pytest.skip("FastAPI test dependencies not available") + + def test_list_agents_endpoint(self, client): + """Test the list agents endpoint.""" + from praisonai.api.agent_invoke import register_agent, unregister_agent + + # Register test agent + mock_agent = Mock() + register_agent("test-agent", mock_agent) + + try: + # Test endpoint + response = client.get("/api/v1/agents") + assert response.status_code == 200 + + data = response.json() + assert data["status"] == "success" + assert "test-agent" in data["agents"] + assert data["count"] >= 1 + + finally: + unregister_agent("test-agent") + + def test_get_agent_info_endpoint(self, client): + """Test the get agent info endpoint.""" + from praisonai.api.agent_invoke import register_agent, unregister_agent + + # Create mock agent with attributes + mock_agent = Mock() + mock_agent.name = "Test Agent" + mock_agent.instructions = "You are a test agent for unit testing." + mock_agent.tools = [Mock(), Mock()] # Two mock tools + + register_agent("info-agent", mock_agent) + + try: + # Test endpoint + response = client.get("/api/v1/agents/info-agent") + assert response.status_code == 200 + + data = response.json() + assert data["agent_id"] == "info-agent" + assert data["name"] == "Test Agent" + assert data["status"] == "registered" + assert "instructions" in data + assert "tools" in data + + finally: + unregister_agent("info-agent") + + def test_get_agent_info_not_found(self, client): + """Test get agent info for non-existent agent.""" + response = client.get("/api/v1/agents/non-existent") + assert response.status_code == 404 + + data = response.json() + assert "not found" in data["detail"] + + def test_invoke_agent_endpoint_sync(self, client): + """Test the invoke agent endpoint with sync agent.""" + from praisonai.api.agent_invoke import register_agent, unregister_agent + + # Create mock sync agent + mock_agent = Mock() + mock_agent.start.return_value = "Hello from endpoint!" + + register_agent("endpoint-agent", mock_agent) + + try: + # Test endpoint + response = client.post( + "/api/v1/agents/endpoint-agent/invoke", + json={ + "message": "Hello endpoint", + "session_id": "endpoint-session" + } + ) + + assert response.status_code == 200 + + data = response.json() + assert data["status"] == "success" + assert data["result"] == "Hello from endpoint!" + assert data["session_id"] == "endpoint-session" + assert "metadata" in data + + # Verify agent was called + mock_agent.start.assert_called_once_with("Hello endpoint") + + finally: + unregister_agent("endpoint-agent") + + def test_invoke_agent_endpoint_not_found(self, client): + """Test invoke endpoint with non-existent agent.""" + response = client.post( + "/api/v1/agents/non-existent/invoke", + json={"message": "Hello"} + ) + + assert response.status_code == 404 + data = response.json() + assert "not found" in data["detail"] + + def test_invoke_agent_endpoint_missing_message(self, client): + """Test invoke endpoint with missing message.""" + from praisonai.api.agent_invoke import register_agent, unregister_agent + + mock_agent = Mock() + register_agent("message-test", mock_agent) + + try: + # Test with missing message field + response = client.post( + "/api/v1/agents/message-test/invoke", + json={} + ) + + assert response.status_code == 422 # Validation error + + finally: + unregister_agent("message-test") + + def test_unregister_agent_endpoint(self, client): + """Test the unregister agent endpoint.""" + from praisonai.api.agent_invoke import register_agent + + # Register agent + mock_agent = Mock() + register_agent("unregister-test", mock_agent) + + # Test unregistration + response = client.delete("/api/v1/agents/unregister-test") + assert response.status_code == 200 + + data = response.json() + assert data["status"] == "success" + assert "unregistered successfully" in data["message"] + + # Test unregistering again (should fail) + response = client.delete("/api/v1/agents/unregister-test") + assert response.status_code == 404 + + +def test_agent_invoke_smoke_test(): + """Smoke test to verify agent invoke module can be imported and used.""" + try: + from praisonai.api.agent_invoke import ( + register_agent, + get_agent, + list_registered_agents, + AgentInvokeRequest, + AgentInvokeResponse, + ErrorResponse + ) + + # Test that classes can be instantiated + if AgentInvokeRequest is not object: # Only if Pydantic is available + request = AgentInvokeRequest(message="test") + assert request.message == "test" + assert request.session_id is None + + # Test basic registry functions + assert callable(register_agent) + assert callable(get_agent) + assert isinstance(list_registered_agents(), list) + + except Exception as e: + pytest.fail(f"Agent invoke smoke test failed: {e}") + + +if __name__ == "__main__": + # Run smoke test when executed directly + test_agent_invoke_smoke_test() + print("āœ… Agent invoke API smoke test passed")