Skip to content

Commit b9cb26b

Browse files
committed
feat(AGX1-274): record task creator identity (user/service-account) on creation
Adds two nullable creator-audit columns to the tasks table — creator_user_id and creator_service_account_id — populated from the principal context at create time. A CHECK constraint (ck_tasks_one_creator) enforces that at most one is set. This replaces the earlier dual-write draft: grants are already issued unconditionally via grant_with_retry in agents_acp_use_case.py:239, and per-account rollout routing belongs in agentex-auth (private), not in this public Apache-2.0 codebase.
1 parent e1fb515 commit b9cb26b

13 files changed

Lines changed: 309 additions & 22 deletions

File tree

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
"""add_task_creator_columns
2+
3+
Revision ID: a1f73ada66c5
4+
Revises: a9959ebcbe98
5+
Create Date: 2026-05-21 15:08:51.441535
6+
7+
"""
8+
9+
from collections.abc import Sequence
10+
11+
import sqlalchemy as sa
12+
from alembic import op
13+
14+
# revision identifiers, used by Alembic.
15+
revision: str = "a1f73ada66c5"
16+
down_revision: str | None = "a9959ebcbe98"
17+
branch_labels: str | Sequence[str] | None = None
18+
depends_on: str | Sequence[str] | None = None
19+
20+
21+
def upgrade() -> None:
22+
op.add_column("tasks", sa.Column("creator_user_id", sa.String(), nullable=True))
23+
op.add_column(
24+
"tasks", sa.Column("creator_service_account_id", sa.String(), nullable=True)
25+
)
26+
with op.get_context().autocommit_block():
27+
op.execute(
28+
"CREATE INDEX CONCURRENTLY IF NOT EXISTS ix_tasks_creator_user_id "
29+
"ON tasks (creator_user_id)"
30+
)
31+
op.execute(
32+
"CREATE INDEX CONCURRENTLY IF NOT EXISTS ix_tasks_creator_service_account_id "
33+
"ON tasks (creator_service_account_id)"
34+
)
35+
# Add the CHECK as NOT VALID so the brief ACCESS EXCLUSIVE lock doesn't have to
36+
# wait on an existence scan, then VALIDATE under SHARE UPDATE EXCLUSIVE which
37+
# doesn't block concurrent reads/writes. `tasks` is a high-write table; even the
38+
# short ACCESS EXCLUSIVE held during a vanilla CHECK addition queues behind
39+
# in-flight transactions and blocks readers until it releases.
40+
op.execute(
41+
"ALTER TABLE tasks ADD CONSTRAINT ck_tasks_at_most_one_creator "
42+
"CHECK ((creator_user_id IS NULL) OR (creator_service_account_id IS NULL)) "
43+
"NOT VALID"
44+
)
45+
op.execute("ALTER TABLE tasks VALIDATE CONSTRAINT ck_tasks_at_most_one_creator")
46+
47+
48+
def downgrade() -> None:
49+
op.drop_constraint("ck_tasks_at_most_one_creator", "tasks", type_="check")
50+
with op.get_context().autocommit_block():
51+
op.execute(
52+
"DROP INDEX CONCURRENTLY IF EXISTS ix_tasks_creator_service_account_id"
53+
)
54+
op.execute("DROP INDEX CONCURRENTLY IF EXISTS ix_tasks_creator_user_id")
55+
op.drop_column("tasks", "creator_service_account_id")
56+
op.drop_column("tasks", "creator_user_id")

agentex/database/migrations/migration_history.txt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
9ff3ee32c81b -> e9c4ff9e6542 (head), add_tasks_metadata_gin_index
1+
a9959ebcbe98 -> a1f73ada66c5 (head), add_task_creator_columns
2+
e9c4ff9e6542 -> a9959ebcbe98, finalize_spans_task_id
3+
9ff3ee32c81b -> e9c4ff9e6542, add_tasks_metadata_gin_index
24
57c5ed4f59ae -> 9ff3ee32c81b, uppercase deployment status enum labels
35
4a9b7787ccd7 -> 57c5ed4f59ae, add_task_id_to_spans
46
d1a6cde41b3f -> 4a9b7787ccd7, deployments

