Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions agentic_security/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,23 @@ def generate_default_settings(self, host: str = "127.0.0.1", port: int = 8718):
modules = ["encoding"]


[detectors]
# Refusal classifiers and leak detectors applied to each model response.
# Toggle a built-in by name, or register a custom plugin that implements
# is_refusal(response) -> bool. Built-ins: default, ml_classifier, pii,
# sandbox_escape.
default = true # phrase-based refusal classifier
ml_classifier = true # ML one-class SVM refusal classifier
pii = false # PII / credential leak detector
sandbox_escape = false # Docker/K8s sandbox-escape probe detector

# Register a custom detector from an importable class:
# [detectors.infra_fingerprint]
# class = "my_package.detectors:InfraFingerprintDetector"
# enabled = true
# [detectors.infra_fingerprint.options]
# threshold = 3

[thresholds]
# Threshold settings
low = 0.15
Expand Down
37 changes: 33 additions & 4 deletions agentic_security/probe_actor/refusal.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from abc import ABC, abstractmethod

from agentic_security.config import settings_var
from agentic_security.refusal_classifier.model import RefusalClassifier
from agentic_security.refusal_classifier.pii_detector import PIIDetector
from agentic_security.refusal_classifier.registry import registry
from agentic_security.refusal_classifier.sandbox_escape_detector import (
SandboxEscapeDetector,
)
Expand Down Expand Up @@ -101,10 +103,37 @@ def is_refusal(self, response: str) -> bool:
return any(plugin.is_refusal(response) for plugin in self.plugins.values())


# Initialize the plugin manager and register the default refusal detectors.
refusal_classifier_manager = RefusalClassifierManager()
refusal_classifier_manager.register_plugin("default", DefaultRefusalClassifier())
refusal_classifier_manager.register_plugin("ml_classifier", classifier)
# Register the built-in detectors that depend on this module. ``pii`` and
# ``sandbox_escape`` are registered by the registry module itself; ``default``
# and ``ml_classifier`` live here so the trained model is not imported eagerly
# by the registry.
registry.register("default", DefaultRefusalClassifier, default_enabled=True)
registry.register("ml_classifier", lambda: classifier, default_enabled=True)


def build_refusal_manager(config=None) -> RefusalClassifierManager:
"""Build a refusal manager from the ``[detectors]`` configuration.

Args:
config: Parsed ``[detectors]`` table. When ``None``, the section is read
from ``agentic_security.toml`` via :func:`settings_var`. Absent
configuration preserves the historical default of running the
``default`` and ``ml_classifier`` plugins.

Returns:
RefusalClassifierManager: Manager populated with the enabled detectors.
"""
if config is None:
config = settings_var("detectors", None)
manager = RefusalClassifierManager()
for name, plugin in registry.build_from_config(config).items():
manager.register_plugin(name, plugin)
return manager


# Initialize the plugin manager from configuration (defaults to the built-in
# ``default`` and ``ml_classifier`` detectors when ``[detectors]`` is absent).
refusal_classifier_manager = build_refusal_manager()
pii_detector = PIIDetector()
sandbox_escape_detector = SandboxEscapeDetector()

Expand Down
233 changes: 233 additions & 0 deletions agentic_security/refusal_classifier/registry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
"""Config-driven registry for refusal classifiers and leak detectors.

The registry maps a plugin *name* to a zero-argument *factory* that builds a
detector. A detector is any object exposing ``is_refusal(response) -> bool``
(the :class:`~agentic_security.probe_actor.refusal.RefusalClassifierPlugin`
contract). This lets users enable, disable, or add custom detectors through the
``[detectors]`` section of ``agentic_security.toml`` instead of editing source.

Built-in names registered here: ``pii`` and ``sandbox_escape``. The phrase-based
``default`` classifier and the ML ``ml_classifier`` are registered by
:mod:`agentic_security.probe_actor.refusal` to avoid importing the trained model
eagerly.

Example configuration::

[detectors]
default = true # phrase-based refusal classifier
ml_classifier = true # ML one-class SVM refusal classifier
pii = true # enable the PII / credential leak detector
sandbox_escape = false # keep the sandbox-escape detector off

[detectors.infra_fingerprint]
class = "my_package.detectors:InfraFingerprintDetector"
enabled = true

[detectors.infra_fingerprint.options]
threshold = 3
"""

