|
5 | 5 | from temporalio.client import ScheduleActionStartWorkflow, ScheduleDescription |
6 | 6 |
|
7 | 7 | from src.adapters.temporal.adapter_temporal import DTemporalAdapter |
| 8 | +from src.api.schemas.authorization_types import AgentexResource |
8 | 9 | from src.api.schemas.schedules import ( |
9 | 10 | CreateScheduleRequest, |
10 | 11 | ScheduleActionInfo, |
|
15 | 16 | ScheduleState, |
16 | 17 | ) |
17 | 18 | from src.domain.entities.agents import AgentEntity |
| 19 | +from src.domain.services.authorization_service import DAuthorizationService |
18 | 20 | from src.utils.logging import make_logger |
19 | 21 |
|
20 | 22 | logger = make_logger(__name__) |
@@ -44,8 +46,10 @@ class ScheduleService: |
def __init__(
    self,
    temporal_adapter: DTemporalAdapter,
    authorization_service: DAuthorizationService,
):
    """Keep references to the injected collaborators.

    Args:
        temporal_adapter: Adapter through which schedules are created in
            and deleted from Temporal.
        authorization_service: Service used to register and deregister
            schedule resources with the auth backend; its
            ``principal_context`` identifies the caller.
    """
    # The two assignments are independent of each other.
    self.authorization_service = authorization_service
    self.temporal_adapter = temporal_adapter