agentex/src/adapters/orm.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
JSON,
33
BigInteger,
44
Boolean,
5+
CheckConstraint,
56
Column,
67
DateTime,
78
ForeignKey,
@@ -74,13 +75,17 @@ class TaskORM(BaseORM):
7475
)
7576
params = Column(JSONB, nullable=True)
7677
task_metadata = Column(JSONB, nullable=True)
78+
creator_user_id = Column(String, nullable=True, index=True)
79+
creator_service_account_id = Column(String, nullable=True, index=True)
7780
# Many-to-Many relationship with agents
7881
agents = relationship("AgentORM", secondary="task_agents", back_populates="tasks")
7982

80-
# Indexes for efficient querying
8183
__table_args__ = (
82-
# Index for filtering tasks by status (used in list queries)
8384
Index("ix_tasks_status", "status"),
85+
CheckConstraint(
86+
"creator_user_id IS NULL OR creator_service_account_id IS NULL",
87+
name="ck_tasks_at_most_one_creator",
88+
),
8489
)
8590

8691

agentex/src/domain/entities/tasks.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,14 @@ class TaskEntity(BaseModel):
5858
None,
5959
title="Task metadata",
6060
)
61+
creator_user_id: str | None = Field(
62+
None,
63+
title="Identity ID of the user who created this task",
64+
)
65+
creator_service_account_id: str | None = Field(
66+
None,
67+
title="Service identity ID of the service account that created this task",
68+
)
6169

6270
# allow extra fields for agents relationships
6371
model_config = ConfigDict(extra="allow")

agentex/src/domain/services/task_service.py

Lines changed: 24 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,25 @@
1414
from src.domain.repositories.task_repository import DTaskRepository
1515
from src.domain.repositories.task_state_repository import DTaskStateRepository
1616
from src.domain.services.agent_acp_service import DAgentACPService
17+
from src.domain.services.authorization_service import DAuthorizationService
1718
from src.utils.ids import orm_id
1819
from src.utils.logging import make_logger
1920
from src.utils.stream_topics import get_task_event_stream_topic
2021

2122
logger = make_logger(__name__)
2223

2324

25+
def _principal_field(principal_context: Any, key: str) -> str | None:
26+
"""Read an attribute from the principal context, which may be either a
27+
Pydantic-style object or a plain dict. The authn proxy returns a dict
28+
(``response.json()`` shape), so ``getattr`` alone silently yields None."""
29+
if principal_context is None:
30+
return None
31+
if isinstance(principal_context, dict):
32+
return principal_context.get(key)
33+
return getattr(principal_context, key, None)
34+
35+
2436
class AgentTaskService:
2537
"""
2638
Service for managing agent tasks and forwarding operations to ACP servers.
@@ -33,12 +45,14 @@ def __init__(
3345
task_repository: DTaskRepository,
3446
event_repository: DEventRepository,
3547
stream_repository: DRedisStreamRepository,
48+
authorization_service: DAuthorizationService,
3649
):
3750
self.acp_client = acp_client
3851
self.task_state_repository = task_state_repository
3952
self.task_repository = task_repository
4053
self.event_repository = event_repository
4154
self.stream_repository = stream_repository
55+
self.authorization_service = authorization_service
4256

4357
async def create_task(
4458
self,
@@ -49,16 +63,12 @@ async def create_task(
4963
) -> TaskEntity:
5064
"""
5165
Create a new task record in the repository with single agent (maintains existing interface).
52-
53-
Args:
54-
agent: The agent to create the task for
55-
task_name: The name of the task to be created
56-
task_params: The parameters for the task
57-
task_metadata: Caller-provided metadata to persist on the task row.
58-
Not forwarded to the agent.
59-
Returns:
60-
Task containing the created task info
6166
"""
67+
principal_context = self.authorization_service.principal_context
68+
creator_user_id = _principal_field(principal_context, "user_id")
69+
creator_service_account_id = _principal_field(
70+
principal_context, "service_account_id"
71+
)
6272

6373
task_entity = await self.task_repository.create(
6474
agent_id=agent.id,
@@ -69,6 +79,8 @@ async def create_task(
6979
status_reason="Task created, forwarding to ACP server",
7080
params=task_params,
7181
task_metadata=task_metadata,
82+
creator_user_id=creator_user_id,
83+
creator_service_account_id=creator_service_account_id,
7284
),
7385
)
7486
return task_entity
@@ -82,16 +94,11 @@ async def create_task_and_forward_to_acp(
8294
"""
8395
Create a new task record in the repository with single agent (maintains existing interface).
8496
Then, forward the task to the ACP server.
85-
86-
Args:
87-
agent: The agent to create the task for
88-
task_params: The parameters for the task to be sent to the ACP server
89-
90-
Returns:
91-
Task containing the created task info
9297
"""
9398
task_entity = await self.create_task(
94-
agent=agent, task_name=task_name, task_params=task_params
99+
agent=agent,
100+
task_name=task_name,
101+
task_params=task_params,
95102
)
96103

97104
if agent.acp_type == ACPType.SYNC:

agentex/tests/fixtures/services.py

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
Provides factory functions and specific fixtures for creating services with test repositories.
44
"""
55

