-
Notifications
You must be signed in to change notification settings - Fork 3.3k
Expand file tree
/
Copy pathauthorize.py
More file actions
244 lines (214 loc) · 9.54 KB
/
authorize.py
File metadata and controls
244 lines (214 loc) · 9.54 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
import logging
from dataclasses import dataclass
from typing import Any, Literal
from pydantic import AnyHttpUrl, AnyUrl, BaseModel, Field, RootModel, ValidationError
from starlette.datastructures import FormData, QueryParams
from starlette.requests import Request
from starlette.responses import RedirectResponse, Response
from mcp.server.auth.errors import (
stringify_pydantic_error,
)
from mcp.server.auth.json_response import PydanticJSONResponse
from mcp.server.auth.provider import (
AuthorizationErrorCode,
AuthorizationParams,
AuthorizeError,
OAuthAuthorizationServerProvider,
construct_redirect_uri,
)
from mcp.shared.auth import (
InvalidRedirectUriError,
InvalidScopeError,
)
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class AuthorizationRequest(BaseModel):
    # Parameters of an authorization request, validated with pydantic.
    # See https://datatracker.ietf.org/doc/html/rfc6749#section-4.1.1
    client_id: str = Field(default=..., description="The client ID")
    redirect_uri: AnyHttpUrl | None = Field(
        default=None,
        description="URL to redirect to after authorization",
    )
    # see OAuthClientMetadata; we only support `code`
    response_type: Literal["code"] = Field(
        default=...,
        description="Must be 'code' for authorization code flow",
    )
    # PKCE: the challenge is required; the method defaults to (and must be) S256.
    code_challenge: str = Field(default=..., description="PKCE code challenge")
    code_challenge_method: Literal["S256"] = Field(
        default="S256",
        description="PKCE code challenge method, must be S256",
    )
    state: str | None = Field(default=None, description="Optional state parameter")
    scope: str | None = Field(
        default=None,
        description=(
            "Optional scope; if specified, should be "
            "a space-separated list of scope strings"
        ),
    )
class AuthorizationErrorResponse(BaseModel):
    # Fields of an authorization error response.
    # `error` and `error_description` must always be passed explicitly
    # (`error_description` may be None); the remaining fields default to None.
    error: AuthorizationErrorCode
    error_description: str | None
    error_uri: AnyUrl | None = Field(default=None)
    # must be set if provided in the request
    state: str | None = Field(default=None)
def best_effort_extract_string(
    key: str, params: None | FormData | QueryParams
) -> str | None:
    """Look up ``key`` in ``params`` and return it only if it is a string.

    Args:
        key: Name of the parameter to read.
        params: Parsed request parameters, or ``None`` if none were loaded.

    Returns:
        The value stored under ``key`` when it is a ``str``; ``None`` when
        ``params`` is ``None``, the key is absent, or the value is not a
        string.
    """
    candidate = None if params is None else params.get(key)
    return candidate if isinstance(candidate, str) else None
# Root-model wrapper around AnyHttpUrl: lets a bare value be validated with
# AnyHttpUrlModel.model_validate(...), with the parsed URL read from `.root`.
class AnyHttpUrlModel(RootModel[AnyHttpUrl]):
    root: AnyHttpUrl
