22from datetime import datetime , timedelta , timezone
33from typing import Any , Dict , Optional , Union
44
5- from fastapi import HTTPException
65from jose import ExpiredSignatureError , JWTError , jwt
76from passlib .context import CryptContext
87
@@ -30,77 +29,66 @@ def _jti() -> str:
3029 return str (uuid .uuid4 ())
3130
3231
33- # Token factories
34- def create_access_token (
32+ def _to_ts (dt : datetime ) -> int :
33+ return int (dt .timestamp ())
34+
35+
36+ def create_token (
3537 subject : Union [str , int , Dict [str , Any ]],
38+ token_type : str = "access" ,
3639 extra : Optional [Dict [str , Any ]] = None ,
37- expires_delta : timedelta | None = None ,
38- ) -> str :
39- """
40- Create access token (JWT) with proper iat and exp timestamps.
41- """
42-
40+ expires_delta : Optional [timedelta ] = None ,
41+ ):
4342 if isinstance (subject , dict ):
44- payload : Dict [ str , Any ] = subject .copy ()
43+ payload = subject .copy ()
4544 else :
46- payload : Dict [ str , Any ] = {"sub" : str (subject )} # type: ignore
45+ payload = {"sub" : str (subject )}
4746
48- payload .setdefault ("type" , "access" )
47+ payload .setdefault ("type" , token_type )
4948 payload .setdefault ("jti" , _jti ())
50-
51- now_ts = int (_now ().timestamp ())
52- payload ["iat" ] = now_ts
49+ payload .setdefault ("iat" , _to_ts (_now ()))
5350
5451 if extra :
5552 payload .update (extra )
5653
57- exp_ts = int (
58- (
59- _now ()
60- + (
61- expires_delta
62- or timedelta ( minutes = settings .PASSWORD_RESET_TOKEN_EXPIRE_MINUTES )
54+ if expires_delta :
55+ exp = _now () + expires_delta
56+ else :
57+ if token_type == "access" :
58+ exp = _now () + timedelta (
59+ minutes = settings .JWT_ACCESS_TOKEN_EXPIRE_MINUTES or 15
6360 )
64- ).timestamp ()
65- )
66- payload ["exp" ] = exp_ts
61+ else :
62+ exp = _now () + timedelta (days = settings .JWT_REFRESH_TOKEN_EXPIRES_DAYS or 30 )
6763
64+ payload ["exp" ] = _to_ts (exp )
6865 return jwt .encode (payload , settings .SECRET_KEY , algorithm = settings .JWT_ALGORITHM )
6966
7067
71- def create_refresh_token (
68+ # Token factories
69+ def create_access_token (
7270 subject : Union [str , int , Dict [str , Any ]],
7371 extra : Optional [Dict [str , Any ]] = None ,
74- expires_delta : timedelta | None = None ,
72+ expires_delta : Optional [ timedelta ] | None = None ,
7573) -> str :
7674 """
77- Create refresh token (JWT) with proper iat and exp timestamps.
75+ Create access token (JWT) with proper iat and exp timestamps.
7876 """
77+ return create_token (subject , "access" , extra , expires_delta )
7978
80- if isinstance (subject , dict ):
81- payload : Dict [str , Any ] = subject .copy ()
82- else :
83- payload : Dict [str , Any ] = {"sub" : str (subject )} # type: ignore
84-
85- payload .setdefault ("type" , "refresh" )
86- payload .setdefault ("jti" , _jti ())
87- payload ["iat" ] = int (_now ().timestamp ())
88-
89- if extra :
90- payload .update (extra )
91-
92- exp_ts = int (
93- (
94- _now ()
95- + (expires_delta or timedelta (days = settings .JWT_REFRESH_TOKEN_EXPIRES_DAYS ))
96- ).timestamp ()
97- )
98- payload ["exp" ] = exp_ts
9979
100- return jwt .encode (payload , settings .SECRET_KEY , algorithm = settings .JWT_ALGORITHM )
80+ def create_refresh_token (
81+ subject : Union [str , int , Dict [str , Any ]],
82+ extra : Optional [Dict [str , Any ]] = None ,
83+ expires_delta : Optional [timedelta ] | None = None ,
84+ ) -> str :
85+ """
86+ Create refresh token (JWT) with proper iat and exp timestamps.
87+ """
88+ return create_token (subject , "refresh" , extra , expires_delta )
10189
10290
103- def decode_token (token : str ) -> dict :
91+ def decode_token (token : str ) -> Dict [ str , Any ] :
10492 """
10593 Decode JWT token. By default, disables exp verification for internal inspection.
10694 Use jose.decode(token, ..., options={"verify_exp": True}) when verifying token lifetime.
@@ -113,7 +101,7 @@ def decode_token(token: str) -> dict:
113101 options = {"verify_exp" : True },
114102 )
115103 return payload
116- except ExpiredSignatureError :
117- raise HTTPException ( status_code = 400 , detail = "Token has expired" )
118- except JWTError :
119- raise HTTPException ( status_code = 400 , detail = "Invalid token" )
104+ except ExpiredSignatureError as e :
105+ raise ExpiredSignatureError ( "token_expired" ) from e
106+ except JWTError as e :
107+ raise JWTError ( "invalid_token" ) from e
0 commit comments