|
| 1 | +from datetime import datetime, timedelta |
| 2 | +from jose import jwt, JWTError |
| 3 | +from fastapi import HTTPException, Security, Depends |
| 4 | +from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials |
| 5 | +from db import get_user_by_email |
| 6 | + |
| 7 | +SECRET_KEY = "YOUR_SECRET_KEY" # Use a secure secret key from a secure randomness source |
| 8 | +ALGORITHM = "HS256" |
| 9 | +ACCESS_TOKEN_EXPIRE_MINUTES = 30 |
| 10 | + |
| 11 | +security = HTTPBearer() |
| 12 | + |
| 13 | + |
| 14 | +def create_access_token(data: dict, expires_delta: timedelta = None): |
| 15 | + to_encode = data.copy() |
| 16 | + if expires_delta: |
| 17 | + expire = datetime.utcnow() + expires_delta |
| 18 | + else: |
| 19 | + expire = datetime.utcnow() + timedelta(minutes=15) |
| 20 | + to_encode.update({"exp": expire}) |
| 21 | + encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) |
| 22 | + return encoded_jwt |
| 23 | + |
| 24 | + |
| 25 | +def verify_token(token: str): |
| 26 | + credentials_exception = HTTPException(status_code=403, detail="Could not validate credentials") |
| 27 | + try: |
| 28 | + payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) |
| 29 | + email: str = payload.get("email") |
| 30 | + if email is None: |
| 31 | + raise credentials_exception |
| 32 | + token_data = payload |
| 33 | + except JWTError: |
| 34 | + raise credentials_exception |
| 35 | + user = get_user_by_email(email) |
| 36 | + if user is None: |
| 37 | + raise HTTPException(status_code=404, detail="User not found") |
| 38 | + return user |
| 39 | + |
| 40 | + |
| 41 | +def get_current_user(token: HTTPAuthorizationCredentials = Security(security)): |
| 42 | + return verify_token(token.credentials) |
0 commit comments