diff --git a/lib/crewai/src/crewai/agent/core.py b/lib/crewai/src/crewai/agent/core.py index 34250436f0..91811dfa26 100644 --- a/lib/crewai/src/crewai/agent/core.py +++ b/lib/crewai/src/crewai/agent/core.py @@ -1216,16 +1216,32 @@ def _inject_date_to_task(self, task: Task) -> None: self._logger.log("warning", f"Failed to inject date: {e!s}") def _validate_docker_installation(self) -> None: - """Check if Docker is installed and running.""" + """Check if Docker is installed and running. + + Validates the subprocess command against the agent's governance + policy before execution (OWASP ASI08: Uncontrolled Code Execution). + """ + from crewai.security.governance import GovernanceError + docker_path = shutil.which("docker") if not docker_path: raise RuntimeError( f"Docker is not installed. Please install Docker to use code execution with agent: {self.role}" ) + command = [str(docker_path), "info"] + + # Validate subprocess command against governance policy + try: + self.security_config.governance.validate_subprocess(command) + except GovernanceError as e: + raise RuntimeError( + f"Governance policy blocked Docker validation for agent '{self.role}': {e}" + ) from e + try: subprocess.run( # noqa: S603 - [str(docker_path), "info"], + command, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, diff --git a/lib/crewai/src/crewai/agents/crew_agent_executor.py b/lib/crewai/src/crewai/agents/crew_agent_executor.py index 0707f59d60..c87ee883f7 100644 --- a/lib/crewai/src/crewai/agents/crew_agent_executor.py +++ b/lib/crewai/src/crewai/agents/crew_agent_executor.py @@ -962,6 +962,22 @@ def _execute_single_native_tool_call( structured_tool = structured break + # Governance policy check (OWASP ASI02: Tool Misuse & Exploitation) + governance_blocked = False + if self.crew and hasattr(self.crew, "security_config"): + from crewai.security.governance import GovernanceError + + try: + self.crew.security_config.governance.validate_tool( + func_name, args_dict or {} + ) + except GovernanceError as gov_err: + governance_blocked = True + result = ( + f"Tool execution blocked by governance policy. " + f"Tool: {func_name}. Reason: {gov_err.detail}" + ) + hook_blocked = False before_hook_context = ToolCallHookContext( tool_name=func_name, @@ -972,20 +988,23 @@ def _execute_single_native_tool_call( crew=self.crew, ) before_hooks = get_before_tool_call_hooks() - try: - for hook in before_hooks: - hook_result = hook(before_hook_context) - if hook_result is False: - hook_blocked = True - break - except Exception as hook_error: - if self.agent.verbose: - self._printer.print( - content=f"Error in before_tool_call hook: {hook_error}", - color="red", - ) + if not governance_blocked: + try: + for hook in before_hooks: + hook_result = hook(before_hook_context) + if hook_result is False: + hook_blocked = True + break + except Exception as hook_error: + if self.agent.verbose: + self._printer.print( + content=f"Error in before_tool_call hook: {hook_error}", + color="red", + ) - if hook_blocked: + if governance_blocked: + result = result # already set above + elif hook_blocked: result = f"Tool execution blocked by hook. Tool: {func_name}" elif max_usage_reached and original_tool: result = f"Tool '{func_name}' has reached its usage limit of {original_tool.max_usage_count} times and cannot be used anymore." diff --git a/lib/crewai/src/crewai/security/__init__.py b/lib/crewai/src/crewai/security/__init__.py index 40ed659331..4d3baedafb 100644 --- a/lib/crewai/src/crewai/security/__init__.py +++ b/lib/crewai/src/crewai/security/__init__.py @@ -8,7 +8,22 @@ """ from crewai.security.fingerprint import Fingerprint +from crewai.security.governance import ( + GovernanceConfig, + GovernanceError, + HttpPolicy, + SubprocessPolicy, + ToolPolicy, +) from crewai.security.security_config import SecurityConfig -__all__ = ["Fingerprint", "SecurityConfig"] +__all__ = [ + "Fingerprint", + "GovernanceConfig", + "GovernanceError", + "HttpPolicy", + "SecurityConfig", + "SubprocessPolicy", + "ToolPolicy", +] diff --git a/lib/crewai/src/crewai/security/governance.py b/lib/crewai/src/crewai/security/governance.py new file mode 100644 index 0000000000..aec3237c5b --- /dev/null +++ b/lib/crewai/src/crewai/security/governance.py @@ -0,0 +1,377 @@ +"""Security Governance Module + +This module provides configurable governance policies for CrewAI to address +ungoverned call sites identified by security audits (OWASP Agentic Top 10). + +Governance policies allow users to define allowlists and blocklists for: +- Subprocess execution (ASI08: Uncontrolled Code Execution) +- HTTP requests (ASI07: Data Leakage & Exfiltration) +- Tool invocations (ASI02: Tool Misuse & Exploitation) + +Each policy validates operations before they execute and raises +GovernanceError when a policy violation is detected. +""" + +from __future__ import annotations + +from collections.abc import Callable +import logging +import re +from typing import Any +from urllib.parse import urlparse + +from pydantic import BaseModel, Field + + +logger = logging.getLogger(__name__) + + +class GovernanceError(Exception): + """Raised when a governance policy blocks an operation. + + Attributes: + category: The governance category that was violated + (e.g., 'subprocess', 'http', 'tool'). + detail: A human-readable description of the violation. + """ + + def __init__(self, category: str, detail: str) -> None: + self.category = category + self.detail = detail + super().__init__(f"[{category}] {detail}") + + +class SubprocessPolicy(BaseModel): + """Policy governing subprocess execution. + + Controls which subprocess commands are allowed or blocked during + agent execution. By default, all commands are allowed unless + explicitly configured. + + Attributes: + allowed_commands: If set, only these command basenames are permitted. + Example: ["docker", "git", "uv"] + blocked_commands: Commands that are always blocked, even if they + appear in allowed_commands. + Example: ["rm", "shutdown"] + allow_shell: Whether shell=True is permitted in subprocess calls. + Defaults to False for security. + custom_validator: Optional callable that receives (command, kwargs) + and returns True to allow or False to block. + """ + + allowed_commands: list[str] | None = Field( + default=None, + description=( + "Allowlist of command basenames. " + "If None, all commands are allowed (unless blocked)." + ), + ) + blocked_commands: list[str] = Field( + default_factory=list, + description="Blocklist of command basenames that are always denied.", + ) + allow_shell: bool = Field( + default=False, + description="Whether shell=True is permitted in subprocess calls.", + ) + custom_validator: Callable[[list[str], dict[str, Any]], bool] | None = Field( + default=None, + exclude=True, + description=( + "Optional callable(command_list, kwargs) -> bool. " + "Return True to allow, False to block." + ), + ) + + model_config = {"arbitrary_types_allowed": True} + + def validate_command( + self, command: list[str], *, shell: bool = False, **kwargs: Any + ) -> None: + """Validate a subprocess command against this policy. + + Args: + command: The command as a list of strings (e.g., ["docker", "info"]). + shell: Whether shell mode is requested. + **kwargs: Additional subprocess keyword arguments. + + Raises: + GovernanceError: If the command violates this policy. + """ + if not command: + raise GovernanceError("subprocess", "Empty command is not allowed.") + + if shell and not self.allow_shell: + raise GovernanceError( + "subprocess", + "shell=True is not permitted by the subprocess policy.", + ) + + cmd_basename = command[0].rsplit("/", 1)[-1] + + if cmd_basename in self.blocked_commands: + raise GovernanceError( + "subprocess", + f"Command '{cmd_basename}' is blocked by policy.", + ) + + if ( + self.allowed_commands is not None + and cmd_basename not in self.allowed_commands + ): + raise GovernanceError( + "subprocess", + f"Command '{cmd_basename}' is not in the allowed commands list: " + f"{self.allowed_commands}.", + ) + + if self.custom_validator is not None: + if not self.custom_validator(command, kwargs): + raise GovernanceError( + "subprocess", + f"Command '{' '.join(command)}' was rejected by custom validator.", + ) + + logger.debug("Subprocess governance: allowed command '%s'", " ".join(command)) + + +class HttpPolicy(BaseModel): + """Policy governing HTTP requests. + + Controls which HTTP endpoints agents are allowed to call. By default, + all requests are allowed unless explicitly configured. + + Attributes: + allowed_domains: If set, only requests to these domains are allowed. + Example: ["api.openai.com", "api.anthropic.com"] + blocked_domains: Domains that are always blocked. + Example: ["evil.example.com"] + allowed_url_patterns: Regex patterns that URLs must match. + Example: [r"https://api\\.openai\\.com/.*"] + custom_validator: Optional callable that receives (url, method, kwargs) + and returns True to allow or False to block. + """ + + allowed_domains: list[str] | None = Field( + default=None, + description=( + "Allowlist of domains. If None, all domains are allowed (unless blocked)." + ), + ) + blocked_domains: list[str] = Field( + default_factory=list, + description="Blocklist of domains that are always denied.", + ) + allowed_url_patterns: list[str] | None = Field( + default=None, + description="Regex patterns that requested URLs must match.", + ) + custom_validator: Callable[[str, str, dict[str, Any]], bool] | None = Field( + default=None, + exclude=True, + description=( + "Optional callable(url, method, kwargs) -> bool. " + "Return True to allow, False to block." + ), + ) + + model_config = {"arbitrary_types_allowed": True} + + def validate_request(self, url: str, method: str = "GET", **kwargs: Any) -> None: + """Validate an HTTP request against this policy. + + Args: + url: The target URL. + method: HTTP method (GET, POST, etc.). + **kwargs: Additional request keyword arguments. + + Raises: + GovernanceError: If the request violates this policy. + """ + parsed = urlparse(url) + domain = parsed.hostname or "" + + if domain in self.blocked_domains: + raise GovernanceError( + "http", + f"Domain '{domain}' is blocked by policy.", + ) + + if self.allowed_domains is not None and domain not in self.allowed_domains: + raise GovernanceError( + "http", + f"Domain '{domain}' is not in the allowed domains list: " + f"{self.allowed_domains}.", + ) + + if self.allowed_url_patterns is not None: + matched = any( + re.match(pattern, url) for pattern in self.allowed_url_patterns + ) + if not matched: + raise GovernanceError( + "http", + f"URL '{url}' does not match any allowed URL pattern.", + ) + + if self.custom_validator is not None: + if not self.custom_validator(url, method, kwargs): + raise GovernanceError( + "http", + f"Request to '{url}' ({method}) was rejected by custom validator.", + ) + + logger.debug("HTTP governance: allowed %s %s", method, url) + + +class ToolPolicy(BaseModel): + """Policy governing tool invocations. + + Controls which tools agents are allowed to use. By default, all tools + are allowed unless explicitly configured. + + Attributes: + allowed_tools: If set, only tools with these names can be invoked. + Example: ["search", "read_file"] + blocked_tools: Tools that are always blocked. + Example: ["delete_database", "execute_code"] + custom_validator: Optional callable that receives + (tool_name, tool_input) and returns True to allow or False to block. + """ + + allowed_tools: list[str] | None = Field( + default=None, + description=( + "Allowlist of tool names. If None, all tools are allowed (unless blocked)." + ), + ) + blocked_tools: list[str] = Field( + default_factory=list, + description="Blocklist of tool names that are always denied.", + ) + custom_validator: Callable[[str, dict[str, Any]], bool] | None = Field( + default=None, + exclude=True, + description=( + "Optional callable(tool_name, tool_input) -> bool. " + "Return True to allow, False to block." + ), + ) + + model_config = {"arbitrary_types_allowed": True} + + def validate_tool( + self, tool_name: str, tool_input: dict[str, Any] | None = None + ) -> None: + """Validate a tool invocation against this policy. + + Args: + tool_name: Name of the tool being invoked. + tool_input: Input arguments for the tool. + + Raises: + GovernanceError: If the tool invocation violates this policy. + """ + if tool_name in self.blocked_tools: + raise GovernanceError( + "tool", + f"Tool '{tool_name}' is blocked by policy.", + ) + + if self.allowed_tools is not None and tool_name not in self.allowed_tools: + raise GovernanceError( + "tool", + f"Tool '{tool_name}' is not in the allowed tools list: " + f"{self.allowed_tools}.", + ) + + if self.custom_validator is not None: + if not self.custom_validator(tool_name, tool_input or {}): + raise GovernanceError( + "tool", + f"Tool '{tool_name}' was rejected by custom validator.", + ) + + logger.debug("Tool governance: allowed tool '%s'", tool_name) + + +class GovernanceConfig(BaseModel): + """Aggregated governance configuration for a CrewAI crew. + + Combines subprocess, HTTP, and tool policies into a single + configuration object that can be attached to a Crew's SecurityConfig. + + Example: + >>> governance = GovernanceConfig( + ... subprocess_policy=SubprocessPolicy( + ... allowed_commands=["docker", "git"], + ... blocked_commands=["rm"], + ... ), + ... http_policy=HttpPolicy( + ... allowed_domains=["api.openai.com"], + ... ), + ... tool_policy=ToolPolicy( + ... blocked_tools=["delete_database"], + ... ), + ... ) + >>> crew = Crew( + ... security_config=SecurityConfig(governance=governance), + ... ... + ... ) + """ + + subprocess_policy: SubprocessPolicy = Field( + default_factory=SubprocessPolicy, + description="Policy for subprocess execution governance.", + ) + http_policy: HttpPolicy = Field( + default_factory=HttpPolicy, + description="Policy for HTTP request governance.", + ) + tool_policy: ToolPolicy = Field( + default_factory=ToolPolicy, + description="Policy for tool invocation governance.", + ) + + def validate_subprocess( + self, command: list[str], *, shell: bool = False, **kwargs: Any + ) -> None: + """Validate a subprocess command. + + Args: + command: The command as a list of strings. + shell: Whether shell mode is requested. + **kwargs: Additional subprocess keyword arguments. + + Raises: + GovernanceError: If the command violates the subprocess policy. + """ + self.subprocess_policy.validate_command(command, shell=shell, **kwargs) + + def validate_http(self, url: str, method: str = "GET", **kwargs: Any) -> None: + """Validate an HTTP request. + + Args: + url: The target URL. + method: HTTP method. + **kwargs: Additional request keyword arguments. + + Raises: + GovernanceError: If the request violates the HTTP policy. + """ + self.http_policy.validate_request(url, method, **kwargs) + + def validate_tool( + self, tool_name: str, tool_input: dict[str, Any] | None = None + ) -> None: + """Validate a tool invocation. + + Args: + tool_name: Name of the tool. + tool_input: Input arguments for the tool. + + Raises: + GovernanceError: If the tool invocation violates the tool policy. + """ + self.tool_policy.validate_tool(tool_name, tool_input) diff --git a/lib/crewai/src/crewai/security/security_config.py b/lib/crewai/src/crewai/security/security_config.py index 8f037a0dbf..b1e0e6ae4b 100644 --- a/lib/crewai/src/crewai/security/security_config.py +++ b/lib/crewai/src/crewai/security/security_config.py @@ -9,12 +9,15 @@ in CrewAI applications. """ +from __future__ import annotations + from typing import Any from pydantic import BaseModel, ConfigDict, Field, field_validator from typing_extensions import Self from crewai.security.fingerprint import Fingerprint +from crewai.security.governance import GovernanceConfig class SecurityConfig(BaseModel): @@ -39,6 +42,13 @@ class SecurityConfig(BaseModel): fingerprint: Fingerprint = Field( default_factory=Fingerprint, description="Unique identifier for the component" ) + governance: GovernanceConfig = Field( + default_factory=GovernanceConfig, + description=( + "Governance policies for controlling subprocess execution, " + "HTTP requests, and tool invocations." + ), + ) @field_validator("fingerprint", mode="before") @classmethod @@ -64,7 +74,10 @@ def to_dict(self) -> dict[str, Any]: Returns: Dictionary representation of the security config """ - return {"fingerprint": self.fingerprint.to_dict()} + return { + "fingerprint": self.fingerprint.to_dict(), + "governance": self.governance.model_dump(), + } @classmethod def from_dict(cls, data: dict[str, Any]) -> Self: @@ -84,4 +97,11 @@ def from_dict(cls, data: dict[str, Any]) -> Self: else Fingerprint() ) - return cls(fingerprint=fingerprint) + governance_data = data.get("governance") + governance = ( + GovernanceConfig(**governance_data) + if governance_data + else GovernanceConfig() + ) + + return cls(fingerprint=fingerprint, governance=governance) diff --git a/lib/crewai/src/crewai/utilities/tool_utils.py b/lib/crewai/src/crewai/utilities/tool_utils.py index 027f136ed6..4a6a8fa17c 100644 --- a/lib/crewai/src/crewai/utilities/tool_utils.py +++ b/lib/crewai/src/crewai/utilities/tool_utils.py @@ -10,6 +10,7 @@ get_before_tool_call_hooks, ) from crewai.security.fingerprint import Fingerprint +from crewai.security.governance import GovernanceError from crewai.tools.structured_tool import CrewStructuredTool from crewai.tools.tool_types import ToolResult from crewai.tools.tool_usage import ToolUsage, ToolUsageError @@ -95,6 +96,21 @@ async def aexecute_tool_and_check_finality( tool = tool_name_to_tool_map.get(sanitized_tool_name) if tool: tool_input = tool_calling.arguments if tool_calling.arguments else {} + + # Governance policy check (OWASP ASI02: Tool Misuse & Exploitation) + if crew and hasattr(crew, "security_config"): + try: + crew.security_config.governance.validate_tool( + sanitized_tool_name, tool_input + ) + except GovernanceError as e: + blocked_message = ( + f"Tool execution blocked by governance policy. " + f"Tool: {tool_calling.tool_name}. Reason: {e.detail}" + ) + logger.log("warning", blocked_message) + return ToolResult(blocked_message, False) + hook_context = ToolCallHookContext( tool_name=tool_calling.tool_name, tool_input=tool_input, @@ -215,6 +231,21 @@ def execute_tool_and_check_finality( tool = tool_name_to_tool_map.get(sanitized_tool_name) if tool: tool_input = tool_calling.arguments if tool_calling.arguments else {} + + # Governance policy check (OWASP ASI02: Tool Misuse & Exploitation) + if crew and hasattr(crew, "security_config"): + try: + crew.security_config.governance.validate_tool( + sanitized_tool_name, tool_input + ) + except GovernanceError as e: + blocked_message = ( + f"Tool execution blocked by governance policy. " + f"Tool: {tool_calling.tool_name}. Reason: {e.detail}" + ) + logger.log("warning", blocked_message) + return ToolResult(blocked_message, False) + hook_context = ToolCallHookContext( tool_name=tool_calling.tool_name, tool_input=tool_input, diff --git a/lib/crewai/tests/security/test_governance.py b/lib/crewai/tests/security/test_governance.py new file mode 100644 index 0000000000..19aedccb2b --- /dev/null +++ b/lib/crewai/tests/security/test_governance.py @@ -0,0 +1,577 @@ +"""Tests for the security governance module. + +Tests cover: +- SubprocessPolicy: command allowlist/blocklist, shell validation, custom validators +- HttpPolicy: domain allowlist/blocklist, URL pattern matching, custom validators +- ToolPolicy: tool allowlist/blocklist, custom validators +- GovernanceConfig: aggregated policy validation +- Integration with SecurityConfig +- Integration with agent subprocess calls +- Integration with tool execution governance +""" + +from __future__ import annotations + +from unittest.mock import MagicMock, patch + +import pytest + +from crewai.security import ( + GovernanceConfig, + GovernanceError, + HttpPolicy, + SecurityConfig, + SubprocessPolicy, + ToolPolicy, +) + + +# --------------------------------------------------------------------------- +# SubprocessPolicy tests +# --------------------------------------------------------------------------- + + +class TestSubprocessPolicy: + """Tests for SubprocessPolicy.""" + + def test_default_policy_allows_all(self) -> None: + """Default policy with no restrictions should allow any command.""" + policy = SubprocessPolicy() + # Should not raise + policy.validate_command(["docker", "info"]) + policy.validate_command(["git", "status"]) + policy.validate_command(["python", "-c", "print('hello')"]) + + def test_allowed_commands_allowlist(self) -> None: + """Only commands in the allowlist should be permitted.""" + policy = SubprocessPolicy(allowed_commands=["docker", "git"]) + policy.validate_command(["docker", "info"]) # allowed + policy.validate_command(["git", "status"]) # allowed + + with pytest.raises(GovernanceError, match="not in the allowed commands list"): + policy.validate_command(["rm", "-rf", "/"]) + + def test_blocked_commands_blocklist(self) -> None: + """Blocked commands should always be denied.""" + policy = SubprocessPolicy(blocked_commands=["rm", "shutdown"]) + policy.validate_command(["docker", "info"]) # allowed + + with pytest.raises(GovernanceError, match="blocked by policy"): + policy.validate_command(["rm", "-rf", "/"]) + + with pytest.raises(GovernanceError, match="blocked by policy"): + policy.validate_command(["shutdown", "-h", "now"]) + + def test_blocked_takes_precedence_over_allowed(self) -> None: + """A command in both allowed and blocked should be denied.""" + policy = SubprocessPolicy( + allowed_commands=["docker", "rm"], + blocked_commands=["rm"], + ) + policy.validate_command(["docker", "info"]) # allowed + + with pytest.raises(GovernanceError, match="blocked by policy"): + policy.validate_command(["rm", "-rf", "/"]) + + def test_shell_not_allowed_by_default(self) -> None: + """shell=True should be blocked by default.""" + policy = SubprocessPolicy() + + with pytest.raises(GovernanceError, match="shell=True is not permitted"): + policy.validate_command(["echo", "hello"], shell=True) + + def test_shell_allowed_when_configured(self) -> None: + """shell=True should be allowed when explicitly configured.""" + policy = SubprocessPolicy(allow_shell=True) + # Should not raise + policy.validate_command(["echo", "hello"], shell=True) + + def test_empty_command_rejected(self) -> None: + """Empty command list should be rejected.""" + policy = SubprocessPolicy() + + with pytest.raises(GovernanceError, match="Empty command"): + policy.validate_command([]) + + def test_custom_validator_allows(self) -> None: + """Custom validator returning True should allow execution.""" + validator = MagicMock(return_value=True) + policy = SubprocessPolicy(custom_validator=validator) + policy.validate_command(["docker", "info"]) + validator.assert_called_once() + + def test_custom_validator_blocks(self) -> None: + """Custom validator returning False should block execution.""" + validator = MagicMock(return_value=False) + policy = SubprocessPolicy(custom_validator=validator) + + with pytest.raises(GovernanceError, match="rejected by custom validator"): + policy.validate_command(["docker", "info"]) + + def test_command_basename_extraction(self) -> None: + """Full paths should be reduced to basename for matching.""" + policy = SubprocessPolicy(allowed_commands=["docker"]) + # Should match "docker" even with full path + policy.validate_command(["/usr/bin/docker", "info"]) + + def test_governance_error_attributes(self) -> None: + """GovernanceError should carry category and detail.""" + policy = SubprocessPolicy(blocked_commands=["rm"]) + + with pytest.raises(GovernanceError) as exc_info: + policy.validate_command(["rm", "-rf", "/"]) + + error = exc_info.value + assert error.category == "subprocess" + assert "rm" in error.detail + assert "blocked by policy" in error.detail + + +# --------------------------------------------------------------------------- +# HttpPolicy tests +# --------------------------------------------------------------------------- + + +class TestHttpPolicy: + """Tests for HttpPolicy.""" + + def test_default_policy_allows_all(self) -> None: + """Default policy with no restrictions should allow any URL.""" + policy = HttpPolicy() + policy.validate_request("https://api.openai.com/v1/chat") + policy.validate_request("https://example.com/data", method="POST") + + def test_allowed_domains_allowlist(self) -> None: + """Only requests to allowed domains should be permitted.""" + policy = HttpPolicy(allowed_domains=["api.openai.com", "api.anthropic.com"]) + policy.validate_request("https://api.openai.com/v1/chat") + + with pytest.raises(GovernanceError, match="not in the allowed domains list"): + policy.validate_request("https://evil.example.com/steal") + + def test_blocked_domains_blocklist(self) -> None: + """Blocked domains should always be denied.""" + policy = HttpPolicy(blocked_domains=["evil.example.com"]) + policy.validate_request("https://api.openai.com/v1/chat") + + with pytest.raises(GovernanceError, match="blocked by policy"): + policy.validate_request("https://evil.example.com/steal") + + def test_blocked_takes_precedence_over_allowed(self) -> None: + """A domain in both allowed and blocked should be denied.""" + policy = HttpPolicy( + allowed_domains=["api.openai.com", "evil.example.com"], + blocked_domains=["evil.example.com"], + ) + + with pytest.raises(GovernanceError, match="blocked by policy"): + policy.validate_request("https://evil.example.com/data") + + def test_url_pattern_matching(self) -> None: + """URLs should be validated against regex patterns.""" + policy = HttpPolicy( + allowed_url_patterns=[r"https://api\.openai\.com/.*"] + ) + policy.validate_request("https://api.openai.com/v1/chat") + + with pytest.raises(GovernanceError, match="does not match any allowed URL pattern"): + policy.validate_request("https://evil.com/steal") + + def test_custom_validator_allows(self) -> None: + """Custom validator returning True should allow the request.""" + validator = MagicMock(return_value=True) + policy = HttpPolicy(custom_validator=validator) + policy.validate_request("https://api.openai.com/v1/chat", method="POST") + validator.assert_called_once() + + def test_custom_validator_blocks(self) -> None: + """Custom validator returning False should block the request.""" + validator = MagicMock(return_value=False) + policy = HttpPolicy(custom_validator=validator) + + with pytest.raises(GovernanceError, match="rejected by custom validator"): + policy.validate_request("https://api.openai.com/v1/chat") + + def test_governance_error_attributes(self) -> None: + """GovernanceError should carry category and detail for HTTP.""" + policy = HttpPolicy(blocked_domains=["evil.com"]) + + with pytest.raises(GovernanceError) as exc_info: + policy.validate_request("https://evil.com/data") + + error = exc_info.value + assert error.category == "http" + assert "evil.com" in error.detail + + +# --------------------------------------------------------------------------- +# ToolPolicy tests +# --------------------------------------------------------------------------- + + +class TestToolPolicy: + """Tests for ToolPolicy.""" + + def test_default_policy_allows_all(self) -> None: + """Default policy with no restrictions should allow any tool.""" + policy = ToolPolicy() + policy.validate_tool("search", {"query": "hello"}) + policy.validate_tool("read_file", {"path": "/tmp/test"}) + + def test_allowed_tools_allowlist(self) -> None: + """Only tools in the allowlist should be permitted.""" + policy = ToolPolicy(allowed_tools=["search", "read_file"]) + policy.validate_tool("search", {"query": "hello"}) + + with pytest.raises(GovernanceError, match="not in the allowed tools list"): + policy.validate_tool("delete_database", {}) + + def test_blocked_tools_blocklist(self) -> None: + """Blocked tools should always be denied.""" + policy = ToolPolicy(blocked_tools=["delete_database", "execute_code"]) + policy.validate_tool("search", {"query": "hello"}) + + with pytest.raises(GovernanceError, match="blocked by policy"): + policy.validate_tool("delete_database", {}) + + def test_blocked_takes_precedence_over_allowed(self) -> None: + """A tool in both allowed and blocked should be denied.""" + policy = ToolPolicy( + allowed_tools=["search", "delete_database"], + blocked_tools=["delete_database"], + ) + + with pytest.raises(GovernanceError, match="blocked by policy"): + policy.validate_tool("delete_database", {}) + + def test_custom_validator_allows(self) -> None: + """Custom validator returning True should allow the tool.""" + validator = MagicMock(return_value=True) + policy = ToolPolicy(custom_validator=validator) + policy.validate_tool("search", {"query": "hello"}) + validator.assert_called_once_with("search", {"query": "hello"}) + + def test_custom_validator_blocks(self) -> None: + """Custom validator returning False should block the tool.""" + validator = MagicMock(return_value=False) + policy = ToolPolicy(custom_validator=validator) + + with pytest.raises(GovernanceError, match="rejected by custom validator"): + policy.validate_tool("search", {"query": "hello"}) + + def test_none_input_handled(self) -> None: + """None input should be handled gracefully.""" + policy = ToolPolicy() + policy.validate_tool("search", None) + + def test_governance_error_attributes(self) -> None: + """GovernanceError should carry category and detail for tools.""" + policy = ToolPolicy(blocked_tools=["danger"]) + + with pytest.raises(GovernanceError) as exc_info: + policy.validate_tool("danger", {}) + + error = exc_info.value + assert error.category == "tool" + assert "danger" in error.detail + + +# --------------------------------------------------------------------------- +# GovernanceConfig tests +# --------------------------------------------------------------------------- + + +class TestGovernanceConfig: + """Tests for the aggregated GovernanceConfig.""" + + def test_default_governance_allows_all(self) -> None: + """Default GovernanceConfig should allow all operations.""" + config = GovernanceConfig() + config.validate_subprocess(["docker", "info"]) + config.validate_http("https://api.openai.com/v1/chat") + config.validate_tool("search", {"query": "hello"}) + + def test_governance_with_all_policies(self) -> None: + """GovernanceConfig should enforce all configured policies.""" + config = GovernanceConfig( + subprocess_policy=SubprocessPolicy( + allowed_commands=["docker"], + blocked_commands=["rm"], + ), + http_policy=HttpPolicy( + allowed_domains=["api.openai.com"], + ), + tool_policy=ToolPolicy( + blocked_tools=["delete_database"], + ), + ) + + # Allowed operations + config.validate_subprocess(["docker", "info"]) + config.validate_http("https://api.openai.com/v1/chat") + config.validate_tool("search", {"query": "hello"}) + + # Blocked operations + with pytest.raises(GovernanceError, match="subprocess"): + config.validate_subprocess(["rm", "-rf", "/"]) + + with pytest.raises(GovernanceError, match="http"): + config.validate_http("https://evil.com/steal") + + with pytest.raises(GovernanceError, match="tool"): + config.validate_tool("delete_database", {}) + + def test_governance_serialization(self) -> None: + """GovernanceConfig should be serializable.""" + config = GovernanceConfig( + subprocess_policy=SubprocessPolicy( + allowed_commands=["docker"], + blocked_commands=["rm"], + ), + http_policy=HttpPolicy( + blocked_domains=["evil.com"], + ), + tool_policy=ToolPolicy( + blocked_tools=["danger"], + ), + ) + + data = config.model_dump() + assert data["subprocess_policy"]["allowed_commands"] == ["docker"] + assert data["subprocess_policy"]["blocked_commands"] == ["rm"] + assert data["http_policy"]["blocked_domains"] == ["evil.com"] + assert data["tool_policy"]["blocked_tools"] == ["danger"] + + +# --------------------------------------------------------------------------- +# SecurityConfig integration tests +# --------------------------------------------------------------------------- + + +class TestSecurityConfigGovernance: + """Tests for governance integration in SecurityConfig.""" + + def test_security_config_has_default_governance(self) -> None: + """SecurityConfig should have default governance that allows all.""" + config = SecurityConfig() + assert config.governance is not None + assert isinstance(config.governance, GovernanceConfig) + + # Default governance allows everything + config.governance.validate_subprocess(["docker", "info"]) + config.governance.validate_http("https://api.openai.com/v1/chat") + config.governance.validate_tool("search", {}) + + def test_security_config_with_custom_governance(self) -> None: + """SecurityConfig should accept a custom GovernanceConfig.""" + governance = GovernanceConfig( + subprocess_policy=SubprocessPolicy(blocked_commands=["rm"]), + tool_policy=ToolPolicy(blocked_tools=["danger"]), + ) + config = SecurityConfig(governance=governance) + + with pytest.raises(GovernanceError): + config.governance.validate_subprocess(["rm", "-rf", "/"]) + + with pytest.raises(GovernanceError): + config.governance.validate_tool("danger", {}) + + def test_security_config_to_dict_includes_governance(self) -> None: + """to_dict() should include governance configuration.""" + governance = GovernanceConfig( + subprocess_policy=SubprocessPolicy(blocked_commands=["rm"]), + ) + config = SecurityConfig(governance=governance) + config_dict = config.to_dict() + + assert "governance" in config_dict + assert "subprocess_policy" in config_dict["governance"] + assert config_dict["governance"]["subprocess_policy"]["blocked_commands"] == ["rm"] + + def test_security_config_from_dict_with_governance(self) -> None: + """from_dict() should restore governance configuration.""" + original = SecurityConfig( + governance=GovernanceConfig( + subprocess_policy=SubprocessPolicy(blocked_commands=["rm"]), + http_policy=HttpPolicy(blocked_domains=["evil.com"]), + tool_policy=ToolPolicy(blocked_tools=["danger"]), + ) + ) + + config_dict = original.to_dict() + restored = SecurityConfig.from_dict(config_dict) + + assert restored.governance is not None + assert "rm" in restored.governance.subprocess_policy.blocked_commands + assert "evil.com" in restored.governance.http_policy.blocked_domains + assert "danger" in restored.governance.tool_policy.blocked_tools + + +# --------------------------------------------------------------------------- +# Agent subprocess governance integration tests +# --------------------------------------------------------------------------- + + +class TestAgentSubprocessGovernance: + """Tests for governance enforcement in agent subprocess calls.""" + + def test_validate_docker_blocked_by_governance(self) -> None: + """Agent._validate_docker_installation should respect subprocess governance.""" + from crewai.agent.core import Agent + + agent = Agent( + role="test", + goal="test", + backstory="test", + security_config=SecurityConfig( + governance=GovernanceConfig( + subprocess_policy=SubprocessPolicy( + blocked_commands=["docker"], + ), + ), + ), + ) + + with patch("shutil.which", return_value="/usr/bin/docker"): + with pytest.raises(RuntimeError, match="Governance policy blocked"): + agent._validate_docker_installation() + + def test_validate_docker_allowed_by_governance(self) -> None: + """Agent._validate_docker_installation should pass with permissive governance.""" + from crewai.agent.core import Agent + + agent = Agent( + role="test", + goal="test", + backstory="test", + security_config=SecurityConfig( + governance=GovernanceConfig( + subprocess_policy=SubprocessPolicy( + allowed_commands=["docker"], + ), + ), + ), + ) + + with ( + patch("shutil.which", return_value="/usr/bin/docker"), + patch("subprocess.run") as mock_run, + ): + mock_run.return_value = MagicMock(returncode=0) + # Should not raise + agent._validate_docker_installation() + + def test_validate_docker_not_in_allowlist(self) -> None: + """Subprocess governance should block docker when not in allowlist.""" + from crewai.agent.core import Agent + + agent = Agent( + role="test", + goal="test", + backstory="test", + security_config=SecurityConfig( + governance=GovernanceConfig( + subprocess_policy=SubprocessPolicy( + allowed_commands=["git"], # docker not allowed + ), + ), + ), + ) + + with patch("shutil.which", return_value="/usr/bin/docker"): + with pytest.raises(RuntimeError, match="Governance policy blocked"): + agent._validate_docker_installation() + + +# --------------------------------------------------------------------------- +# Tool governance integration tests +# --------------------------------------------------------------------------- + + +class TestToolGovernanceIntegration: + """Tests for tool governance enforcement in the tool execution path.""" + + def test_tool_blocked_by_governance_sync(self) -> None: + """execute_tool_and_check_finality should block tools per governance.""" + from crewai.agents.parser import AgentAction + from crewai.security.security_config import SecurityConfig + from crewai.utilities.tool_utils import execute_tool_and_check_finality + + # Create a mock crew with governance that blocks 'danger_tool' + mock_crew = MagicMock() + mock_crew.verbose = False + mock_crew.security_config = SecurityConfig( + governance=GovernanceConfig( + tool_policy=ToolPolicy(blocked_tools=["danger_tool"]), + ) + ) + + # Create a mock tool with attributes needed by ToolUsage internals + mock_tool = MagicMock() + mock_tool.name = "Danger Tool" + mock_tool.result_as_answer = False + mock_tool.description = "Danger Tool: A dangerous tool, args: {}" + + agent_action = AgentAction( + tool="danger_tool", + tool_input="{}", + text='Action: Danger Tool\nAction Input: {}', + thought="", + ) + + from crewai.utilities.i18n import get_i18n + + result = execute_tool_and_check_finality( + agent_action=agent_action, + tools=[mock_tool], + i18n=get_i18n(), + crew=mock_crew, + ) + + assert "blocked by governance policy" in result.result + + def test_tool_allowed_by_governance(self) -> None: + """Tools not blocked by governance should proceed normally.""" + governance = GovernanceConfig( + tool_policy=ToolPolicy( + allowed_tools=["search_tool"], + ), + ) + + # Allowed tool should not raise + governance.validate_tool("search_tool", {"query": "test"}) + + # Disallowed tool should raise + with pytest.raises(GovernanceError): + governance.validate_tool("other_tool", {}) + + +# --------------------------------------------------------------------------- +# GovernanceError tests +# --------------------------------------------------------------------------- + + +class TestGovernanceError: + """Tests for the GovernanceError exception.""" + + def test_error_message_format(self) -> None: + """GovernanceError message should include category and detail.""" + error = GovernanceError("subprocess", "Command 'rm' is blocked") + assert str(error) == "[subprocess] Command 'rm' is blocked" + assert error.category == "subprocess" + assert error.detail == "Command 'rm' is blocked" + + def test_error_inherits_from_exception(self) -> None: + """GovernanceError should be catchable as Exception.""" + with pytest.raises(Exception): + raise GovernanceError("test", "test detail") + + def test_error_categories(self) -> None: + """Different governance categories should be properly tracked.""" + subprocess_err = GovernanceError("subprocess", "blocked") + http_err = GovernanceError("http", "blocked") + tool_err = GovernanceError("tool", "blocked") + + assert subprocess_err.category == "subprocess" + assert http_err.category == "http" + assert tool_err.category == "tool"