diff --git a/src/uipath/runtime/governance/native/_yaml_to_index.py b/src/uipath/runtime/governance/native/_yaml_to_index.py new file mode 100644 index 0000000..2deb463 --- /dev/null +++ b/src/uipath/runtime/governance/native/_yaml_to_index.py @@ -0,0 +1,459 @@ +"""Runtime YAML → PolicyIndex parser. + +Mirrors the shape produced by ``packs/compile_packs.py`` but builds the +PolicyIndex directly from parsed YAML data rather than generating Python +source. Used by :mod:`uipath.runtime.governance.native.loader` when policies are fetched +from the governance backend at startup. + +Accepts either a single YAML document (one pack) or a multi-document +stream (``---``-separated packs). Unknown check types and malformed +rules are skipped with a warning — partial packs are preferred over +failing the whole load. +""" + +from __future__ import annotations + +import logging +from typing import Any + +import yaml +from uipath.core.governance.models import Action, LifecycleHook + +from uipath.runtime.governance.native.models import ( + Check, + Condition, + PolicyIndex, + PolicyPack, + Rule, + Severity, +) + +logger = logging.getLogger(__name__) + + +_HOOK_MAP: dict[str, LifecycleHook] = { + "before_agent": LifecycleHook.BEFORE_AGENT, + "after_agent": LifecycleHook.AFTER_AGENT, + "before_model": LifecycleHook.BEFORE_MODEL, + "after_model": LifecycleHook.AFTER_MODEL, + "wrap_tool_call": LifecycleHook.TOOL_CALL, + "tool_call": LifecycleHook.TOOL_CALL, + "after_tool": LifecycleHook.AFTER_TOOL, +} + +_ACTION_MAP: dict[str, Action] = { + "block": Action.DENY, + "deny": Action.DENY, + "log": Action.AUDIT, + "audit": Action.AUDIT, + "allow": Action.ALLOW, + "require_approval": Action.ESCALATE, + "escalate": Action.ESCALATE, +} + +_SEVERITY_MAP: dict[str, Severity] = { + "low": Severity.LOW, + "medium": Severity.MEDIUM, + "high": Severity.HIGH, + "critical": Severity.CRITICAL, +} + + +def build_policy_index_from_yaml(yaml_text: str) -> PolicyIndex: + """Parse YAML policy packs into a 
def _build_pack(data: dict[str, Any]) -> PolicyPack | None:
    """Build a PolicyPack from one parsed YAML document.

    Malformed parts of the document are logged and skipped rather than
    raised, so one bad pack cannot abort the load of the others.

    Args:
        data: Mapping for a single pack. The pack name is read from
            ``standard`` first, then ``name``.

    Returns:
        The PolicyPack, or ``None`` when the document has no usable name.
        A pack whose ``rules`` entry is missing, not a list, or whose
        rules were all rejected is still returned, with an empty rule
        list.
    """
    name = data.get("standard") or data.get("name")
    if not name:
        logger.warning("Skipping pack: missing 'standard'/'name' field")
        return None

    # Absent or null default_action means "block", i.e. deny. Any other
    # unrecognised value also resolves to deny, but is reported. The
    # isinstance guard matters: an unhashable YAML value (list/mapping)
    # would raise TypeError from the dict lookup.
    default_action = Action.DENY
    raw_default = data.get("default_action")
    if raw_default is not None:
        mapped = (
            _ACTION_MAP.get(raw_default) if isinstance(raw_default, str) else None
        )
        if mapped is None:
            logger.warning(
                "Pack %s: unknown default_action %r; using deny",
                name,
                raw_default,
            )
        else:
            default_action = mapped

    # ``rules: 5`` (any non-iterable scalar) used to raise TypeError from
    # enumerate(); treat every non-list value as "no rules".
    raw_rules = data.get("rules") or []
    if not isinstance(raw_rules, list):
        logger.warning(
            "Pack %s: 'rules' must be a list, got %s; no rules loaded",
            name,
            type(raw_rules).__name__,
        )
        raw_rules = []

    rules: list[Rule] = []
    for i, rule_data in enumerate(raw_rules):
        if not isinstance(rule_data, dict):
            logger.warning(
                "Pack %s: skipping rule #%d: expected a mapping, got %s",
                name,
                i,
                type(rule_data).__name__,
            )
            continue
        # The position is passed through so the rule builder can
        # synthesise an id for rules that do not declare one.
        rule = _build_rule(rule_data, default_action, i)
        if rule is not None:
            rules.append(rule)

    return PolicyPack(
        name=str(name),
        version=str(data.get("version", "1.0.0")),
        description=str(data.get("description", "")),
        rules=rules,
    )
def _build_checks(
    checks_data: list[dict[str, Any]],
    default_action: Action,
    *,
    mapped_to_uipath: bool = False,
    policy_enabled: bool = True,
) -> list[Check]:
    """Turn a rule's raw ``checks`` entries into Check objects.

    Entries that are not mappings are ignored, and entries for which
    ``_build_check`` returns ``None`` are dropped. Input order is kept.

    Args:
        checks_data: Raw check entries from the YAML rule.
        default_action: Action forwarded to every check build.
        mapped_to_uipath: Rule-level flag forwarded unchanged to every
            check build.
        policy_enabled: Rule-level flag forwarded unchanged to every
            check build.

    Returns:
        The successfully built checks; possibly empty.
    """
    candidates = (
        _build_check(
            entry,
            default_action,
            mapped_to_uipath=mapped_to_uipath,
            policy_enabled=policy_enabled,
        )
        for entry in checks_data
        if isinstance(entry, dict)
    )
    return [built for built in candidates if built is not None]
Condition( + operator="gt", + field="session_state.tool_calls_per_minute", + value=data["max_tool_calls_per_minute"], + ) + ) + if "max_consecutive_tool_calls" in data: + conditions.append( + Condition( + operator="gt", + field="session_state.consecutive_tool_calls", + value=data["max_consecutive_tool_calls"], + ) + ) + message = "Tool budget exceeded" + + elif check_type == "tool_allowlist": + blocked_tools = data.get("blocked_tools", []) or [] + if blocked_tools: + conditions.append( + Condition(operator="in_list", field="tool_name", value=blocked_tools) + ) + message = "Tool not allowed" + + elif check_type == "parameter_validation": + for pattern in data.get("additional_patterns", []) or []: + conditions.append( + Condition(operator="regex", field="tool_args", value=pattern) + ) + message = "Suspicious pattern in tool parameters" + + elif check_type == "rate_limit": + if "max_llm_calls_per_session" in data: + conditions.append( + Condition( + operator="gt", + field="session_state.llm_calls", + value=data["max_llm_calls_per_session"], + ) + ) + if "max_llm_calls_per_minute" in data: + conditions.append( + Condition( + operator="gt", + field="session_state.llm_calls_per_minute", + value=data["max_llm_calls_per_minute"], + ) + ) + message = "Rate limit exceeded" + + elif check_type == "field_regex": + conditions.extend(_make_conditions(data.get("conditions", []) or [])) + message = str(data.get("message", "Field regex check failed")) + + elif check_type == "data_quality_score": + field = data.get("field", "tool_result") + if data.get("check_encoding", True): + conditions.append( + Condition( + operator="encoding_concern", + field=field, + value={ + "min_confidence": float(data.get("min_confidence", 0.5)), + "max_replacement_ratio": float( + data.get("max_replacement_ratio", 0.05) + ), + "min_corruption_events": int( + data.get("min_corruption_events", 2) + ), + }, + ) + ) + if data.get("check_entropy", True): + conditions.append( + Condition( + 
operator="entropy_concern", + field=field, + value={ + "min": float(data.get("entropy_min", 1.5)), + "max": float(data.get("entropy_max", 7.5)), + }, + ) + ) + message = str( + data.get("message", "A.7.4: Data quality signal (encoding or entropy)") + ) + + elif check_type == "incident_taxonomy": + field = data.get("field", "model_output") + categories = data.get("categories") + value: dict[str, Any] = {} + if categories: + value["categories"] = list(categories) + conditions.append( + Condition(operator="incident_concern", field=field, value=value) + ) + message = str(data.get("message", "A.8.4: Incident signal detected")) + + elif check_type == "commitment_extractor": + field = data.get("field", "model_output") + conditions.append( + Condition( + operator="commitment_concern", + field=field, + value={ + "require_amount": bool(data.get("require_amount", True)), + "require_deadline": bool(data.get("require_deadline", False)), + }, + ) + ) + message = str( + data.get("message", "A.10.4: Customer commitment language detected") + ) + + elif check_type == "sentiment_concern": + field = data.get("field", "model_input") + threshold = float(data.get("threshold", -0.3)) + conditions.append( + Condition( + operator="vader_concern", + field=field, + value={"threshold": threshold}, + ) + ) + message = str( + data.get( + "message", + f"Negative sentiment detected (VADER compound <= {threshold})", + ) + ) + + elif check_type == "guardrail_fallback": + # Centralized guardrail compensating control. The on/off state + # lives at the RULE level (mapped_to_uipath / policy_enabled), + # threaded in from ``_build_rule``; ``validator`` names which + # guardrail check the server should run on behalf of the agent. + # The condition matches only when the guardrail is mapped to + # UiPath but disabled — see the ``guardrail_fallback`` operator + # in :class:`GovernanceEvaluator`. 
+ conditions.append( + Condition( + operator="guardrail_fallback", + field="", + value={ + "validator": str(data.get("validator", "")), + "mapped_to_uipath": mapped_to_uipath, + "policy_enabled": policy_enabled, + }, + ) + ) + message = str( + data.get("message", "Guardrail disabled — compensating check needed.") + ) + + else: + logger.debug("Skipping check: unknown type %r", check_type) + return None + + if not conditions: + return None + + action_str = data.get("action") + action = ( + _ACTION_MAP.get(action_str, default_action) if action_str else default_action + ) + + message = str(data.get("message", message)) + + # Multi-pattern regex/parameter_validation defaults to OR semantics + # (any pattern indicates a hit); explicit `logic` in YAML wins. + if check_type in ("parameter_validation", "regex") and len(conditions) > 1: + default_logic = "any" + else: + default_logic = "all" + logic = str(data.get("logic", default_logic)) + + return Check(conditions=conditions, action=action, message=message, logic=logic) + + +def _make_conditions(raw: list[dict[str, Any]]) -> list[Condition]: + """Translate a list of YAML condition dicts into Condition objects.""" + out: list[Condition] = [] + for cond in raw: + if not isinstance(cond, dict): + continue + out.append( + Condition( + operator=str(cond.get("operator", "regex")), + field=str(cond.get("field", "model_input")), + value=cond.get("value", ""), + negate=bool(cond.get("negate", False)), + ) + ) + return out + + +def _field_for_scope(scope: list[str] | str) -> str: + """Map a YAML `scope` value to the CheckContext field it targets.""" + if isinstance(scope, str): + scope = [scope] + if "system" in scope or "human" in scope: + return "model_input" + if "ai" in scope: + return "model_output" + if "tool_result" in scope: + return "tool_result" + return "model_input" diff --git a/src/uipath/runtime/governance/native/backend_client.py b/src/uipath/runtime/governance/native/backend_client.py new file mode 100644 index 
0000000..8269ea7 --- /dev/null +++ b/src/uipath/runtime/governance/native/backend_client.py @@ -0,0 +1,383 @@ +"""Governance backend client. + +Hosts the shared infrastructure used by every governance-backend call: + +- :func:`get_backend_base_url` — resolves the cloud host (with the + org/tenant path segments stripped) so each endpoint builder can + append its own scoped path. +- :func:`governance_request_headers` — composes the headers shared by + the policy fetch and the ``/runtime/govern`` compensating POST + (Accept, User-Agent, optional Content-Type, optional Bearer auth). +- :func:`build_governance_url` — composes an org-scoped URL against + the ``agenticgovernance_`` ingress. +- :func:`resolve_organization_id` / :func:`resolve_tenant_id` — read + the active org/tenant from ``UiPathConfig`` with an env-var fallback + for installations that don't have ``uipath-platform``. +- :func:`safe_call` — fail-open helper that catches every non-block + exception so governance hooks never crash an agent run. +- Module-level constants — request timeout, service path prefix, + compensation pool size — all the tunables an operator might care + about. Defined once here so the policy fetch, the compensating + ``/runtime/govern`` call, and the loader share one definition. + +The endpoint clients live next door: + +- :mod:`uipath.runtime.governance.native.policy_api_client` — policy fetch +- :mod:`uipath.runtime.governance.native.guardrail_compensation` — /runtime/govern +""" + +from __future__ import annotations + +import logging +import os +from functools import lru_cache +from typing import Callable +from urllib.parse import urlparse + +logger = logging.getLogger(__name__) + +# ---------------------------------------------------------------------------- +# Env-var names (consumed by the helpers below + diagnostic messages) +# ---------------------------------------------------------------------------- + +# Explicit dev/test override — used verbatim, no path-stripping. 
+ENV_BACKEND_BASE_URL = "UIPATH_GOVERNANCE_BACKEND_URL" +# The canonical platform URL env var (also backs ``UiPathConfig.base_url``). +ENV_PLATFORM_BASE_URL = "UIPATH_URL" +# Bearer token; missing means the policy fetch and compensating call are +# skipped (and that fact is logged) rather than producing 401s on every call. +ENV_ACCESS_TOKEN = "UIPATH_ACCESS_TOKEN" +# Org / tenant scoping for the agenticgovernance_ ingress. +ENV_ORGANIZATION_ID = "UIPATH_ORGANIZATION_ID" +ENV_TENANT_ID = "UIPATH_TENANT_ID" +# Job-execution context forwarded in the /runtime/govern payload so the +# server can populate the LLMOps trace record (Doc-2 audit structure). +# Each falls back to the named env var when uipath-platform isn't present. +ENV_FOLDER_KEY = "UIPATH_FOLDER_KEY" +ENV_JOB_KEY = "UIPATH_JOB_KEY" +ENV_PROCESS_KEY = "UIPATH_PROCESS_UUID" +ENV_REFERENCE_ID = "UIPATH_AGENT_ID" +ENV_AGENT_VERSION = "UIPATH_PROCESS_VERSION" + +# ---------------------------------------------------------------------------- +# Endpoint shape — all governance calls hit the org-scoped agenticgovernance_ +# service. Centralised so adding a third endpoint is "one new path constant" +# instead of "a new path template that someone forgets to keep in sync." +# ---------------------------------------------------------------------------- + +GOVERNANCE_SERVICE_PREFIX = "agenticgovernance_" +POLICY_API_PATH = "api/v1/runtime/policy" +GOVERN_API_PATH = "api/v1/runtime/govern" +TENANT_HEADER = "x-uipath-internal-tenantid" +# Query param on the policy fetch that selects the agent-type view of the +# policy: the server's clause-resolver reads the matching container key +# (``*-in-flight-conversational-agents`` vs ``*-in-flight-agents``). It's a +# representation selector (it changes the returned policy), so it travels as a +# query param — cache-correct and part of resource identification — not a +# header. Values: "conversational" | "autonomous". 
+AGENT_TYPE_PARAM = "agentType" +AGENT_TYPE_CONVERSATIONAL = "conversational" +AGENT_TYPE_AUTONOMOUS = "autonomous" + +# Default base URL when no override and no UiPathConfig / UIPATH_URL value is +# available. Used only on developer machines doing fully-offline work; real +# deployments always have UIPATH_URL injected by the host. +_DEFAULT_BACKEND_BASE_URL = "https://alpha.uipath.com" + +# ---------------------------------------------------------------------------- +# Tunables — one place so an ops change is one edit. The values that bound +# how long a single agent run can spend on governance traffic. +# ---------------------------------------------------------------------------- + +# Per-request timeout for any governance backend HTTP call (policy fetch, +# /runtime/govern compensating POST). Same value used everywhere so an agent +# can't accidentally end up with a "long" timeout on one call and "short" on +# another. +BACKEND_REQUEST_TIMEOUT_SECONDS = 10.0 + +# Bound on concurrent /runtime/govern requests in flight. A misbehaving +# agent that fires `before_model` 100 times in a session with three matched +# fallback rules each would otherwise spawn 100 daemon threads; this pool +# caps the concurrency. Saturated submissions are logged and dropped — the +# server still receives traces from the requests that did land. +COMPENSATION_MAX_WORKERS = 4 + +# Browser-shaped User-Agent. Required because the alpha/production +# governance ingress runs a WAF whose default scanner rule set blocks +# ``Python-urllib/``. Identifying as a real browser keeps the +# request from being rejected before any auth/tenant logic runs. 
+USER_AGENT = ( + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " + "AppleWebKit/537.36 (KHTML, like Gecko) " + "Chrome/148.0.0.0 Safari/537.36" +) + + +# ---------------------------------------------------------------------------- +# Headers +# ---------------------------------------------------------------------------- + + +def governance_request_headers(*, json_body: bool = False) -> dict[str, str]: + """Return the common HTTP headers for governance backend requests. + + Centralises the headers shared between the policy fetch and the + compensating ``/runtime/govern`` POST so the UA and auth shape are + declared once. + + Args: + json_body: When ``True`` (POST/PATCH/etc. with a JSON payload), + adds ``Content-Type: application/json``. GETs leave it off + so origin servers that 415 on unexpected Content-Type stay + happy. + + Returns: + A new dict with: + + - ``Accept: application/json`` + - ``User-Agent`` (the browser-shaped string above) + - ``Content-Type: application/json`` when ``json_body=True`` + - ``Authorization: Bearer `` when the env + var is set; omitted otherwise (caller decides whether the + missing token is fatal). + + Endpoint-specific headers (e.g. ``x-uipath-internal-tenantid``) are + added by the caller after this helper returns. + """ + headers: dict[str, str] = { + "Accept": "application/json", + "User-Agent": USER_AGENT, + } + if json_body: + headers["Content-Type"] = "application/json" + token = os.environ.get(ENV_ACCESS_TOKEN) + if token: + headers["Authorization"] = f"Bearer {token}" + return headers + + +# ---------------------------------------------------------------------------- +# URL composition +# ---------------------------------------------------------------------------- + + +def _strip_to_origin(raw_url: str) -> str: + """Return ``scheme://host[:port]`` for ``raw_url``, dropping any path. 
+ + Platform URLs are commonly ``https://cloud.uipath.com//``; + the governance endpoints construct their own + ``/{org}/agenticgovernance_/...`` suffix, so the org/tenant segments + in the base must be stripped to avoid a duplicated org path. + """ + parsed = urlparse(raw_url) + if not parsed.scheme or not parsed.netloc: + # Not a parseable absolute URL — leave it to the caller. + return raw_url.rstrip("/") + return f"{parsed.scheme}://{parsed.netloc}" + + +def get_backend_base_url() -> str: + """Resolve the governance backend base URL on each call. + + Resolution order (first hit wins): + + 1. ``UIPATH_GOVERNANCE_BACKEND_URL`` — explicit dev/test override, + used verbatim. + 2. ``UiPathConfig.base_url`` from ``uipath-platform`` — the + canonical platform URL. Org/tenant path segments are stripped + so the caller can append its own org-scoped path. + 3. ``UIPATH_URL`` env var — same as (2) but works when + ``uipath-platform`` is not installed. + 4. ``https://alpha.uipath.com`` — last-resort default for offline + development; real deployments always have ``UIPATH_URL`` set. + + Reading on each call (not at import) lets the runtime entrypoint + configure the env vars after this module is already loaded. + """ + explicit_override = os.environ.get(ENV_BACKEND_BASE_URL) + if explicit_override: + return explicit_override.rstrip("/") + + # Lazy import — uipath-platform is optional; falls through to the + # env-var path when only uipath-core / uipath-runtime are installed. + platform_url: str | None = None + try: + from uipath.platform.common import UiPathConfig + + platform_url = UiPathConfig.base_url + except (ImportError, AttributeError): + pass + + raw = platform_url or os.environ.get(ENV_PLATFORM_BASE_URL) + if raw: + return _strip_to_origin(raw) + + return _DEFAULT_BACKEND_BASE_URL + + +def build_governance_url(org_id: str, path: str) -> str: + """Compose an org-scoped governance backend URL. 
+ + Final shape: ``{backend_base}/{org_id}/{GOVERNANCE_SERVICE_PREFIX}/{path}``. + + Args: + org_id: Active organization id; the URL is meaningless without it. + path: API suffix WITHOUT the org/service prefix + (e.g. :data:`POLICY_API_PATH` or :data:`GOVERN_API_PATH`). + """ + base = get_backend_base_url() + return f"{base}/{org_id}/{GOVERNANCE_SERVICE_PREFIX}/{path}" + + +# ---------------------------------------------------------------------------- +# Org / tenant resolution +# ---------------------------------------------------------------------------- + + +def _resolve_uipath_config_field(attr: str, env_var: str) -> str | None: + """Read a single ``UiPathConfig`` attribute with an env-var fallback. + + Lazy-imports ``UiPathConfig`` so ``uipath-runtime`` doesn't require + ``uipath-platform`` at install time. When the platform package is + missing (``ImportError``) or the attribute isn't yet exposed + (``AttributeError``), falls back to reading the named env var. + """ + try: + from uipath.platform.common import UiPathConfig + + return getattr(UiPathConfig, attr, None) or os.environ.get(env_var) + except ImportError: + return os.environ.get(env_var) + + +# ---------------------------------------------------------------------------- +# Agent-type selector (conversational vs autonomous) +# +# Set once by the governance wrapper at runtime init (before the background +# policy prefetch is kicked off) and read by the policy fetch when composing +# the request URL. A process-level holder — not a ContextVar — because the +# prefetch runs on a separate thread that wouldn't inherit a ContextVar, and a +# coded-agent process hosts a single agent so the value is stable per process. +# ---------------------------------------------------------------------------- + +_agent_is_conversational: bool | None = None + + +def set_agent_conversational(value: bool | None) -> None: + """Record whether the hosted agent is conversational. 
def agent_type_param() -> str | None:
    """Translate the process-level agent selector into a query value.

    Returns:
        ``AGENT_TYPE_CONVERSATIONAL`` when the selector is truthy,
        ``AGENT_TYPE_AUTONOMOUS`` when it is falsy but set, and ``None``
        when the selector is ``None`` (never set, or cleared) so the
        caller can leave the ``agentType`` parameter out.
    """
    selector = _agent_is_conversational
    if selector is None:
        return None
    if selector:
        return AGENT_TYPE_CONVERSATIONAL
    return AGENT_TYPE_AUTONOMOUS
def safe_call(
    fn: Callable[..., None],
    *args: object,
    what: str,
    **kwargs: object,
) -> None:
    """Run ``fn(*args, **kwargs)`` without letting ordinary errors escape.

    A ``GovernanceBlockException`` is re-raised untouched. Any other
    ``Exception`` is logged at WARNING, prefixed with ``what``, and
    suppressed so the caller carries on.

    Args:
        fn: Callable to invoke; its return value is discarded.
        *args: Positional arguments forwarded to ``fn``.
        what: Short label for the log line written on failure
            (e.g. ``"BEFORE_AGENT governance check"``).
        **kwargs: Keyword arguments forwarded to ``fn``.

    Raises:
        GovernanceBlockException: When ``fn`` raises it.
    """
    # Imported at call time so that loading this module does not itself
    # import uipath-core.
    from uipath.core.governance.exceptions import GovernanceBlockException

    try:
        fn(*args, **kwargs)
    except Exception as exc:  # noqa: BLE001 - fail-open by contract
        if isinstance(exc, GovernanceBlockException):
            raise
        logger.warning("%s failed (continuing): %s", what, exc)
+""" + +from __future__ import annotations + +import logging +import os +import threading +import time +from collections import Counter + +import yaml +from uipath.core.governance.config import is_governance_enabled + +from uipath.runtime.governance.config import EnforcementMode, set_enforcement_mode +from uipath.runtime.governance.native._yaml_to_index import build_policy_index_from_yaml +from uipath.runtime.governance.native.backend_client import ENV_ACCESS_TOKEN +from uipath.runtime.governance.native.models import PolicyIndex +from uipath.runtime.governance.native.policy_api_client import ( + ENV_ORGANIZATION_ID, + ENV_TENANT_ID, + POLICY_API_TIMEOUT_SECONDS, + fetch_policy_response, + resolve_organization_id, + resolve_tenant_id, +) + +logger = logging.getLogger(__name__) + +# Pack name aliases for backward compatibility +PACK_ALIASES: dict[str, str] = { + "owasp": "owasp_agentic", + "hipaa": "hipaa_runtime", + "soc2": "soc2_runtime", + "nist": "nist_ai_rmf_runtime", + "eu_ai": "eu_ai_act_runtime", + "iso": "iso42001_runtime", +} + + +# Module-level cache +_policy_index: PolicyIndex | None = None + +# Background-prefetch coordination. ``_prefetch_event`` is set once the +# background load_policy_index() call finishes (success OR failure); +# callers of ``get_policy_index()`` wait on it. ``_prefetch_lock`` +# protects the start-once semantics so concurrent ``prefetch`` calls +# don't kick off duplicate threads. +_prefetch_event: threading.Event | None = None +_prefetch_lock = threading.Lock() + +# Default wait when ``get_policy_index()`` blocks on an in-flight +# prefetch. Matched to the policy-API HTTP timeout so a stuck backend +# bounds the total time spent waiting at first hook fire to +# ~POLICY_API_TIMEOUT_SECONDS. If the wait expires we return an empty +# PolicyIndex — the agent runs without any policies rather than +# blocking further or retrying. 
def prefetch_policy_index() -> None:
    """Start a background load of the policy index, at most once.

    Non-blocking. The worker thread calls ``load_policy_index()`` and,
    on success, stores the result in the module-level ``_policy_index``
    cache. It always signals ``_prefetch_event`` when it finishes, whether
    it succeeded or failed.

    Does nothing when the governance feature flag is off, when an index
    is already cached, or when a prefetch has already been started
    (running or finished).

    If the worker thread cannot be started, the failure is logged and the
    prefetch marker is cleared instead of raising. That leaves the
    module in the "no prefetch started" state, so neither the caller nor
    a later reader is stuck waiting on an event nobody will signal.
    """
    global _prefetch_event

    if not is_governance_enabled():
        return

    with _prefetch_lock:
        if _policy_index is not None:
            return  # already loaded
        if _prefetch_event is not None:
            return  # already started
        event = threading.Event()
        _prefetch_event = event

    def _worker() -> None:
        global _policy_index
        try:
            loaded = load_policy_index()
        except Exception as exc:  # noqa: BLE001 - logged; cache stays unset
            # Nothing is cached here, so a reader waiting on the event
            # sees "completed without an index".
            logger.warning("Policy prefetch failed: %s", exc)
        else:
            with _prefetch_lock:
                # Unconditional: this also replaces an index that a
                # reader cached while this load was still running.
                _policy_index = loaded
        finally:
            event.set()

    worker = threading.Thread(
        target=_worker,
        name="governance-policy-prefetch",
        daemon=True,
    )
    try:
        worker.start()
    except RuntimeError as exc:
        # Thread.start() raises RuntimeError when no thread can be
        # created. Undo the marker and release anyone already holding
        # the event; governance setup stays fail-open.
        logger.warning("Policy prefetch thread could not be started: %s", exc)
        with _prefetch_lock:
            if _prefetch_event is event:
                _prefetch_event = None
        event.set()
Call :func:`clear_policy_cache` to force a + refetch (mainly for tests). + """ + global _policy_index + + if _policy_index is not None: + return _policy_index + + if not is_governance_enabled(): + logger.info( + "Governance feature flag is OFF; returning empty PolicyIndex. " + "No rules will fire. Set EnablePythonGovernanceChecker=True to enable." + ) + _policy_index = PolicyIndex() + return _policy_index + + event = _prefetch_event + if event is not None: + completed = event.wait(timeout=_PREFETCH_WAIT_SECONDS) + if completed and _policy_index is not None: + return _policy_index + if not completed: + logger.warning( + "Policy prefetch did not complete in %.1fs; " + "agent will run without any policies", + _PREFETCH_WAIT_SECONDS, + ) + else: + # Distinguish from the timeout path so production triage + # can tell "prefetch hung" from "prefetch returned empty" + # (auth failure, server error, parse failure). + logger.warning( + "Policy prefetch completed but produced no PolicyIndex " + "(see prior WARN for the root cause); agent will run " + "without any policies" + ) + _policy_index = PolicyIndex() + return _policy_index + + # No prefetch was started (direct callers / tests). Sync load — bounded + # by the HTTP timeout in the API client. + _policy_index = load_policy_index() + return _policy_index + + +def load_policy_index(pack_name: str | None = None) -> PolicyIndex: + """Load the active PolicyIndex from the governance backend. + + Args: + pack_name: Ignored. Pack selection is controlled entirely by the + backend. + + Returns: + PolicyIndex parsed from the backend response. Empty PolicyIndex + when the backend is unavailable, the token is unset, the YAML + is malformed, or the response yields zero rules. 
+ """ + start = time.perf_counter() + + api_index = _load_from_api() + if api_index is not None: + _log_index_summary(api_index) + logger.info( + "Policy index ready: source=backend, total_ms=%.1f", + (time.perf_counter() - start) * 1000, + ) + return api_index + + reason = _empty_index_reason() + logger.info( + "Policy index ready: source=empty (%s), total_ms=%.1f", + reason, + (time.perf_counter() - start) * 1000, + ) + return PolicyIndex() + + +def _empty_index_reason() -> str: + """Diagnose why the policy fetch produced nothing.""" + if not resolve_organization_id(): + return ( + f"UiPathConfig.organization_id unavailable — set {ENV_ORGANIZATION_ID} " + "or install uipath-platform; backend API not contacted" + ) + if not resolve_tenant_id(): + return ( + f"UiPathConfig.tenant_id unavailable — set {ENV_TENANT_ID} " + "or install uipath-platform; backend API not contacted" + ) + if not os.environ.get(ENV_ACCESS_TOKEN): + return f"{ENV_ACCESS_TOKEN} unset — backend API not contacted" + return "backend returned no policies (timeout / error / empty body)" + + +def _apply_enforcement_mode(mode_str: str | None) -> None: + """Map a backend-supplied mode string onto :class:`EnforcementMode`. + + Unknown values log a warning and leave the existing mode untouched. + """ + if not mode_str: + return + try: + mode = EnforcementMode(mode_str.lower()) + except ValueError: + logger.warning( + "Backend returned unknown enforcement mode %r; keeping current mode", + mode_str, + ) + return + set_enforcement_mode(mode) + logger.info("Enforcement mode set from backend: %s", mode.value) + + +def _load_from_api() -> PolicyIndex | None: + """Fetch and parse the policy index from the governance backend. + + Applies the backend-supplied enforcement mode as a side effect. + Returns ``None`` when the backend skips/errors, when the YAML is + malformed, or when the resulting index has no rules — caller returns + an empty PolicyIndex in those cases. 
def _log_index_summary(index: PolicyIndex) -> None:
    """Log a DEBUG-level summary of the loaded policy index.

    Emits the pack names, the total rule count, and a per-hook rule count.

    Args:
        index: The policy index to summarise.
    """
    # The per-hook tally walks every rule; skip it when nobody will see it.
    if not logger.isEnabledFor(logging.DEBUG):
        return

    hook_counts = Counter(rule.hook.value for rule in index.all_rules)

    logger.debug(
        "Policy packs: %s, total rules: %d, by hook: %s",
        index.pack_names,
        index.total_rules,
        dict(hook_counts),
    )
index. + + Returns whatever the backend supplied on the most recent load. + Empty list if no index has been loaded yet or the backend yielded + no packs. + """ + if _policy_index is None: + return [] + return _policy_index.pack_names + + +def clear_policy_cache() -> None: + """Clear the cached policy index and any in-flight prefetch state. + + Next call to ``get_policy_index()`` will refetch from the backend. + """ + global _policy_index, _prefetch_event + with _prefetch_lock: + _policy_index = None + _prefetch_event = None + logger.debug("Policy index cache cleared") + + +# Backward compatibility alias +reset_policy_index = clear_policy_cache diff --git a/src/uipath/runtime/governance/native/policy_api_client.py b/src/uipath/runtime/governance/native/policy_api_client.py new file mode 100644 index 0000000..325b4e0 --- /dev/null +++ b/src/uipath/runtime/governance/native/policy_api_client.py @@ -0,0 +1,227 @@ +"""Governance policy API client. + +Fetches the governance backend response so policies can be controlled +centrally without redeploying agents. Called once at process startup +from :mod:`uipath.runtime.governance.native.loader`; per-hook evaluation +stays in-process. + +Response shape (JSON):: + + { + "mode": "audit" | "enforce" | "disabled", + "policies": "" + } + +``mode`` is the platform-controlled enforcement mode for the tenant; +the loader applies it via +:func:`uipath.runtime.governance.config.set_enforcement_mode`. ``policies`` +is the YAML the evaluator compiles into a :class:`PolicyIndex`. + +Failure mode is fail-open: when the organization id is unknown, the +access token is missing, the backend errors, or the body can't be +parsed, the caller falls back to an empty PolicyIndex. The fetch is +single-shot (no retry by design — see :func:`_get_once`) so a slow +backend can't extend agent startup beyond +:data:`BACKEND_REQUEST_TIMEOUT_SECONDS`. Nothing in this module ever +raises to the caller. 
+""" + +from __future__ import annotations + +import json +import logging +import os +import urllib.error +import urllib.request +from dataclasses import dataclass +from urllib.parse import urlencode + +from uipath.runtime.governance.native.backend_client import ( + AGENT_TYPE_PARAM, + BACKEND_REQUEST_TIMEOUT_SECONDS, + ENV_ACCESS_TOKEN, + ENV_ORGANIZATION_ID, + ENV_TENANT_ID, + POLICY_API_PATH, + TENANT_HEADER, + agent_type_param, + build_governance_url, + governance_request_headers, + resolve_organization_id, + resolve_tenant_id, +) + +logger = logging.getLogger(__name__) + +# Re-exported alias kept for callers that imported the old name. +POLICY_API_TIMEOUT_SECONDS = BACKEND_REQUEST_TIMEOUT_SECONDS + + +@dataclass(frozen=True) +class PolicyResponse: + """Parsed governance backend response. + + Attributes: + mode: Enforcement mode string the backend returned + (``"audit"`` / ``"enforce"`` / ``"disabled"``), or ``None`` + when the backend omitted it. Loader applies this via + :func:`uipath.runtime.governance.config.set_enforcement_mode`. + policy: Policy pack YAML to compile into a ``PolicyIndex``. May + be an empty string if the backend returned no rules. + """ + + mode: str | None + policy: str + + +def build_policy_url(org_id: str) -> str: + """Build the policy endpoint URL for the given organization id. + + The tenant id is not part of the URL; it travels in the + ``x-uipath-internal-tenantid`` request header (see + :func:`fetch_policy_response`). + + When the hosted agent's type is known (see + :func:`uipath.runtime.governance.native.backend_client.set_agent_conversational`), + an ``agentType`` query param is appended so the server resolves the + conversational-vs-autonomous container key. Omitted when unknown — the + server then applies its default. 
+ """ + url = build_governance_url(org_id, POLICY_API_PATH) + agent_type = agent_type_param() + if agent_type: + url = f"{url}?{urlencode({AGENT_TYPE_PARAM: agent_type})}" + return url + + +def fetch_policy_response() -> PolicyResponse | None: + """Fetch the governance backend's policy response. + + Single shot, no retry: a failed fetch (timeout / network error / + HTTP error / malformed body) returns ``None`` and the caller falls + back to an empty PolicyIndex. The agent must not spend time on a + second attempt — keeping governance off the critical path is more + important than maximising policy availability. + + Returns: + :class:`PolicyResponse` on success. ``None`` on any failure + path — caller falls back to an empty PolicyIndex. + + Never raises. + """ + try: + return _fetch_policy_response_inner() + except Exception as exc: # noqa: BLE001 - loader path must never raise + logger.warning("Policy fetch failed unexpectedly: %s", exc) + return None + + +def _fetch_policy_response_inner() -> PolicyResponse | None: + org_id = resolve_organization_id() + if not org_id: + logger.warning( + "Policy fetch skipped: UiPathConfig.organization_id is not " + "available (set %s in the environment, or ensure uipath-platform " + "is installed); governance will run with no policies. The " + "backend API was NOT contacted.", + ENV_ORGANIZATION_ID, + ) + return None + + tenant_id = resolve_tenant_id() + if not tenant_id: + logger.warning( + "Policy fetch skipped: UiPathConfig.tenant_id is not " + "available (set %s in the environment, or ensure uipath-platform " + "is installed); governance will run with no policies. 
The " + "backend API was NOT contacted.", + ENV_TENANT_ID, + ) + return None + + policy_url = build_policy_url(org_id) + + token = os.environ.get(ENV_ACCESS_TOKEN) + if not token: + logger.warning( + "Policy fetch skipped: %s is not set in the environment; " + "governance will run with no policies.", + ENV_ACCESS_TOKEN, + ) + return None + + # Policy fetch is a GET; ``json_body=False`` so ``Content-Type`` is + # omitted. Strict origin servers may 415 on unexpected Content-Type + # for GETs (see :func:`governance_request_headers` docstring). + headers = governance_request_headers(json_body=False) + headers[TENANT_HEADER] = tenant_id + logger.info("Policy fetch starting (org=%s, tenant=%s)", org_id, tenant_id) + + body = _get_once(policy_url, headers) + if body is None: + return None + return _parse_policy_body(body) + + +def _get_once(url: str, headers: dict[str, str]) -> bytes | None: + """GET ``url`` once. Returns body bytes, or ``None`` on any failure. + + No retry by design — see :func:`fetch_policy_response` for the + rationale. Every failure path logs a single WARNING and returns + ``None`` so the caller (the loader) falls back to an empty + PolicyIndex without delay. 
def _parse_policy_body(body: bytes) -> PolicyResponse | None:
    """Parse the JSON envelope into a :class:`PolicyResponse`.

    Args:
        body: Raw response bytes from the policy endpoint.

    Returns:
        A :class:`PolicyResponse` when the body is a JSON object whose
        ``policies`` field is a string (a missing field is treated as
        ``""``). ``mode`` is kept only when it is a non-empty string.
        ``None`` (with a WARNING logged) when the body is empty, is not
        valid UTF-8, is not valid JSON, is not a JSON object, or carries
        a non-string ``policies`` field.
    """
    if not body:
        logger.warning("Policy fetch returned empty body")
        return None

    try:
        # ``utf-8-sig`` decodes plain UTF-8 identically but also strips a
        # leading byte-order mark; ``json.loads`` rejects a ``str`` that
        # still starts with a BOM.
        payload = json.loads(body.decode("utf-8-sig"))
    except UnicodeDecodeError as exc:
        logger.warning("Policy fetch returned non-UTF8 body: %s", exc)
        return None
    except json.JSONDecodeError as exc:
        logger.warning(
            "Policy fetch returned malformed JSON "
            "(server may have returned an HTML error page): %s",
            exc,
        )
        return None

    if not isinstance(payload, dict):
        logger.warning(
            "Policy fetch returned unexpected JSON shape (expected object, got %s)",
            type(payload).__name__,
        )
        return None

    # Anything other than a non-empty string is treated as "mode omitted".
    raw_mode = payload.get("mode")
    mode = raw_mode if isinstance(raw_mode, str) and raw_mode else None

    raw_policy = payload.get("policies", "")
    if not isinstance(raw_policy, str):
        logger.warning(
            "Policy fetch returned non-string 'policies' field (got %s)",
            type(raw_policy).__name__,
        )
        return None

    logger.info(
        "Policy fetch ok: mode=%s, policy_bytes=%d", mode, len(raw_policy)
    )
    return PolicyResponse(mode=mode, policy=raw_policy)
+ +Covers prefetch / get_policy_index / load_policy_index / _apply_enforcement_mode +plus the empty-index reason helper. +""" + +from __future__ import annotations + +import threading +import time +from unittest.mock import patch + +import pytest +import yaml + +from uipath.runtime.governance.config import ( + EnforcementMode, + get_enforcement_mode, + reset_enforcement_mode, +) +from uipath.runtime.governance.native import loader +from uipath.runtime.governance.native.loader import ( + _apply_enforcement_mode, + _empty_index_reason, + _load_from_api, + clear_policy_cache, + get_available_packs, + get_policy_index, + load_policy_index, + prefetch_policy_index, +) +from uipath.runtime.governance.native.models import PolicyIndex +from uipath.runtime.governance.native.policy_api_client import PolicyResponse + +SIMPLE_POLICY_YAML = """ +standard: test-pack +version: "1.0" +rules: + - id: r1 + hook: before_model + checks: + - type: regex + patterns: ["leak"] +""" + + +@pytest.fixture(autouse=True) +def _clean_loader_state(monkeypatch: pytest.MonkeyPatch): + """Each test starts with a fresh loader cache and a known env. + + Without this, tests leak the policy_index module global and + `_prefetch_event` into one another. + """ + clear_policy_cache() + reset_enforcement_mode() + # Enable the FF so the loader doesn't short-circuit immediately. 
+ from uipath.core.feature_flags import FeatureFlags + + FeatureFlags.configure_flags({"EnablePythonGovernanceChecker": True}) + monkeypatch.setenv("UIPATH_ORGANIZATION_ID", "org-1") + monkeypatch.setenv("UIPATH_TENANT_ID", "tenant-1") + monkeypatch.setenv("UIPATH_ACCESS_TOKEN", "tok") + yield + clear_policy_cache() + reset_enforcement_mode() + FeatureFlags.reset_flags() + + +# --------------------------------------------------------------------------- +# _empty_index_reason +# --------------------------------------------------------------------------- + + +def test_empty_index_reason_missing_org_id(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.delenv("UIPATH_ORGANIZATION_ID", raising=False) + msg = _empty_index_reason() + assert "organization_id" in msg + + +def test_empty_index_reason_missing_tenant_id(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.delenv("UIPATH_TENANT_ID", raising=False) + msg = _empty_index_reason() + assert "tenant_id" in msg + + +def test_empty_index_reason_missing_token(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.delenv("UIPATH_ACCESS_TOKEN", raising=False) + msg = _empty_index_reason() + assert "UIPATH_ACCESS_TOKEN" in msg + + +def test_empty_index_reason_backend_returned_nothing() -> None: + """All env present → reason is 'backend returned no policies'.""" + msg = _empty_index_reason() + assert "backend returned no policies" in msg + + +# --------------------------------------------------------------------------- +# _apply_enforcement_mode +# --------------------------------------------------------------------------- + + +def test_apply_enforcement_mode_none_leaves_current() -> None: + """Calling with ``None`` is a no-op — the existing mode is preserved.""" + from uipath.runtime.governance.config import set_enforcement_mode + + set_enforcement_mode(EnforcementMode.ENFORCE) + _apply_enforcement_mode(None) + assert get_enforcement_mode() == EnforcementMode.ENFORCE + + +def 
test_apply_enforcement_mode_empty_string_leaves_current() -> None: + from uipath.runtime.governance.config import set_enforcement_mode + + set_enforcement_mode(EnforcementMode.AUDIT) + _apply_enforcement_mode("") + assert get_enforcement_mode() == EnforcementMode.AUDIT + + +@pytest.mark.parametrize( + "mode_str,expected", + [ + ("audit", EnforcementMode.AUDIT), + ("enforce", EnforcementMode.ENFORCE), + ("disabled", EnforcementMode.DISABLED), + ("AUDIT", EnforcementMode.AUDIT), # case-insensitive + ], +) +def test_apply_enforcement_mode_known_values( + mode_str: str, expected: EnforcementMode +) -> None: + _apply_enforcement_mode(mode_str) + assert get_enforcement_mode() == expected + + +def test_apply_enforcement_mode_unknown_value_keeps_current() -> None: + from uipath.runtime.governance.config import set_enforcement_mode + + set_enforcement_mode(EnforcementMode.AUDIT) + _apply_enforcement_mode("not-a-real-mode") + # Mode is unchanged after the warning. + assert get_enforcement_mode() == EnforcementMode.AUDIT + + +# --------------------------------------------------------------------------- +# _load_from_api +# --------------------------------------------------------------------------- + + +def test_load_from_api_returns_none_when_fetch_returns_none() -> None: + with patch.object(loader, "fetch_policy_response", return_value=None): + assert _load_from_api() is None + + +def test_load_from_api_returns_none_when_policy_is_empty() -> None: + """A response with mode but empty policies field is treated as nothing.""" + response = PolicyResponse(mode="audit", policy="") + with patch.object(loader, "fetch_policy_response", return_value=response): + assert _load_from_api() is None + + +def test_load_from_api_applies_mode_then_parses() -> None: + """The mode is applied BEFORE the YAML is parsed, so downstream sees it.""" + response = PolicyResponse(mode="enforce", policy=SIMPLE_POLICY_YAML) + with patch.object(loader, "fetch_policy_response", return_value=response): + 
index = _load_from_api() + assert isinstance(index, PolicyIndex) + assert index.total_rules == 1 + assert get_enforcement_mode() == EnforcementMode.ENFORCE + + +def test_load_from_api_swallows_yaml_error() -> None: + """A malformed YAML body produces None, not an exception.""" + response = PolicyResponse(mode="audit", policy="key: : invalid: : yaml") + with patch.object(loader, "fetch_policy_response", return_value=response): + with patch.object( + loader, + "build_policy_index_from_yaml", + side_effect=yaml.YAMLError("bad yaml"), + ): + assert _load_from_api() is None + + +def test_load_from_api_swallows_unexpected_exception() -> None: + response = PolicyResponse(mode="audit", policy=SIMPLE_POLICY_YAML) + with patch.object(loader, "fetch_policy_response", return_value=response): + with patch.object( + loader, + "build_policy_index_from_yaml", + side_effect=RuntimeError("library bug"), + ): + assert _load_from_api() is None + + +def test_load_from_api_returns_none_when_zero_rules() -> None: + """YAML parses cleanly but yields no rules → treated as no-op.""" + empty_pack_yaml = "standard: empty\nrules: []\n" + response = PolicyResponse(mode="audit", policy=empty_pack_yaml) + with patch.object(loader, "fetch_policy_response", return_value=response): + assert _load_from_api() is None + + +# --------------------------------------------------------------------------- +# load_policy_index — public entry +# --------------------------------------------------------------------------- + + +def test_load_policy_index_success_path() -> None: + response = PolicyResponse(mode="audit", policy=SIMPLE_POLICY_YAML) + with patch.object(loader, "fetch_policy_response", return_value=response): + index = load_policy_index() + assert isinstance(index, PolicyIndex) + assert "test-pack" in index.pack_names + + +def test_load_policy_index_returns_empty_on_failure() -> None: + """When the API yields None, the loader returns an empty PolicyIndex.""" + with patch.object(loader, 
def test_prefetch_is_idempotent() -> None:
    """Second call while first is in flight is a no-op (no second thread)."""
    block = threading.Event()

    def _slow_fetch():
        block.wait(timeout=2.0)
        return None

    first_event = None
    completed = False
    # Keep the patch active until the worker has finished: the worker looks
    # up ``loader.fetch_policy_response`` when it runs, so leaving the
    # ``with`` block early could let it call the real, network-touching
    # function.
    with patch.object(
        loader, "fetch_policy_response", side_effect=_slow_fetch
    ) as mock_fetch:
        try:
            prefetch_policy_index()
            first_event = loader._prefetch_event
            # Guard against a vacuous pass: without this, ``None is None``
            # would satisfy the identity check if no prefetch ever started.
            assert first_event is not None
            prefetch_policy_index()
            assert loader._prefetch_event is first_event
        finally:
            # Always release the worker, even when an assertion above fails,
            # so the autouse fixture's clear runs cleanly.
            block.set()
            if first_event is not None:
                completed = first_event.wait(timeout=2.0)
        assert completed
        # One worker ran, so the backend fetch happened exactly once.
        assert mock_fetch.call_count == 1
+ threading.Thread( + target=lambda: (time.sleep(0.05), release.set()), daemon=True + ).start() + index = get_policy_index() + assert index.total_rules == 1 + + +def test_get_policy_index_logs_when_prefetch_completes_with_empty_index( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """The 'completed but produced no PolicyIndex' branch fires on auth/parse fail. + + Capturing via a logger mock instead of caplog because some + test-isolation paths (other tests installing log interceptors) + can prevent records from reaching caplog's root-attached handler. + """ + event = threading.Event() + event.set() # prefetch already completed + monkeypatch.setattr(loader, "_prefetch_event", event) + # _policy_index stays None — simulating "prefetch completed but produced nothing" + with patch.object(loader.logger, "warning") as mock_warning: + index = get_policy_index() + assert index.total_rules == 0 + assert any( + "completed but produced no PolicyIndex" in str(call.args[0]) + for call in mock_warning.call_args_list + ) + + +# --------------------------------------------------------------------------- +# get_available_packs / clear_policy_cache +# --------------------------------------------------------------------------- + + +def test_get_available_packs_before_load_returns_empty() -> None: + assert get_available_packs() == [] + + +def test_get_available_packs_after_load() -> None: + response = PolicyResponse(mode="audit", policy=SIMPLE_POLICY_YAML) + with patch.object(loader, "fetch_policy_response", return_value=response): + get_policy_index() + assert "test-pack" in get_available_packs() + + +def test_clear_policy_cache_forces_refetch() -> None: + response = PolicyResponse(mode="audit", policy=SIMPLE_POLICY_YAML) + with patch.object( + loader, "fetch_policy_response", return_value=response + ) as mock_fetch: + get_policy_index() + clear_policy_cache() + get_policy_index() + assert mock_fetch.call_count == 2 + + +def test_reset_policy_index_alias_for_clear() -> None: + 
"""``reset_policy_index`` is the legacy alias for ``clear_policy_cache``.""" + assert loader.reset_policy_index is loader.clear_policy_cache diff --git a/tests/test_policy_agent_type.py b/tests/test_policy_agent_type.py new file mode 100644 index 0000000..4eb30f9 --- /dev/null +++ b/tests/test_policy_agent_type.py @@ -0,0 +1,99 @@ +"""Tests for the conversational-vs-autonomous agent-type selector. + +The governance wrapper records whether the hosted agent is conversational; +the policy fetch then appends an ``agentType`` query param so the server's +clause-resolver reads the matching container key (``*-in-flight-agents`` vs +``*-in-flight-conversational-agents``). +""" + +from __future__ import annotations + +from types import SimpleNamespace + +import pytest + +from uipath.runtime.governance.native import backend_client +from uipath.runtime.governance.native.backend_client import ( + agent_type_param, + set_agent_conversational, +) +from uipath.runtime.governance.native.policy_api_client import build_policy_url +from uipath.runtime.governance.wrapper import GovernanceRuntime + + +def _extract(delegate, context=None) -> bool: + """Call _extract_is_conversational without running __init__.""" + runtime = object.__new__(GovernanceRuntime) + return runtime._extract_is_conversational(delegate, context) + + +@pytest.fixture(autouse=True) +def _reset_selector(): + """Clear the process-level selector around each test.""" + set_agent_conversational(None) + yield + set_agent_conversational(None) + + +def test_agent_type_param_unset_is_none(): + assert agent_type_param() is None + + +def test_agent_type_param_conversational(): + set_agent_conversational(True) + assert agent_type_param() == "conversational" + + +def test_agent_type_param_autonomous(): + set_agent_conversational(False) + assert agent_type_param() == "autonomous" + + +def test_build_policy_url_omits_param_when_unset(monkeypatch): + monkeypatch.setattr(backend_client, "get_backend_base_url", lambda: 
"https://alpha.uipath.com") + url = build_policy_url("my-org") + assert url == "https://alpha.uipath.com/my-org/agenticgovernance_/api/v1/runtime/policy" + assert "agentType" not in url + + +def test_build_policy_url_appends_conversational(monkeypatch): + monkeypatch.setattr(backend_client, "get_backend_base_url", lambda: "https://alpha.uipath.com") + set_agent_conversational(True) + assert build_policy_url("my-org").endswith( + "/my-org/agenticgovernance_/api/v1/runtime/policy?agentType=conversational" + ) + + +def test_build_policy_url_appends_autonomous(monkeypatch): + monkeypatch.setattr(backend_client, "get_backend_base_url", lambda: "https://alpha.uipath.com") + set_agent_conversational(False) + assert build_policy_url("my-org").endswith("?agentType=autonomous") + + +# ── _extract_is_conversational ────────────────────────────────────────────── + + +def test_extract_conversational_from_agent_definition(): + delegate = SimpleNamespace(_agent_definition=SimpleNamespace(is_conversational=True)) + assert _extract(delegate) is True + + +def test_extract_autonomous_from_agent_definition(): + delegate = SimpleNamespace(_agent_definition=SimpleNamespace(is_conversational=False)) + assert _extract(delegate) is False + + +def test_extract_unwraps_delegate_chain(): + inner = SimpleNamespace(_agent_definition=SimpleNamespace(is_conversational=True)) + outer = SimpleNamespace(_delegate=inner) # no _agent_definition on the outer + assert _extract(outer) is True + + +def test_extract_falls_back_to_context_conversation_id(): + delegate = SimpleNamespace() # nothing reachable + context = SimpleNamespace(conversation_id="conv-1") + assert _extract(delegate, context) is True + + +def test_extract_defaults_to_autonomous_when_unknown(): + assert _extract(SimpleNamespace(), SimpleNamespace()) is False \ No newline at end of file diff --git a/tests/test_policy_api_client.py b/tests/test_policy_api_client.py new file mode 100644 index 0000000..9ebcdb5 --- /dev/null +++ 
b/tests/test_policy_api_client.py @@ -0,0 +1,258 @@ +"""Tests for ``fetch_policy_response`` and the body parser. + +Covers the skip paths (missing org / tenant / token), HTTP failures +(HTTPError, URLError, TimeoutError, OSError), and body parsing +(empty body, non-UTF8, malformed JSON, wrong top-level shape, bad +``policies`` type). +""" + +from __future__ import annotations + +import io +import json +import urllib.error +from unittest.mock import MagicMock, patch + +import pytest + +from uipath.runtime.governance.native import policy_api_client +from uipath.runtime.governance.native.policy_api_client import ( + PolicyResponse, + _parse_policy_body, + build_policy_url, + fetch_policy_response, +) + + +@pytest.fixture +def _fresh_env(monkeypatch: pytest.MonkeyPatch): + """Clear the env vars that the fetch path depends on.""" + for var in ( + "UIPATH_ORGANIZATION_ID", + "UIPATH_TENANT_ID", + "UIPATH_ACCESS_TOKEN", + "UIPATH_URL", + ): + monkeypatch.delenv(var, raising=False) + yield + + +@pytest.fixture +def _populated_env(monkeypatch: pytest.MonkeyPatch): + """All three vars present — the fetch path can reach urlopen.""" + monkeypatch.setenv("UIPATH_ORGANIZATION_ID", "org-1") + monkeypatch.setenv("UIPATH_TENANT_ID", "tenant-1") + monkeypatch.setenv("UIPATH_ACCESS_TOKEN", "tok-abc") + monkeypatch.setenv("UIPATH_URL", "https://alpha.uipath.com") + yield + + +def _ok_response(body: bytes) -> MagicMock: + """urlopen()-compatible context manager that returns ``body``.""" + resp = MagicMock() + resp.read.return_value = body + resp.__enter__.return_value = resp + resp.__exit__.return_value = False + return resp + + +# --------------------------------------------------------------------------- +# Skip paths — fail-open without contacting the backend +# --------------------------------------------------------------------------- + + +def test_skip_when_org_id_missing(_fresh_env, monkeypatch) -> None: + monkeypatch.setenv("UIPATH_TENANT_ID", "tenant-1") + 
monkeypatch.setenv("UIPATH_ACCESS_TOKEN", "tok") + with patch.object( + policy_api_client.urllib.request, "urlopen" + ) as mock_urlopen: + assert fetch_policy_response() is None + assert not mock_urlopen.called + + +def test_skip_when_tenant_id_missing(_fresh_env, monkeypatch) -> None: + monkeypatch.setenv("UIPATH_ORGANIZATION_ID", "org-1") + monkeypatch.setenv("UIPATH_ACCESS_TOKEN", "tok") + with patch.object( + policy_api_client.urllib.request, "urlopen" + ) as mock_urlopen: + assert fetch_policy_response() is None + assert not mock_urlopen.called + + +def test_skip_when_token_missing(_fresh_env, monkeypatch) -> None: + monkeypatch.setenv("UIPATH_ORGANIZATION_ID", "org-1") + monkeypatch.setenv("UIPATH_TENANT_ID", "tenant-1") + with patch.object( + policy_api_client.urllib.request, "urlopen" + ) as mock_urlopen: + assert fetch_policy_response() is None + assert not mock_urlopen.called + + +# --------------------------------------------------------------------------- +# HTTP failure paths — fail-open with a warning +# --------------------------------------------------------------------------- + + +def test_returns_none_on_http_error(_populated_env) -> None: + err = urllib.error.HTTPError( + url="x", code=500, msg="Server Error", hdrs=None, fp=io.BytesIO(b"") + ) + with patch.object( + policy_api_client.urllib.request, "urlopen", side_effect=err + ): + assert fetch_policy_response() is None + + +def test_returns_none_on_url_error(_populated_env) -> None: + err = urllib.error.URLError("connection refused") + with patch.object( + policy_api_client.urllib.request, "urlopen", side_effect=err + ): + assert fetch_policy_response() is None + + +def test_returns_none_on_timeout(_populated_env) -> None: + with patch.object( + policy_api_client.urllib.request, "urlopen", side_effect=TimeoutError() + ): + assert fetch_policy_response() is None + + +def test_returns_none_on_os_error(_populated_env) -> None: + with patch.object( + policy_api_client.urllib.request, + "urlopen", + 
side_effect=OSError("disk full"), + ): + assert fetch_policy_response() is None + + +def test_outer_swallows_unexpected_exception(_populated_env) -> None: + """Even non-HTTP exceptions from urlopen don't escape the fetch helper.""" + with patch.object( + policy_api_client.urllib.request, + "urlopen", + side_effect=RuntimeError("library bug"), + ): + assert fetch_policy_response() is None + + +# --------------------------------------------------------------------------- +# Headers / URL composition +# --------------------------------------------------------------------------- + + +def test_sends_no_content_type_on_get(_populated_env) -> None: + """The GET must NOT carry Content-Type — some servers 415 on it.""" + with patch.object( + policy_api_client.urllib.request, + "urlopen", + return_value=_ok_response(b'{"mode": "audit", "policies": ""}'), + ) as mock_urlopen: + fetch_policy_response() + request_arg = mock_urlopen.call_args.args[0] + assert request_arg.get_header("Content-type") is None + assert request_arg.get_header("Accept") == "application/json" + assert request_arg.get_header("Authorization") == "Bearer tok-abc" + assert request_arg.get_header("X-uipath-internal-tenantid") == "tenant-1" + assert request_arg.get_method() == "GET" + + +def test_url_includes_agent_type_when_set(_populated_env, monkeypatch) -> None: + """``build_policy_url`` appends ``?agentType=...`` from the selector.""" + from uipath.runtime.governance.native import backend_client + + monkeypatch.setattr(backend_client, "_agent_is_conversational", True) + url = build_policy_url("org-x") + assert "agentType=conversational" in url + + +def test_url_omits_agent_type_when_unset(_populated_env, monkeypatch) -> None: + from uipath.runtime.governance.native import backend_client + + monkeypatch.setattr(backend_client, "_agent_is_conversational", None) + url = build_policy_url("org-x") + assert "agentType=" not in url + + +# 
--------------------------------------------------------------------------- +# Body parser — _parse_policy_body +# --------------------------------------------------------------------------- + + +def test_parse_empty_body_returns_none() -> None: + assert _parse_policy_body(b"") is None + + +def test_parse_non_utf8_body_returns_none() -> None: + # 0xff isn't valid UTF-8. + assert _parse_policy_body(b"\xff\xfe") is None + + +def test_parse_malformed_json_returns_none() -> None: + # A common shape: server returns HTML when it should return JSON. + assert _parse_policy_body(b"oops") is None + + +def test_parse_non_object_top_level_returns_none() -> None: + """Server returning a bare JSON array is rejected — expected an object.""" + assert _parse_policy_body(b'["audit", "policies"]') is None + + +def test_parse_non_string_policies_field_returns_none() -> None: + """``policies`` must be a string YAML body, not a number / dict / list.""" + assert _parse_policy_body(b'{"mode": "audit", "policies": 42}') is None + + +def test_parse_ok_yields_policy_response() -> None: + resp = _parse_policy_body( + b'{"mode": "enforce", "policies": "standard: p\\nrules: []"}' + ) + assert resp is not None + assert resp.mode == "enforce" + assert "standard: p" in resp.policy + + +def test_parse_ok_with_missing_mode_yields_none_mode() -> None: + """A response without ``mode`` is still valid — server may not override.""" + resp = _parse_policy_body(b'{"policies": ""}') + assert resp is not None + assert resp.mode is None + assert resp.policy == "" + + +def test_parse_empty_string_mode_treated_as_unset() -> None: + """Empty-string ``mode`` is normalized to ``None`` (don't override default).""" + resp = _parse_policy_body(b'{"mode": "", "policies": ""}') + assert resp is not None + assert resp.mode is None + + +def test_parse_non_string_mode_treated_as_unset() -> None: + """If the server sends mode as a number / null, treat as unset.""" + resp = _parse_policy_body(b'{"mode": 5, "policies": ""}') 
+ assert resp is not None + assert resp.mode is None + + +# --------------------------------------------------------------------------- +# Full happy-path round-trip +# --------------------------------------------------------------------------- + + +def test_full_fetch_round_trip(_populated_env) -> None: + body = json.dumps( + {"mode": "audit", "policies": "standard: p\nrules: []"} + ).encode("utf-8") + with patch.object( + policy_api_client.urllib.request, + "urlopen", + return_value=_ok_response(body), + ): + resp = fetch_policy_response() + assert isinstance(resp, PolicyResponse) + assert resp.mode == "audit" + assert "standard: p" in resp.policy diff --git a/tests/test_yaml_to_index.py b/tests/test_yaml_to_index.py new file mode 100644 index 0000000..5e8d338 --- /dev/null +++ b/tests/test_yaml_to_index.py @@ -0,0 +1,795 @@ +"""Tests for ``build_policy_index_from_yaml``. + +Covers every supported check type plus the pack / rule plumbing +(default action, severity defaults, hook resolution, multi-doc YAML, +malformed input handling). 
+""" + +from __future__ import annotations + +import pytest +from uipath.core.governance.models import Action, LifecycleHook + +from uipath.runtime.governance.native._yaml_to_index import ( + build_policy_index_from_yaml, +) +from uipath.runtime.governance.native.models import Severity + + +def _single_rule(yaml_text: str): + """Compile YAML and return the single rule; fail if not exactly one.""" + idx = build_policy_index_from_yaml(yaml_text) + rules = idx.all_rules + assert len(rules) == 1, f"expected 1 rule, got {len(rules)}" + return rules[0] + + +# --------------------------------------------------------------------------- +# Pack / document handling +# --------------------------------------------------------------------------- + + +def test_empty_yaml_returns_empty_index() -> None: + idx = build_policy_index_from_yaml("") + assert idx.total_rules == 0 + assert idx.pack_names == [] + + +def test_pack_without_rules_is_omitted() -> None: + """Packs with no parseable rules are dropped — never registered.""" + idx = build_policy_index_from_yaml( + """ + standard: empty-pack + version: "1.0" + rules: [] + """ + ) + assert idx.total_rules == 0 + assert "empty-pack" not in idx.pack_names + + +def test_pack_missing_name_is_skipped() -> None: + idx = build_policy_index_from_yaml( + """ + version: "1.0" + rules: + - id: r1 + hook: before_model + checks: + - type: regex + patterns: ["foo"] + """ + ) + assert idx.total_rules == 0 + + +def test_pack_uses_standard_or_name_field() -> None: + """Either ``standard:`` or ``name:`` works as the pack identifier.""" + a = build_policy_index_from_yaml( + """ + standard: iso42001 + rules: + - id: r + hook: before_model + checks: [{type: regex, patterns: ["x"]}] + """ + ) + b = build_policy_index_from_yaml( + """ + name: iso42001 + rules: + - id: r + hook: before_model + checks: [{type: regex, patterns: ["x"]}] + """ + ) + assert "iso42001" in a.pack_names + assert "iso42001" in b.pack_names + + +def 
test_multi_document_yaml_concatenates_packs() -> None: + # YAML doc separators must be at column 0; dedent inline. + yaml_text = ( + "standard: pack-a\n" + "rules:\n" + " - id: a-r1\n" + " hook: before_model\n" + ' checks: [{type: regex, patterns: ["a"]}]\n' + "---\n" + "standard: pack-b\n" + "rules:\n" + " - id: b-r1\n" + " hook: after_model\n" + ' checks: [{type: regex, patterns: ["b"]}]\n' + ) + idx = build_policy_index_from_yaml(yaml_text) + assert set(idx.pack_names) == {"pack-a", "pack-b"} + assert idx.total_rules == 2 + + +def test_non_dict_top_level_documents_are_ignored() -> None: + """A YAML doc that's a string / list at top level is skipped silently.""" + yaml_text = ( + "just_a_string\n" + "---\n" + "standard: real-pack\n" + "rules:\n" + " - id: r\n" + " hook: before_model\n" + ' checks: [{type: regex, patterns: ["x"]}]\n' + ) + idx = build_policy_index_from_yaml(yaml_text) + assert idx.pack_names == ["real-pack"] + + +# --------------------------------------------------------------------------- +# Rule-level plumbing +# --------------------------------------------------------------------------- + + +def test_unknown_hook_skips_rule() -> None: + """A rule referencing an unknown hook is dropped, the rest survive.""" + idx = build_policy_index_from_yaml( + """ + standard: p + rules: + - id: bad + hook: invented_hook + checks: [{type: regex, patterns: ["x"]}] + - id: good + hook: before_model + checks: [{type: regex, patterns: ["x"]}] + """ + ) + rule_ids = [r.rule_id for r in idx.all_rules] + assert "bad" not in rule_ids + assert "good" in rule_ids + + +def test_non_dict_rule_entry_ignored() -> None: + """Rules entries that aren't dicts (lists, scalars) are skipped.""" + idx = build_policy_index_from_yaml( + """ + standard: p + rules: + - "this is a string, not a rule" + - id: good + hook: before_model + checks: [{type: regex, patterns: ["x"]}] + """ + ) + assert [r.rule_id for r in idx.all_rules] == ["good"] + + +def 
test_action_resolution_inherits_pack_default() -> None: + """When the rule omits action, the pack's default_action is used.""" + rule = _single_rule( + """ + standard: p + default_action: log + rules: + - id: r + hook: before_model + checks: [{type: regex, patterns: ["x"]}] + """ + ) + assert rule.action == Action.AUDIT # log -> AUDIT per _ACTION_MAP + + +def test_action_resolution_unknown_falls_back_to_default() -> None: + """Unknown action string falls back to the pack default.""" + rule = _single_rule( + """ + standard: p + default_action: deny + rules: + - id: r + hook: before_model + action: bogus + checks: [{type: regex, patterns: ["x"]}] + """ + ) + assert rule.action == Action.DENY + + +def test_severity_resolution_explicit() -> None: + rule = _single_rule( + """ + standard: p + rules: + - id: r + hook: before_model + severity: critical + checks: [{type: regex, patterns: ["x"]}] + """ + ) + assert rule.severity == Severity.CRITICAL + + +def test_severity_default_high_for_deny_action() -> None: + rule = _single_rule( + """ + standard: p + rules: + - id: r + hook: before_model + action: deny + checks: [{type: regex, patterns: ["x"]}] + """ + ) + assert rule.severity == Severity.HIGH + + +def test_severity_default_medium_for_non_deny_action() -> None: + rule = _single_rule( + """ + standard: p + rules: + - id: r + hook: before_model + action: log + checks: [{type: regex, patterns: ["x"]}] + """ + ) + assert rule.severity == Severity.MEDIUM + + +def test_unknown_severity_falls_back_to_high() -> None: + rule = _single_rule( + """ + standard: p + rules: + - id: r + hook: before_model + severity: ridiculous + checks: [{type: regex, patterns: ["x"]}] + """ + ) + assert rule.severity == Severity.HIGH + + +def test_disabled_flag_propagates() -> None: + rule = _single_rule( + """ + standard: p + rules: + - id: r + hook: before_model + enabled: false + checks: [{type: regex, patterns: ["x"]}] + """ + ) + assert rule.enabled is False + + +def 
test_rule_without_id_gets_index_based_id() -> None: + """When ``id:`` is missing, a positional fallback ``RULE-N`` is used.""" + idx = build_policy_index_from_yaml( + """ + standard: p + rules: + - hook: before_model + checks: [{type: regex, patterns: ["x"]}] + """ + ) + assert idx.all_rules[0].rule_id == "RULE-0" + + +def test_rule_with_zero_parsed_checks_is_skipped() -> None: + """A rule whose declared checks all fail to parse is dropped. + + Without this guard, a rule with no checks ``always matches`` in the + evaluator and would fire on every request. + """ + idx = build_policy_index_from_yaml( + """ + standard: p + rules: + - id: junk + hook: before_model + checks: + - type: totally_unknown_check_type + """ + ) + assert idx.total_rules == 0 + + +# --------------------------------------------------------------------------- +# Check types +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize( + "hook_name,expected", + [ + ("before_agent", LifecycleHook.BEFORE_AGENT), + ("after_agent", LifecycleHook.AFTER_AGENT), + ("before_model", LifecycleHook.BEFORE_MODEL), + ("after_model", LifecycleHook.AFTER_MODEL), + ("tool_call", LifecycleHook.TOOL_CALL), + ("wrap_tool_call", LifecycleHook.TOOL_CALL), # alias + ("after_tool", LifecycleHook.AFTER_TOOL), + ], +) +def test_hook_resolution(hook_name: str, expected: LifecycleHook) -> None: + rule = _single_rule( + f""" + standard: p + rules: + - id: r + hook: {hook_name} + checks: [{{type: regex, patterns: ["x"]}}] + """ + ) + assert rule.hook == expected + + +def test_regex_check_multi_pattern_defaults_to_any_logic() -> None: + """Multiple regex patterns default to OR (any) — common case for ASI rules.""" + rule = _single_rule( + """ + standard: p + rules: + - id: r + hook: before_model + checks: + - type: regex + patterns: ["pwn", "ignore_previous"] + """ + ) + assert rule.checks[0].logic == "any" + assert len(rule.checks[0].conditions) == 2 + + +def 
test_regex_check_single_pattern_defaults_to_all_logic() -> None: + rule = _single_rule( + """ + standard: p + rules: + - id: r + hook: before_model + checks: + - type: regex + patterns: ["pwn"] + """ + ) + assert rule.checks[0].logic == "all" + + +def test_regex_check_explicit_logic_wins() -> None: + rule = _single_rule( + """ + standard: p + rules: + - id: r + hook: before_model + checks: + - type: regex + patterns: ["a", "b"] + logic: all + """ + ) + assert rule.checks[0].logic == "all" + + +@pytest.mark.parametrize( + "scope,expected_field", + [ + (["human"], "model_input"), + (["system"], "model_input"), + (["ai"], "model_output"), + ("ai", "model_output"), # string form + (["tool_result"], "tool_result"), + (["unknown_thing"], "model_input"), # fallback + ], +) +def test_regex_scope_maps_to_field(scope, expected_field: str) -> None: + rule = _single_rule( + f""" + standard: p + rules: + - id: r + hook: before_model + checks: + - type: regex + patterns: ["x"] + scope: {scope!r} + """ + ) + assert rule.checks[0].conditions[0].field == expected_field + + +def test_budget_check_max_per_session() -> None: + rule = _single_rule( + """ + standard: p + rules: + - id: r + hook: tool_call + checks: + - type: budget + max_tool_calls_per_session: 5 + """ + ) + cond = rule.checks[0].conditions[0] + assert cond.operator == "gt" + assert cond.field == "session_state.tool_calls" + assert cond.value == 5 + + +def test_budget_check_multiple_thresholds() -> None: + """All three budget knobs become independent conditions.""" + rule = _single_rule( + """ + standard: p + rules: + - id: r + hook: tool_call + checks: + - type: budget + max_tool_calls_per_session: 10 + max_tool_calls_per_minute: 5 + max_consecutive_tool_calls: 3 + """ + ) + assert len(rule.checks[0].conditions) == 3 + + +def test_tool_allowlist_check() -> None: + rule = _single_rule( + """ + standard: p + rules: + - id: r + hook: tool_call + checks: + - type: tool_allowlist + blocked_tools: ["delete_file", "shell"] + 
""" + ) + cond = rule.checks[0].conditions[0] + assert cond.operator == "in_list" + assert cond.field == "tool_name" + assert cond.value == ["delete_file", "shell"] + + +def test_tool_allowlist_empty_blocked_list_skipped() -> None: + """Empty ``blocked_tools`` means there's nothing to enforce — drop the rule.""" + idx = build_policy_index_from_yaml( + """ + standard: p + rules: + - id: r + hook: tool_call + checks: + - type: tool_allowlist + blocked_tools: [] + """ + ) + assert idx.total_rules == 0 + + +def test_parameter_validation_check() -> None: + rule = _single_rule( + """ + standard: p + rules: + - id: r + hook: tool_call + checks: + - type: parameter_validation + additional_patterns: ["rm -rf", "/etc/passwd"] + """ + ) + check = rule.checks[0] + assert len(check.conditions) == 2 + assert all(c.field == "tool_args" for c in check.conditions) + # Multi-pattern parameter_validation defaults to OR logic + assert check.logic == "any" + + +def test_rate_limit_check_session_and_minute() -> None: + rule = _single_rule( + """ + standard: p + rules: + - id: r + hook: before_model + checks: + - type: rate_limit + max_llm_calls_per_session: 20 + max_llm_calls_per_minute: 5 + """ + ) + fields = {c.field for c in rule.checks[0].conditions} + assert fields == { + "session_state.llm_calls", + "session_state.llm_calls_per_minute", + } + + +def test_field_regex_check_threads_through_conditions() -> None: + rule = _single_rule( + """ + standard: p + rules: + - id: r + hook: after_model + checks: + - type: field_regex + conditions: + - operator: regex + field: model_output + value: "(?i)password" + message: "leaked password" + """ + ) + check = rule.checks[0] + assert check.message == "leaked password" + assert check.conditions[0].operator == "regex" + + +def test_data_quality_score_both_encoding_and_entropy() -> None: + rule = _single_rule( + """ + standard: p + rules: + - id: r + hook: after_tool + checks: + - type: data_quality_score + field: tool_result + min_confidence: 
0.8 + entropy_min: 2.0 + entropy_max: 6.0 + """ + ) + ops = {c.operator for c in rule.checks[0].conditions} + assert ops == {"encoding_concern", "entropy_concern"} + + +def test_data_quality_score_check_encoding_disabled() -> None: + rule = _single_rule( + """ + standard: p + rules: + - id: r + hook: after_tool + checks: + - type: data_quality_score + check_encoding: false + check_entropy: true + """ + ) + ops = [c.operator for c in rule.checks[0].conditions] + assert "encoding_concern" not in ops + assert "entropy_concern" in ops + + +def test_incident_taxonomy_with_categories() -> None: + rule = _single_rule( + """ + standard: p + rules: + - id: r + hook: after_model + checks: + - type: incident_taxonomy + field: model_output + categories: [safety_refusal, tool_failure] + """ + ) + cond = rule.checks[0].conditions[0] + assert cond.operator == "incident_concern" + assert cond.value == {"categories": ["safety_refusal", "tool_failure"]} + + +def test_incident_taxonomy_without_categories_uses_empty_dict() -> None: + rule = _single_rule( + """ + standard: p + rules: + - id: r + hook: after_model + checks: + - type: incident_taxonomy + """ + ) + cond = rule.checks[0].conditions[0] + assert cond.value == {} + + +def test_commitment_extractor_default_flags() -> None: + rule = _single_rule( + """ + standard: p + rules: + - id: r + hook: after_model + checks: + - type: commitment_extractor + """ + ) + cond = rule.checks[0].conditions[0] + assert cond.operator == "commitment_concern" + assert cond.value == {"require_amount": True, "require_deadline": False} + + +def test_commitment_extractor_custom_flags() -> None: + rule = _single_rule( + """ + standard: p + rules: + - id: r + hook: after_model + checks: + - type: commitment_extractor + require_amount: false + require_deadline: true + """ + ) + cond = rule.checks[0].conditions[0] + assert cond.value == {"require_amount": False, "require_deadline": True} + + +def test_sentiment_concern_check() -> None: + rule = 
_single_rule( + """ + standard: p + rules: + - id: r + hook: before_model + checks: + - type: sentiment_concern + threshold: -0.5 + """ + ) + cond = rule.checks[0].conditions[0] + assert cond.operator == "vader_concern" + assert cond.value == {"threshold": -0.5} + + +def test_guardrail_fallback_inherits_rule_flags() -> None: + """Rule-level ``mapped_to_uipath`` / ``policy_enabled`` thread into the condition.""" + rule = _single_rule( + """ + standard: p + rules: + - id: r + hook: before_model + mapped_to_uipath: true + policy_enabled: false + checks: + - type: guardrail_fallback + validator: pii_detection + """ + ) + cond = rule.checks[0].conditions[0] + assert cond.operator == "guardrail_fallback" + assert cond.value == { + "validator": "pii_detection", + "mapped_to_uipath": True, + "policy_enabled": False, + } + + +def test_guardrail_fallback_default_flags_are_unmapped_and_enabled() -> None: + """When the rule omits the flags, the fallback never fires (disabled-only contract).""" + rule = _single_rule( + """ + standard: p + rules: + - id: r + hook: before_model + checks: + - type: guardrail_fallback + validator: pii_detection + """ + ) + cond = rule.checks[0].conditions[0] + # ``guardrail_fallback`` operator fires only when mapped=True AND + # enabled=False; defaults of False / True ensure it stays silent. 
+ assert cond.value["mapped_to_uipath"] is False + assert cond.value["policy_enabled"] is True + + +def test_explicit_conditions_win_over_check_type() -> None: + """Explicit ``conditions:`` short-circuits the per-type templating.""" + rule = _single_rule( + """ + standard: p + rules: + - id: r + hook: before_model + checks: + - type: regex # ignored, conditions wins + conditions: + - operator: contains + field: model_input + value: "secret" + message: "no secrets" + """ + ) + cond = rule.checks[0].conditions[0] + assert cond.operator == "contains" # not "regex" + assert cond.value == "secret" + assert rule.checks[0].message == "no secrets" + + +def test_explicit_conditions_negate_flag_propagates() -> None: + rule = _single_rule( + """ + standard: p + rules: + - id: r + hook: before_model + checks: + - conditions: + - operator: contains + field: model_input + value: "allowed" + negate: true + """ + ) + assert rule.checks[0].conditions[0].negate is True + + +def test_non_dict_condition_in_explicit_list_is_skipped() -> None: + """A condition entry that isn't a dict is silently dropped. + + The first dict-with-``operator`` entry is what trips the + "explicit conditions" branch in ``_build_check``; out-of-order + scalar entries appear after the leading dict. + """ + rule = _single_rule( + """ + standard: p + rules: + - id: r + hook: before_model + checks: + - conditions: + - operator: contains + field: model_input + value: "x" + - "not a dict" + """ + ) + assert len(rule.checks[0].conditions) == 1 + + +def test_unknown_check_type_skipped() -> None: + """Unknown check types are dropped without taking down sibling checks.""" + idx = build_policy_index_from_yaml( + """ + standard: p + rules: + - id: r + hook: before_model + checks: + - type: future_check_type + - type: regex + patterns: ["x"] + """ + ) + rule = idx.all_rules[0] + # Only the regex check survived. 
+ assert len(rule.checks) == 1 + assert rule.checks[0].conditions[0].operator == "regex" + + +def test_non_dict_check_entry_skipped() -> None: + """Checks list entries that aren't dicts are silently ignored.""" + rule = _single_rule( + """ + standard: p + rules: + - id: r + hook: before_model + checks: + - "scalar instead of mapping" + - type: regex + patterns: ["x"] + """ + ) + assert len(rule.checks) == 1