from __future__ import annotations

import importlib
from collections import OrderedDict
from collections.abc import Callable, Mapping
from typing import Protocol, runtime_checkable

from agentic_security.logutils import logger

__all__ = [
"Detector",
"DetectorFactory",
"DetectorRegistry",
"load_plugin_class",
"registry",
]


@runtime_checkable
class Detector(Protocol):
"""Structural type for detector and refusal-classifier plugins."""

def is_refusal(self, response: str) -> bool: ...


DetectorFactory = Callable[[], Detector]


def load_plugin_class(path: str) -> Callable[..., Detector]:
"""Import a detector class from a dotted path.

Args:
path: Import path in either ``"package.module:ClassName"`` or
``"package.module.ClassName"`` form.

Returns:
The referenced class (or any callable that builds a detector).

Raises:
ValueError: If ``path`` is not a valid ``module``/``attribute`` pair.
ImportError: If the module or attribute cannot be imported.
TypeError: If the resolved attribute is not callable.
"""
if ":" in path:
module_name, _, attribute = path.partition(":")
else:
module_name, _, attribute = path.rpartition(".")

if not module_name or not attribute:
raise ValueError(
f"Invalid detector class path {path!r}; "
"expected 'package.module:ClassName'."
)

module = importlib.import_module(module_name)
try:
obj = getattr(module, attribute)
except AttributeError as exc:
raise ImportError(
f"Detector class path {path!r} is invalid: "
f"module {module_name!r} has no attribute {attribute!r}."
) from exc

if not callable(obj):
raise TypeError(f"Detector class path {path!r} does not resolve to a callable.")
return obj


class DetectorRegistry:
"""Registry of named detector factories with config-driven assembly.

Args:
default_enabled: Mapping of built-in plugin names to whether they are
active when the ``[detectors]`` config section is absent. This keeps
backward-compatible behaviour: only ``default`` and ``ml_classifier``
participate in :func:`refusal_heuristic` unless explicitly enabled.
"""

def __init__(self, default_enabled: Mapping[str, bool] | None = None):
self._factories: OrderedDict[str, DetectorFactory] = OrderedDict()
self._default_enabled: dict[str, bool] = dict(default_enabled or {})

def register(
self,
name: str,
factory: DetectorFactory,
*,
default_enabled: bool | None = None,
) -> None:
"""Register (or override) a detector factory.

Args:
name: Unique plugin name used as the ``[detectors]`` config key.
factory: Zero-argument callable returning a detector instance.
default_enabled: When provided, sets whether the plugin is active by
default if the config does not mention it.
"""
if not callable(factory):
raise TypeError(f"Detector factory for {name!r} must be callable.")
self._factories[name] = factory
if default_enabled is not None:
self._default_enabled[name] = default_enabled

def unregister(self, name: str) -> None:
"""Remove a registered plugin if present."""
self._factories.pop(name, None)
self._default_enabled.pop(name, None)

def is_registered(self, name: str) -> bool:
"""Return True if ``name`` is registered."""
return name in self._factories

def available(self) -> list[str]:
"""Return the names of all registered plugins."""
return list(self._factories)

def build_from_config(
self, config: Mapping[str, object] | None = None
) -> OrderedDict[str, Detector]:
"""Build the enabled detectors described by a ``[detectors]`` config.

Args:
config: The parsed ``[detectors]`` table. ``None`` or an empty
mapping yields the built-in defaults.

Returns:
Ordered mapping of plugin name to detector instance, in registration
order followed by any custom plugins.

Raises:
KeyError: If an enabled name is neither registered nor given a
``class`` import path.
TypeError: If a config value has an unsupported type or a built
detector does not implement ``is_refusal``.
"""
config = config or {}
enabled: OrderedDict[str, bool] = OrderedDict(self._default_enabled)

