-
Notifications
You must be signed in to change notification settings - Fork 3.3k
Expand file tree
/
Copy pathauth.py
More file actions
168 lines (141 loc) · 6.95 KB
/
auth.py
File metadata and controls
168 lines (141 loc) · 6.95 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
from typing import Any, Literal
from pydantic import AnyHttpUrl, AnyUrl, BaseModel, Field, field_serializer, field_validator
class OAuthToken(BaseModel):
    """OAuth 2.0 token response model.

    Reference: https://datatracker.ietf.org/doc/html/rfc6749#section-5.1
    """

    access_token: str
    # Only the "Bearer" token type is accepted; see the validator below for
    # how differently-cased input is handled.
    token_type: Literal["Bearer"] = "Bearer"
    expires_in: int | None = None
    scope: str | None = None
    refresh_token: str | None = None

    @field_validator("token_type", mode="before")
    @classmethod
    def normalize_token_type(cls, v: str | None) -> str | None:
        """Title-case a string ``token_type`` before the Literal check runs.

        Non-string input is passed through untouched so that the regular
        field validation decides what to do with it.
        """
        if not isinstance(v, str):
            return v  # pragma: no cover
        # The spec spells it "Bearer" (title case), so inputs such as
        # "bearer" or "BEARER" are normalized to that form.
        # https://datatracker.ietf.org/doc/html/rfc6750#section-4
        return v.title()
class InvalidScopeError(Exception):
    """Error raised when a requested scope is not allowed for a client.

    The human-readable description is available both as ``message`` and
    through the standard exception machinery (``args`` / ``str(exc)``).
    """

    def __init__(self, message: str):
        """Store *message* and forward it to ``Exception``.

        Forwarding matters when the exception is built with a keyword
        argument (``InvalidScopeError(message=...)``): without it ``args``
        stays empty and ``str(exc)`` is an empty string.
        """
        super().__init__(message)
        self.message = message
class InvalidRedirectUriError(Exception):
    """Error raised when a redirect URI cannot be accepted for a client.

    The human-readable description is available both as ``message`` and
    through the standard exception machinery (``args`` / ``str(exc)``).
    """

    def __init__(self, message: str):
        """Store *message* and forward it to ``Exception``.

        Forwarding matters when the exception is built with a keyword
        argument (``InvalidRedirectUriError(message=...)``): without it
        ``args`` stays empty and ``str(exc)`` is an empty string.
        """
        super().__init__(message)
        self.message = message
class OAuthClientMetadata(BaseModel):
    """Client metadata for OAuth 2.0 Dynamic Client Registration (RFC 7591).

    The complete field definitions are in
    https://datatracker.ietf.org/doc/html/rfc7591#section-2
    """

    # Has no default, so it must be supplied; min_length=1 rejects an empty list.
    redirect_uris: list[AnyUrl] | None = Field(..., min_length=1)
    # Authentication methods accepted for the token endpoint.
    token_endpoint_auth_method: (
        Literal["none", "client_secret_post", "client_secret_basic", "private_key_jwt"] | None
    ) = None
    # Grant types this implementation knows about; arbitrary other strings
    # are accepted as well.
    grant_types: list[
        Literal["authorization_code", "refresh_token", "urn:ietf:params:oauth:grant-type:jwt-bearer"] | str
    ] = [
        "authorization_code",
        "refresh_token",
    ]
    # "code" is the response type the MCP spec requires; OAuth servers may
    # list further response types they support.
    response_types: list[str] = ["code"]
    # Space-separated list of scopes (split on " " by validate_scope).
    scope: str | None = None

    # The remaining fields are not used at the moment; they are accepted and
    # stored so they are available for potential future use.
    client_name: str | None = None
    client_uri: AnyHttpUrl | None = None
    logo_uri: AnyHttpUrl | None = None
    contacts: list[str] | None = None
    tos_uri: AnyHttpUrl | None = None
    policy_uri: AnyHttpUrl | None = None
    jwks_uri: AnyHttpUrl | None = None
    jwks: Any | None = None
    software_id: str | None = None
    software_version: str | None = None

    def validate_scope(self, requested_scope: str | None) -> list[str] | None:
        """Check a space-separated scope request against the registered scope.

        Returns ``None`` when no scope was requested, otherwise the list of
        requested scopes. Raises ``InvalidScopeError`` for the first requested
        scope that is not among the scopes in ``self.scope`` (an unset
        ``self.scope`` means nothing is allowed).
        """
        if requested_scope is None:
            return None

        registered = self.scope.split(" ") if self.scope is not None else []
        wanted = requested_scope.split(" ")
        for item in wanted:
            if item not in registered:  # pragma: no branch
                raise InvalidScopeError(f"Client was not registered with scope {item}")
        return wanted  # pragma: no cover

    def validate_redirect_uri(self, redirect_uri: AnyUrl | None) -> AnyUrl:
        """Resolve the redirect URI to use for this client.

        An explicit ``redirect_uri`` must be one of the registered URIs and is
        returned unchanged. When it is omitted, the single registered URI is
        returned. ``InvalidRedirectUriError`` is raised if the explicit URI is
        not registered, or if it is omitted while the client does not have
        exactly one registered URI.
        """
        registered = self.redirect_uris

        if redirect_uri is None:
            # Fall back to the registered URI only when the choice is unambiguous.
            if registered is not None and len(registered) == 1:
                return registered[0]
            raise InvalidRedirectUriError("redirect_uri must be specified when client has multiple registered URIs")

        # An explicit URI has to match one of the client's registered URIs.
        if registered is None or redirect_uri not in registered:
            raise InvalidRedirectUriError(f"Redirect URI '{redirect_uri}' not registered for client")
        return redirect_uri
