Skip to content

Commit 13fe4b2

Browse files
committed
feat(AGX1-274): dual-write tasks to spark-authz
1 parent e1fb515 commit 13fe4b2

16 files changed

Lines changed: 521 additions & 9 deletions
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
"""add_task_creator_and_zedtoken
2+
3+
Revision ID: a1f73ada66c5
4+
Revises: a9959ebcbe98
5+
Create Date: 2026-05-21 15:08:51.441535
6+
7+
"""
8+
from typing import Sequence, Union
9+
10+
from alembic import op
11+
import sqlalchemy as sa
12+
13+
14+
# revision identifiers, used by Alembic.
15+
revision: str = 'a1f73ada66c5'
16+
down_revision: Union[str, None] = 'a9959ebcbe98'
17+
branch_labels: Union[str, Sequence[str], None] = None
18+
depends_on: Union[str, Sequence[str], None] = None
19+
20+
21+
def upgrade() -> None:
22+
op.add_column('tasks', sa.Column('creator_user_id', sa.String(), nullable=True))
23+
op.add_column('tasks', sa.Column('creator_service_account_id', sa.String(), nullable=True))
24+
op.add_column('tasks', sa.Column('spark_authz_zedtoken', sa.Text(), nullable=True))
25+
with op.get_context().autocommit_block():
26+
op.execute(
27+
"CREATE INDEX CONCURRENTLY IF NOT EXISTS ix_tasks_creator_user_id "
28+
"ON tasks (creator_user_id)"
29+
)
30+
op.execute(
31+
"CREATE INDEX CONCURRENTLY IF NOT EXISTS ix_tasks_creator_service_account_id "
32+
"ON tasks (creator_service_account_id)"
33+
)
34+
op.create_check_constraint(
35+
'ck_tasks_one_creator',
36+
'tasks',
37+
'(creator_user_id IS NULL) OR (creator_service_account_id IS NULL)',
38+
)
39+
40+
41+
def downgrade() -> None:
42+
op.drop_constraint('ck_tasks_one_creator', 'tasks', type_='check')
43+
with op.get_context().autocommit_block():
44+
op.execute("DROP INDEX CONCURRENTLY IF EXISTS ix_tasks_creator_service_account_id")
45+
op.execute("DROP INDEX CONCURRENTLY IF EXISTS ix_tasks_creator_user_id")
46+
op.drop_column('tasks', 'spark_authz_zedtoken')
47+
op.drop_column('tasks', 'creator_service_account_id')
48+
op.drop_column('tasks', 'creator_user_id')

agentex/database/migrations/migration_history.txt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
9ff3ee32c81b -> e9c4ff9e6542 (head), add_tasks_metadata_gin_index
1+
a9959ebcbe98 -> a1f73ada66c5 (head), add_task_creator_and_zedtoken
2+
e9c4ff9e6542 -> a9959ebcbe98, finalize_spans_task_id
3+
9ff3ee32c81b -> e9c4ff9e6542, add_tasks_metadata_gin_index
24
57c5ed4f59ae -> 9ff3ee32c81b, uppercase deployment status enum labels
35
4a9b7787ccd7 -> 57c5ed4f59ae, add_task_id_to_spans
46
d1a6cde41b3f -> 4a9b7787ccd7, deployments

agentex/src/adapters/orm.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,9 @@ class TaskORM(BaseORM):
7474
)
7575
params = Column(JSONB, nullable=True)
7676
task_metadata = Column(JSONB, nullable=True)
77+
creator_user_id = Column(String, nullable=True, index=True)
78+
creator_service_account_id = Column(String, nullable=True, index=True)
79+
spark_authz_zedtoken = Column(Text, nullable=True)
7780
# Many-to-Many relationship with agents
7881
agents = relationship("AgentORM", secondary="task_agents", back_populates="tasks")
7982

agentex/src/domain/entities/tasks.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,18 @@ class TaskEntity(BaseModel):
5858
None,
5959
title="Task metadata",
6060
)
61+
creator_user_id: str | None = Field(
62+
None,
63+
title="Identity ID of the user who created this task (granted as FGAC owner)",
64+
)
65+
creator_service_account_id: str | None = Field(
66+
None,
67+
title="Service identity ID of the service account that created this task",
68+
)
69+
spark_authz_zedtoken: str | None = Field(
70+
None,
71+
title="ZedToken from the Spark AuthZ grant for new-write isolation",
72+
)
6173

