-
Notifications
You must be signed in to change notification settings - Fork 3.3k
Expand file tree
/
Copy pathclient_auth.py
More file actions
115 lines (90 loc) · 4.67 KB
/
client_auth.py
File metadata and controls
115 lines (90 loc) · 4.67 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
import base64
import binascii
import hmac
import time
from typing import Any
from urllib.parse import unquote
from starlette.requests import Request
from mcp.server.auth.provider import OAuthAuthorizationServerProvider
from mcp.shared.auth import OAuthClientInformationFull
class AuthenticationError(Exception):
    """Raised when a client cannot be authenticated for a request.

    Attributes:
        message: Human-readable description of why authentication failed.
    """

    def __init__(self, message: str):
        # Hand the text to Exception explicitly so that ``args`` and ``str()``
        # carry it regardless of whether ``message`` was passed positionally
        # or by keyword.
        super().__init__(message)
        self.message = message  # pragma: no cover
class ClientAuthenticator:
    """
    Validates requests from a client application; used to verify /token calls.

    If, during registration, the client requested to be issued a secret, the
    authenticator asserts that /token calls must be authenticated with
    that same secret.

    NOTE: clients can opt for no authentication during registration, in which case
    the secret check is skipped.
    """

    def __init__(self, provider: OAuthAuthorizationServerProvider[Any, Any, Any]):
        """
        Initialize the dependency.

        Args:
            provider: Provider to look up client information
        """
        self.provider = provider

    @staticmethod
    def _parse_basic_credentials(auth_header: str) -> tuple[str, str]:
        """
        Extract the client id and secret from a Basic ``Authorization`` header value.

        Args:
            auth_header: Raw header value (empty string when the header is absent)

        Returns:
            A ``(client_id, client_secret)`` pair, both URL-decoded

        Raises:
            AuthenticationError: If the header is not a Basic credential or its
                payload cannot be decoded into ``client_id:client_secret``
        """
        # HTTP auth scheme names are case-insensitive (RFC 7235 Section 2.1), so
        # "basic ..." and "BASIC ..." are accepted alongside "Basic ...".
        prefix_len = len("Basic ")
        if auth_header[:prefix_len].lower() != "basic ":
            raise AuthenticationError("Missing or invalid Basic authentication in Authorization header")

        # Only the decoding steps can raise the errors handled here, so the try
        # body is limited to them and the underlying error is kept as the cause.
        try:
            decoded = base64.b64decode(auth_header[prefix_len:]).decode("utf-8")
        except (ValueError, UnicodeDecodeError, binascii.Error) as err:
            raise AuthenticationError("Invalid Basic authentication header") from err

        # Split on the first colon only: the secret itself may contain colons.
        basic_client_id, separator, basic_client_secret = decoded.partition(":")
        if not separator:
            raise AuthenticationError("Invalid Basic authentication header")

        # URL-decode both parts per RFC 6749 Section 2.3.1
        return unquote(basic_client_id), unquote(basic_client_secret)

    async def authenticate_request(self, request: Request) -> OAuthClientInformationFull:
        """
        Authenticate a client from an HTTP request.

        Extracts client credentials from the appropriate location based on the
        client's registered authentication method and validates them.

        Args:
            request: The HTTP request containing client credentials

        Returns:
            The authenticated client information

        Raises:
            AuthenticationError: If authentication fails
        """
        form_data = await request.form()
        raw_client_id = form_data.get("client_id")
        if not raw_client_id:
            raise AuthenticationError("Missing client_id")
        # Normalise once so the provider lookup and the Basic-auth comparison
        # below both see the same string value.
        client_id = str(raw_client_id)

        client = await self.provider.get_client(client_id)
        if not client:
            raise AuthenticationError("Invalid client_id")  # pragma: no cover

        request_client_secret: str | None = None
        auth_method = client.token_endpoint_auth_method

        if auth_method == "client_secret_basic":
            basic_client_id, request_client_secret = self._parse_basic_credentials(
                request.headers.get("Authorization", "")
            )
            # The header must identify the same client as the form body.
            if basic_client_id != client_id:
                raise AuthenticationError("Client ID mismatch in Basic auth")
        elif auth_method == "client_secret_post":
            raw_client_secret = form_data.get("client_secret")
            # form_data.get() can return an UploadFile or None; only a string is a usable secret
            if isinstance(raw_client_secret, str):
                request_client_secret = raw_client_secret
        elif auth_method == "none":
            request_client_secret = None
        else:
            raise AuthenticationError(  # pragma: no cover
                f"Unsupported auth method: {client.token_endpoint_auth_method}"
            )

        # If client from the store expects a secret and the auth method requires it,
        # validate that the request provides that secret
        if auth_method != "none" and client.client_secret:
            if not request_client_secret:
                raise AuthenticationError("Client secret is required")  # pragma: no cover

            # hmac.compare_digest requires that both arguments are either bytes or a `str` containing
            # only ASCII characters. Since we do not control `request_client_secret`, we encode both
            # arguments to bytes.
            if not hmac.compare_digest(client.client_secret.encode(), request_client_secret.encode()):
                raise AuthenticationError("Invalid client_secret")  # pragma: no cover

            # A falsy expiry value means no expiry is enforced; otherwise it is
            # compared against the current time in whole seconds.
            if client.client_secret_expires_at and client.client_secret_expires_at < int(time.time()):
                raise AuthenticationError("Client secret has expired")  # pragma: no cover

        return client