-
-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Fix: Comprehensive telemetry cleanup to prevent agent termination hang #1000
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -318,6 +318,7 @@ def flush(self): | |
| def shutdown(self): | ||
| """ | ||
| Shutdown telemetry and ensure all events are sent. | ||
| Forces proper cleanup of background threads to prevent hanging. | ||
| """ | ||
| if not self.enabled: | ||
| return | ||
|
|
@@ -330,8 +331,55 @@ def shutdown(self): | |
| try: | ||
| # Force a synchronous flush before shutdown | ||
| self._posthog.flush() | ||
|
|
||
| # Get the PostHog client's internal thread pool for cleanup | ||
| if hasattr(self._posthog, '_thread_pool'): | ||
| thread_pool = self._posthog._thread_pool | ||
| if thread_pool: | ||
| try: | ||
| # Stop accepting new tasks | ||
| thread_pool.shutdown(wait=False) | ||
| # Wait for threads to finish with timeout | ||
| thread_pool.shutdown(wait=True) | ||
|
Comment on lines
+341
to
+343
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
| except: | ||
| pass | ||
|
Comment on lines
+344
to
+345
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
|
|
||
| # Force shutdown of any remaining threads | ||
| if hasattr(self._posthog, '_consumer'): | ||
| try: | ||
| self._posthog._consumer.flush() | ||
| self._posthog._consumer.shutdown() | ||
| except: | ||
| pass | ||
|
Comment on lines
+352
to
+353
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
|
|
||
| # Standard shutdown | ||
| self._posthog.shutdown() | ||
| except: | ||
|
|
||
| # Additional cleanup - force thread termination | ||
| import threading | ||
| import time | ||
|
|
||
| # Wait up to 2 seconds for threads to terminate | ||
| max_wait = 2.0 | ||
| start_time = time.time() | ||
|
|
||
| while time.time() - start_time < max_wait: | ||
| # Check for any PostHog related threads | ||
| posthog_threads = [ | ||
| t for t in threading.enumerate() | ||
| if t != threading.current_thread() | ||
| and not t.daemon | ||
| and ('posthog' in t.name.lower() or 'analytics' in t.name.lower()) | ||
| ] | ||
|
|
||
| if not posthog_threads: | ||
| break | ||
|
|
||
| time.sleep(0.1) | ||
|
|
||
| except Exception as e: | ||
| # Log the error but don't fail shutdown | ||
| self.logger.debug(f"Error during PostHog shutdown: {e}") | ||
| pass | ||
|
|
||
|
|
||
|
|
@@ -361,6 +409,41 @@ def disable_telemetry(): | |
| _telemetry_instance = MinimalTelemetry(enabled=False) | ||
|
|
||
|
|
||
| def force_shutdown_telemetry(): | ||
| """ | ||
| Force shutdown of telemetry system with comprehensive cleanup. | ||
| This function ensures proper termination of all background threads. | ||
| """ | ||
| global _telemetry_instance | ||
| if _telemetry_instance: | ||
| _telemetry_instance.shutdown() | ||
|
|
||
| # Additional cleanup - wait for all threads to finish | ||
| import threading | ||
| import time | ||
|
|
||
| # Wait up to 3 seconds for any remaining threads to finish | ||
| max_wait = 3.0 | ||
| start_time = time.time() | ||
|
|
||
| while time.time() - start_time < max_wait: | ||
| # Check for any analytics/telemetry related threads | ||
| analytics_threads = [ | ||
| t for t in threading.enumerate() | ||
| if t != threading.current_thread() | ||
| and not t.daemon | ||
| and any(keyword in t.name.lower() for keyword in ['posthog', 'analytics', 'telemetry', 'consumer']) | ||
| ] | ||
|
|
||
| if not analytics_threads: | ||
| break | ||
|
|
||
| time.sleep(0.1) | ||
|
Comment on lines
+421
to
+441
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
|
|
||
| # Reset the global instance | ||
| _telemetry_instance = None | ||
|
|
||
|
|
||
| def enable_telemetry(): | ||
| """Programmatically enable telemetry (if not disabled by environment).""" | ||
| global _telemetry_instance | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This implementation accesses private members of the
posthogclient (_thread_pool). This creates a dependency on the internal implementation of theposthog-pythonlibrary, which could break in future updates. Add a comment explaining why this is needed and acknowledging the risk.