Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 18 additions & 15 deletions src/praisonai/praisonai/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Suppress crewai.cli.config logger BEFORE any imports to prevent INFO log
import logging
import threading
logging.getLogger('crewai.cli.config').setLevel(logging.ERROR)

# Version is lightweight, import directly
Expand Down Expand Up @@ -35,32 +36,34 @@
]

# Telemetry initialization state
_telemetry_lock = threading.Lock()
_telemetry_initialized = False

def _ensure_telemetry_defaults() -> None:
"""Apply telemetry env defaults exactly once, on first observability use."""
global _telemetry_initialized
if _telemetry_initialized:
if _telemetry_initialized: # fast path, OK without lock
return
import os
langfuse_configured = bool(
os.getenv("LANGFUSE_PUBLIC_KEY")
or os.path.exists(os.path.expanduser("~/.praisonai/langfuse.env"))
)
if langfuse_configured:
# Explicitly enable OTEL for Langfuse integration
os.environ["OTEL_SDK_DISABLED"] = "false"
else:
os.environ.setdefault("OTEL_SDK_DISABLED", "true")
os.environ.setdefault("EC_TELEMETRY", "false") # respect user overrides
_telemetry_initialized = True
with _telemetry_lock:
if _telemetry_initialized:
return
import os
# Respect any value the user already set
if "OTEL_SDK_DISABLED" not in os.environ:
langfuse_configured = bool(
os.getenv("LANGFUSE_PUBLIC_KEY")
or os.path.exists(os.path.expanduser("~/.praisonai/langfuse.env"))
)
os.environ["OTEL_SDK_DISABLED"] = "false" if langfuse_configured else "true"
os.environ.setdefault("EC_TELEMETRY", "false") # respect user overrides
Comment on lines +54 to +58
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 _ensure_telemetry_defaults() is dead code — never called

The PR removes the call from __getattr__ with a comment that it should be called from cli.PraisonAI.__init__ instead, but no such call was added to cli.py (confirmed by codebase search). The function has zero callers, so OTEL_SDK_DISABLED and EC_TELEMETRY defaults are never set, silently disabling the Langfuse/OTEL integration that worked before this PR.

_telemetry_initialized = True


# Lazy loading for heavy imports
def __getattr__(name):
"""Lazy load heavy modules to improve import time."""
# Ensure telemetry defaults before any lazy import that may touch OTEL.
_ensure_telemetry_defaults()
# Note: Telemetry initialization moved out of lazy hook to avoid side effects
# It should be called explicitly from cli.PraisonAI.__init__ instead

Comment on lines 42 to 67
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Find every reference to _ensure_telemetry_defaults across the repo
rg -nP -C2 '\b_ensure_telemetry_defaults\b'

# And specifically inside the cli package, where the comment says it should live
fd -t f -e py . src/praisonai/praisonai/cli 2>/dev/null | xargs rg -nP -C2 '\b_ensure_telemetry_defaults\b' || true

# Also verify the unconditional OTEL setdefault in main.py is still present and unconditional
rg -nP -C3 'OTEL_SDK_DISABLED' src/praisonai/praisonai

Repository: MervinPraison/PraisonAI

Length of output: 2576


Telemetry init function is dead code — never invoked anywhere in the codebase.

_ensure_telemetry_defaults() is defined but never called. The __getattr__ invocation was removed and the comment at lines 65–66 directs callers to invoke it from cli.PraisonAI.__init__, but no such call exists. Instead, cli/main.py:336 unconditionally runs os.environ.setdefault("OTEL_SDK_DISABLED", "true"), which bypasses the Langfuse-aware logic in lines 53–57 (that checks LANGFUSE_PUBLIC_KEY and ~/.praisonai/langfuse.env).

Result: the function's smart telemetry detection never runs in normal flows, and the telemetry environment variable defaults are not applied to non-CLI consumers either.

