|
| 1 | +"""Delegation depth guard. |
| 2 | +
|
| 3 | +Patches an agent's ``invoke`` method to track recursion depth and raise |
| 4 | +a ``GovernanceBlockException`` when the configured maximum is exceeded. |
| 5 | +This prevents runaway sub-agent chains. |
| 6 | +""" |
| 7 | + |
| 8 | +from __future__ import annotations |
| 9 | + |
| 10 | +import asyncio |
| 11 | +import functools |
| 12 | +import logging |
| 13 | +import os |
| 14 | +from contextvars import ContextVar, Token |
| 15 | +from typing import Any |
| 16 | + |
| 17 | +from uipath.core.governance.exceptions import ( |
| 18 | + GovernanceBlockException, |
| 19 | + GovernanceViolation, |
| 20 | +) |
| 21 | + |
| 22 | +logger = logging.getLogger(__name__) |
| 23 | + |
| 24 | +_DEFAULT_MAX_DELEGATION_DEPTH = 25 |
| 25 | +_ENV_MAX_DELEGATION_DEPTH = "UIPATH_GOVERNANCE_MAX_DELEGATION_DEPTH" |
| 26 | + |
| 27 | +# Single module-level ContextVar holding per-agent delegation depths |
| 28 | +# keyed by ``id(agent)``. Each install / uninstall pair shares this one |
| 29 | +# ContextVar instead of allocating a new one per agent — the interpreter |
| 30 | +# interns ContextVars and never GCs them, so per-agent allocation was an |
| 31 | +# unbounded leak in long-running hosts (every `install_delegation_guard` |
| 32 | +# call permanently grew the interpreter's ContextVar registry). |
| 33 | +# |
| 34 | +# Per-context isolation (asyncio task / thread) still works the standard |
| 35 | +# ContextVar way: each context sees its own copy of the depths dict, and |
| 36 | +# nested invokes use ``set`` / ``reset`` for LIFO depth tracking. The |
| 37 | +# dict itself is copied on every increment (copy-on-write) so concurrent |
| 38 | +# contexts don't share state through a mutable mapping. |
| 39 | +_DELEGATION_DEPTHS: ContextVar[dict[int, int]] = ContextVar( |
| 40 | + "_uipath_delegation_depths" |
| 41 | +) |
| 42 | + |
| 43 | + |
| 44 | +def _current_depth(agent_key: int) -> int: |
| 45 | + """Return the current depth for ``agent_key`` in this context.""" |
| 46 | + try: |
| 47 | + return _DELEGATION_DEPTHS.get().get(agent_key, 0) |
| 48 | + except LookupError: |
| 49 | + return 0 |
| 50 | + |
| 51 | + |
| 52 | +def _enter_depth_if_under( |
| 53 | + agent_key: int, max_depth: int |
| 54 | +) -> tuple[int, Token[dict[int, int]] | None]: |
| 55 | + """Attempt to increment depth for ``agent_key``. |
| 56 | +
|
| 57 | + Returns ``(new_depth, token)`` where ``token`` is ``None`` if the |
| 58 | + new depth would exceed ``max_depth`` — caller raises and does not |
| 59 | + need to clean up. On success, caller must reset via ``token``. |
| 60 | + """ |
| 61 | + try: |
| 62 | + depths = _DELEGATION_DEPTHS.get() |
| 63 | + except LookupError: |
| 64 | + depths = {} |
| 65 | + new_depth = depths.get(agent_key, 0) + 1 |
| 66 | + if new_depth > max_depth: |
| 67 | + return new_depth, None |
| 68 | + new_depths = dict(depths) |
| 69 | + new_depths[agent_key] = new_depth |
| 70 | + token = _DELEGATION_DEPTHS.set(new_depths) |
| 71 | + return new_depth, token |
| 72 | + |
| 73 | + |
| 74 | +def _exit_depth(token: Token[dict[int, int]]) -> None: |
| 75 | + """Undo a successful :func:`_enter_depth_if_under` call. |
| 76 | +
|
| 77 | + Tolerates cross-context resets (token created in a different |
| 78 | + context — happens when a child task awaits an agent invoke) by |
| 79 | + accepting the leak rather than crashing the agent on dispose. |
| 80 | + """ |
| 81 | + try: |
| 82 | + _DELEGATION_DEPTHS.reset(token) |
| 83 | + except (ValueError, LookupError): |
| 84 | + logger.debug("Delegation depth reset from foreign context") |
| 85 | + |
| 86 | + |
| 87 | +def _resolve_max_depth() -> int: |
| 88 | + """Read max-depth from env at call time, falling back to default on parse error.""" |
| 89 | + raw = os.getenv(_ENV_MAX_DELEGATION_DEPTH) |
| 90 | + if raw is None: |
| 91 | + return _DEFAULT_MAX_DELEGATION_DEPTH |
| 92 | + try: |
| 93 | + return int(raw) |
| 94 | + except ValueError: |
| 95 | + logger.warning( |
| 96 | + "Invalid %s=%r; using default %d", |
| 97 | + _ENV_MAX_DELEGATION_DEPTH, |
| 98 | + raw, |
| 99 | + _DEFAULT_MAX_DELEGATION_DEPTH, |
| 100 | + ) |
| 101 | + return _DEFAULT_MAX_DELEGATION_DEPTH |
| 102 | + |
| 103 | + |
| 104 | +def _build_violation(current: int, resolved_max: int) -> GovernanceBlockException: |
| 105 | + """Build the depth-exceeded exception (shared by sync and async guards).""" |
| 106 | + return GovernanceBlockException.from_violation( |
| 107 | + GovernanceViolation( |
| 108 | + rule_id="ASI-02", |
| 109 | + rule_name="Excessive Agency", |
| 110 | + detail=f"Delegation depth {current} exceeds max {resolved_max}", |
| 111 | + ) |
| 112 | + ) |
| 113 | + |
| 114 | + |
| 115 | +def _wrap_invoke(original: Any, agent_key: int, resolved_max: int) -> Any: |
| 116 | + """Return a depth-guarded wrapper matching the sync/async shape of ``original``. |
| 117 | +
|
| 118 | + Coroutine functions get an ``async def`` wrapper so the returned object |
| 119 | + is itself an awaitable — wrapping with a sync function would return an |
| 120 | + un-awaited coroutine and silently bypass the guard entirely. |
| 121 | +
|
| 122 | + Depth lives in the module-level :data:`_DELEGATION_DEPTHS` ContextVar |
| 123 | + keyed by ``agent_key`` (``id(agent)``), so every guarded agent shares |
| 124 | + the same ContextVar instance and the interpreter's ContextVar |
| 125 | + registry doesn't grow with each install. |
| 126 | + """ |
| 127 | + if asyncio.iscoroutinefunction(original): |
| 128 | + |
| 129 | + @functools.wraps(original) |
| 130 | + async def _guarded_async(input_data: Any, **kwargs: Any) -> Any: |
| 131 | + current, token = _enter_depth_if_under(agent_key, resolved_max) |
| 132 | + if token is None: |
| 133 | + raise _build_violation(current, resolved_max) |
| 134 | + try: |
| 135 | + return await original(input_data, **kwargs) |
| 136 | + finally: |
| 137 | + _exit_depth(token) |
| 138 | + |
| 139 | + return _guarded_async |
| 140 | + |
| 141 | + @functools.wraps(original) |
| 142 | + def _guarded_sync(input_data: Any, **kwargs: Any) -> Any: |
| 143 | + current, token = _enter_depth_if_under(agent_key, resolved_max) |
| 144 | + if token is None: |
| 145 | + raise _build_violation(current, resolved_max) |
| 146 | + try: |
| 147 | + return original(input_data, **kwargs) |
| 148 | + finally: |
| 149 | + _exit_depth(token) |
| 150 | + |
| 151 | + return _guarded_sync |
| 152 | + |
| 153 | + |
| 154 | +# Method names we guard on the agent. ``ainvoke`` is required because |
| 155 | +# LangChain / LangGraph / LlamaIndex agents expose it as the primary |
| 156 | +# async entrypoint; wrapping only ``invoke`` would let async callers |
| 157 | +# bypass the depth check entirely. A single ContextVar is shared across |
| 158 | +# both so an async call that internally falls through to sync ``invoke`` |
| 159 | +# still increments the same counter. |
| 160 | +_GUARDED_METHODS = ("invoke", "ainvoke") |
| 161 | + |
| 162 | + |
| 163 | +def install_delegation_guard(agent: Any, max_depth: int | None = None) -> None: |
| 164 | + """Patch the agent's invoke methods to enforce a maximum delegation depth. |
| 165 | +
|
| 166 | + Patches both ``invoke`` and ``ainvoke`` when present; each wrapper |
| 167 | + matches the sync/async shape of the original so awaitables stay |
| 168 | + awaitable. No-op when neither attribute exists or the agent has |
| 169 | + already been guarded. |
| 170 | +
|
| 171 | + Per-call-chain depth is tracked in a single :class:`contextvars.ContextVar` |
| 172 | + shared across both methods so an ``ainvoke`` that internally calls |
| 173 | + ``invoke`` still increments the same counter. Concurrent invokes on |
| 174 | + the same agent (across threads or asyncio tasks) keep separate |
| 175 | + counters because ContextVar values are per-context. |
| 176 | +
|
| 177 | + Originals are stashed on the agent under |
| 178 | + ``_uipath_original_<method>`` so :func:`uninstall_delegation_guard` |
| 179 | + can restore them on dispose. |
| 180 | + """ |
| 181 | + if max_depth is None: |
| 182 | + max_depth = _resolve_max_depth() |
| 183 | + if getattr(agent, "_delegation_wrapped", False): |
| 184 | + return |
| 185 | + |
| 186 | + originals = { |
| 187 | + name: getattr(agent, name, None) |
| 188 | + for name in _GUARDED_METHODS |
| 189 | + if callable(getattr(agent, name, None)) |
| 190 | + } |
| 191 | + if not originals: |
| 192 | + return |
| 193 | + |
| 194 | + agent_key = id(agent) |
| 195 | + resolved_max = max_depth |
| 196 | + |
| 197 | + for name, original in originals.items(): |
| 198 | + try: |
| 199 | + setattr(agent, name, _wrap_invoke(original, agent_key, resolved_max)) |
| 200 | + setattr(agent, f"_uipath_original_{name}", original) |
| 201 | + except (AttributeError, TypeError) as exc: |
| 202 | + # Some agent objects expose `invoke` via __getattr__ or via a |
| 203 | + # slot/descriptor that can't be re-assigned. Skip those — |
| 204 | + # better to guard partial coverage than to crash the runtime. |
| 205 | + logger.debug("Could not patch %s on agent: %s", name, exc) |
| 206 | + agent._delegation_wrapped = True |
| 207 | + logger.debug( |
| 208 | + "Delegation guard installed (max=%d, methods=%s)", |
| 209 | + resolved_max, |
| 210 | + list(originals), |
| 211 | + ) |
| 212 | + |
| 213 | + |
| 214 | +def uninstall_delegation_guard(agent: Any) -> None: |
| 215 | + """Restore the agent's invoke methods if a delegation guard was installed. |
| 216 | +
|
| 217 | + Safe to call on agents that were never guarded. Also clears the |
| 218 | + agent's entry from the current context's depth map — ``id(agent)`` |
| 219 | + is reused by Python after GC, so a stale entry could mis-attribute |
| 220 | + a future agent's count to this one. |
| 221 | + """ |
| 222 | + if not getattr(agent, "_delegation_wrapped", False): |
| 223 | + return |
| 224 | + for name in _GUARDED_METHODS: |
| 225 | + attr = f"_uipath_original_{name}" |
| 226 | + original = getattr(agent, attr, None) |
| 227 | + if original is not None: |
| 228 | + try: |
| 229 | + setattr(agent, name, original) |
| 230 | + except Exception as exc: # noqa: BLE001 - dispose path; never raise |
| 231 | + logger.debug("Could not restore original %s: %s", name, exc) |
| 232 | + try: |
| 233 | + delattr(agent, attr) |
| 234 | + except AttributeError: |
| 235 | + pass |
| 236 | + agent._delegation_wrapped = False |
| 237 | + # Drop the agent's depth entry in the current context. Best-effort |
| 238 | + # — if dispose runs from a different context than where the depth |
| 239 | + # was set, the foreign context still owns its own copy and will |
| 240 | + # discard it when it ends. |
| 241 | + agent_key = id(agent) |
| 242 | + try: |
| 243 | + depths = _DELEGATION_DEPTHS.get() |
| 244 | + except LookupError: |
| 245 | + return |
| 246 | + if agent_key in depths: |
| 247 | + new_depths = {k: v for k, v in depths.items() if k != agent_key} |
| 248 | + _DELEGATION_DEPTHS.set(new_depths) |
0 commit comments