diff --git a/src/uipath/runtime/governance/delegation_guard.py b/src/uipath/runtime/governance/delegation_guard.py new file mode 100644 index 0000000..f872f62 --- /dev/null +++ b/src/uipath/runtime/governance/delegation_guard.py @@ -0,0 +1,248 @@ +"""Delegation depth guard. + +Patches an agent's ``invoke`` method to track recursion depth and raise +a ``GovernanceBlockException`` when the configured maximum is exceeded. +This prevents runaway sub-agent chains. +""" + +from __future__ import annotations + +import asyncio +import functools +import logging +import os +from contextvars import ContextVar, Token +from typing import Any + +from uipath.core.governance.exceptions import ( + GovernanceBlockException, + GovernanceViolation, +) + +logger = logging.getLogger(__name__) + +_DEFAULT_MAX_DELEGATION_DEPTH = 25 +_ENV_MAX_DELEGATION_DEPTH = "UIPATH_GOVERNANCE_MAX_DELEGATION_DEPTH" + +# Single module-level ContextVar holding per-agent delegation depths +# keyed by ``id(agent)``. Each install / uninstall pair shares this one +# ContextVar instead of allocating a new one per agent — the interpreter +# interns ContextVars and never GCs them, so per-agent allocation was an +# unbounded leak in long-running hosts (every `install_delegation_guard` +# call permanently grew the interpreter's ContextVar registry). +# +# Per-context isolation (asyncio task / thread) still works the standard +# ContextVar way: each context sees its own copy of the depths dict, and +# nested invokes use ``set`` / ``reset`` for LIFO depth tracking. The +# dict itself is copied on every increment (copy-on-write) so concurrent +# contexts don't share state through a mutable mapping. +_DELEGATION_DEPTHS: ContextVar[dict[int, int]] = ContextVar( + "_uipath_delegation_depths" +) + + +def _current_depth(agent_key: int) -> int: + """Return the current depth for ``agent_key`` in this context.""" + try: + return _DELEGATION_DEPTHS.get().get(agent_key, 0) + except LookupError: + return 0 + + +def _enter_depth_if_under( + agent_key: int, max_depth: int +) -> tuple[int, Token[dict[int, int]] | None]: + """Attempt to increment depth for ``agent_key``. + + Returns ``(new_depth, token)`` where ``token`` is ``None`` if the + new depth would exceed ``max_depth`` — caller raises and does not + need to clean up. On success, caller must reset via ``token``. + """ + try: + depths = _DELEGATION_DEPTHS.get() + except LookupError: + depths = {} + new_depth = depths.get(agent_key, 0) + 1 + if new_depth > max_depth: + return new_depth, None + new_depths = dict(depths) + new_depths[agent_key] = new_depth + token = _DELEGATION_DEPTHS.set(new_depths) + return new_depth, token + + +def _exit_depth(token: Token[dict[int, int]]) -> None: + """Undo a successful :func:`_enter_depth_if_under` call. + + Tolerates cross-context resets (token created in a different + context — happens when a child task awaits an agent invoke) by + accepting the leak rather than crashing the agent on dispose. + """ + try: + _DELEGATION_DEPTHS.reset(token) + except (ValueError, LookupError): + logger.debug("Delegation depth reset from foreign context") + + +def _resolve_max_depth() -> int: + """Read max-depth from env at call time, falling back to default on parse error.""" + raw = os.getenv(_ENV_MAX_DELEGATION_DEPTH) + if raw is None: + return _DEFAULT_MAX_DELEGATION_DEPTH + try: + return int(raw) + except ValueError: + logger.warning( + "Invalid %s=%r; using default %d", + _ENV_MAX_DELEGATION_DEPTH, + raw, + _DEFAULT_MAX_DELEGATION_DEPTH, + ) + return _DEFAULT_MAX_DELEGATION_DEPTH + + +def _build_violation(current: int, resolved_max: int) -> GovernanceBlockException: + """Build the depth-exceeded exception (shared by sync and async guards).""" + return GovernanceBlockException.from_violation( + GovernanceViolation( + rule_id="ASI-02", + rule_name="Excessive Agency", + detail=f"Delegation depth {current} exceeds max {resolved_max}", + ) + ) + + +def _wrap_invoke(original: Any, agent_key: int, resolved_max: int) -> Any: + """Return a depth-guarded wrapper matching the sync/async shape of ``original``. + + Coroutine functions get an ``async def`` wrapper so the returned object + is itself an awaitable — wrapping with a sync function would return an + un-awaited coroutine and silently bypass the guard entirely. + + Depth lives in the module-level :data:`_DELEGATION_DEPTHS` ContextVar + keyed by ``agent_key`` (``id(agent)``), so every guarded agent shares + the same ContextVar instance and the interpreter's ContextVar + registry doesn't grow with each install. + """ + if asyncio.iscoroutinefunction(original): + + @functools.wraps(original) + async def _guarded_async(input_data: Any, **kwargs: Any) -> Any: + current, token = _enter_depth_if_under(agent_key, resolved_max) + if token is None: + raise _build_violation(current, resolved_max) + try: + return await original(input_data, **kwargs) + finally: + _exit_depth(token) + + return _guarded_async + + @functools.wraps(original) + def _guarded_sync(input_data: Any, **kwargs: Any) -> Any: + current, token = _enter_depth_if_under(agent_key, resolved_max) + if token is None: + raise _build_violation(current, resolved_max) + try: + return original(input_data, **kwargs) + finally: + _exit_depth(token) + + return _guarded_sync + + +# Method names we guard on the agent. ``ainvoke`` is required because +# LangChain / LangGraph / LlamaIndex agents expose it as the primary +# async entrypoint; wrapping only ``invoke`` would let async callers +# bypass the depth check entirely. A single ContextVar is shared across +# both so an async call that internally falls through to sync ``invoke`` +# still increments the same counter. +_GUARDED_METHODS = ("invoke", "ainvoke") + + +def install_delegation_guard(agent: Any, max_depth: int | None = None) -> None: + """Patch the agent's invoke methods to enforce a maximum delegation depth. + + Patches both ``invoke`` and ``ainvoke`` when present; each wrapper + matches the sync/async shape of the original so awaitables stay + awaitable. No-op when neither attribute exists or the agent has + already been guarded. + + Per-call-chain depth is tracked in a single :class:`contextvars.ContextVar` + shared across both methods so an ``ainvoke`` that internally calls + ``invoke`` still increments the same counter. Concurrent invokes on + the same agent (across threads or asyncio tasks) keep separate + counters because ContextVar values are per-context. + + Originals are stashed on the agent under + ``_uipath_original_`` so :func:`uninstall_delegation_guard` + can restore them on dispose. + """ + if max_depth is None: + max_depth = _resolve_max_depth() + if getattr(agent, "_delegation_wrapped", False): + return + + originals = { + name: getattr(agent, name, None) + for name in _GUARDED_METHODS + if callable(getattr(agent, name, None)) + } + if not originals: + return + + agent_key = id(agent) + resolved_max = max_depth + + for name, original in originals.items(): + try: + setattr(agent, name, _wrap_invoke(original, agent_key, resolved_max)) + setattr(agent, f"_uipath_original_{name}", original) + except (AttributeError, TypeError) as exc: + # Some agent objects expose `invoke` via __getattr__ or via a + # slot/descriptor that can't be re-assigned. Skip those — + # better to guard partial coverage than to crash the runtime. + logger.debug("Could not patch %s on agent: %s", name, exc) + agent._delegation_wrapped = True + logger.debug( + "Delegation guard installed (max=%d, methods=%s)", + resolved_max, + list(originals), + ) + + +def uninstall_delegation_guard(agent: Any) -> None: + """Restore the agent's invoke methods if a delegation guard was installed. + + Safe to call on agents that were never guarded. Also clears the + agent's entry from the current context's depth map — ``id(agent)`` + is reused by Python after GC, so a stale entry could mis-attribute + a future agent's count to this one. + """ + if not getattr(agent, "_delegation_wrapped", False): + return + for name in _GUARDED_METHODS: + attr = f"_uipath_original_{name}" + original = getattr(agent, attr, None) + if original is not None: + try: + setattr(agent, name, original) + except Exception as exc: # noqa: BLE001 - dispose path; never raise + logger.debug("Could not restore original %s: %s", name, exc) + try: + delattr(agent, attr) + except AttributeError: + pass + agent._delegation_wrapped = False + # Drop the agent's depth entry in the current context. Best-effort + # — if dispose runs from a different context than where the depth + # was set, the foreign context still owns its own copy and will + # discard it when it ends. + agent_key = id(agent) + try: + depths = _DELEGATION_DEPTHS.get() + except LookupError: + return + if agent_key in depths: + new_depths = {k: v for k, v in depths.items() if k != agent_key} + _DELEGATION_DEPTHS.set(new_depths) diff --git a/tests/test_delegation_guard.py b/tests/test_delegation_guard.py new file mode 100644 index 0000000..a1ba432 --- /dev/null +++ b/tests/test_delegation_guard.py @@ -0,0 +1,320 @@ +"""Tests for the async-aware delegation depth guard. + +The guard wraps an agent's ``invoke`` and ``ainvoke`` so a single +ContextVar tracks delegation depth across both sync and async call +chains. The async wrapper must itself be a coroutine — wrapping with a +sync function would return an un-awaited coroutine and silently bypass +the depth check. +""" + +from __future__ import annotations + +import asyncio +import os +from types import SimpleNamespace + +import pytest +from uipath.core.governance.exceptions import GovernanceBlockException + +from uipath.runtime.governance.delegation_guard import ( + install_delegation_guard, + uninstall_delegation_guard, +) + +# --------------------------------------------------------------------------- +# Helpers — minimal agent shapes the guard might encounter in the wild. +# --------------------------------------------------------------------------- + + +def _make_sync_agent() -> SimpleNamespace: + agent = SimpleNamespace() + agent.invoke = lambda payload, **_: {"sync": payload} + return agent + + +def _make_async_agent() -> SimpleNamespace: + agent = SimpleNamespace() + + async def _ainvoke(payload, **_): + return {"async": payload} + + agent.ainvoke = _ainvoke + return agent + + +def _make_dual_agent() -> SimpleNamespace: + """Agent with both sync invoke and async ainvoke (LangGraph React shape).""" + agent = _make_sync_agent() + + async def _ainvoke(payload, **_): + return {"async": payload} + + agent.ainvoke = _ainvoke + return agent + + +# --------------------------------------------------------------------------- +# Sync path — preserves the original behaviour the guard always had. +# --------------------------------------------------------------------------- + + +def test_sync_invoke_passes_through_under_limit() -> None: + agent = _make_sync_agent() + install_delegation_guard(agent, max_depth=3) + assert agent.invoke({"x": 1}) == {"sync": {"x": 1}} + + +def test_sync_invoke_raises_when_depth_exceeded() -> None: + """Recursive sync invokes blow the limit.""" + agent = SimpleNamespace() + calls = {"n": 0} + + def _invoke(_payload, **_): + calls["n"] += 1 + # Recurse into ourselves through the guarded attribute. + return agent.invoke({}) + + agent.invoke = _invoke + install_delegation_guard(agent, max_depth=3) + + with pytest.raises(GovernanceBlockException): + agent.invoke({}) + # Depth check fires inside the wrapper before the original runs, so + # we got exactly max_depth=3 successful entries plus one rejection. + assert calls["n"] == 3 + + +# --------------------------------------------------------------------------- +# Async path — the new shape this change unlocks. +# --------------------------------------------------------------------------- + + +def test_async_wrapper_is_a_coroutine_function() -> None: + """The wrapped ainvoke must itself be awaitable. + + Regression test for the original bug: a sync wrapper around an async + method returned an un-awaited coroutine and silently bypassed the + depth check entirely. + """ + agent = _make_async_agent() + install_delegation_guard(agent, max_depth=3) + assert asyncio.iscoroutinefunction(agent.ainvoke) + + +def test_async_invoke_passes_through_under_limit() -> None: + agent = _make_async_agent() + install_delegation_guard(agent, max_depth=3) + result = asyncio.run(agent.ainvoke({"x": 1})) + assert result == {"async": {"x": 1}} + + +def test_async_invoke_raises_when_depth_exceeded() -> None: + agent = SimpleNamespace() + calls = {"n": 0} + + async def _ainvoke(_payload, **_): + calls["n"] += 1 + return await agent.ainvoke({}) + + agent.ainvoke = _ainvoke + install_delegation_guard(agent, max_depth=3) + + with pytest.raises(GovernanceBlockException): + asyncio.run(agent.ainvoke({})) + assert calls["n"] == 3 + + +def test_sync_and_async_share_one_depth_counter() -> None: + """A coroutine that falls through to sync ``invoke`` increments the same counter.""" + agent = _make_dual_agent() + calls = {"n": 0} + + def _invoke(_payload, **_): + calls["n"] += 1 + # Sync self-recursion through the same guarded attribute. + return agent.invoke({}) + + async def _ainvoke(_payload, **_): + calls["n"] += 1 + # Cross-mode: async entry falls through to the sync path. + return agent.invoke({}) + + agent.invoke = _invoke + agent.ainvoke = _ainvoke + install_delegation_guard(agent, max_depth=2) + + with pytest.raises(GovernanceBlockException): + asyncio.run(agent.ainvoke({})) + # ainvoke (depth=1) → invoke (depth=2) → invoke (depth=3, blocked). + # The guard rejects the third call before _invoke runs, so calls=2. + assert calls["n"] == 2 + + +# --------------------------------------------------------------------------- +# Lifecycle — install / uninstall semantics. +# --------------------------------------------------------------------------- + + +def test_install_is_idempotent() -> None: + agent = _make_sync_agent() + install_delegation_guard(agent, max_depth=5) + wrapped_once = agent.invoke + install_delegation_guard(agent, max_depth=5) + assert agent.invoke is wrapped_once, "second install must not re-wrap" + + +def test_uninstall_restores_originals_for_both_methods() -> None: + agent = _make_dual_agent() + original_invoke = agent.invoke + original_ainvoke = agent.ainvoke + install_delegation_guard(agent, max_depth=5) + assert agent.invoke is not original_invoke + assert agent.ainvoke is not original_ainvoke + + uninstall_delegation_guard(agent) + assert agent.invoke is original_invoke + assert agent.ainvoke is original_ainvoke + assert not getattr(agent, "_delegation_wrapped", False) + + +def test_uninstall_safe_on_unguarded_agent() -> None: + agent = _make_sync_agent() + # Should not raise; should leave agent unchanged. + uninstall_delegation_guard(agent) + assert callable(agent.invoke) + + +# --------------------------------------------------------------------------- +# Edge cases. +# --------------------------------------------------------------------------- + + +def test_agent_without_invoke_methods_is_noop() -> None: + """Agents without any invokable method must not crash the install.""" + agent = SimpleNamespace(unrelated="value") + install_delegation_guard(agent, max_depth=5) + assert not getattr(agent, "_delegation_wrapped", False) + + +def test_env_var_max_depth_override(monkeypatch: pytest.MonkeyPatch) -> None: + """``UIPATH_GOVERNANCE_MAX_DELEGATION_DEPTH`` overrides the default.""" + monkeypatch.setenv("UIPATH_GOVERNANCE_MAX_DELEGATION_DEPTH", "1") + agent = SimpleNamespace() + calls = {"n": 0} + + def _invoke(_payload, **_): + calls["n"] += 1 + return agent.invoke({}) + + agent.invoke = _invoke + install_delegation_guard(agent) # picks up env + + with pytest.raises(GovernanceBlockException): + agent.invoke({}) + assert calls["n"] == 1, "max_depth=1 should allow exactly one call" + + +def test_invalid_env_var_falls_back_to_default( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setenv("UIPATH_GOVERNANCE_MAX_DELEGATION_DEPTH", "not-a-number") + agent = _make_sync_agent() + # Should not raise on install — falls back silently to the default. + install_delegation_guard(agent) + assert os.environ.get("UIPATH_GOVERNANCE_MAX_DELEGATION_DEPTH") == "not-a-number" + assert callable(agent.invoke) + + +# --------------------------------------------------------------------------- +# Leak / scaling — pins the shared-ContextVar design. +# --------------------------------------------------------------------------- + + +def test_install_does_not_allocate_per_agent_contextvars() -> None: + """N installs must not grow the module's ContextVar registry by N. + + The old implementation allocated a ``ContextVar`` per agent. Since + ContextVar instances are interned by the interpreter and never GC'd, + that was an unbounded leak. The current design holds a single + module-level ContextVar of ``dict[id(agent), int]``. + """ + from uipath.runtime.governance import delegation_guard as dg + + # Snapshot the single shared ContextVar. + shared_var = dg._DELEGATION_DEPTHS + + for _ in range(100): + agent = _make_sync_agent() + install_delegation_guard(agent, max_depth=3) + uninstall_delegation_guard(agent) + + # The module-level ContextVar is unchanged — same instance, no new + # ContextVars were allocated. + assert dg._DELEGATION_DEPTHS is shared_var + + +def test_two_agents_have_independent_depth_counters() -> None: + """Exhausting one agent's depth limit doesn't leak into another agent. + + Both agents share the single module-level ContextVar but the dict + inside isolates them via ``id(agent)``. + """ + from uipath.runtime.governance import delegation_guard as dg + + agent_a = SimpleNamespace() + calls_a = {"n": 0} + + def _invoke_a(_payload, **_): + calls_a["n"] += 1 + return agent_a.invoke({}) # self-recursion until limit + + agent_a.invoke = _invoke_a + + agent_b = _make_sync_agent() + + install_delegation_guard(agent_a, max_depth=2) + install_delegation_guard(agent_b, max_depth=2) + + # Drive agent_a to its limit. + with pytest.raises(GovernanceBlockException): + agent_a.invoke({}) + assert calls_a["n"] == 2 + + # agent_b is a fresh chain in the same context. Its depth counter + # is keyed by id(agent_b), so agent_a's exhausted state doesn't + # affect it. Without the per-agent keying, agent_b would inherit + # whatever depth was last set in this context. + assert agent_b.invoke({"x": 1}) == {"sync": {"x": 1}} + + # After both calls, the ContextVar should be back to its initial + # state — either unset (LookupError) or holding an empty dict. The + # set/reset pairs each guarded call cleaned up after itself. + try: + depths = dg._DELEGATION_DEPTHS.get() + except LookupError: + depths = {} + assert depths.get(id(agent_a), 0) == 0 + assert depths.get(id(agent_b), 0) == 0 + + +def test_uninstall_clears_agent_depth_entry() -> None: + """After uninstall, the agent's id is no longer in the depths dict. + + Prevents ``id(agent)`` reuse — Python recycles ids after GC — from + mis-attributing a future agent's count to this one. + """ + from uipath.runtime.governance import delegation_guard as dg + + agent = _make_sync_agent() + install_delegation_guard(agent, max_depth=5) + # Enter the guard once so the agent gets a depth entry. + agent.invoke({}) + # invoke completed -> token reset -> entry should be back to 0 or + # absent. We re-enter manually to plant a non-zero entry. + agent_key = id(agent) + dg._DELEGATION_DEPTHS.set({agent_key: 3}) + assert dg._DELEGATION_DEPTHS.get().get(agent_key) == 3 + + uninstall_delegation_guard(agent) + # Uninstall pops the entry from the current context. + assert agent_key not in dg._DELEGATION_DEPTHS.get()