diff --git a/docker/Dockerfile b/docker/Dockerfile index 4c3b38c3b..a8be18d1c 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -1,6 +1,6 @@ FROM python:3.11-slim WORKDIR /app COPY . . -RUN pip install flask praisonai==2.1.5 gunicorn markdown +RUN pip install flask praisonai==2.1.6 gunicorn markdown EXPOSE 8080 CMD ["gunicorn", "-b", "0.0.0.0:8080", "api:app"] diff --git a/docker/Dockerfile.chat b/docker/Dockerfile.chat index e4f020605..828f42bfe 100644 --- a/docker/Dockerfile.chat +++ b/docker/Dockerfile.chat @@ -13,7 +13,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ RUN pip install --no-cache-dir \ praisonaiagents>=0.0.4 \ praisonai_tools \ - "praisonai==2.1.5" \ + "praisonai==2.1.6" \ "praisonai[chat]" \ "embedchain[github,youtube]" diff --git a/docker/Dockerfile.dev b/docker/Dockerfile.dev index e10a3853e..d585f42a0 100644 --- a/docker/Dockerfile.dev +++ b/docker/Dockerfile.dev @@ -15,7 +15,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ RUN pip install --no-cache-dir \ praisonaiagents>=0.0.4 \ praisonai_tools \ - "praisonai==2.1.5" \ + "praisonai==2.1.6" \ "praisonai[ui]" \ "praisonai[chat]" \ "praisonai[realtime]" \ diff --git a/docker/Dockerfile.ui b/docker/Dockerfile.ui index ad5ed48b7..6a03607bc 100644 --- a/docker/Dockerfile.ui +++ b/docker/Dockerfile.ui @@ -13,7 +13,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ RUN pip install --no-cache-dir \ praisonaiagents>=0.0.4 \ praisonai_tools \ - "praisonai==2.1.5" \ + "praisonai==2.1.6" \ "praisonai[ui]" \ "praisonai[crewai]" diff --git a/docs/api/praisonai/deploy.html b/docs/api/praisonai/deploy.html index 2d5e7d1e7..681017b80 100644 --- a/docs/api/praisonai/deploy.html +++ b/docs/api/praisonai/deploy.html @@ -110,7 +110,7 @@

Raises

