diff --git a/evaluators/builtin/pyproject.toml b/evaluators/builtin/pyproject.toml index 75d76c77..167888cb 100644 --- a/evaluators/builtin/pyproject.toml +++ b/evaluators/builtin/pyproject.toml @@ -19,6 +19,7 @@ dependencies = [ galileo = ["agent-control-evaluator-galileo>=7.5.0"] budget = ["agent-control-evaluator-budget>=7.5.0"] cisco = ["agent-control-evaluator-cisco>=7.5.0"] +detect_secrets = ["agent-control-evaluator-detect_secrets>=7.5.0"] dev = ["pytest>=8.0.0", "pytest-asyncio>=0.23.0"] [project.entry-points."agent_control.evaluators"] @@ -40,3 +41,4 @@ agent-control-models = { workspace = true } agent-control-evaluator-galileo = { path = "../contrib/galileo", editable = true } agent-control-evaluator-budget = { path = "../contrib/budget", editable = true } agent-control-evaluator-cisco = { path = "../contrib/cisco", editable = true } +agent-control-evaluator-detect_secrets = { path = "../contrib/detect_secrets", editable = true } diff --git a/evaluators/builtin/tests/test_contrib_packages.py b/evaluators/builtin/tests/test_contrib_packages.py index 9f25186d..0ff5f3fc 100644 --- a/evaluators/builtin/tests/test_contrib_packages.py +++ b/evaluators/builtin/tests/test_contrib_packages.py @@ -39,6 +39,7 @@ def test_discover_contrib_packages_returns_expected_metadata() -> None: assert [(package.name, package.package, package.extra) for package in packages] == [ ("budget", "agent-control-evaluator-budget", "budget"), ("cisco", "agent-control-evaluator-cisco", "cisco"), + ("detect_secrets", "agent-control-evaluator-detect_secrets", "detect_secrets"), ("galileo", "agent-control-evaluator-galileo", "galileo"), ] diff --git a/evaluators/contrib/README.md b/evaluators/contrib/README.md index 38338fad..d71769e8 100644 --- a/evaluators/contrib/README.md +++ b/evaluators/contrib/README.md @@ -3,6 +3,7 @@ Contributed evaluators and templates for extending Agent Control. 
- `galileo/` — Luna-2 evaluator integration +- `detect_secrets/` — detect-secrets runtime scanner integration - `template/` — Starter template for adding new evaluators Full guide: https://docs.agentcontrol.dev/concepts/evaluators/custom-evaluators diff --git a/evaluators/contrib/detect_secrets/Makefile b/evaluators/contrib/detect_secrets/Makefile new file mode 100644 index 00000000..a68934d4 --- /dev/null +++ b/evaluators/contrib/detect_secrets/Makefile @@ -0,0 +1,19 @@ +.PHONY: sync test lint typecheck check build + +sync: + uv sync --group dev + +test: + uv run --group dev pytest --cov=src --cov-report=xml:../../../coverage-evaluators-detect-secrets.xml -q + +lint: + uv run --group dev ruff check . + uv run --group dev ruff format --check . + +typecheck: + uv run --group dev mypy . + +check: sync lint typecheck test build + +build: + uv build diff --git a/evaluators/contrib/detect_secrets/README.md b/evaluators/contrib/detect_secrets/README.md new file mode 100644 index 00000000..1611cd4f --- /dev/null +++ b/evaluators/contrib/detect_secrets/README.md @@ -0,0 +1,91 @@ +# Agent Control Evaluator - detect-secrets + +External evaluator that scans selector-selected payloads for likely secrets using +[`detect-secrets-async`](https://pypi.org/project/detect-secrets-async/), which wraps Yelp +`detect-secrets` in a bounded subprocess runtime. 
+ +- Entry point name: `yelp.detect_secrets` +- Transport/runtime: `detect-secrets-async` + +## Installation + +Install the evaluator package: + +```bash +pip install agent-control-evaluator-detect_secrets +``` + +For local development from this repo: + +```bash +uv pip install -e evaluators/contrib/detect_secrets +``` + +## Configuration + +Evaluator config fields: + +- `timeout_ms: int = 10000` +- `on_error: "allow" | "deny" = "allow"` +- `max_bytes: int = 1048576` +- `enabled_plugins: list[str] | None = None` +- `exclude_lines_regex: list[str] = []` + +Notes: + +- `enabled_plugins` takes upstream `detect-secrets` plugin class names such as + `GitHubTokenDetector`. +- If `enabled_plugins` is omitted, the evaluator uses the pinned upstream default plugin set from + `detect-secrets-async`. +- `exclude_lines_regex` uses RE2 syntax and blanks matching lines before scan submission so line + numbering stays stable for plain string payloads. + +## Behavior + +- selector-selected `str` payloads are scanned directly +- selector-selected `dict` / `list` payloads are normalized to deterministic pretty JSON before + scanning +- scalar numbers / booleans are normalized to JSON scalar text +- `None` produces `matched=False` + +Safe metadata: + +- `findings_count` +- `findings[]` with `type`, plus: + - `line_number` for plain selected strings + - `json_pointer` for normalized `dict` / `list` payloads when a finding maps back to a structural + location; pointers are conservatively truncated to the nearest safe ancestor when a key segment + looks secret-like +- `normalized_payload_type` +- `detect_secrets_version` +- `failure_mode` on evaluator failures +- `fallback_action` on fail-closed paths + +Plaintext secrets, snippets, matching lines, and upstream `hashed_secret` are never surfaced. 
+ +## Usage + +Once installed, the evaluator is auto-discovered: + +```python +from agent_control_evaluators import discover_evaluators, get_evaluator + +discover_evaluators() +DetectSecretsEvaluator = get_evaluator("yelp.detect_secrets") +``` + +Example control fragment: + +```json +{ + "selector": { "path": "output" }, + "evaluator": { + "name": "yelp.detect_secrets", + "config": { + "timeout_ms": 10000, + "on_error": "allow", + "enabled_plugins": ["GitHubTokenDetector"] + } + } +} +``` diff --git a/evaluators/contrib/detect_secrets/pyproject.toml b/evaluators/contrib/detect_secrets/pyproject.toml new file mode 100644 index 00000000..29892b50 --- /dev/null +++ b/evaluators/contrib/detect_secrets/pyproject.toml @@ -0,0 +1,60 @@ +[project] +name = "agent-control-evaluator-detect_secrets" +version = "7.6.0" +description = "detect-secrets evaluator for agent-control" +readme = "README.md" +requires-python = ">=3.12" +license = { text = "Apache-2.0" } +authors = [{ name = "Agent Control Team" }] +dependencies = [ + "agent-control-evaluators>=7.5.0", + "agent-control-models>=7.5.0", + "detect-secrets-async>=0.2.0,<0.3.0", + "google-re2>=1.1", + "pydantic>=2.12.4", +] + +[project.entry-points."agent_control.evaluators"] +"yelp.detect_secrets" = "agent_control_evaluator_detect_secrets.detect_secrets:DetectSecretsEvaluator" + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.hatch.build.targets.wheel] +packages = ["src/agent_control_evaluator_detect_secrets"] + +[tool.pytest.ini_options] +asyncio_mode = "auto" + +[tool.ruff] +line-length = 100 +target-version = "py312" + +[tool.ruff.lint] +select = ["E", "F", "I", "UP"] + +[tool.mypy] +python_version = "3.12" +strict = true +files = ["src", "tests"] + +[[tool.mypy.overrides]] +module = "re2" +ignore_missing_imports = true + +[tool.uv.sources] +agent-control-evaluators = { path = "../../builtin", editable = true } +agent-control-models = { path = "../../../models", editable = true } + 
+[tool.uv] +default-groups = ["dev"] + +[dependency-groups] +dev = [ + "mypy>=1.8.0", + "pytest>=8.0.0", + "pytest-asyncio>=0.23.0", + "pytest-cov>=4.0.0", + "ruff>=0.1.0", +] diff --git a/evaluators/contrib/detect_secrets/src/agent_control_evaluator_detect_secrets/__init__.py b/evaluators/contrib/detect_secrets/src/agent_control_evaluator_detect_secrets/__init__.py new file mode 100644 index 00000000..c4270d5a --- /dev/null +++ b/evaluators/contrib/detect_secrets/src/agent_control_evaluator_detect_secrets/__init__.py @@ -0,0 +1,18 @@ +"""Agent Control evaluator package for detect-secrets.""" + +from importlib.metadata import PackageNotFoundError, version + +try: + __version__ = version("agent-control-evaluator-detect_secrets") +except PackageNotFoundError: + __version__ = "0.0.0.dev" + +from agent_control_evaluator_detect_secrets.detect_secrets import ( + DetectSecretsEvaluator, + DetectSecretsEvaluatorConfig, +) + +__all__ = [ + "DetectSecretsEvaluator", + "DetectSecretsEvaluatorConfig", +] diff --git a/evaluators/contrib/detect_secrets/src/agent_control_evaluator_detect_secrets/detect_secrets/__init__.py b/evaluators/contrib/detect_secrets/src/agent_control_evaluator_detect_secrets/detect_secrets/__init__.py new file mode 100644 index 00000000..dcc0f54f --- /dev/null +++ b/evaluators/contrib/detect_secrets/src/agent_control_evaluator_detect_secrets/detect_secrets/__init__.py @@ -0,0 +1,13 @@ +"""detect-secrets evaluator exports.""" + +from agent_control_evaluator_detect_secrets.detect_secrets.config import ( + DetectSecretsEvaluatorConfig, +) +from agent_control_evaluator_detect_secrets.detect_secrets.evaluator import ( + DetectSecretsEvaluator, +) + +__all__ = [ + "DetectSecretsEvaluator", + "DetectSecretsEvaluatorConfig", +] diff --git a/evaluators/contrib/detect_secrets/src/agent_control_evaluator_detect_secrets/detect_secrets/config.py b/evaluators/contrib/detect_secrets/src/agent_control_evaluator_detect_secrets/detect_secrets/config.py new file mode 100644 
index 00000000..7b36dec1 --- /dev/null +++ b/evaluators/contrib/detect_secrets/src/agent_control_evaluator_detect_secrets/detect_secrets/config.py @@ -0,0 +1,81 @@ +"""Configuration for the detect-secrets evaluator.""" + +from __future__ import annotations + +from typing import Literal + +import re2 +from agent_control_evaluators import EvaluatorConfig +from detect_secrets_async import get_runtime_info +from pydantic import Field, field_validator + +DEFAULT_TIMEOUT_MS = 10_000 +DEFAULT_MAX_BYTES = 1_048_576 + + +class DetectSecretsEvaluatorConfig(EvaluatorConfig): + """Typed configuration for the detect-secrets evaluator.""" + + timeout_ms: int = Field( + default=DEFAULT_TIMEOUT_MS, + gt=0, + description="End-to-end timeout in milliseconds for queue wait and scan execution.", + ) + on_error: Literal["allow", "deny"] = Field( + default="allow", + description="Whether evaluator failures should fail open or fail closed.", + ) + max_bytes: int = Field( + default=DEFAULT_MAX_BYTES, + gt=0, + description="Maximum UTF-8 payload size after normalization and line filtering.", + ) + enabled_plugins: list[str] | None = Field( + default=None, + description="Optional explicit upstream detect-secrets plugin class names.", + ) + exclude_lines_regex: list[str] = Field( + default_factory=list, + description="RE2 patterns for lines that should be blanked before scanning.", + ) + + @field_validator("enabled_plugins") + @classmethod + def validate_enabled_plugins(cls, value: list[str] | None) -> list[str] | None: + """Validate explicit upstream plugin names against detect-secrets-async introspection.""" + if value is None: + return None + + try: + available = set(get_runtime_info().available_plugin_names) + except Exception as exc: + raise ValueError( + "Unable to validate detect-secrets plugins because runtime introspection failed" + ) from exc + normalized: list[str] = [] + seen: set[str] = set() + + for plugin_name in value: + candidate = plugin_name.strip() + if not candidate: + 
raise ValueError("enabled_plugins entries must be non-empty") + if candidate not in available: + raise ValueError(f"Unknown detect-secrets plugin: {candidate}") + if candidate not in seen: + normalized.append(candidate) + seen.add(candidate) + + return normalized + + @field_validator("exclude_lines_regex") + @classmethod + def validate_exclude_lines_regex(cls, value: list[str]) -> list[str]: + """Validate each configured exclude pattern as a RE2 regex.""" + for pattern in value: + if pattern == "": + raise ValueError("exclude_lines_regex entries must be non-empty") + try: + re2.compile(pattern) + except re2.error as exc: + raise ValueError(f"Invalid RE2 pattern '{pattern}': {exc}") from exc + return value diff --git a/evaluators/contrib/detect_secrets/src/agent_control_evaluator_detect_secrets/detect_secrets/evaluator.py b/evaluators/contrib/detect_secrets/src/agent_control_evaluator_detect_secrets/detect_secrets/evaluator.py new file mode 100644 index 00000000..e3f1c430 --- /dev/null +++ b/evaluators/contrib/detect_secrets/src/agent_control_evaluator_detect_secrets/detect_secrets/evaluator.py @@ -0,0 +1,315 @@ +"""Agent Control evaluator adapter for detect-secrets-async.""" + +from __future__ import annotations + +import time +from importlib.metadata import PackageNotFoundError, version +from typing import Any + +import re2 +from agent_control_evaluators import Evaluator, EvaluatorMetadata, register_evaluator +from agent_control_models import EvaluatorResult +from detect_secrets_async import ( + RuntimeConfigConflictError, + RuntimeScanError, + ScanConfig, + ScanFinding, + ScanRequest, + get_runtime, + get_runtime_info, +) + +from agent_control_evaluator_detect_secrets.detect_secrets.config import ( + DetectSecretsEvaluatorConfig, +) +from agent_control_evaluator_detect_secrets.detect_secrets.normalization import ( + LineLocation, + NormalizationError, + NormalizedPayload, + apply_line_exclusions, + normalize_payload, +) + +try: + PACKAGE_VERSION = 
version("agent-control-evaluator-detect_secrets") +except PackageNotFoundError: + PACKAGE_VERSION = "0.0.0.dev" + +FAILURE_MESSAGES: dict[str, str] = { + "invalid_config": "detect-secrets runtime rejected the scan configuration", + "normalization_error": "selected payload could not be normalized for secret scanning", + "payload_too_large": "normalized payload exceeded the configured size limit", + "queue_full": "detect-secrets runtime queue is full", + "queue_timeout": "secret scan timed out while waiting for runtime capacity", + "worker_startup_error": "detect-secrets worker failed to start", + "worker_timeout": "secret scan timed out", + "worker_crash": "detect-secrets worker exited unexpectedly", + "worker_protocol_error": "detect-secrets worker protocol error", + "runtime_error": "detect-secrets runtime error", +} + +IDENTIFIER_LIKE_KEY_PATTERN = re2.compile(r"^[A-Za-z_][A-Za-z0-9_.:-]{0,127}$") +JSON_SCALAR_LIKE_KEY_PATTERN = re2.compile( + r"^(?:-?(?:0|[1-9][0-9]*)(?:\.[0-9]+)?(?:[eE][+-]?[0-9]+)?|true|false|null)$" +) + + +@register_evaluator +class DetectSecretsEvaluator(Evaluator[DetectSecretsEvaluatorConfig]): + """Scan selector-selected content for likely secrets using detect-secrets-async.""" + + metadata = EvaluatorMetadata( + name="yelp.detect_secrets", + version=PACKAGE_VERSION, + description="Potential secret detection via detect-secrets-async", + timeout_ms=10_000, + ) + config_model = DetectSecretsEvaluatorConfig + + def __init__(self, config: DetectSecretsEvaluatorConfig) -> None: + super().__init__(config) + self._exclude_line_patterns = tuple( + re2.compile(pattern) for pattern in config.exclude_lines_regex + ) + + async def evaluate(self, data: Any) -> EvaluatorResult: + """Normalize selector output, run detect-secrets, and map results into EvaluatorResult.""" + started_at = time.monotonic() + try: + normalized = normalize_payload(data) + except (NormalizationError, RecursionError): + return self._failure_result( + 
failure_mode="normalization_error", + normalized_payload_type=None, + detect_secrets_version=self._runtime_version_or_unknown(), + ) + + if normalized.payload_type == "none": + return self._success_result( + normalized=normalized, + detect_secrets_version=self._runtime_version_or_unknown(), + findings=[], + ) + + try: + runtime_info = get_runtime_info() + except Exception: + return self._failure_result( + failure_mode="runtime_error", + normalized_payload_type=None, + detect_secrets_version="unknown", + ) + + assert normalized.text is not None + try: + filtered_text = apply_line_exclusions(normalized.text, self._exclude_line_patterns) + filtered_bytes = filtered_text.encode("utf-8") + except UnicodeError: + return self._failure_result( + failure_mode="normalization_error", + normalized_payload_type=normalized.payload_type, + detect_secrets_version=runtime_info.detect_secrets_version, + ) + + if len(filtered_bytes) > self.config.max_bytes: + return self._failure_result( + failure_mode="payload_too_large", + normalized_payload_type=normalized.payload_type, + detect_secrets_version=runtime_info.detect_secrets_version, + ) + + if self._remaining_timeout_ms(started_at) <= 0: + return self._failure_result( + failure_mode="queue_timeout", + normalized_payload_type=normalized.payload_type, + detect_secrets_version=runtime_info.detect_secrets_version, + ) + + request = ScanRequest( + content=filtered_text, + timeout_ms=self._bounded_remaining_timeout_ms(started_at), + config=ScanConfig( + enabled_plugins=tuple(self.config.enabled_plugins) + if self.config.enabled_plugins is not None + else None + ), + ) + + try: + configured_runtime = runtime_info.configured_runtime + runtime = ( + get_runtime(configured_runtime) if configured_runtime is not None else get_runtime() + ) + scan_result = await runtime.scan(request) + except RuntimeConfigConflictError: + return self._failure_result( + failure_mode="runtime_error", + normalized_payload_type=normalized.payload_type, + 
detect_secrets_version=runtime_info.detect_secrets_version, + ) + except RuntimeScanError as exc: + failure_mode = exc.code.value + return self._failure_result( + failure_mode=failure_mode, + normalized_payload_type=normalized.payload_type, + detect_secrets_version=runtime_info.detect_secrets_version, + ) + except Exception: + return self._failure_result( + failure_mode="runtime_error", + normalized_payload_type=normalized.payload_type, + detect_secrets_version=runtime_info.detect_secrets_version, + ) + + findings = self._map_findings(normalized=normalized, findings=scan_result.findings) + return self._success_result( + normalized=normalized, + detect_secrets_version=scan_result.detect_secrets_version, + findings=findings, + ) + + def _map_findings( + self, + *, + normalized: NormalizedPayload, + findings: tuple[ScanFinding, ...], + ) -> list[dict[str, Any]]: + mapped: list[dict[str, Any]] = [] + + for finding in findings: + finding_metadata: dict[str, Any] = {"type": finding.type} + if normalized.payload_type == "str": + if finding.line_number is not None: + finding_metadata["line_number"] = finding.line_number + elif normalized.payload_type in {"dict", "list"}: + if finding.line_number is not None: + location = normalized.line_locations_by_line.get(finding.line_number) + json_pointer = self._safe_structured_pointer(location=location) + if json_pointer is not None: + finding_metadata["json_pointer"] = json_pointer + + mapped.append(finding_metadata) + + return mapped + + def _safe_structured_pointer(self, *, location: LineLocation | None) -> str | None: + if location is None: + return None + + pointer = location.json_pointer + if location.key_name is not None and self._key_name_is_secret_like(location.key_name): + pointer = location.parent_pointer + + return self._truncate_pointer_at_secret_like_segment(pointer) + + def _truncate_pointer_at_secret_like_segment(self, pointer: str | None) -> str | None: + if pointer is None or pointer == "": + return pointer + + 
safe_segments: list[str] = [] + for encoded_segment in pointer.split("/")[1:]: + segment = self._decode_json_pointer_segment(encoded_segment) + if self._pointer_segment_is_secret_like(segment): + break + safe_segments.append(segment) + else: + return pointer + + if not safe_segments: + return "" + + encoded_segments = [self._encode_json_pointer_segment(segment) for segment in safe_segments] + return "/" + "/".join(encoded_segments) + + def _pointer_segment_is_secret_like(self, segment: str) -> bool: + if segment.isdigit(): + return False + return self._key_name_is_secret_like(segment) + + def _decode_json_pointer_segment(self, segment: str) -> str: + return segment.replace("~1", "/").replace("~0", "~") + + def _encode_json_pointer_segment(self, segment: str) -> str: + return segment.replace("~", "~0").replace("/", "~1") + + def _key_name_is_secret_like(self, key_name: str | None) -> bool: + if key_name is None: + return False + + if JSON_SCALAR_LIKE_KEY_PATTERN.fullmatch(key_name): + return False + + if not IDENTIFIER_LIKE_KEY_PATTERN.fullmatch(key_name): + return True + + return len(key_name) >= 20 + + def _runtime_version_or_unknown(self) -> str: + try: + return get_runtime_info().detect_secrets_version + except Exception: + return "unknown" + + def _remaining_timeout_ms(self, started_at: float) -> int: + return int(self.config.timeout_ms - ((time.monotonic() - started_at) * 1000)) + + def _bounded_remaining_timeout_ms(self, started_at: float) -> int: + return max(1, self._remaining_timeout_ms(started_at)) + + def _success_result( + self, + *, + normalized: NormalizedPayload, + detect_secrets_version: str, + findings: list[dict[str, Any]], + ) -> EvaluatorResult: + matched = bool(findings) + return EvaluatorResult( + matched=matched, + confidence=1.0, + message=( + f"Potential secrets detected ({len(findings)} findings)" + if matched + else "No potential secrets detected" + ), + metadata={ + "findings_count": len(findings), + "findings": findings, + 
"normalized_payload_type": normalized.payload_type, + "detect_secrets_version": detect_secrets_version, + }, + ) + + def _failure_result( + self, + *, + failure_mode: str, + normalized_payload_type: str | None, + detect_secrets_version: str, + ) -> EvaluatorResult: + detail = FAILURE_MESSAGES.get(failure_mode, FAILURE_MESSAGES["runtime_error"]) + metadata: dict[str, Any] = { + "findings_count": 0, + "findings": [], + "detect_secrets_version": detect_secrets_version, + "failure_mode": failure_mode, + } + if normalized_payload_type is not None: + metadata["normalized_payload_type"] = normalized_payload_type + + if self.config.on_error == "deny": + metadata["fallback_action"] = "deny" + return EvaluatorResult( + matched=True, + confidence=0.0, + message=f"Denied due to evaluator failure ({failure_mode}): {detail}", + metadata=metadata, + ) + + metadata["fallback_action"] = "allow" + return EvaluatorResult( + matched=False, + confidence=0.0, + message=f"Secret scan failed ({failure_mode}): {detail}; allowing request", + metadata=metadata, + ) diff --git a/evaluators/contrib/detect_secrets/src/agent_control_evaluator_detect_secrets/detect_secrets/normalization.py b/evaluators/contrib/detect_secrets/src/agent_control_evaluator_detect_secrets/detect_secrets/normalization.py new file mode 100644 index 00000000..ba7f317c --- /dev/null +++ b/evaluators/contrib/detect_secrets/src/agent_control_evaluator_detect_secrets/detect_secrets/normalization.py @@ -0,0 +1,317 @@ +"""Payload normalization helpers for the detect-secrets evaluator.""" + +from __future__ import annotations + +import json +from dataclasses import dataclass +from typing import Any, Literal + +NormalizedPayloadType = Literal["none", "str", "dict", "list", "primitive"] + + +class NormalizationError(ValueError): + """Raised when selector-selected payloads cannot be normalized safely.""" + + +@dataclass(frozen=True, slots=True) +class NormalizedPayload: + """Normalized text payload and line-to-JSON-pointer 
metadata.""" + + payload_type: NormalizedPayloadType + text: str | None + line_locations_by_line: dict[int, LineLocation] + + +@dataclass(frozen=True, slots=True) +class LineLocation: + """Safe structured-location metadata for a rendered line.""" + + json_pointer: str | None + parent_pointer: str | None = None + key_probe_text: str | None = None + key_name: str | None = None + + +@dataclass(frozen=True, slots=True) +class RenderedLine: + """A rendered JSON line plus optional structural pointer metadata.""" + + text: str + location: LineLocation | None = None + + +def _json_dumps(value: Any, **kwargs: Any) -> str: + """Serialize JSON while keeping Unicode line separators escaped on one logical line.""" + dumped = json.dumps(value, **kwargs) + return ( + dumped.replace("\u0085", "\\u0085") + .replace("\u2028", "\\u2028") + .replace("\u2029", "\\u2029") + ) + + +def normalize_payload(data: Any) -> NormalizedPayload: + """Normalize selector output to deterministic text for detect-secrets scanning.""" + if data is None: + return NormalizedPayload(payload_type="none", text=None, line_locations_by_line={}) + + if isinstance(data, str): + return NormalizedPayload(payload_type="str", text=data, line_locations_by_line={}) + + if isinstance(data, dict): + return _normalize_structured_payload(data, payload_type="dict") + + if isinstance(data, list | tuple): + return _normalize_structured_payload(data, payload_type="list") + + if isinstance(data, bool | int | float): + return _normalize_primitive_payload(data) + + raise NormalizationError(f"Unsupported payload type for normalization: {type(data).__name__}") + + +def apply_line_exclusions(text: str, patterns: tuple[Any, ...]) -> str: + """Blank matching lines without changing line numbering.""" + if not patterns: + return text + + filtered_lines = [ + "" if any(pattern.search(line) for pattern in patterns) else line + for line in text.split("\n") + ] + return "\n".join(filtered_lines) + + +def _normalize_structured_payload( + 
data: dict[Any, Any] | list[Any] | tuple[Any, ...], + *, + payload_type: Literal["dict", "list"], +) -> NormalizedPayload: + try: + normalized_data = _normalize_json_value(data) + except (TypeError, ValueError) as exc: + raise NormalizationError(f"Failed to normalize structured payload: {exc}") from exc + + try: + text = _json_dumps( + normalized_data, + sort_keys=True, + indent=2, + ensure_ascii=False, + allow_nan=False, + ) + except (TypeError, ValueError) as exc: + raise NormalizationError(f"Failed to normalize structured payload: {exc}") from exc + + try: + rendered_lines = _render_json_lines(normalized_data) + except (TypeError, ValueError) as exc: + raise NormalizationError(f"Failed to map structured payload lines: {exc}") from exc + + rendered_text = "\n".join(line.text for line in rendered_lines) + if rendered_text != text: + raise NormalizationError("Structured payload rendering mismatch during normalization") + + line_locations_by_line = { + line_number: rendered_line.location + for line_number, rendered_line in enumerate(rendered_lines, start=1) + if rendered_line.location is not None + } + return NormalizedPayload( + payload_type=payload_type, + text=text, + line_locations_by_line=line_locations_by_line, + ) + + +def _normalize_primitive_payload(data: bool | int | float) -> NormalizedPayload: + try: + text = _json_dumps(data, ensure_ascii=False, allow_nan=False) + except (TypeError, ValueError) as exc: + raise NormalizationError(f"Failed to normalize scalar payload: {exc}") from exc + + return NormalizedPayload(payload_type="primitive", text=text, line_locations_by_line={}) + + +def _normalize_json_value(value: Any) -> Any: + """Convert supported Python payloads into a deterministic JSON-compatible shape.""" + if isinstance(value, dict): + normalized_object: dict[str, Any] = {} + for raw_key, child in value.items(): + normalized_key = _json_object_key_name(raw_key) + if normalized_key in normalized_object: + raise ValueError(f"JSON key collision after 
normalization: {normalized_key!r}") + normalized_object[normalized_key] = _normalize_json_value(child) + return normalized_object + + if isinstance(value, list | tuple): + return [_normalize_json_value(child) for child in value] + + return value + + +def _render_json_lines( + value: Any, + *, + indent_level: int = 0, + prefix: str = "", + pointer: str = "", +) -> list[RenderedLine]: + indent = " " * (indent_level * 2) + + if isinstance(value, dict): + return _render_dict_lines(value, indent_level=indent_level, prefix=prefix, pointer=pointer) + + if isinstance(value, list | tuple): + return _render_list_lines(value, indent_level=indent_level, prefix=prefix, pointer=pointer) + + scalar_text = _json_dumps(value, ensure_ascii=False, allow_nan=False) + scalar_pointer = pointer or None + return [ + RenderedLine( + text=f"{indent}{prefix}{scalar_text}", + location=LineLocation(json_pointer=scalar_pointer), + ) + ] + + +def _render_dict_lines( + value: dict[Any, Any], + *, + indent_level: int, + prefix: str, + pointer: str, +) -> list[RenderedLine]: + indent = " " * (indent_level * 2) + if not value: + return [RenderedLine(text=f"{indent}{prefix}{{}}")] + + lines = [RenderedLine(text=f"{indent}{prefix}{{")] + items = sorted(value.items(), key=lambda item: _json_object_key_name(item[0])) + last_index = len(items) - 1 + + for index, (raw_key, child) in enumerate(items): + suffix = "," if index < last_index else "" + key_name = _json_object_key_name(raw_key) + key_literal = _json_dumps(key_name, ensure_ascii=False, allow_nan=False) + child_prefix = f"{key_literal}: " + child_pointer = _append_json_pointer(pointer, key_name) + child_lines = _render_json_lines( + child, + indent_level=indent_level + 1, + prefix=child_prefix, + pointer=child_pointer, + ) + child_lines = _attach_dict_child_location( + child=child, + child_lines=child_lines, + child_pointer=child_pointer, + parent_pointer=pointer if pointer else "", + key_literal=key_literal, + key_name=key_name, + ) + 
child_lines[-1] = RenderedLine( + text=f"{child_lines[-1].text}{suffix}", + location=child_lines[-1].location, + ) + lines.extend(child_lines) + + lines.append(RenderedLine(text=f"{indent}}}")) + return lines + + +def _render_list_lines( + value: list[Any] | tuple[Any, ...], + *, + indent_level: int, + prefix: str, + pointer: str, +) -> list[RenderedLine]: + indent = " " * (indent_level * 2) + if not value: + return [RenderedLine(text=f"{indent}{prefix}[]")] + + lines = [RenderedLine(text=f"{indent}{prefix}[")] + last_index = len(value) - 1 + + for index, child in enumerate(value): + suffix = "," if index < last_index else "" + child_pointer = _append_json_pointer(pointer, str(index)) + child_lines = _render_json_lines( + child, + indent_level=indent_level + 1, + prefix="", + pointer=child_pointer, + ) + child_lines = _attach_list_child_location(child, child_lines, child_pointer) + child_lines[-1] = RenderedLine( + text=f"{child_lines[-1].text}{suffix}", + location=child_lines[-1].location, + ) + lines.extend(child_lines) + + lines.append(RenderedLine(text=f"{indent}]")) + return lines + + +def _json_object_key_name(key: Any) -> str: + if isinstance(key, str): + return key + if key is True: + return "true" + if key is False: + return "false" + if key is None: + return "null" + if isinstance(key, int | float): + return _json_dumps(key, ensure_ascii=False, allow_nan=False) + raise TypeError(f"Unsupported JSON object key type: {type(key).__name__}") + + +def _attach_dict_child_location( + child: Any, + child_lines: list[RenderedLine], + child_pointer: str, + parent_pointer: str | None, + key_literal: str, + key_name: str, +) -> list[RenderedLine]: + if child_lines: + first_line = child_lines[0] + child_lines[0] = RenderedLine( + text=first_line.text, + location=LineLocation( + json_pointer=child_pointer, + parent_pointer=parent_pointer, + key_probe_text=_build_key_probe_text(key_literal, child), + key_name=key_name, + ), + ) + return child_lines + + +def 
_attach_list_child_location( + child: Any, + child_lines: list[RenderedLine], + child_pointer: str, +) -> list[RenderedLine]: + if isinstance(child, dict | list) and child_lines: + first_line = child_lines[0] + child_lines[0] = RenderedLine( + text=first_line.text, + location=LineLocation(json_pointer=child_pointer), + ) + return child_lines + + +def _build_key_probe_text(key_literal: str, child: Any) -> str: + if isinstance(child, dict): + return f"{key_literal}: {{}}" + if isinstance(child, list): + return f"{key_literal}: []" + return f"{key_literal}: null" + + +def _append_json_pointer(pointer: str, segment: str) -> str: + escaped = segment.replace("~", "~0").replace("/", "~1") + return f"{pointer}/{escaped}" if pointer else f"/{escaped}" diff --git a/evaluators/contrib/detect_secrets/tests/__init__.py b/evaluators/contrib/detect_secrets/tests/__init__.py new file mode 100644 index 00000000..aaf932b2 --- /dev/null +++ b/evaluators/contrib/detect_secrets/tests/__init__.py @@ -0,0 +1 @@ +"""Tests for the detect-secrets contrib evaluator package.""" diff --git a/evaluators/contrib/detect_secrets/tests/detect_secrets/test_evaluator.py b/evaluators/contrib/detect_secrets/tests/detect_secrets/test_evaluator.py new file mode 100644 index 00000000..e64846f7 --- /dev/null +++ b/evaluators/contrib/detect_secrets/tests/detect_secrets/test_evaluator.py @@ -0,0 +1,1269 @@ +from __future__ import annotations + +import asyncio +from importlib.metadata import entry_points +from typing import Any, Literal + +import pytest +from detect_secrets_async import ( + RuntimeConfig, + RuntimeConfigConflictError, + RuntimeScanError, + ScanFailureCode, + ScanResult, + get_runtime_info, +) + +from agent_control_evaluator_detect_secrets.detect_secrets import ( + DetectSecretsEvaluator, + DetectSecretsEvaluatorConfig, +) +from agent_control_evaluator_detect_secrets.detect_secrets.evaluator import FAILURE_MESSAGES +from agent_control_evaluator_detect_secrets.detect_secrets.normalization 
import normalize_payload + + +@pytest.mark.asyncio +async def test_none_input_returns_no_match() -> None: + evaluator = DetectSecretsEvaluator(DetectSecretsEvaluatorConfig()) + + result = await evaluator.evaluate(None) + + assert result.matched is False + assert result.error is None + assert result.metadata == { + "findings_count": 0, + "findings": [], + "normalized_payload_type": "none", + "detect_secrets_version": get_runtime_info().detect_secrets_version, + } + + +@pytest.mark.asyncio +async def test_none_input_short_circuits_runtime_failures( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setattr( + "agent_control_evaluator_detect_secrets.detect_secrets.evaluator.get_runtime_info", + lambda: (_ for _ in ()).throw(RuntimeError("boom")), + ) + + evaluator = DetectSecretsEvaluator(DetectSecretsEvaluatorConfig(on_error="deny")) + result = await evaluator.evaluate(None) + + assert result.matched is False + assert result.error is None + assert result.metadata == { + "findings_count": 0, + "findings": [], + "normalized_payload_type": "none", + "detect_secrets_version": "unknown", + } + + +@pytest.mark.asyncio +async def test_string_secret_matches() -> None: + evaluator = DetectSecretsEvaluator( + DetectSecretsEvaluatorConfig(enabled_plugins=["GitHubTokenDetector"]) + ) + + result = await evaluator.evaluate("github_token = 'ghp_123456789012345678901234567890123456'") + + assert result.matched is True + assert result.confidence == 1.0 + assert result.metadata is not None + assert result.metadata["findings_count"] == 1 + assert result.metadata["normalized_payload_type"] == "str" + assert result.metadata["findings"] == [{"type": "GitHub Token", "line_number": 1}] + + +@pytest.mark.asyncio +async def test_string_without_findings_does_not_match() -> None: + evaluator = DetectSecretsEvaluator( + DetectSecretsEvaluatorConfig(enabled_plugins=["GitHubTokenDetector"]) + ) + + result = await evaluator.evaluate("safe content only") + + assert result.matched is False 
+ assert result.error is None + assert result.metadata is not None + assert result.metadata["findings_count"] == 0 + + +@pytest.mark.asyncio +async def test_dict_payload_maps_findings_to_json_pointer() -> None: + evaluator = DetectSecretsEvaluator( + DetectSecretsEvaluatorConfig(enabled_plugins=["GitHubTokenDetector"]) + ) + + result = await evaluator.evaluate( + { + "response": { + "headers": { + "authorization": "ghp_123456789012345678901234567890123456", + } + } + } + ) + + assert result.matched is True + assert result.metadata is not None + assert result.metadata["normalized_payload_type"] == "dict" + assert result.metadata["findings"] == [ + { + "type": "GitHub Token", + "json_pointer": "/response/headers/authorization", + } + ] + + +@pytest.mark.asyncio +async def test_dict_key_with_container_value_maps_findings_to_json_pointer() -> None: + evaluator = DetectSecretsEvaluator( + DetectSecretsEvaluatorConfig(enabled_plugins=["GitHubTokenDetector"]) + ) + + result = await evaluator.evaluate( + { + "outer": { + "ghp_123456789012345678901234567890123456": { + "nested": "safe", + } + } + } + ) + + assert result.matched is True + assert result.metadata is not None + assert result.metadata["normalized_payload_type"] == "dict" + assert result.metadata["findings"] == [ + { + "type": "GitHub Token", + "json_pointer": "/outer", + } + ] + + +@pytest.mark.asyncio +async def test_root_dict_key_with_container_value_maps_findings_to_root_pointer() -> None: + evaluator = DetectSecretsEvaluator( + DetectSecretsEvaluatorConfig(enabled_plugins=["GitHubTokenDetector"]) + ) + + result = await evaluator.evaluate( + { + "ghp_123456789012345678901234567890123456": { + "nested": "safe", + } + } + ) + + assert result.matched is True + assert result.metadata is not None + assert result.metadata["normalized_payload_type"] == "dict" + assert result.metadata["findings"] == [ + { + "type": "GitHub Token", + "json_pointer": "", + } + ] + + +@pytest.mark.asyncio +async def 
test_dict_key_with_scalar_value_omits_json_pointer() -> None: + evaluator = DetectSecretsEvaluator( + DetectSecretsEvaluatorConfig(enabled_plugins=["GitHubTokenDetector"]) + ) + + result = await evaluator.evaluate( + { + "ghp_123456789012345678901234567890123456": "safe", + } + ) + + assert result.matched is True + assert result.metadata is not None + assert result.metadata["normalized_payload_type"] == "dict" + assert result.metadata["findings"] == [ + { + "type": "GitHub Token", + "json_pointer": "", + } + ] + + +@pytest.mark.asyncio +async def test_list_payload_maps_findings_to_json_pointer() -> None: + evaluator = DetectSecretsEvaluator( + DetectSecretsEvaluatorConfig(enabled_plugins=["GitHubTokenDetector"]) + ) + + result = await evaluator.evaluate( + [ + {"kind": "safe"}, + {"token": "ghp_123456789012345678901234567890123456"}, + ] + ) + + assert result.matched is True + assert result.metadata is not None + assert result.metadata["normalized_payload_type"] == "list" + assert result.metadata["findings"] == [ + { + "type": "GitHub Token", + "json_pointer": "/1/token", + } + ] + + +@pytest.mark.asyncio +async def test_structured_unicode_line_separator_preserves_json_pointer() -> None: + evaluator = DetectSecretsEvaluator( + DetectSecretsEvaluatorConfig(enabled_plugins=["GitHubTokenDetector"]) + ) + + result = await evaluator.evaluate({"x": "prefix\u2028ghp_123456789012345678901234567890123456"}) + + assert result.matched is True + assert result.metadata is not None + assert result.metadata["findings"] == [{"type": "GitHub Token", "json_pointer": "/x"}] + + +@pytest.mark.asyncio +async def test_secret_bearing_object_keys_do_not_leak_through_json_pointer() -> None: + evaluator = DetectSecretsEvaluator( + DetectSecretsEvaluatorConfig(enabled_plugins=["GitHubTokenDetector"]) + ) + + key_secret = "ghp_123456789012345678901234567890123456" + value_secret = "ghp_abcdefabcdefabcdefabcdefabcdefabcdef" + result = await evaluator.evaluate({key_secret: {"nested": 
value_secret}}) + + assert result.matched is True + assert result.metadata is not None + assert all(finding.get("json_pointer", "") == "" for finding in result.metadata["findings"]) + assert all( + key_secret not in finding.get("json_pointer", "") for finding in result.metadata["findings"] + ) + + +@pytest.mark.asyncio +async def test_nested_findings_under_secret_like_key_truncate_to_safe_ancestor() -> None: + evaluator = DetectSecretsEvaluator( + DetectSecretsEvaluatorConfig(enabled_plugins=["GitHubTokenDetector"]) + ) + + key_secret = "ghp_123456789012345678901234567890123456" + value_secret = "ghp_abcdefabcdefabcdefabcdefabcdefabcdef" + result = await evaluator.evaluate({"outer": {key_secret: {"nested": value_secret}}}) + + assert result.matched is True + assert result.metadata is not None + assert all( + finding.get("json_pointer", "") == "/outer" for finding in result.metadata["findings"] + ) + assert all( + key_secret not in finding.get("json_pointer", "") for finding in result.metadata["findings"] + ) + + +@pytest.mark.asyncio +async def test_tuple_payload_maps_findings_like_a_list() -> None: + evaluator = DetectSecretsEvaluator( + DetectSecretsEvaluatorConfig(enabled_plugins=["GitHubTokenDetector"]) + ) + + result = await evaluator.evaluate( + ( + {"token": "ghp_123456789012345678901234567890123456"}, + {"kind": "safe"}, + ) + ) + + assert result.matched is True + assert result.metadata is not None + assert result.metadata["normalized_payload_type"] == "list" + assert result.metadata["findings"] == [ + { + "type": "GitHub Token", + "json_pointer": "/0/token", + } + ] + + +@pytest.mark.asyncio +async def test_mixed_key_types_still_normalize_and_scan() -> None: + evaluator = DetectSecretsEvaluator( + DetectSecretsEvaluatorConfig(enabled_plugins=["GitHubTokenDetector"]) + ) + + result = await evaluator.evaluate( + { + 1: "ghp_123456789012345678901234567890123456", + "kind": "safe", + } + ) + + assert result.matched is True + assert result.metadata is not None 
+ assert result.metadata["normalized_payload_type"] == "dict" + assert result.metadata["findings"] == [ + { + "type": "GitHub Token", + "json_pointer": "/1", + } + ] + + +@pytest.mark.asyncio +async def test_colliding_normalized_keys_route_through_normalization_error() -> None: + evaluator = DetectSecretsEvaluator( + DetectSecretsEvaluatorConfig(enabled_plugins=["GitHubTokenDetector"]) + ) + + result = await evaluator.evaluate( + { + 1: "ghp_123456789012345678901234567890123456", + "1": "safe", + } + ) + + assert result.matched is False + assert result.error is None + assert result.metadata is not None + assert result.metadata["failure_mode"] == "normalization_error" + assert result.metadata["fallback_action"] == "allow" + + +@pytest.mark.asyncio +async def test_primitive_payload_is_normalized_and_omits_line_numbers() -> None: + evaluator = DetectSecretsEvaluator( + DetectSecretsEvaluatorConfig(enabled_plugins=["GitHubTokenDetector"]) + ) + + result = await evaluator.evaluate(True) + + assert result.matched is False + assert result.metadata is not None + assert result.metadata["normalized_payload_type"] == "primitive" + assert result.metadata["findings"] == [] + + +@pytest.mark.asyncio +async def test_non_json_serializable_payload_routes_through_on_error_allow() -> None: + evaluator = DetectSecretsEvaluator(DetectSecretsEvaluatorConfig()) + + result = await evaluator.evaluate({"bad": {1, 2, 3}}) + + assert result.matched is False + assert result.confidence == 0.0 + assert result.error is None + assert result.metadata is not None + assert result.metadata["failure_mode"] == "normalization_error" + assert result.metadata["fallback_action"] == "allow" + + +@pytest.mark.asyncio +async def test_invalid_unicode_payload_routes_through_on_error_deny() -> None: + evaluator = DetectSecretsEvaluator(DetectSecretsEvaluatorConfig(on_error="deny")) + + result = await evaluator.evaluate("\ud800") + + assert result.matched is True + assert result.error is None + assert 
result.metadata is not None + assert result.metadata["failure_mode"] == "normalization_error" + assert result.metadata["fallback_action"] == "deny" + + +@pytest.mark.asyncio +async def test_invalid_unicode_with_exclusions_routes_through_on_error_deny() -> None: + evaluator = DetectSecretsEvaluator( + DetectSecretsEvaluatorConfig(on_error="deny", exclude_lines_regex=["x"]) + ) + + result = await evaluator.evaluate("\ud800") + + assert result.matched is True + assert result.error is None + assert result.metadata is not None + assert result.metadata["failure_mode"] == "normalization_error" + assert result.metadata["fallback_action"] == "deny" + + +@pytest.mark.asyncio +async def test_recursive_payload_routes_through_normalization_error() -> None: + evaluator = DetectSecretsEvaluator(DetectSecretsEvaluatorConfig()) + payload: dict[str, Any] = {} + payload["self"] = payload + + result = await evaluator.evaluate(payload) + + assert result.matched is False + assert result.error is None + assert result.metadata is not None + assert result.metadata["failure_mode"] == "normalization_error" + assert result.metadata["fallback_action"] == "allow" + + +@pytest.mark.asyncio +async def test_oversized_payload_routes_through_on_error_allow() -> None: + evaluator = DetectSecretsEvaluator(DetectSecretsEvaluatorConfig(max_bytes=8)) + + result = await evaluator.evaluate("0123456789") + + assert result.matched is False + assert result.error is None + assert result.metadata is not None + assert result.metadata["failure_mode"] == "payload_too_large" + assert result.metadata["normalized_payload_type"] == "str" + assert result.metadata["fallback_action"] == "allow" + + +@pytest.mark.asyncio +async def test_exhausted_timeout_budget_short_circuits_before_runtime( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monotonic_values = [100.0, 100.02] + + def fake_monotonic() -> float: + if monotonic_values: + return monotonic_values.pop(0) + return 100.02 + + monkeypatch.setattr( + 
"agent_control_evaluator_detect_secrets.detect_secrets.evaluator.time.monotonic", + fake_monotonic, + ) + monkeypatch.setattr( + "agent_control_evaluator_detect_secrets.detect_secrets.evaluator.get_runtime", + lambda config=None: pytest.fail("runtime should not be invoked"), + ) + + evaluator = DetectSecretsEvaluator(DetectSecretsEvaluatorConfig(timeout_ms=10)) + result = await evaluator.evaluate("safe content only") + + assert result.matched is False + assert result.error is None + assert result.metadata is not None + assert result.metadata["failure_mode"] == "queue_timeout" + assert result.metadata["fallback_action"] == "allow" + + +@pytest.mark.asyncio +async def test_on_error_deny_fails_closed(monkeypatch: pytest.MonkeyPatch) -> None: + class FakeRuntime: + async def scan(self, request: Any) -> Any: + raise RuntimeScanError(ScanFailureCode.WORKER_TIMEOUT) + + monkeypatch.setattr( + "agent_control_evaluator_detect_secrets.detect_secrets.evaluator.get_runtime", + lambda config=None: FakeRuntime(), + ) + + evaluator = DetectSecretsEvaluator(DetectSecretsEvaluatorConfig(on_error="deny")) + result = await evaluator.evaluate("hello") + + assert result.matched is True + assert result.confidence == 0.0 + assert result.error is None + assert result.metadata is not None + assert result.metadata["failure_mode"] == "worker_timeout" + assert result.metadata["fallback_action"] == "deny" + + +@pytest.mark.asyncio +async def test_explicit_runtime_failure_is_sanitized(monkeypatch: pytest.MonkeyPatch) -> None: + class FakeRuntime: + async def scan(self, request: Any) -> Any: + raise RuntimeScanError(ScanFailureCode.WORKER_CRASH, "raw runtime detail") + + monkeypatch.setattr( + "agent_control_evaluator_detect_secrets.detect_secrets.evaluator.get_runtime", + lambda config=None: FakeRuntime(), + ) + + evaluator = DetectSecretsEvaluator(DetectSecretsEvaluatorConfig()) + result = await evaluator.evaluate("hello") + + assert result.matched is False + assert result.error is None + 
assert result.metadata is not None + assert result.metadata["failure_mode"] == "worker_crash" + assert result.metadata["fallback_action"] == "allow" + assert result.message is not None + assert FAILURE_MESSAGES["worker_crash"] in result.message + + +@pytest.mark.asyncio +async def test_structured_same_line_findings_map_to_field_pointer_without_probing() -> None: + evaluator = DetectSecretsEvaluator(DetectSecretsEvaluatorConfig()) + + result = await evaluator.evaluate({"secret": "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA="}) + + assert result.matched is True + assert result.metadata is not None + assert result.metadata["normalized_payload_type"] == "dict" + assert all(finding.get("json_pointer") == "/secret" for finding in result.metadata["findings"]) + assert {"type": "Secret Keyword", "json_pointer": "/secret"} in result.metadata["findings"] + + +@pytest.mark.asyncio +async def test_preconfigured_runtime_is_reused(monkeypatch: pytest.MonkeyPatch) -> None: + class FakeRuntime: + async def scan(self, request: Any) -> ScanResult: + return ScanResult(findings=(), detect_secrets_version="1.5.0") + + runtime_config = RuntimeConfig(pool_size=2, max_queue_depth=6, max_requests_per_worker=40) + runtime_info = get_runtime_info().model_copy(update={"configured_runtime": runtime_config}) + runtime_calls: list[RuntimeConfig | None] = [] + + def fake_get_runtime(config: RuntimeConfig | None = None) -> FakeRuntime: + runtime_calls.append(config) + return FakeRuntime() + + monkeypatch.setattr( + "agent_control_evaluator_detect_secrets.detect_secrets.evaluator.get_runtime_info", + lambda: runtime_info, + ) + monkeypatch.setattr( + "agent_control_evaluator_detect_secrets.detect_secrets.evaluator.get_runtime", + fake_get_runtime, + ) + + evaluator = DetectSecretsEvaluator(DetectSecretsEvaluatorConfig()) + result = await evaluator.evaluate("safe content only") + + assert result.matched is False + assert runtime_calls == [runtime_config] + + +@pytest.mark.asyncio +async def 
test_unexpected_runtime_errors_honor_on_error_deny( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setattr( + "agent_control_evaluator_detect_secrets.detect_secrets.evaluator.get_runtime", + lambda config=None: (_ for _ in ()).throw(ValueError("boom")), + ) + + evaluator = DetectSecretsEvaluator(DetectSecretsEvaluatorConfig(on_error="deny")) + result = await evaluator.evaluate("safe content only") + + assert result.matched is True + assert result.error is None + assert result.metadata is not None + assert result.metadata["failure_mode"] == "runtime_error" + assert result.metadata["fallback_action"] == "deny" + + +@pytest.mark.asyncio +async def test_initial_scan_uses_remaining_timeout_budget( + monkeypatch: pytest.MonkeyPatch, +) -> None: + class FakeRuntime: + def __init__(self) -> None: + self.requests: list[Any] = [] + + async def scan(self, request: Any) -> ScanResult: + self.requests.append(request) + return ScanResult(findings=(), detect_secrets_version="1.5.0") + + fake_runtime = FakeRuntime() + monotonic_values = [100.0, 100.04] + + def fake_monotonic() -> float: + if monotonic_values: + return monotonic_values.pop(0) + return 100.04 + + monkeypatch.setattr( + "agent_control_evaluator_detect_secrets.detect_secrets.evaluator.get_runtime", + lambda config=None: fake_runtime, + ) + monkeypatch.setattr( + "agent_control_evaluator_detect_secrets.detect_secrets.evaluator.time.monotonic", + fake_monotonic, + ) + + evaluator = DetectSecretsEvaluator(DetectSecretsEvaluatorConfig(timeout_ms=50)) + result = await evaluator.evaluate("safe content only") + + assert result.matched is False + assert len(fake_runtime.requests) == 1 + assert 1 <= fake_runtime.requests[0].timeout_ms < 50 + + +@pytest.mark.asyncio +async def test_exclude_lines_regex_suppresses_findings() -> None: + evaluator = DetectSecretsEvaluator( + DetectSecretsEvaluatorConfig( + enabled_plugins=["GitHubTokenDetector"], + exclude_lines_regex=["ghp_[A-Za-z0-9]{36}"], + ) + ) + + result = 
await evaluator.evaluate("github_token = 'ghp_123456789012345678901234567890123456'") + + assert result.matched is False + assert result.metadata is not None + assert result.metadata["findings"] == [] + + +@pytest.mark.asyncio +async def test_exclude_lines_preserves_line_numbers_for_plain_strings() -> None: + evaluator = DetectSecretsEvaluator( + DetectSecretsEvaluatorConfig( + enabled_plugins=["GitHubTokenDetector"], + exclude_lines_regex=["^ignore me$"], + ) + ) + + content = "\n".join( + [ + "line 1", + "ignore me", + "github_token = 'ghp_123456789012345678901234567890123456'", + ] + ) + result = await evaluator.evaluate(content) + + assert result.matched is True + assert result.metadata is not None + assert result.metadata["findings"] == [{"type": "GitHub Token", "line_number": 3}] + + +def test_invalid_regex_is_rejected() -> None: + with pytest.raises(ValueError, match="Invalid RE2 pattern"): + DetectSecretsEvaluatorConfig(exclude_lines_regex=["("]) + + +def test_blank_regex_is_rejected() -> None: + with pytest.raises(ValueError, match="exclude_lines_regex entries must be non-empty"): + DetectSecretsEvaluatorConfig(exclude_lines_regex=[""]) + + +def test_unknown_plugin_is_rejected() -> None: + with pytest.raises(ValueError, match="Unknown detect-secrets plugin"): + DetectSecretsEvaluatorConfig(enabled_plugins=["NoSuchPlugin"]) + + +def test_plugin_validation_runtime_failures_are_wrapped( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setattr( + "agent_control_evaluator_detect_secrets.detect_secrets.config.get_runtime_info", + lambda: (_ for _ in ()).throw(RuntimeError("boom")), + ) + + with pytest.raises( + ValueError, + match="Unable to validate detect-secrets plugins because runtime introspection failed", + ): + DetectSecretsEvaluatorConfig(enabled_plugins=["GitHubTokenDetector"]) + + +@pytest.mark.asyncio +async def test_omitted_enabled_plugins_uses_upstream_defaults() -> None: + evaluator = 
DetectSecretsEvaluator(DetectSecretsEvaluatorConfig()) + sample = "api_key = 'abcdefghijklmnopqrstuvwxyz0123456789ABCDE='" + + result = await evaluator.evaluate(sample) + + assert result.matched is True + assert result.metadata is not None + finding_types = {finding["type"] for finding in result.metadata["findings"]} + assert "Secret Keyword" in finding_types + + +def test_normalize_payload_renders_expected_json_pointer_lines() -> None: + normalized = normalize_payload({"outer": [{"inner": "secret"}]}) + + assert normalized.payload_type == "dict" + assert normalized.line_locations_by_line[4].json_pointer == "/outer/0/inner" + + +@pytest.mark.parametrize( + ("key_name", "expected"), + [ + ("github_token_key_name", True), + ("MyVeryLongFunctionName", True), + ("api_key_v2", False), + ("github_pat_11ABCDEFG1234567890123", True), + ("0", False), + ("abcdefghijklmnopqrstuvwxyzabcdef", True), + ], +) +def test_key_name_is_secret_like_heuristic(key_name: str, expected: bool) -> None: + evaluator = DetectSecretsEvaluator(DetectSecretsEvaluatorConfig()) + + assert evaluator._key_name_is_secret_like(key_name) is expected + + +def test_entry_point_is_registered() -> None: + evaluator_entry_points = { + entry_point.name: entry_point.value + for entry_point in entry_points(group="agent_control.evaluators") + } + + assert evaluator_entry_points["yelp.detect_secrets"] == ( + "agent_control_evaluator_detect_secrets.detect_secrets:DetectSecretsEvaluator" + ) + + +def test_entry_point_load_returns_evaluator_class() -> None: + # Given: the registered yelp.detect_secrets entry point + evaluator_entry_points = { + entry_point.name: entry_point + for entry_point in entry_points(group="agent_control.evaluators") + } + entry_point = evaluator_entry_points["yelp.detect_secrets"] + + # When: the entry point is loaded + loaded_class = entry_point.load() + + # Then: it resolves to the DetectSecretsEvaluator class + assert loaded_class is DetectSecretsEvaluator + + +# 
--------------------------------------------------------------------------- +# Failure-mode matrix: every ScanFailureCode x {allow, deny} combination, +# plus evaluator-layer failures, plus a drift pin for FAILURE_MESSAGES. +# --------------------------------------------------------------------------- + +_RUNTIME_FAILURE_CODES: tuple[ScanFailureCode, ...] = ( + ScanFailureCode.INVALID_CONFIG, + ScanFailureCode.QUEUE_FULL, + ScanFailureCode.QUEUE_TIMEOUT, + ScanFailureCode.WORKER_STARTUP_ERROR, + ScanFailureCode.WORKER_TIMEOUT, + ScanFailureCode.WORKER_CRASH, + ScanFailureCode.WORKER_PROTOCOL_ERROR, + ScanFailureCode.RUNTIME_ERROR, +) + + +@pytest.mark.asyncio +@pytest.mark.parametrize("failure_code", _RUNTIME_FAILURE_CODES) +@pytest.mark.parametrize("on_error", ["allow", "deny"]) +async def test_runtime_failure_routes_through_on_error_for_each_code( + failure_code: ScanFailureCode, + on_error: Literal["allow", "deny"], + monkeypatch: pytest.MonkeyPatch, +) -> None: + # Given: a runtime that raises the given ScanFailureCode on every scan + class FakeRuntime: + async def scan(self, request: Any) -> Any: + raise RuntimeScanError(failure_code) + + monkeypatch.setattr( + "agent_control_evaluator_detect_secrets.detect_secrets.evaluator.get_runtime", + lambda config=None: FakeRuntime(), + ) + evaluator = DetectSecretsEvaluator(DetectSecretsEvaluatorConfig(on_error=on_error)) + + # When: a valid string payload is evaluated + result = await evaluator.evaluate("safe content only") + + # Then: the failure_mode reflects the code and fallback_action mirrors on_error + assert result.error is None + assert result.confidence == 0.0 + assert result.metadata is not None + assert result.metadata["failure_mode"] == failure_code.value + assert result.metadata["fallback_action"] == on_error + assert result.message is not None + assert FAILURE_MESSAGES[failure_code.value] in result.message + assert result.matched is (on_error == "deny") + + +@pytest.mark.asyncio 
+@pytest.mark.parametrize("on_error", ["allow", "deny"]) +@pytest.mark.parametrize( + ("failure_mode", "config_kwargs", "payload"), + [ + ("normalization_error", {}, {"bad": {1, 2, 3}}), + ("payload_too_large", {"max_bytes": 8}, "0123456789"), + ], + ids=["normalization_error", "payload_too_large"], +) +async def test_evaluator_layer_failure_routes_through_on_error( + failure_mode: str, + config_kwargs: dict[str, Any], + payload: Any, + on_error: Literal["allow", "deny"], +) -> None: + # Given: an evaluator configured to hit the given evaluator-layer failure + evaluator = DetectSecretsEvaluator( + DetectSecretsEvaluatorConfig(on_error=on_error, **config_kwargs) + ) + + # When: the triggering payload is evaluated + result = await evaluator.evaluate(payload) + + # Then: metadata carries the expected failure_mode and fallback_action for both modes + assert result.error is None + assert result.confidence == 0.0 + assert result.metadata is not None + assert result.metadata["failure_mode"] == failure_mode + assert result.metadata["fallback_action"] == on_error + assert result.matched is (on_error == "deny") + + +def test_failure_messages_cover_all_runtime_scan_failure_codes() -> None: + # Given: the detect-secrets-async ScanFailureCode enum and the evaluator's two + # evaluator-layer failure modes + runtime_code_values = {code.value for code in ScanFailureCode} + evaluator_layer_codes = {"normalization_error", "payload_too_large"} + + # When: the expected key set is formed + expected_keys = runtime_code_values | evaluator_layer_codes + + # Then: FAILURE_MESSAGES has exactly that set, with a non-empty message for each + assert set(FAILURE_MESSAGES) == expected_keys + assert all(message for message in FAILURE_MESSAGES.values()) + + +# --------------------------------------------------------------------------- +# Normalization edge cases (top-level types, NaN, empty containers, +# non-string dict keys). 
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_top_level_set_routes_through_normalization_error() -> None:
    # Given: a payload whose top-level type is not supported (a plain set)
    evaluator = DetectSecretsEvaluator(DetectSecretsEvaluatorConfig())

    # When: the evaluator evaluates it
    result = await evaluator.evaluate({"abc", "def"})

    # Then: the failure is classified as normalization_error
    assert result.matched is False
    assert result.error is None
    assert result.metadata is not None
    assert result.metadata["failure_mode"] == "normalization_error"
    assert result.metadata["fallback_action"] == "allow"


@pytest.mark.asyncio
@pytest.mark.parametrize("bad_scalar", [float("nan"), float("inf"), float("-inf")])
async def test_nan_or_infinity_primitive_routes_through_normalization_error(
    bad_scalar: float,
) -> None:
    # Given: a non-finite float which json.dumps(allow_nan=False) rejects
    evaluator = DetectSecretsEvaluator(DetectSecretsEvaluatorConfig())

    # When: the evaluator evaluates it
    result = await evaluator.evaluate(bad_scalar)

    # Then: normalization fails safely through on_error=allow
    assert result.matched is False
    # Consistent with the sibling normalization_error test: the failure is
    # reported via metadata, not via result.error.
    assert result.error is None
    assert result.metadata is not None
    assert result.metadata["failure_mode"] == "normalization_error"
    assert result.metadata["fallback_action"] == "allow"


@pytest.mark.asyncio
async def test_empty_dict_payload_yields_no_findings() -> None:
    # Given: an empty dict
    evaluator = DetectSecretsEvaluator(DetectSecretsEvaluatorConfig())

    # When: the evaluator evaluates it
    result = await evaluator.evaluate({})

    # Then: it scans successfully with no findings
    assert result.matched is False
    assert result.error is None
    assert result.metadata is not None
    assert result.metadata["normalized_payload_type"] == "dict"
    assert result.metadata["findings"] == []
    assert result.metadata["findings_count"] == 0


@pytest.mark.asyncio
async def test_empty_list_payload_yields_no_findings() -> None:
    # Given: an empty list
    evaluator = DetectSecretsEvaluator(DetectSecretsEvaluatorConfig())

    # When: the evaluator evaluates it
    result = await evaluator.evaluate([])

    # Then: it scans successfully with no findings
    assert result.matched is False
    assert result.error is None
    assert result.metadata is not None
    assert result.metadata["normalized_payload_type"] == "list"
    assert result.metadata["findings"] == []
    assert result.metadata["findings_count"] == 0


@pytest.mark.asyncio
async def test_boolean_and_none_dict_keys_are_normalized_as_scalar_strings() -> None:
    # Given: a dict keyed by True/False/None alongside a GitHub token under the True key
    evaluator = DetectSecretsEvaluator(
        DetectSecretsEvaluatorConfig(enabled_plugins=["GitHubTokenDetector"])
    )

    # When: the evaluator normalizes and scans
    result = await evaluator.evaluate(
        {
            True: "ghp_aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
            False: "safe value",
            None: "also safe",
        }
    )

    # Then: the True key normalizes to "true" and the finding resolves to /true
    assert result.matched is True
    # Consistent with the other success-path tests: no error on a clean scan.
    assert result.error is None
    assert result.metadata is not None
    assert result.metadata["normalized_payload_type"] == "dict"
    assert result.metadata["findings"] == [{"type": "GitHub Token", "json_pointer": "/true"}]


@pytest.mark.asyncio
async def test_unsupported_dict_key_type_routes_through_normalization_error() -> None:
    # Given: a dict keyed by a tuple (unsupported JSON key type)
    evaluator = DetectSecretsEvaluator(DetectSecretsEvaluatorConfig())

    # When: the evaluator evaluates it
    result = await evaluator.evaluate({("a", "b"): "value"})

    # Then: normalization fails safely
    assert result.matched is False
    # Consistent with the sibling normalization_error tests above.
    assert result.error is None
    assert result.metadata is not None
    assert result.metadata["failure_mode"] == "normalization_error"
    assert result.metadata["fallback_action"] == "allow"
# ---------------------------------------------------------------------------
# Runtime-side failure paths that aren't exercised elsewhere.
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_runtime_info_failure_during_non_none_evaluate_returns_runtime_error(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    # Given: a get_runtime_info reference inside the evaluator module that always raises
    def raise_runtime_error() -> Any:
        raise RuntimeError("boom")

    monkeypatch.setattr(
        "agent_control_evaluator_detect_secrets.detect_secrets.evaluator.get_runtime_info",
        raise_runtime_error,
    )
    evaluator = DetectSecretsEvaluator(DetectSecretsEvaluatorConfig())

    # When: a non-None payload is evaluated (the None short-circuit doesn't apply)
    result = await evaluator.evaluate("safe content only")

    # Then: the evaluator returns runtime_error with unknown detect-secrets version
    assert result.matched is False
    assert result.error is None
    assert result.metadata is not None
    assert result.metadata["failure_mode"] == "runtime_error"
    assert result.metadata["fallback_action"] == "allow"
    assert result.metadata["detect_secrets_version"] == "unknown"
    assert "normalized_payload_type" not in result.metadata


@pytest.mark.asyncio
async def test_runtime_config_conflict_routes_through_runtime_error(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    # Given: a get_runtime replacement that raises RuntimeConfigConflictError
    def raise_conflict(config: Any = None) -> Any:
        raise RuntimeConfigConflictError("conflict")

    monkeypatch.setattr(
        "agent_control_evaluator_detect_secrets.detect_secrets.evaluator.get_runtime",
        raise_conflict,
    )
    evaluator = DetectSecretsEvaluator(DetectSecretsEvaluatorConfig())

    # When: a valid payload is evaluated
    result = await evaluator.evaluate("safe content only")

    # Then: the conflict is sanitized to a runtime_error failure
    assert result.matched is False
    assert result.error is None
    assert result.metadata is not None
    assert result.metadata["failure_mode"] == "runtime_error"
    assert result.metadata["fallback_action"] == "allow"
    assert result.metadata["normalized_payload_type"] == "str"


# ---------------------------------------------------------------------------
# exclude_lines_regex on structured payloads.
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_exclude_lines_regex_on_dict_payload_blanks_matching_line() -> None:
    # Given: an evaluator configured to exclude JSON lines that contain "authorization"
    evaluator = DetectSecretsEvaluator(
        DetectSecretsEvaluatorConfig(
            enabled_plugins=["GitHubTokenDetector"],
            exclude_lines_regex=[r'"authorization"'],
        )
    )

    # When: a dict carries the secret on the excluded line
    result = await evaluator.evaluate(
        {
            "authorization": "ghp_123456789012345678901234567890123456",
            "other": "safe",
        }
    )

    # Then: the excluded line is blanked and no finding is surfaced
    assert result.matched is False
    # Consistent with the other scan-success tests: a clean scan carries no error.
    assert result.error is None
    assert result.metadata is not None
    assert result.metadata["findings"] == []


@pytest.mark.asyncio
async def test_exclude_lines_regex_on_dict_payload_preserves_pointers_for_other_findings() -> None:
    # Given: a dict where one line matches the exclusion and a DIFFERENT line has a secret
    evaluator = DetectSecretsEvaluator(
        DetectSecretsEvaluatorConfig(
            enabled_plugins=["GitHubTokenDetector"],
            exclude_lines_regex=[r'"skip"'],
        )
    )

    # When: the evaluator scans
    result = await evaluator.evaluate(
        {
            "skip": "ghp_abcdefabcdefabcdefabcdefabcdefabcdef",
            "keep": "ghp_111111111111111111111111111111111111",
        }
    )

    # Then: line-number blanking does not disturb the surviving finding's pointer
    assert result.matched is True
    assert result.error is None
    assert result.metadata is not None
    assert result.metadata["findings"] == [{"type": "GitHub Token", "json_pointer": "/keep"}]
@pytest.mark.asyncio
async def test_exclude_lines_regex_does_not_treat_unicode_nel_as_line_break() -> None:
    # Given: a structured payload with a secret after U+0085 and an unrelated excluded line
    evaluator = DetectSecretsEvaluator(
        DetectSecretsEvaluatorConfig(
            enabled_plugins=["GitHubTokenDetector"],
            exclude_lines_regex=[r'"skip"'],
        )
    )

    # When: exclusions are applied before scanning
    result = await evaluator.evaluate(
        {
            "skip": "safe",
            "keep": "prefix\u0085ghp_123456789012345678901234567890123456",
        }
    )

    # Then: only actual JSON newlines affect line numbering, so the pointer stays on /keep
    assert result.matched is True
    # Consistent with the other scan-success tests: a clean scan carries no error.
    assert result.error is None
    assert result.metadata is not None
    assert result.metadata["findings"] == [{"type": "GitHub Token", "json_pointer": "/keep"}]


# ---------------------------------------------------------------------------
# max_bytes boundary behavior.
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_payload_exactly_at_max_bytes_is_accepted() -> None:
    # Given: a payload whose UTF-8 byte length exactly equals max_bytes
    payload = "a" * 64
    evaluator = DetectSecretsEvaluator(DetectSecretsEvaluatorConfig(max_bytes=64))

    # When: the evaluator evaluates it
    result = await evaluator.evaluate(payload)

    # Then: the scan proceeds without tripping payload_too_large
    assert result.metadata is not None
    assert result.metadata.get("failure_mode") is None
    assert result.metadata["normalized_payload_type"] == "str"


@pytest.mark.asyncio
async def test_payload_one_byte_over_max_bytes_is_rejected() -> None:
    # Given: a payload one byte over the configured max_bytes
    payload = "a" * 65
    evaluator = DetectSecretsEvaluator(DetectSecretsEvaluatorConfig(max_bytes=64))

    # When: the evaluator evaluates it
    result = await evaluator.evaluate(payload)

    # Then: failure_mode is payload_too_large
    assert result.matched is False
    # Consistent with the other failure-mode tests: failures are reported
    # through metadata, not through result.error.
    assert result.error is None
    assert result.metadata is not None
    assert result.metadata["failure_mode"] == "payload_too_large"
    assert result.metadata["fallback_action"] == "allow"


# ---------------------------------------------------------------------------
# Scan-mapping edge cases and concurrency.
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_multi_line_string_preserves_distinct_line_numbers() -> None:
    # Given: a multi-line string with GitHub tokens on lines 1 and 3
    evaluator = DetectSecretsEvaluator(
        DetectSecretsEvaluatorConfig(enabled_plugins=["GitHubTokenDetector"])
    )
    content = "\n".join(
        [
            "first = 'ghp_aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'",
            "safe middle line",
            "third = 'ghp_bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb'",
        ]
    )

    # When: the evaluator evaluates it
    result = await evaluator.evaluate(content)

    # Then: findings carry their respective original line numbers
    assert result.matched is True
    assert result.metadata is not None
    line_numbers = sorted(finding["line_number"] for finding in result.metadata["findings"])
    assert line_numbers == [1, 3]


@pytest.mark.asyncio
async def test_list_with_scalar_elements_maps_pointer_to_index() -> None:
    # Given: a list whose element at index 1 is a bare secret string
    evaluator = DetectSecretsEvaluator(
        DetectSecretsEvaluatorConfig(enabled_plugins=["GitHubTokenDetector"])
    )

    # When: the evaluator scans it
    result = await evaluator.evaluate(["safe", "ghp_123456789012345678901234567890123456"])

    # Then: the finding pointer names the list index
    assert result.matched is True
    assert result.metadata is not None
    assert result.metadata["normalized_payload_type"] == "list"
    assert result.metadata["findings"] == [{"type": "GitHub Token", "json_pointer": "/1"}]


@pytest.mark.asyncio
async def test_evaluate_is_safe_under_concurrent_calls() -> None:
    # Given: a single evaluator instance and several distinct secret-bearing payloads
    evaluator = DetectSecretsEvaluator(
        DetectSecretsEvaluatorConfig(enabled_plugins=["GitHubTokenDetector"])
    )
    payloads = [
        f"github_token_{index} = 'ghp_{str(index).zfill(2)}3456789012345678901234567890123456'"
        for index in range(5)
    ]

    # When: many evaluate() calls run in parallel on the shared evaluator instance
    results = await asyncio.gather(*(evaluator.evaluate(payload) for payload in payloads))

    # Then: every call produces the correct finding with line_number=1 and no cross-talk
    assert all(result.matched for result in results)
    assert all(
        result.metadata is not None
        and result.metadata["findings"] == [{"type": "GitHub Token", "line_number": 1}]
        for result in results
    )


def test_safe_structured_pointer_returns_none_for_missing_location() -> None:
    # Given: an evaluator instance and no location metadata
    evaluator = DetectSecretsEvaluator(DetectSecretsEvaluatorConfig())

    # When: the pointer helper is invoked without a location
    pointer = evaluator._safe_structured_pointer(location=None)

    # Then: the helper returns None so the finding is emitted without a pointer
    assert pointer is None


# ---------------------------------------------------------------------------
# Additional _key_name_is_secret_like coverage (None, non-identifier keys).
# ---------------------------------------------------------------------------


def test_key_name_is_secret_like_returns_false_for_none() -> None:
    # Given: an evaluator instance
    evaluator = DetectSecretsEvaluator(DetectSecretsEvaluatorConfig())

    # When/Then: a None key name is treated as not secret-like
    assert evaluator._key_name_is_secret_like(None) is False


@pytest.mark.parametrize(
    ("key_name", "expected"),
    [
        # Starts with digit then letters -> fails IDENTIFIER_LIKE_KEY_PATTERN -> secret-like.
        ("12abcd", True),
        # Starts with symbol -> fails IDENTIFIER_LIKE_KEY_PATTERN -> secret-like.
        ("!bang", True),
        # Matches JSON_SCALAR_LIKE_KEY_PATTERN -> not secret-like.
        ("true", False),
        ("-1.5e10", False),
    ],
)
def test_key_name_is_secret_like_for_non_identifier_and_scalar_keys(
    key_name: str, expected: bool
) -> None:
    # Given: an evaluator instance
    evaluator = DetectSecretsEvaluator(DetectSecretsEvaluatorConfig())

    # When/Then: the heuristic honors the non-identifier and JSON-scalar branches
    assert evaluator._key_name_is_secret_like(key_name) is expected


# ---------------------------------------------------------------------------
# Config validator edge cases.
# ---------------------------------------------------------------------------


def test_explicit_none_enabled_plugins_is_accepted() -> None:
    # Given: enabled_plugins explicitly set to None
    config = DetectSecretsEvaluatorConfig(enabled_plugins=None)

    # Then: the config is accepted and enabled_plugins stays None
    assert config.enabled_plugins is None


def test_whitespace_only_enabled_plugin_name_is_rejected() -> None:
    # Given: a plugin list containing only whitespace
    # When/Then: construction raises a non-empty validation error
    with pytest.raises(ValueError, match="non-empty"):
        DetectSecretsEvaluatorConfig(enabled_plugins=[" "])


def test_enabled_plugins_strips_whitespace_and_dedups() -> None:
    # Given: duplicate and whitespace-padded plugin names
    config = DetectSecretsEvaluatorConfig(
        enabled_plugins=[" GitHubTokenDetector ", "GitHubTokenDetector"]
    )

    # Then: names are stripped and duplicates removed in first-seen order
    assert config.enabled_plugins == ["GitHubTokenDetector"]


def test_zero_timeout_ms_is_rejected() -> None:
    # Given/When/Then: timeout_ms must be strictly positive
    with pytest.raises(ValueError):
        DetectSecretsEvaluatorConfig(timeout_ms=0)


def test_zero_max_bytes_is_rejected() -> None:
    # Given/When/Then: max_bytes must be strictly positive
    with pytest.raises(ValueError):
        DetectSecretsEvaluatorConfig(max_bytes=0)
DetectSecretsEvaluatorConfig(max_bytes=0) + + +def test_invalid_on_error_value_is_rejected() -> None: + # Given/When/Then: on_error only accepts "allow" or "deny" + with pytest.raises(ValueError): + DetectSecretsEvaluatorConfig(on_error="maybe") # type: ignore[arg-type] diff --git a/pyproject.toml b/pyproject.toml index 857e35d3..d4bd7afd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -75,6 +75,7 @@ version_toml = [ "evaluators/builtin/pyproject.toml:project.version", "evaluators/contrib/budget/pyproject.toml:project.version", "evaluators/contrib/cisco/pyproject.toml:project.version", + "evaluators/contrib/detect_secrets/pyproject.toml:project.version", "evaluators/contrib/galileo/pyproject.toml:project.version", ] version_source = "tag" diff --git a/sdks/python/pyproject.toml b/sdks/python/pyproject.toml index 036c54e4..844fa64f 100644 --- a/sdks/python/pyproject.toml +++ b/sdks/python/pyproject.toml @@ -39,6 +39,7 @@ Repository = "https://github.com/yourusername/agent-control" strands-agents = ["strands-agents>=1.26.0"] google-adk = ["google-adk>=1.0.0"] galileo = ["agent-control-evaluator-galileo>=7.5.0"] +detect_secrets = ["agent-control-evaluator-detect_secrets>=7.5.0"] [dependency-groups] dev = [ @@ -92,3 +93,4 @@ agent-control-telemetry = { workspace = true } agent-control-evaluators = { workspace = true } # For local dev: use local galileo package instead of PyPI agent-control-evaluator-galileo = { path = "../../evaluators/contrib/galileo", editable = true } +agent-control-evaluator-detect_secrets = { path = "../../evaluators/contrib/detect_secrets", editable = true } diff --git a/server/pyproject.toml b/server/pyproject.toml index 6c28a317..c7c80d0a 100644 --- a/server/pyproject.toml +++ b/server/pyproject.toml @@ -32,6 +32,7 @@ license = {text = "Apache-2.0"} [project.optional-dependencies] galileo = ["agent-control-evaluator-galileo>=7.5.0"] +detect_secrets = ["agent-control-evaluator-detect_secrets>=7.5.0"] [dependency-groups] dev = [ @@ -99,3 
+100,4 @@ agent-control-telemetry = { workspace = true } agent-control-evaluators = { workspace = true } # For local dev: use local galileo package instead of PyPI agent-control-evaluator-galileo = { path = "../evaluators/contrib/galileo", editable = true } +agent-control-evaluator-detect_secrets = { path = "../evaluators/contrib/detect_secrets", editable = true }