Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 80 additions & 0 deletions backend/recotem/tests/test_assign_owners_command.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
"""Tests for the assign_owners management command."""

from io import StringIO

import pytest
from django.contrib.auth import get_user_model
from django.core.management import call_command
from django.core.management.base import CommandError

from recotem.api.models import EvaluationConfig, Project, SplitConfig

User = get_user_model()


@pytest.fixture
def target_user(db):
return User.objects.create_user(username="assign_target", password="pass")


@pytest.mark.django_db
class TestAssignOwnersCommand:
def test_assigns_owner_to_unowned_projects(self, target_user):
p = Project.objects.create(
name="unowned", user_column="u", item_column="i", owner=None
)
out = StringIO()
call_command("assign_owners", "--user", target_user.username, stdout=out)

p.refresh_from_db()
assert p.owner == target_user
assert "assigned" in out.getvalue()

def test_assigns_created_by_to_configs(self, target_user):
sc = SplitConfig.objects.create(created_by=None)
ec = EvaluationConfig.objects.create(created_by=None)

out = StringIO()
call_command("assign_owners", "--user", target_user.username, stdout=out)

sc.refresh_from_db()
ec.refresh_from_db()
assert sc.created_by == target_user
assert ec.created_by == target_user

def test_dry_run_no_changes(self, target_user):
p = Project.objects.create(
name="dry_run_proj", user_column="u", item_column="i", owner=None
)
sc = SplitConfig.objects.create(created_by=None)

out = StringIO()
call_command(
"assign_owners", "--user", target_user.username, "--dry-run", stdout=out
)

p.refresh_from_db()
sc.refresh_from_db()
assert p.owner is None
assert sc.created_by is None
assert "Dry run complete" in out.getvalue()

def test_invalid_user_raises(self):
with pytest.raises(CommandError, match="does not exist"):
call_command("assign_owners", "--user", "ghost_user")

def test_no_unowned_records(self, target_user):
# Create records that already have owners
Project.objects.create(
name="owned_proj",
user_column="u",
item_column="i",
owner=target_user,
)
SplitConfig.objects.create(created_by=target_user)
EvaluationConfig.objects.create(created_by=target_user)

out = StringIO()
call_command("assign_owners", "--user", target_user.username, stdout=out)

assert "no unowned records" in out.getvalue()
75 changes: 75 additions & 0 deletions backend/recotem/tests/test_authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

import pytest
from django.contrib.auth import get_user_model
from django.contrib.auth.hashers import make_password
from django.test import RequestFactory
from rest_framework.exceptions import AuthenticationFailed

