|
6 | 6 | import pytest_asyncio |
7 | 7 | from sqlalchemy.ext.asyncio import AsyncSession |
8 | 8 |
|
9 | | -from src.modules.api_keys.crud import crud_key_permissions |
| 9 | +from src.modules.api_keys.crud import crud_api_keys, crud_key_permissions |
10 | 10 | from src.modules.api_keys.enums import KeyPermissionAction, KeyPermissionResource |
11 | 11 | from src.modules.api_keys.schemas import ( |
12 | 12 | APIKeyCreate, |
| 13 | + APIKeyCreateInternal, |
13 | 14 | APIKeyUpdate, |
14 | 15 | KeyPermissionCreate, |
15 | 16 | KeyUsageCreate, |
@@ -385,6 +386,42 @@ async def test_api_key_hash_roundtrip(api_key_service): |
385 | 386 | assert not api_key_service._verify_api_key("fai_wrong_key", hash1) |
386 | 387 |
|
387 | 388 |
|
| 389 | +@pytest.mark.asyncio |
| 390 | +async def test_validate_api_key_with_underscore_in_prefix(api_key_service, db_session: AsyncSession, test_user: dict): |
| 391 | + """Regression: secrets.token_urlsafe alphabet includes `_`; prefix extraction must not split on it. |
| 392 | +
|
| 393 | + When the random 8-char prefix happens to contain `_`, a naive `split("_", 2)` returns the wrong |
| 394 | + substring and the key_prefix lookup misses, breaking validation for the (rare) keys that draw |
| 395 | + underscores. |
| 396 | + """ |
| 397 | + api_key, prefix, key_hash = api_key_service._generate_api_key() |
| 398 | + forced_prefix = "ab_cd_ef" |
| 399 | + api_key = f"fai_{forced_prefix}_{api_key.split('_', 2)[2]}" |
| 400 | + forced_hash = api_key_service._hash_api_key(api_key) |
| 401 | + |
| 402 | + key_dict = { |
| 403 | + "name": "underscore prefix", |
| 404 | + "user_id": test_user["id"], |
| 405 | + "key_hash": forced_hash, |
| 406 | + "key_prefix": forced_prefix, |
| 407 | + "permissions": {}, |
| 408 | + "usage_limits": {}, |
| 409 | + } |
| 410 | + await crud_api_keys.create(db=db_session, object=APIKeyCreateInternal(**key_dict)) |
| 411 | + |
| 412 | + permission_data = KeyPermissionCreate( |
| 413 | + api_key_id=(await crud_api_keys.get(db=db_session, key_prefix=forced_prefix))["id"], |
| 414 | + resource=KeyPermissionResource.WILDCARD, |
| 415 | + action=KeyPermissionAction.WILDCARD, |
| 416 | + is_allowed=True, |
| 417 | + ) |
| 418 | + await crud_key_permissions.create(db=db_session, object=permission_data) |
| 419 | + |
| 420 | + validation = await api_key_service.validate_api_key(api_key=api_key, resource="anything", action="anything", db=db_session) |
| 421 | + |
| 422 | + assert validation.is_valid is True |
| 423 | + |
| 424 | + |
388 | 425 | @pytest.mark.asyncio |
389 | 426 | async def test_usage_pagination(api_key_service, db_session: AsyncSession, test_user: dict, test_api_key): |
390 | 427 | """Test usage history pagination.""" |
|
0 commit comments