diff --git a/src/uipath/runtime/governance/native/__init__.py b/src/uipath/runtime/governance/native/__init__.py new file mode 100644 index 0000000..c7671b6 --- /dev/null +++ b/src/uipath/runtime/governance/native/__init__.py @@ -0,0 +1,51 @@ +"""Native UiPath governance policy evaluator. + +YAML-defined rules evaluated in-process at each agent lifecycle hook. +Reads policies from the UiPath governance backend +(``GET /api/v1/policy``) at startup and runs the deterministic +detectors backing ISO 42001 controls. + +This subpackage owns: + +- :class:`GovernanceEvaluator` – the evaluator implementation. +- The native policy model: :class:`Rule`, :class:`Check`, + :class:`Condition`, :class:`PolicyIndex`. +- Policy fetch + YAML compilation plumbing. + +Shared output types (``Action``, ``AuditRecord``, …) live in +:mod:`uipath.core.governance`. +""" + +from .evaluator import GovernanceEvaluator +from .loader import ( + get_policy_index, + load_policy_index, + prefetch_policy_index, + reset_policy_index, +) +from .models import ( + Check, + CheckContext, + Condition, + PolicyIndex, + PolicyPack, + Rule, + Severity, +) + +__all__ = [ + "GovernanceEvaluator", + # Loader + "get_policy_index", + "load_policy_index", + "prefetch_policy_index", + "reset_policy_index", + # Native policy model + "Check", + "CheckContext", + "Condition", + "PolicyIndex", + "PolicyPack", + "Rule", + "Severity", +] diff --git a/src/uipath/runtime/governance/native/evaluator.py b/src/uipath/runtime/governance/native/evaluator.py new file mode 100644 index 0000000..deaea39 --- /dev/null +++ b/src/uipath/runtime/governance/native/evaluator.py @@ -0,0 +1,1061 @@ +"""Governance rule evaluator.""" + +from __future__ import annotations + +import logging +import math +import re +from collections import Counter +from datetime import datetime, timezone +from functools import lru_cache +from typing import Any + +from uipath.core.governance.exceptions import GovernanceBlockException +from uipath.core.governance.models import ( + Action, + AuditRecord, + LifecycleHook, + RuleEvaluation, +) + +from uipath.runtime.governance.audit import get_audit_manager +from uipath.runtime.governance.config import EnforcementMode, get_enforcement_mode +from uipath.runtime.governance.native.guardrail_compensation import ( + disabled_guardrails, + submit_compensation, +) +from uipath.runtime.governance.native.models import ( + Check, + CheckContext, + Condition, + PolicyIndex, + Rule, +) + +logger = logging.getLogger(__name__) + + +def _compensation_data_for_hook(context: CheckContext) -> dict[str, Any]: + """Build the ``data`` payload for the /runtime/govern compensating call. + + The server runs the guardrail check against the same content the + evaluator was looking at — so we forward whichever + :class:`CheckContext` field is populated for the active hook. Fields + not relevant to the hook are omitted to keep the payload tight. + """ + if context.hook in (LifecycleHook.BEFORE_AGENT,): + return {"content": context.agent_input} + if context.hook in (LifecycleHook.AFTER_AGENT,): + return {"content": context.agent_output} + if context.hook in (LifecycleHook.BEFORE_MODEL,): + payload: dict[str, Any] = {"content": context.model_input} + if context.messages: + payload["messages"] = context.messages + return payload + if context.hook in (LifecycleHook.AFTER_MODEL,): + return {"content": context.model_output} + if context.hook in (LifecycleHook.TOOL_CALL,): + return {"tool_name": context.tool_name, "tool_args": context.tool_args} + if context.hook in (LifecycleHook.AFTER_TOOL,): + return {"tool_name": context.tool_name, "tool_result": context.tool_result} + # Memory-write and unknown hooks: pass an empty content so the + # server still receives a structurally-valid payload. + return {"content": ""} + + +@lru_cache(maxsize=256) +def _compile_regex(pattern: str) -> re.Pattern[str] | None: + """Compile and cache a regex pattern. + + Args: + pattern: The regex pattern string + + Returns: + Compiled pattern or None if invalid + """ + try: + return re.compile(pattern) + except re.error as e: + logger.warning("Invalid regex pattern '%s': %s", pattern, e) + return None + + +# --- vaderSentiment: lazy-imported singleton --- +# Hard dependency, but lazy-loaded to keep import-time cost off the +# critical path. The except branch is defence against a corrupted +# install (file present in METADATA but module unimportable) — the +# operator no-ops rather than crashing the agent. +_VADER_UNINITIALIZED = object() +_vader_analyzer: Any = _VADER_UNINITIALIZED + + +def _get_vader_analyzer() -> Any: + """Return a cached SentimentIntensityAnalyzer, or None if unavailable.""" + global _vader_analyzer + if _vader_analyzer is _VADER_UNINITIALIZED: + try: + from vaderSentiment.vaderSentiment import ( + SentimentIntensityAnalyzer, + ) + + _vader_analyzer = SentimentIntensityAnalyzer() + except ImportError: + logger.error( + "vaderSentiment failed to import despite being a hard dependency; " + "sentiment_concern checks will not fire. Reinstall uipath-core." + ) + _vader_analyzer = None + return _vader_analyzer + + +# --- chardet: lazy-imported module for encoding integrity (A.7.4) --- +# Hard dependency, lazy-loaded for symmetry with the other library +# wrappers. The except branch covers corrupted installs only. +_CHARDET_UNINITIALIZED = object() +_chardet_module: Any = _CHARDET_UNINITIALIZED + + +def _get_chardet() -> Any: + """Return the chardet module, or None if unavailable.""" + global _chardet_module + if _chardet_module is _CHARDET_UNINITIALIZED: + try: + import chardet + + _chardet_module = chardet + except ImportError: + logger.error( + "chardet failed to import despite being a hard dependency; " + "encoding_concern confidence check will not fire (stdlib " + "signals still apply). Reinstall uipath-core." + ) + _chardet_module = None + return _chardet_module + + +# --- Static patterns for encoding_concern (A.7.4) --- +# Latin-1-as-UTF-8 mojibake bigrams — the visible artefacts when +# UTF-8-encoded text is re-decoded as Latin-1 / Windows-1252. +_MOJIBAKE_BIGRAMS: tuple[str, ...] = ( + "é", + "è", + "â", + "à ", + "ù", + "î", + "ô", + "ç", # accented vowels + "Ä", + "Ö", + "Ü", + "ß", # German umlauts / eszett + "’", + "“", + "â€\x9d", + "–", + "—", + "•", # smart quotes / dashes + "£", + "°", + "§", + "¶", + "©", + "®", # NBSP-leading symbols + "ï¿", + "¿½", # mojibake'd U+FFFD (0xEF 0xBF 0xBD as Latin-1) + "ï»", + "»¿", # mojibake'd BOM (0xEF 0xBB 0xBF as Latin-1) +) + +# Literal hex escape sequences ("\x80" as 4 source chars) indicate raw +# bytes leaked through a string layer rather than being decoded. +_HEX_ESCAPE_PATTERN = re.compile(r"\\x[0-9a-fA-F]{2}") + + +# --- Static patterns for incident_concern (A.8.4) --- +# Stdlib-only categorical taxonomy. Mirrors sentry-sdk's incident shape +# (categorical types over stack/status), but for string payloads from +# model output / tool result rather than exception objects. +_INCIDENT_PATTERNS: dict[str, list[re.Pattern[str]]] = { + "safety_refusal": [ + re.compile( + r"(?i)\b(i\s+(?:cannot|can'?t|am\s+unable\s+to|won'?t\s+be\s+able\s+to)" + r"\s+(?:help|assist|provide|answer|do\s+that))\b" + ), + re.compile(r"(?i)\b(i'?m\s+sorry,?\s+but\s+i\s+(?:cannot|can'?t))\b"), + re.compile(r"(?i)\b(against\s+my\s+(?:guidelines|policies|programming))\b"), + ], + "tool_failure": [ + re.compile( + r"\b(5\d{2})\b\s*(?:internal\s+server\s+error|service\s+unavailable)" + ), + re.compile(r"(?i)\b(ERR_[A-Z_]+|connection\s+refused|ECONNREFUSED)\b"), + re.compile(r"(?i)\b(timed?\s*out|timeout)\b"), + ], + "auth_failure": [ + re.compile(r"\b(401|403)\b\s*(?:unauthori[sz]ed|forbidden)"), + re.compile( + r"(?i)\b(authentication\s+failed|invalid\s+(?:token|credentials))\b" + ), + ], + "quota_exceeded": [ + re.compile(r"\b(429)\b"), + re.compile( + r"(?i)\b(rate\s+limit\s+exceeded|quota\s+exceeded|too\s+many\s+requests)\b" + ), + ], + "hallucination": [ + re.compile(r"(?i)\b(i\s+(?:made\s+(?:that|this)\s+up|am\s+just\s+guessing))\b"), + re.compile(r"(?i)\b(i\s+don'?t\s+actually\s+know|i\s+fabricat(?:ed|ing))\b"), + ], +} + +# --- Static patterns for commitment_concern (A.10.4) --- +# Commitment-language signals. The verb pattern covers both first-person +# promise verbs ("we will refund") and formal-business commitment markers +# common in proposal / SOW outputs ("Cost: $X", "fixed scope", +# "Deliverables", "Timeline: N days", "I propose"). Verb, amount, and +# deadline signals combine via OR semantics — see +# :meth:`_check_commitment_concern`. +_COMMITMENT_VERB_PATTERN = re.compile( + r"(?i)(" + # First-person promise / liability verbs + r"\brefund\b|\breimburse\b|" + r"\bwarranty\b|\bwarrant(?:y|ed|ies)\b|\bguarante[ed]+\b|" + r"\bsla\b|" + r"\bwaive[d]?\b|" + r"\b(?:we|i)\s+(?:will|shall|promise|commit|guarantee)\b|" + r"\b(?:we|i|i'?ll)\s+(?:deliver|provide|complete|finish|" + r"handover|hand\s+over|ship)\b|" + # Proposal / SOW commitment markers + r"\bfixed\s+(?:price|cost|fee|scope|bid|rate)\b|" + r"\bcost\s*:\s*\$?\d|" + r"\bquote\s*:\s*\$?\d|" + r"\bdeliverables?\b|" + r"\btimeline\s*:\s*\d+\s*(?:second|minute|hour|day|week|month|year)s?\b|" + r"\bI\s+propose\b" + r")" +) +# Currency-anchored amount detection. Requires a currency marker adjacent +# to the number so URL fragments (e.g. ``/667851``) don't false-positive. +# Covers symbol-then-number ($780) and number-then-code (780 USD). +# +# Bare percentages (``75%``, ``99.9%``) are deliberately NOT matched +# here — they fire on benign status / progress text ("75% complete", +# "99.9% uptime") under OR semantics. Real percentage-bearing +# commitments ("we'll give you a 20% discount", "refund 100%") still +# fire via the verb pattern. +_COMMITMENT_AMOUNT_FALLBACK = re.compile( + r"(?:\$|€|£|¥|₹|USD|EUR|GBP|JPY|INR)\s*\d[\d,]*(?:\.\d+)?" + r"|\b\d[\d,]*(?:\.\d+)?\s*(?:USD|EUR|GBP|JPY|INR|" + r"dollars?|euros?|pounds?|yen|rupees?)\b" +) +_COMMITMENT_DEADLINE_PATTERN = re.compile( + r"(?i)\bwithin\s+\d+\s*(?:second|minute|hour|day|week|month|year)s?\b" + r"|\bby\s+(?:tomorrow|next\s+\w+|\d+/\d+(?:/\d+)?)\b" +) + + +class GovernanceEvaluator: + """Evaluates governance rules against check contexts. + + Supports two enforcement modes: + - AUDIT: Log all violations but never block (DENY becomes AUDIT in final action) + - ENFORCE: Actually block on DENY rules + + Default mode is AUDIT for safety. + """ + + def __init__( + self, + policy_index: PolicyIndex, + mode: EnforcementMode | None = None, + ) -> None: + """Initialize with a compiled policy index and optional mode override.""" + self._policy_index = policy_index + self._mode = mode + + @property + def policy_index(self) -> PolicyIndex: + """Return the compiled policy index this evaluator runs against.""" + return self._policy_index + + @property + def mode(self) -> EnforcementMode: + """Get the enforcement mode (uses config default if not set).""" + if self._mode is not None: + return self._mode + return get_enforcement_mode() + + @mode.setter + def mode(self, value: EnforcementMode) -> None: + """Set the enforcement mode.""" + self._mode = value + + def is_audit_mode(self) -> bool: + """Check if running in audit-only mode.""" + return self.mode == EnforcementMode.AUDIT + + def is_enforce_mode(self) -> bool: + """Check if running in enforce mode (will block on DENY).""" + return self.mode == EnforcementMode.ENFORCE + + def evaluate(self, context: CheckContext) -> AuditRecord: + """Evaluate rules registered for ``context.hook`` against the context. + + Only rules whose ``hook`` field matches the current lifecycle hook + are evaluated — a ``tool_call`` rule does not fire on + ``before_model``, and vice versa. This avoids running checks + against fields the context cannot provide and keeps the audit + stream scoped to the active phase. + + The final action depends on the enforcement mode: + - DISABLED mode: Short-circuit; no rules evaluated, no audit emitted. + - AUDIT mode: Even DENY rules result in AUDIT action (log only, don't block) + - ENFORCE mode: DENY rules result in DENY action AND a + :class:`GovernanceBlockException` is raised. + + Audit events (per-rule + hook summary) are emitted via the + global :func:`get_audit_manager` so callers do not need to do + any emission themselves. + + Args: + context: The check context with hook and content + + Returns: + AuditRecord with all evaluations and final action. + + Raises: + GovernanceBlockException: In ENFORCE mode when a DENY rule matches. + """ + mode = self.mode + if mode == EnforcementMode.DISABLED: + return AuditRecord( + timestamp=datetime.now(timezone.utc), + agent_name=context.agent_name, + runtime_id=context.runtime_id, + trace_id=context.trace_id, + hook=context.hook, + evaluations=[], + final_action=Action.ALLOW, + metadata={**context.metadata, "enforcement_mode": mode.value}, + ) + + rules = self._policy_index.get_rules_for_hook(context.hook) + + evaluations: list[RuleEvaluation] = [] + raw_action = Action.ALLOW # The action before mode adjustment + deny_would_fire = False # Track if DENY would have fired + + for rule in rules: + if not rule.enabled: + continue + + evaluation = self._evaluate_rule(rule, context) + evaluations.append(evaluation) + + if evaluation.matched: + # Take the most restrictive action + if rule.action == Action.DENY: + raw_action = Action.DENY + deny_would_fire = True + elif rule.action == Action.ESCALATE and raw_action != Action.DENY: + raw_action = Action.ESCALATE + elif rule.action == Action.AUDIT and raw_action == Action.ALLOW: + raw_action = Action.AUDIT + + # Apply enforcement mode + final_action = self._apply_enforcement_mode(raw_action) + + # Build metadata with mode info + record_metadata = dict(context.metadata) + record_metadata["enforcement_mode"] = mode.value + if deny_would_fire and self.is_audit_mode(): + record_metadata["audit_mode_would_deny"] = True + + audit = AuditRecord( + timestamp=datetime.now(timezone.utc), + agent_name=context.agent_name, + runtime_id=context.runtime_id, + trace_id=context.trace_id, + hook=context.hook, + evaluations=evaluations, + final_action=final_action, + metadata=record_metadata, + ) + + self._emit_audit(audit, mode) + + # For any guardrail mapped to UiPath but currently disabled, hand + # the disabled guardrails to the governance-server's + # /runtime/govern endpoint. The SERVER runs the guardrail check + # AND writes the trace (the payload carries traceId / src_timestamp + # / hook / agent so it can correlate) — the agent does NOT emit a + # trace itself, to avoid double-writing. Fire-and-forget on a + # daemon thread so a slow or unreachable endpoint never blocks + # the agent. + self._dispatch_compensation(audit, context) + + if final_action == Action.DENY: + raise GovernanceBlockException.from_audit_record(audit) + + return audit + + def _dispatch_compensation( + self, audit: AuditRecord, context: CheckContext + ) -> None: + """Schedule compensating governance for any matched fallback rules. + + Hands the call to the bounded background pool in + :func:`uipath.runtime.governance.native.guardrail_compensation.submit_compensation`. + That helper owns concurrency, queue caps, exception isolation, + and graceful process-exit cancellation — this method just + builds the payload, logs the summary, and submits. + """ + try: + disabled = disabled_guardrails(audit, self._policy_index) + if not disabled: + return + + validators = [rule["validator"] for rule in disabled] + + # Surface the disabled-guardrail fire-up: how many rules + # triggered the compensating call, and which validators + # they map to (e.g. pii_detection / prompt_injection / + # harmful_content). One line per dispatch so an operator + # can see the volume + breakdown at a glance. + logger.info( + "Compensating governance triggered: hook=%s, count=%d, validators=[%s]", + audit.hook.value, + len(disabled), + ", ".join(validators), + ) + + submit_compensation( + rules=disabled, + data=_compensation_data_for_hook(context), + hook=audit.hook.value, + trace_id=audit.trace_id, + src_timestamp=audit.timestamp.isoformat(), + agent_name=audit.agent_name, + runtime_id=audit.runtime_id, + ) + except Exception as exc: # noqa: BLE001 - fail-open + logger.warning( + "Failed to dispatch compensating governance call: %s", exc + ) + + def _emit_audit(self, audit: AuditRecord, mode: EnforcementMode) -> None: + """Emit per-rule and hook-summary events to the global audit manager. + + Failure-isolated: audit-sink errors must never break evaluation. + Sink-level circuit breaking is handled inside :class:`AuditManager`. + """ + try: + manager = get_audit_manager() + except Exception as exc: # pragma: no cover - defensive + logger.debug("Audit manager unavailable; skipping emission: %s", exc) + return + + hook_name = audit.hook.name + + # ``guardrail_fallback`` rules are server-traced: the agent POSTs + # to ``/runtime/govern`` (see :meth:`_dispatch_compensation`) and + # the governance-server emits the audit event with the actual + # validator verdict. Emitting a Python-side ``rule_evaluation`` + # event here would produce a duplicate trace carrying no + # verdict, so filter these rules out of every event the Python + # evaluator emits (per-rule AND the hook summary's counts). + emittable = [ + ev for ev in audit.evaluations + if not self._is_guardrail_fallback_rule(ev.rule_id) + ] + + for evaluation in emittable: + manager.emit_rule_evaluation( + rule_id=evaluation.rule_id, + rule_name=evaluation.rule_name, + pack_name=evaluation.pack_name, + hook=hook_name, + matched=evaluation.matched, + action=evaluation.action.value if evaluation.matched else "allow", + detail=evaluation.detail, + agent_name=audit.agent_name, + trace_id=audit.trace_id, + description=evaluation.description, + ) + + manager.emit_hook_summary( + hook=hook_name, + agent_name=audit.agent_name, + total_rules=len(emittable), + matched_rules=sum(1 for ev in emittable if ev.matched), + final_action=audit.final_action.value, + trace_id=audit.trace_id, + enforcement_mode=mode.value, + ) + + def _is_guardrail_fallback_rule(self, rule_id: str) -> bool: + """Return True if the rule is a UiPath-compensating fallback rule. + + Such rules carry a ``guardrail_fallback`` condition; their audit + trace is emitted by the governance-server in response to the + ``/runtime/govern`` POST, so the Python evaluator must not emit + a duplicate trace for them. + """ + rule = self._policy_index.get_rule(rule_id) + if rule is None: + return False + for check in rule.checks: + for cond in check.conditions: + if cond.operator == "guardrail_fallback": + return True + return False + + def _apply_enforcement_mode(self, raw_action: Action) -> Action: + """Apply enforcement mode to the raw action. + + In AUDIT mode: + - DENY becomes AUDIT (log but don't block) + - ESCALATE becomes AUDIT (log but don't escalate) + - AUDIT stays AUDIT + - ALLOW stays ALLOW + + In ENFORCE mode: + - All actions pass through unchanged + """ + if self.mode == EnforcementMode.AUDIT: + if raw_action in (Action.DENY, Action.ESCALATE): + return Action.AUDIT + return raw_action + + def evaluate_before_agent( + self, + agent_input: str, + agent_name: str, + runtime_id: str, + trace_id: str, + model_name: str = "", + **kwargs: Any, + ) -> AuditRecord: + """Evaluate BEFORE_AGENT rules.""" + context = CheckContext( + hook=LifecycleHook.BEFORE_AGENT, + agent_name=agent_name, + runtime_id=runtime_id, + trace_id=trace_id, + agent_input=agent_input, + model_name=model_name, + metadata=kwargs.get("metadata", {}), + ) + return self.evaluate(context) + + def evaluate_after_agent( + self, + agent_output: str, + agent_name: str, + runtime_id: str, + trace_id: str, + **kwargs: Any, + ) -> AuditRecord: + """Evaluate AFTER_AGENT rules.""" + context = CheckContext( + hook=LifecycleHook.AFTER_AGENT, + agent_name=agent_name, + runtime_id=runtime_id, + trace_id=trace_id, + agent_output=agent_output, + metadata=kwargs.get("metadata", {}), + ) + return self.evaluate(context) + + def evaluate_before_model( + self, + model_input: str, + agent_name: str, + runtime_id: str, + trace_id: str, + messages: list[dict[str, Any]] | None = None, + model_name: str = "", + **kwargs: Any, + ) -> AuditRecord: + """Evaluate BEFORE_MODEL rules.""" + context = CheckContext( + hook=LifecycleHook.BEFORE_MODEL, + agent_name=agent_name, + runtime_id=runtime_id, + trace_id=trace_id, + model_input=model_input, + model_name=model_name, + messages=messages or [], + metadata=kwargs.get("metadata", {}), + ) + return self.evaluate(context) + + def evaluate_after_model( + self, + model_output: str, + agent_name: str, + runtime_id: str, + trace_id: str, + **kwargs: Any, + ) -> AuditRecord: + """Evaluate AFTER_MODEL rules.""" + context = CheckContext( + hook=LifecycleHook.AFTER_MODEL, + agent_name=agent_name, + runtime_id=runtime_id, + trace_id=trace_id, + model_output=model_output, + metadata=kwargs.get("metadata", {}), + ) + return self.evaluate(context) + + def evaluate_tool_call( + self, + tool_name: str, + tool_args: dict[str, Any], + agent_name: str, + runtime_id: str, + trace_id: str, + session_state: dict[str, Any] | None = None, + **kwargs: Any, + ) -> AuditRecord: + """Evaluate TOOL_CALL rules.""" + context = CheckContext( + hook=LifecycleHook.TOOL_CALL, + agent_name=agent_name, + runtime_id=runtime_id, + trace_id=trace_id, + tool_name=tool_name, + tool_args=tool_args, + session_state=session_state or {}, + metadata=kwargs.get("metadata", {}), + ) + return self.evaluate(context) + + def evaluate_after_tool( + self, + tool_name: str, + tool_result: str, + agent_name: str, + runtime_id: str, + trace_id: str, + **kwargs: Any, + ) -> AuditRecord: + """Evaluate AFTER_TOOL rules.""" + context = CheckContext( + hook=LifecycleHook.AFTER_TOOL, + agent_name=agent_name, + runtime_id=runtime_id, + trace_id=trace_id, + tool_name=tool_name, + tool_result=tool_result, + metadata=kwargs.get("metadata", {}), + ) + return self.evaluate(context) + + def _evaluate_rule(self, rule: Rule, context: CheckContext) -> RuleEvaluation: + """Evaluate a single rule against the context.""" + if not rule.checks: + # No checks = always matches (for audit-only rules) + return RuleEvaluation( + rule_id=rule.rule_id, + rule_name=rule.name, + matched=True, + detail="Rule has no conditions (always matches)", + pack_name=rule.pack_name, + action=rule.action, + description=rule.description, + ) + + check_results: list[dict[str, Any]] = [] + any_check_matched = False + + for check in rule.checks: + matched, detail = self._evaluate_check(check, context) + check_results.append( + { + "matched": matched, + "detail": detail, + "action": check.action.value, + } + ) + if matched: + any_check_matched = True + + # Surface the FIRST matched check's message; falls back to the + # first check's detail (empty string when none matched) for + # backward compatibility with rules that have a single check. + first_matched_detail = next( + (cr["detail"] for cr in check_results if cr["matched"]), + check_results[0]["detail"] if check_results else "", + ) + + return RuleEvaluation( + rule_id=rule.rule_id, + rule_name=rule.name, + matched=any_check_matched, + detail=first_matched_detail, + pack_name=rule.pack_name, + action=rule.action if any_check_matched else Action.ALLOW, + description=rule.description, + check_results=check_results, + ) + + def _evaluate_check(self, check: Check, context: CheckContext) -> tuple[bool, str]: + """Evaluate a single check against the context.""" + if not check.conditions: + return True, "No conditions (always matches)" + + results = [] + for condition in check.conditions: + matched = self._evaluate_condition(condition, context) + results.append(matched) + + if check.logic == "any": + final_match = any(results) + else: # "all" is default + final_match = all(results) + + detail = check.message if final_match else "" + return final_match, detail + + def _evaluate_condition(self, condition: Condition, context: CheckContext) -> bool: + """Evaluate a single condition against the context.""" + field_value = self._get_field_value(condition.field, context) + result = self._apply_operator(condition.operator, field_value, condition.value) + + if condition.negate: + result = not result + + return result + + def _get_field_value(self, field: str, context: CheckContext) -> Any: + """Get a field value from the context.""" + parts = field.split(".") + + # Start with context + value: Any = context + + for part in parts: + if hasattr(value, part): + value = getattr(value, part) + elif isinstance(value, dict) and part in value: + value = value[part] + else: + return None + + return value + + def _apply_operator( + self, operator: str, field_value: Any, check_value: Any + ) -> bool: + """Apply an operator to compare field value against check value.""" + # Handle existence checks before the None check + if operator == "exists": + return field_value is not None + if operator == "not_exists": + return field_value is None + + # guardrail_fallback fires only when the guardrail is mapped to + # UiPath but its policy is disabled. Config travels in + # ``check_value``; the rule's ``field`` is unused (so + # ``field_value`` is ``None`` here, which is expected — we must + # special-case this before the generic ``None`` short-circuit + # below). + if operator == "guardrail_fallback": + cfg = check_value if isinstance(check_value, dict) else {} + return bool(cfg.get("mapped_to_uipath", False)) and not bool( + cfg.get("policy_enabled", True) + ) + + if field_value is None: + return False + + # Numeric operators don't need stringification — short-circuit + # before `str(field_value)` (expensive for dict / large payloads). + if operator in ("gt", "gte", "lt", "lte"): + try: + lhs = float(field_value) + rhs = float(check_value) + except (ValueError, TypeError): + return False + if operator == "gt": + return lhs > rhs + if operator == "gte": + return lhs >= rhs + if operator == "lt": + return lhs < rhs + return lhs <= rhs + + field_str = str(field_value) + + match operator: + case "equals" | "eq": + return field_str == str(check_value) + + case "not_equals" | "ne": + return field_str != str(check_value) + + case "contains": + return str(check_value).lower() in field_str.lower() + + case "not_contains": + return str(check_value).lower() not in field_str.lower() + + case "regex" | "matches": + compiled = _compile_regex(str(check_value)) + if compiled is None: + return False + return bool(compiled.search(field_str)) + + case "in_list": + if isinstance(check_value, list): + return field_str in check_value + return False + + case "not_in_list": + if isinstance(check_value, list): + return field_str not in check_value + return True + + case "vader_concern": + # VADER compound score <= threshold. + # check_value: dict like {"threshold": -0.3} (default -0.3) + return self._check_vader_concern(field_str, check_value) + + case "encoding_concern": + # chardet-backed encoding integrity check (A.7.4). + # check_value: dict with optional `min_confidence` (default 0.5) + # and `max_replacement_ratio` (default 0.05). + return self._check_encoding_concern(field_str, check_value) + + case "entropy_concern": + # Shannon entropy outside expected range (A.7.4). + # check_value: dict with optional `min` (default 1.5) and + # `max` (default 7.5) bits/byte. Stdlib only. + return self._check_entropy_concern(field_str, check_value) + + case "incident_concern": + # Categorical incident detection (A.8.4). + # check_value: dict with optional `categories` list + # (subset of safety_refusal/tool_failure/auth_failure/ + # quota_exceeded/hallucination). Default: all categories. + return self._check_incident_concern(field_str, check_value) + + case "commitment_concern": + # Customer commitment language detection (A.10.4). + # check_value: dict with optional `require_amount` (default + # True) and `require_deadline` (default False). Fires when + # a commitment verb co-occurs with the configured signals. + return self._check_commitment_concern(field_str, check_value) + + case _: + logger.debug("Unknown operator: %s", operator) + return False + + @staticmethod + def _check_vader_concern(text: str, params: Any) -> bool: + """Return True if VADER compound score on `text` is <= threshold. + + Args: + text: Text to analyse. + params: Either a dict with `threshold` key, or a numeric threshold + directly. Default threshold is -0.3 (clearly-negative). + + Returns: + True iff vaderSentiment is available AND compound score <= threshold. + Returns False on empty input or if the library is not installed — + sentiment checks no-op rather than crash. + """ + if not text or not text.strip(): + return False + + analyzer = _get_vader_analyzer() + if analyzer is None: + return False + + if isinstance(params, dict): + threshold = float(params.get("threshold", -0.2)) + else: + try: + threshold = float(params) + except (TypeError, ValueError): + threshold = -0.3 + + try: + compound = float(analyzer.polarity_scores(text)["compound"]) + except Exception as exc: # pragma: no cover - defensive + logger.debug("VADER analysis failed: %s", exc) + return False + + return compound <= threshold + + @staticmethod + def _check_encoding_concern(text: str, params: Any) -> bool: + r"""Return True if `text` shows encoding integrity issues. + + Sums multiple deterministic corruption signals against text length: + - U+FFFD replacement characters (already-decoded lossy text) + - Literal ``�`` escape sequences carried through a JSON + / repr layer rather than being decoded + - Literal ``\xHH`` hex escapes (raw bytes leaked into a string) + - Latin-1-as-UTF-8 mojibake bigrams (e.g. ``é``, ``’``) + If the corruption ratio exceeds ``max_replacement_ratio`` the + check fires. chardet (when installed) is consulted as a + secondary low-confidence signal. + """ + if not text or not text.strip(): + return False + + if not isinstance(params, dict): + params = {} + min_confidence = float(params.get("min_confidence", 0.5)) + max_replacement_ratio = float(params.get("max_replacement_ratio", 0.05)) + min_corruption_events = int(params.get("min_corruption_events", 2)) + + length = max(len(text), 1) + + replacement_chars = text.count("�") + literal_ufffd_escapes = text.count("\\ufffd") + hex_escapes = len(_HEX_ESCAPE_PATTERN.findall(text)) + mojibake_bigrams = sum(text.count(bigram) for bigram in _MOJIBAKE_BIGRAMS) + + # Absolute count of distinct corruption *events* (one per + # U+FFFD, one per literal escape sequence, one per mojibake + # bigram). Even diluted by a lot of clean text, a few of these + # in production output is a strong signal. + corruption_events = ( + replacement_chars + literal_ufffd_escapes + hex_escapes + mojibake_bigrams + ) + if corruption_events >= min_corruption_events: + return True + + # Ratio-based fallback for cases below the absolute floor: still + # catches very short payloads where a single corruption char is + # disproportionate. + # Weight each event by its source-char span so denser corruption + # in shorter text trips the ratio sooner: + # U+FFFD = 1 char, "�" = 6 chars, "\xHH" = 4 chars, + # mojibake bigram = 2 chars. + corruption_chars = ( + replacement_chars + + 6 * literal_ufffd_escapes + + 4 * hex_escapes + + 2 * mojibake_bigrams + ) + if corruption_chars / length > max_replacement_ratio: + return True + + # Secondary: chardet on the encoded bytes. For pure str input + # this almost always reports high UTF-8/ASCII confidence (the + # branch is intentionally permissive), but it does catch bytes + # routed through `repr()` or `__str__` of a `bytes` object that + # chardet recognises as a non-UTF8 encoding with low confidence. + chardet = _get_chardet() + if chardet is None: + return False + try: + detection = chardet.detect(text.encode("utf-8", errors="replace")) + confidence = float(detection.get("confidence") or 0.0) + except Exception as exc: # pragma: no cover - defensive + logger.debug("chardet detection failed: %s", exc) + return False + + return confidence < min_confidence + + @staticmethod + def _check_entropy_concern(text: str, params: Any) -> bool: + """Return True if Shannon entropy of `text` is outside an expected range. + + Stdlib-only. Entropy is computed in bits per symbol over byte + frequencies. English prose typically lands ~3.5–4.5 bits/byte; + binary noise approaches 8 bits/byte; constant/repetitive text + approaches 0. + """ + if not text or not text.strip(): + return False + + if not isinstance(params, dict): + params = {} + lo = float(params.get("min", 1.5)) + hi = float(params.get("max", 7.5)) + + data = text.encode("utf-8", errors="replace") + total = len(data) + if total == 0: + return False + + counts = Counter(data) + entropy = 0.0 + for c in counts.values(): + p = c / total + entropy -= p * math.log2(p) + + return entropy < lo or entropy > hi + + @staticmethod + def _check_incident_concern(text: str, params: Any) -> bool: + """Return True if `text` matches any configured incident pattern (A.8.4). + + Categories: safety_refusal, tool_failure, auth_failure, + quota_exceeded, hallucination. Pass ``{"categories": [...]}`` to + restrict; default scans all categories. + """ + if not text or not text.strip(): + return False + + if isinstance(params, dict): + requested = params.get("categories") + else: + requested = None + + if not requested: + categories = list(_INCIDENT_PATTERNS.keys()) + else: + categories = [c for c in requested if c in _INCIDENT_PATTERNS] + + for category in categories: + for pattern in _INCIDENT_PATTERNS[category]: + if pattern.search(text): + return True + return False + + @staticmethod + def _check_commitment_concern(text: str, params: Any) -> bool: + """Return True if `text` carries customer-commitment language (A.10.4). + + OR semantics: a commitment-verb match always fires; when + ``require_amount`` is true, a currency-anchored amount alone also + fires; when ``require_deadline`` is true, a deadline phrase alone + also fires. With both flags false the rule matches on verb only + (verb-only mode). + + The verb pattern covers first-person promise verbs *and* proposal + / SOW commitment markers ("Cost: $X", "fixed scope", + "Deliverables", "Timeline: N days", "I propose"). The amount + pattern requires a currency marker adjacent to the number so URL + fragments don't false-positive. + """ + if not text or not text.strip(): + return False + + if not isinstance(params, dict): + params = {} + require_amount = bool(params.get("require_amount", True)) + require_deadline = bool(params.get("require_deadline", False)) + + verb_match = bool(_COMMITMENT_VERB_PATTERN.search(text)) + + # Verb-only mode: neither supporting signal is enabled. + if not require_amount and not require_deadline: + return verb_match + + amount_match = require_amount and bool( + _COMMITMENT_AMOUNT_FALLBACK.search(text) + ) + deadline_match = require_deadline and bool( + _COMMITMENT_DEADLINE_PATTERN.search(text) + ) + return verb_match or amount_match or deadline_match diff --git a/tests/test_commitment_concern.py b/tests/test_commitment_concern.py new file mode 100644 index 0000000..a46149b --- /dev/null +++ b/tests/test_commitment_concern.py @@ -0,0 +1,205 @@ +"""Tests for the commitment_concern check (A.10.4). + +The check now uses OR semantics: a verb match, an amount match, or a +deadline match is each sufficient when its enabling flag is on. With +both flags false the rule matches verb-only. + +The verb pattern also covers proposal / SOW style commitment markers +("Cost: $X", "fixed scope", "Deliverables", "Timeline", "I propose") +so formal-business commitments without first-person verbs still fire. + +Amount detection requires a currency marker adjacent to the number to +prevent URL fragments (forum-post IDs, image dimensions, etc.) from +false-positiving. +""" + +from __future__ import annotations + +import pytest + +from uipath.runtime.governance.native.evaluator import GovernanceEvaluator + +# --------------------------------------------------------------------------- +# The proposal-style sample that originally slipped through the rule. +# Contains: "Cost: $780 (fixed for the above scope)", "Deliverables", +# "Timeline: 4 days total", "I propose", a forum URL with a 6-digit ID. +# Triple-quoted so we keep the line breaks the model produced. +# --------------------------------------------------------------------------- +SAMPLE_PROPOSAL = """To address your concerns, I reviewed the official UiPath site you referenced and relevant resources on uipath.com to inform a fast stabilization plan. Notable findings include: a community CI/CD sample for UiPath projects (https://forum.uipath.com/t/announcement-ci-cd-pipeline-sample-implementation-s-for-uipath-projects-alpha/667851). + +Here's how I propose we turn your software around quickly: + +Plan +- Triage (logs + reproduce) +- Quick stabilization + +Deliverables +- Defect triage report + +Timeline: 4 days total +- Day 1: Triage + reproduction + +Cost: $780 (fixed for the above scope) +""" + + +@pytest.mark.parametrize( + "text", + [ + "Cost: $780 (fixed for the above scope)", + "Deliverables: a, b, c", + "Timeline: 4 days total for the whole engagement", + "I propose we turn this around in a week", + "We will refund the difference", + "I'll deliver the report by Friday", + "the warranty covers parts only", + "fixed price of one hundred dollars", + ], +) +def test_verb_match_alone_fires(text: str) -> None: + """Each verb-style commitment marker fires on its own (verb-only mode).""" + assert ( + GovernanceEvaluator._check_commitment_concern( + text, {"require_amount": False, "require_deadline": False} + ) + is True + ) + + +def test_full_proposal_sample_fires() -> None: + """The originally-missed proposal output now fires.""" + assert ( + GovernanceEvaluator._check_commitment_concern( + SAMPLE_PROPOSAL, + {"require_amount": False, "require_deadline": False}, + ) + is True + ) + + +@pytest.mark.parametrize( + "text", + [ + "$780", + "We charge USD 1,200 per seat", + "The fee is 500 EUR", + ], +) +def test_amount_alone_fires_when_require_amount_true(text: str) -> None: + """Currency-anchored amount alone fires under OR semantics.""" + assert ( + GovernanceEvaluator._check_commitment_concern( + text, {"require_amount": True, "require_deadline": False} + ) + is True + ) + + +@pytest.mark.parametrize( + "text", + [ + "Task is 75% complete.", + "We maintain 99.9% uptime.", + "Battery at 50%.", + "Score: 12%.", + ], +) +def test_bare_percentage_does_not_fire(text: str) -> None: + """Status-only percentages must not trigger commitment_concern. + + Regression for the prior ``\\d{1,3}\\s*%`` branch in the amount + regex, which fired on benign status / progress text. Real + percentage-bearing commitments ("we'll give a 20% discount") + still fire via the verb pattern. + """ + assert ( + GovernanceEvaluator._check_commitment_concern( + text, {"require_amount": True, "require_deadline": False} + ) + is False + ) + + +def test_percentage_with_verb_still_fires() -> None: + """A commitment verb co-occurring with a percentage still fires.""" + assert ( + GovernanceEvaluator._check_commitment_concern( + "We will refund 100% of the purchase price.", + {"require_amount": True, "require_deadline": False}, + ) + is True + ) + + +def test_amount_alone_does_not_fire_when_require_amount_false() -> None: + """Amount-only text is silent when require_amount=False and no verb.""" + assert ( + GovernanceEvaluator._check_commitment_concern( + "The list price is $780.", + {"require_amount": False, "require_deadline": False}, + ) + is False + ) + + +def test_deadline_alone_fires_when_require_deadline_true() -> None: + """Deadline phrase alone fires under OR semantics.""" + assert ( + GovernanceEvaluator._check_commitment_concern( + "Will be done within 5 days.", + {"require_amount": False, "require_deadline": True}, + ) + is True + ) + + +def test_url_fragment_digits_do_not_false_positive() -> None: + """A long URL with embedded digits is not a 'commitment'. + + Catches the prior price-parser misbehaviour where Price.fromstring() + picked up forum-post IDs (e.g. ``667851``) and conflated them with + unrelated currency symbols elsewhere in the text. + """ + text = ( + "See https://forum.example.com/t/topic/667851 for details — " + "no commitment language here." + ) + assert ( + GovernanceEvaluator._check_commitment_concern( + text, {"require_amount": True, "require_deadline": True} + ) + is False + ) + + +@pytest.mark.parametrize( + "text", + [ + "", + " ", + "Just chatting about the weather today.", + "The product is durable and well-made.", + ], +) +def test_no_signal_does_not_fire(text: str) -> None: + """Text without any commitment signal stays silent regardless of flags.""" + assert ( + GovernanceEvaluator._check_commitment_concern( + text, {"require_amount": True, "require_deadline": True} + ) + is False + ) + + +def test_non_dict_params_treated_as_defaults() -> None: + """``params`` of the wrong type degrades to defaults rather than crashing.""" + assert ( + GovernanceEvaluator._check_commitment_concern("we will refund", None) + is True + ) + assert ( + GovernanceEvaluator._check_commitment_concern( + "no verbs here", "garbage" + ) + is False + ) diff --git a/tests/test_evaluator.py b/tests/test_evaluator.py new file mode 100644 index 0000000..d57e2de --- /dev/null +++ b/tests/test_evaluator.py @@ -0,0 +1,401 @@ +"""Tests for the audit + enforcement behavior of GovernanceEvaluator. + +The evaluator owns three responsibilities that used to be scattered +across wrapper.py and adapter callbacks: + +1. DISABLED enforcement mode short-circuits — no rules evaluated, no + audit events emitted, no exceptions raised. +2. AUDIT mode evaluates rules and emits audit events, but transforms + matched DENY actions into AUDIT so execution continues. +3. ENFORCE mode evaluates, emits audit, and raises + :class:`GovernanceBlockException` when a DENY rule matches. + +Plus a fail-safe contract: a misbehaving audit sink must not stop +evaluation from completing or propagate as an exception. +""" + +from __future__ import annotations + +from typing import Any +from unittest.mock import patch + +import pytest +from uipath.core.governance.exceptions import GovernanceBlockException +from uipath.core.governance.models import Action, LifecycleHook + +from uipath.runtime.governance.audit import ( + AuditEvent, + AuditSink, + EventType, + get_audit_manager, + reset_audit_manager, +) +from uipath.runtime.governance.config import ( + EnforcementMode, + reset_enforcement_mode, + set_enforcement_mode, +) +from uipath.runtime.governance.native.evaluator import GovernanceEvaluator +from uipath.runtime.governance.native.models import ( + Check, + CheckContext, + Condition, + PolicyIndex, + PolicyPack, + Rule, +) + +# --------------------------------------------------------------------------- +# Test helpers +# --------------------------------------------------------------------------- + + +class _CapturingSink(AuditSink): + """Audit sink that records every event for assertions.""" + + def __init__(self) -> None: + self.events: list[AuditEvent] = [] + + @property + def name(self) -> str: + return "capturing" + + def emit(self, event: AuditEvent) -> None: + self.events.append(event) + + +def _deny_rule_on_input_contains(needle: str) -> Rule: + """Build a rule that DENIES when agent_input contains ``needle``.""" + return Rule( + rule_id="TEST-01", + name="Test deny on input", + clause="A.1.1", + hook=LifecycleHook.BEFORE_AGENT, + action=Action.DENY, + checks=[ + Check( + conditions=[ + Condition( + operator="contains", + field="agent_input", + value=needle, + ) + ], + action=Action.DENY, + message=f"Input must not contain {needle!r}", + ) + ], + ) + + +def _build_index_with(rule: Rule) -> PolicyIndex: + """Wrap a single rule in a one-pack PolicyIndex.""" + idx = PolicyIndex() + idx.add_pack( + PolicyPack( + name="test_pack", + version="1.0", + description="test", + rules=[rule], + ) + ) + return idx + + +def _ctx(agent_input: str) -> CheckContext: + return CheckContext( + hook=LifecycleHook.BEFORE_AGENT, + agent_name="test-agent", + runtime_id="run-1", + trace_id="trace-1", + agent_input=agent_input, + ) + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + + +@pytest.fixture +def capturing_audit(): + """Replace the global audit manager with a fresh one wired to a capturing sink. + + Yields the sink so tests can inspect emitted events. Restores the + global manager on teardown. + """ + reset_audit_manager() + manager = get_audit_manager() + # Default sinks (traces / console) are noisy here — drop them. + for existing_name in list(manager.list_sinks()): + manager.unregister_sink(existing_name) + sink = _CapturingSink() + manager.register_sink(sink) + # Force synchronous emission so assertions don't race the worker thread. + manager._async_mode = False + yield sink + reset_audit_manager() + + +@pytest.fixture(autouse=True) +def _reset_enforcement_mode(): + """Each test gets a clean enforcement-mode slate.""" + reset_enforcement_mode() + yield + reset_enforcement_mode() + + +# --------------------------------------------------------------------------- +# DISABLED mode +# --------------------------------------------------------------------------- + + +def test_disabled_mode_short_circuits_with_empty_record(capturing_audit): + """DISABLED returns an empty AuditRecord and emits nothing.""" + set_enforcement_mode(EnforcementMode.DISABLED) + evaluator = GovernanceEvaluator( + _build_index_with(_deny_rule_on_input_contains("secret")) + ) + + audit = evaluator.evaluate(_ctx("definitely contains secret")) + + assert audit.evaluations == [] + assert audit.final_action == Action.ALLOW + assert audit.metadata["enforcement_mode"] == "disabled" + assert capturing_audit.events == [] + + +def test_disabled_mode_does_not_raise_on_deny_match(capturing_audit): + """Even when a DENY rule WOULD match, DISABLED never raises.""" + set_enforcement_mode(EnforcementMode.DISABLED) + evaluator = GovernanceEvaluator( + _build_index_with(_deny_rule_on_input_contains("blocked")) + ) + + # Must not raise. + evaluator.evaluate(_ctx("this is blocked")) + + +# --------------------------------------------------------------------------- +# AUDIT mode +# --------------------------------------------------------------------------- + + +def test_audit_mode_transforms_deny_to_audit(capturing_audit): + """AUDIT mode evaluates rules but never returns a DENY final_action.""" + set_enforcement_mode(EnforcementMode.AUDIT) + evaluator = GovernanceEvaluator( + _build_index_with(_deny_rule_on_input_contains("secret")) + ) + + audit = evaluator.evaluate(_ctx("contains secret data")) + + assert len(audit.evaluations) == 1 + assert audit.evaluations[0].matched is True + assert audit.evaluations[0].action == Action.DENY # raw rule action preserved + assert audit.final_action == Action.AUDIT # mode-adjusted + assert audit.metadata["audit_mode_would_deny"] is True + + +def test_audit_mode_does_not_raise_on_deny_match(capturing_audit): + """AUDIT mode never raises GovernanceBlockException, even on a DENY hit.""" + set_enforcement_mode(EnforcementMode.AUDIT) + evaluator = GovernanceEvaluator( + _build_index_with(_deny_rule_on_input_contains("blocked")) + ) + + evaluator.evaluate(_ctx("this is blocked")) # must not raise + + +def test_audit_mode_emits_per_rule_and_summary_events(capturing_audit): + """One rule_evaluation event per rule + one hook_summary per evaluate().""" + set_enforcement_mode(EnforcementMode.AUDIT) + evaluator = GovernanceEvaluator( + _build_index_with(_deny_rule_on_input_contains("secret")) + ) + + evaluator.evaluate(_ctx("contains secret")) + + rule_events = [ + e for e in capturing_audit.events if e.event_type == EventType.RULE_EVALUATION + ] + summary_events = [ + e for e in capturing_audit.events if e.event_type == EventType.HOOK_END + ] + assert len(rule_events) == 1 + assert rule_events[0].hook == "BEFORE_AGENT" + assert rule_events[0].data["rule_id"] == "TEST-01" + assert rule_events[0].data["matched"] is True + assert rule_events[0].data["action"] == "deny" + + assert len(summary_events) == 1 + assert summary_events[0].data["matched_rules"] == 1 + assert summary_events[0].data["final_action"] == "audit" + assert summary_events[0].data["enforcement_mode"] == "audit" + + +def test_audit_mode_unmatched_rule_logged_as_allow(capturing_audit): + """Unmatched rules still emit a rule_evaluation event with action='allow'.""" + set_enforcement_mode(EnforcementMode.AUDIT) + evaluator = GovernanceEvaluator( + _build_index_with(_deny_rule_on_input_contains("secret")) + ) + + evaluator.evaluate(_ctx("benign user query")) + + rule_events = [ + e for e in capturing_audit.events if e.event_type == EventType.RULE_EVALUATION + ] + assert len(rule_events) == 1 + assert rule_events[0].data["matched"] is False + assert rule_events[0].data["action"] == "allow" + + +# --------------------------------------------------------------------------- +# ENFORCE mode +# --------------------------------------------------------------------------- + + +def test_enforce_mode_raises_on_deny_match(capturing_audit): + """ENFORCE mode raises GovernanceBlockException when a DENY rule matches.""" + set_enforcement_mode(EnforcementMode.ENFORCE) + evaluator = GovernanceEvaluator( + _build_index_with(_deny_rule_on_input_contains("blocked")) + ) + + with pytest.raises(GovernanceBlockException) as exc_info: + evaluator.evaluate(_ctx("input is blocked")) + + exc = exc_info.value + assert exc.rule_id == "TEST-01" + assert exc.rule_name == "Test deny on input" + assert exc.audit_record is not None + assert exc.audit_record.final_action == Action.DENY + + +def test_enforce_mode_emits_audit_before_raising(capturing_audit): + """The audit trail must be emitted even when the call raises.""" + set_enforcement_mode(EnforcementMode.ENFORCE) + evaluator = GovernanceEvaluator( + _build_index_with(_deny_rule_on_input_contains("blocked")) + ) + + with pytest.raises(GovernanceBlockException): + evaluator.evaluate(_ctx("contains blocked")) + + rule_events = [ + e for e in capturing_audit.events if e.event_type == EventType.RULE_EVALUATION + ] + summary_events = [ + e for e in capturing_audit.events if e.event_type == EventType.HOOK_END + ] + assert len(rule_events) == 1 + assert summary_events[0].data["final_action"] == "deny" + assert summary_events[0].data["enforcement_mode"] == "enforce" + + +def test_enforce_mode_returns_record_when_no_rule_matches(capturing_audit): + """No DENY hit → no raise; the AuditRecord is returned normally.""" + set_enforcement_mode(EnforcementMode.ENFORCE) + evaluator = GovernanceEvaluator( + _build_index_with(_deny_rule_on_input_contains("blocked")) + ) + + audit = evaluator.evaluate(_ctx("benign query")) + + assert audit.final_action == Action.ALLOW + assert audit.evaluations[0].matched is False + + +# --------------------------------------------------------------------------- +# Sink-failure isolation +# --------------------------------------------------------------------------- + + +def test_sink_failure_does_not_propagate_or_block_evaluation(capturing_audit): + """A broken sink must not make evaluate() raise or lose its return value. + + The contract: AuditManager wraps each sink's emit() in try/except with + a per-sink failure counter (circuit-breaker), so an exception inside a + sink never propagates back to the evaluator. + """ + + class _BrokenSink(AuditSink): + @property + def name(self) -> str: + return "broken" + + def emit(self, event: AuditEvent) -> None: + raise RuntimeError("sink broke") + + manager = get_audit_manager() + manager.register_sink(_BrokenSink()) + + set_enforcement_mode(EnforcementMode.AUDIT) + evaluator = GovernanceEvaluator( + _build_index_with(_deny_rule_on_input_contains("secret")) + ) + + # Must complete without raising even with a broken sink registered. + audit = evaluator.evaluate(_ctx("contains secret")) + + assert audit.final_action == Action.AUDIT + # The non-broken capturing sink still got its events. + assert any( + e.event_type == EventType.RULE_EVALUATION for e in capturing_audit.events + ) + + +def test_unavailable_audit_manager_is_swallowed(): + """If get_audit_manager() itself raises, _emit_audit must swallow it.""" + set_enforcement_mode(EnforcementMode.AUDIT) + evaluator = GovernanceEvaluator( + _build_index_with(_deny_rule_on_input_contains("secret")) + ) + + with patch( + "uipath.runtime.governance.native.evaluator.get_audit_manager", + side_effect=RuntimeError("manager unavailable"), + ): + # Must complete, return record, and not raise. + audit = evaluator.evaluate(_ctx("contains secret")) + + assert audit.final_action == Action.AUDIT + assert audit.evaluations[0].matched is True + + +# --------------------------------------------------------------------------- +# Protocol conformance smoke test +# --------------------------------------------------------------------------- + + +def test_governance_evaluator_satisfies_evaluator_protocol(): + """GovernanceEvaluator must be usable wherever EvaluatorProtocol is expected. + + Mirrors the pattern from test_detached_bridge_satisfies_debug_protocol — + an explicit assignment to the protocol-typed variable documents the + structural contract. + """ + from uipath.core.adapters import EvaluatorProtocol + + evaluator: EvaluatorProtocol = GovernanceEvaluator(PolicyIndex()) + assert isinstance(evaluator, EvaluatorProtocol) + + +def test_evaluator_protocol_methods_resolvable_on_concrete(): + """Every method the protocol declares must be callable on the concrete impl.""" + from uipath.core.adapters import EvaluatorProtocol + + evaluator: Any = GovernanceEvaluator(PolicyIndex()) + for method_name in ( + "evaluate_before_agent", + "evaluate_after_agent", + "evaluate_before_model", + "evaluate_after_model", + "evaluate_tool_call", + "evaluate_after_tool", + ): + assert callable(getattr(evaluator, method_name)) + # The variable annotation also asserts type compatibility at runtime + # because EvaluatorProtocol is @runtime_checkable. + assert isinstance(evaluator, EvaluatorProtocol) diff --git a/tests/test_evaluator_operators.py b/tests/test_evaluator_operators.py new file mode 100644 index 0000000..862cdfa --- /dev/null +++ b/tests/test_evaluator_operators.py @@ -0,0 +1,680 @@ +"""Tests for ``GovernanceEvaluator`` operators and field resolution. + +Covers each operator implemented in :meth:`_apply_operator` plus the +``_check_*`` helper functions (vader, encoding, entropy, incident, +commitment) and the ``evaluate_*`` dispatchers. +""" + +from __future__ import annotations + +import pytest +from uipath.core.governance.models import Action, LifecycleHook + +from uipath.runtime.governance.config import ( + EnforcementMode, + reset_enforcement_mode, + set_enforcement_mode, +) +from uipath.runtime.governance.native.evaluator import ( + _INCIDENT_PATTERNS, + GovernanceEvaluator, +) +from uipath.runtime.governance.native.models import ( + Check, + CheckContext, + Condition, + PolicyIndex, + PolicyPack, + Rule, +) + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _evaluator() -> GovernanceEvaluator: + """Build a GovernanceEvaluator with an empty PolicyIndex (operators only).""" + return GovernanceEvaluator(policy_index=PolicyIndex()) + + +def _ctx(**fields) -> CheckContext: + """Construct a CheckContext with sensible defaults plus overrides.""" + defaults = dict( + hook=LifecycleHook.AFTER_MODEL, + agent_name="agent", + runtime_id="rt-1", + trace_id="tr-1", + ) + defaults.update(fields) + return CheckContext(**defaults) + + +def _rule_with_condition(operator: str, field: str, value, *, negate: bool = False) -> Rule: + return Rule( + rule_id="r1", + name="r1", + clause="", + hook=LifecycleHook.AFTER_MODEL, + action=Action.AUDIT, + checks=[ + Check( + conditions=[ + Condition(operator=operator, field=field, value=value, negate=negate) + ], + ) + ], + ) + + +@pytest.fixture(autouse=True) +def _isolate_mode() -> None: + reset_enforcement_mode() + set_enforcement_mode(EnforcementMode.AUDIT) + yield + reset_enforcement_mode() + + +# --------------------------------------------------------------------------- +# Field resolution — _get_field_value +# --------------------------------------------------------------------------- + + +def test_get_field_value_top_level_attr() -> None: + ev = _evaluator() + ctx = _ctx(model_output="hello") + assert ev._get_field_value("model_output", ctx) == "hello" + + +def test_get_field_value_dotted_path_into_dict() -> None: + ev = _evaluator() + ctx = _ctx(session_state={"tool_calls": 7}) + assert ev._get_field_value("session_state.tool_calls", ctx) == 7 + + +def test_get_field_value_missing_segment_returns_none() -> None: + ev = _evaluator() + ctx = _ctx() + assert ev._get_field_value("nonexistent", ctx) is None + assert ev._get_field_value("session_state.absent", ctx) is None + + +# --------------------------------------------------------------------------- +# Existence / guardrail_fallback (special-cased before the None check) +# --------------------------------------------------------------------------- + + +def test_exists_true_when_value_present() -> None: + ev = _evaluator() + ctx = _ctx(model_output="x") + assert ev._apply_operator("exists", ev._get_field_value("model_output", ctx), None) is True + + +def test_exists_false_when_missing() -> None: + ev = _evaluator() + assert ev._apply_operator("exists", None, None) is False + + +def test_not_exists_inverse() -> None: + ev = _evaluator() + assert ev._apply_operator("not_exists", None, None) is True + assert ev._apply_operator("not_exists", "x", None) is False + + +def test_guardrail_fallback_mapped_and_disabled_fires() -> None: + ev = _evaluator() + result = ev._apply_operator( + "guardrail_fallback", + None, + {"mapped_to_uipath": True, "policy_enabled": False, "validator": "pii"}, + ) + assert result is True + + +@pytest.mark.parametrize( + "cfg", + [ + {"mapped_to_uipath": False, "policy_enabled": False}, + {"mapped_to_uipath": True, "policy_enabled": True}, + {"mapped_to_uipath": False, "policy_enabled": True}, + ], +) +def test_guardrail_fallback_silent_when_not_mapped_or_enabled(cfg: dict) -> None: + ev = _evaluator() + assert ev._apply_operator("guardrail_fallback", None, cfg) is False + + +def test_guardrail_fallback_non_dict_value_silent() -> None: + ev = _evaluator() + assert ev._apply_operator("guardrail_fallback", None, "string") is False + + +# --------------------------------------------------------------------------- +# None-field short-circuit (everything except exists / guardrail_fallback) +# --------------------------------------------------------------------------- + + +def test_other_operators_short_circuit_when_field_is_none() -> None: + ev = _evaluator() + for op in ("contains", "regex", "in_list", "gt"): + assert ev._apply_operator(op, None, "anything") is False, op + + +# --------------------------------------------------------------------------- +# Numeric operators +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize( + "op,lhs,rhs,expected", + [ + ("gt", 5, 3, True), + ("gt", 3, 5, False), + ("gt", 3, 3, False), + ("gte", 3, 3, True), + ("gte", 2, 3, False), + ("lt", 1, 3, True), + ("lt", 3, 3, False), + ("lte", 3, 3, True), + ("lte", 4, 3, False), + ], +) +def test_numeric_operators(op: str, lhs: float, rhs: float, expected: bool) -> None: + assert _evaluator()._apply_operator(op, lhs, rhs) is expected + + +def test_numeric_operators_handle_string_coercion() -> None: + ev = _evaluator() + assert ev._apply_operator("gt", "5", "3") is True + + +def test_numeric_operators_return_false_on_uncoercible() -> None: + ev = _evaluator() + assert ev._apply_operator("gt", "not-a-number", 3) is False + assert ev._apply_operator("gt", 3, "not-a-number") is False + + +# --------------------------------------------------------------------------- +# String operators +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize( + "op,lhs,rhs,expected", + [ + ("equals", "abc", "abc", True), + ("equals", "abc", "ABC", False), # equals is case-sensitive + ("eq", "x", "x", True), + ("not_equals", "abc", "xyz", True), + ("ne", "x", "x", False), + ("contains", "Hello World", "world", True), # case-insensitive + ("contains", "Hello", "xyz", False), + ("not_contains", "Hello", "xyz", True), + ("not_contains", "Hello", "hello", False), + ], +) +def test_string_operators(op: str, lhs: str, rhs: str, expected: bool) -> None: + assert _evaluator()._apply_operator(op, lhs, rhs) is expected + + +def test_regex_matches_pattern() -> None: + ev = _evaluator() + assert ev._apply_operator("regex", "Cost: $1,200", r"\$\d+") is True + + +def test_regex_matches_alias() -> None: + """``matches`` is documented as a synonym for ``regex``.""" + ev = _evaluator() + assert ev._apply_operator("matches", "abc-123", r"\d+") is True + + +def test_regex_invalid_pattern_returns_false() -> None: + """Malformed regex is logged and silently returns False.""" + ev = _evaluator() + assert ev._apply_operator("regex", "anything", "(unclosed") is False + + +# --------------------------------------------------------------------------- +# List operators +# --------------------------------------------------------------------------- + + +def test_in_list_membership() -> None: + ev = _evaluator() + assert ev._apply_operator("in_list", "delete_file", ["shell", "delete_file"]) is True + assert ev._apply_operator("in_list", "ls", ["shell", "delete_file"]) is False + + +def test_in_list_non_list_value_returns_false() -> None: + ev = _evaluator() + assert ev._apply_operator("in_list", "x", "not a list") is False + + +def test_not_in_list_inverse() -> None: + ev = _evaluator() + assert ev._apply_operator("not_in_list", "ls", ["shell"]) is True + assert ev._apply_operator("not_in_list", "shell", ["shell"]) is False + + +def test_not_in_list_non_list_value_returns_true() -> None: + """``not_in_list`` against a non-list value safely returns True + (nothing is in a non-list).""" + ev = _evaluator() + assert ev._apply_operator("not_in_list", "x", "not a list") is True + + +# --------------------------------------------------------------------------- +# Unknown operator +# --------------------------------------------------------------------------- + + +def test_unknown_operator_returns_false() -> None: + """Unknown operator strings log a debug message and return False.""" + ev = _evaluator() + assert ev._apply_operator("never_heard_of_this", "x", "y") is False + + +# --------------------------------------------------------------------------- +# Negate flag — flips the result +# --------------------------------------------------------------------------- + + +def test_condition_negate_flips_result() -> None: + ev = _evaluator() + ctx = _ctx(model_output="hello") + # contains "hello" → matches; negate inverts to False. + cond = Condition( + operator="contains", field="model_output", value="hello", negate=True, + ) + assert ev._evaluate_condition(cond, ctx) is False + cond2 = Condition( + operator="contains", field="model_output", value="world", negate=True, + ) + assert ev._evaluate_condition(cond2, ctx) is True + + +# --------------------------------------------------------------------------- +# Check-level logic: "all" (AND) vs "any" (OR), and empty-conditions +# --------------------------------------------------------------------------- + + +def test_empty_check_conditions_always_match() -> None: + """A check with no conditions trivially matches — surfaces rule shape bugs.""" + ev = _evaluator() + check = Check(conditions=[], logic="all") + matched, _ = ev._evaluate_check(check, _ctx()) + assert matched is True + + +def test_check_logic_all_requires_every_condition() -> None: + ev = _evaluator() + check = Check( + conditions=[ + Condition(operator="contains", field="model_output", value="a"), + Condition(operator="contains", field="model_output", value="missing"), + ], + logic="all", + ) + matched, _ = ev._evaluate_check(check, _ctx(model_output="a only")) + assert matched is False + + +def test_check_logic_any_requires_one_condition() -> None: + ev = _evaluator() + check = Check( + conditions=[ + Condition(operator="contains", field="model_output", value="present"), + Condition(operator="contains", field="model_output", value="absent"), + ], + logic="any", + ) + matched, detail = ev._evaluate_check(check, _ctx(model_output="present text")) + assert matched is True + # detail is the check's message on match; empty by default in our builder. + assert detail == "" + + +# --------------------------------------------------------------------------- +# VADER sentiment +# --------------------------------------------------------------------------- + + +def test_vader_concern_negative_text_fires() -> None: + """A clearly-negative sentence trips the default threshold of -0.3.""" + assert ( + GovernanceEvaluator._check_vader_concern( + "I absolutely hate this terrible, awful product.", {"threshold": -0.3} + ) + is True + ) + + +def test_vader_concern_positive_text_does_not_fire() -> None: + assert ( + GovernanceEvaluator._check_vader_concern( + "This is wonderful and I love it!", {"threshold": -0.3} + ) + is False + ) + + +def test_vader_concern_empty_text_silent() -> None: + assert GovernanceEvaluator._check_vader_concern("", {}) is False + assert GovernanceEvaluator._check_vader_concern(" ", {}) is False + + +def test_vader_concern_threshold_as_scalar() -> None: + """``params`` may be a bare number; the operator coerces.""" + assert ( + GovernanceEvaluator._check_vader_concern("I hate everything", -0.3) is True + ) + + +def test_vader_concern_invalid_threshold_falls_back() -> None: + """Non-numeric scalar params fall back to the documented default.""" + # "garbage" -> default -0.3 → should still classify clear negative + assert ( + GovernanceEvaluator._check_vader_concern( + "I hate this awful, terrible thing", "garbage" + ) + is True + ) + + +# --------------------------------------------------------------------------- +# Encoding integrity +# --------------------------------------------------------------------------- + + +def test_encoding_concern_clean_text_silent() -> None: + assert ( + GovernanceEvaluator._check_encoding_concern( + "Just a normal English sentence with no corruption.", {} + ) + is False + ) + + +def test_encoding_concern_empty_silent() -> None: + assert GovernanceEvaluator._check_encoding_concern("", {}) is False + + +def test_encoding_concern_replacement_chars_fire() -> None: + """U+FFFD replacement chars are a strong corruption signal.""" + text = "Hello � � world" + assert ( + GovernanceEvaluator._check_encoding_concern( + text, {"min_corruption_events": 2} + ) + is True + ) + + +def test_encoding_concern_mojibake_bigrams_fire() -> None: + """Latin-1-as-UTF-8 mojibake patterns are a known corruption shape.""" + text = "é é hello é" + assert ( + GovernanceEvaluator._check_encoding_concern( + text, {"min_corruption_events": 2} + ) + is True + ) + + +def test_encoding_concern_hex_escape_literals_fire() -> None: + """Literal ``\\xHH`` sequences mean raw bytes leaked into a string.""" + text = r"Hello \x80 \x81 \x82 world" + assert ( + GovernanceEvaluator._check_encoding_concern( + text, {"min_corruption_events": 2} + ) + is True + ) + + +# --------------------------------------------------------------------------- +# Entropy (stdlib only — deterministic) +# --------------------------------------------------------------------------- + + +def test_entropy_concern_normal_english_does_not_fire() -> None: + """English prose entropy lands ~3.5–4.5 bits/byte — inside default range.""" + text = "The quick brown fox jumps over the lazy dog." * 5 + assert ( + GovernanceEvaluator._check_entropy_concern(text, {"min": 1.5, "max": 7.5}) + is False + ) + + +def test_entropy_concern_low_entropy_fires() -> None: + """Highly repetitive text approaches 0 bits/byte.""" + text = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + assert ( + GovernanceEvaluator._check_entropy_concern(text, {"min": 1.5, "max": 7.5}) + is True + ) + + +def test_entropy_concern_high_entropy_fires() -> None: + """Random-ish bytes approach 8 bits/byte.""" + # Build text with many distinct chars to push entropy high. + text = "".join(chr(c) for c in range(32, 127)) * 5 + assert ( + GovernanceEvaluator._check_entropy_concern(text, {"min": 1.5, "max": 6.0}) + is True + ) + + +def test_entropy_concern_empty_silent() -> None: + assert GovernanceEvaluator._check_entropy_concern("", {}) is False + + +def test_entropy_concern_non_dict_params_uses_defaults() -> None: + """Non-dict params don't crash; defaults apply.""" + # Normal English prose still won't trip the default min=1.5, max=7.5 range. + text = "The quick brown fox jumps over the lazy dog." + assert ( + GovernanceEvaluator._check_entropy_concern(text, "garbage") is False + ) + + +# --------------------------------------------------------------------------- +# Incident taxonomy (regex-based, deterministic) +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize( + "text,expected_category", + [ + ("I cannot help with that.", "safety_refusal"), + ("I'm sorry, but I cannot answer.", "safety_refusal"), + ("500 internal server error", "tool_failure"), + ("Connection refused", "tool_failure"), + ("timed out", "tool_failure"), + ("401 unauthorized", "auth_failure"), + ("authentication failed", "auth_failure"), + ("429", "quota_exceeded"), + ("rate limit exceeded", "quota_exceeded"), + ("I made that up", "hallucination"), + ("I don't actually know", "hallucination"), + ], +) +def test_incident_concern_categorical_matches(text: str, expected_category: str) -> None: + """Each category in ``_INCIDENT_PATTERNS`` has at least one matching exemplar.""" + assert expected_category in _INCIDENT_PATTERNS + assert GovernanceEvaluator._check_incident_concern(text, {}) is True + + +def test_incident_concern_unmatched_silent() -> None: + assert ( + GovernanceEvaluator._check_incident_concern( + "All systems operating normally.", {} + ) + is False + ) + + +def test_incident_concern_empty_silent() -> None: + assert GovernanceEvaluator._check_incident_concern("", {}) is False + + +def test_incident_concern_category_filter() -> None: + """Limit scanning to a subset of categories via ``categories`` param.""" + # "401 unauthorized" hits auth_failure; with only quota_exceeded enabled, + # the scanner should miss it. + assert ( + GovernanceEvaluator._check_incident_concern( + "401 unauthorized", {"categories": ["quota_exceeded"]} + ) + is False + ) + # With auth_failure enabled, it fires. + assert ( + GovernanceEvaluator._check_incident_concern( + "401 unauthorized", {"categories": ["auth_failure"]} + ) + is True + ) + + +def test_incident_concern_unknown_category_silently_dropped() -> None: + """Categories the system doesn't know about are silently ignored.""" + # Only the unknown category is requested — falls back to no categories, + # so even matching text doesn't fire. + result = GovernanceEvaluator._check_incident_concern( + "401 unauthorized", {"categories": ["unknown_cat_xyz"]} + ) + assert result is False + + +# --------------------------------------------------------------------------- +# evaluate_* dispatchers — verify they build the right CheckContext +# --------------------------------------------------------------------------- + + +def _record_context_evaluator() -> tuple[GovernanceEvaluator, dict]: + """Patch evaluate() to capture the context it receives instead of running rules.""" + captured: dict = {} + ev = _evaluator() + + def _fake_evaluate(ctx): # type: ignore[no-untyped-def] + captured["ctx"] = ctx + from datetime import datetime, timezone + + from uipath.core.governance.models import AuditRecord + + return AuditRecord( + timestamp=datetime.now(timezone.utc), + agent_name=ctx.agent_name, + runtime_id=ctx.runtime_id, + trace_id=ctx.trace_id, + hook=ctx.hook, + evaluations=[], + final_action=Action.ALLOW, + ) + + ev.evaluate = _fake_evaluate # type: ignore[assignment] + return ev, captured + + +def test_evaluate_before_agent_builds_context() -> None: + ev, captured = _record_context_evaluator() + ev.evaluate_before_agent( + agent_input="user-text", + agent_name="a", + runtime_id="r", + trace_id="t", + model_name="gpt-5", + ) + ctx = captured["ctx"] + assert ctx.hook == LifecycleHook.BEFORE_AGENT + assert ctx.agent_input == "user-text" + assert ctx.model_name == "gpt-5" + + +def test_evaluate_after_agent_builds_context() -> None: + ev, captured = _record_context_evaluator() + ev.evaluate_after_agent( + agent_output="reply", agent_name="a", runtime_id="r", trace_id="t", + ) + ctx = captured["ctx"] + assert ctx.hook == LifecycleHook.AFTER_AGENT + assert ctx.agent_output == "reply" + + +def test_evaluate_before_model_carries_messages() -> None: + ev, captured = _record_context_evaluator() + ev.evaluate_before_model( + model_input="prompt", + agent_name="a", + runtime_id="r", + trace_id="t", + messages=[{"role": "user", "content": "hi"}], + model_name="gpt-5", + ) + ctx = captured["ctx"] + assert ctx.hook == LifecycleHook.BEFORE_MODEL + assert ctx.model_input == "prompt" + assert ctx.messages == [{"role": "user", "content": "hi"}] + + +def test_evaluate_after_model_builds_context() -> None: + ev, captured = _record_context_evaluator() + ev.evaluate_after_model( + model_output="resp", agent_name="a", runtime_id="r", trace_id="t", + ) + ctx = captured["ctx"] + assert ctx.hook == LifecycleHook.AFTER_MODEL + assert ctx.model_output == "resp" + + +def test_evaluate_tool_call_carries_args() -> None: + ev, captured = _record_context_evaluator() + ev.evaluate_tool_call( + tool_name="search", + tool_args={"q": "x"}, + agent_name="a", + runtime_id="r", + trace_id="t", + session_state={"tool_calls": 1}, + ) + ctx = captured["ctx"] + assert ctx.hook == LifecycleHook.TOOL_CALL + assert ctx.tool_name == "search" + assert ctx.tool_args == {"q": "x"} + assert ctx.session_state == {"tool_calls": 1} + + +def test_evaluate_after_tool_carries_result() -> None: + ev, captured = _record_context_evaluator() + ev.evaluate_after_tool( + tool_name="search", + tool_result="some-data", + agent_name="a", + runtime_id="r", + trace_id="t", + ) + ctx = captured["ctx"] + assert ctx.hook == LifecycleHook.AFTER_TOOL + assert ctx.tool_name == "search" + assert ctx.tool_result == "some-data" + + +# --------------------------------------------------------------------------- +# DISABLED mode — evaluate() short-circuits without emitting audit +# --------------------------------------------------------------------------- + + +def test_disabled_mode_returns_empty_audit_record() -> None: + """DISABLED mode short-circuits the rule loop and audit emission.""" + set_enforcement_mode(EnforcementMode.DISABLED) + + rule = _rule_with_condition("contains", "model_output", "anything") + pack = PolicyPack(name="p", version="1", description="", rules=[rule]) + idx = PolicyIndex() + idx.add_pack(pack) + ev = GovernanceEvaluator(policy_index=idx) + + audit = ev.evaluate(_ctx(model_output="contains anything")) + assert audit.final_action == Action.ALLOW + assert audit.evaluations == [] diff --git a/tests/test_text_extraction.py b/tests/test_text_extraction.py new file mode 100644 index 0000000..50a15df --- /dev/null +++ b/tests/test_text_extraction.py @@ -0,0 +1,301 @@ +"""Tests for ``_extract_governable_text`` content extraction. + +Replaces the old ``str(value)[:2000]`` path in ``_check_before_agent`` +and ``_check_after_agent``. Pulls clean text out of structured shapes +(dicts, list-of-blocks, pydantic models) instead of letting dict-repr +noise leak into the regex-scanned blob. +""" + +from __future__ import annotations + +from dataclasses import dataclass + +from uipath.runtime.governance.wrapper import ( + _GOVERNANCE_TEXT_CAP, + _extract_governable_text, +) + + +def test_plain_string_passes_through() -> None: + assert _extract_governable_text("hello world") == "hello world" + + +def test_none_returns_empty() -> None: + assert _extract_governable_text(None) == "" + + +def test_dict_with_content_key_extracts_content_first() -> None: + """The classic coded-agent output shape — content comes through clean.""" + out = _extract_governable_text( + {"content": "Estimated cost: $780", "_meta": {"id": "abc"}} + ) + assert out.startswith("Estimated cost: $780") + # No dict-syntax noise — the prior str(...) path produced ``{'content': '...'}``. + assert "{'content'" not in out + assert "'_meta'" not in out + + +def test_dict_priority_keys_lead() -> None: + """``content`` / ``text`` / etc. lead before remaining keys.""" + out = _extract_governable_text( + {"trailing_meta": "noise-meta", "content": "primary-text"} + ) + assert out.index("primary-text") < out.index("noise-meta") + + +def test_list_of_text_blocks_concatenates() -> None: + """Anthropic-style content blocks.""" + out = _extract_governable_text( + [ + {"type": "text", "text": "first part"}, + {"type": "image", "source": {"data": "..."}}, + {"type": "text", "text": "second part"}, + ] + ) + assert "first part" in out + assert "second part" in out + + +def test_openai_function_call_shape_extracts_arguments() -> None: + """``arguments`` field on OpenAI-style function-call blocks.""" + out = _extract_governable_text( + [ + { + "type": "function_call", + "name": "end_execution", + "arguments": '{"content":"Cost: $1,200"}', + "id": "fc_abc", + } + ] + ) + assert "Cost: $1,200" in out + + +def test_numeric_scalars_are_skipped() -> None: + """Numbers / booleans aren't governance text — they shouldn't pad the blob.""" + out = _extract_governable_text( + {"content": "hello", "count": 42, "ok": True, "rate": 3.14} + ) + assert out == "hello" + + +def test_pydantic_like_model_dump_is_walked() -> None: + """Anything with ``model_dump()`` is walked as its dict form.""" + + class Stub: + def model_dump(self) -> dict: + return {"content": "from pydantic"} + + assert _extract_governable_text(Stub()) == "from pydantic" + + +def test_dataclass_via_dict_method() -> None: + """Objects exposing a ``dict()`` callable also walk via that path.""" + + class Stub: + def dict(self) -> dict: + return {"content": "from dict"} + + assert _extract_governable_text(Stub()) == "from dict" + + +def test_plain_object_attribute_fallback() -> None: + """Public attributes on opaque objects feed the walker.""" + + @dataclass + class Result: + content: str + _private: str = "ignored" + + out = _extract_governable_text(Result(content="visible")) + assert "visible" in out + assert "ignored" not in out + + +def test_cycle_in_structure_does_not_recurse_forever() -> None: + a: dict = {"content": "outer"} + b: dict = {"loop": a} + a["loop"] = b + # Should return without recursing infinitely. + out = _extract_governable_text(a) + assert "outer" in out + + +def test_text_is_capped_at_budget() -> None: + """Long content is truncated so a runaway payload can't dominate scans.""" + big = "x" * (_GOVERNANCE_TEXT_CAP + 1000) + out = _extract_governable_text(big) + assert len(out) == _GOVERNANCE_TEXT_CAP + + +def test_nested_dict_content_extracted() -> None: + """LangGraph-style state with messages nested under a key.""" + out = _extract_governable_text( + { + "messages": [ + {"role": "user", "content": "hi"}, + {"role": "assistant", "content": "Cost: $50"}, + ] + } + ) + assert "Cost: $50" in out + + +def test_unknown_block_type_with_no_text_returns_empty() -> None: + """Image-only block with no text payload contributes nothing.""" + out = _extract_governable_text( + [{"type": "image", "source": {"type": "base64", "data": "..."}}] + ) + # Could be empty or contain just the base64 data — but should NOT + # contain Python dict syntax characters that the old path emitted. + assert "{'type'" not in out + + +# --------------------------------------------------------------------------- +# Budget — 64K is the current cap (raised from 8K to fit multi-turn chat). +# --------------------------------------------------------------------------- + + +def test_budget_cap_is_64k() -> None: + """Documents the cap so a future drop won't go unnoticed.""" + assert _GOVERNANCE_TEXT_CAP == 64000 + + +# --------------------------------------------------------------------------- +# Reverse list iteration — latest entry gets the budget first. +# --------------------------------------------------------------------------- + + +def test_lists_are_walked_in_reverse() -> None: + """Latest list entry leads the extracted blob. + + Critical for chat history: the new user message lives at the end of + the messages list and must be visible even when prior turns would + otherwise fill the budget first. + """ + out = _extract_governable_text( + [{"text": "earliest"}, {"text": "middle"}, {"text": "latest"}] + ) + assert out.index("latest") < out.index("middle") < out.index("earliest") + + +def test_long_chat_history_keeps_latest_user_message() -> None: + """A long history must not push the latest message out of the budget. + + Regression for the prior 8K-cap + forward-walk combination, which + silently dropped the latest user message once the conversation + grew past ~7,800 chars of prior content. + """ + bulky_prior = "x" * 2000 + messages = [{"role": "user", "content": bulky_prior}] * 40 # ~80K chars + messages.append({"role": "user", "content": "Cost: $1,200 — latest"}) + + out = _extract_governable_text({"messages": messages}) + assert "Cost: $1,200 — latest" in out + + +# --------------------------------------------------------------------------- +# latest_only — BEFORE_AGENT in a conversational agent +# --------------------------------------------------------------------------- + + +def test_latest_only_extracts_just_the_last_list_item() -> None: + """``latest_only=True`` drops every list entry but the last one.""" + out = _extract_governable_text( + { + "messages": [ + {"role": "user", "content": "old message"}, + {"role": "assistant", "content": "old response"}, + {"role": "user", "content": "Cost: $1,200"}, + ] + }, + latest_only=True, + ) + assert "Cost: $1,200" in out + assert "old message" not in out + assert "old response" not in out + + +def test_latest_only_resets_inside_chosen_item() -> None: + """Multi-block content inside the latest message is still walked fully. + + ``latest_only`` reduces the OUTER list (chat history) to its last + entry, but multi-block content (text + tool_call + thinking) + inside that latest message must still be extracted in full — + otherwise we'd lose answer text that arrives in a non-final block. + """ + out = _extract_governable_text( + { + "messages": [ + {"role": "user", "content": "old"}, + { + "role": "assistant", + "content": [ + {"type": "text", "text": "part A"}, + { + "type": "function_call", + "arguments": '{"answer":"part B"}', + }, + ], + }, + ] + }, + latest_only=True, + ) + assert "part A" in out + assert "part B" in out + assert "old" not in out + + +def test_latest_only_top_level_list() -> None: + """``latest_only`` applies when the input itself is a list.""" + out = _extract_governable_text( + [ + {"content": "history item 1"}, + {"content": "history item 2"}, + {"content": "latest input"}, + ], + latest_only=True, + ) + assert "latest input" in out + assert "history item 1" not in out + assert "history item 2" not in out + + +def test_latest_only_default_false_still_walks_all() -> None: + """Default behavior unchanged — AFTER_AGENT etc. still see everything.""" + out = _extract_governable_text( + { + "messages": [ + {"role": "user", "content": "first"}, + {"role": "user", "content": "second"}, + ] + } + ) + assert "first" in out + assert "second" in out + + +def test_latest_only_empty_list_is_empty() -> None: + """Empty history → empty extraction.""" + assert _extract_governable_text({"messages": []}, latest_only=True) == "" + + +def test_messages_is_a_priority_content_key() -> None: + """``messages`` (plural) leads ahead of non-priority keys. + + Without ``messages`` in the priority list, an input that also + carries siblings like ``thread_id`` / ``metadata`` could siphon + budget before the actual chat history is walked. + """ + out = _extract_governable_text( + { + "thread_id": "abc-xyz", + "metadata": {"foo": "bar"}, + "messages": [{"role": "user", "content": "primary content"}], + } + ) + assert "primary content" in out + assert out.index("primary content") < ( + out.find("abc-xyz") if "abc-xyz" in out else len(out) + )