from recotem.api.authentication import (
API_KEY_PREFIX,
Expand Down Expand Up @@ -143,3 +145,76 @@ def test_authenticate_header(self):
factory = RequestFactory()
request = factory.get("/")
assert auth.authenticate_header(request) == "X-API-Key"


@pytest.mark.django_db
class TestRequireManagementScope:
"""Test RequireManagementScope permission class."""

def test_read_scope_for_get(self, api_key_data, user):
"""GET requires 'read' scope."""
from recotem.api.authentication import RequireManagementScope

full_key, key_obj = api_key_data
# key_obj has scopes=["read", "predict"]
factory = RequestFactory()
request = factory.get("/")
request.api_key = key_obj
request.user = user
perm = RequireManagementScope()
assert perm.has_permission(request, None) is True

def test_write_scope_for_post(self, api_key_data, user):
"""POST requires 'write' scope -- key only has read+predict, should fail."""
from recotem.api.authentication import RequireManagementScope

full_key, key_obj = api_key_data
factory = RequestFactory()
request = factory.post("/")
request.api_key = key_obj
request.user = user
perm = RequireManagementScope()
assert perm.has_permission(request, None) is False

def test_jwt_always_allowed(self, user):
"""JWT user (no api_key attr) passes all scope checks."""
from recotem.api.authentication import RequireManagementScope

factory = RequestFactory()
request = factory.post("/")
request.user = user
# No api_key attribute -> JWT
perm = RequireManagementScope()
assert perm.has_permission(request, None) is True


@pytest.mark.django_db
class TestAmbiguousApiKeyPrefix:
def test_ambiguous_prefix(self, user, project):
"""Two keys with same prefix -> 'Ambiguous API key prefix'."""
# Create two keys with the same prefix
prefix = "SAMEPRFX"
ApiKey.objects.create(
project=project,
owner=user,
name="key1",
key_prefix=prefix,
hashed_key=make_password("dummy1"),
scopes=["read"],
)
ApiKey.objects.create(
project=project,
owner=user,
name="key2",
key_prefix=prefix,
hashed_key=make_password("dummy2"),
scopes=["read"],
)

factory = RequestFactory()
request = factory.get(
"/", HTTP_X_API_KEY=f"{API_KEY_PREFIX}{prefix}longenoughkey"
)
auth = ApiKeyAuthentication()
with pytest.raises(AuthenticationFailed, match="Ambiguous"):
auth.authenticate(request)
141 changes: 141 additions & 0 deletions backend/recotem/tests/test_create_api_key_command.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
"""Tests for the create_api_key management command."""

from io import StringIO

import pytest
from django.contrib.auth import get_user_model
from django.core.management import call_command
from django.core.management.base import CommandError
from django.utils import timezone

from recotem.api.models import ApiKey, Project

User = get_user_model()


@pytest.fixture
def admin_user(db):
return User.objects.create_user(username="admin", password="pass")


@pytest.fixture
def project(admin_user):
return Project.objects.create(
name="test_project", user_column="user", item_column="item", owner=admin_user
)


@pytest.mark.django_db
class TestCreateApiKeyCommand:
def test_creates_key_prints_to_stdout(self, project, admin_user):
out = StringIO()
call_command(
"create_api_key",
"--project-id",
str(project.id),
"--name",
"my-key",
"--owner",
admin_user.username,
stdout=out,
)
raw_key = out.getvalue().strip()
assert raw_key.startswith("rctm_")
assert ApiKey.objects.filter(project=project, name="my-key").exists()

def test_invalid_project_raises(self, admin_user):
with pytest.raises(CommandError, match="not found"):
call_command(
"create_api_key",
"--project-id",
"99999",
"--name",
"bad-key",
"--owner",
admin_user.username,
)

def test_invalid_owner_raises(self, project):
with pytest.raises(CommandError, match="not found"):
call_command(
"create_api_key",
"--project-id",
str(project.id),
"--name",
"bad-key",
"--owner",
"nonexistent_user",
)

def test_invalid_scope_raises(self, project, admin_user):
with pytest.raises(CommandError, match="Invalid scope"):
call_command(
"create_api_key",
"--project-id",
str(project.id),
"--name",
"bad-scope-key",
"--scopes",
"predict,badscope",
"--owner",
admin_user.username,
)

def test_owner_project_mismatch_raises(self, project):
other_user = User.objects.create_user(username="other", password="pass")
with pytest.raises(CommandError, match="does not match project owner"):
call_command(
"create_api_key",
"--project-id",
str(project.id),
"--name",
"mismatch-key",
"--owner",
other_user.username,
)

def test_duplicate_name_raises(self, project, admin_user):
call_command(
"create_api_key",
"--project-id",
str(project.id),
"--name",
"dup-key",
"--owner",
admin_user.username,
stdout=StringIO(),
)
with pytest.raises(CommandError, match="already exists"):
call_command(
"create_api_key",
"--project-id",
str(project.id),
"--name",
"dup-key",
"--owner",
admin_user.username,
)

def test_expires_in_days_sets_expiry(self, project, admin_user):
before = timezone.now()
call_command(
"create_api_key",
"--project-id",
str(project.id),
"--name",
"expiring-key",
"--expires-in-days",
"30",
"--owner",
admin_user.username,
stdout=StringIO(),
)
after = timezone.now()

key = ApiKey.objects.get(project=project, name="expiring-key")
assert key.expires_at is not None
from datetime import timedelta

assert (
before + timedelta(days=30) <= key.expires_at <= after + timedelta(days=30)
)
33 changes: 33 additions & 0 deletions backend/recotem/tests/test_create_test_users_command.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
"""Tests for the create_test_users management command."""

import pytest
from django.contrib.auth import get_user_model
from django.core.management import call_command
from django.core.management.base import CommandError

User = get_user_model()


@pytest.mark.django_db
class TestCreateTestUsersCommand:
def test_creates_new_user(self):
call_command("create_test_users", "--user", "newuser:secret123")

user = User.objects.get(username="newuser")
assert user.check_password("secret123")

def test_updates_existing_user(self):
User.objects.create_user(username="existing", password="oldpass")
call_command("create_test_users", "--user", "existing:newpass")

user = User.objects.get(username="existing")
assert user.check_password("newpass")
assert not user.check_password("oldpass")

def test_invalid_format_raises(self):
with pytest.raises(CommandError, match="Expected format"):
call_command("create_test_users", "--user", "nocolonhere")

def test_empty_username_raises(self):
with pytest.raises(CommandError, match="Username and password are required"):
call_command("create_test_users", "--user", ":password")
Loading
Loading