diff --git a/src/praisonai-agents/praisonaiagents/telemetry/telemetry.py b/src/praisonai-agents/praisonaiagents/telemetry/telemetry.py index b51260d52..eee051101 100644 --- a/src/praisonai-agents/praisonaiagents/telemetry/telemetry.py +++ b/src/praisonai-agents/praisonaiagents/telemetry/telemetry.py @@ -56,6 +56,10 @@ def __init__(self, enabled: bool = None): self.logger = logging.getLogger(__name__) + # Add shutdown tracking to prevent double shutdown + self._shutdown_complete = False + self._shutdown_lock = threading.Lock() + if not self.enabled: self.logger.debug("Telemetry is disabled") return @@ -72,6 +76,7 @@ def __init__(self, enabled: bool = None): "errors": 0, } self._metrics_lock = threading.Lock() + self._max_timing_entries = 1000 # Limit to prevent memory leaks # Collect basic environment info (anonymous) self._environment = { @@ -102,7 +107,7 @@ def _get_framework_version(self) -> str: try: from .. import __version__ return __version__ - except ImportError: + except (ImportError, KeyError, AttributeError): return "unknown" def track_agent_execution(self, agent_name: str = None, success: bool = True): @@ -174,15 +179,21 @@ def track_tool_usage(self, tool_name: str, success: bool = True, execution_time: with self._metrics_lock: self._metrics["tool_calls"] += 1 - # Add timing metrics if provided + # Add timing metrics if provided (with memory management) if execution_time is not None: if "tool_execution_times" not in self._metrics: self._metrics["tool_execution_times"] = [] - self._metrics["tool_execution_times"].append({ + + timing_list = self._metrics["tool_execution_times"] + timing_list.append({ "tool_name": tool_name, "execution_time": execution_time, "success": success }) + + # Prevent memory leaks by limiting stored entries + if len(timing_list) > self._max_timing_entries: + timing_list[:] = timing_list[-self._max_timing_entries:] # Send event to PostHog if self._posthog: @@ -322,65 +333,88 @@ def shutdown(self): """ if not self.enabled: return + + # Use lock to prevent concurrent shutdown calls + with self._shutdown_lock: + if self._shutdown_complete: + return + self._shutdown_complete = True # Final flush self.flush() # Shutdown PostHog if available - if hasattr(self, '_posthog') and self._posthog: + posthog_client = getattr(self, '_posthog', None) + if posthog_client: try: - # Force a synchronous flush before shutdown - self._posthog.flush() - - # Get the PostHog client's internal thread pool for cleanup - if hasattr(self._posthog, '_thread_pool'): - thread_pool = self._posthog._thread_pool - if thread_pool: - try: - # Stop accepting new tasks - thread_pool.shutdown(wait=False) - # Wait for threads to finish with timeout - thread_pool.shutdown(wait=True) - except: - pass - - # Force shutdown of any remaining threads - if hasattr(self._posthog, '_consumer'): - try: - self._posthog._consumer.flush() - self._posthog._consumer.shutdown() - except: - pass - - # Standard shutdown - self._posthog.shutdown() - - # Additional cleanup - force thread termination + # Use a timeout-based flush to prevent hanging import threading import time + import concurrent.futures - # Wait up to 2 seconds for threads to terminate - max_wait = 2.0 - start_time = time.time() - - while time.time() - start_time < max_wait: - # Check for any PostHog related threads - posthog_threads = [ - t for t in threading.enumerate() - if t != threading.current_thread() - and not t.daemon - and ('posthog' in t.name.lower() or 'analytics' in t.name.lower()) - ] + # Use ThreadPoolExecutor for better control + with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: + flush_future = executor.submit(self._safe_flush_posthog, posthog_client) - if not posthog_threads: - break - - time.sleep(0.1) + try: + flush_future.result(timeout=5.0) # 5 second timeout + self.logger.debug("PostHog flush completed successfully") + except concurrent.futures.TimeoutError: + self.logger.warning("PostHog flush timed out") + flush_future.cancel() + except Exception as e: + self.logger.error(f"PostHog flush failed: {e}") + + # Cleanup PostHog threads safely + self._shutdown_posthog_threads(posthog_client) + + # Standard shutdown + posthog_client.shutdown() except Exception as e: # Log the error but don't fail shutdown - self.logger.debug(f"Error during PostHog shutdown: {e}") - pass + self.logger.error(f"Error during PostHog shutdown: {e}") + finally: + self._posthog = None + + def _safe_flush_posthog(self, posthog_client): + """Safely flush PostHog data with error handling.""" + try: + posthog_client.flush() + return True + except Exception as e: + self.logger.debug(f"PostHog flush error: {e}") + return False + + def _shutdown_posthog_threads(self, posthog_client): + """Safely shutdown PostHog background threads.""" + try: + # Access thread pool safely (fix double shutdown issue) + thread_pool = getattr(posthog_client, '_thread_pool', None) + if thread_pool: + try: + # Single shutdown call with timeout + if hasattr(thread_pool, 'shutdown'): + thread_pool.shutdown(wait=False) + # Wait briefly for graceful shutdown + import time + time.sleep(0.5) + except Exception as e: + self.logger.debug(f"Thread pool shutdown error: {e}") + + # Clean up consumer + consumer = getattr(posthog_client, '_consumer', None) + if consumer: + try: + if hasattr(consumer, 'flush'): + consumer.flush() + if hasattr(consumer, 'shutdown'): + consumer.shutdown() + except Exception as e: + self.logger.debug(f"Consumer shutdown error: {e}") + + except Exception as e: + self.logger.debug(f"Error during PostHog thread cleanup: {e}") # Global telemetry instance