Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion main.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import os
import uvicorn
from fastapi import FastAPI
from src.controllers import userTypeController, userController
from src.controllers import userTypeController, userController, authController

app = FastAPI(debug=True)

app.include_router(userTypeController.router)
app.include_router(userController.router)
app.include_router(authController.router)

if __name__ == "__main__":
uvicorn.run(
Expand Down
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@ requires-python = ">=3.13"
dependencies = [
"fastapi>=0.115.14",
"httpx>=0.28.1",
"passlib[bcrypt]>=1.7.4",
"psycopg2-binary>=2.9.10",
"pydantic[email]>=2.11.7",
"pyjwt==2.10.0",
"pytest>=8.4.1",
"python-multipart>=0.0.20",
"uvicorn>=0.35.0",
]
25 changes: 25 additions & 0 deletions src/controllers/_helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import jwt
from typing import Annotated, Any
from fastapi import Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm

from src.services.userService import UserService
from src.infra.security.hashing import ALGORITHM, JWT_ACCESS_SECRET_KEY, decode_token

oauth2_bearer = OAuth2PasswordBearer(tokenUrl="auth/login")
token_dependency = Annotated[str, Depends(oauth2_bearer)]

def get_current_user(token: token_dependency) -> dict[str, Any]:
payload: dict[str, Any] = decode_token(token, JWT_ACCESS_SECRET_KEY, [ALGORITHM])
user_id = payload.get("sub")
if not user_id:
raise HTTPException(status_code=401, detail="Usuário não autorizado")

user = UserService().view(user_id)
if not user:
raise HTTPException(status_code=404, detail="Usuário não encontrado")

return user

user_dependency = Annotated[dict[str, Any], Depends(get_current_user)]
form_auth_dependency = Annotated[OAuth2PasswordRequestForm, Depends()]
46 changes: 46 additions & 0 deletions src/controllers/authController.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
from datetime import timedelta
from fastapi import APIRouter, HTTPException, Header

from src.services.userService import UserService
from ._helpers import user_dependency, form_auth_dependency
from src.infra.security.hashing import ALGORITHM, create_token, ACCESS_TOKEN_EXPIRE_MINUTES, REFRESH_TOKEN_EXPIRE_DAYS, JWT_ACCESS_SECRET_KEY, JWT_REFRESH_SECRET_KEY, bcrypt_context, decode_token


router = APIRouter(prefix="/auth", tags=["auth"])

@router.post("/login")
def login(form_data: form_auth_dependency):
email = form_data.username.lower()
try:
user = UserService().view_by_email(email)
except Exception:
raise HTTPException(status_code=404, detail={"message": "Email or password incorrect.", "error": True})

if not bcrypt_context.verify(form_data.password, user["senha"]):
raise HTTPException(status_code=401, detail={"message": "Email or password incorrect.", "error": True})

access_token = create_token(user["id"], JWT_ACCESS_SECRET_KEY, timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES))
refresh_token = create_token(user["id"], JWT_REFRESH_SECRET_KEY, timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS))

return {
"access_token": access_token,
"refresh_token": refresh_token,
"token_type": "Bearer"
}

@router.post("/refresh")
def refresh(refresh_token: str = Header(..., alias="X-Refresh-Token")):
payload = decode_token(refresh_token, JWT_REFRESH_SECRET_KEY, [ALGORITHM])
user_id = int(payload["sub"])

try:
UserService().view(user_id)
except Exception:
raise HTTPException(status_code=401, detail={"message": "User not found", "error": True})

new_access_token = create_token(payload["sub"], JWT_ACCESS_SECRET_KEY, timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES))
return {"access_token": new_access_token, "error": False}

@router.get("/me")
def me(user: user_dependency):
return user
Empty file added src/infra/security/__init__.py
Empty file.
39 changes: 39 additions & 0 deletions src/infra/security/hashing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import os
import jwt
from typing import Any
from fastapi import HTTPException
from passlib.context import CryptContext
from datetime import datetime, timedelta, timezone


