diff --git a/src/praisonai-agents/praisonaiagents/telemetry/telemetry.py b/src/praisonai-agents/praisonaiagents/telemetry/telemetry.py index eee051101..35a67ee15 100644 --- a/src/praisonai-agents/praisonaiagents/telemetry/telemetry.py +++ b/src/praisonai-agents/praisonaiagents/telemetry/telemetry.py @@ -41,6 +41,15 @@ class MinimalTelemetry: - Can be disabled via environment variables """ + # Common error phrases that indicate interpreter shutdown + _SHUTDOWN_ERROR_PHRASES = [ + 'cannot schedule new futures', + 'interpreter shutdown', + 'atexit after shutdown', + 'event loop closed', + 'runtime is shutting down' + ] + def __init__(self, enabled: bool = None): """ Initialize the minimal telemetry collector. @@ -347,6 +356,11 @@ def shutdown(self): posthog_client = getattr(self, '_posthog', None) if posthog_client: try: + # Check if Python interpreter is shutting down + if self._is_interpreter_shutting_down(): + self.logger.debug("Interpreter shutting down, skipping PostHog operations") + return + # Use a timeout-based flush to prevent hanging import threading import time @@ -368,27 +382,87 @@ def shutdown(self): # Cleanup PostHog threads safely self._shutdown_posthog_threads(posthog_client) - # Standard shutdown - posthog_client.shutdown() + # Standard shutdown - with interpreter shutdown check + if not self._is_interpreter_shutting_down(): + posthog_client.shutdown() + else: + self.logger.debug("Skipping PostHog shutdown call due to interpreter shutdown") except Exception as e: - # Log the error but don't fail shutdown - self.logger.error(f"Error during PostHog shutdown: {e}") + # Handle specific shutdown-related errors gracefully + if self._is_shutdown_related_error(e): + self.logger.debug(f"PostHog shutdown prevented due to interpreter shutdown: {e}") + else: + self.logger.error(f"Error during PostHog shutdown: {e}") finally: self._posthog = None + def _is_shutdown_related_error(self, error: Exception) -> bool: + """ + Check if an error is related to interpreter shutdown. + + Args: + error: The exception to check + + Returns: + True if the error is shutdown-related, False otherwise + """ + error_msg = str(error).lower() + return any(phrase in error_msg for phrase in self._SHUTDOWN_ERROR_PHRASES) + + def _is_interpreter_shutting_down(self) -> bool: + """ + Check if the Python interpreter is shutting down. + + Returns: + True if interpreter is shutting down, False otherwise + """ + try: + import sys + + # Check if the interpreter is in shutdown mode + if hasattr(sys, 'is_finalizing') and sys.is_finalizing(): + return True + + # Check if we can create new threads (fails during shutdown) + try: + test_thread = threading.Thread(target=lambda: None) + test_thread.daemon = True + test_thread.start() + test_thread.join(timeout=0.001) + return False + except (RuntimeError, threading.ThreadError): + return True + + except Exception: + # If we can't determine state, assume we're shutting down to be safe + return True + def _safe_flush_posthog(self, posthog_client): """Safely flush PostHog data with error handling.""" try: + # Skip flush if interpreter is shutting down + if self._is_interpreter_shutting_down(): + self.logger.debug("Skipping PostHog flush due to interpreter shutdown") + return False + posthog_client.flush() return True except Exception as e: - self.logger.debug(f"PostHog flush error: {e}") + if self._is_shutdown_related_error(e): + self.logger.debug(f"PostHog flush prevented due to interpreter shutdown: {e}") + else: + self.logger.debug(f"PostHog flush error: {e}") return False def _shutdown_posthog_threads(self, posthog_client): """Safely shutdown PostHog background threads.""" try: + # Skip thread cleanup if interpreter is shutting down + if self._is_interpreter_shutting_down(): + self.logger.debug("Skipping PostHog thread cleanup due to interpreter shutdown") + return + # Access thread pool safely (fix double shutdown issue) thread_pool = getattr(posthog_client, '_thread_pool', None) if thread_pool: @@ -400,7 +474,10 @@ def _shutdown_posthog_threads(self, posthog_client): import time time.sleep(0.5) except Exception as e: - self.logger.debug(f"Thread pool shutdown error: {e}") + if self._is_shutdown_related_error(e): + self.logger.debug(f"Thread pool shutdown prevented due to interpreter shutdown: {e}") + else: + self.logger.debug(f"Thread pool shutdown error: {e}") # Clean up consumer consumer = getattr(posthog_client, '_consumer', None) @@ -411,10 +488,16 @@ def _shutdown_posthog_threads(self, posthog_client): if hasattr(consumer, 'shutdown'): consumer.shutdown() except Exception as e: - self.logger.debug(f"Consumer shutdown error: {e}") + if self._is_shutdown_related_error(e): + self.logger.debug(f"Consumer shutdown prevented due to interpreter shutdown: {e}") + else: + self.logger.debug(f"Consumer shutdown error: {e}") except Exception as e: - self.logger.debug(f"Error during PostHog thread cleanup: {e}") + if self._is_shutdown_related_error(e): + self.logger.debug(f"PostHog thread cleanup prevented due to interpreter shutdown: {e}") + else: + self.logger.debug(f"Error during PostHog thread cleanup: {e}") # Global telemetry instance