@dataclass
class AuthorizationHandler:
    """Request handler for the OAuth authorization endpoint.

    Attributes:
        provider: Authorization server provider used to look up clients and
            to perform the actual authorization step.
    """

    provider: OAuthAuthorizationServerProvider[Any, Any, Any]

    async def handle(self, request: Request) -> Response:
        """Validate an authorization request and produce the HTTP response.

        Parameters are read from the query string for GET requests and from
        the form body for any other method. When validation succeeds, the
        response is a 302 redirect to the URL returned by
        ``provider.authorize``. Every failure is reported through the nested
        ``error_response`` helper.

        Args:
            request: The incoming authorization request.

        Returns:
            A redirect response on success, or a redirect/JSON error response
            on failure. All responses built here carry
            ``Cache-Control: no-store``.
        """
        # implements authorization requests for grant_type=code;
        # see https://datatracker.ietf.org/doc/html/rfc6749#section-4.1.1

        # Shared with error_response() via `nonlocal`; each is filled in as
        # soon as the corresponding piece of the request has been loaded.
        state = None
        redirect_uri = None
        client = None
        params = None

        async def error_response(
            error: AuthorizationErrorCode,
            error_description: str | None,
            attempt_load_client: bool = True,
        ) -> Response:
            # Error responses take two different formats:
            # 1. The request has a valid client ID & redirect_uri: we issue a redirect
            #    back to the redirect_uri with the error response fields as query
            #    parameters. This allows the client to be notified of the error.
            # 2. Otherwise, we return an error response directly to the end user;
            #     we choose to do so in JSON, but this is left undefined in the
            #     specification.
            # See https://datatracker.ietf.org/doc/html/rfc6749#section-4.1.2.1
            #
            # This logic is a bit awkward to handle, because the error might be thrown
            # very early in request validation, before we've done the usual Pydantic
            # validation, loaded the client, etc. To handle this, error_response()
            # contains fallback logic which attempts to load the parameters directly
            # from the request.

            nonlocal client, redirect_uri, state
            if client is None and attempt_load_client:
                # make last-ditch attempt to load the client
                client_id = best_effort_extract_string("client_id", params)
                if client_id:
                    try:
                        client = await self.provider.get_client(client_id)
                    except Exception:
                        # This lookup is strictly best-effort. If the provider
                        # fails here (possibly for the very reason we are
                        # reporting an error), fall back to the direct JSON
                        # response rather than letting the exception escape
                        # the handler and mask the original error.
                        logger.exception(
                            "Failed to load client while building error response"
                        )
            if redirect_uri is None and client:
                # make last-ditch effort to load the redirect uri
                try:
                    if params is not None and "redirect_uri" not in params:
                        raw_redirect_uri = None
                    else:
                        raw_redirect_uri = AnyHttpUrlModel.model_validate(
                            best_effort_extract_string("redirect_uri", params)
                        ).root
                    redirect_uri = client.validate_redirect_uri(raw_redirect_uri)
                except (ValidationError, InvalidRedirectUriError):
                    # if the redirect URI is invalid, ignore it & just return the
                    # initial error
                    pass

            # the error response MUST contain the state specified by the client, if any
            if state is None:
                # make last-ditch effort to load state
                state = best_effort_extract_string("state", params)

            error_resp = AuthorizationErrorResponse(
                error=error,
                error_description=error_description,
                state=state,
            )

            if redirect_uri and client:
                return RedirectResponse(
                    url=construct_redirect_uri(
                        str(redirect_uri), **error_resp.model_dump(exclude_none=True)
                    ),
                    status_code=302,
                    headers={"Cache-Control": "no-store"},
                )
            else:
                return PydanticJSONResponse(
                    status_code=400,
                    content=error_resp,
                    headers={"Cache-Control": "no-store"},
                )

        try:
            # Parse request parameters
            if request.method == "GET":
                # Convert query_params to dict for pydantic validation
                params = request.query_params
            else:
                # Parse form data for POST requests
                params = await request.form()

            # Save state if it exists, even before validation
            state = best_effort_extract_string("state", params)

            try:
                auth_request = AuthorizationRequest.model_validate(params)
                state = auth_request.state  # Update with validated state
            except ValidationError as validation_error:
                error: AuthorizationErrorCode = "invalid_request"
                # A bad `response_type` gets its own error code; every other
                # validation failure is reported as `invalid_request`.
                for e in validation_error.errors():
                    if e["loc"] == ("response_type",) and e["type"] == "literal_error":
                        error = "unsupported_response_type"
                        break
                return await error_response(
                    error, stringify_pydantic_error(validation_error)
                )

            # Get client information
            client = await self.provider.get_client(
                auth_request.client_id,
            )
            if not client:
                # For client_id validation errors, return direct error (no redirect)
                return await error_response(
                    error="invalid_request",
                    error_description=f"Client ID '{auth_request.client_id}' not found",
                    attempt_load_client=False,
                )

            # Validate redirect_uri against client's registered URIs
            try:
                redirect_uri = client.validate_redirect_uri(auth_request.redirect_uri)
            except InvalidRedirectUriError as validation_error:
                # For redirect_uri validation errors, return direct error (no redirect)
                return await error_response(
                    error="invalid_request",
                    error_description=validation_error.message,
                )

            # Validate scope - for scope errors, we can redirect
            try:
                scopes = client.validate_scope(auth_request.scope)
            except InvalidScopeError as validation_error:
                # For scope errors, redirect with error parameters
                return await error_response(
                    error="invalid_scope",
                    error_description=validation_error.message,
                )

            # Setup authorization parameters
            auth_params = AuthorizationParams(
                state=state,
                scopes=scopes,
                code_challenge=auth_request.code_challenge,
                redirect_uri=redirect_uri,
                redirect_uri_provided_explicitly=auth_request.redirect_uri is not None,
            )

            try:
                # Let the provider pick the next URI to redirect to
                return RedirectResponse(
                    url=await self.provider.authorize(
                        client,
                        auth_params,
                    ),
                    status_code=302,
                    headers={"Cache-Control": "no-store"},
                )
            except AuthorizeError as e:
                # Handle authorization errors as defined in RFC 6749 Section 4.1.2.1
                return await error_response(
                    error=e.error,
                    error_description=e.error_description,
                )

        except Exception:
            # Catch-all for unexpected errors; logger.exception() records the
            # active exception and its traceback on its own.
            logger.exception("Unexpected error in authorization_handler")
            return await error_response(
                error="server_error", error_description="An unexpected error occurred"
            )