class OAuthClientInformationFull(OAuthClientMetadata):
    """Full RFC 7591 Dynamic Client Registration response.

    Extends the client metadata with the client information fields below;
    every one of them is optional and defaults to ``None``.
    """

    client_id: str | None = None
    client_secret: str | None = None
    client_id_issued_at: int | None = None
    client_secret_expires_at: int | None = None
class OAuthMetadata(BaseModel):
    """Authorization server metadata for OAuth 2.0 (RFC 8414).

    Field definitions: https://datatracker.ietf.org/doc/html/rfc8414#section-2
    """

    # Required fields (no defaults).
    issuer: AnyHttpUrl
    authorization_endpoint: AnyHttpUrl
    token_endpoint: AnyHttpUrl

    # Optional fields; all default to None except response_types_supported.
    registration_endpoint: AnyHttpUrl | None = None
    scopes_supported: list[str] | None = None
    response_types_supported: list[str] = ["code"]
    response_modes_supported: list[str] | None = None
    grant_types_supported: list[str] | None = None
    token_endpoint_auth_methods_supported: list[str] | None = None
    token_endpoint_auth_signing_alg_values_supported: list[str] | None = None
    service_documentation: AnyHttpUrl | None = None
    ui_locales_supported: list[str] | None = None
    op_policy_uri: AnyHttpUrl | None = None
    op_tos_uri: AnyHttpUrl | None = None
    revocation_endpoint: AnyHttpUrl | None = None
    revocation_endpoint_auth_methods_supported: list[str] | None = None
    revocation_endpoint_auth_signing_alg_values_supported: list[str] | None = None
    introspection_endpoint: AnyHttpUrl | None = None
    introspection_endpoint_auth_methods_supported: list[str] | None = None
    introspection_endpoint_auth_signing_alg_values_supported: list[str] | None = None
    code_challenge_methods_supported: list[str] | None = None
    client_id_metadata_document_supported: bool | None = None

    @field_serializer("issuer")
    def serialize_issuer(self, v: AnyHttpUrl) -> str:
        """Emit the issuer as a string without trailing slashes (RFC 8414 §3.3)."""
        issuer_text = str(v)
        return issuer_text.rstrip("/")
class ProtectedResourceMetadata(BaseModel):
    """Protected resource metadata for OAuth 2.0 (RFC 9728).

    Field definitions: https://datatracker.ietf.org/doc/html/rfc9728#section-2
    """

    resource: AnyHttpUrl
    # Required, and must list at least one authorization server.
    authorization_servers: list[AnyHttpUrl] = Field(..., min_length=1)
    jwks_uri: AnyHttpUrl | None = None
    scopes_supported: list[str] | None = None
    # Defaults to the header method, the only one MCP supports.
    bearer_methods_supported: list[str] | None = Field(default=["header"])
    resource_signing_alg_values_supported: list[str] | None = None
    resource_name: str | None = None
    resource_documentation: AnyHttpUrl | None = None
    resource_policy_uri: AnyHttpUrl | None = None
    resource_tos_uri: AnyHttpUrl | None = None
    # The default for tls_client_certificate_bound_access_tokens is False,
    # but it is omitted here for clarity.
    tls_client_certificate_bound_access_tokens: bool | None = None
    authorization_details_types_supported: list[str] | None = None
    dpop_signing_alg_values_supported: list[str] | None = None
    # The default for dpop_bound_access_tokens_required is False, but it is
    # omitted here for clarity.
    dpop_bound_access_tokens_required: bool | None = None

    @field_serializer("resource")
    def serialize_resource(self, v: AnyHttpUrl) -> str:
        """Emit the resource URL as a string without trailing slashes (RFC 9728 §3)."""
        resource_text = str(v)
        return resource_text.rstrip("/")

    @field_serializer("authorization_servers")
    def serialize_authorization_servers(self, v: list[AnyHttpUrl]) -> list[str]:
        """Emit each authorization server URL as a string without trailing slashes (RFC 9728 §3)."""
        return [str(server).rstrip("/") for server in v]