Skip to content

Commit c671a7b

Browse files
committed
feat(AGX1-274): record task creator identity (user/service-account) on creation
Adds two nullable creator-audit columns to the `tasks` table — `creator_user_id` and `creator_service_account_id` — populated from the request principal in `AgentTaskService.create_task`. A CHECK constraint `ck_tasks_at_most_one_creator` enforces that at most one of the two is set; partial indexes back future "tasks created by X" lookups. Online migration: the CHECK is added `NOT VALID` then `VALIDATE`d separately so the brief ACCESS EXCLUSIVE lock doesn't have to wait on an existence scan. `tasks` is a high-write table; a vanilla CHECK addition would queue behind in-flight transactions and block readers until released. Indexes use `CREATE INDEX CONCURRENTLY` inside `autocommit_block`. Best-effort attribution: tasks created outside an HTTP request context (Temporal activities, background workers, any path that constructs `AgentTaskService` without `request.state.principal_context`) leave both columns NULL. The CHECK constraint allows both-NULL, and an integration test exercises the no-resolvable-creator path. These columns are how the AGX1-291 operator runbook identifies orphan rows for backfill when the dual-write call sites added in the next commit fail under load. Part of the AGX1-264 stack: scaleapi/scaleapi NEW2 (per-account FF endpoint) → scaleapi/agentex#353 (agentex-auth routing + cancel) → this PR → #249 (per-RPC route migration). Two commits land together in #246; this one is the schema/audit change and is independent of the dual-write call sites.
1 parent 5d055f4 commit c671a7b

13 files changed

Lines changed: 308 additions & 4 deletions

File tree

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
"""add_task_creator_columns
2+
3+
Revision ID: a1f73ada66c5
4+
Revises: 6c942325c828
5+
Create Date: 2026-05-21 15:08:51.441535
6+
7+
"""
8+
9+
from collections.abc import Sequence
10+
11+
import sqlalchemy as sa
12+
from alembic import op
13+
14+
# revision identifiers, used by Alembic.
15+
revision: str = "a1f73ada66c5"
16+
down_revision: str | None = "6c942325c828"
17+
branch_labels: str | Sequence[str] | None = None
18+
depends_on: str | Sequence[str] | None = None
19+
20+
21+
def upgrade() -> None:
22+
op.add_column("tasks", sa.Column("creator_user_id", sa.String(), nullable=True))
23+
op.add_column(
24+
"tasks", sa.Column("creator_service_account_id", sa.String(), nullable=True)
25+
)
26+
with op.get_context().autocommit_block():
27+
op.execute(
28+
"CREATE INDEX CONCURRENTLY IF NOT EXISTS ix_tasks_creator_user_id "
29+
"ON tasks (creator_user_id)"
30+
)
31+
op.execute(
32+
"CREATE INDEX CONCURRENTLY IF NOT EXISTS ix_tasks_creator_service_account_id "
33+
"ON tasks (creator_service_account_id)"
34+
)
35+
# Add the CHECK as NOT VALID so the brief ACCESS EXCLUSIVE lock doesn't have to
36+
# wait on an existence scan, then VALIDATE under SHARE UPDATE EXCLUSIVE which
37+
# doesn't block concurrent reads/writes. `tasks` is a high-write table; even the
38+
# short ACCESS EXCLUSIVE held during a vanilla CHECK addition queues behind
39+
# in-flight transactions and blocks readers until it releases.
40+
op.execute(
41+
"ALTER TABLE tasks ADD CONSTRAINT ck_tasks_at_most_one_creator "
42+
"CHECK ((creator_user_id IS NULL) OR (creator_service_account_id IS NULL)) "
43+
"NOT VALID"
44+
)
45+
op.execute("ALTER TABLE tasks VALIDATE CONSTRAINT ck_tasks_at_most_one_creator")
46+
47+
48+
def downgrade() -> None:
49+
op.drop_constraint("ck_tasks_at_most_one_creator", "tasks", type_="check")
50+
with op.get_context().autocommit_block():
51+
op.execute(
52+
"DROP INDEX CONCURRENTLY IF EXISTS ix_tasks_creator_service_account_id"
53+
)
54+
op.execute("DROP INDEX CONCURRENTLY IF EXISTS ix_tasks_creator_user_id")
55+
op.drop_column("tasks", "creator_service_account_id")
56+
op.drop_column("tasks", "creator_user_id")