6-
from unittest.mock import MagicMock, Mock
6+
from unittest.mock import AsyncMock, MagicMock, Mock
77

88
import pytest
99

@@ -12,6 +12,21 @@
1212
# =============================================================================
1313

1414

15+
def make_noop_authorization_service() -> Mock:
16+
"""Shared noop AuthorizationService mock for tests that don't exercise authz.
17+
18+
``principal_context`` is ``None`` (so creator-audit columns resolve to NULL),
19+
and ``grant``/``revoke`` are async no-ops returning ``None`` — matching the
20+
real service signature. Use this anywhere a test just needs to construct
21+
``AgentTaskService`` without caring about authorization behavior.
22+
"""
23+
svc = Mock()
24+
svc.principal_context = None
25+
svc.grant = AsyncMock(return_value=None)
26+
svc.revoke = AsyncMock(return_value=None)
27+
return svc
28+
29+
1530
def create_task_message_service(task_message_repository):
1631
"""Factory function to create TaskMessageService with given repository"""
1732
from src.domain.services.task_message_service import TaskMessageService
@@ -52,16 +67,21 @@ def create_task_service(
5267
event_repository,
5368
agent_acp_service,
5469
redis_stream_repository,
70+
authorization_service=None,
5571
):
56-
"""Factory function to create AgentTaskService with given repositories and services"""
72+
"""Factory function to create AgentTaskService with given repositories and services."""
5773
from src.domain.services.task_service import AgentTaskService
5874

75+
if authorization_service is None:
76+
authorization_service = make_noop_authorization_service()
77+
5978
return AgentTaskService(
6079
task_repository=task_repository,
6180
task_state_repository=task_state_repository,
6281
event_repository=event_repository,
6382
acp_client=agent_acp_service,
6483
stream_repository=redis_stream_repository,
84+
authorization_service=authorization_service,
6585
)
6686

6787

agentex/tests/integration/fixtures/integration_client.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
from src.config.dependencies import GlobalDependencies
2323
from src.config.environment_variables import EnvironmentVariables
2424

25+
from tests.fixtures.services import make_noop_authorization_service
26+
2527

2628
@pytest.fixture(scope="session")
2729
def event_loop():
@@ -447,6 +449,7 @@ async def send_message(self, *args, **kwargs):
447449
task_repository=isolated_repositories["task_repository"],
448450
event_repository=isolated_repositories["event_repository"],
449451
stream_repository=isolated_repositories["redis_stream_repository"],
452+
authorization_service=make_noop_authorization_service(),
450453
)
451454

452455
return TasksUseCase(task_service=task_service)

agentex/tests/integration/test_task_stream.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
from src.domain.use_cases.tasks_use_case import TasksUseCase
88
from src.utils.ids import orm_id
99

10+
from tests.fixtures.services import make_noop_authorization_service
11+
1012

1113
@pytest.mark.asyncio
1214
@pytest.mark.integration
@@ -76,6 +78,7 @@ async def send_message(self, *args, **kwargs):
7678
task_repository=isolated_repositories["task_repository"],
7779
event_repository=isolated_repositories["event_repository"],
7880
stream_repository=isolated_repositories["redis_stream_repository"],
81+
authorization_service=make_noop_authorization_service(),
7982
)
8083

8184
return TasksUseCase(task_service=task_service)
@@ -103,6 +106,7 @@ async def send_message(self, *args, **kwargs):
103106
task_repository=isolated_repositories["task_repository"],
104107
event_repository=isolated_repositories["event_repository"],
105108
stream_repository=isolated_repositories["redis_stream_repository"],
109+
authorization_service=make_noop_authorization_service(),
106110
)
107111

108112
environment_variables = EnvironmentVariables.refresh()

agentex/tests/integration/use_cases/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)