-
-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Expand file tree
/
Copy pathhosted_server.py
More file actions
197 lines (157 loc) · 6.42 KB
/
hosted_server.py
File metadata and controls
197 lines (157 loc) · 6.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
"""
Hosted MCP Server implementation for PraisonAI Agents.
This module provides a base class for creating hosted MCP servers
that can handle requests and integrate with the MCP protocol.
Note: This is an example implementation. Before using it, install the dependencies:
    pip install "praisonaiagents[mcp]" "starlette>=0.27.0"
"""
import asyncio
import logging
from typing import Dict, Any, Optional, List, Callable
import json
# Third-party dependencies are optional extras; fail with an actionable
# message when any of them is missing.
try:
    from mcp.server.fastmcp import FastMCP
    from mcp.server import Server
    from starlette.applications import Starlette
    from starlette.requests import Request
    from starlette.routing import Mount, Route
    from mcp.server.sse import SseServerTransport
    import uvicorn
except ImportError as exc:
    # Chain explicitly so the traceback marks the original failure (which
    # names the missing module) as the direct cause of this error.
    # The requirement specifiers are quoted because an unquoted ">=" is a
    # shell redirection and "[mcp]" may be glob-expanded by some shells.
    raise ImportError(
        "MCP server dependencies not installed. "
        'Please install with: pip install "praisonaiagents[mcp]" "starlette>=0.27.0"'
    ) from exc

# Module-level logger used by HostedMCPServer.
logger = logging.getLogger(__name__)
class HostedMCPServer:
    """
    Base class for creating hosted MCP servers.

    The class wraps a ``FastMCP`` instance, keeps a registry of the callables
    added through :meth:`add_tool`, and builds a Starlette application that
    exposes the server over SSE (``/sse`` for the event stream, ``/messages/``
    for client POSTs). It can be extended with custom functionality such as
    latency tracking by overriding :meth:`handle_request`.
    """

    def __init__(self, name: str = "hosted-mcp-server", host: str = "localhost", port: int = 8080):
        """
        Initialize the hosted MCP server.

        Args:
            name: Server name for identification
            host: Host to bind to (default: localhost)
            port: Port to listen on (default: 8080)
        """
        self.name = name
        self.host = host
        self.port = port
        self.mcp = FastMCP(name)
        # Maps tool name -> the callable originally passed to add_tool().
        self._tools: Dict[str, Callable] = {}
        # Both are created lazily by create_app().
        self._server: Optional[Server] = None
        self._app: Optional[Starlette] = None

    def handle_request(self, request_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Handle an incoming MCP request.

        This is an override hook for subclasses (latency tracking,
        authentication, request modification, ...). The SSE application built
        by :meth:`create_app` does not call it.

        Args:
            request_data: The incoming request data. ``id`` and ``method``
                are read; a missing ``id`` is reported as ``'unknown'``.

        Returns:
            A JSON-RPC 2.0 shaped dict echoing the request id with an empty
            ``result``.
        """
        request_id = request_data.get('id', 'unknown')
        method = request_data.get('method', '')
        logger.debug("Handling request %s: %s", request_id, method)

        return {
            'id': request_id,
            'jsonrpc': '2.0',
            'result': {},
        }

    def add_tool(self, func: Callable, name: Optional[str] = None, description: Optional[str] = None):
        """
        Add a tool to the MCP server.

        Args:
            func: The function to expose as a tool (sync or async)
            name: Optional name for the tool (defaults to function name)
            description: Optional description for the tool; forwarded to
                FastMCP, which falls back to the function's docstring when
                it is omitted
        """
        tool_name = name or func.__name__

        # Register the callable itself. FastMCP accepts both plain and
        # coroutine functions and derives the tool's input schema from the
        # function signature, so wrapping sync functions in a generic
        # ``(*args, **kwargs)`` coroutine would hide the real parameters.
        # Passing ``description`` here makes it apply to async tools as well.
        self.mcp.tool(name=tool_name, description=description)(func)

        self._tools[tool_name] = func
        logger.info("Added tool: %s", tool_name)

    def create_app(self, debug: bool = False) -> "Starlette":
        """
        Create a Starlette application for serving the MCP server.

        Args:
            debug: Enable debug mode

        Returns:
            Starlette application instance (also stored on ``self._app``)

        Raises:
            RuntimeError: If the MCP server is not properly initialized
        """
        # Local import: Response is only needed by the SSE endpoint below.
        from starlette.responses import Response

        if not self._server:
            if not hasattr(self.mcp, '_mcp_server'):
                raise RuntimeError("MCP server not properly initialized. Ensure FastMCP is correctly set up.")
            self._server = self.mcp._mcp_server

        sse = SseServerTransport("/messages/")

        async def handle_sse(request: Request) -> Response:
            logger.debug("SSE connection from %s", request.client)
            async with sse.connect_sse(
                request.scope,
                request.receive,
                request._send,
            ) as (read_stream, write_stream):
                await self._server.run(
                    read_stream,
                    write_stream,
                    self._server.create_initialization_options(),
                )
            # Starlette calls the endpoint's return value as an ASGI
            # response; returning None would raise TypeError once the SSE
            # session ends.
            return Response()

        self._app = Starlette(
            debug=debug,
            routes=[
                Route("/sse", endpoint=handle_sse),
                Mount("/messages/", app=sse.handle_post_message),
            ],
        )
        return self._app

    def _print_startup_banner(self) -> None:
        """Print the server address, registered tools and SSE endpoint."""
        print(f"Starting {self.name} MCP server on {self.host}:{self.port}")
        print(f"Available tools: {', '.join(self._tools.keys())}")
        print(f"SSE endpoint: {self.get_endpoint()}")

    def start(self, debug: bool = False, **uvicorn_kwargs):
        """
        Start the MCP server (blocking).

        Args:
            debug: Enable debug mode
            **uvicorn_kwargs: Additional arguments to pass to uvicorn
        """
        app = self.create_app(debug=debug)
        self._print_startup_banner()
        uvicorn.run(app, host=self.host, port=self.port, **uvicorn_kwargs)

    async def start_async(self, debug: bool = False, **uvicorn_kwargs):
        """
        Start the MCP server asynchronously.

        Args:
            debug: Enable debug mode
            **uvicorn_kwargs: Additional arguments to pass to ``uvicorn.Config``
        """
        app = self.create_app(debug=debug)
        config = uvicorn.Config(app, host=self.host, port=self.port, **uvicorn_kwargs)
        server = uvicorn.Server(config)
        self._print_startup_banner()
        await server.serve()

    def get_tools(self) -> List[str]:
        """Get list of available tool names, in registration order."""
        return list(self._tools)

    def get_endpoint(self) -> str:
        """Get the SSE endpoint URL."""
        return f"http://{self.host}:{self.port}/sse"