Skip to content

Commit f7910e3

Browse files
committed
feat(AGX1-274): task FGAC dual-write call sites + FGAC_TASKS_DUAL_WRITE flag
Wires `register_resource` / `deregister_resource` into `AgentTaskService.create_task` / `delete_task`, gated by a new `FGAC_TASKS_DUAL_WRITE` env-var (off by default; resolved at DI-resolve time so mid-process flips are intentionally invisible — rollout assumes a redeploy cycles pods).

- `create_task`: after the Postgres row persists, register the task in the authorization graph with `parent=agent` so the tenant + owner + parent_agent tuples are written atomically.
- `delete_task`: pre-resolves the task id by name before the Postgres delete (lookup-after-delete would race), then deregisters once the row is gone. The name-lookup `ItemDoesNotExist` is swallowed so the subsequent `delete()` surfaces its own native error — flipping the flag must not change the error contract for missing tasks.
- Both call sites share `_dual_write_with_retry(op_name, do_call, task_id)`, which retries transient `AuthenticationServiceUnavailableError` / `AuthenticationGatewayError` with exponential backoff + jitter (3 retries → 4 attempts max). Mirrors `agents_acp_use_case.grant_with_retry`, but with no `fail_task` fallback: the Postgres row is the durable record and orphan auth tuples are preferable to losing the task. The AGX1-291 operator runbook covers backfill using the creator-audit columns added in the parent commit.
- Emits `task_fgac_dual_write.{attempt,success,retry,failure}` statsd counters (tagged `op:register|deregister` and `exception_class` on failure) — the rollout signal for the FGAC_TASKS_DUAL_WRITE flip dashboard.

The `Port` interface gains `register_resource` / `deregister_resource`, and the agentex-auth proxy adapter calls `POST /v1/authz/register` and `POST /v1/authz/deregister`. The endpoints themselves already live on agentex-auth `main` via #354; per-account routing across them is set by scaleapi/agentex#353.
Part of the AGX1-264 stack: scaleapi/scaleapi NEW2 (per-account FF endpoint) → scaleapi/agentex#353 (agentex-auth routing + cancel) → this PR → #249 (per-RPC route migration).
1 parent c671a7b commit f7910e3

8 files changed

Lines changed: 582 additions & 13 deletions

File tree

agentex/src/adapters/authorization/adapter_agentex_authz_proxy.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,35 @@ async def list_resources(
8585
)
8686
return response["items"]
8787

88+
async def register_resource(
    self,
    principal: AgentexAuthPrincipalContext,
    resource: AgentexResource,
    parent: AgentexResource | None = None,
) -> None:
    """Send a register request for ``resource`` to the agentex-auth service.

    POSTs to ``/v1/authz/register`` on the configured ``agentex_auth_url``.
    The request body always carries the principal and the dumped resource;
    a ``parent`` entry is added only when a parent resource is supplied.

    Args:
        principal: Principal context forwarded as-is in the request body.
        resource: Resource to register; serialized with ``model_dump()``.
        parent: Optional parent resource, serialized the same way.
    """
    request_body: dict = dict(principal=principal, resource=resource.model_dump())
    if parent is not None:
        # Omit the key entirely (rather than sending null) when there is no parent.
        request_body.update(parent=parent.model_dump())
    await HttpRequestHandler.post_with_error_handling(
        self.agentex_auth_url, "/v1/authz/register", json=request_body
    )
103+
104+
async def deregister_resource(
    self,
    principal: AgentexAuthPrincipalContext,
    resource: AgentexResource,
) -> None:
    """Send a deregister request for ``resource`` to the agentex-auth service.

    POSTs to ``/v1/authz/deregister`` on the configured ``agentex_auth_url``
    with the principal and the dumped resource as the JSON body.

    Args:
        principal: Principal context forwarded as-is in the request body.
        resource: Resource to deregister; serialized with ``model_dump()``.
    """
    request_body = dict(principal=principal, resource=resource.model_dump())
    await HttpRequestHandler.post_with_error_handling(
        self.agentex_auth_url, "/v1/authz/deregister", json=request_body
    )
116+
88117

