Commit 76353de

feat(authz): register agentex resources and grant agent/task ownership (#270)
## Related work

Parent ticket: AGX1-242 - AgentEx authorization dual-write. This change is part of a 4-PR stack across 3 repos.

| Repo | PR | Purpose |
|---|---|---|
| scaleapi/scaleapi | [scaleapi/scaleapi#146335](scaleapi/scaleapi#146335) | merged flag registry update for the shared AgentEx resources rollout flags |
| scaleapi/agentex | [scaleapi/agentex#364](scaleapi/agentex#364) | agentex-auth routes each auth verb to one backend target |
| **scaleapi/scale-agentex** | **this PR** | **registers agentex resources in the authorization graph and grants agent/task ownership** |
| scaleapi/scale-agentex | [#271](#271) | denied direct agent access collapses to 404 |

**Merge/deploy order:** [scaleapi/scaleapi#146335](scaleapi/scaleapi#146335) is merged; deploy [scaleapi/agentex#364](scaleapi/agentex#364) before this PR, then merge/deploy [#271](#271). Rollout per account is: enable `fgac-agentex-resources-dual-write`, backfill existing resources into Spark, then enable `fgac-agentex-resources` reads.

## What

Registers agentex resources in the authorization graph on create and removes them on delete, separating resource lifecycle from explicit ownership:

- **Agents and tasks** register in the authorization graph on create and record an explicit owner; on delete they deregister and the owner record is revoked.
- **Schedules and agent API keys** register under their parent agent on create and deregister on delete. Permissions cascade from the parent agent, so no separate owner record is written.

Registration runs before the resource is persisted, so an authorization failure fails closed with no orphaned row. If the persist (or the Temporal create, for schedules) later fails, a compensating deregister runs.

## Why

Giving each verb a single backend keeps the existing ownership writes (`grant`/`revoke`) unchanged and confines the new behavior to the new lifecycle interface (`register`/`deregister`).

Explicit ownership is recorded only for agents and tasks; schedules and agent API keys inherit access from their parent agent, so they need no owner record of their own. Issuing the two write kinds independently keeps both backends current, so reads can cut over per account with a clean rollback.

## Testing

- Unit and integration tests covering the agent, task, schedule, and api_key authorization writes (register/deregister ordering, fail-closed-before-persist, and compensating cleanup).
- `ruff check` / `ruff format`.
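The register-before-persist ordering and the compensating deregister described under "What" can be sketched as follows. This is a minimal illustration, not the code in this PR: `InMemoryAuthz`, `InMemoryStore`, and `create_resource` are hypothetical stand-ins for the authorization service, the persistence layer (Postgres or Temporal), and the create path.

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


class InMemoryAuthz:
    """Hypothetical stand-in for the authorization graph client."""

    def __init__(self, fail_register: bool = False) -> None:
        self.fail_register = fail_register
        self.registered: set[str] = set()

    async def register(self, resource_id: str) -> None:
        if self.fail_register:
            raise RuntimeError("authorization backend unavailable")
        self.registered.add(resource_id)

    async def deregister(self, resource_id: str) -> None:
        self.registered.discard(resource_id)


class InMemoryStore:
    """Hypothetical stand-in for the persistence layer."""

    def __init__(self, fail_persist: bool = False) -> None:
        self.fail_persist = fail_persist
        self.rows: set[str] = set()

    async def persist(self, resource_id: str) -> None:
        if self.fail_persist:
            raise RuntimeError("persist failed")
        self.rows.add(resource_id)


async def create_resource(
    authz: InMemoryAuthz, store: InMemoryStore, resource_id: str
) -> None:
    # Register first: if this raises, nothing was persisted (fail closed).
    await authz.register(resource_id)
    try:
        await store.persist(resource_id)
    except Exception:
        # Compensate so no authorization entry outlives the failed persist.
        # Best effort: a cleanup failure is logged, the original error wins.
        try:
            await authz.deregister(resource_id)
        except Exception:
            logger.warning("compensating deregister failed for %s", resource_id)
        raise
```

In this sketch a registration failure leaves the store untouched, and a persist failure leaves the authorization graph untouched, which is the invariant the tests in this PR are described as covering.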
1 parent 5277fcf commit 76353de

18 files changed

Lines changed: 769 additions & 308 deletions

agentex/openapi.yaml

Lines changed: 0 additions & 12 deletions
@@ -5221,12 +5221,6 @@ components:
52215221
acp_type:
52225222
$ref: '#/components/schemas/ACPType'
52235223
description: The type of ACP to use for the agent.
5224-
principal_context:
5225-
anyOf:
5226-
- {}
5227-
- type: 'null'
5228-
title: Principal Context
5229-
description: Principal used for authorization
52305224
registration_metadata:
52315225
anyOf:
52325226
- additionalProperties: true
@@ -5336,12 +5330,6 @@ components:
53365330
type: string
53375331
title: Description
53385332
description: The description of the agent.
5339-
principal_context:
5340-
anyOf:
5341-
- {}
5342-
- type: 'null'
5343-
title: Principal Context
5344-
description: Principal used for authorization
53455333
registration_metadata:
53465334
anyOf:
53475335
- additionalProperties: true

agentex/src/api/routes/agents.py

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -76,16 +76,9 @@ async def get_agent_by_name(
7676
AgentexResourceType.agent, AuthorizedOperationType.read
7777
),
7878
agents_use_case: DAgentsUseCase,
79-
authorization: DAuthorizationService,
8079
):
8180
"""Get an agent by its unique name."""
8281
agent_entity = await agents_use_case.get(name=agent_name)
83-
84-
await authorization.check(
85-
resource=AgentexResource.agent(agent_entity.id),
86-
operation=AuthorizedOperationType.read,
87-
)
88-
8982
return Agent.model_validate(agent_entity)
9083

9184

@@ -183,7 +176,6 @@ async def register_agent(
183176
await authorization_service.check(
184177
AgentexResource.agent("*"),
185178
AuthorizedOperationType.create,
186-
principal_context=request.principal_context,
187179
)
188180
logger.info(f"Registering agent: {request}")
189181
try:
@@ -198,7 +190,6 @@ async def register_agent(
198190
)
199191
await authorization_service.grant(
200192
AgentexResource.agent(agent_entity.id),
201-
principal_context=request.principal_context,
202193
)
203194
response_fields = agent_entity.model_dump()
204195
existing_key = await api_keys_use_case.get_internal_api_key_by_agent_id(
@@ -236,11 +227,10 @@ async def register_build(
236227
agents_use_case: DAgentsUseCase,
237228
authorization_service: DAuthorizationService,
238229
) -> Agent:
239-
"""Create a build-only agent row and register its authz resource."""
230+
"""Create a build-only agent row after create authorization."""
240231
await authorization_service.check(
241232
AgentexResource.agent("*"),
242233
AuthorizedOperationType.create,
243-
principal_context=request.principal_context,
244234
)
245235
logger.info(f"Registering build for agent: {request.name}")
246236
try:
@@ -252,9 +242,8 @@ async def register_build(
252242
)
253243
except ValueError as e:
254244
raise HTTPException(status_code=400, detail=str(e)) from e
255-
await authorization_service.register_resource(
245+
await authorization_service.grant(
256246
AgentexResource.agent(agent_entity.id),
257-
principal_context=request.principal_context,
258247
)
259248
return Agent.model_validate(agent_entity)
260249

agentex/src/api/schemas/agents.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -88,9 +88,6 @@ class RegisterAgentRequest(BaseModel):
8888
description="Optional agent ID if the agent already exists and needs to be updated.",
8989
)
9090
acp_type: ACPType = Field(..., description="The type of ACP to use for the agent.")
91-
principal_context: Any | None = Field(
92-
default=None, description="Principal used for authorization"
93-
)
9491
registration_metadata: dict[str, Any] | None = Field(
9592
default=None,
9693
description="The metadata for the agent's registration.",
@@ -120,9 +117,6 @@ class RegisterBuildRequest(BaseModel):
120117
..., pattern=r"^[a-z0-9-]+$", description="The unique name of the agent."
121118
)
122119
description: str = Field(..., description="The description of the agent.")
123-
principal_context: Any | None = Field(
124-
default=None, description="Principal used for authorization"
125-
)
126120
registration_metadata: dict[str, Any] | None = Field(
127121
default=None,
128122
description="The metadata for the agent's build registration.",

agentex/src/domain/services/schedule_service.py

Lines changed: 26 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -84,13 +84,12 @@ async def create_schedule(
8484
else None
8585
)
8686

87-
# Schedules have no Postgres row Temporal is the store and the auth
87+
# Schedules have no Postgres row: Temporal is the store and the auth
8888
# selector is the schedule id ({agent_id}--{schedule_name}). Register
89-
# the auth tuple (with the parent_agent edge) BEFORE the Temporal write
90-
# (fail-closed), and compensate if the Temporal create fails so we never
91-
# leave an orphan tuple. The read-back below is intentionally OUTSIDE
92-
# the compensation scope: a describe failure must not deregister a
93-
# schedule that was actually created.
89+
# before the Temporal write so an auth failure fails closed. If the
90+
# Temporal create fails after registration, compensate with a deregister.
91+
# The read-back below is intentionally outside the compensation scope
92+
# because the schedule was already created.
9493
registered = await self._register_schedule_in_auth(
9594
schedule_id=schedule_id, agent_id=agent.id
9695
)
@@ -109,7 +108,7 @@ async def create_schedule(
109108
paused=request.paused,
110109
)
111110
except Exception:
112-
# Orphan-tuple guard: the tuple was written but the schedule never
111+
# Orphan guard: the auth entry was written but the schedule never
113112
# landed in Temporal. Best-effort compensating deregister, then
114113
# re-raise the original error.
115114
if registered:
@@ -122,17 +121,15 @@ async def create_schedule(
122121
async def _register_schedule_in_auth(
123122
self, *, schedule_id: str, agent_id: str
124123
) -> bool:
125-
"""Register a new agent_schedule with the auth service, including the
126-
parent_agent edge so permissions cascade from the owning agent.
124+
"""Register the schedule in the authorization graph before creating it.
127125
128-
Called BEFORE the Temporal write — a failure raises and prevents the
129-
schedule from being created. Skipped with a warning when no usable
130-
creator identity is available on the principal context (e.g.
131-
agent-bypass / internal paths without an authenticated user); this is
132-
the interim behavior until on-behalf-of-user identity is threaded.
126+
The schedule is registered under its parent agent so permissions
127+
cascade from the owning agent. Registering before the Temporal create
128+
fails closed: an auth failure aborts the create, and the caller
129+
compensates with a deregister if the Temporal create later fails.
133130
134-
Returns True when a tuple was actually registered (so the caller knows
135-
whether a compensating deregister is warranted), False when skipped.
131+
Returns True when the schedule was registered, or False when no creator
132+
identity is resolvable and registration is skipped.
136133
"""
137134
principal_context = self.authorization_service.principal_context
138135
user_id = getattr(principal_context, "user_id", None)
@@ -149,9 +146,8 @@ async def _register_schedule_in_auth(
149146
parent=AgentexResource.agent(agent_id),
150147
)
151148
except Exception as exc:
152-
# Fail closed: log + re-raise so the schedule is never created.
153149
logger.exception(
154-
"Auth register_resource failed for agent_schedule; aborting create",
150+
"Auth registration failed for agent_schedule; aborting create",
155151
extra={
156152
"schedule_id": schedule_id,
157153
"agent_id": agent_id,
@@ -162,20 +158,19 @@ async def _register_schedule_in_auth(
162158
return True
163159

164160
async def _deregister_schedule_from_auth(self, *, schedule_id: str) -> None:
165-
"""Best-effort deregistration of a schedule's auth tuples.
161+
"""Best-effort removal of the schedule from the authorization graph.
166162
167-
``deregister_resource`` removes the resource and all of its
168-
relationships (owner, parent, grantees) atomically. Used both on delete
169-
and as the compensating action when a Temporal create fails. Failures
170-
are logged but never propagate.
163+
Temporal is the source of truth for schedule existence. Once Temporal
164+
delete succeeds, a deregister failure is logged but does not block the
165+
delete response.
171166
"""
172167
try:
173168
await self.authorization_service.deregister_resource(
174169
resource=AgentexResource.schedule(schedule_id),
175170
)
176171
except Exception as exc:
177172
logger.warning(
178-
"Auth deregister failed for agent_schedule; tuple may be orphaned",
173+
"Auth deregister failed for agent_schedule; entry may be orphaned",
179174
extra={
180175
"schedule_id": schedule_id,
181176
"error_type": type(exc).__name__,
@@ -392,7 +387,10 @@ def _description_to_response(
392387
# Decode bytes to string if possible
393388
try:
394389
import json
395-
workflow_params.append(json.loads(arg.data.decode("utf-8")))
390+
391+
workflow_params.append(
392+
json.loads(arg.data.decode("utf-8"))
393+
)
396394
except (json.JSONDecodeError, UnicodeDecodeError):
397395
workflow_params.append(str(arg.data))
398396
else:
@@ -430,7 +428,9 @@ def _description_to_response(
430428
if hasattr(info, "recent_actions") and info.recent_actions:
431429
# ScheduleActionResult has started_at (when action started) and scheduled_at (when it was scheduled)
432430
last_action = info.recent_actions[-1]
433-
last_action_time = getattr(last_action, "started_at", None) or getattr(last_action, "scheduled_at", None)
431+
last_action_time = getattr(last_action, "started_at", None) or getattr(
432+
last_action, "scheduled_at", None
433+
)
434434
created_at: datetime | None = (
435435
cast(datetime, info.create_time)
436436
if hasattr(info, "create_time") and info.create_time

agentex/src/domain/services/task_service.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -64,11 +64,10 @@ async def create_task(
6464
Returns:
6565
Task containing the created task info
6666
"""
67-
# Register in the authorization service before persisting: a registration
68-
# failure aborts the request with no orphaned row. If the persist fails
69-
# after a successful registration, the compensating deregister_resource
70-
# below prevents a dangling authorization entry. Both calls are no-ops
71-
# when the authorization service is disabled for this account.
67+
# Register the task in the authorization graph before persisting: a
68+
# registration failure aborts the request with no orphaned row. If the
69+
# persist fails after a successful registration, the compensating
70+
# deregister_resource below prevents a dangling authorization entry.
7271
task_entity = TaskEntity(
7372
id=orm_id(),
7473
name=task_name,

agentex/src/domain/use_cases/agent_api_keys_use_case.py

Lines changed: 14 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -112,14 +112,12 @@ async def _register_api_key_in_auth(
112112
api_key_id: str,
113113
agent_id: str,
114114
) -> None:
115-
"""Register a new agent_api_key with the auth service, including the
116-
parent_agent edge so permissions cascade from the owning agent.
117-
118-
Called BEFORE the Postgres write — a failure raises and prevents the
119-
row from being persisted, so there is no compensating action. Skipped
120-
with a warning when no usable creator identity is available on the
121-
principal context (e.g. internal-key creation paths without an
122-
authenticated user).
115+
"""Register a new agent_api_key in the authorization graph before persist.
116+
117+
The api key is registered under its parent agent so permissions cascade
118+
from the owning agent. Registering before the Postgres row is created
119+
fails closed: a failure aborts the create rather than leaving a
120+
persisted api key that cannot be authorized.
123121
"""
124122
principal_context = self.authorization_service.principal_context
125123
user_id = getattr(principal_context, "user_id", None)
@@ -139,9 +137,8 @@ async def _register_api_key_in_auth(
139137
parent=AgentexResource.agent(agent_id),
140138
)
141139
except Exception as exc:
142-
# Fail closed: log + re-raise so the Postgres row is never written.
143140
logger.exception(
144-
"Auth register_resource failed for agent_api_key; aborting create",
141+
"Auth registration failed for agent_api_key; aborting create",
145142
extra={
146143
"api_key_id": api_key_id,
147144
"agent_id": agent_id,
@@ -151,21 +148,19 @@ async def _register_api_key_in_auth(
151148
raise
152149

153150
async def _deregister_api_key_from_auth(self, *, api_key_id: str) -> None:
154-
"""Best-effort deregistration of an api_key's auth tuples on delete.
151+
"""Best-effort removal of the api_key from the authorization graph.
155152
156-
``deregister_resource`` removes the resource and all of its
157-
relationships (owner, parent, grantees) atomically. Always invoked;
158-
the authorization service decides how to route the call. Failures are
159-
logged but do not block the delete.
153+
Deletes treat Postgres as the source of truth for existence. Once the
154+
row is gone, a deregister failure is logged but does not block the
155+
delete response.
160156
"""
161157
try:
162158
await self.authorization_service.deregister_resource(
163159
resource=AgentexResource.api_key(api_key_id),
164160
)
165161
except Exception as exc:
166-
# Best-effort: log and continue. Postgres row already deleted.
167162
logger.warning(
168-
"Auth deregister failed for agent_api_key; tuple may be orphaned",
163+
"Auth deregister failed for agent_api_key; entry may be orphaned",
169164
extra={
170165
"api_key_id": api_key_id,
171166
"error_type": type(exc).__name__,
@@ -203,8 +198,8 @@ async def get_external_by_agent_id_and_key(
203198
)
204199

205200
async def delete(self, id: str) -> None:
206-
# Pre-fetch so we skip both the delete and the deregister when the row
207-
# never existed no DB round-trip, no auth round-trip for a no-op.
201+
# Pre-fetch so we skip both the delete and auth cleanup when the row
202+
# never existed: no DB round-trip, no auth round-trip for a no-op.
208203
try:
209204
await self.agent_api_key_repo.get(id=id)
210205
except ItemDoesNotExist:

agentex/src/domain/use_cases/agents_acp_use_case.py

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,7 @@
1010
AuthenticationServiceUnavailableError,
1111
)
1212
from src.adapters.crud_store.exceptions import ItemDoesNotExist
13-
from src.api.schemas.authorization_types import (
14-
AgentexResource,
15-
)
13+
from src.api.schemas.authorization_types import AgentexResource
1614
from src.domain.entities.agents import ACPType, AgentEntity, AgentStatus
1715
from src.domain.entities.agents_rpc import (
1816
ACP_TYPE_TO_ALLOWED_RPC_METHODS,
@@ -237,7 +235,7 @@ async def _execute_with_error_handling(
237235
raise e
238236

239237
async def grant_with_retry(self, task: TaskEntity, attempts: int = 0) -> None:
240-
"""Grant authorization for a task with retry"""
238+
"""Grant ownership for a newly created task."""
241239
try:
242240
await self.authorization_service.grant(
243241
resource=AgentexResource.task(task.id),
@@ -250,11 +248,10 @@ async def grant_with_retry(self, task: TaskEntity, attempts: int = 0) -> None:
250248
)
251249
await asyncio.sleep(delay)
252250
return await self.grant_with_retry(task, attempts + 1)
253-
else:
254-
logger.error(
255-
f"Authentication service unavailable: {e}. Max retries reached."
256-
)
257-
raise e from e
251+
logger.error(
252+
f"Authentication service unavailable: {e}. Max retries reached."
253+
)
254+
raise e from e
258255
except Exception as e:
259256
logger.error(f"Error granting authorization for task {task.id}: {e}")
260257
await self.task_service.fail_task(task, str(e))
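The retry shape of `grant_with_retry` above (retry only while the authorization service is unavailable, then re-raise once attempts run out) can be sketched iteratively. This is an illustration under stated assumptions, not the code in this commit: `ServiceUnavailable` stands in for `AuthenticationServiceUnavailableError`, and `max_attempts`, `base_delay`, and the exponential backoff are hypothetical, since the diff does not show how the delay is computed.

```python
import asyncio
from typing import Awaitable, Callable


class ServiceUnavailable(Exception):
    """Hypothetical stand-in for AuthenticationServiceUnavailableError."""


async def grant_with_retry(
    grant: Callable[[], Awaitable[None]],
    max_attempts: int = 3,
    base_delay: float = 0.01,
) -> int:
    """Call grant() until it succeeds; return the number of attempts used."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(max_attempts):
        try:
            await grant()
            return attempt + 1
        except ServiceUnavailable:
            # Out of attempts: surface the original error to the caller.
            if attempt == max_attempts - 1:
                raise
            # Assumed backoff policy: double the delay after each failure.
            await asyncio.sleep(base_delay * 2**attempt)
    raise AssertionError("unreachable")
```

A loop keeps the stack depth constant regardless of the retry budget. Any other exception type propagates immediately, which leaves the caller free to fail the task as the surrounding method does.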
