|
6 | 6 | import secrets |
7 | 7 | from fastapi import FastAPI, Depends, HTTPException, status, Security |
8 | 8 | from fastapi.responses import FileResponse |
| 9 | +from fastapi.security import OAuth2PasswordRequestForm |
9 | 10 | from pydantic import BaseModel |
10 | 11 | from sqlalchemy.future import select |
11 | 12 | from sqlalchemy import update |
|
37 | 38 | create_access_token, |
38 | 39 | get_api_key, |
39 | 40 | get_api_key_hash, |
| 41 | + check_creds, |
40 | 42 | ) |
41 | 43 |
|
42 | 44 | # Set the logging |
@@ -95,22 +97,27 @@ def read_root() -> Any: |
95 | 97 | @app.post("/token-from-api-key") |
96 | 98 | async def access_token_from_api_key( |
97 | 99 | sql_session: Annotated[Session, Depends(get_session)], |
| 100 | + form_data: Annotated[OAuth2PasswordRequestForm, Depends()], |
98 | 101 | api_key_enduser_tuple: str = Security(get_api_key), |
99 | 102 | ) -> Token: |
100 | 103 | """Generate a token from an API key.""" |
101 | 104 | local_session.set(sql_session) |
| 105 | + |
102 | 106 | user = authenticate_api_key(api_key_enduser_tuple, local_session.get()) |
103 | | - if not user: |
| 107 | + valid = check_creds(form_data.username, form_data.password) |
| 108 | + |
| 109 | + if not user and not valid: |
104 | 110 | raise HTTPException( |
105 | 111 | status_code=status.HTTP_401_UNAUTHORIZED, |
106 | | - detail="API key not valid", |
| 112 | + detail="Invalid API key and credentials", |
107 | 113 | headers={"WWW-Authenticate": "X-API-KEY"}, |
108 | 114 | ) |
| 115 | + email = user.email if user else form_data.username |
109 | 116 | access_token_expires = timedelta( |
110 | 117 | minutes=int(env_vars["ACCESS_TOKEN_EXPIRE_MINUTES"]) |
111 | 118 | ) |
112 | 119 | access_token = create_access_token( |
113 | | - data={"sub": user.email}, expires_delta=access_token_expires |
| 120 | + data={"sub": email}, expires_delta=access_token_expires |
114 | 121 | ) |
115 | 122 | return Token(access_token=access_token, token_type="bearer") |
116 | 123 |
|
|
0 commit comments