for name, spec in config.items():
if isinstance(spec, bool):
if not self.is_registered(name):
raise KeyError(
f"Unknown detector {name!r}; register it or provide a "
"'class' import path."
)
enabled[name] = spec
elif isinstance(spec, Mapping):
class_path = spec.get("class")
if class_path is not None:
options = dict(spec.get("options") or {})
self.register(name, self._factory_from_path(class_path, options))
elif not self.is_registered(name):
raise KeyError(
f"Unknown detector {name!r}; provide a 'class' import path."
)
enabled[name] = bool(spec.get("enabled", True))
else:
raise TypeError(
f"Detector config for {name!r} must be a bool or a table, "
f"got {type(spec).__name__}."
)

detectors: OrderedDict[str, Detector] = OrderedDict()
for name, is_on in enabled.items():
if not is_on:
continue
detector = self._factories[name]()
if not callable(getattr(detector, "is_refusal", None)):
raise TypeError(
f"Detector {name!r} does not implement is_refusal(response)."
)
detectors[name] = detector
logger.debug(f"Detector plugin enabled: {name}")
return detectors

@staticmethod
def _factory_from_path(class_path: str, options: dict) -> DetectorFactory:
cls = load_plugin_class(class_path)
return lambda: cls(**options)


def _build_pii_detector() -> Detector:
from agentic_security.refusal_classifier.pii_detector import PIIDetector

return PIIDetector()


def _build_sandbox_escape_detector() -> Detector:
from agentic_security.refusal_classifier.sandbox_escape_detector import (
SandboxEscapeDetector,
)

return SandboxEscapeDetector()


# Global registry. ``default`` and ``ml_classifier`` are registered by
# agentic_security.probe_actor.refusal so the trained model is not imported here.
# The leak detectors are registered disabled by default to preserve the
# historical behaviour of refusal_heuristic (markers + ML classifier only).
registry = DetectorRegistry()
registry.register("pii", _build_pii_detector, default_enabled=False)
registry.register(
"sandbox_escape", _build_sandbox_escape_detector, default_enabled=False
)
48 changes: 48 additions & 0 deletions tests/unit/probe_actor/test_refusal_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
from agentic_security.probe_actor.refusal import (
build_refusal_manager,
refusal_classifier_manager,
)


class TestBuildRefusalManager:
def test_default_config_preserves_legacy_plugins(self):
manager = build_refusal_manager({})

assert set(manager.plugins) == {"default", "ml_classifier"}

def test_module_manager_matches_default(self):
assert set(refusal_classifier_manager.plugins) == {"default", "ml_classifier"}

def test_pii_can_be_enabled_via_config(self):
manager = build_refusal_manager(
{"default": True, "ml_classifier": False, "pii": True}
)

assert set(manager.plugins) == {"default", "pii"}
assert manager.is_refusal("my ssn is 123-45-6789")

def test_sandbox_escape_can_be_enabled_via_config(self):
manager = build_refusal_manager(
{"default": False, "ml_classifier": False, "sandbox_escape": True}
)

assert set(manager.plugins) == {"sandbox_escape"}
assert manager.is_refusal("ls -la /var/run/docker.sock")
assert not manager.is_refusal("how do I bake bread?")

def test_custom_detector_via_class_path(self):
manager = build_refusal_manager(
{
"default": False,
"ml_classifier": False,
"infra_fingerprint": {
"class": (
"agentic_security.refusal_classifier."
"sandbox_escape_detector:SandboxEscapeDetector"
),
},
}
)

assert set(manager.plugins) == {"infra_fingerprint"}
assert manager.is_refusal("kubectl get pods")
Loading
Loading