Either:

  1. Call _ensure_telemetry_defaults() from PraisonAI.__init__ (and remove or replace the unconditional setdefault in main() so it respects the Langfuse check), or
  2. Restore the call in __getattr__ (it is idempotent and lock-protected, so the original concern is mitigated).
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/praisonai/praisonai/__init__.py` around lines 42 - 67, The
_ensure_telemetry_defaults() function is never invoked, so its Langfuse-aware
OTEL logic is dead; either call it from PraisonAI.__init__ or restore the prior
__getattr__ call and remove the unconditional
os.environ.setdefault("OTEL_SDK_DISABLED", "true") in cli/main.py. Update
PraisonAI.__init__ to call _ensure_telemetry_defaults() early (and remove or
change the unconditional setdefault in main()) OR reintroduce a call to
_ensure_telemetry_defaults() inside __getattr__ so the function runs lazily;
ensure you keep the idempotent behavior and the
_telemetry_lock/_telemetry_initialized checks intact.

if name == 'PraisonAI':
from .cli import PraisonAI
Expand Down
300 changes: 14 additions & 286 deletions src/praisonai/praisonai/agent_scheduler.py
Original file line number Diff line number Diff line change
@@ -1,291 +1,19 @@
"""
Agent Scheduler for PraisonAI - Run agents periodically 24/7.

This module provides scheduling capabilities for running PraisonAI agents
at regular intervals, enabling 24/7 autonomous agent operations.
"""Backward-compatible re-export. Prefer `praisonai.scheduler`.

Example:
# Run news checker every hour
from praisonai.agent_scheduler import AgentScheduler
from praisonai_agents import Agent

agent = Agent(
name="NewsChecker",
instructions="Check latest AI news and summarize",
tools=[search_tool]
)

scheduler = AgentScheduler(agent, task="Check latest AI news")
scheduler.start(schedule_expr="hourly")
This module is deprecated. Use the canonical implementation in the
scheduler package for full functionality including YAML and recipe support.
"""

import threading
import time
import logging
from datetime import datetime
from typing import Optional, Dict, Any, Callable
from abc import ABC, abstractmethod

logger = logging.getLogger(__name__)


# Import shared schedule parser
from .scheduler.shared import ScheduleParser, backoff_delay, safe_call


class AgentExecutorInterface(ABC):
"""Abstract interface for agent execution."""

@abstractmethod
def execute(self, task: str) -> Any:
"""Execute the agent with given task."""
pass


class PraisonAgentExecutor(AgentExecutorInterface):
"""Executor for PraisonAI agents."""

def __init__(self, agent):
"""
Initialize executor with a PraisonAI agent.

Args:
agent: PraisonAI Agent instance
"""
self.agent = agent

def execute(self, task: str) -> Any:
"""
Execute the agent with given task.

Args:
task: Task description for the agent

Returns:
Agent execution result
"""
try:
result = self.agent.start(task)
return result
except Exception as e:
logger.error(f"Agent execution failed: {e}")
raise


class AgentScheduler:
"""
Scheduler for running PraisonAI agents periodically.

Features:
- Interval-based scheduling (hourly, daily, custom)
- Thread-safe operation
- Automatic retry on failure
- Execution logging and monitoring
- Graceful shutdown

Example:
scheduler = AgentScheduler(agent, task="Check news")
scheduler.start(schedule_expr="hourly", max_retries=3)
# Agent runs every hour automatically
scheduler.stop() # Stop when needed
"""

def __init__(
self,
agent,
task: str,
config: Optional[Dict[str, Any]] = None,
on_success: Optional[Callable] = None,
on_failure: Optional[Callable] = None
):
"""
Initialize agent scheduler.

Args:
agent: PraisonAI Agent instance
task: Task description to execute
config: Optional configuration dict
on_success: Callback function on successful execution
on_failure: Callback function on failed execution
"""
self.agent = agent
self.task = task
self.config = config or {}
self.on_success = on_success
self.on_failure = on_failure

self.is_running = False
self._stop_event = threading.Event()
self._thread = None
self._executor = PraisonAgentExecutor(agent)
self._execution_count = 0
self._success_count = 0
self._failure_count = 0

def start(
self,
schedule_expr: str,
max_retries: int = 3,
run_immediately: bool = False
) -> bool:
"""
Start scheduled agent execution.

Args:
schedule_expr: Schedule expression (e.g., "hourly", "*/1h", "3600")
max_retries: Maximum retry attempts on failure
run_immediately: If True, run agent immediately before starting schedule

Returns:
True if scheduler started successfully
"""
if self.is_running:
logger.warning("Scheduler is already running")
return False

try:
interval = ScheduleParser.parse(schedule_expr)
self.is_running = True
self._stop_event.clear()

logger.info(f"Starting agent scheduler: {self.agent.name if hasattr(self.agent, 'name') else 'Agent'}")
logger.info(f"Task: {self.task}")
logger.info(f"Schedule: {schedule_expr} ({interval}s interval)")
logger.info(f"Max retries: {max_retries}")

# Run immediately if requested
if run_immediately:
logger.info("Running agent immediately before starting schedule...")
self._execute_with_retry(max_retries)

self._thread = threading.Thread(
target=self._run_schedule,
args=(interval, max_retries),
daemon=True
)
self._thread.start()

logger.info("Agent scheduler started successfully")
return True

except Exception as e:
logger.error(f"Failed to start scheduler: {e}")
self.is_running = False
return False

def stop(self) -> bool:
"""
Stop the scheduler gracefully.

Returns:
True if stopped successfully
"""
if not self.is_running:
logger.info("Scheduler is not running")
return True

logger.info("Stopping agent scheduler...")
self._stop_event.set()

if self._thread and self._thread.is_alive():
self._thread.join(timeout=10)

self.is_running = False
logger.info("Agent scheduler stopped")
logger.info(f"Execution stats - Total: {self._execution_count}, Success: {self._success_count}, Failed: {self._failure_count}")
return True

def get_stats(self) -> Dict[str, Any]:
"""
Get execution statistics.

Returns:
Dictionary with execution stats
"""
return {
"is_running": self.is_running,
"total_executions": self._execution_count,
"successful_executions": self._success_count,
"failed_executions": self._failure_count,
"success_rate": (self._success_count / self._execution_count * 100) if self._execution_count > 0 else 0
}

def _run_schedule(self, interval: int, max_retries: int):
"""Internal method to run scheduled agent executions."""
while not self._stop_event.is_set():
logger.info(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] Starting scheduled agent execution")

self._execute_with_retry(max_retries)

# Wait for next scheduled time
logger.info(f"Next execution in {interval} seconds ({interval/3600:.1f} hours)")
self._stop_event.wait(interval)

def _execute_with_retry(self, max_retries: int):
"""Execute agent with retry logic."""
self._execution_count += 1

last_exc: Optional[Exception] = None
for attempt in range(max_retries):
try:
logger.info(f"Attempt {attempt + 1}/{max_retries}")
result = self._executor.execute(self.task)

logger.info(f"Agent execution successful on attempt {attempt + 1}")
logger.info(f"Result: {result}")

self._success_count += 1
safe_call(self.on_success, result)
return

except Exception as e:
last_exc = e
logger.error(f"Agent execution failed on attempt {attempt + 1}: {e}")

if attempt < max_retries - 1:
wait_time = backoff_delay(attempt)
logger.info(f"Waiting {wait_time}s before retry...")
time.sleep(wait_time)

self._failure_count += 1
logger.error(f"Agent execution failed after {max_retries} attempts")
safe_call(
self.on_failure,
last_exc if last_exc is not None
else RuntimeError(f"Failed after {max_retries} attempts")
)

def execute_once(self) -> Any:
"""
Execute agent immediately (one-time execution).

Returns:
Agent execution result
"""
logger.info("Executing agent once")
try:
result = self._executor.execute(self.task)
logger.info(f"One-time execution successful: {result}")
return result
except Exception as e:
logger.error(f"One-time execution failed: {e}")
raise
import warnings

warnings.warn(
"praisonai.agent_scheduler is deprecated; "
"use 'from praisonai.scheduler import AgentScheduler' instead.",
DeprecationWarning, stacklevel=2,
)

def create_agent_scheduler(
agent,
task: str,
config: Optional[Dict[str, Any]] = None
) -> AgentScheduler:
"""
Factory function to create agent scheduler.

Args:
agent: PraisonAI Agent instance
task: Task description
config: Optional configuration

Returns:
Configured AgentScheduler instance
"""
return AgentScheduler(agent, task, config)
from praisonai.scheduler.agent_scheduler import ( # noqa: F401
AgentScheduler, PraisonAgentExecutor, create_agent_scheduler
)
# Preserve the legacy public name as an alias of the canonical interface
from praisonai.scheduler.base import ExecutorInterface as AgentExecutorInterface # noqa: F401
Loading