Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 140 additions & 0 deletions src/praisonai/praisonai/_approval_spec.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
"""
Approval specification module - unified approval configuration across CLI, YAML, Python.

This module provides a single canonical ApprovalSpec dataclass that all three
surfaces (CLI, YAML, Python) normalize into, preventing fragmentation and
ensuring consistent behavior across all entry points.
"""
from dataclasses import dataclass
from typing import Optional, Literal, Union, Dict, Any

Backend = Literal["console", "slack", "telegram", "discord", "webhook", "http", "agent", "auto", "none"]
ApprovalLevel = Literal["low", "medium", "high", "critical"]


def _parse_timeout(timeout_val: Optional[Union[str, int, float]]) -> Optional[float]:
"""Parse timeout value to float, handling 'none' case."""
if timeout_val is None:
return None
if isinstance(timeout_val, str) and timeout_val.lower() == 'none':
return None
try:
return float(timeout_val)
except (ValueError, TypeError):
raise ValueError(f"Invalid timeout value: {timeout_val}")
Comment on lines +15 to +24
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟑 Minor

Validate timeout bounds and preserve the parse failure cause.

float() accepts values like nan, inf, and negative numbers, which are not valid wait durations for this config. Also address Ruff B904 by chaining the parse error.

πŸ›‘οΈ Proposed fix
+import math
@@
 def _parse_timeout(timeout_val: Optional[Union[str, int, float]]) -> Optional[float]:
     """Parse timeout value to float, handling 'none' case."""
     if timeout_val is None:
         return None
     if isinstance(timeout_val, str) and timeout_val.lower() == 'none':
         return None
     try:
-        return float(timeout_val)
-    except (ValueError, TypeError):
-        raise ValueError(f"Invalid timeout value: {timeout_val}")
+        timeout = float(timeout_val)
+    except (ValueError, TypeError) as err:
+        raise ValueError(f"Invalid timeout value: {timeout_val}") from err
+    if not math.isfinite(timeout) or timeout < 0:
+        raise ValueError(f"Invalid timeout value: {timeout_val}")
+    return timeout
🧰 Tools
πŸͺ› Ruff (0.15.10)

[warning] 24-24: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)

πŸ€– Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/praisonai/praisonai/_approval_spec.py` around lines 15 - 24,
_parsΠ΅_timeout currently converts inputs with float() but allows nan/inf and
negatives and loses the original parse error; update _parse_timeout to (1) try
converting with float(timeout_val) capturing the exception as e and re-raise a
ValueError using "from e" to preserve the cause, (2) validate the resulting
value with math.isfinite(value) and value > 0 (reject nan/inf/negative/zero) and
raise a ValueError with a clear message if invalid, and (3) ensure math is
imported if not already; keep the existing handling for None and the string
'none'.



@dataclass(frozen=True)
class ApprovalSpec:
"""
Unified approval specification for CLI, YAML, and Python APIs.

This replaces the fragmented approval configuration scattered across
multiple fields and provides consistent behavior across all surfaces.
"""
enabled: bool = False
backend: Backend = "console"
approve_all_tools: bool = False
timeout: Optional[float] = None
approve_level: Optional[ApprovalLevel] = None
guardrails: Optional[str] = None

@classmethod
def from_cli(cls, args) -> "ApprovalSpec":
"""
Create ApprovalSpec from CLI arguments.

Handles --trust, --approval, --approve-all-tools, --approval-timeout,
--approve-level, and --guardrail flags.
"""
# Determine if approval is enabled from any of the CLI flags
enabled = bool(
getattr(args, 'trust', False) or
getattr(args, 'approval', None) or
getattr(args, 'approve_all_tools', False) or
getattr(args, 'approve_level', None)
)
Comment on lines +51 to +56
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Include guardrail when deriving CLI-enabled approval.

from_cli() stores guardrails on the spec, but --guardrail alone leaves enabled=False and backend="none", so the guardrail config can be silently ignored.

πŸ› Proposed fix
         enabled = bool(
             getattr(args, 'trust', False) or 
             getattr(args, 'approval', None) or 
             getattr(args, 'approve_all_tools', False) or 
-            getattr(args, 'approve_level', None)
+            getattr(args, 'approve_level', None) or
+            getattr(args, 'guardrail', None)
         )
πŸ“ Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
enabled = bool(
getattr(args, 'trust', False) or
getattr(args, 'approval', None) or
getattr(args, 'approve_all_tools', False) or
getattr(args, 'approve_level', None)
)
enabled = bool(
getattr(args, 'trust', False) or
getattr(args, 'approval', None) or
getattr(args, 'approve_all_tools', False) or
getattr(args, 'approve_level', None) or
getattr(args, 'guardrail', None)
)
πŸ€– Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/praisonai/praisonai/_approval_spec.py` around lines 51 - 56, The enabled
calculation in from_cli() mistakenly omits the CLI --guardrail flag, so
guardrail can be set on the spec but leave enabled=False; update the boolean
expression that sets enabled (the local variable named enabled in
_approval_spec.py) to include getattr(args, 'guardrail', None) (or getattr(args,
'guardrail', False)) alongside 'trust', 'approval', 'approve_all_tools', and
'approve_level' so that providing --guardrail flips enabled true and prevents
the guardrail config from being silently ignored.