agentex/database/migrations/migration_history.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
a9959ebcbe98 -> 6c942325c828 (head), adding task cleaned at
1+
6c942325c828 -> a1f73ada66c5 (head), add_task_creator_columns
2+
a9959ebcbe98 -> 6c942325c828, adding task cleaned at
23
e9c4ff9e6542 -> a9959ebcbe98, finalize_spans_task_id
34
9ff3ee32c81b -> e9c4ff9e6542, add_tasks_metadata_gin_index
45
57c5ed4f59ae -> 9ff3ee32c81b, uppercase deployment status enum labels

agentex/src/adapters/orm.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
JSON,
33
BigInteger,
44
Boolean,
5+
CheckConstraint,
56
Column,
67
DateTime,
78
ForeignKey,
@@ -75,13 +76,19 @@ class TaskORM(BaseORM):
7576
cleaned_at = Column(DateTime(timezone=True), nullable=True)
7677
params = Column(JSONB, nullable=True)
7778
task_metadata = Column(JSONB, nullable=True)
79+
creator_user_id = Column(String, nullable=True, index=True)
80+
creator_service_account_id = Column(String, nullable=True, index=True)
7881
# Many-to-Many relationship with agents
7982
agents = relationship("AgentORM", secondary="task_agents", back_populates="tasks")
8083

8184
# Indexes for efficient querying
8285
__table_args__ = (
8386
# Index for filtering tasks by status (used in list queries)
8487
Index("ix_tasks_status", "status"),
88+
CheckConstraint(
89+
"creator_user_id IS NULL OR creator_service_account_id IS NULL",
90+
name="ck_tasks_at_most_one_creator",
91+
),
8592
)
8693

8794

agentex/src/domain/entities/tasks.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,14 @@ class TaskEntity(BaseModel):
6262
None,
6363
title="Task metadata",
6464
)
65+
creator_user_id: str | None = Field(
66+
None,
67+
title="Identity ID of the user who created this task",
68+
)
69+
creator_service_account_id: str | None = Field(
70+
None,
71+
title="Service identity ID of the service account that created this task",
72+
)
6573

6674
# allow extra fields for agents relationships
6775
model_config = ConfigDict(extra="allow")

agentex/src/domain/services/task_service.py

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,25 @@
1414
from src.domain.repositories.task_repository import DTaskRepository
1515
from src.domain.repositories.task_state_repository import DTaskStateRepository
1616
from src.domain.services.agent_acp_service import DAgentACPService
17+
from src.domain.services.authorization_service import DAuthorizationService
1718
from src.utils.ids import orm_id
1819
from src.utils.logging import make_logger
1920
from src.utils.stream_topics import get_task_event_stream_topic
2021

2122
logger = make_logger(__name__)
2223

2324

25+
def _principal_field(principal_context: Any, key: str) -> str | None:
26+
"""Read an attribute from the principal context, which may be either a
27+
Pydantic-style object or a plain dict. The authn proxy returns a dict
28+
(``response.json()`` shape), so ``getattr`` alone silently yields None."""
29+
if principal_context is None:
30+
return None
31+
if isinstance(principal_context, dict):
32+
return principal_context.get(key)
33+
return getattr(principal_context, key, None)
34+
35+
2436
class AgentTaskService:
2537
"""
2638
Service for managing agent tasks and forwarding operations to ACP servers.
@@ -33,12 +45,14 @@ def __init__(
3345
task_repository: DTaskRepository,
3446
event_repository: DEventRepository,
3547
stream_repository: DRedisStreamRepository,
48+
authorization_service: DAuthorizationService,
3649
):
3750
self.acp_client = acp_client
3851
self.task_state_repository = task_state_repository
3952
self.task_repository = task_repository
4053
self.event_repository = event_repository
4154
self.stream_repository = stream_repository
55+
self.authorization_service = authorization_service
4256

4357
async def create_task(
4458
self,
@@ -59,6 +73,11 @@ async def create_task(
5973
Returns:
6074
Task containing the created task info
6175
"""
76+
principal_context = self.authorization_service.principal_context
77+
creator_user_id = _principal_field(principal_context, "user_id")
78+
creator_service_account_id = _principal_field(
79+
principal_context, "service_account_id"
80+
)
6281

