forked from modelcontextprotocol/python-sdk
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbase.py
More file actions
131 lines (99 loc) · 4.1 KB
/
base.py
File metadata and controls
131 lines (99 loc) · 4.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
import logging
from typing import Protocol, runtime_checkable
from uuid import UUID
import mcp.types as types
logger = logging.getLogger(__name__)
@runtime_checkable
class MessageQueue(Protocol):
"""Abstract interface for an SSE message queue.
This interface allows messages to be queued and processed by any SSE server instance
enabling multiple servers to handle requests for the same session.
"""
async def add_message(
self, session_id: UUID, message: types.JSONRPCMessage | Exception
) -> bool:
"""Add a message to the queue for the specified session.
Args:
session_id: The UUID of the session this message is for
message: The message to queue
Returns:
bool: True if message was accepted, False if session not found
"""
...
async def get_message(
self, session_id: UUID, timeout: float = 0.1
) -> types.JSONRPCMessage | Exception | None:
"""Get the next message for the specified session.
Args:
session_id: The UUID of the session to get messages for
timeout: Maximum time to wait for a message, in seconds
Returns:
The next message or None if no message is available
"""
...
async def register_session(self, session_id: UUID) -> None:
"""Register a new session with the queue.
Args:
session_id: The UUID of the new session to register
"""
...
async def unregister_session(self, session_id: UUID) -> None:
"""Unregister a session when it's closed.
Args:
session_id: The UUID of the session to unregister
"""
...
async def session_exists(self, session_id: UUID) -> bool:
"""Check if a session exists.
Args:
session_id: The UUID of the session to check
Returns:
bool: True if the session is active, False otherwise
"""
...
class InMemoryMessageQueue:
"""Default in-memory implementation of the MessageQueue interface.
This implementation keeps messages in memory for
each session until they're retrieved.
"""
def __init__(self) -> None:
self._message_queues: dict[UUID, list[types.JSONRPCMessage | Exception]] = {}
self._active_sessions: set[UUID] = set()
async def add_message(
self, session_id: UUID, message: types.JSONRPCMessage | Exception
) -> bool:
"""Add a message to the queue for the specified session."""
if session_id not in self._active_sessions:
logger.warning(f"Message received for unknown session {session_id}")
return False
if session_id not in self._message_queues:
self._message_queues[session_id] = []
self._message_queues[session_id].append(message)
logger.debug(f"Added message to queue for session {session_id}")
return True
async def get_message(
self, session_id: UUID, timeout: float = 0.1
) -> types.JSONRPCMessage | Exception | None:
"""Get the next message for the specified session."""
if session_id not in self._active_sessions:
return None
queue = self._message_queues.get(session_id, [])
if not queue:
return None
message = queue.pop(0)
if not queue: # Clean up empty queue
del self._message_queues[session_id]
return message
async def register_session(self, session_id: UUID) -> None:
"""Register a new session with the queue."""
self._active_sessions.add(session_id)
logger.debug(f"Registered session {session_id}")
async def unregister_session(self, session_id: UUID) -> None:
"""Unregister a session when it's closed."""
self._active_sessions.discard(session_id)
if session_id in self._message_queues:
del self._message_queues[session_id]
logger.debug(f"Unregistered session {session_id}")
async def session_exists(self, session_id: UUID) -> bool:
"""Check if a session exists."""
return session_id in self._active_sessions