diff --git a/obs/api/auth.py b/obs/api/auth.py index 01d4c017..ddea4063 100644 --- a/obs/api/auth.py +++ b/obs/api/auth.py @@ -135,13 +135,19 @@ async def get_current_user( if api_key: key_hash = hash_api_key(api_key) - row = await db.fetchone("SELECT name FROM api_keys WHERE key_hash=?", (key_hash,)) + row = await db.fetchone("SELECT owner FROM api_keys WHERE key_hash=?", (key_hash,)) if not row: raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Invalid API key") + owner = (row["owner"] or "").strip() + if not owner: + raise HTTPException( + status.HTTP_401_UNAUTHORIZED, + "Legacy API key without owner is not supported; recreate the key", + ) # Update last_used_at now = datetime.now(UTC).isoformat() await db.execute_and_commit("UPDATE api_keys SET last_used_at=? WHERE key_hash=?", (now, key_hash)) - return row["name"] + return owner raise HTTPException( status.HTTP_401_UNAUTHORIZED, diff --git a/obs/api/v1/adapters.py b/obs/api/v1/adapters.py index 4797f135..630bbf8d 100644 --- a/obs/api/v1/adapters.py +++ b/obs/api/v1/adapters.py @@ -31,7 +31,7 @@ from obs.adapters import registry as adapter_registry from obs.adapters.knx.dpt_registry import DPTRegistry -from obs.api.auth import get_current_user +from obs.api.auth import get_admin_user, get_current_user from obs.db.database import Database, get_db router = APIRouter(tags=["adapters"]) @@ -853,7 +853,7 @@ async def test_adapter( async def update_adapter_config( adapter_type: str, body: ConfigPatch, - _user: str = Depends(get_current_user), + _user: str = Depends(get_admin_user), db: Database = Depends(lambda: get_db()), ) -> AdapterConfigOut: cls = adapter_registry.get_class(adapter_type) @@ -886,7 +886,7 @@ async def update_adapter_config( @router.get("/{adapter_type}/config", response_model=AdapterConfigOut) async def get_adapter_config( adapter_type: str, - _user: str = Depends(get_current_user), + _user: str = Depends(get_admin_user), db: Database = Depends(lambda: get_db()), ) -> AdapterConfigOut: row = await db.fetchone("SELECT * FROM adapter_configs WHERE adapter_type=?", (adapter_type,)) diff --git a/obs/api/v1/bindings.py b/obs/api/v1/bindings.py index 634b0b32..34183ece 100644 --- a/obs/api/v1/bindings.py +++ b/obs/api/v1/bindings.py @@ -19,7 +19,7 @@ from fastapi import APIRouter, Depends, HTTPException, status from pydantic import BaseModel -from obs.api.auth import get_current_user +from obs.api.auth import get_admin_user, get_current_user from obs.core.registry import get_registry from obs.db.database import Database, get_db from obs.models.binding import ( @@ -138,7 +138,7 @@ async def list_bindings( async def create_binding( dp_id: uuid.UUID, body: AdapterBindingCreate, - _user: str = Depends(get_current_user), + _user: str = Depends(get_admin_user), db: Database = Depends(lambda: get_db()), ) -> BindingOut: if get_registry().get(dp_id) is None: @@ -213,7 +213,7 @@ async def update_binding( dp_id: uuid.UUID, binding_id: uuid.UUID, body: AdapterBindingUpdate, - _user: str = Depends(get_current_user), + _user: str = Depends(get_admin_user), db: Database = Depends(lambda: get_db()), ) -> BindingOut: row = await db.fetchone( @@ -279,7 +279,7 @@ async def update_binding( async def delete_binding( dp_id: uuid.UUID, binding_id: uuid.UUID, - _user: str = Depends(get_current_user), + _user: str = Depends(get_admin_user), db: Database = Depends(lambda: get_db()), ) -> None: row = await db.fetchone( diff --git a/tests/integration/test_auth.py b/tests/integration/test_auth.py index abb64516..4fa9477e 100644 --- a/tests/integration/test_auth.py +++ b/tests/integration/test_auth.py @@ -9,8 +9,11 @@ from __future__ import annotations +import uuid + import pytest +from obs.api.auth import create_access_token from tests.integration.conftest import assert_auth_token_shape pytestmark = pytest.mark.integration @@ -126,3 +129,46 @@ async def test_me_returns_admin_info(client, auth_headers): body = resp.json() assert body["username"] == "admin" assert body["is_admin"] is True + + +async def test_admin_key_name_does_not_grant_admin_privileges(client, auth_headers): + suffix = uuid.uuid4().hex[:8] + username = f"user_{suffix}" + + create_user = await client.post( + "/api/v1/auth/users", + headers=auth_headers, + json={"username": username, "password": "pw", "is_admin": False}, + ) + assert create_user.status_code == 201 + + user_token = create_access_token(username) + key_resp = await client.post( + "/api/v1/auth/apikeys", + headers={"Authorization": f"Bearer {user_token}"}, + json={"name": "admin"}, + ) + assert key_resp.status_code == 201 + key = key_resp.json()["key"] + + resp = await client.get( + "/api/v1/adapters/mqtt/config", + headers={"X-API-Key": key}, + ) + assert resp.status_code == 403 + + +async def test_admin_api_key_with_custom_name_keeps_admin_access(client, auth_headers): + key_resp = await client.post( + "/api/v1/auth/apikeys", + headers=auth_headers, + json={"name": f"automation-{uuid.uuid4().hex[:8]}"}, + ) + assert key_resp.status_code == 201 + key = key_resp.json()["key"] + + resp = await client.get( + "/api/v1/adapters/mqtt/config", + headers={"X-API-Key": key}, + ) + assert resp.status_code == 200