JWT_ACCESS_SECRET_KEY = os.getenv("JWT_ACCESS_SECRET_KEY", "5978c7af950f2a6097b4f07701b388a57abea6c4bbc36f09a7677306653e7796")
JWT_REFRESH_SECRET_KEY = os.getenv("JWT_REFRESH_SECRET_KEY", "8f1b0cfb3694d16b60beccac61b2a40bec9e5fedbf59d1fed1c0cbb050f37236")
ALGORITHM = "HS512"

ACCESS_TOKEN_EXPIRE_MINUTES = 30
REFRESH_TOKEN_EXPIRE_DAYS = 7

bcrypt_context = CryptContext(
schemes=["sha512_crypt"],
deprecated="auto",
sha512_crypt__default_rounds=5000
)

def create_token(user_id: int, secret_key: str, expires_delta: timedelta) -> str:
to_encode: dict[str, Any] = {"sub": str(user_id)}
expire = datetime.now(timezone.utc) + expires_delta

to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, secret_key, algorithm=ALGORITHM)
return encoded_jwt

def decode_token(jwt_token: str, secret_key: str | bytes, algorithms: list[str]) -> dict[str, Any]:
try:
payload = jwt.decode(jwt_token, secret_key, algorithms)
return payload
except jwt.ExpiredSignatureError:
raise HTTPException(status_code=401, detail="Token expirado")
except jwt.InvalidTokenError:
raise HTTPException(status_code=401, detail="Token inválido")
except Exception:
raise HTTPException(status_code=401, detail="Erro ao autenticar usuário")
27 changes: 25 additions & 2 deletions src/services/userService.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from src.infra.database.database import PgDatabase
from src.services import paginate, fields_to_update
from src.infra.security.hashing import bcrypt_context
from src.infra.database import retrieve_table_columns
from src.infra.database.serializers import line_to_dict
from src.schemas.userSchema import UserAddSchema, UserEditSchema
Expand All @@ -13,7 +14,10 @@ class UserService:
def __init__(self) -> None:
self.table = "usuario"
self.columns = retrieve_table_columns(self.table)
self.all_columns = reduce(lambda acc, elem: acc + ", " + str(elem), self.columns)
try:
self.all_columns = reduce(lambda acc, elem: acc + ", " + str(elem), self.columns)
except Exception:
self.all_columns = ""

def all(self, query_params: QueryParams) -> dict[str, Any]:
show_fk_id = bool(int(query_params.get("show_fk_id", 1)))
Expand Down Expand Up @@ -57,10 +61,29 @@ def view(self, user_id: int) -> dict[str, Any]:
user = line_to_dict(row, self.columns)
return user


def view_by_email(self, email: str) -> dict[str, Any]:
user = None

try:
with PgDatabase() as db:
db.cursor.execute(f"SELECT {self.all_columns} FROM {self.table} WHERE email = %s", (email,))
row = db.cursor.fetchone()

if row is None:
raise HTTPException(status_code=404, detail={"error": True, "message": "Usuário não encontrado"})
except Exception:
raise HTTPException(status_code=500, detail={"error": True, "message": "Database error"})

user = line_to_dict(row, self.columns)
return user

def add(self, user: UserAddSchema) -> dict[str, Any]:
senha = bcrypt_context.hash(user.senha)

try:
with PgDatabase() as db:
db.cursor.execute(f"INSERT INTO {self.table} (email, senha, tipo_usuario_id) VALUES (%s, %s, %s) RETURNING id", (user.email, user.senha, user.tipo_usuario_id))
db.cursor.execute(f"INSERT INTO {self.table} (email, senha, tipo_usuario_id) VALUES (%s, %s, %s) RETURNING id", (user.email.lower(), senha, user.tipo_usuario_id))
raw_id = db.cursor.fetchone()

if raw_id is None:
Expand Down
5 changes: 4 additions & 1 deletion src/services/userTypeService.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ class UserTypeService:
def __init__(self) -> None:
self.table: str = "tipo_usuario"
self.columns: list[str] = retrieve_table_columns(self.table)
self.all_columns = reduce(lambda acc, elem: acc + ", " + str(elem), self.columns)
try:
self.all_columns = reduce(lambda acc, elem: acc + ", " + str(elem), self.columns)
except Exception:
self.all_columns = ""

def all(self, query_params: QueryParams) -> dict[str, Any]:
query = f"SELECT {self.all_columns} FROM {self.table}"
Expand Down