Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 83 additions & 49 deletions src/praisonai-agents/praisonaiagents/telemetry/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ def __init__(self, enabled: bool = None):

self.logger = logging.getLogger(__name__)

# Add shutdown tracking to prevent double shutdown
self._shutdown_complete = False
self._shutdown_lock = threading.Lock()

if not self.enabled:
self.logger.debug("Telemetry is disabled")
return
Expand All @@ -72,6 +76,7 @@ def __init__(self, enabled: bool = None):
"errors": 0,
}
self._metrics_lock = threading.Lock()
self._max_timing_entries = 1000 # Limit to prevent memory leaks

# Collect basic environment info (anonymous)
self._environment = {
Expand Down Expand Up @@ -102,7 +107,7 @@ def _get_framework_version(self) -> str:
try:
from .. import __version__
return __version__
except ImportError:
except (ImportError, KeyError, AttributeError):
return "unknown"

def track_agent_execution(self, agent_name: str = None, success: bool = True):
Expand Down Expand Up @@ -174,15 +179,21 @@ def track_tool_usage(self, tool_name: str, success: bool = True, execution_time:
with self._metrics_lock:
self._metrics["tool_calls"] += 1

# Add timing metrics if provided
# Add timing metrics if provided (with memory management)
if execution_time is not None:
if "tool_execution_times" not in self._metrics:
self._metrics["tool_execution_times"] = []
self._metrics["tool_execution_times"].append({

timing_list = self._metrics["tool_execution_times"]
timing_list.append({
"tool_name": tool_name,
"execution_time": execution_time,
"success": success
})

# Prevent memory leaks by limiting stored entries
if len(timing_list) > self._max_timing_entries:
timing_list[:] = timing_list[-self._max_timing_entries:]

# Send event to PostHog
if self._posthog:
Expand Down Expand Up @@ -322,65 +333,88 @@ def shutdown(self):
"""
if not self.enabled:
return

# Use lock to prevent concurrent shutdown calls
with self._shutdown_lock:
if self._shutdown_complete:
return
self._shutdown_complete = True

# Final flush
self.flush()

# Shutdown PostHog if available
if hasattr(self, '_posthog') and self._posthog:
posthog_client = getattr(self, '_posthog', None)
if posthog_client:
try:
# Force a synchronous flush before shutdown
self._posthog.flush()

# Get the PostHog client's internal thread pool for cleanup
if hasattr(self._posthog, '_thread_pool'):
thread_pool = self._posthog._thread_pool
if thread_pool:
try:
# Stop accepting new tasks
thread_pool.shutdown(wait=False)
# Wait for threads to finish with timeout
thread_pool.shutdown(wait=True)
except:
pass

# Force shutdown of any remaining threads
if hasattr(self._posthog, '_consumer'):
try:
self._posthog._consumer.flush()
self._posthog._consumer.shutdown()
except:
pass

# Standard shutdown
self._posthog.shutdown()

# Additional cleanup - force thread termination
# Use a timeout-based flush to prevent hanging
import threading
import time
import concurrent.futures

# Wait up to 2 seconds for threads to terminate
max_wait = 2.0
start_time = time.time()

while time.time() - start_time < max_wait:
# Check for any PostHog related threads
posthog_threads = [
t for t in threading.enumerate()
if t != threading.current_thread()
and not t.daemon
and ('posthog' in t.name.lower() or 'analytics' in t.name.lower())
]
# Use ThreadPoolExecutor for better control
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
flush_future = executor.submit(self._safe_flush_posthog, posthog_client)

if not posthog_threads:
break

time.sleep(0.1)
try:
flush_future.result(timeout=5.0) # 5 second timeout
self.logger.debug("PostHog flush completed successfully")
except concurrent.futures.TimeoutError:
self.logger.warning("PostHog flush timed out")
flush_future.cancel()
except Exception as e:
self.logger.error(f"PostHog flush failed: {e}")

# Cleanup PostHog threads safely
self._shutdown_posthog_threads(posthog_client)

# Standard shutdown
posthog_client.shutdown()

except Exception as e:
# Log the error but don't fail shutdown
self.logger.debug(f"Error during PostHog shutdown: {e}")
pass
self.logger.error(f"Error during PostHog shutdown: {e}")
finally:
self._posthog = None

def _safe_flush_posthog(self, posthog_client):
    """Flush ``posthog_client`` without letting a flush error escape.

    Args:
        posthog_client: Object exposing a ``flush()`` method.

    Returns:
        True when ``posthog_client.flush()`` returned normally, False when
        it raised (the error is then logged at debug level).
    """
    flushed = True
    try:
        posthog_client.flush()
    except Exception as e:
        # Best-effort: report the failure to the caller via the return value.
        flushed = False
        self.logger.debug(f"PostHog flush error: {e}")
    return flushed

def _shutdown_posthog_threads(self, posthog_client, grace_period: float = 0.5):
    """Best-effort shutdown of a PostHog client's background workers.

    Two private attributes are looked up on ``posthog_client`` and each one
    is handled only if it is present and truthy:

    * ``_thread_pool`` -- ``shutdown(wait=False)`` is called, followed by a
      short pause of ``grace_period`` seconds.
    * ``_consumer`` -- ``flush()`` and then ``shutdown()`` are called. The
      two calls are guarded separately so that a failing flush cannot
      prevent the consumer from being shut down.

    NOTE(review): both attributes are private to the client object; confirm
    they exist in the PostHog version this project pins.

    Args:
        posthog_client: Client object to clean up.
        grace_period: Seconds to sleep after the thread pool has been asked
            to stop. Values <= 0 skip the pause. Defaults to 0.5.

    Returns:
        None. Exceptions raised by the client are caught and logged at
        debug level rather than propagated.
    """
    try:
        thread_pool = getattr(posthog_client, '_thread_pool', None)
        if thread_pool and hasattr(thread_pool, 'shutdown'):
            try:
                # Ask the pool to stop without waiting on it, then give the
                # workers a short, bounded pause to wind down.
                thread_pool.shutdown(wait=False)
                if grace_period > 0:
                    import time
                    time.sleep(grace_period)
            except Exception as e:
                self.logger.debug(f"Thread pool shutdown error: {e}")

        consumer = getattr(posthog_client, '_consumer', None)
        if consumer:
            # Each step gets its own guard: shutdown must still be attempted
            # when flush raises.
            for method_name in ('flush', 'shutdown'):
                method = getattr(consumer, method_name, None)
                if not callable(method):
                    continue
                try:
                    method()
                except Exception as e:
                    self.logger.debug(f"Consumer shutdown error: {e}")

    except Exception as e:
        # Covers failures while probing the client itself (e.g. a property
        # that raises); cleanup must never break the caller's shutdown.
        self.logger.debug(f"Error during PostHog thread cleanup: {e}")


# Global telemetry instance
Expand Down