|
4 | 4 | from typing import Dict |
5 | 5 | from typing import Optional |
6 | 6 | from typing import Union |
| 7 | +from urllib.parse import unquote |
7 | 8 |
|
8 | 9 | from cryptojwt.exception import BadSignature |
9 | 10 | from cryptojwt.exception import Invalid |
@@ -92,16 +93,17 @@ def is_usable( |
92 | 93 | raise NotImplementedError() |
93 | 94 |
|
94 | 95 |
|
95 | | -def basic_authn(authorization_token: str): |
96 | | - if not authorization_token.startswith("Basic "): |
| 96 | +def basic_authn(authorization_header: str, urldecode_client_id_secret=False): |
| 97 | + if not authorization_header.startswith("Basic "): |
97 | 98 | raise ClientAuthenticationError("Wrong type of authorization token") |
98 | 99 |
|
99 | | - _tok = as_bytes(authorization_token[6:]) |
100 | | - # Will raise ValueError type exception if not base64 encoded |
101 | | - _tok = base64.b64decode(_tok) |
102 | | - part = as_unicode(_tok).split(":", 1) |
| 100 | + _tok = base64.b64decode(authorization_header[6:].encode("utf-8")) |
| 101 | + part = _tok.decode("utf-8").split(":", 1) |
| 102 | + |
103 | 103 | if len(part) != 2: |
104 | 104 | raise ValueError("Illegal token") |
| 105 | + if urldecode_client_id_secret: |
| 106 | + part = [unquote(p) for p in part] |
105 | 107 |
|
106 | 108 | return dict(zip(["id", "secret"], part)) |
107 | 109 |
|
@@ -168,7 +170,9 @@ def _verify( |
168 | 170 | endpoint=None, # Optional[Endpoint] |
169 | 171 | **kwargs, |
170 | 172 | ): |
171 | | - client_info = basic_authn(authorization_token) |
| 173 | + kwargs = getattr(endpoint, "kwargs", {}) or {} |
| 174 | + enable_oauth2_1 = kwargs.get("enable_oauth2_1", False) |
| 175 | + client_info = basic_authn(authorization_token, urldecode_client_id_secret=enable_oauth2_1) |
172 | 176 | _context = self.upstream_get("context") |
173 | 177 | if _context.cdb[client_info["id"]]["client_secret"] == client_info["secret"]: |
174 | 178 | return {"client_id": client_info["id"]} |
|
0 commit comments