6274
# allow extra fields for agents relationships
6375
model_config = ConfigDict(extra="allow")

agentex/src/domain/services/task_service.py

Lines changed: 94 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from fastapi import Depends
55

66
from src.adapters.streams.adapter_redis import DRedisStreamRepository
7+
from src.api.schemas.authorization_types import AgentexResource
78
from src.domain.entities.agents import ACPType, AgentEntity
89
from src.domain.entities.events import EventEntity
910
from src.domain.entities.task_message_updates import TaskMessageUpdateEntity
@@ -14,6 +15,8 @@
1415
from src.domain.repositories.task_repository import DTaskRepository
1516
from src.domain.repositories.task_state_repository import DTaskStateRepository
1617
from src.domain.services.agent_acp_service import DAgentACPService
18+
from src.domain.services.authorization_service import DAuthorizationService
19+
from src.utils.feature_flags import DFeatureFlagProvider, FeatureFlagName
1720
from src.utils.ids import orm_id
1821
from src.utils.logging import make_logger
1922
from src.utils.stream_topics import get_task_event_stream_topic
@@ -33,19 +36,24 @@ def __init__(
3336
task_repository: DTaskRepository,
3437
event_repository: DEventRepository,
3538
stream_repository: DRedisStreamRepository,
39+
authorization_service: DAuthorizationService,
40+
feature_flags: DFeatureFlagProvider,
3641
):
3742
self.acp_client = acp_client
3843
self.task_state_repository = task_state_repository
3944
self.task_repository = task_repository
4045
self.event_repository = event_repository
4146
self.stream_repository = stream_repository
47+
self.authorization_service = authorization_service
48+
self.feature_flags = feature_flags
4249

4350
async def create_task(
4451
self,
4552
agent: AgentEntity,
4653
task_name: str | None = None,
4754
task_params: dict[str, Any] | None = None,
4855
task_metadata: dict[str, Any] | None = None,
56+
account_id: str | None = None,
4957
) -> TaskEntity:
5058
"""
5159
Create a new task record in the repository with single agent (maintains existing interface).
@@ -56,28 +64,107 @@ async def create_task(
5664
task_params: The parameters for the task
5765
task_metadata: Caller-provided metadata to persist on the task row.
5866
Not forwarded to the agent.
67+
account_id: Caller-resolved account scope. When provided and the
68+
FGAC_TASKS_DUAL_WRITE flag is enabled for it, the task is also
69+
registered in Spark AuthZ.
5970
Returns:
6071
Task containing the created task info
6172
"""
73+
principal_context = self.authorization_service.principal_context
74+
creator_user_id = getattr(principal_context, "user_id", None)
75+
creator_service_account_id = getattr(
76+
principal_context, "service_account_id", None
77+
)
78+
79+
task_id = orm_id()
80+
zedtoken: str | None = None
81+
82+
if self.feature_flags.is_enabled(
83+
FeatureFlagName.FGAC_TASKS_DUAL_WRITE, account_id
84+
):
85+
zedtoken = await self._register_task_in_spark_authz(
86+
task_id=task_id,
87+
account_id=account_id,
88+
creator_user_id=creator_user_id,
89+
creator_service_account_id=creator_service_account_id,
90+
)
6291

6392
task_entity = await self.task_repository.create(
6493
agent_id=agent.id,
6594
task=TaskEntity(
66-
id=orm_id(),
95+
id=task_id,
6796
name=task_name,
6897
status=TaskStatus.RUNNING,
6998
status_reason="Task created, forwarding to ACP server",
7099
params=task_params,
71100
task_metadata=task_metadata,
101+
creator_user_id=creator_user_id,
102+
creator_service_account_id=creator_service_account_id,
103+
spark_authz_zedtoken=zedtoken,
72104
),
73105
)
74106
return task_entity
75107

