From dcff27aaa02f1960a7b4b337f13430feea2de624 Mon Sep 17 00:00:00 2001 From: "praisonai-triage-agent[bot]" <272766704+praisonai-triage-agent[bot]@users.noreply.github.com> Date: Fri, 24 Apr 2026 08:39:39 +0000 Subject: [PATCH 1/2] fix(wrapper): resolve 3 architectural gaps in praisonai wrapper layer - Fix dual CLI entry points with semantic drift by making Typer single dispatcher - Fix thread-unsafe module-level lazy state with proper synchronization - Fix closed framework-adapter registry with proper registry pattern and entry points Addresses core architecture violations of protocol-driven core, performance-first, and multi-agent + async safe by default principles. Fixes #1533 Co-authored-by: MervinPraison --- src/praisonai/praisonai/__init__.py | 39 ++-- src/praisonai/praisonai/__main__.py | 167 +++++----------- src/praisonai/praisonai/_async_bridge.py | 23 ++- src/praisonai/praisonai/agents_generator.py | 24 +-- .../praisonai/async_agent_scheduler.py | 22 ++- src/praisonai/praisonai/auto.py | 185 ++++++++---------- .../praisonai/framework_adapters/registry.py | 172 ++++++++++++++++ .../praisonai/integrations/registry.py | 14 +- 8 files changed, 374 insertions(+), 272 deletions(-) create mode 100644 src/praisonai/praisonai/framework_adapters/registry.py diff --git a/src/praisonai/praisonai/__init__.py b/src/praisonai/praisonai/__init__.py index 23ae537cc..d7015a62a 100644 --- a/src/praisonai/praisonai/__init__.py +++ b/src/praisonai/praisonai/__init__.py @@ -27,26 +27,37 @@ 'LocalManagedConfig', ] -# Telemetry initialization state +# Telemetry initialization state - thread-safe +import threading + +_telemetry_lock = threading.Lock() _telemetry_initialized = False def _ensure_telemetry_defaults() -> None: - """Apply telemetry env defaults exactly once, on first observability use.""" + """Apply telemetry env defaults exactly once, on first observability use. + + Thread-safe implementation using double-checked locking pattern. + """ global _telemetry_initialized if _telemetry_initialized: return - import os - langfuse_configured = bool( - os.getenv("LANGFUSE_PUBLIC_KEY") - or os.path.exists(os.path.expanduser("~/.praisonai/langfuse.env")) - ) - if langfuse_configured: - # Explicitly enable OTEL for Langfuse integration - os.environ["OTEL_SDK_DISABLED"] = "false" - else: - os.environ.setdefault("OTEL_SDK_DISABLED", "true") - os.environ.setdefault("EC_TELEMETRY", "false") # respect user overrides - _telemetry_initialized = True + + with _telemetry_lock: + if _telemetry_initialized: + return + + import os + langfuse_configured = bool( + os.getenv("LANGFUSE_PUBLIC_KEY") + or os.path.exists(os.path.expanduser("~/.praisonai/langfuse.env")) + ) + if langfuse_configured: + # Explicitly enable OTEL for Langfuse integration + os.environ["OTEL_SDK_DISABLED"] = "false" + else: + os.environ.setdefault("OTEL_SDK_DISABLED", "true") + os.environ.setdefault("EC_TELEMETRY", "false") # respect user overrides + _telemetry_initialized = True # Lazy loading for heavy imports diff --git a/src/praisonai/praisonai/__main__.py b/src/praisonai/praisonai/__main__.py index 152c0eeb4..eb1cf4ce3 100644 --- a/src/praisonai/praisonai/__main__.py +++ b/src/praisonai/praisonai/__main__.py @@ -3,118 +3,43 @@ PraisonAI CLI — Unified Entry Point. Single entry point for all CLI invocations. -Routes to Typer-based commands for known subcommands, -falls back to legacy argparse for direct prompts and YAML files. +Makes Typer the single dispatcher with narrow legacy shim for bare prompts/YAML. Design: - - Typer-first: all registered commands auto-discovered via Click - - Legacy fallback: prompts, .yaml paths, and deprecated --flags - - No manual command lists needed — adding a Typer command Just Works + - Typer owns all command resolution + - Legacy shim only for bare prompt/YAML invocations via Typer callback + - Fail loud on registration errors - no silent degradation """ import sys -# --------------------------------------------------------------------------- -# Internal helpers -# --------------------------------------------------------------------------- - -_typer_commands_cache = None - - -def _get_typer_commands(): - """Auto-discover registered Typer commands via Click introspection. - - Returns a set of command names that the Typer app knows about. - This is populated from app.py's register_commands() — no manual - lists to maintain. +def _is_legacy_invocation(argv: list[str]) -> bool: + """Check if this is a bare prompt or bare YAML invocation. + + Legacy invocations are: + - Bare YAML file: "agents.yaml" + - Free-text prompt: "Create a weather app" + + All other invocations should be handled by Typer commands. """ - global _typer_commands_cache - if _typer_commands_cache is not None: - return _typer_commands_cache - - try: - from praisonai.cli.app import app, register_commands - register_commands() - - import typer.main - import click - click_app = typer.main.get_command(app) - ctx = click.Context(click_app, info_name="praisonai") - _typer_commands_cache = set(click_app.list_commands(ctx)) - except Exception: - _typer_commands_cache = set() - - return _typer_commands_cache - - -def _find_first_command(argv): - """Find the first non-flag argument in argv. - - Skips global flags (--json, --verbose, etc.) and their values. - Returns the first positional arg, or None if only flags are present. - """ - # Flags that consume a following value - VALUE_FLAGS = {"--output-format", "-o"} - - skip_next = False for arg in argv: - if skip_next: - skip_next = False - continue if arg.startswith("-"): - if arg in VALUE_FLAGS: - skip_next = True continue - return arg # First non-flag arg - return None - - -def _run_typer(argv): - """Dispatch to the Typer CLI app.""" - from praisonai.cli.app import app, register_commands - register_commands() # idempotent - - original = sys.argv - sys.argv = ["praisonai"] + list(argv) - try: - app() - except SystemExit as e: - sys.exit(e.code if isinstance(e.code, int) else 0) - finally: - sys.argv = original + # Check if it's a YAML file or contains spaces (free-text prompt) + return (arg.endswith((".yaml", ".yml")) or + " " in arg or + not arg.isidentifier()) + return False -def _run_legacy(argv): - """Dispatch to the legacy argparse CLI (prompts, YAML, deprecated flags).""" - from praisonai.cli.main import PraisonAI - - original = sys.argv - sys.argv = ["praisonai"] + list(argv) - try: - praison = PraisonAI() - result = praison.main() - code = 0 if result is None else (1 if result is False else 0) - sys.exit(code) - except SystemExit as e: - sys.exit(e.code if isinstance(e.code, int) else 0) - finally: - sys.argv = original - - -# --------------------------------------------------------------------------- -# Main entry point -# --------------------------------------------------------------------------- - def main(): - """Unified CLI entry point — Typer-first, legacy fallback. + """Unified CLI entry point - Typer is the single dispatcher. Routing rules (in order): - 1. --version / -V → print version and exit - 2. --help / -h → Typer help (global or command-level) - 3. No arguments → Typer interactive TUI - 4. First arg is a Typer cmd→ Typer (auto-discovered from app.py) - 5. Everything else → Legacy (prompt, .yaml, deprecated flags) + 1. --version / -V → print version and exit + 2. Legacy invocation → legacy shim (bare prompts/YAML only) + 3. Everything else → Typer (owns all subcommands) """ argv = sys.argv[1:] @@ -124,30 +49,36 @@ def main(): print(f"PraisonAI version {__version__}") return - # 2. Help flags → always Typer (global help or command help) - if "--help" in argv or "-h" in argv: - _run_typer(argv) - return - - # 3. No arguments → Typer (interactive TUI) - if not argv: - _run_typer(argv) + # 2. Check for legacy invocation (bare prompt/YAML) + if _is_legacy_invocation(argv): + from praisonai.cli.main import PraisonAI + original = sys.argv + sys.argv = ["praisonai"] + list(argv) + try: + praison = PraisonAI() + result = praison.main() + code = 0 if result is None else (1 if result is False else 0) + sys.exit(code) + except SystemExit as e: + sys.exit(e.code if isinstance(e.code, int) else 0) + finally: + sys.argv = original return - # 4. Find first non-flag argument and check if it's a Typer command - first_cmd = _find_first_command(argv) - - if first_cmd is None: - # Only flags, no command → Typer handles global flags - _run_typer(argv) - return - - if first_cmd in _get_typer_commands(): - # Known Typer command → Typer - _run_typer(argv) - else: - # Prompt, YAML file, or legacy invocation → legacy - _run_legacy(argv) + # 3. All other invocations → Typer (fail loud on registration errors) + from praisonai.cli.app import app, register_commands + + # CRITICAL: Fail loud - do not swallow registration exceptions + register_commands() # Let any ImportError/other exceptions propagate + + original = sys.argv + sys.argv = ["praisonai"] + list(argv) + try: + app() + except SystemExit as e: + sys.exit(e.code if isinstance(e.code, int) else 0) + finally: + sys.argv = original if __name__ == "__main__": diff --git a/src/praisonai/praisonai/_async_bridge.py b/src/praisonai/praisonai/_async_bridge.py index 25c0da555..a58395ee9 100644 --- a/src/praisonai/praisonai/_async_bridge.py +++ b/src/praisonai/praisonai/_async_bridge.py @@ -34,6 +34,18 @@ def get(self) -> asyncio.AbstractEventLoop: ) self._thread.start() return self._loop + + def get_unlocked(self) -> asyncio.AbstractEventLoop: + """Get loop assuming caller holds _lock. For run_sync() use only.""" + if self._loop is None or self._loop.is_closed(): + self._loop = asyncio.new_event_loop() + self._thread = threading.Thread( + target=self._loop.run_forever, + name="praisonai-async", + daemon=False, + ) + self._thread.start() + return self._loop def shutdown(self, timeout: float = 5.0) -> None: with self._lock: @@ -87,12 +99,11 @@ def run_sync(coro: Awaitable[T], *, timeout: float | None = _DEFAULT_TIMEOUT) -> except RuntimeError: running = False - if not running: - # Reuse the background loop instead of creating a new one per call. - fut: Future = asyncio.run_coroutine_threadsafe(coro, _BG.get()) - return fut.result(timeout=timeout) - - fut = asyncio.run_coroutine_threadsafe(coro, _BG.get()) + # Submit the coroutine inside the lock to prevent shutdown races + with _BG._lock: + loop = _BG.get_unlocked() # get loop while holding lock + fut: Future = asyncio.run_coroutine_threadsafe(coro, loop) + return fut.result(timeout=timeout) diff --git a/src/praisonai/praisonai/agents_generator.py b/src/praisonai/praisonai/agents_generator.py index 573ffacb4..687e89ff4 100644 --- a/src/praisonai/praisonai/agents_generator.py +++ b/src/praisonai/praisonai/agents_generator.py @@ -17,10 +17,8 @@ import keyword # Import new architecture components -from .framework_adapters import ( - FrameworkAdapter, CrewAIAdapter, AutoGenAdapter, - AutoGenV4Adapter, AG2Adapter, PraisonAIAdapter -) +from .framework_adapters.base import FrameworkAdapter +from .framework_adapters.registry import FrameworkAdapterRegistry from .tool_registry import ToolRegistry # Import availability flags @@ -51,14 +49,8 @@ except ImportError: pass -# Registry of available adapters (lazy-loaded) -FRAMEWORK_ADAPTERS = { - "crewai": CrewAIAdapter, - "autogen": AutoGenAdapter, - "autogen_v4": AutoGenV4Adapter, - "ag2": AG2Adapter, - "praisonai": PraisonAIAdapter -} +# Framework adapter registry - now uses proper registry pattern +# This replaces the hardcoded FRAMEWORK_ADAPTERS dict # Note: OTEL_SDK_DISABLED moved to CLI entry point per issue requirements @@ -258,12 +250,8 @@ def _get_framework_adapter(self, framework: str) -> FrameworkAdapter: Raises: ValueError: If framework is not supported """ - if framework not in FRAMEWORK_ADAPTERS: - raise ValueError(f"Unsupported framework: {framework}. " - f"Supported frameworks: {list(FRAMEWORK_ADAPTERS.keys())}") - - adapter_class = FRAMEWORK_ADAPTERS[framework] - return adapter_class() + adapter_registry = FrameworkAdapterRegistry.get_instance() + return adapter_registry.create(framework) def _merge_cli_config(self, config, cli_config): """ diff --git a/src/praisonai/praisonai/async_agent_scheduler.py b/src/praisonai/praisonai/async_agent_scheduler.py index cce7f0646..f57f6f79e 100644 --- a/src/praisonai/praisonai/async_agent_scheduler.py +++ b/src/praisonai/praisonai/async_agent_scheduler.py @@ -7,6 +7,7 @@ import asyncio import logging +import threading from datetime import datetime from typing import Optional, Dict, Any, Callable, Union from abc import ABC, abstractmethod @@ -106,16 +107,25 @@ def __init__( self._success_count = 0 self._failure_count = 0 - # Created lazily on first async entry — binds to the caller's loop + # Sync lock for async primitives creation and bound loop tracking + self._primitives_lock = threading.Lock() self._cancel_event: Optional[asyncio.Event] = None self._stats_lock: Optional[asyncio.Lock] = None + self._bound_loop: Optional[asyncio.AbstractEventLoop] = None def _ensure_async_primitives(self) -> None: - """Create async primitives if they don't exist yet.""" - if self._cancel_event is None: - self._cancel_event = asyncio.Event() - if self._stats_lock is None: - self._stats_lock = asyncio.Lock() + """Create async primitives if they don't exist yet. + + Thread-safe and loop-aware: primitives are bound to the current running loop. + If called from a different loop, new primitives are created. + """ + loop = asyncio.get_running_loop() # must be called from a coroutine + + with self._primitives_lock: + if self._bound_loop is not loop: + self._cancel_event = asyncio.Event() + self._stats_lock = asyncio.Lock() + self._bound_loop = loop async def start( self, diff --git a/src/praisonai/praisonai/auto.py b/src/praisonai/praisonai/auto.py index 6b77f28b2..01116c51a 100644 --- a/src/praisonai/praisonai/auto.py +++ b/src/praisonai/praisonai/auto.py @@ -22,150 +22,124 @@ T = TypeVar('T', bound=BaseModel) # ============================================================================= -# LAZY LOADING INFRASTRUCTURE - All heavy imports are deferred +# THREAD-SAFE LAZY LOADING INFRASTRUCTURE - All heavy imports are deferred # ============================================================================= -# Cached availability flags (None = not checked yet) -_crewai_available = None -_autogen_available = None -_autogen_v4_available = None -_praisonai_available = None -_praisonai_tools_available = None -_litellm_available = None -_openai_available = None - -# Cached module/class references -_crewai_classes = None # (Agent, Task, Crew) -_autogen_module = None -_autogen_v4_classes = None # (AssistantAgent, OpenAIChatCompletionClient) -_praisonai_classes = None # (PraisonAgent, PraisonTask, Agents) -_praisonai_tools = None # dict of tool classes -_litellm = None -_openai_client = None +import threading +from functools import lru_cache +# Thread-safe lazy cache for optional dependencies +_optional_lock = threading.Lock() +_optional_cache: dict[str, object] = {} -# --- CrewAI lazy loading --- -def _check_crewai_available() -> bool: - """Check if crewai is available (cached).""" - global _crewai_available - if _crewai_available is None: + +def _load_optional(key: str, loader): + """Thread-safe lazy loading for optional dependencies. + + Args: + key: Unique key for the dependency + loader: Function that imports and returns the dependency + + Returns: + The loaded dependency or None if import fails + """ + if key in _optional_cache: + return _optional_cache[key] + + with _optional_lock: + if key in _optional_cache: + return _optional_cache[key] + try: - import crewai # noqa: F401 - _crewai_available = True + _optional_cache[key] = loader() except ImportError: - _crewai_available = False - return _crewai_available + _optional_cache[key] = None + + return _optional_cache[key] + + +# --- CrewAI lazy loading --- +def _check_crewai_available() -> bool: + """Check if crewai is available (cached, thread-safe).""" + result = _load_optional("crewai_check", lambda: __import__("crewai")) + return result is not None def _get_crewai(): - """Lazy load crewai classes.""" - global _crewai_classes - if _crewai_classes is None: - from crewai import Agent, Task, Crew - _crewai_classes = (Agent, Task, Crew) - return _crewai_classes + """Lazy load crewai classes (thread-safe).""" + return _load_optional("crewai_classes", lambda: ( + __import__("crewai", fromlist=["Agent", "Task", "Crew"]).Agent, + __import__("crewai", fromlist=["Agent", "Task", "Crew"]).Task, + __import__("crewai", fromlist=["Agent", "Task", "Crew"]).Crew, + )) # --- AutoGen lazy loading --- def _check_autogen_available() -> bool: - """Check if autogen v0.2 is available (cached).""" - global _autogen_available - if _autogen_available is None: - try: - import autogen # noqa: F401 - _autogen_available = True - except ImportError: - _autogen_available = False - return _autogen_available + """Check if autogen v0.2 is available (cached, thread-safe).""" + result = _load_optional("autogen_check", lambda: __import__("autogen")) + return result is not None def _check_autogen_v4_available() -> bool: - """Check if autogen v0.4 is available (cached).""" - global _autogen_v4_available - if _autogen_v4_available is None: - try: - from autogen_agentchat.agents import AssistantAgent # noqa: F401 - _autogen_v4_available = True - except ImportError: - _autogen_v4_available = False - return _autogen_v4_available + """Check if autogen v0.4 is available (cached, thread-safe).""" + result = _load_optional("autogen_v4_check", lambda: __import__("autogen_agentchat.agents", fromlist=["AssistantAgent"])) + return result is not None -# --- AG2 lazy loading --- -_ag2_available = None - def _check_ag2_available() -> bool: - """Check if AG2 (community fork of AutoGen) is available (cached).""" - global _ag2_available - if _ag2_available is None: - try: - import importlib.metadata - importlib.metadata.distribution('ag2') - from autogen import LLMConfig # noqa: F401 — AG2-exclusive class - _ag2_available = True - except Exception: - _ag2_available = False - return _ag2_available + """Check if AG2 (community fork of AutoGen) is available (cached, thread-safe).""" + def ag2_loader(): + import importlib.metadata + importlib.metadata.distribution('ag2') + from autogen import LLMConfig # AG2-exclusive class + return True + + result = _load_optional("ag2_check", ag2_loader) + return result is not None def _get_autogen(): - """Lazy load autogen module.""" - global _autogen_module - if _autogen_module is None: - import autogen - _autogen_module = autogen - return _autogen_module + """Lazy load autogen module (thread-safe).""" + return _load_optional("autogen_module", lambda: __import__("autogen")) def _get_autogen_v4(): - """Lazy load autogen v0.4 classes.""" - global _autogen_v4_classes - if _autogen_v4_classes is None: + """Lazy load autogen v0.4 classes (thread-safe).""" + def autogen_v4_loader(): from autogen_agentchat.agents import AssistantAgent from autogen_ext.models.openai import OpenAIChatCompletionClient - _autogen_v4_classes = (AssistantAgent, OpenAIChatCompletionClient) - return _autogen_v4_classes + return (AssistantAgent, OpenAIChatCompletionClient) + + return _load_optional("autogen_v4_classes", autogen_v4_loader) # --- PraisonAI Agents lazy loading --- def _check_praisonai_available() -> bool: - """Check if praisonaiagents is available (cached).""" - global _praisonai_available - if _praisonai_available is None: - try: - import praisonaiagents # noqa: F401 - _praisonai_available = True - except ImportError: - _praisonai_available = False - return _praisonai_available + """Check if praisonaiagents is available (cached, thread-safe).""" + result = _load_optional("praisonai_check", lambda: __import__("praisonaiagents")) + return result is not None def _get_praisonai(): - """Lazy load praisonaiagents classes.""" - global _praisonai_classes - if _praisonai_classes is None: - from praisonaiagents import Agent as PraisonAgent, Task as PraisonTask, AgentTeam - _praisonai_classes = (PraisonAgent, PraisonTask, Agents) - return _praisonai_classes + """Lazy load praisonaiagents classes (thread-safe).""" + def praisonai_loader(): + from praisonaiagents import Agent as PraisonAgent, Task as PraisonTask, AgentTeam as Agents + return (PraisonAgent, PraisonTask, Agents) + + return _load_optional("praisonai_classes", praisonai_loader) # --- PraisonAI Tools lazy loading --- def _check_praisonai_tools_available() -> bool: - """Check if praisonai_tools is available (cached).""" - global _praisonai_tools_available - if _praisonai_tools_available is None: - try: - import praisonai_tools # noqa: F401 - _praisonai_tools_available = True - except ImportError: - _praisonai_tools_available = False - return _praisonai_tools_available + """Check if praisonai_tools is available (cached, thread-safe).""" + result = _load_optional("praisonai_tools_check", lambda: __import__("praisonai_tools")) + return result is not None def _get_praisonai_tools(): - """Lazy load praisonai_tools classes.""" - global _praisonai_tools - if _praisonai_tools is None: + """Lazy load praisonai_tools classes (thread-safe).""" + def tools_loader(): from praisonai_tools import ( CodeDocsSearchTool, CSVSearchTool, DirectorySearchTool, DOCXSearchTool, DirectoryReadTool, FileReadTool, TXTSearchTool, JSONSearchTool, @@ -173,7 +147,7 @@ def _get_praisonai_tools(): ScrapeWebsiteTool, WebsiteSearchTool, XMLSearchTool, YoutubeChannelSearchTool, YoutubeVideoSearchTool ) - _praisonai_tools = { + return { 'CodeDocsSearchTool': CodeDocsSearchTool, 'CSVSearchTool': CSVSearchTool, 'DirectorySearchTool': DirectorySearchTool, @@ -192,7 +166,8 @@ def _get_praisonai_tools(): 'YoutubeChannelSearchTool': YoutubeChannelSearchTool, 'YoutubeVideoSearchTool': YoutubeVideoSearchTool, } - return _praisonai_tools + + return _load_optional("praisonai_tools_dict", tools_loader) # --- LiteLLM lazy loading --- diff --git a/src/praisonai/praisonai/framework_adapters/registry.py b/src/praisonai/praisonai/framework_adapters/registry.py new file mode 100644 index 000000000..115981faa --- /dev/null +++ b/src/praisonai/praisonai/framework_adapters/registry.py @@ -0,0 +1,172 @@ +""" +Framework Adapter Registry for PraisonAI. + +Provides a registry pattern for managing framework adapters with entry points support, +enabling dynamic registration and discovery of framework adapters. +Mirrors the design of integrations/registry.py for consistency. +""" + +from __future__ import annotations + +import threading +from importlib.metadata import entry_points +from typing import Dict, Type, Optional +import logging + +from .base import FrameworkAdapter + +logger = logging.getLogger(__name__) + + +class FrameworkAdapterRegistry: + """ + Registry for framework adapters. + + Provides centralized management of framework adapters with support + for dynamic registration, entry points discovery, and availability checking. + + Uses singleton pattern to ensure consistent state across the application. + """ + + _instance: Optional["FrameworkAdapterRegistry"] = None + _instance_lock = threading.Lock() + + def __init__(self) -> None: + """Initialize the registry with built-in adapters.""" + self._adapters: Dict[str, Type[FrameworkAdapter]] = {} + self._lock = threading.Lock() + self._register_builtin() + self._register_entry_points() + + @classmethod + def get_instance(cls) -> "FrameworkAdapterRegistry": + """ + Get the singleton registry instance. + + Returns: + FrameworkAdapterRegistry: The singleton registry + """ + if cls._instance is None: + with cls._instance_lock: + if cls._instance is None: + cls._instance = cls() + return cls._instance + + def _register_builtin(self) -> None: + """Register built-in framework adapters with lazy imports.""" + # Lazy, optional imports - mirrors integrations/registry.py pattern + try: + from .crewai_adapter import CrewAIAdapter + self._adapters["crewai"] = CrewAIAdapter + except ImportError: + pass + + try: + from .autogen_adapter import AutoGenAdapter, AutoGenV4Adapter, AG2Adapter + self._adapters["autogen"] = AutoGenAdapter + self._adapters["autogen_v4"] = AutoGenV4Adapter + self._adapters["ag2"] = AG2Adapter + except ImportError: + pass + + try: + from .praisonai_adapter import PraisonAIAdapter + self._adapters["praisonai"] = PraisonAIAdapter + except ImportError: + pass + + def _register_entry_points(self) -> None: + """Register framework adapters from entry points.""" + try: + for ep in entry_points(group="praisonai.framework_adapters"): + try: + adapter_class = ep.load() + self._adapters[ep.name] = adapter_class + except Exception: + # Do not break framework dispatch because one plugin is broken. + # Surface via structured logging instead of swallowing silently. + logger.warning( + "Failed to load framework adapter %r from entry point", + ep.name, + exc_info=True, + ) + except Exception: + # entry_points() might not be available in older Python versions + # or in certain packaging environments + logger.debug("Entry points not available for framework adapters") + + def register(self, name: str, adapter_class: Type[FrameworkAdapter]) -> None: + """ + Register a new framework adapter. + + Args: + name: Unique name for the adapter + adapter_class: The adapter class (must implement FrameworkAdapter protocol) + """ + # Note: We don't enforce strict type checking here since FrameworkAdapter is a Protocol + # and isinstance() doesn't work with Protocols. The runtime will catch typing issues. + with self._lock: + self._adapters[name] = adapter_class + + def unregister(self, name: str) -> bool: + """ + Unregister a framework adapter. + + Args: + name: Name of the adapter to unregister + + Returns: + bool: True if the adapter was found and removed, False otherwise + """ + with self._lock: + return self._adapters.pop(name, None) is not None + + def create(self, name: str) -> FrameworkAdapter: + """ + Create an instance of the specified framework adapter. + + Args: + name: Name of the adapter to create + + Returns: + FrameworkAdapter: Instance of the adapter + + Raises: + ValueError: If the adapter is not found + """ + with self._lock: + adapter_class = self._adapters.get(name) + + if adapter_class is None: + raise ValueError( + f"Unsupported framework: {name}. " + f"Registered: {sorted(self._adapters)}" + ) + + return adapter_class() + + def list_registered(self) -> list[str]: + """ + List all registered framework adapter names. + + Returns: + list[str]: Sorted list of registered adapter names + """ + with self._lock: + return sorted(self._adapters) + + def is_available(self, name: str) -> bool: + """ + Check if a framework adapter is available and functional. + + Args: + name: Name of the adapter to check + + Returns: + bool: True if adapter exists and is available + """ + try: + adapter = self.create(name) + return adapter.is_available() + except (ValueError, Exception): + return False \ No newline at end of file diff --git a/src/praisonai/praisonai/integrations/registry.py b/src/praisonai/praisonai/integrations/registry.py index f664dd5e8..fa54d4c54 100644 --- a/src/praisonai/praisonai/integrations/registry.py +++ b/src/praisonai/praisonai/integrations/registry.py @@ -107,7 +107,9 @@ def register(self, name: str, integration_class: Type[BaseCLIIntegration]) -> No f"Integration class {integration_class.__name__} must inherit from BaseCLIIntegration" ) - self._integrations[name] = integration_class + # Thread-safe registration + with self._instance_lock: + self._integrations[name] = integration_class def unregister(self, name: str) -> bool: """ @@ -119,10 +121,12 @@ def unregister(self, name: str) -> bool: Returns: bool: True if the integration was found and removed, False otherwise """ - if name in self._integrations: - del self._integrations[name] - return True - return False + # Thread-safe unregistration with atomic check-then-delete + with self._instance_lock: + if name in self._integrations: + del self._integrations[name] + return True + return False def create(self, name: str, **kwargs: Any) -> Optional[BaseCLIIntegration]: """ From 83fd376892cc30a4e1cf37e089749c6bcde807d4 Mon Sep 17 00:00:00 2001 From: "praisonai-triage-agent[bot]" <272766704+praisonai-triage-agent[bot]@users.noreply.github.com> Date: Fri, 24 Apr 2026 10:27:13 +0000 Subject: [PATCH 2/2] fix(review): address critical architectural issues from reviewer feedback - Fix CLI routing regression in __main__.py - replace isidentifier() with proper YAML/prompt detection - Fix thread safety races in integrations/registry.py - add proper instance-level locking - Fix critical NameError in auto.py - migrate remaining lazy-loading functions to _load_optional - Fix dead code in _async_bridge.py - remove unused variable, restore running-loop guard - Fix loop safety in async_agent_scheduler.py - add loop consistency check in stop() - Fix exception handling in framework_adapters/registry.py - improve specificity and logging - Remove unused lru_cache import in auto.py Co-authored-by: Mervin Praison --- src/praisonai/praisonai/__main__.py | 25 +++++++---- src/praisonai/praisonai/_async_bridge.py | 20 +++++---- .../praisonai/async_agent_scheduler.py | 18 ++++++++ src/praisonai/praisonai/auto.py | 41 +++++++------------ .../praisonai/framework_adapters/registry.py | 7 +++- .../praisonai/integrations/registry.py | 22 ++++++---- 6 files changed, 80 insertions(+), 53 deletions(-) diff --git a/src/praisonai/praisonai/__main__.py b/src/praisonai/praisonai/__main__.py index eb1cf4ce3..438359fbe 100644 --- a/src/praisonai/praisonai/__main__.py +++ b/src/praisonai/praisonai/__main__.py @@ -18,18 +18,27 @@ def _is_legacy_invocation(argv: list[str]) -> bool: """Check if this is a bare prompt or bare YAML invocation. Legacy invocations are: - - Bare YAML file: "agents.yaml" + - Bare YAML file: "agents.yaml" - Free-text prompt: "Create a weather app" All other invocations should be handled by Typer commands. """ - for arg in argv: - if arg.startswith("-"): - continue - # Check if it's a YAML file or contains spaces (free-text prompt) - return (arg.endswith((".yaml", ".yml")) or - " " in arg or - not arg.isidentifier()) + import os + + # Only the very first positional token is considered; option values never are. + if not argv or argv[0].startswith("-"): + return False + + first = argv[0] + + # Check for free-text prompt (contains spaces) + if " " in first: + return True + + # Check for YAML file that actually exists on disk + if first.endswith((".yaml", ".yml")) and os.path.isfile(first): + return True + return False diff --git a/src/praisonai/praisonai/_async_bridge.py b/src/praisonai/praisonai/_async_bridge.py index a58395ee9..fb4dd09f9 100644 --- a/src/praisonai/praisonai/_async_bridge.py +++ b/src/praisonai/praisonai/_async_bridge.py @@ -75,12 +75,10 @@ async def _cancel_all() -> None: def run_sync(coro: Awaitable[T], *, timeout: float | None = _DEFAULT_TIMEOUT) -> T: """ - Run a coroutine synchronously, safe inside a running loop. + Run a coroutine synchronously using the background loop. - This function automatically detects if there's already a running event loop - and handles the execution appropriately: - - If no loop is running: uses background loop (consistent behavior) - - If a loop is running: schedules on background loop (safe path) + IMPORTANT: This function cannot be called from within a running event loop + as it would cause deadlock. Use 'await coro' directly from async contexts. Args: coro: The coroutine to run @@ -90,20 +88,24 @@ def run_sync(coro: Awaitable[T], *, timeout: float | None = _DEFAULT_TIMEOUT) -> The result of the coroutine Raises: + RuntimeError: If called from within a running event loop TimeoutError: If timeout is exceeded Any exception raised by the coroutine """ try: asyncio.get_running_loop() - running = True except RuntimeError: - running = False + pass + else: + raise RuntimeError( + "run_sync() cannot be called from a running event loop; " + "await the coroutine directly instead." + ) # Submit the coroutine inside the lock to prevent shutdown races with _BG._lock: - loop = _BG.get_unlocked() # get loop while holding lock + loop = _BG.get_unlocked() fut: Future = asyncio.run_coroutine_threadsafe(coro, loop) - return fut.result(timeout=timeout) diff --git a/src/praisonai/praisonai/async_agent_scheduler.py b/src/praisonai/praisonai/async_agent_scheduler.py index f57f6f79e..db96cfbf2 100644 --- a/src/praisonai/praisonai/async_agent_scheduler.py +++ b/src/praisonai/praisonai/async_agent_scheduler.py @@ -183,12 +183,30 @@ async def stop(self) -> bool: """ Stop the scheduler gracefully with proper cancellation. + IMPORTANT: This method must be called from the same event loop + that was used to start the scheduler. + Returns: True if stopped successfully + + Raises: + RuntimeError: If called from a different event loop than start() """ if not self._is_running: logger.info("Scheduler is not running") return True + + # Ensure we're on the same loop that was bound during start() + try: + current_loop = asyncio.get_running_loop() + if self._bound_loop is not None and current_loop is not self._bound_loop: + raise RuntimeError( + "stop() must be called from the same event loop as start(). " + f"Expected: {self._bound_loop}, got: {current_loop}" + ) + except RuntimeError: + # No running loop - this is fine if scheduler was never started + pass logger.info("Stopping async agent scheduler...") self._cancel_event.set() diff --git a/src/praisonai/praisonai/auto.py b/src/praisonai/praisonai/auto.py index 01116c51a..8508c995a 100644 --- a/src/praisonai/praisonai/auto.py +++ b/src/praisonai/praisonai/auto.py @@ -26,7 +26,6 @@ # ============================================================================= import threading -from functools import lru_cache # Thread-safe lazy cache for optional dependencies _optional_lock = threading.Lock() @@ -173,48 +172,38 @@ def tools_loader(): # --- LiteLLM lazy loading --- def _check_litellm_available() -> bool: """Check if litellm is available (cached).""" - global _litellm_available - if _litellm_available is None: - try: - import litellm # noqa: F401 - _litellm_available = True - except ImportError: - _litellm_available = False - return _litellm_available + result = _load_optional("litellm") + return result is not None def _get_litellm(): """Lazy load litellm module.""" - global _litellm - if _litellm is None: - import litellm as _litellm_module - _litellm = _litellm_module - return _litellm + result = _load_optional("litellm") + if result is None: + raise ImportError("Install with: pip install litellm") + return result # --- OpenAI lazy loading --- def _check_openai_available() -> bool: """Check if openai is available (cached).""" - global _openai_available - if _openai_available is None: - try: - import openai # noqa: F401 - _openai_available = True - except ImportError: - _openai_available = False - return _openai_available + result = _load_optional("openai") + return result is not None def _get_openai_client(api_key: str = None, base_url: str = None): """Lazy load OpenAI client.""" - global _openai_client - if _openai_client is None: + def create_openai_client(): from openai import OpenAI - _openai_client = OpenAI( + return OpenAI( api_key=api_key or os.environ.get("OPENAI_API_KEY"), base_url=base_url ) - return _openai_client + + result = _load_optional("openai_client", create_openai_client) + if result is None: + raise ImportError("Install with: pip install openai") + return result _loglevel = os.environ.get('LOGLEVEL', 'INFO').strip().upper() or 'INFO' diff --git a/src/praisonai/praisonai/framework_adapters/registry.py b/src/praisonai/praisonai/framework_adapters/registry.py index 115981faa..9941dc655 100644 --- a/src/praisonai/praisonai/framework_adapters/registry.py +++ b/src/praisonai/praisonai/framework_adapters/registry.py @@ -167,6 +167,11 @@ def is_available(self, name: str) -> bool: """ try: adapter = self.create(name) + except ValueError: + return False + + try: return adapter.is_available() - except (ValueError, Exception): + except Exception: + logger.warning("is_available() raised for adapter %r", name, exc_info=True) return False \ No newline at end of file diff --git a/src/praisonai/praisonai/integrations/registry.py b/src/praisonai/praisonai/integrations/registry.py index fa54d4c54..ab676f864 100644 --- a/src/praisonai/praisonai/integrations/registry.py +++ b/src/praisonai/praisonai/integrations/registry.py @@ -48,6 +48,7 @@ class ExternalAgentRegistry: def __init__(self): """Initialize the registry with built-in integrations.""" self._integrations: Dict[str, Type[BaseCLIIntegration]] = {} + self._lock = threading.Lock() self._register_builtin_integrations() @classmethod @@ -108,7 +109,7 @@ def register(self, name: str, integration_class: Type[BaseCLIIntegration]) -> No ) # Thread-safe registration - with self._instance_lock: + with self._lock: self._integrations[name] = integration_class def unregister(self, name: str) -> bool: @@ -122,11 +123,8 @@ def unregister(self, name: str) -> bool: bool: True if the integration was found and removed, False otherwise """ # Thread-safe unregistration with atomic check-then-delete - with self._instance_lock: - if name in self._integrations: - del self._integrations[name] - return True - return False + with self._lock: + return self._integrations.pop(name, None) is not None def create(self, name: str, **kwargs: Any) -> Optional[BaseCLIIntegration]: """ @@ -139,7 +137,9 @@ def create(self, name: str, **kwargs: Any) -> Optional[BaseCLIIntegration]: Returns: BaseCLIIntegration: Instance of the integration, or None if not found """ - integration_class = self._integrations.get(name) + with self._lock: + integration_class = self._integrations.get(name) + if integration_class is None: return None @@ -152,7 +152,8 @@ def list_registered(self) -> List[str]: Returns: List[str]: List of registered integration names """ - return list(self._integrations.keys()) + with self._lock: + return list(self._integrations.keys()) async def get_available(self) -> Dict[str, bool]: """ @@ -164,7 +165,10 @@ async def get_available(self) -> Dict[str, bool]: import inspect availability = {} - for name, integration_class in self._integrations.items(): + with self._lock: + snapshot = list(self._integrations.items()) + + for name, integration_class in snapshot: try: # Check if constructor requires parameters beyond self sig = inspect.signature(integration_class.__init__)