diff --git a/src/praisonai/praisonai/_approval_spec.py b/src/praisonai/praisonai/_approval_spec.py new file mode 100644 index 000000000..6026ecc5c --- /dev/null +++ b/src/praisonai/praisonai/_approval_spec.py @@ -0,0 +1,140 @@ +""" +Approval specification module - unified approval configuration across CLI, YAML, Python. + +This module provides a single canonical ApprovalSpec dataclass that all three +surfaces (CLI, YAML, Python) normalize into, preventing fragmentation and +ensuring consistent behavior across all entry points. +""" +from dataclasses import dataclass +from typing import Optional, Literal, Union, Dict, Any + +Backend = Literal["console", "slack", "telegram", "discord", "webhook", "http", "agent", "auto", "none"] +ApprovalLevel = Literal["low", "medium", "high", "critical"] + + +def _parse_timeout(timeout_val: Optional[Union[str, int, float]]) -> Optional[float]: + """Parse timeout value to float, handling 'none' case.""" + if timeout_val is None: + return None + if isinstance(timeout_val, str) and timeout_val.lower() == 'none': + return None + try: + return float(timeout_val) + except (ValueError, TypeError): + raise ValueError(f"Invalid timeout value: {timeout_val}") + + +@dataclass(frozen=True) +class ApprovalSpec: + """ + Unified approval specification for CLI, YAML, and Python APIs. + + This replaces the fragmented approval configuration scattered across + multiple fields and provides consistent behavior across all surfaces. + """ + enabled: bool = False + backend: Backend = "console" + approve_all_tools: bool = False + timeout: Optional[float] = None + approve_level: Optional[ApprovalLevel] = None + guardrails: Optional[str] = None + + @classmethod + def from_cli(cls, args) -> "ApprovalSpec": + """ + Create ApprovalSpec from CLI arguments. + + Handles --trust, --approval, --approve-all-tools, --approval-timeout, + --approve-level, and --guardrail flags. + """ + # Determine if approval is enabled from any of the CLI flags + enabled = bool( + getattr(args, 'trust', False) or + getattr(args, 'approval', None) or + getattr(args, 'approve_all_tools', False) or + getattr(args, 'approve_level', None) + ) + + # Determine backend + if getattr(args, 'trust', False): + backend = "auto" # --trust means auto-approve + elif getattr(args, 'approval', None): + backend = args.approval + else: + backend = "console" if enabled else "none" + + return cls( + enabled=enabled, + backend=backend, # type: ignore[arg-type] + approve_all_tools=bool(getattr(args, 'approve_all_tools', False)), + timeout=_parse_timeout(getattr(args, 'approval_timeout', None)), + approve_level=getattr(args, 'approve_level', None), + guardrails=getattr(args, 'guardrail', None), + ) + + @classmethod + def from_yaml(cls, node: Union[None, bool, str, Dict[str, Any]]) -> "ApprovalSpec": + """ + Create ApprovalSpec from YAML approval configuration. + + Accepts: + - None/False: disabled + - True: enabled with console backend + - str: enabled with specified backend + - dict: full configuration + + Validates keys to prevent silent typos. + """ + if node is None or node is False: + return cls(enabled=False, backend="none") + if node is True: + return cls(enabled=True, backend="console") + if isinstance(node, str): + return cls(enabled=True, backend=node) # type: ignore[arg-type] + if isinstance(node, dict): + # Validate allowed keys to catch typos early + allowed = { + "enabled", "backend", "approve_all_tools", "timeout", + "approve_level", "guardrails", + # Legacy aliases for backward compatibility + "backend_name", "all_tools", "approval_timeout" + } + unknown = set(node) - allowed + if unknown: + raise ValueError(f"Unknown approval keys: {sorted(unknown)}. Allowed: {sorted(allowed)}") + + # Handle legacy aliases + backend = node.get("backend") or node.get("backend_name", "console") + if "approve_all_tools" in node: + approve_all_tools = node.get("approve_all_tools") + else: + approve_all_tools = node.get("all_tools", False) + if "timeout" in node: + timeout_val = node.get("timeout") + else: + timeout_val = node.get("approval_timeout") + + return cls( + enabled=node.get("enabled", True), + backend=backend, # type: ignore[arg-type] + approve_all_tools=bool(approve_all_tools), + timeout=_parse_timeout(timeout_val) if timeout_val is not None else None, + approve_level=node.get("approve_level"), + guardrails=node.get("guardrails"), + ) + raise TypeError(f"Unsupported approval node type: {type(node).__name__}") + + def to_dict(self) -> Dict[str, Any]: + """Convert to dictionary for backward compatibility with existing code.""" + result = { + "enabled": self.enabled, + "backend": self.backend, + "approve_all_tools": self.approve_all_tools, + } + if self.timeout is not None: + result["timeout"] = self.timeout + if self.approve_level is not None: + result["approve_level"] = self.approve_level + if self.guardrails is not None: + result["guardrails"] = self.guardrails + return result diff --git a/src/praisonai/praisonai/_async_bridge.py b/src/praisonai/praisonai/_async_bridge.py new file mode 100644 index 000000000..9893a65c5 --- /dev/null +++ b/src/praisonai/praisonai/_async_bridge.py @@ -0,0 +1,61 @@ +""" +Async bridge module - single source of truth for running coroutines synchronously. + +This module provides a safe way to run async functions from sync contexts, +handling nested event loop scenarios without creating a new event loop +on every call (which is expensive and breaks multi-agent workflows). +""" +import asyncio +import threading +from concurrent.futures import Future +from typing import Awaitable, TypeVar + +T = TypeVar("T") + +_loop: asyncio.AbstractEventLoop | None = None +_loop_lock = threading.Lock() + + +def _ensure_background_loop() -> asyncio.AbstractEventLoop: + """Ensure a background event loop exists and return it.""" + global _loop + with _loop_lock: + if _loop is None or _loop.is_closed(): + _loop = asyncio.new_event_loop() + t = threading.Thread(target=_loop.run_forever, name="praisonai-async", daemon=True) + t.start() + return _loop + + +def run_sync(coro: Awaitable[T], *, timeout: float | None = None) -> T: + """ + Run a coroutine synchronously, safe inside a running loop. + + This function automatically detects if there's already a running event loop + and handles the execution appropriately: + - If no loop is running: uses asyncio.run() (fastest path) + - If a loop is running: schedules on background loop (safe path) + + Args: + coro: The coroutine to run + timeout: Maximum time to wait for completion (seconds) + + Returns: + The result of the coroutine + + Raises: + TimeoutError: If timeout is exceeded + Any exception raised by the coroutine + """ + try: + running = asyncio.get_running_loop() + except RuntimeError: + running = None + + if running is None: + # Cheap path: no outer loop, just run. + return asyncio.run(coro) + + # Outer loop exists -> schedule on background loop, do NOT nest asyncio.run. + fut: Future = asyncio.run_coroutine_threadsafe(coro, _ensure_background_loop()) + return fut.result(timeout=timeout) \ No newline at end of file diff --git a/src/praisonai/praisonai/_logging.py b/src/praisonai/praisonai/_logging.py new file mode 100644 index 000000000..91abe89c4 --- /dev/null +++ b/src/praisonai/praisonai/_logging.py @@ -0,0 +1,42 @@ +""" +Logging configuration module - single source of truth for PraisonAI logging. + +This module ensures that: +1. Only the CLI configures the root logger (no library-side mutation) +2. Library code uses namespaced loggers +3. No hot-path basicConfig() calls on every instance creation +4. Embedders keep their own logging configuration intact +""" +import logging +import os + +_PKG_LOGGER = "praisonai" +_configured = False + + +def configure_cli_logging(level: str | int | None = None) -> None: + """ + Configure root logging. Must only be called from the CLI entrypoint. + + Args: + level: Log level (string like 'INFO' or logging constant) + """ + global _configured + if _configured: + return + lvl = level or os.environ.get("LOGLEVEL", "WARNING") + logging.basicConfig(level=lvl, format="%(asctime)s - %(levelname)s - %(message)s") + _configured = True + + +def get_logger(name: str | None = None) -> logging.Logger: + """ + Return a namespaced logger; never touches root logger. + + Args: + name: Optional logger name suffix + + Returns: + A logger with the praisonai namespace + """ + return logging.getLogger(f"{_PKG_LOGGER}.{name}" if name else _PKG_LOGGER) \ No newline at end of file diff --git a/src/praisonai/praisonai/agents_generator.py b/src/praisonai/praisonai/agents_generator.py index 8962a2a03..8579c443a 100644 --- a/src/praisonai/praisonai/agents_generator.py +++ b/src/praisonai/praisonai/agents_generator.py @@ -174,13 +174,18 @@ def __init__(self, agent_file, framework, config_list, log_level=None, agent_cal self.agent_yaml = agent_yaml self.tools = tools or [] # Store tool class names as a list self.cli_config = cli_config or {} # Store CLI configuration overrides - self.log_level = log_level or logging.getLogger().getEffectiveLevel() - if self.log_level == logging.NOTSET: - self.log_level = os.environ.get('LOGLEVEL', 'INFO').upper() + # Use namespaced logger - no hot-path basicConfig calls + from ._logging import get_logger + self.logger = get_logger("agents_generator") - logging.basicConfig(level=self.log_level, format='%(asctime)s - %(levelname)s - %(message)s') - self.logger = logging.getLogger(__name__) - self.logger.setLevel(self.log_level) + # Set level if provided, but don't mutate root logger + if log_level: + if isinstance(log_level, str): + self.logger.setLevel(getattr(logging, log_level.upper(), logging.INFO)) + else: + self.logger.setLevel(log_level) + elif os.environ.get('LOGLEVEL'): + self.logger.setLevel(getattr(logging, os.environ.get('LOGLEVEL', 'INFO').upper(), logging.INFO)) # Initialize tool registry (replaces globals() pattern) self.tool_registry = ToolRegistry() @@ -238,26 +243,29 @@ def _merge_cli_config(self, config, cli_config): config['config']['lsp'] = cli_config['lsp'] self.logger.debug(f"CLI override: lsp = {cli_config['lsp']}") - # Handle agent-level overrides (trust, tool_timeout, planning_tools, autonomy, guardrail, approval) - agent_level_fields = ['trust', 'tool_timeout', 'planning_tools', 'autonomy', 'guardrail', 'approval', 'approve_all_tools', 'approval_timeout'] + # Handle agent-level overrides using unified approach + agent_level_fields = ['tool_timeout', 'planning_tools', 'autonomy'] agent_overrides = {k: v for k, v in cli_config.items() if k in agent_level_fields} - # Map CLI field names to YAML field names - field_mappings = { - 'guardrail': 'guardrails', # CLI uses --guardrail, YAML uses guardrails - 'trust': 'approval' # --trust maps to approval=True - } - - # Apply field mappings and special handling - for cli_field in field_mappings: - if cli_field in agent_overrides: - value = agent_overrides.pop(cli_field) - if cli_field == 'trust' and value: - # --trust flag maps to approval=True for auto-approval - agent_overrides['approval'] = True - elif cli_field == 'guardrail': - # --guardrail "description" maps to guardrails config - agent_overrides['guardrails'] = value + # Handle approval configuration using unified spec + approval_fields = ['trust', 'approval', 'approve_all_tools', 'approval_timeout', 'approve_level'] + if any(field in cli_config for field in approval_fields): + from ._approval_spec import ApprovalSpec + + # Create a mock args object for CLI parsing + class MockArgs: + def __init__(self, cli_config): + for field in approval_fields: + setattr(self, field, cli_config.get(field)) + self.guardrail = cli_config.get('guardrail') + + spec = ApprovalSpec.from_cli(MockArgs(cli_config)) + if spec.enabled: + agent_overrides['approval'] = spec.to_dict() + + # Handle guardrail separately + if 'guardrail' in cli_config: + agent_overrides['guardrails'] = cli_config['guardrail'] if agent_overrides: # Apply to all agents in the config @@ -761,9 +769,10 @@ async def run_autogen_v4_async(): # Close the model client await model_client.close() - # Run the async function + # Run the async function using safe bridge + from ._async_bridge import run_sync try: - return asyncio.run(run_autogen_v4_async()) + return run_sync(run_autogen_v4_async()) except Exception as e: self.logger.error(f"Error running AutoGen v0.4: {str(e)}") return f"### AutoGen v0.4 Error ###\n{str(e)}" @@ -1192,39 +1201,27 @@ def _run_praisonai(self, config, topic, tools_dict): stream_enabled = cli_config.get('stream', False) stream_metrics = cli_config.get('stream_metrics', False) - # Reconstruct approval config from potentially scattered settings - approval_val = details.get('approval') - approve_all = details.get('approve_all_tools') - approval_timeout = details.get('approval_timeout') - + # Use unified approval specification approval_config = None - if approval_val is not None or approve_all is not None or approval_timeout is not None: - if isinstance(approval_val, dict): - approval_dict = approval_val - else: - approval_dict = {'backend': approval_val} - - if approve_all is not None: - approval_dict['approve_all_tools'] = approve_all - if approval_timeout is not None: - approval_dict['approval_timeout'] = approval_timeout - + if 'approval' in details: + from ._approval_spec import ApprovalSpec try: - from .cli.features.approval import resolve_approval_config - # Map common YAML fields to resolve_approval_config parameters - approval_config = resolve_approval_config( - backend_name=approval_dict.get('backend') or approval_dict.get('backend_name'), - all_tools=approval_dict.get('approve_all_tools') or approval_dict.get('all_tools', False), - timeout=approval_dict.get('approval_timeout') or approval_dict.get('timeout') + spec = ApprovalSpec.from_yaml(details.get('approval')) + if spec.enabled: + from .cli.features.approval import resolve_approval_config + approval_config = resolve_approval_config( + backend_name=spec.backend, + all_tools=spec.approve_all_tools, + timeout=spec.timeout ) except ImportError: # Fallback: Create ApprovalConfig directly if resolve_approval_config isn't available try: from praisonaiagents.approval.protocols import ApprovalConfig approval_config = ApprovalConfig( - backend=approval_dict.get('backend', None), - all_tools=approval_dict.get('approve_all_tools', approval_dict.get('all_tools', False)), - timeout=approval_dict.get('approval_timeout', approval_dict.get('timeout', 0)) + backend=spec.backend, + all_tools=spec.approve_all_tools, + timeout=spec.timeout ) except ImportError: # Last resort: disable approval for this agent @@ -1353,4 +1350,4 @@ def _run_praisonai(self, config, topic, tools_dict): if AGENTOPS_AVAILABLE: agentops.end_session("Success") - return result \ No newline at end of file + return result diff --git a/src/praisonai/praisonai/bots/_approval_base.py b/src/praisonai/praisonai/bots/_approval_base.py index 9fffa2209..74363b6f5 100644 --- a/src/praisonai/praisonai/bots/_approval_base.py +++ b/src/praisonai/praisonai/bots/_approval_base.py @@ -10,7 +10,6 @@ from __future__ import annotations -import asyncio import logging from typing import Set @@ -125,15 +124,5 @@ async def classify_with_llm( def sync_wrapper(async_fn, timeout: float): """Run *async_fn* (a coroutine) synchronously, handling nested loops.""" - try: - loop = asyncio.get_running_loop() - except RuntimeError: - loop = None - - if loop and loop.is_running(): - import concurrent.futures - with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool: - future = pool.submit(asyncio.run, async_fn) - return future.result(timeout=timeout + 10) - else: - return asyncio.run(async_fn) + from .._async_bridge import run_sync + return run_sync(async_fn, timeout=timeout) diff --git a/src/praisonai/praisonai/cli/main.py b/src/praisonai/praisonai/cli/main.py index 9e7865058..802861b25 100644 --- a/src/praisonai/praisonai/cli/main.py +++ b/src/praisonai/praisonai/cli/main.py @@ -221,7 +221,9 @@ def _get_autogen(): import autogen return autogen -logging.basicConfig(level=os.environ.get('LOGLEVEL', 'WARNING') or 'WARNING', format='%(asctime)s - %(levelname)s - %(message)s') +# Configure root logging only at CLI entrypoint +from .._logging import configure_cli_logging +configure_cli_logging(os.environ.get('LOGLEVEL', 'WARNING') or 'WARNING') logging.getLogger('alembic').setLevel(logging.ERROR) logging.getLogger('gradio').setLevel(logging.ERROR) logging.getLogger('gradio').setLevel(os.environ.get('GRADIO_LOGLEVEL', 'WARNING')) diff --git a/src/praisonai/praisonai/endpoints/a2u_server.py b/src/praisonai/praisonai/endpoints/a2u_server.py index 1a7dd0ac2..a1a630064 100644 --- a/src/praisonai/praisonai/endpoints/a2u_server.py +++ b/src/praisonai/praisonai/endpoints/a2u_server.py @@ -175,11 +175,12 @@ def publish_sync(self, event: A2UEvent, stream_name: str = "events") -> int: # Schedule in running loop asyncio.ensure_future(self.publish(event, stream_name)) return len(self._streams.get(stream_name, set())) - else: - return loop.run_until_complete(self.publish(event, stream_name)) except RuntimeError: - # No event loop, create one - return asyncio.run(self.publish(event, stream_name)) + pass + + # Use safe bridge for sync execution + from .._async_bridge import run_sync + return run_sync(self.publish(event, stream_name)) async def get_events( self, diff --git a/src/praisonai/praisonai/integrations/base.py b/src/praisonai/praisonai/integrations/base.py index e19cad620..b0e87de87 100644 --- a/src/praisonai/praisonai/integrations/base.py +++ b/src/praisonai/praisonai/integrations/base.py @@ -297,19 +297,8 @@ def as_tool(self) -> callable: def tool_func(query: str) -> str: """Execute the CLI tool with the given query.""" - # Use asyncio.run() for clean event loop management - # This creates a new event loop, runs the coroutine, and closes it - try: - # Check if we're already in an async context - asyncio.get_running_loop() - # If we're in an async context, use ThreadPoolExecutor to avoid nested loop - import concurrent.futures - with concurrent.futures.ThreadPoolExecutor() as executor: - future = executor.submit(asyncio.run, integration.execute(query)) - return future.result() - except RuntimeError: - # No running loop, safe to use asyncio.run() - return asyncio.run(integration.execute(query)) + from .._async_bridge import run_sync + return run_sync(integration.execute(query)) # Set function metadata for agent tool registration tool_func.__name__ = f"{self.cli_command}_tool" diff --git a/src/praisonai/praisonai/integrations/managed_local.py b/src/praisonai/praisonai/integrations/managed_local.py index 70c324549..fe35debba 100644 --- a/src/praisonai/praisonai/integrations/managed_local.py +++ b/src/praisonai/praisonai/integrations/managed_local.py @@ -351,11 +351,8 @@ def compute_bridged_tool(*args, **kwargs): """Compute-bridged tool wrapper.""" # Auto-provision compute if needed if self._compute_instance_id is None: - try: - loop = asyncio.get_event_loop() - loop.run_until_complete(self.provision_compute()) - except RuntimeError: - asyncio.run(self.provision_compute()) + from .._async_bridge import run_sync + run_sync(self.provision_compute()) if tool_name == "execute_command": # For execute_command, directly route to compute @@ -364,15 +361,10 @@ def compute_bridged_tool(*args, **kwargs): return "Error: No command specified" try: - try: - loop = asyncio.get_event_loop() - result = loop.run_until_complete( - self._compute.execute(self._compute_instance_id, command) - ) - except RuntimeError: - result = asyncio.run( - self._compute.execute(self._compute_instance_id, command) - ) + from .._async_bridge import run_sync + result = run_sync( + self._compute.execute(self._compute_instance_id, command) + ) # Format result similar to local execute_command if result.get("exit_code", 0) == 0: @@ -433,15 +425,10 @@ def _bridge_file_tool(self, tool_name: str, *args, **kwargs) -> str: return f"Error: Unsupported bridged tool: {tool_name}" try: - try: - loop = asyncio.get_event_loop() - result = loop.run_until_complete( - self._compute.execute(self._compute_instance_id, command) - ) - except RuntimeError: - result = asyncio.run( - self._compute.execute(self._compute_instance_id, command) - ) + from .._async_bridge import run_sync + result = run_sync( + self._compute.execute(self._compute_instance_id, command) + ) if result.get("exit_code", 0) == 0: return result.get("stdout", "") @@ -623,30 +610,18 @@ def _install_packages_in_compute(self, pip_pkgs: List[str]) -> None: # Auto-provision compute if not done yet if self._compute_instance_id is None: - try: - import asyncio - loop = asyncio.get_event_loop() - loop.run_until_complete(self.provision_compute()) - except RuntimeError: - # No event loop, create one - asyncio.run(self.provision_compute()) + from .._async_bridge import run_sync + run_sync(self.provision_compute()) pip_cmd = "python -m pip install -q " + " ".join(f'"{pkg}"' for pkg in pip_pkgs) logger.info("[local_managed] installing pip packages in compute: %s", pip_pkgs) try: # Run installation synchronously in compute - import asyncio - try: - loop = asyncio.get_event_loop() - result = loop.run_until_complete( - self._compute.execute(self._compute_instance_id, pip_cmd, timeout=120) - ) - except RuntimeError: - # No event loop, create one - result = asyncio.run( - self._compute.execute(self._compute_instance_id, pip_cmd, timeout=120) - ) + from .._async_bridge import run_sync + result = run_sync( + self._compute.execute(self._compute_instance_id, pip_cmd, timeout=120) + ) if result.get("exit_code", 0) == 0: logger.info("[local_managed] compute pip install completed") diff --git a/src/praisonai/praisonai/integrations/registry.py b/src/praisonai/praisonai/integrations/registry.py index fb5f52426..15bb2d060 100644 --- a/src/praisonai/praisonai/integrations/registry.py +++ b/src/praisonai/praisonai/integrations/registry.py @@ -239,18 +239,5 @@ def get_available_integrations() -> Dict[str, bool]: import asyncio registry = get_registry() - try: - # Try to get existing event loop - loop = asyncio.get_event_loop() - if loop.is_running(): - # We're in an async context, use create_task via thread pool - import concurrent.futures - with concurrent.futures.ThreadPoolExecutor() as executor: - future = executor.submit(asyncio.run, registry.get_available()) - return future.result() - else: - # No running loop, safe to use asyncio.run - return asyncio.run(registry.get_available()) - except RuntimeError: - # No event loop, safe to use asyncio.run - return asyncio.run(registry.get_available()) \ No newline at end of file + from .._async_bridge import run_sync + return run_sync(registry.get_available()) \ No newline at end of file diff --git a/src/praisonai/tests/unit/test_approval_spec.py b/src/praisonai/tests/unit/test_approval_spec.py new file mode 100644 index 000000000..bc038383d --- /dev/null +++ b/src/praisonai/tests/unit/test_approval_spec.py @@ -0,0 +1,17 @@ +from praisonai._approval_spec import ApprovalSpec + + +def test_from_yaml_prefers_primary_keys_over_legacy_aliases(): + spec = ApprovalSpec.from_yaml( + { + "enabled": True, + "backend": "console", + "approve_all_tools": False, + "all_tools": True, + "timeout": 0, + "approval_timeout": 30, + } + ) + + assert spec.approve_all_tools is False + assert spec.timeout == 0