diff --git a/src/uipath/runtime/governance/native/guardrail_compensation.py b/src/uipath/runtime/governance/native/guardrail_compensation.py new file mode 100644 index 0000000..2d04970 --- /dev/null +++ b/src/uipath/runtime/governance/native/guardrail_compensation.py @@ -0,0 +1,431 @@ +"""Compensating governance for disabled centralized guardrails. + +When a ``guardrail_fallback`` rule fires (the guardrail is mapped to +UiPath but the centralized policy is disabled), the framework asks the +governance-server to run the real guardrail check via its +``/{org_id}/agenticgovernance_/api/v1/runtime/govern`` endpoint. + +This call is **fire-and-forget**: the server runs the guardrail AND +writes the audit trace from its side. The agent doesn't inspect the +response — it only cares about whether the call reached the server. + +The call also runs on a **bounded background pool** so even an agent +that fires hundreds of compensation events in a session can't pile up +threads or memory. :data:`COMPENSATION_MAX_WORKERS` workers process +the queue, and an in-flight semaphore drops submissions when the pool +is genuinely saturated — at that point the next call is logged and +skipped rather than queued indefinitely. + +URL composition, request headers, org/tenant resolution, and the +request timeout all come from +:mod:`uipath.runtime.governance.native.backend_client` so the policy +fetch and the compensating call share one definition of every +operator-tunable. 
+""" + +from __future__ import annotations + +import atexit +import json +import logging +import os +import threading +import urllib.error +import urllib.request +from concurrent.futures import ThreadPoolExecutor +from typing import Any, TypedDict + +from uipath.runtime.governance.native.backend_client import ( + BACKEND_REQUEST_TIMEOUT_SECONDS, + COMPENSATION_MAX_WORKERS, + ENV_ACCESS_TOKEN, + ENV_ORGANIZATION_ID, + ENV_TENANT_ID, + GOVERN_API_PATH, + TENANT_HEADER, + build_governance_url, + governance_request_headers, + resolve_job_context, + resolve_organization_id, + resolve_tenant_id, +) + +logger = logging.getLogger(__name__) + + +# ---------------------------------------------------------------------------- +# Bounded thread pool — caps both concurrent threads AND queued work. +# +# ThreadPoolExecutor alone caps concurrent worker threads, but its internal +# queue is unbounded — a misbehaving agent that fires compensation faster than +# the server can absorb would queue indefinitely (memory pressure). The +# semaphore caps total in-flight submissions (running + queued) at a +# multiple of the worker count. Saturated submissions are dropped with a +# warning. Process exit cancels queued work and lets running tasks finish +# (bounded by their HTTP timeout) via the atexit handler. +# ---------------------------------------------------------------------------- + +_INFLIGHT_OVERSUBSCRIPTION = 4 # queue up to (workers × this many) before dropping +_INFLIGHT_CAP = COMPENSATION_MAX_WORKERS * _INFLIGHT_OVERSUBSCRIPTION + +_pool = ThreadPoolExecutor( + max_workers=COMPENSATION_MAX_WORKERS, + thread_name_prefix="governance-compensation", +) +_inflight = threading.BoundedSemaphore(_INFLIGHT_CAP) + + +@atexit.register +def _shutdown_pool() -> None: + """Cancel queued compensation tasks at process exit. + + ``wait=False`` returns immediately so process shutdown isn't held + up; ``cancel_futures=True`` (Python 3.9+) drops anything not yet + running. 
def disabled_guardrails(audit: Any, policy_index: Any) -> list[FiredRule]:
    """Return per-rule metadata for each fired guardrail-fallback rule.

    A guardrail rule fires only when it is mapped to UiPath
    (``mapped_to_uipath`` true) but disabled (``policy_enabled`` false) —
    see the ``guardrail_fallback`` operator. The validator name (e.g.
    ``pii_detection``) is read from the condition's ``value`` dict and is
    later used as the ``type`` of the compensating call.

    One entry is emitted per matching ``guardrail_fallback`` condition, so
    a rule declaring several such conditions yields several entries that
    share the same ``ruleId``.

    Args:
        audit: Object exposing ``evaluations``; each evaluation provides
            ``matched``, ``rule_id`` and ``rule_name``.
        policy_index: Object exposing ``get_rule(rule_id)``, returning the
            rule (with ``checks`` -> ``conditions``) or ``None``.

    Returns:
        A list of dicts shaped like::

            {
                "ruleId": "...",
                "ruleName": "...",
                "packName": "...",
                "validator": "pii_detection",
            }

        Conditions whose ``validator`` is missing, ``None`` or empty are
        skipped: there is nothing meaningful to ask the server to run.
    """
    fired: list[FiredRule] = []
    for ev in audit.evaluations:
        if not ev.matched:
            continue
        rule = policy_index.get_rule(ev.rule_id)
        if rule is None:
            continue
        # Same for every condition of this rule, so look it up once.
        pack_name = getattr(rule, "pack_name", "") or ""
        for check in rule.checks:
            for cond in check.conditions:
                if cond.operator != "guardrail_fallback":
                    continue
                config = cond.value
                if not isinstance(config, dict):
                    continue
                # The ``guardrail_fallback`` operator only matches when
                # ``mapped_to_uipath=True`` AND ``policy_enabled=False``.
                # Re-check here defensively so a code path that bypasses
                # the evaluator (or a multi-condition rule that fired on a
                # sibling check) can't trigger a compensation call for a
                # guardrail that isn't actually disabled.
                if not bool(config.get("mapped_to_uipath", False)):
                    continue
                if bool(config.get("policy_enabled", True)):
                    continue
                raw_validator = config.get("validator")
                # An explicit ``None`` must count as "missing"; coercing it
                # with ``str()`` would produce the truthy string "None".
                if raw_validator is None:
                    continue
                validator = str(raw_validator)
                if not validator:
                    continue
                fired.append(
                    {
                        "ruleId": ev.rule_id,
                        "ruleName": ev.rule_name,
                        "packName": pack_name,
                        "validator": validator,
                    }
                )
    return fired
