|
8 | 8 | from typing import Any, Dict, Optional, Tuple |
9 | 9 |
|
10 | 10 | from authlib.integrations.starlette_client import OAuth |
| 11 | +from authlib.jose import JsonWebKey |
| 12 | +from authlib.jose import jwt as jose_jwt |
| 13 | +from fief_client import FiefAsync |
11 | 14 |
|
12 | 15 | from carbonserver.config import settings |
13 | 16 |
|
14 | 17 | DEFAULT_SIGNATURE_CACHE_TTL = 3600 # seconds |
15 | 18 | OAUTH_SCOPES = ["openid", "email", "profile"] |
16 | 19 |
|
| 20 | +fief = FiefAsync( |
| 21 | + settings.fief_url, settings.fief_client_id, settings.fief_client_secret |
| 22 | +) |
17 | 23 |
|
18 | 24 | oauth = OAuth() |
19 | 25 | oauth.register( |
@@ -44,3 +50,20 @@ async def get_authorize_url(self, request, login_url): |
44 | 50 |
|
45 | 51 | def get_client_credentials(self) -> Tuple[str, str]: |
46 | 52 | return (self.client.client_id, self.client.client_secret) |
| 53 | + |
| 54 | + async def _decode_token(self, token: str) -> Dict[str, Any]: |
| 55 | + try: |
| 56 | + access_token_info = await fief.validate_access_token(token) |
| 57 | + return access_token_info |
| 58 | + except Exception: |
| 59 | + ... |
| 60 | + |
| 61 | + jwks_data = await self.client.fetch_jwk_set() |
| 62 | + keyset = JsonWebKey.import_key_set(jwks_data) |
| 63 | + claims = jose_jwt.decode(token, keyset) |
| 64 | + claims.validate() |
| 65 | + return dict(claims) |
| 66 | + |
| 67 | + async def validate_access_token(self, token: str) -> bool: |
| 68 | + await self._decode_token(token) |
| 69 | + return True |
0 commit comments