Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions src/praisonai-agents/praisonaiagents/agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -1958,12 +1958,10 @@ def _cleanup_telemetry(self):
"""Clean up telemetry system to ensure proper program termination."""
try:
# Import here to avoid circular imports
from ..telemetry import get_telemetry
from ..telemetry import force_shutdown_telemetry

# Get the global telemetry instance and shut it down
telemetry = get_telemetry()
if telemetry and hasattr(telemetry, 'shutdown'):
telemetry.shutdown()
# Force shutdown of telemetry system with comprehensive cleanup
force_shutdown_telemetry()
except Exception as e:
# Log error but don't fail the execution
logging.debug(f"Error cleaning up telemetry: {e}")
Expand Down
7 changes: 7 additions & 0 deletions src/praisonai-agents/praisonaiagents/telemetry/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
'get_telemetry',
'enable_telemetry',
'disable_telemetry',
'force_shutdown_telemetry',
'MinimalTelemetry',
'TelemetryCollector', # For backward compatibility
]
Expand All @@ -47,6 +48,12 @@ def disable_telemetry():
_disable_telemetry()


def force_shutdown_telemetry():
"""Force shutdown of telemetry system with comprehensive cleanup."""
from .telemetry import force_shutdown_telemetry as _force_shutdown_telemetry
_force_shutdown_telemetry()


# Auto-instrumentation and cleanup setup
_initialized = False
_atexit_registered = False
Expand Down
85 changes: 84 additions & 1 deletion src/praisonai-agents/praisonaiagents/telemetry/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ def flush(self):
def shutdown(self):
"""
Shutdown telemetry and ensure all events are sent.
Forces proper cleanup of background threads to prevent hanging.
"""
if not self.enabled:
return
Expand All @@ -330,8 +331,55 @@ def shutdown(self):
try:
# Force a synchronous flush before shutdown
self._posthog.flush()

# Get the PostHog client's internal thread pool for cleanup
if hasattr(self._posthog, '_thread_pool'):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This implementation accesses private members of the posthog client (_thread_pool). This creates a dependency on the internal implementation of the posthog-python library, which could break in future updates. Add a comment explaining why this is needed and acknowledging the risk.

# NOTE: Accessing private members of the posthog client to ensure
# all background threads are properly terminated. This is a workaround
# for a known issue and may be fragile to posthog library updates.

thread_pool = self._posthog._thread_pool
if thread_pool:
try:
# Stop accepting new tasks
thread_pool.shutdown(wait=False)
# Wait for threads to finish with timeout
thread_pool.shutdown(wait=True)
Comment on lines +341 to +343
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The call to thread_pool.shutdown(wait=False) is redundant when immediately followed by thread_pool.shutdown(wait=True). The shutdown(wait=True) already stops the pool and waits for tasks to complete. Simplify by removing the first call.

                            thread_pool.shutdown(wait=True)

except:
pass
Comment on lines +344 to +345
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Using a bare except: is not recommended as it catches all exceptions, including system-exiting exceptions. Catch specific exceptions or, at a minimum, Exception. Log the exception for better diagnostics.

                        except Exception as e:
                            self.logger.debug(f"Error shutting down PostHog thread pool: {e}")


# Force shutdown of any remaining threads
if hasattr(self._posthog, '_consumer'):
try:
self._posthog._consumer.flush()
self._posthog._consumer.shutdown()
except:
pass
Comment on lines +352 to +353
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Similar to the previous comment, this bare except: should be avoided. Catch Exception and log the error to aid in future troubleshooting.

                    except Exception as e:
                        self.logger.debug(f"Error shutting down PostHog consumer: {e}")


# Standard shutdown
self._posthog.shutdown()
except:

# Additional cleanup - force thread termination
import threading
import time

# Wait up to 2 seconds for threads to terminate
max_wait = 2.0
start_time = time.time()

while time.time() - start_time < max_wait:
# Check for any PostHog related threads
posthog_threads = [
t for t in threading.enumerate()
if t != threading.current_thread()
and not t.daemon
and ('posthog' in t.name.lower() or 'analytics' in t.name.lower())
]

if not posthog_threads:
break

time.sleep(0.1)

except Exception as e:
# Log the error but don't fail shutdown
self.logger.debug(f"Error during PostHog shutdown: {e}")
pass


Expand Down Expand Up @@ -361,6 +409,41 @@ def disable_telemetry():
_telemetry_instance = MinimalTelemetry(enabled=False)


def force_shutdown_telemetry():
"""
Force shutdown of telemetry system with comprehensive cleanup.
This function ensures proper termination of all background threads.
"""
global _telemetry_instance
if _telemetry_instance:
_telemetry_instance.shutdown()

# Additional cleanup - wait for all threads to finish
import threading
import time

# Wait up to 3 seconds for any remaining threads to finish
max_wait = 3.0
start_time = time.time()

while time.time() - start_time < max_wait:
# Check for any analytics/telemetry related threads
analytics_threads = [
t for t in threading.enumerate()
if t != threading.current_thread()
and not t.daemon
and any(keyword in t.name.lower() for keyword in ['posthog', 'analytics', 'telemetry', 'consumer'])
]

if not analytics_threads:
break

time.sleep(0.1)
Comment on lines +421 to +441
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This thread-waiting logic is very similar to the one in the MinimalTelemetry.shutdown method (lines 358-378). To improve maintainability and avoid code duplication, consider extracting this logic into a private helper function.


# Reset the global instance
_telemetry_instance = None


def enable_telemetry():
"""Programmatically enable telemetry (if not disabled by environment)."""
global _telemetry_instance
Expand Down
Loading