108+
async def _register_task_in_spark_authz(
109+
self,
110+
*,
111+
task_id: str,
112+
account_id: str | None,
113+
creator_user_id: str | None,
114+
creator_service_account_id: str | None,
115+
) -> str | None:
116+
"""Register a new task in Spark AuthZ with creator as owner.
117+
118+
Called BEFORE the Postgres write — a failure raises and prevents the
119+
row from being persisted, so there is no compensating action to take.
120+
Mirrors the KB FGAC pattern at
121+
``packages/egp-api-backend/.../knowledge_base_v2_use_case.py:374-388``.
122+
123+
The current ``Provider.spark`` adapter returns ``{}`` from ``grant``;
124+
no ZedToken is surfaced today, so we always return ``None`` for the
125+
new-write-isolation column. A follow-up will plumb the token through
126+
once the adapter exposes it.
127+
"""
128+
if creator_user_id is None and creator_service_account_id is None:
129+
logger.warning(
130+
"Skipping Spark AuthZ task registration: no creator resolvable",
131+
extra={"task_id": task_id, "account_id": account_id},
132+
)
133+
return None
134+
await self.authorization_service.grant(
135+
resource=AgentexResource.task(task_id),
136+
)
137+
return None
138+
139+
async def deregister_task_from_spark_authz(
140+
self, *, task_id: str, account_id: str | None
141+
) -> None:
142+
"""Best-effort revocation of a task's Spark AuthZ tuples on delete.
143+
144+
Only invoked when the FGAC_TASKS_DUAL_WRITE flag is enabled for the
145+
caller's account. Failures are logged but do not block the delete.
146+
"""
147+
if not self.feature_flags.is_enabled(
148+
FeatureFlagName.FGAC_TASKS_DUAL_WRITE, account_id
149+
):
150+
return
151+
try:
152+
await self.authorization_service.revoke(
153+
resource=AgentexResource.task(task_id),
154+
)
155+
except Exception:
156+
logger.warning(
157+
"Spark AuthZ revoke failed for task",
158+
extra={"task_id": task_id, "account_id": account_id},
159+
exc_info=True,
160+
)
161+
76162
async def create_task_and_forward_to_acp(
77163
self,
78164
agent: AgentEntity,
79165
task_name: str | None = None,
80166
task_params: dict[str, Any] | None = None,
167+
account_id: str | None = None,
81168
) -> TaskEntity:
82169
"""
83170
Create a new task record in the repository with single agent (maintains existing interface).
@@ -86,12 +173,17 @@ async def create_task_and_forward_to_acp(
86173
Args:
87174
agent: The agent to create the task for
88175
task_params: The parameters for the task to be sent to the ACP server
176+
account_id: Caller-resolved account scope; threaded through to
177+
:meth:`create_task` for FGAC dual-write gating.
89178
90179
Returns:
91180
Task containing the created task info
92181
"""
93182
task_entity = await self.create_task(
94-
agent=agent, task_name=task_name, task_params=task_params
183+
agent=agent,
184+
task_name=task_name,
185+
task_params=task_params,
186+
account_id=account_id,
95187
)
96188

97189
if agent.acp_type == ACPType.SYNC:

agentex/src/domain/use_cases/agents_acp_use_case.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,7 @@ async def _get_or_create_task(
268268
task_name: str | None = None,
269269
task_params: dict[str, Any] | None = None,
270270
task_metadata: dict[str, Any] | None = None,
271+
account_id: str | None = None,
271272
) -> TaskEntity:
272273
"""Return the existing task if *task_id* is provided, otherwise create a new one.
273274
@@ -308,6 +309,7 @@ async def _get_or_create_task(
308309
task_name=task_name,
309310
task_params=task_params,
310311
task_metadata=task_metadata,
312+
account_id=account_id,
311313
)
312314
logger.info(f"[agent_id={agent.id}] Created task {task.id}")
313315
await self.grant_with_retry(task)
@@ -419,6 +421,9 @@ async def _handle_task_create(
419421
task_name=params.name,
420422
task_params=params.params,
421423
task_metadata=params.task_metadata,
424+
account_id=getattr(
425+
self.authorization_service.principal_context, "account_id", None
426+
),
422427
)
423428

424429
if agent.acp_type in [ACPType.AGENTIC, ACPType.ASYNC]:
@@ -457,6 +462,9 @@ async def _handle_message_send_sync(
457462
task_id=params.task_id,
458463
task_name=params.task_name,
459464
task_params=params.task_params,
465+
account_id=getattr(
466+
self.authorization_service.principal_context, "account_id", None
467+
),
460468
)
461469

462470
# Step 1: Insert the message in the messages table
@@ -642,6 +650,9 @@ async def flush_aggregated_deltas(task_message_index: int) -> TaskMessageEntity:
642650
task_id=params.task_id,
643651
task_name=params.task_name,
644652
task_params=params.task_params,
653+
account_id=getattr(
654+
self.authorization_service.principal_context, "account_id", None
655+
),
645656
)
646657

647658
# Append the input client message

agentex/src/domain/use_cases/tasks_use_case.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,14 @@ async def delete_task(self, id: str | None = None, name: str | None = None) -> N
6060
task.status = TaskStatus.DELETED
6161
task.status_reason = "Task deleted successfully"
6262
await self.task_service.update_task(task=task)
63+
account_id = getattr(
64+
self.task_service.authorization_service.principal_context,
65+
"account_id",
66+
None,
67+
)
68+
await self.task_service.deregister_task_from_spark_authz(
69+
task_id=task.id, account_id=account_id
70+
)
6371

6472
async def list_tasks(
6573
self,

agentex/src/utils/feature_flags.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
import os
2+
from enum import StrEnum
3+
from typing import Annotated
4+
5+
from fastapi import Depends
6+
7+
8+
class FeatureFlagName(StrEnum):
9+
FGAC_TASKS = "fgac-tasks"
10+
FGAC_TASKS_DUAL_WRITE = "fgac-tasks-dual-write"
11+
12+
13+
class FeatureFlagProvider:
14+
"""Per-account feature flag provider.
15+
16+
v1: env-var allowlist (per-account, comma-separated). The env var name is
17+
derived from the flag name, e.g. ``FGAC_TASKS_DUAL_WRITE_ACCOUNTS``. A
18+
follow-up will swap this for LaunchDarkly with an account_id context.
19+
"""
20+
21+
def is_enabled(self, name: FeatureFlagName, account_id: str | None) -> bool:
22+
if not account_id:
23+
return False
24+
env_key = f"{name.value.upper().replace('-', '_')}_ACCOUNTS"
25+
allowed = os.environ.get(env_key, "")
26+
return account_id in {a.strip() for a in allowed.split(",") if a.strip()}
27+
28+
29+
DFeatureFlagProvider = Annotated[FeatureFlagProvider, Depends(FeatureFlagProvider)]

agentex/tests/fixtures/services.py

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
Provides factory functions and specific fixtures for creating services with test repositories.
44
"""
55

6-
from unittest.mock import MagicMock, Mock
6+
from unittest.mock import AsyncMock, MagicMock, Mock
77

88
import pytest
99

@@ -52,16 +52,29 @@ def create_task_service(
5252
event_repository,
5353
agent_acp_service,
5454
redis_stream_repository,
55+
authorization_service=None,
56+
feature_flags=None,
5557
):
56-
"""Factory function to create AgentTaskService with given repositories and services"""
58+
"""Factory function to create AgentTaskService with given repositories and services."""
5759
from src.domain.services.task_service import AgentTaskService
60+
from src.utils.feature_flags import FeatureFlagProvider
61+
62+
if authorization_service is None:
63+
authorization_service = Mock()
64+
authorization_service.principal_context = None
65+
authorization_service.grant = AsyncMock(return_value=None)
66+
authorization_service.revoke = AsyncMock(return_value=None)
67+
if feature_flags is None:
68+
feature_flags = FeatureFlagProvider()
5869

5970
return AgentTaskService(
6071
task_repository=task_repository,
6172
task_state_repository=task_state_repository,
6273
event_repository=event_repository,
6374
acp_client=agent_acp_service,
6475
stream_repository=redis_stream_repository,
76+
authorization_service=authorization_service,
77+
feature_flags=feature_flags,
6578
)
6679

6780

0 commit comments

Comments
 (0)