+ + Order: live OTel span trace id (32-char hex) -> ``UiPathConfig.trace_id`` + -> the caller-supplied ``fallback``. + """ + try: + from opentelemetry import trace + + ctx = trace.get_current_span().get_span_context() + if ctx.is_valid: + return format(ctx.trace_id, "032x") + except Exception: # noqa: BLE001 - tracing is best-effort; fall through + pass + + try: + from uipath.platform.common import UiPathConfig + + if UiPathConfig.trace_id: + return UiPathConfig.trace_id + except (ImportError, AttributeError): + pass + + return fallback + + +def submit_compensation( + rules: list[FiredRule], + data: dict[str, Any], + hook: str, + trace_id: str, + src_timestamp: str, + agent_name: str, + runtime_id: str, +) -> None: + """Schedule a /runtime/govern call on the bounded background pool. + + Fire-and-forget. Returns immediately; the call runs on a worker + thread bounded by :data:`COMPENSATION_MAX_WORKERS`. When the + in-flight queue is saturated (cap = workers × oversubscription), + the call is dropped with a warning and the agent continues. + + ``rules`` is the per-rule metadata from :func:`disabled_guardrails`; + the validators sent to the guardrail API are derived from it. + + Never raises — including when the pool has already been shut down + by process exit. + """ + if not rules: + return + + validators = _validators(rules) + if not validators: + return + + # Resolve the trace id HERE, on the caller (hook) thread where the + # agent's OTel span is still live. The /govern call below runs on a + # background worker (_pool.submit -> _run -> request_governance) where + # that context is gone, so the resolved value is captured now and + # carried into the worker — ensuring the server writes compensation + # records under the agent's real trace, not a detached id. 
def request_governance(
    rules: list[FiredRule],
    data: dict[str, Any],
    hook: str,
    trace_id: str,
    src_timestamp: str,
    agent_name: str,
    runtime_id: str,
) -> None:
    """Synchronous POST to the org-scoped ``/runtime/govern`` endpoint.

    Most callers should use :func:`submit_compensation` to run this on
    the bounded background pool. ``request_governance`` is exposed
    directly only for callers that already manage their own
    concurrency (and for tests).

    POSTs::

        {
            "type": ["pii_detection", "harmful_content"],
            "rules": [
                {"ruleId": "...", "ruleName": "...",
                 "packName": "...", "validator": "pii_detection"}
            ],
            "data": {...},
            "hook": "before_model",
            "traceId": "...",
            "src_timestamp": "...",
            "agentName": "...",
            "runtimeId": "...",
            "folderKey": "...", "jobKey": "...", "processKey": "...",
            "referenceId": "...", "agentVersion": "..."
        }

    ``type`` is the list of distinct validators derived from ``rules``;
    the trailing job-context keys are whatever ``resolve_job_context()``
    returns and are merged in last.

    The call is skipped (with a ``WARNING``) when the organization id,
    the tenant id, or the bearer token cannot be resolved. The response
    body is never read; the only outcome reported is whether the call
    landed:

    - **Success** → ``INFO`` log ``Govern call has been made``.
    - **Failure** → ``WARNING`` log; returns ``None``.

    Never raises: everything after the cheap argument checks runs inside
    a single guarded region, including identity resolution, URL/header
    construction and ``urllib.request.Request`` creation (which raises
    ``ValueError`` for a URL without a recognised scheme).
    """
    if not rules:
        return

    validators = _validators(rules)
    if not validators:
        return
    validator_list = ", ".join(validators)

    try:
        org_id = resolve_organization_id()
        if not org_id:
            logger.warning(
                "Govern call skipped: UiPathConfig.organization_id is not "
                "available (set %s or ensure uipath-platform is installed). "
                "validators=[%s]",
                ENV_ORGANIZATION_ID,
                validator_list,
            )
            return

        tenant_id = resolve_tenant_id()
        if not tenant_id:
            logger.warning(
                "Govern call skipped: UiPathConfig.tenant_id is not "
                "available (set %s or ensure uipath-platform is installed). "
                "validators=[%s]",
                ENV_TENANT_ID,
                validator_list,
            )
            return

        # Bearer token is required by the backend; sending without one
        # produces a 401 per call and pollutes logs. Skip cleanly when the
        # token isn't present (e.g. local dev, missing host bootstrap).
        if not os.environ.get(ENV_ACCESS_TOKEN):
            logger.warning(
                "Govern call skipped: %s is not set in the environment; "
                "compensation requires a bearer token. validators=[%s]",
                ENV_ACCESS_TOKEN,
                validator_list,
            )
            return

        try:
            payload = json.dumps(
                {
                    "type": validators,
                    "rules": rules,
                    "data": data,
                    "hook": hook,
                    "traceId": trace_id,
                    "src_timestamp": src_timestamp,
                    "agentName": agent_name,
                    "runtimeId": runtime_id,
                    **resolve_job_context(),
                },
                default=str,  # coerce any non-JSON-native value safely
            ).encode("utf-8")
        except Exception as exc:  # noqa: BLE001 - fail-open
            logger.warning(
                "Govern call payload serialization failed (validators=[%s]): %s",
                validator_list,
                exc,
            )
            return

        url = build_governance_url(org_id, GOVERN_API_PATH)
        headers = governance_request_headers(json_body=True)
        headers[TENANT_HEADER] = tenant_id

        request = urllib.request.Request(
            url,
            data=payload,
            headers=headers,
            method="POST",
        )
        with urllib.request.urlopen(  # noqa: S310 - URL is built from config
            request, timeout=BACKEND_REQUEST_TIMEOUT_SECONDS
        ) as response:
            logger.info(
                "Govern call has been made (status=%s, validators=[%s])",
                getattr(response, "status", "?"),
                validator_list,
            )
    except Exception as exc:  # noqa: BLE001 - fail-and-log
        logger.warning(
            "Govern call failed (validators=[%s]): %s",
            validator_list,
            exc,
        )
def _rules(*validators: str, rule_id: str = "R1", rule_name: str = "n", pack: str = "p"):
    """Build per-rule metadata dicts, one per validator name.

    Every entry shares the same ``ruleId`` / ``ruleName`` / ``packName``
    and differs only in ``validator``; with no validators the result is
    an empty list.
    """
    shared = {"ruleId": rule_id, "ruleName": rule_name, "packName": pack}
    built = []
    for validator_name in validators:
        # ``validator`` goes last so the key order matches the payload shape.
        built.append({**shared, "validator": validator_name})
    return built
@pytest.fixture
def _govern_env(monkeypatch):
    """Set the three env vars ``request_governance`` needs to send a request.

    ``request_governance`` skips the POST when the organization id, the
    tenant id, or the bearer token is unavailable, so tests that need
    the network path to actually fire must opt into this fixture.
    """
    required_env = (
        ("UIPATH_ORGANIZATION_ID", "appsdev"),
        ("UIPATH_TENANT_ID", "tenant-xyz"),
        ("UIPATH_ACCESS_TOKEN", "test-token"),
    )
    for var_name, var_value in required_env:
        monkeypatch.setenv(var_name, var_value)
    yield
def test_request_governance_empty_types_short_circuits_without_call():
    """An empty rule list returns ``None`` without attempting any HTTP call."""
    urlopen_patch = patch.object(guardrail_compensation.urllib.request, "urlopen")
    with urlopen_patch as fake_urlopen:
        outcome = request_governance(
            rules=[],
            data={},
            hook="before_model",
            trace_id="t1",
            src_timestamp="2026-06-06T00:00:00Z",
            agent_name="agent",
            runtime_id="rt",
        )

    fake_urlopen.assert_not_called()
    assert outcome is None
+ monkeypatch.setattr( + guardrail_compensation, + "resolve_job_context", + lambda: {"folderKey": "folder-1", "jobKey": "job-1"}, + ) + with patch.object( + guardrail_compensation.urllib.request, + "urlopen", + return_value=_mock_response(), + ) as mock_urlopen: + result = request_governance( + rules, + {"content": "hello"}, + "before_model", + "trace-1", + "2026-06-06T00:00:00Z", + "langchain", + "patch-langchain", + ) + + assert result is None # fire-and-forget + + request_arg = mock_urlopen.call_args.args[0] + assert request_arg.get_method() == "POST" + + sent = json.loads(request_arg.data.decode("utf-8")) + assert sent == { + # distinct validators drive the guardrail API call + "type": ["pii_detection", "harmful_content"], + # per-rule metadata drives one trace record per rule + "rules": rules, + "data": {"content": "hello"}, + "hook": "before_model", + "traceId": "trace-1", + "src_timestamp": "2026-06-06T00:00:00Z", + "agentName": "langchain", + "runtimeId": "patch-langchain", + "folderKey": "folder-1", + "jobKey": "job-1", + } + + +def test_request_governance_sends_shared_headers(_govern_env): + """Headers must come from the shared helper — UA + Accept + Content-Type + Auth.""" + with patch.object( + guardrail_compensation.urllib.request, + "urlopen", + return_value=_mock_response(), + ) as mock_urlopen: + request_governance( + _rules("x"), {}, "before_model", "t", "ts", "a", "r" + ) + + request_arg = mock_urlopen.call_args.args[0] + # urllib title-cases header keys on the Request object. + assert request_arg.get_header("Accept") == "application/json" + assert request_arg.get_header("Content-type") == "application/json" + assert request_arg.get_header("User-agent") == USER_AGENT + # Bearer is required (see ``test_request_governance_skipped_when_token_missing``). 
+ assert request_arg.get_header("Authorization") == "Bearer test-token" + # Tenant header must travel on the compensating POST (same as the + # policy GET) — the agenticgovernance ingress validates it. + assert request_arg.get_header("X-uipath-internal-tenantid") == "tenant-xyz" + + +def test_request_governance_includes_bearer_token_when_set(monkeypatch, _govern_env): + monkeypatch.setenv("UIPATH_ACCESS_TOKEN", "the-token") + with patch.object( + guardrail_compensation.urllib.request, + "urlopen", + return_value=_mock_response(), + ) as mock_urlopen: + request_governance(_rules("x"), {}, "before_model", "t", "ts", "a", "r") + + request_arg = mock_urlopen.call_args.args[0] + assert request_arg.get_header("Authorization") == "Bearer the-token" + + +def test_request_governance_skipped_when_token_missing(monkeypatch): + """Missing bearer → skip cleanly instead of sending a guaranteed-401 request. + + Sending without a token would produce a 401 per compensation event + and pollute logs. Mirrors the org-id / tenant-id skip paths above. 
def test_request_governance_skipped_when_org_id_missing(monkeypatch):
    """With no organization id in the environment, no request is sent."""
    monkeypatch.delenv("UIPATH_ORGANIZATION_ID", raising=False)
    monkeypatch.setenv("UIPATH_TENANT_ID", "tenant-xyz")

    urlopen_patch = patch.object(guardrail_compensation.urllib.request, "urlopen")
    with urlopen_patch as fake_urlopen:
        request_governance(
            rules=_rules("x"),
            data={},
            hook="before_model",
            trace_id="t",
            src_timestamp="ts",
            agent_name="a",
            runtime_id="r",
        )

    fake_urlopen.assert_not_called()
