forked from lastmile-ai/mcp-agent
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrouter_llm.py
More file actions
374 lines (310 loc) · 12.6 KB
/
Copy pathrouter_llm.py
File metadata and controls
374 lines (310 loc) · 12.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
from typing import Callable, List, Literal, Optional, TYPE_CHECKING
from opentelemetry import trace
from pydantic import BaseModel
from mcp_agent.agents.agent import Agent
from mcp_agent.tracing.semconv import GEN_AI_REQUEST_TOP_K
from mcp_agent.tracing.telemetry import get_tracer
from mcp_agent.workflows.llm.augmented_llm import AugmentedLLM
from mcp_agent.workflows.router.router_base import ResultT, Router, RouterResult
from mcp_agent.logging.logger import get_logger
if TYPE_CHECKING:
from mcp_agent.core.context import Context
logger = get_logger(__name__)
DEFAULT_ROUTING_INSTRUCTION = """
You are a highly accurate request router that directs incoming requests to the most appropriate category.
A category is a specialized destination, such as a Function, an MCP Server (a collection of tools/functions), or an Agent (a collection of servers).
Below are the available routing categories, each with their capabilities and descriptions:
{context}
Your task is to analyze the following request and determine the most appropriate categories from the options above. Consider:
- The specific capabilities and tools each destination offers
- How well the request matches the category's description
- Whether the request might benefit from multiple categories (up to {top_k})
Request: {request}
Respond in JSON format:
{{
"categories": [
{{
"category": <category name>,
"confidence": <high, medium or low>,
"reasoning": <brief explanation>
}}
]
}}
Only include categories that are truly relevant. You may return fewer than {top_k} if appropriate.
If none of the categories are relevant, return an empty list.
"""
class LLMRouterResult(RouterResult[ResultT]):
"""A class that represents the result of an LLMRouter.route request"""
confidence: Literal["high", "medium", "low"]
"""The confidence level of the routing decision."""
reasoning: str | None = None
"""
A brief explanation of the routing decision.
This is optional and may only be provided if the router is an LLM
"""
class StructuredResponseCategory(BaseModel):
"""A class that represents a single category returned by an LLM router"""
category: str
"""The name of the category (i.e. MCP server, Agent or function) to route the input to."""
confidence: Literal["high", "medium", "low"]
"""The confidence level of the routing decision."""
reasoning: str | None = None
"""A brief explanation of the routing decision."""
class StructuredResponse(BaseModel):
"""A class that represents the structured response of an LLM router"""
categories: List[StructuredResponseCategory]
"""A list of categories to route the input to."""
class LLMRouter(Router):
"""
A router that uses an LLM to route an input to a specific category.
"""
def __init__(
self,
llm: AugmentedLLM,
server_names: List[str] | None = None,
agents: List[Agent] | None = None,
functions: List[Callable] | None = None,
routing_instruction: str | None = None,
context: Optional["Context"] = None,
**kwargs,
):
super().__init__(
server_names=server_names,
agents=agents,
functions=functions,
routing_instruction=routing_instruction,
context=context,
**kwargs,
)
self.llm = llm
@classmethod
async def create(
cls,
llm: AugmentedLLM,
server_names: List[str] | None = None,
agents: List[Agent] | None = None,
functions: List[Callable] | None = None,
routing_instruction: str | None = None,
context: Optional["Context"] = None,
) -> "LLMRouter":
"""
Factory method to create and initialize a router.
Use this instead of constructor since we need async initialization.
"""
instance = cls(
llm=llm,
server_names=server_names,
agents=agents,
functions=functions,
routing_instruction=routing_instruction,
context=context,
)
await instance.initialize()
return instance
async def route(
self, request: str, top_k: int = 1
) -> List[LLMRouterResult[str | Agent | Callable]]:
tracer = get_tracer(self.context)
with tracer.start_as_current_span(f"{self.__class__.__name__}.route") as span:
self._annotate_span_for_route_request(span, request, top_k)
if not self.initialized:
await self.initialize()
res = await self._route_with_llm(request, top_k)
self._annotate_span_for_router_result(span, res)
return res
async def route_to_server(
self, request: str, top_k: int = 1
) -> List[LLMRouterResult[str]]:
tracer = get_tracer(self.context)
with tracer.start_as_current_span(
f"{self.__class__.__name__}.route_to_server"
) as span:
self._annotate_span_for_route_request(span, request, top_k)
if not self.initialized:
await self.initialize()
res = await self._route_with_llm(
request,
top_k,
include_servers=True,
include_agents=False,
include_functions=False,
)
self._annotate_span_for_router_result(span, res)
return res
async def route_to_agent(
self, request: str, top_k: int = 1
) -> List[LLMRouterResult[Agent]]:
tracer = get_tracer(self.context)
with tracer.start_as_current_span(
f"{self.__class__.__name__}.route_to_agent"
) as span:
self._annotate_span_for_route_request(span, request, top_k)
if not self.initialized:
await self.initialize()
res = await self._route_with_llm(
request,
top_k,
include_servers=False,
include_agents=True,
include_functions=False,
)
self._annotate_span_for_router_result(span, res)
return res
async def route_to_function(
self, request: str, top_k: int = 1
) -> List[LLMRouterResult[Callable]]:
tracer = get_tracer(self.context)
with tracer.start_as_current_span(
f"{self.__class__.__name__}.route_to_function"
) as span:
self._annotate_span_for_route_request(span, request, top_k)
if not self.initialized:
await self.initialize()
res = await self._route_with_llm(
request,
top_k,
include_servers=False,
include_agents=False,
include_functions=True,
)
self._annotate_span_for_router_result(span, res)
return res
async def _route_with_llm(
self,
request: str,
top_k: int = 1,
include_servers: bool = True,
include_agents: bool = True,
include_functions: bool = True,
) -> List[LLMRouterResult]:
tracer = get_tracer(self.context)
with tracer.start_as_current_span(
f"{self.__class__.__name__}._route_with_llm"
) as span:
self._annotate_span_for_route_request(span, request, top_k)
if not self.initialized:
await self.initialize()
routing_instruction = (
self.routing_instruction or DEFAULT_ROUTING_INSTRUCTION
)
# Generate the categories context
context = self._generate_context(
include_servers=include_servers,
include_agents=include_agents,
include_functions=include_functions,
)
# logger.debug(
# f"Requesting routing from LLM, \nrequest: {request} \ntop_k: {top_k} \nrouting_instruction: {routing_instruction} \ncontext={context}",
# data={"progress_action": "Routing", "agent_name": "LLM Router"},
# )
# Format the prompt with all the necessary information
prompt = routing_instruction.format(
context=context, request=request, top_k=top_k
)
# Get routes from LLM
response = await self.llm.generate_structured(
message=prompt,
response_model=StructuredResponse,
)
if self.context.tracing_enabled:
response_categories_data = {}
for i, r in enumerate(response.categories):
response_categories_data[f"category.{i}.category"] = r.category
response_categories_data[f"category.{i}.confidence"] = r.confidence
if r.reasoning:
response_categories_data[f"category.{i}.reasoning"] = (
r.reasoning
)
span.add_event(
"routing.response",
{
"prompt": prompt,
**response_categories_data,
},
)
# logger.debug(
# "Routing Response received",
# data={"progress_action": "Finished", "agent_name": "LLM Router"},
# )
# Construct the result
if not response or not response.categories:
return []
result: List[LLMRouterResult] = []
for r in response.categories:
router_category = self.categories.get(r.category)
if not router_category:
# Skip invalid categories
# TODO: saqadri - log or raise an error
continue
result.append(
LLMRouterResult(
result=router_category.category,
confidence=r.confidence,
reasoning=r.reasoning,
)
)
self._annotate_span_for_router_result(span, result)
return result[:top_k]
def _annotate_span_for_route_request(
self,
span: trace.Span,
request: str,
top_k: int,
):
"""Annotate the span with the request and top_k."""
if not self.context.tracing_enabled:
return
span.set_attribute("request", request)
span.set_attribute(GEN_AI_REQUEST_TOP_K, top_k)
span.set_attribute("llm", self.llm.name)
span.set_attribute(
"agents", [a.name for a in self.agents] if self.agents else []
)
span.set_attribute("servers", self.server_names or [])
span.set_attribute(
"functions", [f.__name__ for f in self.functions] if self.functions else []
)
def _annotate_span_for_router_result(
self,
span: trace.Span,
result: List[LLMRouterResult],
):
"""Annotate the span with the router result."""
if not self.context.tracing_enabled:
return
for i, res in enumerate(result):
span.set_attribute(f"result.{i}.confidence", res.confidence)
if res.reasoning:
span.set_attribute(f"result.{i}.reasoning", res.reasoning)
if res.p_score:
span.set_attribute(f"result.{i}.p_score", res.p_score)
result_key = f"result.{i}.result"
if isinstance(res.result, str):
span.set_attribute(result_key, res.result)
elif isinstance(res.result, Agent):
span.set_attribute(result_key, res.result.name)
elif callable(res.result):
span.set_attribute(result_key, res.result.__name__)
def _generate_context(
self,
include_servers: bool = True,
include_agents: bool = True,
include_functions: bool = True,
) -> str:
"""Generate a formatted context list of categories."""
context_list = []
idx = 1
# Format all categories
if include_servers:
for category in self.server_categories.values():
context_list.append(self.format_category(category, idx))
idx += 1
if include_agents:
for category in self.agent_categories.values():
context_list.append(self.format_category(category, idx))
idx += 1
if include_functions:
for category in self.function_categories.values():
context_list.append(self.format_category(category, idx))
idx += 1
return "\n\n".join(context_list)