file.write("FROM python:3.11-slim\n") file.write("WORKDIR /app\n") file.write("COPY . .\n") - file.write("RUN pip install flask praisonai==2.1.5 gunicorn markdown\n") + file.write("RUN pip install flask praisonai==2.1.6 gunicorn markdown\n") file.write("EXPOSE 8080\n") file.write('CMD ["gunicorn", "-b", "0.0.0.0:8080", "api:app"]\n') diff --git a/docs/deploy/mcp-server-deploy.mdx b/docs/deploy/mcp-server-deploy.mdx new file mode 100644 index 000000000..c538606c9 --- /dev/null +++ b/docs/deploy/mcp-server-deploy.mdx @@ -0,0 +1,371 @@ +--- +title: "Deploying MCP Servers" +sidebarTitle: "MCP Servers" +description: "Learn how to deploy Model Context Protocol (MCP) servers for production environments" +icon: "server-rack" +--- + +# Deploying MCP Servers + +This guide focuses on deploying Model Context Protocol (MCP) servers for production environments. MCP servers allow AI models to access tools and external systems through a standardized protocol. + +## Quick Start + + + + Make sure you have the required packages installed: + ```bash + pip install "praisonaiagents[mcp]>=0.0.81" + ``` + + For the multi-agent example with search capabilities: + ```bash + pip install "praisonaiagents[mcp]>=0.0.81" duckduckgo-search + ``` + + + **Single Agent MCP Server** + + Create a file named `simple-mcp-server.py`: + ```python + from praisonaiagents import Agent + + agent = Agent(name="TweetAgent", instructions="Create a Tweet based on the topic provided") + agent.launch(port=8080, host="0.0.0.0", protocol="mcp") + ``` + + **Multi-Agent MCP Server with Custom Tools** + + Create a file named `simple-mcp-multi-agents-server.py`: + ```python + from praisonaiagents import Agent, Agents + from duckduckgo_search import DDGS + + def internet_search_tool(query: str): + results = [] + ddgs = DDGS() + for result in ddgs.text(keywords=query, max_results=5): + results.append({ + "title": result.get("title", ""), + "url": result.get("href", ""), + "snippet": result.get("body", "") + }) + return results + + agent = Agent(name="SearchAgent", instructions="You Search the internet for information", tools=[internet_search_tool]) + agent2 = Agent(name="SummariseAgent", instructions="You Summarise the information") + + agents = Agents(name="MultiAgents", agents=[agent, agent2]) + agents.launch(port=8080, host="0.0.0.0", protocol="mcp") + ``` + + + +## Containerization with Docker + + + + ```dockerfile + FROM python:3.11-slim + + WORKDIR /app + + # Install dependencies + COPY requirements.txt . + RUN pip install --no-cache-dir -r requirements.txt + + # Copy application code + COPY . . + + # Expose the port + EXPOSE 8080 + + # Run the MCP server + CMD ["python", "simple-mcp-server.py"] + ``` + + Create a `requirements.txt` file: + ``` + praisonaiagents[mcp]>=0.0.81 + duckduckgo-search # Only needed for the multi-agent example + ``` + + + ```bash + # Build the Docker image + docker build -t mcp-server . + + # Run the container + docker run -p 8080:8080 -e OPENAI_API_KEY=your_api_key mcp-server + ``` + + + +## Cloud Deployment + +### AWS Elastic Container Service (ECS) + + + + ```bash + # Create an ECR repository + aws ecr create-repository --repository-name mcp-server + + # Authenticate Docker to ECR + aws ecr get-login-password --region us-east-1 | docker login --username AWS --password-stdin YOUR_AWS_ACCOUNT_ID.dkr.ecr.us-east-1.amazonaws.com + + # Tag and push the image + docker tag mcp-server:latest YOUR_AWS_ACCOUNT_ID.dkr.ecr.us-east-1.amazonaws.com/mcp-server:latest + docker push YOUR_AWS_ACCOUNT_ID.dkr.ecr.us-east-1.amazonaws.com/mcp-server:latest + ``` + + + ```json + { + "family": "mcp-server", + "networkMode": "awsvpc", + "executionRoleArn": "arn:aws:iam::YOUR_AWS_ACCOUNT_ID:role/ecsTaskExecutionRole", + "containerDefinitions": [ + { + "name": "mcp-server", + "image": "YOUR_AWS_ACCOUNT_ID.dkr.ecr.us-east-1.amazonaws.com/mcp-server:latest", + "essential": true, + "portMappings": [ + { + "containerPort": 8080, + "hostPort": 8080, + "protocol": "tcp" + } + ], + "environment": [ + { + "name": "OPENAI_API_KEY", + "value": "your_api_key" + } + ], + "logConfiguration": { + "logDriver": "awslogs", + "options": { + "awslogs-group": "/ecs/mcp-server", + "awslogs-region": "us-east-1", + "awslogs-stream-prefix": "ecs" + } + } + } + ], + "requiresCompatibilities": [ + "FARGATE" + ], + "cpu": "256", + "memory": "512" + } + ``` + + + ```bash + # Create a service + aws ecs create-service \ + --cluster your-cluster \ + --service-name mcp-server \ + --task-definition mcp-server:1 \ + --desired-count 1 \ + --launch-type FARGATE \ + --network-configuration "awsvpcConfiguration={subnets=[subnet-12345678],securityGroups=[sg-12345678],assignPublicIp=ENABLED}" \ + --load-balancers "targetGroupArn=arn:aws:elasticloadbalancing:us-east-1:YOUR_AWS_ACCOUNT_ID:targetgroup/mcp-server-tg/1234567890abcdef,containerName=mcp-server,containerPort=8080" + ``` + + + +### Google Cloud Run + + + + ```bash + # Configure Docker to use Google Cloud credentials + gcloud auth configure-docker + + # Build and tag the image + docker build -t gcr.io/YOUR_PROJECT_ID/mcp-server . + + # Push the image + docker push gcr.io/YOUR_PROJECT_ID/mcp-server + ``` + + + ```bash + gcloud run deploy mcp-server \ + --image gcr.io/YOUR_PROJECT_ID/mcp-server \ + --platform managed \ + --region us-central1 \ + --allow-unauthenticated \ + --set-env-vars="OPENAI_API_KEY=your_api_key" + ``` + + + +## Production Configuration + +### Security + +For production deployments, implement these security measures: + +1. **API Key Authentication**: + ```python + agent.launch(port=8080, host="0.0.0.0", protocol="mcp", api_key="your-secret-key") + ``` + +2. **HTTPS with SSL/TLS**: + Set up a reverse proxy like Nginx with SSL certificates: + ```nginx + server { + listen 443 ssl; + server_name your-mcp-server.com; + + ssl_certificate /path/to/cert.pem; + ssl_certificate_key /path/to/key.pem; + + location / { + proxy_pass http://localhost:8080; + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + } + } + ``` + +3. **Secret Management**: + Use environment variables or a secrets manager for API keys: + ```bash + # AWS Secrets Manager + aws secretsmanager create-secret \ + --name OPENAI_API_KEY \ + --secret-string "your_api_key" + + # Google Secret Manager + echo -n "your_api_key" | gcloud secrets create openai-api-key --data-file=- + ``` + +### Scaling + +For high-traffic MCP servers, consider these scaling strategies: + +1. **Load Balancing**: + Deploy multiple instances behind a load balancer. + +2. **Auto Scaling**: + Configure auto-scaling based on CPU/memory usage or request count. + +3. **Resource Allocation**: + Allocate sufficient CPU and memory for your MCP servers: + ```bash + # AWS ECS + aws ecs update-service \ + --cluster your-cluster \ + --service mcp-server \ + --desired-count 3 + + # Google Cloud Run + gcloud run services update mcp-server \ + --min-instances=2 \ + --max-instances=10 \ + --memory=2Gi \ + --cpu=1 + ``` + +## Monitoring and Logging + +Set up comprehensive monitoring for your MCP servers: + +1. **Application Logging**: + ```python + import logging + + logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + handlers=[ + logging.FileHandler("mcp-server.log"), + logging.StreamHandler() + ] + ) + + logger = logging.getLogger("mcp-server") + ``` + +2. **Health Checks**: + Create a health check endpoint: + ```python + from flask import Flask + + app = Flask(__name__) + + @app.route('/health') + def health_check(): + return {"status": "healthy"}, 200 + + if __name__ == "__main__": + app.run(host="0.0.0.0", port=8081) + ``` + +3. **Metrics Collection**: + Use Prometheus or similar tools to collect metrics. + +## Testing MCP Servers + +Before deploying to production, thoroughly test your MCP server: + +```python +import requests +import json + +def test_mcp_server(): + url = "http://localhost:8080/v1/chat/completions" + + headers = { + "Content-Type": "application/json" + } + + data = { + "messages": [ + {"role": "user", "content": "Create a tweet about artificial intelligence"} + ] + } + + response = requests.post(url, headers=headers, data=json.dumps(data)) + + print(f"Status Code: {response.status_code}") + print(f"Response: {response.json()}") + +if __name__ == "__main__": + test_mcp_server() +``` + +## Deployment Checklist + +Before going live with your MCP server, ensure you've addressed these items: + +- [ ] Implemented proper authentication +- [ ] Set up HTTPS with valid SSL certificates +- [ ] Configured proper logging and monitoring +- [ ] Tested the server under load +- [ ] Implemented rate limiting +- [ ] Secured all API keys and credentials +- [ ] Set up automated backups +- [ ] Created a disaster recovery plan +- [ ] Documented the deployment process + +## Features + + + + Package your MCP servers in Docker containers for consistent deployment. + + + Deploy to AWS, Google Cloud, or other cloud providers with ease. + + + Scale your MCP servers to handle production workloads. + + + Implement best practices for secure MCP server deployments. + + diff --git a/docs/developers/local-development.mdx b/docs/developers/local-development.mdx index e9ab16c00..c3f30dc45 100644 --- a/docs/developers/local-development.mdx +++ b/docs/developers/local-development.mdx @@ -27,7 +27,7 @@ WORKDIR /app COPY . . -RUN pip install flask praisonai==2.1.5 watchdog +RUN pip install flask praisonai==2.1.6 watchdog EXPOSE 5555 diff --git a/docs/mcp/mcp-server.mdx b/docs/mcp/mcp-server.mdx new file mode 100644 index 000000000..ca3e9d70d --- /dev/null +++ b/docs/mcp/mcp-server.mdx @@ -0,0 +1,196 @@ +--- +title: "Creating MCP Servers" +sidebarTitle: "MCP Servers" +description: "Learn how to create Model Context Protocol (MCP) servers with PraisonAI agents" +icon: "server" +--- + +# Creating MCP Servers + +This guide demonstrates how to create Model Context Protocol (MCP) servers using PraisonAI agents. MCP is a protocol that enables AI models to use tools and communicate with external systems in a standardized way. + +## Single Agent MCP Server + +The simplest way to create an MCP server is with a single agent. This approach is ideal for specialized tasks where you need just one agent with a specific capability. + + + + Make sure you have the required packages installed: + ```bash + pip install "praisonaiagents[mcp]>=0.0.81" + ``` + + + Create a file named `simple-mcp-server.py` with the following code: + ```python + from praisonaiagents import Agent + + agent = Agent(name="TweetAgent", instructions="Create a Tweet based on the topic provided") + agent.launch(port=8080, protocol="mcp") + ``` + + + ```bash + python simple-mcp-server.py + ``` + + Your MCP server will be available at `http://localhost:8080` + + + +## Multi-Agent MCP Server with Custom Tools + +For more complex scenarios, you can create an MCP server with multiple agents and custom tools. This approach allows for collaborative problem-solving and specialized capabilities. + + + + ```bash + pip install "praisonaiagents[mcp]>=0.0.81" duckduckgo-search + ``` + + + Create a file named `simple-mcp-multi-agents-server.py` with the following code: + ```python + from praisonaiagents import Agent, Agents + from duckduckgo_search import DDGS + + def internet_search_tool(query: str): + results = [] + ddgs = DDGS() + for result in ddgs.text(keywords=query, max_results=5): + results.append({ + "title": result.get("title", ""), + "url": result.get("href", ""), + "snippet": result.get("body", "") + }) + return results + + agent = Agent(name="SearchAgent", instructions="You Search the internet for information", tools=[internet_search_tool]) + agent2 = Agent(name="SummariseAgent", instructions="You Summarise the information") + + agents = Agents(name="MultiAgents", agents=[agent, agent2]) + agents.launch(port=8080, protocol="mcp") + ``` + + + ```bash + python simple-mcp-multi-agents-server.py + ``` + + Your multi-agent MCP server will be available at `http://localhost:8080` + + + +## Connecting to MCP Servers + +You can connect to MCP servers using various clients: + +### Using PraisonAI Agents + +```python +from praisonaiagents import Agent, MCP + +client_agent = Agent( + instructions="Use the MCP server to complete tasks", + llm="gpt-4o-mini", + tools=MCP("http://localhost:8080") +) + +response = client_agent.start("Create a tweet about artificial intelligence") +print(response) +``` + +### Using JavaScript/TypeScript + +```typescript +import { MCPClient } from '@modelcontextprotocol/client'; + +async function main() { + const client = new MCPClient('http://localhost:8080'); + + const response = await client.chat([ + { role: 'user', content: 'Create a tweet about artificial intelligence' } + ]); + + console.log(response.choices[0].message.content); +} + +main(); +``` + +## Advanced Configuration + +### Custom Port and Host + +```python +agent.launch(port=9000, host="0.0.0.0", protocol="mcp") +``` + +### Authentication + +```python +agent.launch(port=8080, protocol="mcp", api_key="your-secret-key") +``` + +### CORS Configuration + +```python +agent.launch(port=8080, protocol="mcp", cors_origins=["https://yourdomain.com"]) +``` + +## Deployment Options + +For production deployments, consider: + +1. **Docker Containerization**: + ```dockerfile + FROM python:3.11-slim + + WORKDIR /app + + COPY requirements.txt . + RUN pip install --no-cache-dir -r requirements.txt + + COPY . . + + EXPOSE 8080 + + CMD ["python", "simple-mcp-server.py"] + ``` + +2. **Cloud Deployment**: Deploy to AWS, Google Cloud, or Azure using their container services. + +3. **Kubernetes**: For scalable deployments, use Kubernetes to manage your MCP server containers. + +## Security Considerations + +1. **API Authentication**: Always use API keys in production +2. **Rate Limiting**: Implement rate limiting to prevent abuse +3. **Input Validation**: Validate all incoming requests +4. **HTTPS**: Use SSL/TLS for all production deployments +5. **Tool Permissions**: Limit what custom tools can access + +## Features and Benefits + + + + MCP provides a standardized way for AI models to interact with tools and services. + + + Easily integrate custom tools like web search, database access, or API calls. + + + Create systems where multiple specialized agents collaborate on complex tasks. + + + Connect to MCP servers from any programming language that supports HTTP. + + + +## Best Practices + +1. **Agent Instructions**: Provide clear, specific instructions for each agent +2. **Tool Documentation**: Document your custom tools thoroughly +3. **Error Handling**: Implement robust error handling in your tools +4. **Monitoring**: Set up logging and monitoring for your MCP servers +5. **Testing**: Test your MCP servers thoroughly before deployment diff --git a/docs/mint.json b/docs/mint.json index 575d4d45d..eacc47f7a 100644 --- a/docs/mint.json +++ b/docs/mint.json @@ -240,6 +240,7 @@ "mcp/airbnb", "mcp/stdio", "mcp/sse", + "mcp/mcp-server", "mcp/ollama", "mcp/groq", "mcp/openrouter", @@ -377,6 +378,7 @@ "pages": [ "deploy/deploy", "deploy/multi-agents-deploy", + "deploy/mcp-server-deploy", "deploy/aws", "deploy/googlecloud" ] diff --git a/docs/ui/chat.mdx b/docs/ui/chat.mdx index 02878517b..8b243b846 100644 --- a/docs/ui/chat.mdx +++ b/docs/ui/chat.mdx @@ -155,7 +155,7 @@ To facilitate local development with live reload, you can use Docker. Follow the COPY . . - RUN pip install flask praisonai==2.1.5 watchdog + RUN pip install flask praisonai==2.1.6 watchdog EXPOSE 5555 diff --git a/docs/ui/code.mdx b/docs/ui/code.mdx index 5224cd58e..3d1cd3d9f 100644 --- a/docs/ui/code.mdx +++ b/docs/ui/code.mdx @@ -208,7 +208,7 @@ To facilitate local development with live reload, you can use Docker. Follow the COPY . . - RUN pip install flask praisonai==2.1.5 watchdog + RUN pip install flask praisonai==2.1.6 watchdog EXPOSE 5555 diff --git a/examples/api/mcp-sse.py b/examples/api/mcp-sse.py new file mode 100644 index 000000000..7cb92b995 --- /dev/null +++ b/examples/api/mcp-sse.py @@ -0,0 +1,9 @@ +from praisonaiagents import Agent, MCP + +qa_agent = Agent( + instructions="""You are a Question Answering Agent.""", + llm="openai/gpt-4o-mini", + tools=MCP("http://localhost:8080/agents/sse") +) + +qa_agent.start("AI in 2025") \ No newline at end of file diff --git a/examples/api/simple-mcp-multi-agents-server.py b/examples/api/simple-mcp-multi-agents-server.py new file mode 100644 index 000000000..fea430d40 --- /dev/null +++ b/examples/api/simple-mcp-multi-agents-server.py @@ -0,0 +1,19 @@ +from praisonaiagents import Agent, Agents +from duckduckgo_search import DDGS + +def internet_search_tool(query: str): + results = [] + ddgs = DDGS() + for result in ddgs.text(keywords=query, max_results=5): + results.append({ + "title": result.get("title", ""), + "url": result.get("href", ""), + "snippet": result.get("body", "") + }) + return results + +agent = Agent(name="SearchAgent", instructions="You Search the internet for information", tools=[internet_search_tool]) +agent2 = Agent(name="SummariseAgent", instructions="You Summarise the information") + +agents = Agents(name="MultiAgents", agents=[agent, agent2]) +agents.launch(port=8080, protocol="mcp") \ No newline at end of file diff --git a/examples/api/simple-mcp-server.py b/examples/api/simple-mcp-server.py new file mode 100644 index 000000000..74b20f5ec --- /dev/null +++ b/examples/api/simple-mcp-server.py @@ -0,0 +1,4 @@ +from praisonaiagents import Agent + +agent = Agent(name="TweetAgent", instructions="Create a Tweet based on the topic provided") +agent.launch(port=8080, protocol="mcp") \ No newline at end of file diff --git a/praisonai/deploy.py b/praisonai/deploy.py index d6069ab6f..bc2b09d70 100644 --- a/praisonai/deploy.py +++ b/praisonai/deploy.py @@ -56,7 +56,7 @@ def create_dockerfile(self): file.write("FROM python:3.11-slim\n") file.write("WORKDIR /app\n") file.write("COPY . .\n") - file.write("RUN pip install flask praisonai==2.1.5 gunicorn markdown\n") + file.write("RUN pip install flask praisonai==2.1.6 gunicorn markdown\n") file.write("EXPOSE 8080\n") file.write('CMD ["gunicorn", "-b", "0.0.0.0:8080", "api:app"]\n') diff --git a/pyproject.toml b/pyproject.toml index 29b14bff8..9495e380d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "PraisonAI" -version = "2.1.5" +version = "2.1.6" description = "PraisonAI is an AI Agents Framework with Self Reflection. PraisonAI application combines PraisonAI Agents, AutoGen, and CrewAI into a low-code solution for building and managing multi-agent LLM systems, focusing on simplicity, customisation, and efficient human-agent collaboration." readme = "README.md" license = "" @@ -12,7 +12,7 @@ dependencies = [ "rich>=13.7", "markdown>=3.5", "pyparsing>=3.0.0", - "praisonaiagents>=0.0.81", + "praisonaiagents>=0.0.82", "python-dotenv>=0.19.0", "instructor>=1.3.3", "PyYAML>=6.0", @@ -89,7 +89,7 @@ autogen = ["pyautogen>=0.2.19", "praisonai-tools>=0.0.15", "crewai"] [tool.poetry] name = "PraisonAI" -version = "2.1.5" +version = "2.1.6" description = "PraisonAI is an AI Agents Framework with Self Reflection. PraisonAI application combines PraisonAI Agents, AutoGen, and CrewAI into a low-code solution for building and managing multi-agent LLM systems, focusing on simplicity, customisation, and efficient human-agent collaboration." authors = ["Mervin Praison"] license = "" @@ -107,7 +107,7 @@ python = ">=3.10,<3.13" rich = ">=13.7" markdown = ">=3.5" pyparsing = ">=3.0.0" -praisonaiagents = ">=0.0.81" +praisonaiagents = ">=0.0.82" python-dotenv = ">=0.19.0" instructor = ">=1.3.3" PyYAML = ">=6.0" diff --git a/src/praisonai-agents/mcp-sse.py b/src/praisonai-agents/mcp-sse.py new file mode 100644 index 000000000..7cb92b995 --- /dev/null +++ b/src/praisonai-agents/mcp-sse.py @@ -0,0 +1,9 @@ +from praisonaiagents import Agent, MCP + +qa_agent = Agent( + instructions="""You are a Question Answering Agent.""", + llm="openai/gpt-4o-mini", + tools=MCP("http://localhost:8080/agents/sse") +) + +qa_agent.start("AI in 2025") \ No newline at end of file diff --git a/src/praisonai-agents/praisonaiagents/agent/agent.py b/src/praisonai-agents/praisonaiagents/agent/agent.py index a3d0150d3..7428b5583 100644 --- a/src/praisonai-agents/praisonaiagents/agent/agent.py +++ b/src/praisonai-agents/praisonaiagents/agent/agent.py @@ -1095,6 +1095,10 @@ async def achat(self, prompt: str, temperature=0.2, tools=None, output_json=None start_time = time.time() reasoning_steps = reasoning_steps or self.reasoning_steps try: + # Default to self.tools if tools argument is None + if tools is None: + tools = self.tools + # Search for existing knowledge if any knowledge is provided if self.knowledge: search_results = self.knowledge.search(prompt, agent_id=self.agent_id) @@ -1408,192 +1412,329 @@ async def execute_tool_async(self, function_name: str, arguments: Dict[str, Any] logging.error(f"Error in execute_tool_async: {str(e)}", exc_info=True) return {"error": f"Error in execute_tool_async: {str(e)}"} - def launch(self, path: str = '/', port: int = 8000, host: str = '0.0.0.0', debug: bool = False): + def launch(self, path: str = '/', port: int = 8000, host: str = '0.0.0.0', debug: bool = False, protocol: str = "http"): """ - Launch the agent as an HTTP API endpoint. + Launch the agent as an HTTP API endpoint or an MCP server. Args: - path: API endpoint path (default: '/') + path: API endpoint path (default: '/') for HTTP, or base path for MCP. port: Server port (default: 8000) host: Server host (default: '0.0.0.0') debug: Enable debug mode for uvicorn (default: False) + protocol: "http" to launch as FastAPI, "mcp" to launch as MCP server. Returns: None """ - global _server_started, _registered_agents, _shared_apps - - # Try to import FastAPI dependencies - lazy loading - try: - import uvicorn - from fastapi import FastAPI, HTTPException, Request - from fastapi.responses import JSONResponse - from pydantic import BaseModel - import threading - import time + if protocol == "http": + global _server_started, _registered_agents, _shared_apps - # Define the request model here since we need pydantic - class AgentQuery(BaseModel): - query: str + # Try to import FastAPI dependencies - lazy loading + try: + import uvicorn + from fastapi import FastAPI, HTTPException, Request + from fastapi.responses import JSONResponse + from pydantic import BaseModel + import threading + import time - except ImportError as e: - # Check which specific module is missing - missing_module = str(e).split("No module named '")[-1].rstrip("'") - display_error(f"Missing dependency: {missing_module}. Required for launch() method.") - logging.error(f"Missing dependency: {missing_module}. Required for launch() method.") - print(f"\nTo add API capabilities, install the required dependencies:") - print(f"pip install {missing_module}") - print("\nOr install all API dependencies with:") - print("pip install 'praisonaiagents[api]'") - return None - - # Initialize port-specific collections if needed - if port not in _registered_agents: - _registered_agents[port] = {} - - # Initialize shared FastAPI app if not already created for this port - if _shared_apps.get(port) is None: - _shared_apps[port] = FastAPI( - title=f"PraisonAI Agents API (Port {port})", - description="API for interacting with PraisonAI Agents" - ) + # Define the request model here since we need pydantic + class AgentQuery(BaseModel): + query: str + + except ImportError as e: + # Check which specific module is missing + missing_module = str(e).split("No module named '")[-1].rstrip("'") + display_error(f"Missing dependency: {missing_module}. Required for launch() method with HTTP mode.") + logging.error(f"Missing dependency: {missing_module}. Required for launch() method with HTTP mode.") + print(f"\nTo add API capabilities, install the required dependencies:") + print(f"pip install {missing_module}") + print("\nOr install all API dependencies with:") + print("pip install 'praisonaiagents[api]'") + return None + + # Initialize port-specific collections if needed + if port not in _registered_agents: + _registered_agents[port] = {} + + # Initialize shared FastAPI app if not already created for this port + if _shared_apps.get(port) is None: + _shared_apps[port] = FastAPI( + title=f"PraisonAI Agents API (Port {port})", + description="API for interacting with PraisonAI Agents" + ) + + # Add a root endpoint with a welcome message + @_shared_apps[port].get("/") + async def root(): + return { + "message": f"Welcome to PraisonAI Agents API on port {port}. See /docs for usage.", + "endpoints": list(_registered_agents[port].keys()) + } + + # Add healthcheck endpoint + @_shared_apps[port].get("/health") + async def healthcheck(): + return { + "status": "ok", + "endpoints": list(_registered_agents[port].keys()) + } - # Add a root endpoint with a welcome message - @_shared_apps[port].get("/") - async def root(): - return { - "message": f"Welcome to PraisonAI Agents API on port {port}. See /docs for usage.", - "endpoints": list(_registered_agents[port].keys()) - } + # Normalize path to ensure it starts with / + if not path.startswith('/'): + path = f'/{path}' + + # Check if path is already registered for this port + if path in _registered_agents[port]: + logging.warning(f"Path '{path}' is already registered on port {port}. Please use a different path.") + print(f"⚠️ Warning: Path '{path}' is already registered on port {port}.") + # Use a modified path to avoid conflicts + original_path = path + path = f"{path}_{self.agent_id[:6]}" + logging.warning(f"Using '{path}' instead of '{original_path}'") + print(f"🔄 Using '{path}' instead") - # Add healthcheck endpoint - @_shared_apps[port].get("/health") - async def healthcheck(): - return { - "status": "ok", - "endpoints": list(_registered_agents[port].keys()) - } - - # Normalize path to ensure it starts with / - if not path.startswith('/'): - path = f'/{path}' + # Register the agent to this path + _registered_agents[port][path] = self.agent_id - # Check if path is already registered for this port - if path in _registered_agents[port]: - logging.warning(f"Path '{path}' is already registered on port {port}. Please use a different path.") - print(f"⚠️ Warning: Path '{path}' is already registered on port {port}.") - # Use a modified path to avoid conflicts - original_path = path - path = f"{path}_{self.agent_id[:6]}" - logging.warning(f"Using '{path}' instead of '{original_path}'") - print(f"🔄 Using '{path}' instead") - - # Register the agent to this path - _registered_agents[port][path] = self.agent_id - - # Define the endpoint handler - @_shared_apps[port].post(path) - async def handle_agent_query(request: Request, query_data: Optional[AgentQuery] = None): - # Handle both direct JSON with query field and form data - if query_data is None: + # Define the endpoint handler + @_shared_apps[port].post(path) + async def handle_agent_query(request: Request, query_data: Optional[AgentQuery] = None): + # Handle both direct JSON with query field and form data + if query_data is None: + try: + request_data = await request.json() + if "query" not in request_data: + raise HTTPException(status_code=400, detail="Missing 'query' field in request") + query = request_data["query"] + except: + # Fallback to form data or query params + form_data = await request.form() + if "query" in form_data: + query = form_data["query"] + else: + raise HTTPException(status_code=400, detail="Missing 'query' field in request") + else: + query = query_data.query + try: - request_data = await request.json() - if "query" not in request_data: - raise HTTPException(status_code=400, detail="Missing 'query' field in request") - query = request_data["query"] - except: - # Fallback to form data or query params - form_data = await request.form() - if "query" in form_data: - query = form_data["query"] + # Use async version if available, otherwise use sync version + if asyncio.iscoroutinefunction(self.chat): + response = await self.achat(query) else: - raise HTTPException(status_code=400, detail="Missing 'query' field in request") - else: - query = query_data.query + # Run sync function in a thread to avoid blocking + loop = asyncio.get_event_loop() + response = await loop.run_in_executor(None, lambda p=query: self.chat(p)) + + return {"response": response} + except Exception as e: + logging.error(f"Error processing query: {str(e)}", exc_info=True) + return JSONResponse( + status_code=500, + content={"error": f"Error processing query: {str(e)}"} + ) + + print(f"🚀 Agent '{self.name}' available at http://{host}:{port}") + + # Start the server if it's not already running for this port + if not _server_started.get(port, False): + # Mark the server as started first to prevent duplicate starts + _server_started[port] = True - try: - # Use async version if available, otherwise use sync version - if asyncio.iscoroutinefunction(self.chat): - response = await self.achat(query) - else: - # Run sync function in a thread to avoid blocking - loop = asyncio.get_event_loop() - response = await loop.run_in_executor(None, lambda: self.chat(query)) + # Start the server in a separate thread + def run_server(): + try: + print(f"✅ FastAPI server started at http://{host}:{port}") + print(f"📚 API documentation available at http://{host}:{port}/docs") + print(f"🔌 Available endpoints: {', '.join(list(_registered_agents[port].keys()))}") + uvicorn.run(_shared_apps[port], host=host, port=port, log_level="debug" if debug else "info") + except Exception as e: + logging.error(f"Error starting server: {str(e)}", exc_info=True) + print(f"❌ Error starting server: {str(e)}") - return {"response": response} - except Exception as e: - logging.error(f"Error processing query: {str(e)}", exc_info=True) - return JSONResponse( - status_code=500, - content={"error": f"Error processing query: {str(e)}"} - ) - - print(f"🚀 Agent '{self.name}' available at http://{host}:{port}{path}") - - # Start the server if it's not already running for this port - if not _server_started.get(port, False): - # Mark the server as started first to prevent duplicate starts - _server_started[port] = True + # Run server in a background thread + server_thread = threading.Thread(target=run_server, daemon=True) + server_thread.start() + + # Wait for a moment to allow the server to start and register endpoints + time.sleep(0.5) + else: + # If server is already running, wait a moment to make sure the endpoint is registered + time.sleep(0.1) + print(f"🔌 Available endpoints on port {port}: {', '.join(list(_registered_agents[port].keys()))}") + + # Get the stack frame to check if this is the last launch() call in the script + import inspect + stack = inspect.stack() - # Start the server in a separate thread - def run_server(): + # If this is called from a Python script (not interactive), try to detect if it's the last launch call + if len(stack) > 1 and stack[1].filename.endswith('.py'): + caller_frame = stack[1] + caller_line = caller_frame.lineno + try: - print(f"✅ FastAPI server started at http://{host}:{port}") - print(f"📚 API documentation available at http://{host}:{port}/docs") - print(f"🔌 Available endpoints: {', '.join(list(_registered_agents[port].keys()))}") - uvicorn.run(_shared_apps[port], host=host, port=port, log_level="debug" if debug else "info") + # Read the file to check if there are more launch calls after this one + with open(caller_frame.filename, 'r') as f: + lines = f.readlines() + + # Check if there are more launch() calls after the current line + has_more_launches = False + for line_content in lines[caller_line:]: # renamed line to line_content + if '.launch(' in line_content and not line_content.strip().startswith('#'): + has_more_launches = True + break + + # If this is the last launch call, block the main thread + if not has_more_launches: + try: + print("\nAll agents registered for HTTP mode. Press Ctrl+C to stop the servers.") + while True: + time.sleep(1) + except KeyboardInterrupt: + print("\nServers stopped") except Exception as e: - logging.error(f"Error starting server: {str(e)}", exc_info=True) - print(f"❌ Error starting server: {str(e)}") - - # Run server in a background thread - server_thread = threading.Thread(target=run_server, daemon=True) - server_thread.start() - - # Wait for a moment to allow the server to start and register endpoints - time.sleep(0.5) - else: - # If server is already running, wait a moment to make sure the endpoint is registered - time.sleep(0.1) - print(f"🔌 Available endpoints on port {port}: {', '.join(list(_registered_agents[port].keys()))}") - - # Get the stack frame to check if this is the last launch() call in the script - import inspect - stack = inspect.stack() - - # If this is called from a Python script (not interactive), try to detect if it's the last launch call - if len(stack) > 1 and stack[1].filename.endswith('.py'): - caller_frame = stack[1] - caller_line = caller_frame.lineno + # If something goes wrong with detection, block anyway to be safe + logging.error(f"Error in launch detection: {e}") + try: + print("\nKeeping HTTP servers alive. Press Ctrl+C to stop.") + while True: + time.sleep(1) + except KeyboardInterrupt: + print("\nServers stopped") + return None + elif protocol == "mcp": try: - # Read the file to check if there are more launch calls after this one - with open(caller_frame.filename, 'r') as f: - lines = f.readlines() + import uvicorn + from mcp.server.fastmcp import FastMCP + from mcp.server.sse import SseServerTransport + from starlette.applications import Starlette + from starlette.requests import Request + from starlette.routing import Mount, Route + from mcp.server import Server as MCPServer # Alias to avoid conflict + import threading + import time + import inspect + import asyncio # Ensure asyncio is imported + # logging is already imported at the module level - # Check if there are more launch() calls after the current line - has_more_launches = False - for line in lines[caller_line:]: - if '.launch(' in line and not line.strip().startswith('#'): - has_more_launches = True - break - - # If this is the last launch call, block the main thread - if not has_more_launches: + except ImportError as e: + missing_module = str(e).split("No module named '")[-1].rstrip("'") + display_error(f"Missing dependency: {missing_module}. Required for launch() method with MCP mode.") + logging.error(f"Missing dependency: {missing_module}. Required for launch() method with MCP mode.") + print(f"\nTo add MCP capabilities, install the required dependencies:") + print(f"pip install {missing_module} mcp praison-mcp starlette uvicorn") # Added mcp, praison-mcp, starlette, uvicorn + print("\nOr install all MCP dependencies with relevant packages.") + return None + + mcp_server_instance_name = f"{self.name}_mcp_server" if self.name else "agent_mcp_server" + mcp = FastMCP(mcp_server_instance_name) + + # Determine the MCP tool name based on self.name + actual_mcp_tool_name = f"execute_{self.name.lower().replace(' ', '_').replace('-', '_')}_task" if self.name \ + else "execute_task" + + @mcp.tool(name=actual_mcp_tool_name) + async def execute_agent_task(prompt: str) -> str: + """Executes the agent's primary task with the given prompt.""" + logging.info(f"MCP tool '{actual_mcp_tool_name}' called with prompt: {prompt}") + try: + # Ensure self.achat is used as it's the async version and pass its tools + if hasattr(self, 'achat') and asyncio.iscoroutinefunction(self.achat): + response = await self.achat(prompt, tools=self.tools) + elif hasattr(self, 'chat'): # Fallback for synchronous chat + loop = asyncio.get_event_loop() + response = await loop.run_in_executor(None, lambda p=prompt: self.chat(p, tools=self.tools)) + else: + logging.error(f"Agent {self.name} has no suitable chat or achat method for MCP tool.") + return f"Error: Agent {self.name} misconfigured for MCP." + return response if response is not None else "Agent returned no response." + except Exception as e: + logging.error(f"Error in MCP tool '{actual_mcp_tool_name}': {e}", exc_info=True) + return f"Error executing task: {str(e)}" + + # Normalize base_path for MCP routes + base_path = path.rstrip('/') + sse_path = f"{base_path}/sse" + messages_path_prefix = f"{base_path}/messages" # Prefix for message posting + + # Ensure messages_path ends with a slash for Mount + if not messages_path_prefix.endswith('/'): + messages_path_prefix += '/' + + + sse_transport = SseServerTransport(messages_path_prefix) # Pass the full prefix + + async def handle_sse_connection(request: Request) -> None: + logging.debug(f"SSE connection request received from {request.client} for path {request.url.path}") + async with sse_transport.connect_sse( + request.scope, + request.receive, + request._send, # noqa: SLF001 + ) as (read_stream, write_stream): + await mcp._mcp_server.run( # Use the underlying server from FastMCP + read_stream, + write_stream, + mcp._mcp_server.create_initialization_options(), + ) + + starlette_app = Starlette( + debug=debug, + routes=[ + Route(sse_path, endpoint=handle_sse_connection), + Mount(messages_path_prefix, app=sse_transport.handle_post_message), + ], + ) + + print(f"🚀 Agent '{self.name}' MCP server starting on http://{host}:{port}") + print(f"📡 MCP SSE endpoint available at {sse_path}") + print(f"📢 MCP messages post to {messages_path_prefix}") + # Instead of trying to extract tool names, hardcode the known tool name + tool_names = [actual_mcp_tool_name] # Use the determined dynamic tool name + print(f"🛠️ Available MCP tools: {', '.join(tool_names)}") + + # Uvicorn server running logic (similar to HTTP mode but standalone for MCP) + def run_mcp_server(): + try: + uvicorn.run(starlette_app, host=host, port=port, log_level="debug" if debug else "info") + except Exception as e: + logging.error(f"Error starting MCP server: {str(e)}", exc_info=True) + print(f"❌ Error starting MCP server: {str(e)}") + + server_thread = threading.Thread(target=run_mcp_server, daemon=True) + server_thread.start() + time.sleep(0.5) # Allow server to start + + # Blocking logic for MCP mode + import inspect # Already imported but good for clarity + stack = inspect.stack() + if len(stack) > 1 and stack[1].filename.endswith('.py'): + caller_frame = stack[1] + caller_line = caller_frame.lineno + try: + with open(caller_frame.filename, 'r') as f: + lines = f.readlines() + has_more_launches = False + for line_content in lines[caller_line:]: # renamed line to line_content + if '.launch(' in line_content and not line_content.strip().startswith('#'): + has_more_launches = True + break + if not has_more_launches: + try: + print("\nAgent MCP server running. Press Ctrl+C to stop.") + while True: + time.sleep(1) + except KeyboardInterrupt: + print("\nMCP Server stopped") + except Exception as e: + logging.error(f"Error in MCP launch detection: {e}") try: - print("\nAll agents registered. Press Ctrl+C to stop the servers.") + print("\nKeeping MCP server alive. Press Ctrl+C to stop.") while True: time.sleep(1) except KeyboardInterrupt: - print("\nServers stopped") - except Exception as e: - # If something goes wrong with detection, block anyway to be safe - logging.error(f"Error in launch detection: {e}") - try: - print("\nKeeping servers alive. Press Ctrl+C to stop.") - while True: - time.sleep(1) - except KeyboardInterrupt: - print("\nServers stopped") - - return None \ No newline at end of file + print("\nMCP Server stopped") + return None + else: + display_error(f"Invalid protocol: {protocol}. Choose 'http' or 'mcp'.") + return None \ No newline at end of file diff --git a/src/praisonai-agents/praisonaiagents/agents/agents.py b/src/praisonai-agents/praisonaiagents/agents/agents.py index 31b86481d..03b49c558 100644 --- a/src/praisonai-agents/praisonaiagents/agents/agents.py +++ b/src/praisonai-agents/praisonaiagents/agents/agents.py @@ -50,7 +50,7 @@ def process_video(video_path: str, seconds_per_frame=2): return base64_frames class PraisonAIAgents: - def __init__(self, agents, tasks=None, verbose=0, completion_checker=None, max_retries=5, process="sequential", manager_llm=None, memory=False, memory_config=None, embedder=None, user_id=None, max_iter=10, stream=True): + def __init__(self, agents, tasks=None, verbose=0, completion_checker=None, max_retries=5, process="sequential", manager_llm=None, memory=False, memory_config=None, embedder=None, user_id=None, max_iter=10, stream=True, name: Optional[str] = None): # Add check at the start if memory is requested if memory: try: @@ -83,6 +83,7 @@ def __init__(self, agents, tasks=None, verbose=0, completion_checker=None, max_r self.max_retries = max_retries self.process = process self.stream = stream + self.name = name # Store the name for the Agents collection # Check for manager_llm in environment variable if not provided self.manager_llm = manager_llm or os.getenv('OPENAI_MODEL_NAME', 'gpt-4o') @@ -885,228 +886,381 @@ def clear_state(self) -> None: """Clear all state values""" self._state.clear() - def launch(self, path: str = '/agents', port: int = 8000, host: str = '0.0.0.0', debug: bool = False): + def launch(self, path: str = '/agents', port: int = 8000, host: str = '0.0.0.0', debug: bool = False, protocol: str = "http"): """ - Launch all agents as a single API endpoint. The endpoint accepts a query and processes it through - all agents in sequence, with the output of each agent feeding into the next. + Launch all agents as a single API endpoint (HTTP) or an MCP server. + In HTTP mode, the endpoint accepts a query and processes it through all agents in sequence. + In MCP mode, an MCP server is started, exposing a tool to run the agent workflow. Args: - path: API endpoint path (default: '/agents') + path: API endpoint path (default: '/agents') for HTTP, or base path for MCP. port: Server port (default: 8000) host: Server host (default: '0.0.0.0') debug: Enable debug mode for uvicorn (default: False) + protocol: "http" to launch as FastAPI, "mcp" to launch as MCP server. Returns: None """ - global _agents_server_started, _agents_registered_endpoints, _agents_shared_apps - - if not self.agents: - logging.warning("No agents to launch. Add agents to the Agents instance first.") - return + if protocol == "http": + global _agents_server_started, _agents_registered_endpoints, _agents_shared_apps - # Try to import FastAPI dependencies - lazy loading - try: - import uvicorn - from fastapi import FastAPI, HTTPException, Request - from fastapi.responses import JSONResponse - from pydantic import BaseModel - import threading - import time - - # Define the request model here since we need pydantic - class AgentQuery(BaseModel): - query: str + if not self.agents: + logging.warning("No agents to launch for HTTP mode. Add agents to the Agents instance first.") + return - except ImportError as e: - # Check which specific module is missing - missing_module = str(e).split("No module named '")[-1].rstrip("'") - display_error(f"Missing dependency: {missing_module}. Required for launch() method.") - logging.error(f"Missing dependency: {missing_module}. Required for launch() method.") - print(f"\nTo add API capabilities, install the required dependencies:") - print(f"pip install {missing_module}") - print("\nOr install all API dependencies with:") - print("pip install 'praisonaiagents[api]'") - return None - - # Initialize port-specific collections if needed - if port not in _agents_registered_endpoints: - _agents_registered_endpoints[port] = {} + # Try to import FastAPI dependencies - lazy loading + try: + import uvicorn + from fastapi import FastAPI, HTTPException, Request + from fastapi.responses import JSONResponse + from pydantic import BaseModel + import threading + import time + import asyncio # Ensure asyncio is imported for HTTP mode too + + # Define the request model here since we need pydantic + class AgentQuery(BaseModel): + query: str + + except ImportError as e: + # Check which specific module is missing + missing_module = str(e).split("No module named '")[-1].rstrip("'") + display_error(f"Missing dependency: {missing_module}. Required for launch() method with HTTP mode.") + logging.error(f"Missing dependency: {missing_module}. Required for launch() method with HTTP mode.") + print(f"\nTo add API capabilities, install the required dependencies:") + print(f"pip install {missing_module}") + print("\nOr install all API dependencies with:") + print("pip install 'praisonaiagents[api]'") + return None - # Initialize shared FastAPI app if not already created for this port - if _agents_shared_apps.get(port) is None: - _agents_shared_apps[port] = FastAPI( - title=f"PraisonAI Agents API (Port {port})", - description="API for interacting with multiple PraisonAI Agents" - ) + # Initialize port-specific collections if needed + if port not in _agents_registered_endpoints: + _agents_registered_endpoints[port] = {} + + # Initialize shared FastAPI app if not already created for this port + if _agents_shared_apps.get(port) is None: + _agents_shared_apps[port] = FastAPI( + title=f"PraisonAI Agents API (Port {port})", + description="API for interacting with multiple PraisonAI Agents" + ) + + # Add a root endpoint with a welcome message + @_agents_shared_apps[port].get("/") + async def root(): + return { + "message": f"Welcome to PraisonAI Agents API on port {port}. See /docs for usage.", + "endpoints": list(_agents_registered_endpoints[port].keys()) + } + + # Add healthcheck endpoint + @_agents_shared_apps[port].get("/health") + async def healthcheck(): + return { + "status": "ok", + "endpoints": list(_agents_registered_endpoints[port].keys()) + } - # Add a root endpoint with a welcome message - @_agents_shared_apps[port].get("/") - async def root(): - return { - "message": f"Welcome to PraisonAI Agents API on port {port}. See /docs for usage.", - "endpoints": list(_agents_registered_endpoints[port].keys()) - } + # Normalize path to ensure it starts with / + if not path.startswith('/'): + path = f'/{path}' + + # Check if path is already registered for this port + if path in _agents_registered_endpoints[port]: + logging.warning(f"Path '{path}' is already registered on port {port}. Please use a different path.") + print(f"⚠️ Warning: Path '{path}' is already registered on port {port}.") + # Use a modified path to avoid conflicts + original_path = path + instance_id = str(uuid.uuid4())[:6] + path = f"{path}_{instance_id}" + logging.warning(f"Using '{path}' instead of '{original_path}'") + print(f"🔄 Using '{path}' instead") - # Add healthcheck endpoint - @_agents_shared_apps[port].get("/health") - async def healthcheck(): - return { - "status": "ok", - "endpoints": list(_agents_registered_endpoints[port].keys()) - } - - # Normalize path to ensure it starts with / - if not path.startswith('/'): - path = f'/{path}' + # Generate a unique ID for this agent group's endpoint + endpoint_id = str(uuid.uuid4()) + _agents_registered_endpoints[port][path] = endpoint_id - # Check if path is already registered for this port - if path in _agents_registered_endpoints[port]: - logging.warning(f"Path '{path}' is already registered on port {port}. Please use a different path.") - print(f"⚠️ Warning: Path '{path}' is already registered on port {port}.") - # Use a modified path to avoid conflicts - original_path = path - instance_id = str(uuid.uuid4())[:6] - path = f"{path}_{instance_id}" - logging.warning(f"Using '{path}' instead of '{original_path}'") - print(f"🔄 Using '{path}' instead") - - # Generate a unique ID for this agent group's endpoint - endpoint_id = str(uuid.uuid4()) - _agents_registered_endpoints[port][path] = endpoint_id - - # Define the endpoint handler - @_agents_shared_apps[port].post(path) - async def handle_query(request: Request, query_data: Optional[AgentQuery] = None): - # Handle both direct JSON with query field and form data - if query_data is None: + # Define the endpoint handler + @_agents_shared_apps[port].post(path) + async def handle_query(request: Request, query_data: Optional[AgentQuery] = None): + # Handle both direct JSON with query field and form data + if query_data is None: + try: + request_data = await request.json() + if "query" not in request_data: + raise HTTPException(status_code=400, detail="Missing 'query' field in request") + query = request_data["query"] + except: + # Fallback to form data or query params + form_data = await request.form() + if "query" in form_data: + query = form_data["query"] + else: + raise HTTPException(status_code=400, detail="Missing 'query' field in request") + else: + query = query_data.query + try: - request_data = await request.json() - if "query" not in request_data: - raise HTTPException(status_code=400, detail="Missing 'query' field in request") - query = request_data["query"] - except: - # Fallback to form data or query params - form_data = await request.form() - if "query" in form_data: - query = form_data["query"] - else: - raise HTTPException(status_code=400, detail="Missing 'query' field in request") + # Process the query sequentially through all agents + current_input = query + results = [] + + for agent_instance in self.agents: # Corrected variable name to agent_instance + try: + # Use async version if available, otherwise use sync version + if asyncio.iscoroutinefunction(agent_instance.chat): + response = await agent_instance.achat(current_input) + else: + # Run sync function in a thread to avoid blocking + loop = asyncio.get_event_loop() + # Correctly pass current_input to the lambda for closure + response = await loop.run_in_executor(None, lambda ci=current_input: agent_instance.chat(ci)) + + # Store this agent's result + results.append({ + "agent": agent_instance.name, + "response": response + }) + + # Use this response as input to the next agent + current_input = response + except Exception as e: + logging.error(f"Error with agent {agent_instance.name}: {str(e)}", exc_info=True) + results.append({ + "agent": agent_instance.name, + "error": str(e) + }) + # Decide error handling: continue with original input, last good input, or stop? + # For now, let's continue with the last successful 'current_input' or original 'query' if first agent fails + # This part might need refinement based on desired behavior. + # If an agent fails, its 'response' might be None or an error string. + # current_input will carry that forward. Or, we could choose to halt or use last good input. + + # Return all results and the final output + return { + "query": query, + "results": results, + "final_response": current_input + } + except Exception as e: + logging.error(f"Error processing query: {str(e)}", exc_info=True) + return JSONResponse( + status_code=500, + content={"error": f"Error processing query: {str(e)}"} + ) + + print(f"🚀 Multi-Agent HTTP API available at http://{host}:{port}{path}") + agent_names = ", ".join([agent.name for agent in self.agents]) + print(f"📊 Available agents for this endpoint ({len(self.agents)}): {agent_names}") + + # Start the server if it's not already running for this port + if not _agents_server_started.get(port, False): + # Mark the server as started first to prevent duplicate starts + _agents_server_started[port] = True + + # Start the server in a separate thread + def run_server(): + try: + print(f"✅ FastAPI server started at http://{host}:{port}") + print(f"📚 API documentation available at http://{host}:{port}/docs") + print(f"🔌 Registered HTTP endpoints on port {port}: {', '.join(list(_agents_registered_endpoints[port].keys()))}") + uvicorn.run(_agents_shared_apps[port], host=host, port=port, log_level="debug" if debug else "info") + except Exception as e: + logging.error(f"Error starting server: {str(e)}", exc_info=True) + print(f"❌ Error starting server: {str(e)}") + + # Run server in a background thread + server_thread = threading.Thread(target=run_server, daemon=True) + server_thread.start() + + # Wait for a moment to allow the server to start and register endpoints + time.sleep(0.5) else: - query = query_data.query + # If server is already running, wait a moment to make sure the endpoint is registered + time.sleep(0.1) + print(f"🔌 Registered HTTP endpoints on port {port}: {', '.join(list(_agents_registered_endpoints[port].keys()))}") + + # Get the stack frame to check if this is the last launch() call in the script + import inspect + stack = inspect.stack() + # If this is called from a Python script (not interactive), try to detect if it's the last launch call + if len(stack) > 1 and stack[1].filename.endswith('.py'): + caller_frame = stack[1] + caller_line = caller_frame.lineno + + try: + # Read the file to check if there are more launch calls after this one + with open(caller_frame.filename, 'r') as f: + lines = f.readlines() + + # Check if there are more launch() calls after the current line + has_more_launches = False + for line_content in lines[caller_line:]: # Renamed variable + if '.launch(' in line_content and not line_content.strip().startswith('#'): + has_more_launches = True + break + + # If this is the last launch call, block the main thread + if not has_more_launches: + try: + print("\nAll agent groups registered for HTTP mode. Press Ctrl+C to stop the servers.") + while True: + time.sleep(1) + except KeyboardInterrupt: + print("\nServers stopped") + except Exception as e: + # If something goes wrong with detection, block anyway to be safe + logging.error(f"Error in HTTP launch detection: {e}") + try: + print("\nKeeping HTTP servers alive. Press Ctrl+C to stop.") + while True: + time.sleep(1) + except KeyboardInterrupt: + print("\nServers stopped") + return None + + elif protocol == "mcp": + if not self.agents: + logging.warning("No agents to launch for MCP mode. Add agents to the Agents instance first.") + return + try: - # Process the query sequentially through all agents - current_input = query - results = [] + import uvicorn + from mcp.server.fastmcp import FastMCP + from mcp.server.sse import SseServerTransport + from starlette.applications import Starlette + from starlette.requests import Request + from starlette.routing import Mount, Route + # from mcp.server import Server as MCPServer # Not directly needed if using FastMCP's server + import threading + import time + import inspect + import asyncio + # logging is already imported at the module level - for agent in self.agents: + except ImportError as e: + missing_module = str(e).split("No module named '")[-1].rstrip("'") + display_error(f"Missing dependency: {missing_module}. Required for launch() method with MCP mode.") + logging.error(f"Missing dependency: {missing_module}. Required for launch() method with MCP mode.") + print(f"\nTo add MCP capabilities, install the required dependencies:") + print(f"pip install {missing_module} mcp praison-mcp starlette uvicorn") + print("\nOr install all MCP dependencies with relevant packages.") + return None + + mcp_instance = FastMCP("praisonai_workflow_mcp_server") + + # Determine the MCP tool name for the workflow based on self.name + actual_mcp_tool_name = (f"execute_{self.name.lower().replace(' ', '_').replace('-', '_')}_workflow" if self.name + else "execute_workflow") + + @mcp_instance.tool(name=actual_mcp_tool_name) + async def execute_workflow_tool(query: str) -> str: # Renamed for clarity + """Executes the defined agent workflow with the given query.""" + logging.info(f"MCP tool '{actual_mcp_tool_name}' called with query: {query}") + current_input = query + final_response = "No agents in workflow or workflow did not produce a final response." + + for agent_instance in self.agents: try: - # Use async version if available, otherwise use sync version - if asyncio.iscoroutinefunction(agent.chat): - response = await agent.achat(current_input) - else: - # Run sync function in a thread to avoid blocking + logging.debug(f"Processing with agent: {agent_instance.name}") + if hasattr(agent_instance, 'achat') and asyncio.iscoroutinefunction(agent_instance.achat): + response = await agent_instance.achat(current_input, tools=agent_instance.tools) + elif hasattr(agent_instance, 'chat'): # Fallback to sync chat if achat not suitable loop = asyncio.get_event_loop() - response = await loop.run_in_executor(None, lambda: agent.chat(current_input)) - - # Store this agent's result - results.append({ - "agent": agent.name, - "response": response - }) + response = await loop.run_in_executor(None, lambda ci=current_input: agent_instance.chat(ci, tools=agent_instance.tools)) + else: + logging.warning(f"Agent {agent_instance.name} has no suitable chat or achat method.") + response = f"Error: Agent {agent_instance.name} has no callable chat method." - # Use this response as input to the next agent - current_input = response + current_input = response if response is not None else "Agent returned no response." + final_response = current_input # Keep track of the last valid response + logging.debug(f"Agent {agent_instance.name} responded. Current intermediate output: {current_input}") + except Exception as e: - logging.error(f"Error with agent {agent.name}: {str(e)}", exc_info=True) - results.append({ - "agent": agent.name, - "error": str(e) - }) - # Continue with original input if there's an error + logging.error(f"Error during agent {agent_instance.name} execution in MCP workflow: {str(e)}", exc_info=True) + current_input = f"Error from agent {agent_instance.name}: {str(e)}" + final_response = current_input # Update final response to show error + # Optionally break or continue based on desired error handling for the workflow + # For now, we continue, and the error is passed to the next agent or returned. - # Return all results and the final output - return { - "query": query, - "results": results, - "final_response": current_input - } - except Exception as e: - logging.error(f"Error processing query: {str(e)}", exc_info=True) - return JSONResponse( - status_code=500, - content={"error": f"Error processing query: {str(e)}"} - ) - - print(f"🚀 Multi-Agent API available at http://{host}:{port}{path}") - agent_names = ", ".join([agent.name for agent in self.agents]) - print(f"📊 Available agents ({len(self.agents)}): {agent_names}") - - # Start the server if it's not already running for this port - if not _agents_server_started.get(port, False): - # Mark the server as started first to prevent duplicate starts - _agents_server_started[port] = True + logging.info(f"MCP tool '{actual_mcp_tool_name}' completed. Final response: {final_response}") + return final_response + + base_mcp_path = path.rstrip('/') + sse_mcp_path = f"{base_mcp_path}/sse" + messages_mcp_path_prefix = f"{base_mcp_path}/messages" + if not messages_mcp_path_prefix.endswith('/'): + messages_mcp_path_prefix += '/' + + sse_transport_mcp = SseServerTransport(messages_mcp_path_prefix) + + async def handle_mcp_sse_connection(request: Request) -> None: + logging.debug(f"MCP SSE connection request from {request.client} for path {request.url.path}") + async with sse_transport_mcp.connect_sse( + request.scope, request.receive, request._send, + ) as (read_stream, write_stream): + await mcp_instance._mcp_server.run( + read_stream, write_stream, mcp_instance._mcp_server.create_initialization_options(), + ) - # Start the server in a separate thread - def run_server(): + starlette_mcp_app = Starlette( + debug=debug, + routes=[ + Route(sse_mcp_path, endpoint=handle_mcp_sse_connection), + Mount(messages_mcp_path_prefix, app=sse_transport_mcp.handle_post_message), + ], + ) + + print(f"🚀 PraisonAIAgents MCP Workflow server starting on http://{host}:{port}") + print(f"📡 MCP SSE endpoint available at {sse_mcp_path}") + print(f"📢 MCP messages post to {messages_mcp_path_prefix}") + # Instead of trying to extract tool names, hardcode the known tool name + mcp_tool_names = [actual_mcp_tool_name] # Use the determined dynamic tool name + print(f"🛠️ Available MCP tools: {', '.join(mcp_tool_names)}") + agent_names_in_workflow = ", ".join([a.name for a in self.agents]) + print(f"🔄 Agents in MCP workflow: {agent_names_in_workflow}") + + def run_praison_mcp_server(): try: - print(f"✅ FastAPI server started at http://{host}:{port}") - print(f"📚 API documentation available at http://{host}:{port}/docs") - print(f"🔌 Available endpoints: {', '.join(list(_agents_registered_endpoints[port].keys()))}") - uvicorn.run(_agents_shared_apps[port], host=host, port=port, log_level="debug" if debug else "info") + uvicorn.run(starlette_mcp_app, host=host, port=port, log_level="debug" if debug else "info") except Exception as e: - logging.error(f"Error starting server: {str(e)}", exc_info=True) - print(f"❌ Error starting server: {str(e)}") - - # Run server in a background thread - server_thread = threading.Thread(target=run_server, daemon=True) - server_thread.start() - - # Wait for a moment to allow the server to start and register endpoints - time.sleep(0.5) - else: - # If server is already running, wait a moment to make sure the endpoint is registered - time.sleep(0.1) - print(f"🔌 Available endpoints on port {port}: {', '.join(list(_agents_registered_endpoints[port].keys()))}") - - # Get the stack frame to check if this is the last launch() call in the script - import inspect - stack = inspect.stack() - - # If this is called from a Python script (not interactive), try to detect if it's the last launch call - if len(stack) > 1 and stack[1].filename.endswith('.py'): - caller_frame = stack[1] - caller_line = caller_frame.lineno - - try: - # Read the file to check if there are more launch calls after this one - with open(caller_frame.filename, 'r') as f: - lines = f.readlines() - - # Check if there are more launch() calls after the current line - has_more_launches = False - for line in lines[caller_line:]: - if '.launch(' in line and not line.strip().startswith('#'): - has_more_launches = True - break - - # If this is the last launch call, block the main thread - if not has_more_launches: + logging.error(f"Error starting PraisonAIAgents MCP server: {str(e)}", exc_info=True) + print(f"❌ Error starting PraisonAIAgents MCP server: {str(e)}") + + mcp_server_thread = threading.Thread(target=run_praison_mcp_server, daemon=True) + mcp_server_thread.start() + time.sleep(0.5) + + import inspect + stack = inspect.stack() + if len(stack) > 1 and stack[1].filename.endswith('.py'): + caller_frame = stack[1] + caller_line = caller_frame.lineno + try: + with open(caller_frame.filename, 'r') as f: + lines = f.readlines() + has_more_launches = False + for line_content in lines[caller_line:]: + if '.launch(' in line_content and not line_content.strip().startswith('#'): + has_more_launches = True + break + if not has_more_launches: + try: + print("\nPraisonAIAgents MCP server running. Press Ctrl+C to stop.") + while True: + time.sleep(1) + except KeyboardInterrupt: + print("\nPraisonAIAgents MCP Server stopped") + except Exception as e: + logging.error(f"Error in PraisonAIAgents MCP launch detection: {e}") try: - print("\nAll agents registered. Press Ctrl+C to stop the servers.") + print("\nKeeping PraisonAIAgents MCP server alive. Press Ctrl+C to stop.") while True: time.sleep(1) except KeyboardInterrupt: - print("\nServers stopped") - except Exception as e: - # If something goes wrong with detection, block anyway to be safe - logging.error(f"Error in launch detection: {e}") - try: - print("\nKeeping servers alive. Press Ctrl+C to stop.") - while True: - time.sleep(1) - except KeyboardInterrupt: - print("\nServers stopped") - - return None \ No newline at end of file + print("\nPraisonAIAgents MCP Server stopped") + return None + else: + display_error(f"Invalid protocol: {protocol}. Choose 'http' or 'mcp'.") + return None \ No newline at end of file diff --git a/src/praisonai-agents/pyproject.toml b/src/praisonai-agents/pyproject.toml index 182111113..e5bfdaaa9 100644 --- a/src/praisonai-agents/pyproject.toml +++ b/src/praisonai-agents/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "praisonaiagents" -version = "0.0.81" +version = "0.0.82" description = "Praison AI agents for completing complex tasks with Self Reflection Agents" authors = [ { name="Mervin Praison" } diff --git a/src/praisonai-agents/simple-mcp-multi-agents-server.py b/src/praisonai-agents/simple-mcp-multi-agents-server.py new file mode 100644 index 000000000..fea430d40 --- /dev/null +++ b/src/praisonai-agents/simple-mcp-multi-agents-server.py @@ -0,0 +1,19 @@ +from praisonaiagents import Agent, Agents +from duckduckgo_search import DDGS + +def internet_search_tool(query: str): + results = [] + ddgs = DDGS() + for result in ddgs.text(keywords=query, max_results=5): + results.append({ + "title": result.get("title", ""), + "url": result.get("href", ""), + "snippet": result.get("body", "") + }) + return results + +agent = Agent(name="SearchAgent", instructions="You Search the internet for information", tools=[internet_search_tool]) +agent2 = Agent(name="SummariseAgent", instructions="You Summarise the information") + +agents = Agents(name="MultiAgents", agents=[agent, agent2]) +agents.launch(port=8080, protocol="mcp") \ No newline at end of file diff --git a/src/praisonai-agents/simple-mcp-server.py b/src/praisonai-agents/simple-mcp-server.py new file mode 100644 index 000000000..74b20f5ec --- /dev/null +++ b/src/praisonai-agents/simple-mcp-server.py @@ -0,0 +1,4 @@ +from praisonaiagents import Agent + +agent = Agent(name="TweetAgent", instructions="Create a Tweet based on the topic provided") +agent.launch(port=8080, protocol="mcp") \ No newline at end of file diff --git a/src/praisonai-agents/uv.lock b/src/praisonai-agents/uv.lock index 56f851752..b642c1a23 100644 --- a/src/praisonai-agents/uv.lock +++ b/src/praisonai-agents/uv.lock @@ -1883,7 +1883,7 @@ wheels = [ [[package]] name = "praisonaiagents" -version = "0.0.81" +version = "0.0.82" source = { editable = "." } dependencies = [ { name = "mcp" }, diff --git a/uv.lock b/uv.lock index aeeb5306c..37e76e730 100644 --- a/uv.lock +++ b/uv.lock @@ -3614,7 +3614,7 @@ wheels = [ [[package]] name = "praisonai" -version = "2.1.5" +version = "2.1.6" source = { editable = "." } dependencies = [ { name = "instructor" }, @@ -3756,7 +3756,7 @@ requires-dist = [ { name = "plotly", marker = "extra == 'realtime'", specifier = ">=5.24.0" }, { name = "praisonai-tools", marker = "extra == 'autogen'", specifier = ">=0.0.15" }, { name = "praisonai-tools", marker = "extra == 'crewai'", specifier = ">=0.0.15" }, - { name = "praisonaiagents", specifier = ">=0.0.81" }, + { name = "praisonaiagents", specifier = ">=0.0.82" }, { name = "pyautogen", marker = "extra == 'autogen'", specifier = ">=0.2.19" }, { name = "pydantic", marker = "extra == 'chat'", specifier = "<=2.10.1" }, { name = "pydantic", marker = "extra == 'code'", specifier = "<=2.10.1" }, @@ -3813,7 +3813,7 @@ wheels = [ [[package]] name = "praisonaiagents" -version = "0.0.81" +version = "0.0.82" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "mcp" }, @@ -3821,9 +3821,9 @@ dependencies = [ { name = "pydantic" }, { name = "rich" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/aa/e8/5c254f6e097db086f6c72652a81edfbdd4426daa91d80f9eb020fc1d8e2d/praisonaiagents-0.0.81.tar.gz", hash = "sha256:daec5dc448bd9221f5b2ec1ea9af0a942eb0d0f19fb0e0e588842e3d0259bddd", size = 121033 } +sdist = { url = "https://files.pythonhosted.org/packages/94/de/1367fb037ca05bf0e6c15b9e6d17e101255f17e709de98c75ff14f6bbd31/praisonaiagents-0.0.82.tar.gz", hash = "sha256:c5c9344ee6e1868ea108209f3a200a12f5ffe9a242c00de14fcd3c8c92a85563", size = 124546 } wheels = [ - { url = "https://files.pythonhosted.org/packages/a5/4d/c49811749f579d11b84e683cc0c2685f81bcbda6ed7edec907863087d1cb/praisonaiagents-0.0.81-py3-none-any.whl", hash = "sha256:41252168220aa2326c77dbaff53918e09fcc62f059ff4ab9633fc95b6b8bc4f7", size = 140816 }, + { url = "https://files.pythonhosted.org/packages/a7/b2/b187b3aee121404a5cd8badb88d8f7b15295fde7c2cc6a6253e648bbe0be/praisonaiagents-0.0.82-py3-none-any.whl", hash = "sha256:d43458fd5da3c2adf245dd388de107dee3b93accd2f04f5a3f8144560aef7f28", size = 144231 }, ] [[package]]