# Determine backend
if getattr(args, 'trust', False):
backend = "auto" # --trust means auto-approve
elif getattr(args, 'approval', None):
backend = args.approval
else:
backend = "console" if enabled else "none"

return cls(
enabled=enabled,
backend=backend, # type: ignore[arg-type]
approve_all_tools=bool(getattr(args, 'approve_all_tools', False)),
timeout=_parse_timeout(getattr(args, 'approval_timeout', None)),
approve_level=getattr(args, 'approve_level', None),
guardrails=getattr(args, 'guardrail', None),
Comment on lines +66 to +72
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Runtime-validate canonical enum fields.

Literal only helps static type checkers; YAML strings and CLI args can still pass unsupported backend or approve_level values into ApprovalSpec. Since this is the canonical normalization layer, reject invalid values here instead of relying on downstream failures.

πŸ›‘οΈ Proposed validation
-from typing import Optional, Literal, Union, Dict, Any
+from typing import Optional, Literal, Union, Dict, Any, get_args
@@
 Backend = Literal["console", "slack", "telegram", "discord", "webhook", "http", "agent", "auto", "none"]
 ApprovalLevel = Literal["low", "medium", "high", "critical"]
+_BACKENDS = set(get_args(Backend))
+_APPROVAL_LEVELS = set(get_args(ApprovalLevel))
+
+
+def _validate_backend(value: Any) -> Backend:
+    if value not in _BACKENDS:
+        raise ValueError(f"Invalid approval backend: {value!r}. Allowed: {sorted(_BACKENDS)}")
+    return value
+
+
+def _validate_approval_level(value: Any) -> Optional[ApprovalLevel]:
+    if value is None:
+        return None
+    if value not in _APPROVAL_LEVELS:
+        raise ValueError(f"Invalid approve_level: {value!r}. Allowed: {sorted(_APPROVAL_LEVELS)}")
+    return value
@@
-            backend=backend,  # type: ignore[arg-type]
+            backend=_validate_backend(backend),
@@
-            approve_level=getattr(args, 'approve_level', None),
+            approve_level=_validate_approval_level(getattr(args, 'approve_level', None)),
@@
         if isinstance(node, str):
-            return cls(enabled=True, backend=node)  # type: ignore[arg-type]
+            return cls(enabled=True, backend=_validate_backend(node))
@@
-                backend=backend,  # type: ignore[arg-type]
+                backend=_validate_backend(backend),
@@
-                approve_level=node.get("approve_level"),
+                approve_level=_validate_approval_level(node.get("approve_level")),

Also applies to: 92-117

πŸ€– Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/praisonai/praisonai/_approval_spec.py` around lines 66 - 72, The returned
ApprovalSpec currently accepts raw strings for fields like backend,
approve_level, and guardrails; add runtime validation in the factory/constructor
that canonicalizes and rejects unknown enum values (e.g., map/validate against
the allowed members of your Backend/ApproveLevel/Guardrail enums) and raise a
clear ValueError for invalid inputs before calling cls(...). Locate the code
path that builds the spec (the method returning cls(...) and any similar block
around lines 92-117) and replace the direct getattr usage for
backend/approve_level/guardrail with a validation step that converts strings to
enum members (or None) and only passes validated enum values into cls, keeping
timeout parsing via _parse_timeout as-is.

)

@classmethod
def from_yaml(cls, node: Union[None, bool, str, Dict[str, Any]]) -> "ApprovalSpec":
"""
Create ApprovalSpec from YAML approval configuration.

Accepts:
- None/False: disabled
- True: enabled with console backend
- str: enabled with specified backend
- dict: full configuration

Validates keys to prevent silent typos.
"""
if node is None or node is False:
return cls(enabled=False, backend="none")
if node is True:
return cls(enabled=True, backend="console")
if isinstance(node, str):
return cls(enabled=True, backend=node) # type: ignore[arg-type]
if isinstance(node, dict):
# Validate allowed keys to catch typos early
allowed = {
"enabled", "backend", "approve_all_tools", "timeout",
"approve_level", "guardrails",
# Legacy aliases for backward compatibility
"backend_name", "all_tools", "approval_timeout"
}
unknown = set(node) - allowed
if unknown:
raise ValueError(f"Unknown approval keys: {sorted(unknown)}. Allowed: {sorted(allowed)}")

# Handle legacy aliases
backend = node.get("backend") or node.get("backend_name", "console")
if "approve_all_tools" in node:
approve_all_tools = node.get("approve_all_tools")
else:
approve_all_tools = node.get("all_tools", False)
if "timeout" in node:
timeout_val = node.get("timeout")
else:
timeout_val = node.get("approval_timeout")

return cls(
enabled=node.get("enabled", True),
backend=backend, # type: ignore[arg-type]
approve_all_tools=bool(approve_all_tools),
timeout=_parse_timeout(timeout_val) if timeout_val is not None else None,
approve_level=node.get("approve_level"),
guardrails=node.get("guardrails"),
)
raise TypeError(f"Unsupported approval node type: {type(node).__name__}")

def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary for backward compatibility with existing code."""
result = {
"enabled": self.enabled,
"backend": self.backend,
"approve_all_tools": self.approve_all_tools,
}
if self.timeout is not None:
result["timeout"] = self.timeout
if self.approve_level is not None:
result["approve_level"] = self.approve_level
if self.guardrails is not None:
result["guardrails"] = self.guardrails
return result
61 changes: 61 additions & 0 deletions src/praisonai/praisonai/_async_bridge.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
"""
Async bridge module - single source of truth for running coroutines synchronously.

This module provides a safe way to run async functions from sync contexts,
handling nested event loop scenarios without creating a new event loop
on every call (which is expensive and breaks multi-agent workflows).
"""
import asyncio
import threading
from concurrent.futures import Future
from typing import Awaitable, TypeVar

T = TypeVar("T")

_loop: asyncio.AbstractEventLoop | None = None
_loop_lock = threading.Lock()


def _ensure_background_loop() -> asyncio.AbstractEventLoop:
"""Ensure a background event loop exists and return it."""
global _loop
with _loop_lock:
if _loop is None or _loop.is_closed():
_loop = asyncio.new_event_loop()
t = threading.Thread(target=_loop.run_forever, name="praisonai-async", daemon=True)
t.start()
return _loop


def run_sync(coro: Awaitable[T], *, timeout: float | None = None) -> T:
"""
Run a coroutine synchronously, safe inside a running loop.

This function automatically detects if there's already a running event loop
and handles the execution appropriately:
- If no loop is running: uses asyncio.run() (fastest path)
- If a loop is running: schedules on background loop (safe path)

Args:
coro: The coroutine to run
timeout: Maximum time to wait for completion (seconds)

Returns:
The result of the coroutine

Raises:
TimeoutError: If timeout is exceeded
Any exception raised by the coroutine
"""
try:
running = asyncio.get_running_loop()
except RuntimeError:
running = None

if running is None:
# Cheap path: no outer loop, just run.
return asyncio.run(coro)

# Outer loop exists -> schedule on background loop, do NOT nest asyncio.run.
fut: Future = asyncio.run_coroutine_threadsafe(coro, _ensure_background_loop())
return fut.result(timeout=timeout)
Comment on lines +55 to +61
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Description: Verify run_sync no longer uses per-call asyncio.run(coro)
# and that timeout cancellation is present.
rg -n -C3 'asyncio\.run\(coro\)|run_coroutine_threadsafe|FutureTimeoutError|fut\.cancel\(\)' -- src/praisonai/praisonai/_async_bridge.py

Repository: MervinPraison/PraisonAI

Length of output: 400


Use the long-lived bridge consistently and cancel timed-out work.

Line 57 still creates a fresh event loop per sync call; line 61 can time out while leaving the submitted coroutine running in the background. This reintroduces loop churn and can leak long-running CLI/subprocess work.

πŸ› Proposed fix
-from concurrent.futures import Future
+from concurrent.futures import Future, TimeoutError as FutureTimeoutError
@@
-    if running is None:
-        # Cheap path: no outer loop, just run.
-        return asyncio.run(coro)
-
-    # Outer loop exists -> schedule on background loop, do NOT nest asyncio.run.
-    fut: Future = asyncio.run_coroutine_threadsafe(coro, _ensure_background_loop())
-    return fut.result(timeout=timeout)
+    loop = _ensure_background_loop()
+    if running is loop:
+        raise RuntimeError("run_sync() cannot block the async bridge event loop")
+
+    fut: Future = asyncio.run_coroutine_threadsafe(coro, loop)
+    try:
+        return fut.result(timeout=timeout)
+    except FutureTimeoutError:
+        fut.cancel()
+        raise TimeoutError(f"Coroutine timed out after {timeout}s") from None
πŸ€– Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/praisonai/praisonai/_async_bridge.py` around lines 55 - 61, The sync
bridge currently uses asyncio.run(coro) when running is None (creating a fresh
loop) and also returns fut.result(timeout=timeout) without canceling on timeout;
change it to always schedule on the long-lived background loop via
_ensure_background_loop() using asyncio.run_coroutine_threadsafe(coro,
_ensure_background_loop()) instead of asyncio.run(coro), and wrap the
fut.result(timeout=timeout) call to catch concurrent.futures.TimeoutError so you
can cancel the submitted work (call fut.cancel() or post a cancellation to the
background loop) before re-raising; update references in this block (the running
check, asyncio.run, asyncio.run_coroutine_threadsafe, fut, and
_ensure_background_loop) accordingly so no new event loop is created per call
and timed-out tasks are cancelled.

42 changes: 42 additions & 0 deletions src/praisonai/praisonai/_logging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
"""
Logging configuration module - single source of truth for PraisonAI logging.

This module ensures that:
1. Only the CLI configures the root logger (no library-side mutation)
2. Library code uses namespaced loggers
3. No hot-path basicConfig() calls on every instance creation
4. Embedders keep their own logging configuration intact
"""
import logging
import os

_PKG_LOGGER = "praisonai"
_configured = False


def configure_cli_logging(level: str | int | None = None) -> None:
"""
Configure root logging. Must only be called from the CLI entrypoint.

Args:
level: Log level (string like 'INFO' or logging constant)
"""
global _configured
if _configured:
return
lvl = level or os.environ.get("LOGLEVEL", "WARNING")
logging.basicConfig(level=lvl, format="%(asctime)s - %(levelname)s - %(message)s")
_configured = True


def get_logger(name: str | None = None) -> logging.Logger:
"""
Return a namespaced logger; never touches root logger.

Args:
name: Optional logger name suffix

Returns:
A logger with the praisonai namespace
"""
return logging.getLogger(f"{_PKG_LOGGER}.{name}" if name else _PKG_LOGGER)
101 changes: 49 additions & 52 deletions src/praisonai/praisonai/agents_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,13 +174,18 @@ def __init__(self, agent_file, framework, config_list, log_level=None, agent_cal
self.agent_yaml = agent_yaml
self.tools = tools or [] # Store tool class names as a list
self.cli_config = cli_config or {} # Store CLI configuration overrides
self.log_level = log_level or logging.getLogger().getEffectiveLevel()
if self.log_level == logging.NOTSET:
self.log_level = os.environ.get('LOGLEVEL', 'INFO').upper()
# Use namespaced logger - no hot-path basicConfig calls
from ._logging import get_logger
self.logger = get_logger("agents_generator")

logging.basicConfig(level=self.log_level, format='%(asctime)s - %(levelname)s - %(message)s')
self.logger = logging.getLogger(__name__)
self.logger.setLevel(self.log_level)
# Set level if provided, but don't mutate root logger
if log_level:
if isinstance(log_level, str):
self.logger.setLevel(getattr(logging, log_level.upper(), logging.INFO))
else:
self.logger.setLevel(log_level)
elif os.environ.get('LOGLEVEL'):
self.logger.setLevel(getattr(logging, os.environ.get('LOGLEVEL', 'INFO').upper(), logging.INFO))

# Initialize tool registry (replaces globals() pattern)
self.tool_registry = ToolRegistry()
Expand Down Expand Up @@ -238,26 +243,29 @@ def _merge_cli_config(self, config, cli_config):
config['config']['lsp'] = cli_config['lsp']
self.logger.debug(f"CLI override: lsp = {cli_config['lsp']}")

# Handle agent-level overrides (trust, tool_timeout, planning_tools, autonomy, guardrail, approval)
agent_level_fields = ['trust', 'tool_timeout', 'planning_tools', 'autonomy', 'guardrail', 'approval', 'approve_all_tools', 'approval_timeout']
# Handle agent-level overrides using unified approach
agent_level_fields = ['tool_timeout', 'planning_tools', 'autonomy']
agent_overrides = {k: v for k, v in cli_config.items() if k in agent_level_fields}

# Map CLI field names to YAML field names
field_mappings = {
'guardrail': 'guardrails', # CLI uses --guardrail, YAML uses guardrails
'trust': 'approval' # --trust maps to approval=True
}

# Apply field mappings and special handling
for cli_field in field_mappings:
if cli_field in agent_overrides:
value = agent_overrides.pop(cli_field)
if cli_field == 'trust' and value:
# --trust flag maps to approval=True for auto-approval
agent_overrides['approval'] = True
elif cli_field == 'guardrail':
# --guardrail "description" maps to guardrails config
agent_overrides['guardrails'] = value
# Handle approval configuration using unified spec
approval_fields = ['trust', 'approval', 'approve_all_tools', 'approval_timeout', 'approve_level']
if any(field in cli_config for field in approval_fields):
from ._approval_spec import ApprovalSpec

# Create a mock args object for CLI parsing
class MockArgs:
def __init__(self, cli_config):
for field in approval_fields:
setattr(self, field, cli_config.get(field))
self.guardrail = cli_config.get('guardrail')

spec = ApprovalSpec.from_cli(MockArgs(cli_config))
if spec.enabled:
agent_overrides['approval'] = spec.to_dict()

# Handle guardrail separately
if 'guardrail' in cli_config:
agent_overrides['guardrails'] = cli_config['guardrail']

if agent_overrides:
# Apply to all agents in the config
Expand Down Expand Up @@ -761,9 +769,10 @@ async def run_autogen_v4_async():
# Close the model client
await model_client.close()

# Run the async function
# Run the async function using safe bridge
from ._async_bridge import run_sync
try:
return asyncio.run(run_autogen_v4_async())
return run_sync(run_autogen_v4_async())
except Exception as e:
self.logger.error(f"Error running AutoGen v0.4: {str(e)}")
return f"### AutoGen v0.4 Error ###\n{str(e)}"
Expand Down Expand Up @@ -1192,39 +1201,27 @@ def _run_praisonai(self, config, topic, tools_dict):
stream_enabled = cli_config.get('stream', False)
stream_metrics = cli_config.get('stream_metrics', False)

# Reconstruct approval config from potentially scattered settings
approval_val = details.get('approval')
approve_all = details.get('approve_all_tools')
approval_timeout = details.get('approval_timeout')

# Use unified approval specification
approval_config = None
if approval_val is not None or approve_all is not None or approval_timeout is not None:
if isinstance(approval_val, dict):
approval_dict = approval_val
else:
approval_dict = {'backend': approval_val}

if approve_all is not None:
approval_dict['approve_all_tools'] = approve_all
if approval_timeout is not None:
approval_dict['approval_timeout'] = approval_timeout

if 'approval' in details:
from ._approval_spec import ApprovalSpec
try:
from .cli.features.approval import resolve_approval_config
# Map common YAML fields to resolve_approval_config parameters
approval_config = resolve_approval_config(
backend_name=approval_dict.get('backend') or approval_dict.get('backend_name'),
all_tools=approval_dict.get('approve_all_tools') or approval_dict.get('all_tools', False),
timeout=approval_dict.get('approval_timeout') or approval_dict.get('timeout')
spec = ApprovalSpec.from_yaml(details.get('approval'))
if spec.enabled:
from .cli.features.approval import resolve_approval_config
approval_config = resolve_approval_config(
backend_name=spec.backend,
all_tools=spec.approve_all_tools,
timeout=spec.timeout
)
except ImportError:
# Fallback: Create ApprovalConfig directly if resolve_approval_config isn't available
try:
from praisonaiagents.approval.protocols import ApprovalConfig
approval_config = ApprovalConfig(
backend=approval_dict.get('backend', None),
all_tools=approval_dict.get('approve_all_tools', approval_dict.get('all_tools', False)),
timeout=approval_dict.get('approval_timeout', approval_dict.get('timeout', 0))
backend=spec.backend,
all_tools=spec.approve_all_tools,
timeout=spec.timeout
)
except ImportError:
# Last resort: disable approval for this agent
Expand Down Expand Up @@ -1353,4 +1350,4 @@ def _run_praisonai(self, config, topic, tools_dict):
if AGENTOPS_AVAILABLE:
agentops.end_session("Success")

return result
return result
Loading