def test_request_governance_does_not_read_response_body(_govern_env):
    """Fire-and-forget: the request is sent but the response body is not read.

    Asserting that ``urlopen`` was actually called keeps this test from
    passing vacuously: if the call were skipped (for example because an
    id or the token could not be resolved), ``read`` would trivially
    never be invoked either.
    """
    response = _mock_response()
    with patch.object(
        guardrail_compensation.urllib.request, "urlopen", return_value=response
    ) as mock_urlopen:
        request_governance(_rules("x"), {}, "before_model", "t", "ts", "a", "r")

    mock_urlopen.assert_called_once()
    response.read.assert_not_called()
+ """ + monkeypatch.delenv("UIPATH_GOVERNANCE_BACKEND_URL", raising=False) + monkeypatch.setenv("UIPATH_URL", "https://cloud.uipath.com/my-org/my-tenant") + with patch.object( + guardrail_compensation.urllib.request, + "urlopen", + return_value=_mock_response(), + ) as mock_urlopen: + request_governance(_rules("x"), {}, "before_model", "t", "ts", "a", "r") + + # org_id="appsdev" comes from the _govern_env fixture, not from UIPATH_URL + # (UiPathConfig.organization_id is honoured first — same as policy). + assert ( + mock_urlopen.call_args.args[0].full_url + == "https://cloud.uipath.com/appsdev/agenticgovernance_/api/v1/runtime/govern" + ) + + +# --------------------------------------------------------------------------- +# submit_compensation — bounded background pool +# --------------------------------------------------------------------------- + + +def test_submit_compensation_empty_types_short_circuits(): + """submit_compensation with no types is a no-op (no semaphore taken).""" + from uipath.runtime.governance.native.guardrail_compensation import ( + submit_compensation, + ) + + # Patch the executor to a MagicMock so we'd notice any spurious submit. 
def test_submit_compensation_routes_through_pool():
    """A non-empty rule list submits exactly one task to the pool."""
    from uipath.runtime.governance.native.guardrail_compensation import (
        submit_compensation,
    )

    # The mocked pool never executes the submitted callable, so the
    # in-flight slot that ``submit_compensation`` acquires is never
    # released. Swap in a throwaway semaphore so this test does not
    # permanently consume a slot of the module-level one and taint
    # tests that run after it.
    scratch_semaphore = threading.BoundedSemaphore(1)
    with patch.object(guardrail_compensation, "_inflight", scratch_semaphore):
        with patch.object(guardrail_compensation, "_pool") as mock_pool:
            submit_compensation(
                _rules("pii_detection"),
                {"content": "x"},
                "before_model",
                "trace-1",
                "ts",
                "agent",
                "run",
            )

    mock_pool.submit.assert_called_once()
def test_disabled_guardrails_extracts_validators_for_fired_rules():
    """A matched rule with a mapped-but-disabled guardrail yields one entry."""
    fallback_config = {
        "validator": "pii_detection",
        "mapped_to_uipath": True,
        "policy_enabled": False,
    }
    condition = SimpleNamespace(operator="guardrail_fallback", value=fallback_config)
    check = SimpleNamespace(conditions=[condition])
    # No ``pack_name`` attribute on the rule, so ``packName`` is "".
    rule = SimpleNamespace(checks=[check])

    evaluation = SimpleNamespace(
        matched=True, rule_id="R1", rule_name="PII guardrail"
    )
    audit = SimpleNamespace(evaluations=[evaluation])

    def _lookup(rid):
        return rule if rid == "R1" else None

    policy_index = SimpleNamespace(get_rule=_lookup)

    expected = [
        {
            "ruleId": "R1",
            "ruleName": "PII guardrail",
            "packName": "",
            "validator": "pii_detection",
        }
    ]
    assert disabled_guardrails(audit, policy_index) == expected
def _guardrail_fallback_rule() -> Rule:
    """Build an AUDIT rule whose only check is one ``guardrail_fallback`` condition.

    The condition describes a ``pii_detection`` guardrail that is mapped
    to UiPath (``mapped_to_uipath=True``) but disabled
    (``policy_enabled=False``).
    """
    fallback_condition = Condition(
        operator="guardrail_fallback",
        field="",
        value={
            "validator": "pii_detection",
            "mapped_to_uipath": True,
            "policy_enabled": False,
        },
    )
    only_check = Check(
        conditions=[fallback_condition],
        action=Action.AUDIT,
        message="PII guardrail disabled",
    )
    return Rule(
        rule_id="UIP-GR-01",
        name="PII guardrail (UiPath-mapped, disabled)",
        clause="UiPath-Mapped Guardrail",
        hook=LifecycleHook.BEFORE_MODEL,
        action=Action.AUDIT,
        checks=[only_check],
    )
+ ) + + assert audit.final_action == Action.AUDIT + assert audit.rules_matched == 1 + assert captured["rules"] == [ + { + "ruleId": "UIP-GR-01", + "ruleName": "PII guardrail (UiPath-mapped, disabled)", + "packName": "test_pack", + "validator": "pii_detection", + } + ] + assert captured["data"] == {"content": "contact jane@acme.com"} + assert captured["hook"] == "before_model" + assert captured["trace_id"] == "trace-1" + assert captured["agent_name"] == "agent-x" + assert captured["runtime_id"] == "run-1" + assert isinstance(captured["src_timestamp"], str) + assert "T" in captured["src_timestamp"] + + +def test_evaluator_does_not_dispatch_when_guardrail_is_enabled(): + rule = _guardrail_fallback_rule() + rule.checks[0].conditions[0].value["policy_enabled"] = True # type: ignore[index] + + set_enforcement_mode(EnforcementMode.AUDIT) + evaluator = GovernanceEvaluator(_build_index_with(rule)) + + called = threading.Event() + + def _spy(**kwargs: Any) -> None: + called.set() + + ctx = CheckContext( + hook=LifecycleHook.BEFORE_MODEL, + agent_name="agent-x", + runtime_id="run-1", + trace_id="trace-1", + model_input="hi", + ) + + with patch( + "uipath.runtime.governance.native.evaluator.submit_compensation", _spy + ): + audit = evaluator.evaluate(ctx) + time.sleep(0.05) + + assert not called.is_set() + assert audit.rules_matched == 0 + + +def test_evaluator_does_not_dispatch_when_not_mapped_to_uipath(): + rule = _guardrail_fallback_rule() + rule.checks[0].conditions[0].value["mapped_to_uipath"] = False # type: ignore[index] + rule.checks[0].conditions[0].value["policy_enabled"] = False # type: ignore[index] + + set_enforcement_mode(EnforcementMode.AUDIT) + evaluator = GovernanceEvaluator(_build_index_with(rule)) + + called = threading.Event() + + def _spy(**kwargs: Any) -> None: + called.set() + + ctx = CheckContext( + hook=LifecycleHook.BEFORE_MODEL, + agent_name="agent-x", + runtime_id="run-1", + trace_id="trace-1", + model_input="hi", + ) + + with patch( + 
"uipath.runtime.governance.native.evaluator.submit_compensation", _spy + ): + evaluator.evaluate(ctx) + time.sleep(0.05) + + assert not called.is_set() + + +def test_evaluator_compensation_dispatch_swallows_thread_errors(): + """If request_governance raises, the background thread must absorb it.""" + set_enforcement_mode(EnforcementMode.AUDIT) + evaluator = GovernanceEvaluator(_build_index_with(_guardrail_fallback_rule())) + + def _raising_spy(**kwargs: Any) -> None: + raise RuntimeError("network down") + + ctx = CheckContext( + hook=LifecycleHook.BEFORE_MODEL, + agent_name="agent-x", + runtime_id="run-1", + trace_id="trace-1", + model_input="hi", + ) + + with patch( + "uipath.runtime.governance.native.evaluator.submit_compensation", + _raising_spy, + ): + audit = evaluator.evaluate(ctx) + time.sleep(0.05) + + assert audit.final_action == Action.AUDIT + assert audit.rules_matched == 1 + + +def test_evaluator_does_not_emit_audit_trace_for_guardrail_fallback_rule(): + """Python must not emit a per-rule audit trace for ``guardrail_fallback``. + + The governance-server emits the trace in response to the + ``/runtime/govern`` POST; emitting one here too would produce a + duplicate. The rule still appears in the AuditRecord (so + ``disabled_guardrails`` can find it) and the compensation thread + still fires — only the per-rule ``rule_evaluation`` event is + suppressed, and the hook summary's counts exclude it. 
+ """ + from uipath.runtime.governance.audit import ( + AuditEvent, + AuditSink, + EventType, + get_audit_manager, + reset_audit_manager, + ) + + class _CapturingSink(AuditSink): + def __init__(self) -> None: + self.events: list[AuditEvent] = [] + + @property + def name(self) -> str: + return "capturing" + + def emit(self, event: AuditEvent) -> None: + self.events.append(event) + + reset_audit_manager() + try: + manager = get_audit_manager() + for existing in list(manager.list_sinks()): + manager.unregister_sink(existing) + sink = _CapturingSink() + manager.register_sink(sink) + manager._async_mode = False # synchronous emission for assertions + + set_enforcement_mode(EnforcementMode.AUDIT) + evaluator = GovernanceEvaluator( + _build_index_with(_guardrail_fallback_rule()) + ) + + ctx = CheckContext( + hook=LifecycleHook.BEFORE_MODEL, + agent_name="agent-x", + runtime_id="run-1", + trace_id="trace-1", + model_input="hi", + ) + + # Stub the network call so it doesn't actually post; we're + # asserting on the Python-emitted trace events, not on whether + # /runtime/govern was reached. + with patch( + "uipath.runtime.governance.native.evaluator.submit_compensation", + lambda **kwargs: None, + ): + audit = evaluator.evaluate(ctx) + time.sleep(0.05) # let the daemon thread land + + # The rule still matched and is in the audit record … + assert audit.rules_matched == 1 + assert any( + ev.matched and ev.rule_id == "UIP-GR-01" for ev in audit.evaluations + ) + + # … but NO rule_evaluation event for it was emitted by Python. + rule_events = [ + e for e in sink.events if e.event_type == EventType.RULE_EVALUATION + ] + assert not any( + e.data.get("rule_id") == "UIP-GR-01" for e in rule_events + ), "guardrail_fallback rule must not emit a Python-side audit trace" + + # The hook summary's counts must also exclude the fallback rule + # (so total_rules / matched_rules match what was actually emitted). 
+ summaries = [ + e for e in sink.events if e.event_type == EventType.HOOK_END + ] + assert len(summaries) == 1 + assert summaries[0].data["total_rules"] == 0 + assert summaries[0].data["matched_rules"] == 0 + finally: + reset_audit_manager() + + +# --------------------------------------------------------------------------- +# _resolve_trace_id — must capture the live trace on the caller thread +# (the /govern call later runs on a worker thread with no OTel context). +# --------------------------------------------------------------------------- + + +def test_resolve_trace_id_prefers_active_otel_span(): + """Inside an active span, it returns that span's trace id (32-char hex). + + This is the binding fix: the server-written compensation records must + land on the agent's real trace — the same one the native audit spans + use — not a detached id. + """ + from opentelemetry.sdk.trace import TracerProvider + + tracer = TracerProvider().get_tracer("test") + with tracer.start_as_current_span("root") as span: + expected = format(span.get_span_context().trace_id, "032x") + result = _resolve_trace_id("fallback-id") + assert result == expected + assert len(result) == 32 # dashless OTel hex, not a dashed uuid + + +def test_resolve_trace_id_uses_fallback_without_context(): + """With no active span and no resolvable platform trace id, fallback wins.""" + import sys + + # Force the optional `uipath.platform` lookup to miss (it may or may not + # be installed in this repo's env), and we're outside any active span — + # so neither source can supply an id and the fallback must be returned. + with patch.dict(sys.modules, {"uipath.platform.common": None}): + assert _resolve_trace_id("fallback-id") == "fallback-id" + + +def test_submit_compensation_captures_live_trace_before_thread_hop(): + """End-to-end thread-boundary proof for the binding fix. + + ``submit_compensation`` runs on the caller (hook) thread, then hands the + ``/govern`` call to a background worker pool. 
This test asserts BOTH + halves of why the resolve must happen at the entry: + + 1. On the **worker thread**, the OTel context is gone — resolving there + would miss the live span (so the early capture is mandatory). + 2. Despite that, ``request_governance`` (on the worker) receives the + **live span's** trace id, not the stale fallback we passed in — + proving it was captured on the caller thread before the hop. + """ + from opentelemetry.sdk.trace import TracerProvider + + tracer = TracerProvider().get_tracer("test") + + done = threading.Event() + captured: dict[str, Any] = {} + + def _spy(**kwargs: Any) -> None: + # This runs on the background worker thread. + captured["trace_id"] = kwargs["trace_id"] + # Prove the worker has NO live context: if we resolved *here*, the + # sentinel would survive untouched. + captured["worker_resolves_to"] = _resolve_trace_id("WORKER-MISS") + done.set() + + with patch.object(guardrail_compensation, "request_governance", _spy): + with tracer.start_as_current_span("agent-run") as span: + expected = format(span.get_span_context().trace_id, "032x") + guardrail_compensation.submit_compensation( + rules=_rules("pii_detection"), + data={"content": "contact jane@acme.com"}, + hook="before_model", + trace_id="stale-fallback", # must be overridden by the live trace + src_timestamp="2026-06-06T00:00:00Z", + agent_name="agent", + runtime_id="rt", + ) + assert done.wait(timeout=2.0), "compensation worker never ran" + + # (1) worker thread could not see the span — fell back to the sentinel + assert captured["worker_resolves_to"] == "WORKER-MISS" + # (2) but the value it received is the live span trace, captured pre-hop + assert captured["trace_id"] == expected + assert captured["trace_id"] != "stale-fallback"