-
-
Notifications
You must be signed in to change notification settings - Fork 1.1k
fix: resolve three core architectural violations in wrapper layer #1552
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,5 +1,6 @@ | ||
| # Suppress crewai.cli.config logger BEFORE any imports to prevent INFO log | ||
| import logging | ||
| import threading | ||
| logging.getLogger('crewai.cli.config').setLevel(logging.ERROR) | ||
|
|
||
| # Version is lightweight, import directly | ||
|
|
@@ -35,32 +36,34 @@ | |
| ] | ||
|
|
||
| # Telemetry initialization state | ||
| _telemetry_lock = threading.Lock() | ||
| _telemetry_initialized = False | ||
|
|
||
| def _ensure_telemetry_defaults() -> None: | ||
| """Apply telemetry env defaults exactly once, on first observability use.""" | ||
| global _telemetry_initialized | ||
| if _telemetry_initialized: | ||
| if _telemetry_initialized: # fast path, OK without lock | ||
| return | ||
| import os | ||
| langfuse_configured = bool( | ||
| os.getenv("LANGFUSE_PUBLIC_KEY") | ||
| or os.path.exists(os.path.expanduser("~/.praisonai/langfuse.env")) | ||
| ) | ||
| if langfuse_configured: | ||
| # Explicitly enable OTEL for Langfuse integration | ||
| os.environ["OTEL_SDK_DISABLED"] = "false" | ||
| else: | ||
| os.environ.setdefault("OTEL_SDK_DISABLED", "true") | ||
| os.environ.setdefault("EC_TELEMETRY", "false") # respect user overrides | ||
| _telemetry_initialized = True | ||
| with _telemetry_lock: | ||
| if _telemetry_initialized: | ||
| return | ||
| import os | ||
| # Respect any value the user already set | ||
| if "OTEL_SDK_DISABLED" not in os.environ: | ||
| langfuse_configured = bool( | ||
| os.getenv("LANGFUSE_PUBLIC_KEY") | ||
| or os.path.exists(os.path.expanduser("~/.praisonai/langfuse.env")) | ||
| ) | ||
| os.environ["OTEL_SDK_DISABLED"] = "false" if langfuse_configured else "true" | ||
| os.environ.setdefault("EC_TELEMETRY", "false") # respect user overrides | ||
| _telemetry_initialized = True | ||
|
|
||
|
|
||
| # Lazy loading for heavy imports | ||
| def __getattr__(name): | ||
| """Lazy load heavy modules to improve import time.""" | ||
| # Ensure telemetry defaults before any lazy import that may touch OTEL. | ||
| _ensure_telemetry_defaults() | ||
| # Note: Telemetry initialization moved out of lazy hook to avoid side effects | ||
| # It should be called explicitly from cli.PraisonAI.__init__ instead | ||
|
|
||
|
Comment on lines
42
to
67
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chain🏁 Script executed: #!/bin/bash
# Find every reference to _ensure_telemetry_defaults across the repo
rg -nP -C2 '\b_ensure_telemetry_defaults\b'
# And specifically inside the cli package, where the comment says it should live
fd -t f -e py . src/praisonai/praisonai/cli 2>/dev/null | xargs rg -nP -C2 '\b_ensure_telemetry_defaults\b' || true
# Also verify the unconditional OTEL setdefault in main.py is still present and unconditional
rg -nP -C3 'OTEL_SDK_DISABLED' src/praisonai/praisonaiRepository: MervinPraison/PraisonAI Length of output: 2576 Telemetry init function is dead code — never invoked anywhere in the codebase.
Result: the function's smart telemetry detection never runs in normal flows, and the telemetry environment variable defaults are not applied to non-CLI consumers either. Either:
🤖 Prompt for AI Agents |
||
| if name == 'PraisonAI': | ||
| from .cli import PraisonAI | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,291 +1,19 @@ | ||
| """ | ||
| Agent Scheduler for PraisonAI - Run agents periodically 24/7. | ||
|
|
||
| This module provides scheduling capabilities for running PraisonAI agents | ||
| at regular intervals, enabling 24/7 autonomous agent operations. | ||
| """Backward-compatible re-export. Prefer `praisonai.scheduler`. | ||
|
|
||
| Example: | ||
| # Run news checker every hour | ||
| from praisonai.agent_scheduler import AgentScheduler | ||
| from praisonai_agents import Agent | ||
|
|
||
| agent = Agent( | ||
| name="NewsChecker", | ||
| instructions="Check latest AI news and summarize", | ||
| tools=[search_tool] | ||
| ) | ||
|
|
||
| scheduler = AgentScheduler(agent, task="Check latest AI news") | ||
| scheduler.start(schedule_expr="hourly") | ||
| This module is deprecated. Use the canonical implementation in the | ||
| scheduler package for full functionality including YAML and recipe support. | ||
| """ | ||
|
|
||
| import threading | ||
| import time | ||
| import logging | ||
| from datetime import datetime | ||
| from typing import Optional, Dict, Any, Callable | ||
| from abc import ABC, abstractmethod | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| # Import shared schedule parser | ||
| from .scheduler.shared import ScheduleParser, backoff_delay, safe_call | ||
|
|
||
|
|
||
| class AgentExecutorInterface(ABC): | ||
| """Abstract interface for agent execution.""" | ||
|
|
||
| @abstractmethod | ||
| def execute(self, task: str) -> Any: | ||
| """Execute the agent with given task.""" | ||
| pass | ||
|
|
||
|
|
||
| class PraisonAgentExecutor(AgentExecutorInterface): | ||
| """Executor for PraisonAI agents.""" | ||
|
|
||
| def __init__(self, agent): | ||
| """ | ||
| Initialize executor with a PraisonAI agent. | ||
|
|
||
| Args: | ||
| agent: PraisonAI Agent instance | ||
| """ | ||
| self.agent = agent | ||
|
|
||
| def execute(self, task: str) -> Any: | ||
| """ | ||
| Execute the agent with given task. | ||
|
|
||
| Args: | ||
| task: Task description for the agent | ||
|
|
||
| Returns: | ||
| Agent execution result | ||
| """ | ||
| try: | ||
| result = self.agent.start(task) | ||
| return result | ||
| except Exception as e: | ||
| logger.error(f"Agent execution failed: {e}") | ||
| raise | ||
|
|
||
|
|
||
| class AgentScheduler: | ||
| """ | ||
| Scheduler for running PraisonAI agents periodically. | ||
|
|
||
| Features: | ||
| - Interval-based scheduling (hourly, daily, custom) | ||
| - Thread-safe operation | ||
| - Automatic retry on failure | ||
| - Execution logging and monitoring | ||
| - Graceful shutdown | ||
|
|
||
| Example: | ||
| scheduler = AgentScheduler(agent, task="Check news") | ||
| scheduler.start(schedule_expr="hourly", max_retries=3) | ||
| # Agent runs every hour automatically | ||
| scheduler.stop() # Stop when needed | ||
| """ | ||
|
|
||
| def __init__( | ||
| self, | ||
| agent, | ||
| task: str, | ||
| config: Optional[Dict[str, Any]] = None, | ||
| on_success: Optional[Callable] = None, | ||
| on_failure: Optional[Callable] = None | ||
| ): | ||
| """ | ||
| Initialize agent scheduler. | ||
|
|
||
| Args: | ||
| agent: PraisonAI Agent instance | ||
| task: Task description to execute | ||
| config: Optional configuration dict | ||
| on_success: Callback function on successful execution | ||
| on_failure: Callback function on failed execution | ||
| """ | ||
| self.agent = agent | ||
| self.task = task | ||
| self.config = config or {} | ||
| self.on_success = on_success | ||
| self.on_failure = on_failure | ||
|
|
||
| self.is_running = False | ||
| self._stop_event = threading.Event() | ||
| self._thread = None | ||
| self._executor = PraisonAgentExecutor(agent) | ||
| self._execution_count = 0 | ||
| self._success_count = 0 | ||
| self._failure_count = 0 | ||
|
|
||
| def start( | ||
| self, | ||
| schedule_expr: str, | ||
| max_retries: int = 3, | ||
| run_immediately: bool = False | ||
| ) -> bool: | ||
| """ | ||
| Start scheduled agent execution. | ||
|
|
||
| Args: | ||
| schedule_expr: Schedule expression (e.g., "hourly", "*/1h", "3600") | ||
| max_retries: Maximum retry attempts on failure | ||
| run_immediately: If True, run agent immediately before starting schedule | ||
|
|
||
| Returns: | ||
| True if scheduler started successfully | ||
| """ | ||
| if self.is_running: | ||
| logger.warning("Scheduler is already running") | ||
| return False | ||
|
|
||
| try: | ||
| interval = ScheduleParser.parse(schedule_expr) | ||
| self.is_running = True | ||
| self._stop_event.clear() | ||
|
|
||
| logger.info(f"Starting agent scheduler: {self.agent.name if hasattr(self.agent, 'name') else 'Agent'}") | ||
| logger.info(f"Task: {self.task}") | ||
| logger.info(f"Schedule: {schedule_expr} ({interval}s interval)") | ||
| logger.info(f"Max retries: {max_retries}") | ||
|
|
||
| # Run immediately if requested | ||
| if run_immediately: | ||
| logger.info("Running agent immediately before starting schedule...") | ||
| self._execute_with_retry(max_retries) | ||
|
|
||
| self._thread = threading.Thread( | ||
| target=self._run_schedule, | ||
| args=(interval, max_retries), | ||
| daemon=True | ||
| ) | ||
| self._thread.start() | ||
|
|
||
| logger.info("Agent scheduler started successfully") | ||
| return True | ||
|
|
||
| except Exception as e: | ||
| logger.error(f"Failed to start scheduler: {e}") | ||
| self.is_running = False | ||
| return False | ||
|
|
||
| def stop(self) -> bool: | ||
| """ | ||
| Stop the scheduler gracefully. | ||
|
|
||
| Returns: | ||
| True if stopped successfully | ||
| """ | ||
| if not self.is_running: | ||
| logger.info("Scheduler is not running") | ||
| return True | ||
|
|
||
| logger.info("Stopping agent scheduler...") | ||
| self._stop_event.set() | ||
|
|
||
| if self._thread and self._thread.is_alive(): | ||
| self._thread.join(timeout=10) | ||
|
|
||
| self.is_running = False | ||
| logger.info("Agent scheduler stopped") | ||
| logger.info(f"Execution stats - Total: {self._execution_count}, Success: {self._success_count}, Failed: {self._failure_count}") | ||
| return True | ||
|
|
||
| def get_stats(self) -> Dict[str, Any]: | ||
| """ | ||
| Get execution statistics. | ||
|
|
||
| Returns: | ||
| Dictionary with execution stats | ||
| """ | ||
| return { | ||
| "is_running": self.is_running, | ||
| "total_executions": self._execution_count, | ||
| "successful_executions": self._success_count, | ||
| "failed_executions": self._failure_count, | ||
| "success_rate": (self._success_count / self._execution_count * 100) if self._execution_count > 0 else 0 | ||
| } | ||
|
|
||
| def _run_schedule(self, interval: int, max_retries: int): | ||
| """Internal method to run scheduled agent executions.""" | ||
| while not self._stop_event.is_set(): | ||
| logger.info(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] Starting scheduled agent execution") | ||
|
|
||
| self._execute_with_retry(max_retries) | ||
|
|
||
| # Wait for next scheduled time | ||
| logger.info(f"Next execution in {interval} seconds ({interval/3600:.1f} hours)") | ||
| self._stop_event.wait(interval) | ||
|
|
||
| def _execute_with_retry(self, max_retries: int): | ||
| """Execute agent with retry logic.""" | ||
| self._execution_count += 1 | ||
|
|
||
| last_exc: Optional[Exception] = None | ||
| for attempt in range(max_retries): | ||
| try: | ||
| logger.info(f"Attempt {attempt + 1}/{max_retries}") | ||
| result = self._executor.execute(self.task) | ||
|
|
||
| logger.info(f"Agent execution successful on attempt {attempt + 1}") | ||
| logger.info(f"Result: {result}") | ||
|
|
||
| self._success_count += 1 | ||
| safe_call(self.on_success, result) | ||
| return | ||
|
|
||
| except Exception as e: | ||
| last_exc = e | ||
| logger.error(f"Agent execution failed on attempt {attempt + 1}: {e}") | ||
|
|
||
| if attempt < max_retries - 1: | ||
| wait_time = backoff_delay(attempt) | ||
| logger.info(f"Waiting {wait_time}s before retry...") | ||
| time.sleep(wait_time) | ||
|
|
||
| self._failure_count += 1 | ||
| logger.error(f"Agent execution failed after {max_retries} attempts") | ||
| safe_call( | ||
| self.on_failure, | ||
| last_exc if last_exc is not None | ||
| else RuntimeError(f"Failed after {max_retries} attempts") | ||
| ) | ||
|
|
||
| def execute_once(self) -> Any: | ||
| """ | ||
| Execute agent immediately (one-time execution). | ||
|
|
||
| Returns: | ||
| Agent execution result | ||
| """ | ||
| logger.info("Executing agent once") | ||
| try: | ||
| result = self._executor.execute(self.task) | ||
| logger.info(f"One-time execution successful: {result}") | ||
| return result | ||
| except Exception as e: | ||
| logger.error(f"One-time execution failed: {e}") | ||
| raise | ||
| import warnings | ||
|
|
||
| warnings.warn( | ||
| "praisonai.agent_scheduler is deprecated; " | ||
| "use 'from praisonai.scheduler import AgentScheduler' instead.", | ||
| DeprecationWarning, stacklevel=2, | ||
| ) | ||
|
|
||
| def create_agent_scheduler( | ||
| agent, | ||
| task: str, | ||
| config: Optional[Dict[str, Any]] = None | ||
| ) -> AgentScheduler: | ||
| """ | ||
| Factory function to create agent scheduler. | ||
|
|
||
| Args: | ||
| agent: PraisonAI Agent instance | ||
| task: Task description | ||
| config: Optional configuration | ||
|
|
||
| Returns: | ||
| Configured AgentScheduler instance | ||
| """ | ||
| return AgentScheduler(agent, task, config) | ||
| from praisonai.scheduler.agent_scheduler import ( # noqa: F401 | ||
| AgentScheduler, PraisonAgentExecutor, create_agent_scheduler | ||
| ) | ||
| # Preserve the legacy public name as an alias of the canonical interface | ||
| from praisonai.scheduler.base import ExecutorInterface as AgentExecutorInterface # noqa: F401 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
_ensure_telemetry_defaults()is dead code — never calledThe PR removes the call from
__getattr__with a comment that it should be called fromcli.PraisonAI.__init__instead, but no such call was added tocli.py(confirmed by codebase search). The function has zero callers, soOTEL_SDK_DISABLEDandEC_TELEMETRYdefaults are never set, silently disabling the Langfuse/OTEL integration that worked before this PR.