|
| 1 | +"""Direct unit tests for codeframe.core.api_key_service.ApiKeyService. |
| 2 | +
|
| 3 | +Covers the create/list/revoke/rotate/get surface against a real (tmp) SQLite |
| 4 | +Database — main paths plus error cases (invalid scopes, wrong owner, not-found, |
| 5 | +and the rotate-keeps-old-key-on-failure safety path). No live API calls. |
| 6 | +
|
| 7 | +Issue #654 (P6.8.1): test coverage hardening for untested core modules. |
| 8 | +""" |
| 9 | + |
| 10 | +from datetime import datetime, timezone, timedelta |
| 11 | +from unittest.mock import patch |
| 12 | + |
| 13 | +import pytest |
| 14 | + |
| 15 | +from codeframe.auth.api_keys import SCOPE_READ, SCOPE_WRITE, SCOPE_ADMIN |
| 16 | +from codeframe.core.api_key_service import ( |
| 17 | + ApiKeyService, |
| 18 | + ApiKeyInfo, |
| 19 | + CreatedApiKey, |
| 20 | +) |
| 21 | +from codeframe.platform_store.database import Database |
| 22 | + |
| 23 | + |
| 24 | +pytestmark = pytest.mark.v2 |
| 25 | + |
| 26 | + |
| 27 | +# Two distinct users so ownership / isolation can be exercised. |
| 28 | +USER_A = 1 |
| 29 | +USER_B = 2 |
| 30 | + |
| 31 | + |
| 32 | +@pytest.fixture |
| 33 | +def db(tmp_path): |
| 34 | + """Initialized Database with two seeded users.""" |
| 35 | + database = Database(tmp_path / "test.db") |
| 36 | + database.initialize() |
| 37 | + # initialize() seeds the bootstrap admin as user 1; OR REPLACE keeps the |
| 38 | + # seed deterministic for these tests. |
| 39 | + database.conn.execute( |
| 40 | + """ |
| 41 | + INSERT OR REPLACE INTO users ( |
| 42 | + id, email, name, hashed_password, |
| 43 | + is_active, is_superuser, is_verified, email_verified |
| 44 | + ) |
| 45 | + VALUES |
| 46 | + (?, 'a@example.com', 'User A', '!DISABLED!', 1, 0, 1, 1), |
| 47 | + (?, 'b@example.com', 'User B', '!DISABLED!', 1, 0, 1, 1) |
| 48 | + """, |
| 49 | + (USER_A, USER_B), |
| 50 | + ) |
| 51 | + database.conn.commit() |
| 52 | + return database |
| 53 | + |
| 54 | + |
| 55 | +@pytest.fixture |
| 56 | +def service(db): |
| 57 | + return ApiKeyService(db) |
| 58 | + |
| 59 | + |
| 60 | +# --- create_api_key --------------------------------------------------------- |
| 61 | + |
| 62 | + |
| 63 | +class TestCreateApiKey: |
| 64 | + def test_create_with_default_scopes(self, service, db): |
| 65 | + result = service.create_api_key(user_id=USER_A, name="Default") |
| 66 | + |
| 67 | + assert isinstance(result, CreatedApiKey) |
| 68 | + assert result.key.startswith("cf_live_") |
| 69 | + assert result.prefix == result.key[:12] |
| 70 | + assert result.id # non-empty UUID |
| 71 | + # created_at is RFC3339-ish (parseable) |
| 72 | + datetime.fromisoformat(result.created_at) |
| 73 | + |
| 74 | + # Persisted with the default scopes and active. |
| 75 | + record = db.api_keys.get_by_id(result.id) |
| 76 | + assert record is not None |
| 77 | + assert record["scopes"] == [SCOPE_READ, SCOPE_WRITE] |
| 78 | + assert record["is_active"] is True |
| 79 | + assert record["user_id"] == USER_A |
| 80 | + |
| 81 | + def test_create_with_custom_scopes(self, service, db): |
| 82 | + result = service.create_api_key( |
| 83 | + user_id=USER_A, name="ReadOnly", scopes=[SCOPE_READ] |
| 84 | + ) |
| 85 | + record = db.api_keys.get_by_id(result.id) |
| 86 | + assert record["scopes"] == [SCOPE_READ] |
| 87 | + |
| 88 | + def test_create_with_admin_scope(self, service, db): |
| 89 | + result = service.create_api_key( |
| 90 | + user_id=USER_A, name="Admin", scopes=[SCOPE_ADMIN] |
| 91 | + ) |
| 92 | + assert db.api_keys.get_by_id(result.id)["scopes"] == [SCOPE_ADMIN] |
| 93 | + |
| 94 | + def test_create_with_expiry(self, service, db): |
| 95 | + expires = datetime.now(timezone.utc) + timedelta(days=30) |
| 96 | + result = service.create_api_key( |
| 97 | + user_id=USER_A, name="Expiring", expires_at=expires |
| 98 | + ) |
| 99 | + record = db.api_keys.get_by_id(result.id) |
| 100 | + assert record["expires_at"] is not None |
| 101 | + |
| 102 | + def test_key_hash_is_persisted_but_not_returned(self, service, db): |
| 103 | + """The full key is returned once; the stored hash never leaves the DB.""" |
| 104 | + result = service.create_api_key(user_id=USER_A, name="Hashed") |
| 105 | + record = db.api_keys.get_by_id(result.id) |
| 106 | + assert record["key_hash"].startswith("$sha256$") |
| 107 | + # The CreatedApiKey dataclass exposes no hash field. |
| 108 | + assert not hasattr(result, "key_hash") |
| 109 | + |
| 110 | + def test_invalid_scopes_raise_value_error(self, service): |
| 111 | + with pytest.raises(ValueError, match="Invalid scopes"): |
| 112 | + service.create_api_key(user_id=USER_A, name="Bad", scopes=["bogus"]) |
| 113 | + |
| 114 | + def test_empty_scopes_raise_value_error(self, service): |
| 115 | + # validate_scopes() rejects an empty list. |
| 116 | + with pytest.raises(ValueError, match="Invalid scopes"): |
| 117 | + service.create_api_key(user_id=USER_A, name="Empty", scopes=[]) |
| 118 | + |
| 119 | + |
| 120 | +# --- list_api_keys ---------------------------------------------------------- |
| 121 | + |
| 122 | + |
| 123 | +class TestListApiKeys: |
| 124 | + def test_empty_when_no_keys(self, service): |
| 125 | + assert service.list_api_keys(user_id=USER_A) == [] |
| 126 | + |
| 127 | + def test_lists_multiple_keys(self, service): |
| 128 | + service.create_api_key(user_id=USER_A, name="One") |
| 129 | + service.create_api_key(user_id=USER_A, name="Two") |
| 130 | + |
| 131 | + keys = service.list_api_keys(user_id=USER_A) |
| 132 | + assert len(keys) == 2 |
| 133 | + assert all(isinstance(k, ApiKeyInfo) for k in keys) |
| 134 | + assert {k.name for k in keys} == {"One", "Two"} |
| 135 | + |
| 136 | + def test_isolated_per_user(self, service): |
| 137 | + service.create_api_key(user_id=USER_A, name="A-key") |
| 138 | + service.create_api_key(user_id=USER_B, name="B-key") |
| 139 | + |
| 140 | + a_keys = service.list_api_keys(user_id=USER_A) |
| 141 | + assert [k.name for k in a_keys] == ["A-key"] |
| 142 | + |
| 143 | + def test_includes_revoked_keys(self, service): |
| 144 | + created = service.create_api_key(user_id=USER_A, name="Soon-revoked") |
| 145 | + service.revoke_api_key(created.id, user_id=USER_A) |
| 146 | + |
| 147 | + keys = service.list_api_keys(user_id=USER_A) |
| 148 | + assert len(keys) == 1 |
| 149 | + assert keys[0].is_active is False |
| 150 | + |
| 151 | + def test_listing_does_not_leak_hash(self, service): |
| 152 | + service.create_api_key(user_id=USER_A, name="NoLeak") |
| 153 | + info = service.list_api_keys(user_id=USER_A)[0] |
| 154 | + # ApiKeyInfo has no hash attribute and the dict source excludes it. |
| 155 | + assert not hasattr(info, "key_hash") |
| 156 | + |
| 157 | + |
| 158 | +# --- revoke_api_key --------------------------------------------------------- |
| 159 | + |
| 160 | + |
| 161 | +class TestRevokeApiKey: |
| 162 | + def test_revoke_active_key(self, service, db): |
| 163 | + created = service.create_api_key(user_id=USER_A, name="Revoke me") |
| 164 | + |
| 165 | + assert service.revoke_api_key(created.id, user_id=USER_A) is True |
| 166 | + assert db.api_keys.get_by_id(created.id)["is_active"] is False |
| 167 | + |
| 168 | + def test_revoke_unknown_key_returns_false(self, service): |
| 169 | + assert service.revoke_api_key("does-not-exist", user_id=USER_A) is False |
| 170 | + |
| 171 | + def test_revoke_other_users_key_returns_false(self, service, db): |
| 172 | + created = service.create_api_key(user_id=USER_A, name="A-owned") |
| 173 | + |
| 174 | + # User B may not revoke User A's key. |
| 175 | + assert service.revoke_api_key(created.id, user_id=USER_B) is False |
| 176 | + assert db.api_keys.get_by_id(created.id)["is_active"] is True |
| 177 | + |
| 178 | + |
| 179 | +# --- rotate_api_key --------------------------------------------------------- |
| 180 | + |
| 181 | + |
| 182 | +class TestRotateApiKey: |
| 183 | + def test_rotate_creates_new_and_revokes_old(self, service, db): |
| 184 | + original = service.create_api_key( |
| 185 | + user_id=USER_A, name="Rotating", scopes=[SCOPE_READ] |
| 186 | + ) |
| 187 | + |
| 188 | + rotated = service.rotate_api_key(original.id, user_id=USER_A) |
| 189 | + |
| 190 | + assert isinstance(rotated, CreatedApiKey) |
| 191 | + assert rotated.id != original.id |
| 192 | + assert rotated.key != original.key |
| 193 | + # Old key is now inactive; new key is active with carried-over name/scopes. |
| 194 | + assert db.api_keys.get_by_id(original.id)["is_active"] is False |
| 195 | + new_record = db.api_keys.get_by_id(rotated.id) |
| 196 | + assert new_record["is_active"] is True |
| 197 | + assert new_record["name"] == "Rotating" |
| 198 | + assert new_record["scopes"] == [SCOPE_READ] |
| 199 | + |
| 200 | + def test_rotate_unknown_key_returns_none(self, service): |
| 201 | + assert service.rotate_api_key("nope", user_id=USER_A) is None |
| 202 | + |
| 203 | + def test_rotate_other_users_key_returns_none(self, service, db): |
| 204 | + created = service.create_api_key(user_id=USER_A, name="A-owned") |
| 205 | + |
| 206 | + assert service.rotate_api_key(created.id, user_id=USER_B) is None |
| 207 | + # Untouched. |
| 208 | + assert db.api_keys.get_by_id(created.id)["is_active"] is True |
| 209 | + |
| 210 | + def test_rotate_keeps_old_key_when_creation_fails(self, service, db): |
| 211 | + """If new-key creation raises, the old key must remain active.""" |
| 212 | + created = service.create_api_key(user_id=USER_A, name="Safety") |
| 213 | + |
| 214 | + with patch.object( |
| 215 | + service, "create_api_key", side_effect=ValueError("boom") |
| 216 | + ): |
| 217 | + with pytest.raises(ValueError, match="boom"): |
| 218 | + service.rotate_api_key(created.id, user_id=USER_A) |
| 219 | + |
| 220 | + assert db.api_keys.get_by_id(created.id)["is_active"] is True |
| 221 | + |
| 222 | + |
| 223 | +# --- get_api_key ------------------------------------------------------------ |
| 224 | + |
| 225 | + |
| 226 | +class TestGetApiKey: |
| 227 | + def test_get_existing_key(self, service): |
| 228 | + created = service.create_api_key(user_id=USER_A, name="Fetch me") |
| 229 | + |
| 230 | + info = service.get_api_key(created.id) |
| 231 | + assert isinstance(info, ApiKeyInfo) |
| 232 | + assert info.id == created.id |
| 233 | + assert info.name == "Fetch me" |
| 234 | + assert info.prefix == created.prefix |
| 235 | + assert info.is_active is True |
| 236 | + |
| 237 | + def test_get_unknown_key_returns_none(self, service): |
| 238 | + assert service.get_api_key("missing") is None |
| 239 | + |
| 240 | + def test_get_api_key_is_owner_agnostic(self, service): |
| 241 | + """Unlike revoke/rotate, get_api_key takes no user_id and does NOT |
| 242 | + enforce ownership — it returns public (non-secret) info for any key id. |
| 243 | + This pins that intentional contract so a future ownership change is a |
| 244 | + deliberate, test-visible decision.""" |
| 245 | + created = service.create_api_key(user_id=USER_A, name="A-owned") |
| 246 | + # Fetched without any user context; still resolves. |
| 247 | + info = service.get_api_key(created.id) |
| 248 | + assert info is not None |
| 249 | + assert info.id == created.id |
| 250 | + # And the returned info carries no secret material. |
| 251 | + assert not hasattr(info, "key_hash") |
0 commit comments