-
Notifications
You must be signed in to change notification settings - Fork 3.3k
Expand file tree
/
Copy pathtest_error_handling.py
More file actions
299 lines (255 loc) · 10.5 KB
/
test_error_handling.py
File metadata and controls
299 lines (255 loc) · 10.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
"""
Tests for OAuth error handling in the auth handlers.
"""
import unittest.mock
from typing import Any
from urllib.parse import parse_qs, urlparse
import httpx
import pytest
from httpx import ASGITransport
from pydantic import AnyHttpUrl
from starlette.applications import Starlette
from mcp.server.auth.provider import AuthorizeError, RegistrationError, TokenError
from mcp.server.auth.routes import create_auth_routes
from tests.server.fastmcp.auth.test_auth_integration import MockOAuthProvider
@pytest.fixture
def oauth_provider():
"""Return a MockOAuthProvider instance that can be configured to raise errors."""
return MockOAuthProvider()
@pytest.fixture
def app(oauth_provider: MockOAuthProvider):
from mcp.server.auth.settings import ClientRegistrationOptions, RevocationOptions
# Enable client registration
client_registration_options = ClientRegistrationOptions(enabled=True)
revocation_options = RevocationOptions(enabled=True)
# Create auth routes
auth_routes = create_auth_routes(
oauth_provider,
issuer_url=AnyHttpUrl("http://localhost"),
client_registration_options=client_registration_options,
revocation_options=revocation_options,
)
# Create Starlette app with routes directly
return Starlette(routes=auth_routes)
@pytest.fixture
def client(app: Starlette):
transport = ASGITransport(app=app)
# Use base_url without a path since routes are directly on the app
return httpx.AsyncClient(transport=transport, base_url="http://localhost")
@pytest.fixture
def pkce_challenge():
"""Create a PKCE challenge with code_verifier and code_challenge."""
import base64
import hashlib
import secrets
# Generate a code verifier
code_verifier = secrets.token_urlsafe(64)[:128]
# Create code challenge using S256 method
code_verifier_bytes = code_verifier.encode("ascii")
sha256 = hashlib.sha256(code_verifier_bytes).digest()
code_challenge = base64.urlsafe_b64encode(sha256).decode().rstrip("=")
return {"code_verifier": code_verifier, "code_challenge": code_challenge}
@pytest.fixture
async def registered_client(client: httpx.AsyncClient) -> dict[str, Any]:
"""Create and register a test client."""
# Default client metadata
client_metadata = {
"redirect_uris": ["https://client.example.com/callback"],
"token_endpoint_auth_method": "client_secret_post",
"grant_types": ["authorization_code", "refresh_token"],
"response_types": ["code"],
"client_name": "Test Client",
}
response = await client.post("/register", json=client_metadata)
assert response.status_code == 201, f"Failed to register client: {response.content}"
client_info = response.json()
return client_info
class TestRegistrationErrorHandling:
@pytest.mark.anyio
async def test_registration_error_handling(self, client: httpx.AsyncClient, oauth_provider: MockOAuthProvider):
# Mock the register_client method to raise a registration error
with unittest.mock.patch.object(
oauth_provider,
"register_client",
side_effect=RegistrationError(
error="invalid_redirect_uri",
error_description="The redirect URI is invalid",
),
):
# Prepare a client registration request
client_data = {
"redirect_uris": ["https://client.example.com/callback"],
"token_endpoint_auth_method": "client_secret_post",
"grant_types": ["authorization_code", "refresh_token"],
"response_types": ["code"],
"client_name": "Test Client",
}
# Send the registration request
response = await client.post(
"/register",
json=client_data,
)
# Verify the response
assert response.status_code == 400, response.content
data = response.json()
assert data["error"] == "invalid_redirect_uri"
assert data["error_description"] == "The redirect URI is invalid"
class TestAuthorizeErrorHandling:
@pytest.mark.anyio
async def test_authorize_error_handling(
self,
client: httpx.AsyncClient,
oauth_provider: MockOAuthProvider,
registered_client: dict[str, Any],
pkce_challenge: dict[str, str],
):
# Mock the authorize method to raise an authorize error
with unittest.mock.patch.object(
oauth_provider,
"authorize",
side_effect=AuthorizeError(error="access_denied", error_description="The user denied the request"),
):
# Register the client
client_id = registered_client["client_id"]
redirect_uri = registered_client["redirect_uris"][0]
# Prepare an authorization request
params = {
"client_id": client_id,
"redirect_uri": redirect_uri,
"response_type": "code",
"code_challenge": pkce_challenge["code_challenge"],
"code_challenge_method": "S256",
"state": "test_state",
}
# Send the authorization request
response = await client.get("/authorize", params=params)
# Verify the response is a redirect with error parameters
assert response.status_code == 302
redirect_url = response.headers["location"]
parsed_url = urlparse(redirect_url)
query_params = parse_qs(parsed_url.query)
assert query_params["error"][0] == "access_denied"
assert "error_description" in query_params
assert query_params["state"][0] == "test_state"
class TestTokenErrorHandling:
@pytest.mark.anyio
async def test_token_error_handling_auth_code(
self,
client: httpx.AsyncClient,
oauth_provider: MockOAuthProvider,
registered_client: dict[str, Any],
pkce_challenge: dict[str, str],
):
# Register the client and get an auth code
client_id = registered_client["client_id"]
client_secret = registered_client["client_secret"]
redirect_uri = registered_client["redirect_uris"][0]
# First get an authorization code
auth_response = await client.get(
"/authorize",
params={
"client_id": client_id,
"redirect_uri": redirect_uri,
"response_type": "code",
"code_challenge": pkce_challenge["code_challenge"],
"code_challenge_method": "S256",
"state": "test_state",
},
)
redirect_url = auth_response.headers["location"]
parsed_url = urlparse(redirect_url)
query_params = parse_qs(parsed_url.query)
code = query_params["code"][0]
# Mock the exchange_authorization_code method to raise a token error
with unittest.mock.patch.object(
oauth_provider,
"exchange_authorization_code",
side_effect=TokenError(
error="invalid_grant",
error_description="The authorization code is invalid",
),
):
# Try to exchange the code for tokens
token_response = await client.post(
"/token",
data={
"grant_type": "authorization_code",
"code": code,
"redirect_uri": redirect_uri,
"client_id": client_id,
"client_secret": client_secret,
"code_verifier": pkce_challenge["code_verifier"],
},
)
# Verify the response
assert token_response.status_code == 400
data = token_response.json()
assert data["error"] == "invalid_grant"
assert data["error_description"] == "The authorization code is invalid"
@pytest.mark.anyio
async def test_token_error_handling_refresh_token(
self,
client: httpx.AsyncClient,
oauth_provider: MockOAuthProvider,
registered_client: dict[str, Any],
pkce_challenge: dict[str, str],
):
# Register the client and get tokens
client_id = registered_client["client_id"]
client_secret = registered_client["client_secret"]
redirect_uri = registered_client["redirect_uris"][0]
# First get an authorization code
auth_response = await client.get(
"/authorize",
params={
"client_id": client_id,
"redirect_uri": redirect_uri,
"response_type": "code",
"code_challenge": pkce_challenge["code_challenge"],
"code_challenge_method": "S256",
"state": "test_state",
},
)
assert auth_response.status_code == 302, auth_response.content
redirect_url = auth_response.headers["location"]
parsed_url = urlparse(redirect_url)
query_params = parse_qs(parsed_url.query)
code = query_params["code"][0]
# Exchange the code for tokens
token_response = await client.post(
"/token",
data={
"grant_type": "authorization_code",
"code": code,
"redirect_uri": redirect_uri,
"client_id": client_id,
"client_secret": client_secret,
"code_verifier": pkce_challenge["code_verifier"],
},
)
tokens = token_response.json()
refresh_token = tokens["refresh_token"]
# Mock the exchange_refresh_token method to raise a token error
with unittest.mock.patch.object(
oauth_provider,
"exchange_refresh_token",
side_effect=TokenError(
error="invalid_scope",
error_description="The requested scope is invalid",
),
):
# Try to use the refresh token
refresh_response = await client.post(
"/token",
data={
"grant_type": "refresh_token",
"refresh_token": refresh_token,
"client_id": client_id,
"client_secret": client_secret,
},
)
# Verify the response
assert refresh_response.status_code == 400
data = refresh_response.json()
assert data["error"] == "invalid_scope"
assert data["error_description"] == "The requested scope is invalid"