6382
task_entity = await self.task_repository.create(
6483
agent_id=agent.id,
@@ -69,6 +88,8 @@ async def create_task(
6988
status_reason="Task created, forwarding to ACP server",
7089
params=task_params,
7190
task_metadata=task_metadata,
91+
creator_user_id=creator_user_id,
92+
creator_service_account_id=creator_service_account_id,
7293
),
7394
)
7495
return task_entity
@@ -91,7 +112,9 @@ async def create_task_and_forward_to_acp(
91112
Task containing the created task info
92113
"""
93114
task_entity = await self.create_task(
94-
agent=agent, task_name=task_name, task_params=task_params
115+
agent=agent,
116+
task_name=task_name,
117+
task_params=task_params,
95118
)
96119

97120
if agent.acp_type == ACPType.SYNC:

agentex/tests/fixtures/services.py

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
Provides factory functions and specific fixtures for creating services with test repositories.
44
"""
55

6-
from unittest.mock import MagicMock, Mock
6+
from unittest.mock import AsyncMock, MagicMock, Mock
77

88
import pytest
99

@@ -12,6 +12,21 @@
1212
# =============================================================================
1313

1414

15+
def make_noop_authorization_service() -> Mock:
16+
"""Shared noop AuthorizationService mock for tests that don't exercise authz.
17+
18+
``principal_context`` is ``None`` (so creator-audit columns resolve to NULL),
19+
and ``grant``/``revoke`` are async no-ops returning ``None`` — matching the
20+
real service signature. Use this anywhere a test just needs to construct
21+
``AgentTaskService`` without caring about authorization behavior.
22+
"""
23+
svc = Mock()
24+
svc.principal_context = None
25+
svc.grant = AsyncMock(return_value=None)
26+
svc.revoke = AsyncMock(return_value=None)
27+
return svc
28+
29+
1530
def create_task_message_service(task_message_repository):
1631
"""Factory function to create TaskMessageService with given repository"""
1732
from src.domain.services.task_message_service import TaskMessageService
@@ -52,16 +67,21 @@ def create_task_service(
5267
event_repository,
5368
agent_acp_service,
5469
redis_stream_repository,
70+
authorization_service=None,
5571
):
56-
"""Factory function to create AgentTaskService with given repositories and services"""
72+
"""Factory function to create AgentTaskService with given repositories and services."""
5773
from src.domain.services.task_service import AgentTaskService
5874

75+
if authorization_service is None:
76+
authorization_service = make_noop_authorization_service()
77+
5978
return AgentTaskService(
6079
task_repository=task_repository,
6180
task_state_repository=task_state_repository,
6281
event_repository=event_repository,
6382
acp_client=agent_acp_service,
6483
stream_repository=redis_stream_repository,
84+
authorization_service=authorization_service,
6585
)
6686

6787

agentex/tests/integration/fixtures/integration_client.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
from src.config.dependencies import GlobalDependencies
2323
from src.config.environment_variables import EnvironmentVariables
2424

25+
from tests.fixtures.services import make_noop_authorization_service
26+
2527

2628
@pytest.fixture(scope="session")
2729
def event_loop():
@@ -448,6 +450,7 @@ async def send_message(self, *args, **kwargs):
448450
task_repository=isolated_repositories["task_repository"],
449451
event_repository=isolated_repositories["event_repository"],
450452
stream_repository=isolated_repositories["redis_stream_repository"],
453+
authorization_service=make_noop_authorization_service(),
451454
)
452455

453456
return TasksUseCase(task_service=task_service)

agentex/tests/integration/test_task_stream.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
from src.domain.use_cases.tasks_use_case import TasksUseCase
88
from src.utils.ids import orm_id
99

10+
from tests.fixtures.services import make_noop_authorization_service
11+
1012

1113
@pytest.mark.asyncio
1214
@pytest.mark.integration
@@ -76,6 +78,7 @@ async def send_message(self, *args, **kwargs):
7678
task_repository=isolated_repositories["task_repository"],
7779
event_repository=isolated_repositories["event_repository"],
7880
stream_repository=isolated_repositories["redis_stream_repository"],
81+
authorization_service=make_noop_authorization_service(),
7982
)
8083

8184
return TasksUseCase(task_service=task_service)
@@ -103,6 +106,7 @@ async def send_message(self, *args, **kwargs):
103106
task_repository=isolated_repositories["task_repository"],
104107
event_repository=isolated_repositories["event_repository"],
105108
stream_repository=isolated_repositories["redis_stream_repository"],
109+
authorization_service=make_noop_authorization_service(),
106110
)
107111

108112
environment_variables = EnvironmentVariables.refresh()

agentex/tests/integration/use_cases/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)