89118
DAgentexAuthorization = Annotated[
90119
AgentexAuthorizationProxy, Depends(AgentexAuthorizationProxy)

agentex/src/adapters/authorization/port.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,3 +49,31 @@ async def list_resources(
4949
filter_operation: AuthorizedOperationType = AuthorizedOperationType.read,
5050
) -> Iterable[str]:
5151
"""List resource_ids for a given principal"""
52+
53+
@abstractmethod
async def register_resource(
    self,
    principal: PrincipalT,
    resource: AgentexResource,
    parent: AgentexResource | None = None,
) -> None:
    """Add a just-created resource to the authorization graph.

    Implementations write, as one atomic operation, every relation tuple
    the schema requires: tenant and owner, plus — when ``parent`` is
    given — a typed parent relation such as ``task.parent_agent``.

    This is separate from ``grant``: ``grant`` writes only a single role
    relation, which is not enough for schemas whose access checks go
    through ``tenant->membership``.
    """
68+
69+
@abstractmethod
async def deregister_resource(
    self,
    principal: PrincipalT,
    resource: AgentexResource,
) -> None:
    """Remove a resource from the authorization graph when it is deleted.

    Implementations delete every relation tuple that was written for the
    resource, so the graph stays in sync with the application database
    once the underlying row is gone.
    """

agentex/src/config/environment_variables.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ class EnvVarKeys(str, Enum):
5858
AGENTEX_SERVER_TASK_QUEUE = "AGENTEX_SERVER_TASK_QUEUE"
5959
ENABLE_HEALTH_CHECK_WORKFLOW = "ENABLE_HEALTH_CHECK_WORKFLOW"
6060
WEBHOOK_REQUEST_TIMEOUT = "WEBHOOK_REQUEST_TIMEOUT"
61+
FGAC_TASKS_DUAL_WRITE = "FGAC_TASKS_DUAL_WRITE"
6162

6263

6364
class Environment(str, Enum):
@@ -114,6 +115,10 @@ class EnvironmentVariables(BaseModel):
114115
AGENTEX_SERVER_TASK_QUEUE: str | None = None
115116
ENABLE_HEALTH_CHECK_WORKFLOW: bool = False
116117
WEBHOOK_REQUEST_TIMEOUT: float = 15.0 # Webhook request timeout in seconds
118+
# AGX1-274: gate the task FGAC dual-write call sites. Off by default so
119+
# rollout is operator-controlled per environment. Mirrors KB's
120+
# ``FGAC_KNOWLEDGE_BASES_DUAL_WRITE`` shape.
121+
FGAC_TASKS_DUAL_WRITE: bool = False
117122

118123
@classmethod
119124
def refresh(cls, force_refresh: bool = False) -> EnvironmentVariables | None:
@@ -203,6 +208,10 @@ def refresh(cls, force_refresh: bool = False) -> EnvironmentVariables | None:
203208
WEBHOOK_REQUEST_TIMEOUT=float(
204209
os.environ.get(EnvVarKeys.WEBHOOK_REQUEST_TIMEOUT, "15.0")
205210
),
211+
FGAC_TASKS_DUAL_WRITE=(
212+
os.environ.get(EnvVarKeys.FGAC_TASKS_DUAL_WRITE, "false").lower()
213+
== "true"
214+
),
206215
)
207216
refreshed_environment_variables = environment_variables
208217
return refreshed_environment_variables

agentex/src/domain/services/authorization_service.py

Lines changed: 85 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from collections.abc import Iterable
2-
from typing import Annotated
2+
from typing import Annotated, Any
33

44
from fastapi import Depends, Request
55

@@ -17,6 +17,11 @@
1717

1818
logger = make_logger(__name__)
1919

20+
# Sentinel for "caller did not pass an explicit principal_context" — None is a
21+
# valid value (the bypass path logs ``for principal None``) so it can't be the
22+
# default. Using a named object reads better at the call site than ``...``.
23+
_UNSET: Any = object()
24+
2025

