Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 18 additions & 4 deletions src/controllers/_helpers.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,30 @@
from typing import Annotated, Any
from fastapi import Depends, HTTPException
from fastapi import Depends, HTTPException, Request
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm

from src.services.userService import UserService
from src.infra.database.database import PgDatabase
from src.infra.security.hashing import ALGORITHM, JWT_ACCESS_SECRET_KEY, decode_token

oauth2_bearer = OAuth2PasswordBearer(tokenUrl="auth/login")
token_dependency = Annotated[str, Depends(oauth2_bearer)]
oauth2_bearer = OAuth2PasswordBearer(tokenUrl="auth/login", auto_error=False)
token_dependency = Annotated[str | None, Depends(oauth2_bearer)]


def get_current_user(token: token_dependency) -> dict[str, Any]:
def get_token_from_cookie(request: Request) -> str:
auth_jwt = request.cookies.get("access_token")

if auth_jwt is None:
raise HTTPException(
status_code=401,
detail="Not authenticated"
)

return auth_jwt

def get_current_user(token: token_dependency, request: Request) -> dict[str, Any]:
if token is None:
token = get_token_from_cookie(request)

payload: dict[str, Any] = decode_token(token, JWT_ACCESS_SECRET_KEY, [ALGORITHM])
user_id = payload.get("sub")
if not user_id:
Expand Down
89 changes: 88 additions & 1 deletion src/controllers/authController.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from datetime import timedelta
from fastapi import APIRouter, HTTPException, Header, Request
from fastapi import APIRouter, HTTPException, Header, Request, Response

from src.infra.database.database import PgDatabase
from src.services.userService import UserService
Expand Down Expand Up @@ -52,6 +52,93 @@ def login(form_data: form_auth_dependency):
}


@router.post("/login/cookie")
def login_cookie(form_data: form_auth_dependency, response: Response):
email = form_data.username.lower()
user = UserService(PgDatabase()).view_by_email(email)

if not user:
raise HTTPException(
status_code=404,
detail={"message": "Email or password incorrect.", "error": True},
)

if not bcrypt_context.verify(form_data.password, user["senha"]):
raise HTTPException(
status_code=401,
detail={"message": "Email or password incorrect.", "error": True},
)

access_token = create_token(
user["id"],
JWT_ACCESS_SECRET_KEY,
timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES),
)
refresh_token = create_token(
user["id"], JWT_REFRESH_SECRET_KEY, timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
)

response.set_cookie(
"access_token",
access_token
)

response.set_cookie(
"refresh_token",
refresh_token
)

response.set_cookie(
"token_type",
"Bearer"
)

return {"message": "Login successfull"}


@router.post("/refresh/cookie")
def refresh_cookie(
response: Response,
refresh_token: str = Header(..., alias="refresh_token")
):
payload = decode_token(refresh_token, JWT_REFRESH_SECRET_KEY, [ALGORITHM])
user_id = int(payload["sub"])

user = UserService(PgDatabase()).view(user_id)
if not user:
raise HTTPException(
status_code=401, detail={"message": "User not found", "error": True}
)

new_access_token = create_token(
payload["sub"],
JWT_ACCESS_SECRET_KEY,
timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES),
)
new_refresh_token = create_token(
payload["sub"],
JWT_REFRESH_SECRET_KEY,
timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS),
)

response.set_cookie(
"access_token",
new_access_token
)

response.set_cookie(
"refresh_token",
new_refresh_token
)

response.set_cookie(
"token_type",
"Bearer"
)

return {"message": "Token refreshed successfully"}


@router.post("/refresh")
def refresh(refresh_token: str = Header(..., alias="X-Refresh-Token")):
payload = decode_token(refresh_token, JWT_REFRESH_SECRET_KEY, [ALGORITHM])
Expand Down