Skip to content

Commit 112f22b

Browse files
authored
feat(tasks): register tasks in authorization graph on create/delete (#246)
## Related work Parent epic: [AGX1-264](https://linear.app/scale-epd/issue/AGX1-264) — per-task FGAC. Follow-ups bundled in [AGX1-291](https://linear.app/scale-epd/issue/AGX1-291). This change is part of an 8-PR stack across 3 repos (4 merged, 4 open). | Repo | PR | Purpose | |---|---|---| | scaleapi/scaleapi | ~~scaleapi/scaleapi#144783~~ ✅ merged | sgp-authz — `Action.CANCEL` + `parent` on `register_resource` | | scaleapi/scaleapi | ~~scaleapi/scaleapi#145000~~ ✅ merged | register `FGAC_AGENTEX_AUTH_SPARK` routing flag | | scaleapi/scaleapi | ~~scaleapi/scaleapi#145044~~ ✅ merged | add `cancel` to SGP's `AgentexOperation` enum + role map | | scaleapi/agentex | ~~scaleapi/agentex#353~~ ✅ merged | agentex-auth per-account provider routing + `cancel` op | | scaleapi/scaleapi | scaleapi/scaleapi#145521 | register the `fgac-tasks-dual-write` per-account flag | | scaleapi/agentex | scaleapi/agentex#358 | agentex-auth: gate register/deregister by `fgac-<resource>-dual-write` | | **scaleapi/scale-agentex** | **this PR** | **register tasks in the FGAC authz graph on create/delete** | | scaleapi/scale-agentex | #249 | per-RPC task permission rewire (`update`/`cancel`) + 404/403 wrap | **Merge order:** flag (scaleapi/scaleapi#145521, anytime) → gate (scaleapi/agentex#358) → #246 → enable `fgac-tasks-dual-write` per account → #249 (after the `cancel` enum is live in every SGP env). ## Summary Wires task create/delete into the FGAC authorization graph (the egp-api-backend dual-write pattern). The generic register/deregister plumbing already landed via #260, so this PR is just the task call sites plus tests. Whether the calls actually write to Spark AuthZ is gated per-account in agentex-auth (scaleapi/agentex#358) by the `fgac-tasks-dual-write` flag (scaleapi/scaleapi#145521); this OSS repo always calls through. - **create:** `register_resource(task, parent=agent)` before persisting the row. If the persist fails, a compensating `deregister_resource` cleans up the tuple and the original error re-raises (register-first means a register failure aborts with no row written). - **delete:** delete the row first, then deregister best-effort — failures are logged and swallowed, since Postgres is the source of truth for existence and a tuple on an already-deleted row is invisible to reads. The task id is resolved before the delete so the deregister can't race a name lookup. - `AgentTaskService` now takes `AuthorizationService` via DI. No schema/migration, env var, or retry/metrics helper: those were part of an earlier iteration, dropped now that the plumbing lives in #260 and the gating in agentex-auth. ## Tests `test_task_fgac_dual_write.py`: register-before-persist ordering, compensation on persist failure, deregister-after-delete, deregister-failure swallow, and missing-name no-op. Fixtures updated to pass a no-op authorization service. (Integration tests run in CI; they need Docker testcontainers locally.) Ruff + ruff-format clean. <!-- greptile_comment --> <h3>Greptile Summary</h3> This PR wires task `create` and `delete` into the FGAC authorization graph using the dual-write pattern: `register_resource` fires before the Postgres insert (registration failure aborts cleanly, no row written), a compensating `deregister_resource` runs if the insert subsequently fails, and `delete_task` deregisters best-effort after the row is gone. - `AgentTaskService` now accepts `DAuthorizationService` via DI; all existing callers and test fixtures are updated with a shared `make_noop_authorization_service()` helper. - New integration test class `TestTaskDualWrite` covers register-before-persist ordering, compensation on persist failure, deregister-after-delete, deregister-failure swallow, and missing-name no-op. - Test files also carry a bulk reformat of `assert (expr), msg` → `assert expr, (msg)` for ruff compliance. <details><summary><h3>Confidence Score: 5/5</h3></summary> Safe to merge; the dual-write ordering is correct and all failure modes (register failure, persist failure, deregister failure) are intentionally handled with clear compensation logic. The create/delete authz wiring follows the documented dual-write contract. The compensation on persist failure is well-tested, and the best-effort deregister on delete correctly swallows errors. Observations raised (asyncio cancellation gap, test tier placement) are both acknowledged by the AGX1-291 runbook or are non-blocking style points — neither affects the correctness of the shipped code path. No files require special attention. </details> <h3>Important Files Changed</h3> | Filename | Overview | |----------|----------| | agentex/src/domain/services/task_service.py | Core FGAC dual-write wiring: register-before-persist in create_task with compensation, and best-effort deregister in delete_task; logic and comments are solid. | | agentex/tests/integration/use_cases/test_task_authz_dual_write.py | New integration tests covering register-before-persist, compensation on persist failure, deregister-after-delete, failure swallow, and missing-name no-op; one mock-only test is misclassified in the integration suite. | | agentex/tests/fixtures/services.py | Adds make_noop_authorization_service() shared helper and threads authorization_service into create_task_service factory; clean and consistent. | | agentex/tests/integration/fixtures/integration_client.py | Passes make_noop_authorization_service() to AgentTaskService in isolated_integration_app; minimal and correct change. | | agentex/tests/integration/test_task_stream.py | Adds noop authorization_service to two AgentTaskService constructions; also reformats assert messages to ruff-preferred style. | </details> <details><summary><h3>Sequence Diagram</h3></summary> ```mermaid sequenceDiagram participant C as Caller participant TS as AgentTaskService participant AZ as AuthorizationService participant DB as TaskRepository Note over C,DB: create_task (register-before-persist) C->>TS: create_task(agent, name, ...) TS->>AZ: "register_resource(task, parent=agent)" alt register fails AZ-->>TS: raise TS-->>C: re-raise (no row written) end AZ-->>TS: ok TS->>DB: create(agent_id, task) alt persist fails DB-->>TS: raise TS->>AZ: deregister_resource(task) [compensation] TS-->>C: re-raise original error end DB-->>TS: task_entity TS-->>C: task_entity Note over C,DB: delete_task (best-effort deregister) C->>TS: "delete_task(id | name)" opt name provided, id unknown TS->>DB: "get(name=name)" DB-->>TS: task.id end TS->>DB: delete(id, name) DB-->>TS: ok TS->>AZ: deregister_resource(task_id) [best-effort] alt deregister fails AZ-->>TS: raise TS->>TS: log + swallow end TS-->>C: void ``` </details> <a href="https://app.greptile.com/api/ide/cursor?prompt=Fix%20the%20following%202%20code%20review%20issues.%20Work%20through%20them%20one%20at%20a%20time%2C%20proposing%20concise%20fixes.%0A%0A---%0A%0A%23%23%23%20Issue%201%20of%202%0Aagentex%2Ftests%2Fintegration%2Fuse_cases%2Ftest_task_authz_dual_write.py%3A140-158%0A**Mock-only%20test%20misclassified%20as%20integration**%0A%0A%60test_create_compensates_with_deregister_when_persist_fails%60%20uses%20only%20%60Mock%28%29%60%20%2F%20%60AsyncMock%60%20%E2%80%94%20no%20%60isolated_repositories%60%2C%20no%20DB%20or%20Redis.%20Because%20it%20lives%20inside%20%60%40pytest.mark.integration%60%20%60TestTaskDualWrite%60%2C%20pytest%20includes%20it%20in%20the%20integration%20suite%2C%20requiring%20testcontainers%20to%20be%20running%20before%20it%20can%20execute.%20Moving%20it%20to%20%60agentex%2Ftests%2Funit%2Fuse_cases%2Ftest_task_fgac_dual_write.py%60%20%28or%20similar%29%20would%20let%20it%20run%20in%20the%20fast%20unit%20pass%20and%20give%20earlier%20feedback%20on%20the%20compensation%20logic.%0A%0A%23%23%23%20Issue%202%20of%202%0Aagentex%2Fsrc%2Fdomain%2Fservices%2Ftask_service.py%3A84-93%0A**%60asyncio.CancelledError%60%20bypasses%20compensation%20deregister**%0A%0AIn%20Python%203.8%2B%2C%20%60asyncio.CancelledError%60%20is%20a%20%60BaseException%60%2C%20not%20an%20%60Exception%60.%20If%20the%20incoming%20request%20is%20cancelled%20%28client%20disconnect%2C%20gateway%20timeout%29%20while%20%60task_repository.create%60%20is%20awaiting%20the%20DB%20write%2C%20the%20cancellation%20propagates%20straight%20through%20%60except%20Exception%60%20without%20triggering%20the%20compensation%20%60deregister_resource%60.%20The%20task%20is%20then%20registered%20in%20the%20authz%20graph%20but%20never%20written%20to%20Postgres.%20This%20is%20the%20same%20orphan%20scenario%20covered%20by%20the%20AGX1-291%20runbook%2C%20but%20it%20can%20also%20be%20triggered%20silently%20mid-flight%20%E2%80%94%20worth%20noting%20if%20the%20scan-by-%60creator_user_id%60%20cleanup%20is%20the%20intended%20recovery%20path.%0A%0A&pr=246&platform=github"><picture><source media="(prefers-color-scheme: dark)" srcset="https://greptile-static-assets.s3.amazonaws.com/badges/FixAllInCursorDark.svg?v=3"><source media="(prefers-color-scheme: light)" srcset="https://greptile-static-assets.s3.amazonaws.com/badges/FixAllInCursor.svg?v=3"><img alt="Fix All in Cursor" src="https://greptile-static-assets.s3.amazonaws.com/badges/FixAllInCursor.svg?v=3" height="20"></picture></a> <a href="https://app.greptile.com/ide/claude-code?prompt=Fix%20the%20following%202%20code%20review%20issues.%20Work%20through%20them%20one%20at%20a%20time%2C%20proposing%20concise%20fixes.%0A%0A---%0A%0A%23%23%23%20Issue%201%20of%202%0Aagentex%2Ftests%2Fintegration%2Fuse_cases%2Ftest_task_authz_dual_write.py%3A140-158%0A**Mock-only%20test%20misclassified%20as%20integration**%0A%0A%60test_create_compensates_with_deregister_when_persist_fails%60%20uses%20only%20%60Mock%28%29%60%20%2F%20%60AsyncMock%60%20%E2%80%94%20no%20%60isolated_repositories%60%2C%20no%20DB%20or%20Redis.%20Because%20it%20lives%20inside%20%60%40pytest.mark.integration%60%20%60TestTaskDualWrite%60%2C%20pytest%20includes%20it%20in%20the%20integration%20suite%2C%20requiring%20testcontainers%20to%20be%20running%20before%20it%20can%20execute.%20Moving%20it%20to%20%60agentex%2Ftests%2Funit%2Fuse_cases%2Ftest_task_fgac_dual_write.py%60%20%28or%20similar%29%20would%20let%20it%20run%20in%20the%20fast%20unit%20pass%20and%20give%20earlier%20feedback%20on%20the%20compensation%20logic.%0A%0A%23%23%23%20Issue%202%20of%202%0Aagentex%2Fsrc%2Fdomain%2Fservices%2Ftask_service.py%3A84-93%0A**%60asyncio.CancelledError%60%20bypasses%20compensation%20deregister**%0A%0AIn%20Python%203.8%2B%2C%20%60asyncio.CancelledError%60%20is%20a%20%60BaseException%60%2C%20not%20an%20%60Exception%60.%20If%20the%20incoming%20request%20is%20cancelled%20%28client%20disconnect%2C%20gateway%20timeout%29%20while%20%60task_repository.create%60%20is%20awaiting%20the%20DB%20write%2C%20the%20cancellation%20propagates%20straight%20through%20%60except%20Exception%60%20without%20triggering%20the%20compensation%20%60deregister_resource%60.%20The%20task%20is%20then%20registered%20in%20the%20authz%20graph%20but%20never%20written%20to%20Postgres.%20This%20is%20the%20same%20orphan%20scenario%20covered%20by%20the%20AGX1-291%20runbook%2C%20but%20it%20can%20also%20be%20triggered%20silently%20mid-flight%20%E2%80%94%20worth%20noting%20if%20the%20scan-by-%60creator_user_id%60%20cleanup%20is%20the%20intended%20recovery%20path.%0A%0A&repo=scaleapi%2Fscale-agentex&pr=246&platform=github"><picture><source media="(prefers-color-scheme: dark)" srcset="https://greptile-static-assets.s3.amazonaws.com/badges/FixAllInClaudeDark.svg?v=3"><source media="(prefers-color-scheme: light)" srcset="https://greptile-static-assets.s3.amazonaws.com/badges/FixAllInClaude.svg?v=3"><img alt="Fix All in Claude Code" src="https://greptile-static-assets.s3.amazonaws.com/badges/FixAllInClaude.svg?v=3" height="20"></picture></a> <a href="https://chatgpt.com/codex/deeplink?prompt=IMPORTANT%3A%20Work%20in%20the%20repository%20%22scaleapi%2Fscale-agentex%22%20on%20the%20existing%20branch%20%22asher.fink%2Fagx1-274-task-dual-write%22.%20Checkout%20that%20branch%20%E2%80%94%20do%20NOT%20create%20a%20new%20branch%20or%20open%20a%20new%20PR.%20Push%20your%20changes%20to%20%22asher.fink%2Fagx1-274-task-dual-write%22.%0A%0AFix%20the%20following%202%20code%20review%20issues.%20Work%20through%20them%20one%20at%20a%20time%2C%20proposing%20concise%20fixes.%0A%0A---%0A%0A%23%23%23%20Issue%201%20of%202%0Aagentex%2Ftests%2Fintegration%2Fuse_cases%2Ftest_task_authz_dual_write.py%3A140-158%0A**Mock-only%20test%20misclassified%20as%20integration**%0A%0A%60test_create_compensates_with_deregister_when_persist_fails%60%20uses%20only%20%60Mock%28%29%60%20%2F%20%60AsyncMock%60%20%E2%80%94%20no%20%60isolated_repositories%60%2C%20no%20DB%20or%20Redis.%20Because%20it%20lives%20inside%20%60%40pytest.mark.integration%60%20%60TestTaskDualWrite%60%2C%20pytest%20includes%20it%20in%20the%20integration%20suite%2C%20requiring%20testcontainers%20to%20be%20running%20before%20it%20can%20execute.%20Moving%20it%20to%20%60agentex%2Ftests%2Funit%2Fuse_cases%2Ftest_task_fgac_dual_write.py%60%20%28or%20similar%29%20would%20let%20it%20run%20in%20the%20fast%20unit%20pass%20and%20give%20earlier%20feedback%20on%20the%20compensation%20logic.%0A%0A%23%23%23%20Issue%202%20of%202%0Aagentex%2Fsrc%2Fdomain%2Fservices%2Ftask_service.py%3A84-93%0A**%60asyncio.CancelledError%60%20bypasses%20compensation%20deregister**%0A%0AIn%20Python%203.8%2B%2C%20%60asyncio.CancelledError%60%20is%20a%20%60BaseException%60%2C%20not%20an%20%60Exception%60.%20If%20the%20incoming%20request%20is%20cancelled%20%28client%20disconnect%2C%20gateway%20timeout%29%20while%20%60task_repository.create%60%20is%20awaiting%20the%20DB%20write%2C%20the%20cancellation%20propagates%20straight%20through%20%60except%20Exception%60%20without%20triggering%20the%20compensation%20%60deregister_resource%60.%20The%20task%20is%20then%20registered%20in%20the%20authz%20graph%20but%20never%20written%20to%20Postgres.%20This%20is%20the%20same%20orphan%20scenario%20covered%20by%20the%20AGX1-291%20runbook%2C%20but%20it%20can%20also%20be%20triggered%20silently%20mid-flight%20%E2%80%94%20worth%20noting%20if%20the%20scan-by-%60creator_user_id%60%20cleanup%20is%20the%20intended%20recovery%20path.%0A%0A"><picture><source media="(prefers-color-scheme: dark)" srcset="https://greptile-static-assets.s3.amazonaws.com/badges/FixAllInCodexDark.svg?v=3"><source media="(prefers-color-scheme: light)" srcset="https://greptile-static-assets.s3.amazonaws.com/badges/FixAllInCodex.svg?v=3"><img alt="Fix All in Codex" src="https://greptile-static-assets.s3.amazonaws.com/badges/FixAllInCodex.svg?v=3" height="20"></picture></a> <details><summary>Prompt To Fix All With AI</summary> `````markdown Fix the following 2 code review issues. Work through them one at a time, proposing concise fixes. --- ### Issue 1 of 2 agentex/tests/integration/use_cases/test_task_authz_dual_write.py:140-158 **Mock-only test misclassified as integration** `test_create_compensates_with_deregister_when_persist_fails` uses only `Mock()` / `AsyncMock` — no `isolated_repositories`, no DB or Redis. Because it lives inside `@pytest.mark.integration` `TestTaskDualWrite`, pytest includes it in the integration suite, requiring testcontainers to be running before it can execute. Moving it to `agentex/tests/unit/use_cases/test_task_fgac_dual_write.py` (or similar) would let it run in the fast unit pass and give earlier feedback on the compensation logic. ### Issue 2 of 2 agentex/src/domain/services/task_service.py:84-93 **`asyncio.CancelledError` bypasses compensation deregister** In Python 3.8+, `asyncio.CancelledError` is a `BaseException`, not an `Exception`. If the incoming request is cancelled (client disconnect, gateway timeout) while `task_repository.create` is awaiting the DB write, the cancellation propagates straight through `except Exception` without triggering the compensation `deregister_resource`. The task is then registered in the authz graph but never written to Postgres. This is the same orphan scenario covered by the AGX1-291 runbook, but it can also be triggered silently mid-flight — worth noting if the scan-by-`creator_user_id` cleanup is the intended recovery path. ````` </details> <sub>Reviews (8): Last reviewed commit: ["feat(tasks): register tasks in authoriza..."](6c869dc) | [Re-trigger Greptile](https://app.greptile.com/api/retrigger?id=34256125)</sub> <!-- /greptile_comment -->
1 parent b6e6004 commit 112f22b

9 files changed

Lines changed: 466 additions & 165 deletions

File tree

agentex/src/domain/services/task_service.py

Lines changed: 61 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@
33

44
from fastapi import Depends
55

6+
from src.adapters.crud_store.exceptions import ItemDoesNotExist
67
from src.adapters.streams.adapter_redis import DRedisStreamRepository
8+
from src.api.schemas.authorization_types import AgentexResource
79
from src.domain.entities.agents import ACPType, AgentEntity
810
from src.domain.entities.events import EventEntity
911
from src.domain.entities.task_message_updates import TaskMessageUpdateEntity
@@ -14,6 +16,7 @@
1416
from src.domain.repositories.task_repository import DTaskRepository
1517
from src.domain.repositories.task_state_repository import DTaskStateRepository
1618
from src.domain.services.agent_acp_service import DAgentACPService
19+
from src.domain.services.authorization_service import DAuthorizationService
1720
from src.utils.ids import orm_id
1821
from src.utils.logging import make_logger
1922
from src.utils.stream_topics import get_task_event_stream_topic
@@ -33,12 +36,14 @@ def __init__(
3336
task_repository: DTaskRepository,
3437
event_repository: DEventRepository,
3538
stream_repository: DRedisStreamRepository,
39+
authorization_service: DAuthorizationService,
3640
):
3741
self.acp_client = acp_client
3842
self.task_state_repository = task_state_repository
3943
self.task_repository = task_repository
4044
self.event_repository = event_repository
4145
self.stream_repository = stream_repository
46+
self.authorization_service = authorization_service
4247

4348
async def create_task(
4449
self,
@@ -59,19 +64,33 @@ async def create_task(
5964
Returns:
6065
Task containing the created task info
6166
"""
62-
63-
task_entity = await self.task_repository.create(
64-
agent_id=agent.id,
65-
task=TaskEntity(
66-
id=orm_id(),
67-
name=task_name,
68-
status=TaskStatus.RUNNING,
69-
status_reason="Task created, forwarding to ACP server",
70-
params=task_params,
71-
task_metadata=task_metadata,
72-
),
67+
# Register in the authorization service before persisting: a registration
68+
# failure aborts the request with no orphaned row. If the persist fails
69+
# after a successful registration, the compensating deregister_resource
70+
# below prevents a dangling authorization entry. Both calls are no-ops
71+
# when the authorization service is disabled for this account.
72+
task_entity = TaskEntity(
73+
id=orm_id(),
74+
name=task_name,
75+
status=TaskStatus.RUNNING,
76+
status_reason="Task created, forwarding to ACP server",
77+
params=task_params,
78+
task_metadata=task_metadata,
79+
)
80+
await self.authorization_service.register_resource(
81+
AgentexResource.task(task_entity.id),
82+
parent=AgentexResource.agent(agent.id),
7383
)
74-
return task_entity
84+
try:
85+
return await self.task_repository.create(
86+
agent_id=agent.id,
87+
task=task_entity,
88+
)
89+
except Exception:
90+
await self.authorization_service.deregister_resource(
91+
AgentexResource.task(task_entity.id),
92+
)
93+
raise
7594

7695
async def create_task_and_forward_to_acp(
7796
self,
@@ -91,7 +110,9 @@ async def create_task_and_forward_to_acp(
91110
Task containing the created task info
92111
"""
93112
task_entity = await self.create_task(
94-
agent=agent, task_name=task_name, task_params=task_params
113+
agent=agent,
114+
task_name=task_name,
115+
task_params=task_params,
95116
)
96117

97118
if agent.acp_type == ACPType.SYNC:
@@ -214,8 +235,35 @@ async def delete_task(self, id: str | None = None, name: str | None = None) -> N
214235
"""
215236
Delete a task from the repository.
216237
"""
238+
# Delete first (Postgres is the source of truth for existence), then
239+
# deregister best-effort: a deregister failure is logged and swallowed
240+
# rather than failing a delete that already succeeded.
241+
# Resolve the id before the delete so we can pass it to deregister_resource;
242+
# looking it up by name afterwards would race. If the name doesn't resolve,
243+
# swallow ItemDoesNotExist and let delete() surface its own native error
244+
# so the missing-task error contract is unchanged.
245+
task_id_for_deregister: str | None = id
246+
if task_id_for_deregister is None and name is not None:
247+
try:
248+
task = await self.task_repository.get(name=name)
249+
task_id_for_deregister = task.id
250+
except ItemDoesNotExist:
251+
task_id_for_deregister = None
252+
217253
await self.task_repository.delete(id=id, name=name)
218254

255+
if task_id_for_deregister is not None:
256+
try:
257+
await self.authorization_service.deregister_resource(
258+
AgentexResource.task(task_id_for_deregister),
259+
)
260+
except Exception:
261+
logger.exception(
262+
"task authorization deregister failed for task %s after successful delete; "
263+
"the deregistration failure has been swallowed",
264+
task_id_for_deregister,
265+
)
266+
219267
async def list_tasks(
220268
self,
221269
*,

agentex/tests/fixtures/services.py

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
Provides factory functions and specific fixtures for creating services with test repositories.
44
"""
55

6-
from unittest.mock import MagicMock, Mock
6+
from unittest.mock import AsyncMock, MagicMock, Mock
77

88
import pytest
99

@@ -12,6 +12,24 @@
1212
# =============================================================================
1313

1414

15+
def make_noop_authorization_service() -> Mock:
16+
"""Shared noop AuthorizationService mock for tests that don't exercise authz.
17+
18+
``principal_context`` is ``None``, and
19+
``grant``/``revoke``/``register_resource``/``deregister_resource`` are async
20+
no-ops returning ``None`` — matching the real service signature. Use this
21+
anywhere a test just needs to construct ``AgentTaskService`` without caring
22+
about authorization behavior.
23+
"""
24+
svc = Mock()
25+
svc.principal_context = None
26+
svc.grant = AsyncMock(return_value=None)
27+
svc.revoke = AsyncMock(return_value=None)
28+
svc.register_resource = AsyncMock(return_value=None)
29+
svc.deregister_resource = AsyncMock(return_value=None)
30+
return svc
31+
32+
1533
def create_task_message_service(task_message_repository):
1634
"""Factory function to create TaskMessageService with given repository"""
1735
from src.domain.services.task_message_service import TaskMessageService
@@ -52,16 +70,21 @@ def create_task_service(
5270
event_repository,
5371
agent_acp_service,
5472
redis_stream_repository,
73+
authorization_service=None,
5574
):
56-
"""Factory function to create AgentTaskService with given repositories and services"""
75+
"""Factory function to create AgentTaskService with given repositories and services."""
5776
from src.domain.services.task_service import AgentTaskService
5877

78+
if authorization_service is None:
79+
authorization_service = make_noop_authorization_service()
80+
5981
return AgentTaskService(
6082
task_repository=task_repository,
6183
task_state_repository=task_state_repository,
6284
event_repository=event_repository,
6385
acp_client=agent_acp_service,
6486
stream_repository=redis_stream_repository,
87+
authorization_service=authorization_service,
6588
)
6689

6790

agentex/tests/integration/fixtures/integration_client.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
from src.config.dependencies import GlobalDependencies
2323
from src.config.environment_variables import EnvironmentVariables
2424

25+
from tests.fixtures.services import make_noop_authorization_service
26+
2527

2628
@pytest.fixture(scope="session")
2729
def event_loop():
@@ -455,6 +457,7 @@ async def send_message(self, *args, **kwargs):
455457
task_repository=isolated_repositories["task_repository"],
456458
event_repository=isolated_repositories["event_repository"],
457459
stream_repository=isolated_repositories["redis_stream_repository"],
460+
authorization_service=make_noop_authorization_service(),
458461
)
459462

460463
return TasksUseCase(task_service=task_service)

agentex/tests/integration/test_task_stream.py

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
from src.domain.use_cases.tasks_use_case import TasksUseCase
88
from src.utils.ids import orm_id
99

10+
from tests.fixtures.services import make_noop_authorization_service
11+
1012

1113
@pytest.mark.asyncio
1214
@pytest.mark.integration
@@ -76,6 +78,7 @@ async def send_message(self, *args, **kwargs):
7678
task_repository=isolated_repositories["task_repository"],
7779
event_repository=isolated_repositories["event_repository"],
7880
stream_repository=isolated_repositories["redis_stream_repository"],
81+
authorization_service=make_noop_authorization_service(),
7982
)
8083

8184
return TasksUseCase(task_service=task_service)
@@ -103,6 +106,7 @@ async def send_message(self, *args, **kwargs):
103106
task_repository=isolated_repositories["task_repository"],
104107
event_repository=isolated_repositories["event_repository"],
105108
stream_repository=isolated_repositories["redis_stream_repository"],
109+
authorization_service=make_noop_authorization_service(),
106110
)
107111

108112
environment_variables = EnvironmentVariables.refresh()
@@ -194,17 +198,17 @@ async def collect_stream_events():
194198
pass
195199

196200
# Then - Verify the stream event was received
197-
assert (
198-
len(stream_events) >= 1
199-
), f"Expected at least 1 stream event, got {len(stream_events)}"
201+
assert len(stream_events) >= 1, (
202+
f"Expected at least 1 stream event, got {len(stream_events)}"
203+
)
200204

201205
# Find the task_updated event
202206
task_updated_events = [
203207
e for e in stream_events if e.get("type") == "task_updated"
204208
]
205-
assert (
206-
len(task_updated_events) >= 1
207-
), f"Expected task_updated event, got events: {[e.get('type') for e in stream_events]}"
209+
assert len(task_updated_events) >= 1, (
210+
f"Expected task_updated event, got events: {[e.get('type') for e in stream_events]}"
211+
)
208212

209213
task_updated_event = task_updated_events[0]
210214

@@ -389,9 +393,9 @@ async def collect_stream_events():
389393
task_updated_events = [
390394
e for e in stream_events if e.get("type") == "task_updated"
391395
]
392-
assert (
393-
len(task_updated_events) >= 3
394-
), f"Expected at least 3 task_updated events, got {len(task_updated_events)}"
396+
assert len(task_updated_events) >= 3, (
397+
f"Expected at least 3 task_updated events, got {len(task_updated_events)}"
398+
)
395399

396400
# Verify each event has the correct metadata for its update
397401
versions = [
@@ -599,8 +603,8 @@ async def collect_stream_data():
599603
pass
600604

601605
# Then - Verify we received at least 2 pings
602-
assert (
603-
ping_count >= 2
604-
), f"Expected at least 2 ping messages during idle period, got {ping_count}"
606+
assert ping_count >= 2, (
607+
f"Expected at least 2 ping messages during idle period, got {ping_count}"
608+
)
605609

606610
print(f"✅ Stream sent {ping_count} keepalive pings during idle period")

agentex/tests/integration/use_cases/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)