-
Notifications
You must be signed in to change notification settings - Fork 3.3k
Expand file tree
/
Copy pathtemplates.py
More file actions
150 lines (128 loc) · 6.06 KB
/
templates.py
File metadata and controls
150 lines (128 loc) · 6.06 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
"""Resource template functionality."""
from __future__ import annotations
import inspect
import re
from collections.abc import Callable
from typing import TYPE_CHECKING, Any
from urllib.parse import unquote
from pydantic import BaseModel, Field, validate_call
from mcp.server.mcpserver.resources.types import FunctionResource, Resource
from mcp.server.mcpserver.utilities.context_injection import find_context_parameter, inject_context
from mcp.server.mcpserver.utilities.func_metadata import func_metadata
from mcp.types import Annotations, Icon
if TYPE_CHECKING:
from mcp.server.context import LifespanContextT, RequestT
from mcp.server.mcpserver.context import Context
# Regex used for each URI template parameter segment.
# Decoded values must still satisfy this constraint to prevent
# encoded path-separator injection (e.g. ``%2F`` → ``/``).
_SEGMENT_RE = re.compile(r"[^/]+")
class ResourceTemplate(BaseModel):
"""A template for dynamically creating resources."""
uri_template: str = Field(description="URI template with parameters (e.g. weather://{city}/current)")
name: str = Field(description="Name of the resource")
title: str | None = Field(description="Human-readable title of the resource", default=None)
description: str | None = Field(description="Description of what the resource does")
mime_type: str = Field(default="text/plain", description="MIME type of the resource content")
icons: list[Icon] | None = Field(default=None, description="Optional list of icons for the resource template")
annotations: Annotations | None = Field(default=None, description="Optional annotations for the resource template")
meta: dict[str, Any] | None = Field(default=None, description="Optional metadata for this resource template")
fn: Callable[..., Any] = Field(exclude=True)
parameters: dict[str, Any] = Field(description="JSON schema for function parameters")
context_kwarg: str | None = Field(None, description="Name of the kwarg that should receive context")
@classmethod
def from_function(
cls,
fn: Callable[..., Any],
uri_template: str,
name: str | None = None,
title: str | None = None,
description: str | None = None,
mime_type: str | None = None,
icons: list[Icon] | None = None,
annotations: Annotations | None = None,
meta: dict[str, Any] | None = None,
context_kwarg: str | None = None,
) -> ResourceTemplate:
"""Create a template from a function."""
func_name = name or fn.__name__
if func_name == "<lambda>":
raise ValueError("You must provide a name for lambda functions") # pragma: no cover
# Find context parameter if it exists
if context_kwarg is None: # pragma: no branch
context_kwarg = find_context_parameter(fn)
# Get schema from func_metadata, excluding context parameter
func_arg_metadata = func_metadata(
fn,
skip_names=[context_kwarg] if context_kwarg is not None else [],
)
parameters = func_arg_metadata.arg_model.model_json_schema()
# ensure the arguments are properly cast
fn = validate_call(fn)
return cls(
uri_template=uri_template,
name=func_name,
title=title,
description=description or fn.__doc__ or "",
mime_type=mime_type or "text/plain",
icons=icons,
annotations=annotations,
meta=meta,
fn=fn,
parameters=parameters,
context_kwarg=context_kwarg,
)
def matches(self, uri: str) -> dict[str, Any] | None:
"""Check if URI matches template and extract parameters.
Extracted parameters are URL-decoded to handle percent-encoded characters.
After decoding, each value is re-validated to ensure it does not contain
a ``/`` character, which would indicate an encoded path separator bypass
(e.g. ``%2F``). Rejecting such values prevents path-traversal attacks
where an attacker could send ``..%2F..%2Fetc%2Fpasswd`` to escape the
intended path segment.
"""
# Convert template to regex pattern
pattern = self.uri_template.replace("{", "(?P<").replace("}", ">[^/]+)")
match = re.match(f"^{pattern}$", uri)
if match:
# URL-decode all extracted parameter values
decoded = {key: unquote(value) for key, value in match.groupdict().items()}
# Reject any decoded value that would not have matched the
# original ``[^/]+`` segment constraint. This blocks encoded
# slash injection (``%2F`` → ``/``) which could allow path
# traversal when the parameter is used in file-system operations.
for value in decoded.values():
if not _SEGMENT_RE.fullmatch(value):
return None
return decoded
return None
async def create_resource(
self,
uri: str,
params: dict[str, Any],
context: Context[LifespanContextT, RequestT],
) -> Resource:
"""Create a resource from the template with the given parameters.
Raises:
ValueError: If creating the resource fails.
"""
try:
# Add context to params if needed
params = inject_context(self.fn, params, context, self.context_kwarg)
# Call function and check if result is a coroutine
result = self.fn(**params)
if inspect.iscoroutine(result):
result = await result
return FunctionResource(
uri=uri, # type: ignore
name=self.name,
title=self.title,
description=self.description,
mime_type=self.mime_type,
icons=self.icons,
annotations=self.annotations,
meta=self.meta,
fn=lambda: result, # Capture result in closure
)
except Exception as e:
raise ValueError(f"Error creating resource from template: {e}")