From 82705301b329cb53c905f9405911035360e54f15 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Fri, 18 Jul 2025 17:43:38 +0000 Subject: [PATCH 1/2] fix: prevent telemetry shutdown from hanging indefinitely - Added timeout-based flush operation in MinimalTelemetry.shutdown() - Replaced blocking PostHog flush with threaded timeout approach - Prevents indefinite blocking during program termination - Maintains backward compatibility and data integrity - Fixes issue where PostHog queue.join() would hang on exit Resolves #1009 Co-authored-by: Mervin Praison --- .../praisonaiagents/telemetry/telemetry.py | 31 ++++++++++++++++--- 1 file changed, 26 insertions(+), 5 deletions(-) diff --git a/src/praisonai-agents/praisonaiagents/telemetry/telemetry.py b/src/praisonai-agents/praisonaiagents/telemetry/telemetry.py index b51260d52..877dd4cae 100644 --- a/src/praisonai-agents/praisonaiagents/telemetry/telemetry.py +++ b/src/praisonai-agents/praisonaiagents/telemetry/telemetry.py @@ -329,8 +329,32 @@ def shutdown(self): # Shutdown PostHog if available if hasattr(self, '_posthog') and self._posthog: try: - # Force a synchronous flush before shutdown - self._posthog.flush() + # Use a timeout-based flush to prevent hanging + import threading + import time + + flush_timeout = 2.0 # Maximum time to wait for flush + flush_complete = threading.Event() + + def flush_with_timeout(): + """Flush PostHog data with timeout protection.""" + try: + self._posthog.flush() + flush_complete.set() + except Exception as e: + self.logger.debug(f"PostHog flush error: {e}") + flush_complete.set() + + # Start flush in a separate thread + flush_thread = threading.Thread(target=flush_with_timeout, daemon=True) + flush_thread.start() + + # Wait for flush to complete or timeout + flush_complete.wait(timeout=flush_timeout) + + # If flush didn't complete, log and continue + if not flush_complete.is_set(): + self.logger.debug("PostHog flush timed out, continuing with shutdown") # Get the PostHog client's internal thread pool for cleanup if hasattr(self._posthog, '_thread_pool'): @@ -356,9 +380,6 @@ def shutdown(self): self._posthog.shutdown() # Additional cleanup - force thread termination - import threading - import time - # Wait up to 2 seconds for threads to terminate max_wait = 2.0 start_time = time.time() From be68f3e782394e8f32e53d9805e5955490637ff9 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Fri, 18 Jul 2025 17:53:46 +0000 Subject: [PATCH 2/2] fix: enhance telemetry shutdown robustness and prevent critical issues MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add thread safety with shutdown locks to prevent concurrent shutdown calls - Fix double thread pool shutdown issue that could cause undefined behavior - Implement memory leak prevention with bounded metrics storage (max 1000 entries) - Replace unsafe daemon threads with ThreadPoolExecutor for better control - Increase flush timeout from 2s to 5s for improved reliability - Add comprehensive error handling with proper logging levels - Split shutdown logic into maintainable helper methods - Maintain full backward compatibility with existing API These changes address critical thread safety issues while preserving all existing functionality and ensuring reliable telemetry shutdown behavior. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-authored-by: Mervin Praison --- .../praisonaiagents/telemetry/telemetry.py | 143 ++++++++++-------- 1 file changed, 78 insertions(+), 65 deletions(-) diff --git a/src/praisonai-agents/praisonaiagents/telemetry/telemetry.py b/src/praisonai-agents/praisonaiagents/telemetry/telemetry.py index 877dd4cae..eee051101 100644 --- a/src/praisonai-agents/praisonaiagents/telemetry/telemetry.py +++ b/src/praisonai-agents/praisonaiagents/telemetry/telemetry.py @@ -56,6 +56,10 @@ def __init__(self, enabled: bool = None): self.logger = logging.getLogger(__name__) + # Add shutdown tracking to prevent double shutdown + self._shutdown_complete = False + self._shutdown_lock = threading.Lock() + if not self.enabled: self.logger.debug("Telemetry is disabled") return @@ -72,6 +76,7 @@ def __init__(self, enabled: bool = None): "errors": 0, } self._metrics_lock = threading.Lock() + self._max_timing_entries = 1000 # Limit to prevent memory leaks # Collect basic environment info (anonymous) self._environment = { @@ -102,7 +107,7 @@ def _get_framework_version(self) -> str: try: from .. import __version__ return __version__ - except ImportError: + except (ImportError, KeyError, AttributeError): return "unknown" def track_agent_execution(self, agent_name: str = None, success: bool = True): @@ -174,15 +179,21 @@ def track_tool_usage(self, tool_name: str, success: bool = True, execution_time: with self._metrics_lock: self._metrics["tool_calls"] += 1 - # Add timing metrics if provided + # Add timing metrics if provided (with memory management) if execution_time is not None: if "tool_execution_times" not in self._metrics: self._metrics["tool_execution_times"] = [] - self._metrics["tool_execution_times"].append({ + + timing_list = self._metrics["tool_execution_times"] + timing_list.append({ "tool_name": tool_name, "execution_time": execution_time, "success": success }) + + # Prevent memory leaks by limiting stored entries + if len(timing_list) > self._max_timing_entries: + timing_list[:] = timing_list[-self._max_timing_entries:] # Send event to PostHog if self._posthog: @@ -322,86 +333,88 @@ def shutdown(self): """ if not self.enabled: return + + # Use lock to prevent concurrent shutdown calls + with self._shutdown_lock: + if self._shutdown_complete: + return + self._shutdown_complete = True # Final flush self.flush() # Shutdown PostHog if available - if hasattr(self, '_posthog') and self._posthog: + posthog_client = getattr(self, '_posthog', None) + if posthog_client: try: # Use a timeout-based flush to prevent hanging import threading import time + import concurrent.futures - flush_timeout = 2.0 # Maximum time to wait for flush - flush_complete = threading.Event() - - def flush_with_timeout(): - """Flush PostHog data with timeout protection.""" + # Use ThreadPoolExecutor for better control + with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: + flush_future = executor.submit(self._safe_flush_posthog, posthog_client) + try: - self._posthog.flush() - flush_complete.set() + flush_future.result(timeout=5.0) # 5 second timeout + self.logger.debug("PostHog flush completed successfully") + except concurrent.futures.TimeoutError: + self.logger.warning("PostHog flush timed out") + flush_future.cancel() except Exception as e: - self.logger.debug(f"PostHog flush error: {e}") - flush_complete.set() - - # Start flush in a separate thread - flush_thread = threading.Thread(target=flush_with_timeout, daemon=True) - flush_thread.start() - - # Wait for flush to complete or timeout - flush_complete.wait(timeout=flush_timeout) + self.logger.error(f"PostHog flush failed: {e}") - # If flush didn't complete, log and continue - if not flush_complete.is_set(): - self.logger.debug("PostHog flush timed out, continuing with shutdown") - - # Get the PostHog client's internal thread pool for cleanup - if hasattr(self._posthog, '_thread_pool'): - thread_pool = self._posthog._thread_pool - if thread_pool: - try: - # Stop accepting new tasks - thread_pool.shutdown(wait=False) - # Wait for threads to finish with timeout - thread_pool.shutdown(wait=True) - except: - pass - - # Force shutdown of any remaining threads - if hasattr(self._posthog, '_consumer'): - try: - self._posthog._consumer.flush() - self._posthog._consumer.shutdown() - except: - pass + # Cleanup PostHog threads safely + self._shutdown_posthog_threads(posthog_client) # Standard shutdown - self._posthog.shutdown() - - # Additional cleanup - force thread termination - # Wait up to 2 seconds for threads to terminate - max_wait = 2.0 - start_time = time.time() - - while time.time() - start_time < max_wait: - # Check for any PostHog related threads - posthog_threads = [ - t for t in threading.enumerate() - if t != threading.current_thread() - and not t.daemon - and ('posthog' in t.name.lower() or 'analytics' in t.name.lower()) - ] - - if not posthog_threads: - break - - time.sleep(0.1) + posthog_client.shutdown() except Exception as e: # Log the error but don't fail shutdown - self.logger.debug(f"Error during PostHog shutdown: {e}") - pass + self.logger.error(f"Error during PostHog shutdown: {e}") + finally: + self._posthog = None + + def _safe_flush_posthog(self, posthog_client): + """Safely flush PostHog data with error handling.""" + try: + posthog_client.flush() + return True + except Exception as e: + self.logger.debug(f"PostHog flush error: {e}") + return False + + def _shutdown_posthog_threads(self, posthog_client): + """Safely shutdown PostHog background threads.""" + try: + # Access thread pool safely (fix double shutdown issue) + thread_pool = getattr(posthog_client, '_thread_pool', None) + if thread_pool: + try: + # Single shutdown call with timeout + if hasattr(thread_pool, 'shutdown'): + thread_pool.shutdown(wait=False) + # Wait briefly for graceful shutdown + import time + time.sleep(0.5) + except Exception as e: + self.logger.debug(f"Thread pool shutdown error: {e}") + + # Clean up consumer + consumer = getattr(posthog_client, '_consumer', None) + if consumer: + try: + if hasattr(consumer, 'flush'): + consumer.flush() + if hasattr(consumer, 'shutdown'): + consumer.shutdown() + except Exception as e: + self.logger.debug(f"Consumer shutdown error: {e}") + + except Exception as e: + self.logger.debug(f"Error during PostHog thread cleanup: {e}") # Global telemetry instance