2126
class AuthorizationService:
2227
def __init__(
@@ -40,7 +45,11 @@ def is_enabled(self) -> bool:
4045
return self.enabled
4146

4247
async def grant(
43-
self, resource: AgentexResource, *, commit: bool = True, principal_context=...
48+
self,
49+
resource: AgentexResource,
50+
*,
51+
commit: bool = True,
52+
principal_context: Any = _UNSET,
4453
) -> None:
4554
if self._bypass():
4655
logger.info(
@@ -57,15 +66,19 @@ async def grant(
5766
)
5867
result = await self.gateway.grant(
5968
principal_context
60-
if principal_context is not ...
69+
if principal_context is not _UNSET
6170
else self.principal_context,
6271
resource,
6372
AuthorizedOperationType.create,
6473
)
6574
return result
6675

6776
async def revoke(
68-
self, resource: AgentexResource, *, commit: bool = True, principal_context=...
77+
self,
78+
resource: AgentexResource,
79+
*,
80+
commit: bool = True,
81+
principal_context: Any = _UNSET,
6982
) -> None:
7083
if self._bypass():
7184
logger.info("Authorization bypassed for revoke operation")
@@ -81,7 +94,7 @@ async def revoke(
8194

8295
result = await self.gateway.revoke(
8396
principal_context
84-
if principal_context is not ...
97+
if principal_context is not _UNSET
8598
else self.principal_context,
8699
resource,
87100
AuthorizedOperationType.delete,
@@ -96,7 +109,7 @@ async def check(
96109
resource: AgentexResource,
97110
operation: AuthorizedOperationType,
98111
*,
99-
principal_context=...,
112+
principal_context: Any = _UNSET,
100113
) -> bool:
101114
if self._bypass():
102115
logger.info("Authorization bypassed for check operation")
@@ -105,7 +118,7 @@ async def check(
105118
# Determine which principal context to use
106119
effective_principal = (
107120
principal_context
108-
if principal_context is not ...
121+
if principal_context is not _UNSET
109122
else self.principal_context
110123
)
111124

@@ -157,12 +170,75 @@ async def check(
157170
)
158171
return result
159172

173+
async def register_resource(
    self,
    resource: AgentexResource,
    *,
    parent: AgentexResource | None = None,
    principal_context: Any = _UNSET,
) -> None:
    """Register a freshly-created resource with the authorization graph.

    Used immediately after persisting a new row to write the tenant +
    owner (and optionally typed parent) relation tuples atomically.
    Distinct from ``grant`` because ``grant`` only writes a single
    role relation, which is insufficient for schemas (e.g. ``task``)
    that require a ``tenant->membership`` gate.

    Args:
        resource: The resource that was just created.
        parent: Optional parent resource forwarded to the gateway.
        principal_context: Principal to act as. When omitted, the
            service's own ``principal_context`` is used; an explicit
            ``None`` is passed through unchanged.

    Returns:
        None. When authorization is bypassed the gateway is not called.
    """
    if self._bypass():
        logger.info(
            f"Authorization bypassed for register_resource on {resource.type}:{resource.selector}"
        )
        return None

    # Resolve the principal once so the log line and the gateway call
    # always refer to the same principal, even when the caller overrides it.
    effective_principal = (
        principal_context
        if principal_context is not _UNSET
        else self.principal_context
    )

    logger.info(
        "[authorization_service] Registering resource %s:%s for principal %s (parent=%s)",
        resource.type,
        resource.selector,
        effective_principal,
        parent,
    )
    await self.gateway.register_resource(
        effective_principal,
        resource,
        parent,
    )
208+
209+
async def deregister_resource(
    self,
    resource: AgentexResource,
    *,
    principal_context: Any = _UNSET,
) -> None:
    """Remove every relation tuple written for the resource.

    Used when deleting the underlying database row.

    Args:
        resource: The resource being deleted.
        principal_context: Principal to act as. When omitted, the
            service's own ``principal_context`` is used; an explicit
            ``None`` is passed through unchanged.

    Returns:
        None. When authorization is bypassed the gateway is not called.
    """
    if self._bypass():
        logger.info(
            f"Authorization bypassed for deregister_resource on {resource.type}:{resource.selector}"
        )
        return None

    # Resolve the principal once so the log line and the gateway call
    # always refer to the same principal, even when the caller overrides it.
    effective_principal = (
        principal_context
        if principal_context is not _UNSET
        else self.principal_context
    )

    logger.info(
        "[authorization_service] Deregistering resource %s:%s for principal %s",
        resource.type,
        resource.selector,
        effective_principal,
    )
    await self.gateway.deregister_resource(
        effective_principal,
        resource,
    )
235+
160236
async def list_resources(
161237
self,
162238
filter_resource: AgentexResourceType,
163239
filter_operation: AuthorizedOperationType = AuthorizedOperationType.read,
164240
*,
165-
principal_context=...,
241+
principal_context: Any = _UNSET,
166242
) -> Iterable[str] | None:
167243
"""List resource identifiers for which the current principal has *filter_operation* permission."""
168244

@@ -178,7 +254,7 @@ async def list_resources(
178254
)
179255
result = await self.gateway.list_resources(
180256
principal_context
181-
if principal_context is not ...
257+
if principal_context is not _UNSET
182258
else self.principal_context,
183259
filter_resource,
184260
filter_operation,

0 commit comments

Comments
 (0)