49 | 53 |
|
50 | 54 | async def create_schedule( |
51 | 55 | self, |
@@ -80,23 +84,105 @@ async def create_schedule( |
80 | 84 | else None |
81 | 85 | ) |
82 | 86 |
|
83 | | - await self.temporal_adapter.create_schedule( |
84 | | - schedule_id=schedule_id, |
85 | | - workflow=request.workflow_name, |
86 | | - workflow_id=workflow_id_prefix, |
87 | | - args=args, |
88 | | - task_queue=request.task_queue, |
89 | | - cron_expressions=cron_expressions, |
90 | | - interval_seconds=request.interval_seconds, |
91 | | - execution_timeout=execution_timeout, |
92 | | - start_at=request.start_at, |
93 | | - end_at=request.end_at, |
94 | | - paused=request.paused, |
| 87 | + # Schedules have no Postgres row — Temporal is the store and the auth |
| 88 | + # selector is the schedule id ({agent_id}--{schedule_name}). Register |
| 89 | + # the auth tuple (with the parent_agent edge) BEFORE the Temporal write |
| 90 | + # (fail-closed), and compensate if the Temporal create fails so we never |
| 91 | + # leave an orphan tuple. The read-back below is intentionally OUTSIDE |
| 92 | + # the compensation scope: a describe failure must not deregister a |
| 93 | + # schedule that was actually created. |
| 94 | + registered = await self._register_schedule_in_auth( |
| 95 | + schedule_id=schedule_id, agent_id=agent.id |
95 | 96 | ) |
| 97 | + try: |
| 98 | + await self.temporal_adapter.create_schedule( |
| 99 | + schedule_id=schedule_id, |
| 100 | + workflow=request.workflow_name, |
| 101 | + workflow_id=workflow_id_prefix, |
| 102 | + args=args, |
| 103 | + task_queue=request.task_queue, |
| 104 | + cron_expressions=cron_expressions, |
| 105 | + interval_seconds=request.interval_seconds, |
| 106 | + execution_timeout=execution_timeout, |
| 107 | + start_at=request.start_at, |
| 108 | + end_at=request.end_at, |
| 109 | + paused=request.paused, |
| 110 | + ) |
| 111 | + except Exception: |
| 112 | + # Orphan-tuple guard: the tuple was written but the schedule never |
| 113 | + # landed in Temporal. Best-effort compensating deregister, then |
| 114 | + # re-raise the original error. |
| 115 | + if registered: |
| 116 | + await self._deregister_schedule_from_auth(schedule_id=schedule_id) |
| 117 | + raise |
96 | 118 |
|
97 | 119 | # Fetch and return the created schedule |
98 | 120 | return await self.get_schedule(agent.id, request.name) |
99 | 121 |
|
async def _register_schedule_in_auth(
    self, *, schedule_id: str, agent_id: str
) -> bool:
    """Record a new schedule resource with the authorization service.

    The schedule is registered with the owning agent passed as its
    ``parent``, so the auth backend holds an edge from the schedule to
    that agent.

    The caller is expected to invoke this before writing the schedule to
    Temporal: any failure of ``register_resource`` is logged and re-raised
    so the create can be aborted (fail-closed).

    Registration is skipped, with a warning, when the principal context
    exposes neither a ``user_id`` nor a ``service_account_id`` (for
    example internal paths that run without an authenticated user).

    Args:
        schedule_id: Identifier of the schedule being created; used as the
            auth resource selector.
        agent_id: Identifier of the agent that owns the schedule.

    Returns:
        True if a tuple was registered, so the caller knows a compensating
        deregister may be needed; False if registration was skipped.

    Raises:
        Exception: Whatever ``register_resource`` (or resource
            construction) raised, after it has been logged.
    """
    principal = self.authorization_service.principal_context
    # Both lookups tolerate a principal object that lacks the attribute.
    creator_ids = (
        getattr(principal, "user_id", None),
        getattr(principal, "service_account_id", None),
    )
    if all(identity is None for identity in creator_ids):
        logger.warning(
            "Skipping auth registration for schedule: no creator resolvable",
            extra={"schedule_id": schedule_id, "agent_id": agent_id},
        )
        return False

    try:
        register = self.authorization_service.register_resource
        schedule_resource = AgentexResource.schedule(schedule_id)
        owning_agent = AgentexResource.agent(agent_id)
        await register(resource=schedule_resource, parent=owning_agent)
    except Exception as exc:
        # Fail closed: surface the error so the schedule is never created.
        failure_details = {
            "schedule_id": schedule_id,
            "agent_id": agent_id,
            "error_type": type(exc).__name__,
        }
        logger.exception(
            "Auth register_resource failed for agent_schedule; aborting create",
            extra=failure_details,
        )
        raise
    else:
        return True
| 163 | + |
async def _deregister_schedule_from_auth(self, *, schedule_id: str) -> None:
    """Remove a schedule's auth registration without ever raising.

    Calls ``deregister_resource`` for the schedule resource. Any
    ``Exception`` it raises is logged as a warning (with traceback) and
    swallowed, so this is safe to use as a best-effort cleanup step: after
    a schedule delete, and as the compensating action when the Temporal
    create fails after the tuple was already registered.

    NOTE(review): the original author states that ``deregister_resource``
    drops the resource together with all of its relationships (owner,
    parent, grantees) atomically -- confirm against the authorization
    service implementation.

    Args:
        schedule_id: Identifier of the schedule whose auth tuple should be
            removed.
    """
    try:
        deregister = self.authorization_service.deregister_resource
        await deregister(resource=AgentexResource.schedule(schedule_id))
    except Exception as exc:
        # Best-effort: report the failure but never propagate it.
        failure_details = {
            "schedule_id": schedule_id,
            "error_type": type(exc).__name__,
        }
        logger.warning(
            "Auth deregister failed for agent_schedule; tuple may be orphaned",
            extra=failure_details,
            exc_info=True,
        )
| 185 | + |
100 | 186 | async def get_schedule(self, agent_id: str, schedule_name: str) -> ScheduleResponse: |
101 | 187 | """ |
102 | 188 | Get details of a schedule. |
@@ -237,6 +323,9 @@ async def delete_schedule(self, agent_id: str, schedule_name: str) -> None: |
237 | 323 | """ |
238 | 324 | schedule_id = build_schedule_id(agent_id, schedule_name) |
239 | 325 | await self.temporal_adapter.delete_schedule(schedule_id) |
| 326 | + # Best-effort: drop the auth tuple after the Temporal delete. A failure |
| 327 | + # here is logged but never blocks the delete. |
| 328 | + await self._deregister_schedule_from_auth(schedule_id=schedule_id) |
240 | 329 |
|
241 | 330 | def _description_to_response( |
242 | 331 | self, schedule_id: str, description: ScheduleDescription |
|
0 commit comments