From eb19c7fad9fa5a692326a961943a2565cc02b0cf Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Fri, 18 Jul 2025 18:03:08 +0000 Subject: [PATCH 1/3] fix: prevent PostHog shutdown errors during interpreter shutdown MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add interpreter shutdown detection using sys.is_finalizing() and thread tests - Skip PostHog operations when interpreter is shutting down to prevent: - cannot schedule new futures after interpreter shutdown errors - can''t register atexit after shutdown errors - Event loop and threading errors during shutdown - Enhanced error handling with specific shutdown error message filtering - Apply shutdown prevention to flush, thread cleanup, and shutdown operations - Maintain full backward compatibility with existing API - Add comprehensive debug logging for shutdown scenarios Fixes issue #1011 where PostHog telemetry was causing shutdown errors when force_shutdown_telemetry() was called during interpreter shutdown. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-authored-by: Mervin Praison --- .../praisonaiagents/telemetry/telemetry.py | 112 ++++++++++++++++-- 1 file changed, 104 insertions(+), 8 deletions(-) diff --git a/src/praisonai-agents/praisonaiagents/telemetry/telemetry.py b/src/praisonai-agents/praisonaiagents/telemetry/telemetry.py index eee051101..4600a5f6b 100644 --- a/src/praisonai-agents/praisonaiagents/telemetry/telemetry.py +++ b/src/praisonai-agents/praisonaiagents/telemetry/telemetry.py @@ -347,6 +347,12 @@ def shutdown(self): posthog_client = getattr(self, '_posthog', None) if posthog_client: try: + # Check if Python interpreter is shutting down + if self._is_interpreter_shutting_down(): + self.logger.debug("Interpreter shutting down, skipping PostHog operations") + self._posthog = None + return + # Use a timeout-based flush to prevent hanging import threading import time @@ -368,27 +374,90 @@ def shutdown(self): # Cleanup PostHog threads safely self._shutdown_posthog_threads(posthog_client) - # Standard shutdown - posthog_client.shutdown() + # Standard shutdown - with interpreter shutdown check + if not self._is_interpreter_shutting_down(): + posthog_client.shutdown() + else: + self.logger.debug("Skipping PostHog shutdown call due to interpreter shutdown") except Exception as e: - # Log the error but don't fail shutdown - self.logger.error(f"Error during PostHog shutdown: {e}") + # Handle specific shutdown-related errors gracefully + error_msg = str(e).lower() + if any(phrase in error_msg for phrase in [ + 'cannot schedule new futures', + 'interpreter shutdown', + 'atexit after shutdown', + 'event loop closed', + 'runtime is shutting down' + ]): + self.logger.debug(f"PostHog shutdown prevented due to interpreter shutdown: {e}") + else: + self.logger.error(f"Error during PostHog shutdown: {e}") finally: self._posthog = None + def _is_interpreter_shutting_down(self) -> bool: + """ + Check if the Python interpreter is shutting down. + + Returns: + True if interpreter is shutting down, False otherwise + """ + try: + import sys + import threading + + # Check if the interpreter is in shutdown mode + if hasattr(sys, 'is_finalizing') and sys.is_finalizing(): + return True + + # Check if we can create new threads (fails during shutdown) + try: + import threading + test_thread = threading.Thread(target=lambda: None) + test_thread.daemon = True + test_thread.start() + test_thread.join(timeout=0.001) + return False + except (RuntimeError, threading.ThreadError): + return True + + except Exception: + # If we can't determine state, assume we're shutting down to be safe + return True + def _safe_flush_posthog(self, posthog_client): """Safely flush PostHog data with error handling.""" try: + # Skip flush if interpreter is shutting down + if self._is_interpreter_shutting_down(): + self.logger.debug("Skipping PostHog flush due to interpreter shutdown") + return False + posthog_client.flush() return True except Exception as e: - self.logger.debug(f"PostHog flush error: {e}") + error_msg = str(e).lower() + if any(phrase in error_msg for phrase in [ + 'cannot schedule new futures', + 'interpreter shutdown', + 'atexit after shutdown', + 'event loop closed', + 'runtime is shutting down' + ]): + self.logger.debug(f"PostHog flush prevented due to interpreter shutdown: {e}") + else: + self.logger.debug(f"PostHog flush error: {e}") return False def _shutdown_posthog_threads(self, posthog_client): """Safely shutdown PostHog background threads.""" try: + # Skip thread cleanup if interpreter is shutting down + if self._is_interpreter_shutting_down(): + self.logger.debug("Skipping PostHog thread cleanup due to interpreter shutdown") + return + # Access thread pool safely (fix double shutdown issue) thread_pool = getattr(posthog_client, '_thread_pool', None) if thread_pool: @@ -400,7 +469,16 @@ def _shutdown_posthog_threads(self, posthog_client): import time time.sleep(0.5) except Exception as e: - self.logger.debug(f"Thread pool shutdown error: {e}") + error_msg = str(e).lower() + if any(phrase in error_msg for phrase in [ + 'cannot schedule new futures', + 'interpreter shutdown', + 'atexit after shutdown', + 'event loop closed' + ]): + self.logger.debug(f"Thread pool shutdown prevented due to interpreter shutdown: {e}") + else: + self.logger.debug(f"Thread pool shutdown error: {e}") # Clean up consumer consumer = getattr(posthog_client, '_consumer', None) @@ -411,10 +489,28 @@ def _shutdown_posthog_threads(self, posthog_client): if hasattr(consumer, 'shutdown'): consumer.shutdown() except Exception as e: - self.logger.debug(f"Consumer shutdown error: {e}") + error_msg = str(e).lower() + if any(phrase in error_msg for phrase in [ + 'cannot schedule new futures', + 'interpreter shutdown', + 'atexit after shutdown', + 'event loop closed' + ]): + self.logger.debug(f"Consumer shutdown prevented due to interpreter shutdown: {e}") + else: + self.logger.debug(f"Consumer shutdown error: {e}") except Exception as e: - self.logger.debug(f"Error during PostHog thread cleanup: {e}") + error_msg = str(e).lower() + if any(phrase in error_msg for phrase in [ + 'cannot schedule new futures', + 'interpreter shutdown', + 'atexit after shutdown', + 'event loop closed' + ]): + self.logger.debug(f"PostHog thread cleanup prevented due to interpreter shutdown: {e}") + else: + self.logger.debug(f"Error during PostHog thread cleanup: {e}") # Global telemetry instance From 36160c700a9bcc8d4f8847a7e1ae274d47bf6bb2 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Fri, 18 Jul 2025 20:54:58 +0000 Subject: [PATCH 2/3] refactor: improve telemetry shutdown error handling and reduce code duplication - Add centralized _SHUTDOWN_ERROR_PHRASES constant for consistent error detection - Create _is_shutdown_related_error() helper method to eliminate code duplication - Remove duplicate import threading in _is_interpreter_shutting_down() - Remove redundant _posthog = None assignment handled by finally block - Ensure all shutdown error detection uses consistent phrase list Addresses reviewer feedback on duplicate imports, inconsistent error handling, and code duplication while maintaining full backward compatibility. Co-authored-by: Mervin Praison --- .../praisonaiagents/telemetry/telemetry.py | 66 ++++++++----------- 1 file changed, 27 insertions(+), 39 deletions(-) diff --git a/src/praisonai-agents/praisonaiagents/telemetry/telemetry.py b/src/praisonai-agents/praisonaiagents/telemetry/telemetry.py index 4600a5f6b..074c7180e 100644 --- a/src/praisonai-agents/praisonaiagents/telemetry/telemetry.py +++ b/src/praisonai-agents/praisonaiagents/telemetry/telemetry.py @@ -41,6 +41,15 @@ class MinimalTelemetry: - Can be disabled via environment variables """ + # Common error phrases that indicate interpreter shutdown + _SHUTDOWN_ERROR_PHRASES = [ + 'cannot schedule new futures', + 'interpreter shutdown', + 'atexit after shutdown', + 'event loop closed', + 'runtime is shutting down' + ] + def __init__(self, enabled: bool = None): """ Initialize the minimal telemetry collector. @@ -350,7 +359,6 @@ def shutdown(self): # Check if Python interpreter is shutting down if self._is_interpreter_shutting_down(): self.logger.debug("Interpreter shutting down, skipping PostHog operations") - self._posthog = None return # Use a timeout-based flush to prevent hanging @@ -382,20 +390,26 @@ def shutdown(self): except Exception as e: # Handle specific shutdown-related errors gracefully - error_msg = str(e).lower() - if any(phrase in error_msg for phrase in [ - 'cannot schedule new futures', - 'interpreter shutdown', - 'atexit after shutdown', - 'event loop closed', - 'runtime is shutting down' - ]): + if self._is_shutdown_related_error(e): self.logger.debug(f"PostHog shutdown prevented due to interpreter shutdown: {e}") else: self.logger.error(f"Error during PostHog shutdown: {e}") finally: self._posthog = None + def _is_shutdown_related_error(self, error: Exception) -> bool: + """ + Check if an error is related to interpreter shutdown. + + Args: + error: The exception to check + + Returns: + True if the error is shutdown-related, False otherwise + """ + error_msg = str(error).lower() + return any(phrase in error_msg for phrase in self._SHUTDOWN_ERROR_PHRASES) + def _is_interpreter_shutting_down(self) -> bool: """ Check if the Python interpreter is shutting down. @@ -413,7 +427,6 @@ def _is_interpreter_shutting_down(self) -> bool: # Check if we can create new threads (fails during shutdown) try: - import threading test_thread = threading.Thread(target=lambda: None) test_thread.daemon = True test_thread.start() @@ -437,14 +450,7 @@ def _safe_flush_posthog(self, posthog_client): posthog_client.flush() return True except Exception as e: - error_msg = str(e).lower() - if any(phrase in error_msg for phrase in [ - 'cannot schedule new futures', - 'interpreter shutdown', - 'atexit after shutdown', - 'event loop closed', - 'runtime is shutting down' - ]): + if self._is_shutdown_related_error(e): self.logger.debug(f"PostHog flush prevented due to interpreter shutdown: {e}") else: self.logger.debug(f"PostHog flush error: {e}") @@ -469,13 +475,7 @@ def _shutdown_posthog_threads(self, posthog_client): import time time.sleep(0.5) except Exception as e: - error_msg = str(e).lower() - if any(phrase in error_msg for phrase in [ - 'cannot schedule new futures', - 'interpreter shutdown', - 'atexit after shutdown', - 'event loop closed' - ]): + if self._is_shutdown_related_error(e): self.logger.debug(f"Thread pool shutdown prevented due to interpreter shutdown: {e}") else: self.logger.debug(f"Thread pool shutdown error: {e}") @@ -489,25 +489,13 @@ def _shutdown_posthog_threads(self, posthog_client): if hasattr(consumer, 'shutdown'): consumer.shutdown() except Exception as e: - error_msg = str(e).lower() - if any(phrase in error_msg for phrase in [ - 'cannot schedule new futures', - 'interpreter shutdown', - 'atexit after shutdown', - 'event loop closed' - ]): + if self._is_shutdown_related_error(e): self.logger.debug(f"Consumer shutdown prevented due to interpreter shutdown: {e}") else: self.logger.debug(f"Consumer shutdown error: {e}") except Exception as e: - error_msg = str(e).lower() - if any(phrase in error_msg for phrase in [ - 'cannot schedule new futures', - 'interpreter shutdown', - 'atexit after shutdown', - 'event loop closed' - ]): + if self._is_shutdown_related_error(e): self.logger.debug(f"PostHog thread cleanup prevented due to interpreter shutdown: {e}") else: self.logger.debug(f"Error during PostHog thread cleanup: {e}") From 65846f4f487f563ce0e8fbcf7d88d712fa68a94d Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Fri, 18 Jul 2025 20:59:52 +0000 Subject: [PATCH 3/3] refactor: remove duplicate threading import in _is_interpreter_shutting_down method - Remove redundant ''import threading'' since threading is already imported at module level - Addresses reviewer feedback on code duplication - Maintains all functionality while improving code cleanliness Co-authored-by: Mervin Praison --- src/praisonai-agents/praisonaiagents/telemetry/telemetry.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/praisonai-agents/praisonaiagents/telemetry/telemetry.py b/src/praisonai-agents/praisonaiagents/telemetry/telemetry.py index 074c7180e..35a67ee15 100644 --- a/src/praisonai-agents/praisonaiagents/telemetry/telemetry.py +++ b/src/praisonai-agents/praisonaiagents/telemetry/telemetry.py @@ -419,7 +419,6 @@ def _is_interpreter_shutting_down(self) -> bool: """ try: import sys - import threading # Check if the interpreter is in shutdown mode if hasattr(sys, 'is_finalizing') and sys.is_finalizing():