-
-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathstats.py
More file actions
320 lines (271 loc) · 11.8 KB
/
Copy pathstats.py
File metadata and controls
320 lines (271 loc) · 11.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
"""Operational statistics tracker for CodingBuddy plugin (#825).
Tracks tool call count, tool names, errors, and session duration.
Uses fcntl.flock() for file-level locking on every IO operation.
Short-lived hook processes (e.g. PostToolUse) typically create a
SessionStats, call record_tool_call() once, then exit. Because the
default flush_interval is 10, that single call would never reach disk.
Two safety nets cover this:
1. Callers in short-lived processes SHOULD call ``flush()`` explicitly
right after recording, for immediate visibility (statusline, Stop
hook summary, etc.).
2. As a fallback, every live SessionStats instance is tracked in a
module-level WeakSet and flushed once at interpreter exit by a single
atexit handler. This avoids leaking a per-instance atexit handler and
keeps GC behavior unchanged.
"""
import atexit
import json
import os
import time
import weakref
from typing import Any, Dict, List, Optional
try:
import fcntl
HAS_FCNTL = True
except ImportError:
HAS_FCNTL = False
DEFAULT_DATA_DIR = os.path.join(os.path.expanduser("~"), ".codingbuddy")
# Track every live SessionStats instance with weak references so they
# can be flushed at interpreter exit without preventing garbage
# collection. Using a WeakSet means dead instances are removed
# automatically; using a single module-level atexit handler avoids the
# per-instance handler leak that would otherwise accumulate when many
# SessionStats are created in one process.
_live_instances: "weakref.WeakSet[SessionStats]" = weakref.WeakSet()
def _flush_all_pending() -> None:
"""Flush every live SessionStats instance.
Registered as a single atexit handler at module load time. Iterates
a snapshot of the WeakSet so concurrent GC during iteration is safe.
All exceptions are swallowed because atexit handlers must not raise
during interpreter shutdown.
"""
for inst in list(_live_instances):
try:
inst.flush()
except Exception:
pass
atexit.register(_flush_all_pending)
class SessionStats:
"""Track operational metrics for a Claude Code session.
See module docstring for the short-lived process pattern.
"""
def __init__(self, session_id: str, data_dir: Optional[str] = None, flush_interval: int = 10):
"""Initialize stats tracker.
Args:
session_id: Unique session identifier.
data_dir: Directory for stats files.
Uses CLAUDE_PLUGIN_DATA env or ~/.codingbuddy.
flush_interval: Number of record_tool_call() invocations between
automatic disk flushes. Default 10.
"""
self.session_id = session_id
self._flush_interval = flush_interval
self._pending_count = 0
if data_dir is None:
data_dir = os.environ.get("CLAUDE_PLUGIN_DATA", DEFAULT_DATA_DIR)
self.data_dir = data_dir
os.makedirs(self.data_dir, mode=0o700, exist_ok=True)
# Fix permissions if dir already existed
try:
os.chmod(self.data_dir, 0o700)
except OSError:
import sys
sys.stderr.write(
f"[codingbuddy] Warning: could not set permissions on {self.data_dir}\n"
)
self.stats_file = os.path.join(self.data_dir, f"{session_id}.json")
# Initialize file if it doesn't exist
if not os.path.exists(self.stats_file):
self._locked_write({
"session_id": session_id,
"started_at": time.time(),
"tool_count": 0,
"error_count": 0,
"tool_names": {},
"hook_timings": {},
})
# In-memory accumulator — deltas since last flush
self._mem_tool_count = 0
self._mem_error_count = 0
self._mem_tool_names: Dict[str, int] = {}
self._mem_hook_timings: Dict[str, List[float]] = {}
# Add to the module-level WeakSet so the single atexit handler
# can flush this instance on interpreter exit. WeakSet does not
# prevent garbage collection; once the caller drops its last
# reference (and finalize() has cleared mem state), the instance
# disappears from the set automatically.
_live_instances.add(self)
def record_hook_timing(self, hook_name: str, elapsed_ms: float) -> None:
"""Record a hook execution timing in memory.
Args:
hook_name: Name of the hook (e.g. 'PreToolUse').
elapsed_ms: Elapsed time in milliseconds.
"""
if hook_name not in self._mem_hook_timings:
self._mem_hook_timings[hook_name] = []
self._mem_hook_timings[hook_name].append(elapsed_ms)
self._pending_count += 1
def record_tool_call(self, tool_name: str, success: bool = True) -> None:
"""Record a tool call in memory. Flushes to disk every flush_interval calls.
Args:
tool_name: Name of the tool called.
success: Whether the tool call succeeded.
"""
self._mem_tool_count += 1
if not success:
self._mem_error_count += 1
self._mem_tool_names[tool_name] = self._mem_tool_names.get(tool_name, 0) + 1
self._pending_count += 1
if self._pending_count >= self._flush_interval:
self.flush()
def flush(self) -> None:
"""Flush accumulated in-memory stats to disk."""
if self._pending_count == 0:
return
data = self._locked_read()
data["tool_count"] = data.get("tool_count", 0) + self._mem_tool_count
data["error_count"] = data.get("error_count", 0) + self._mem_error_count
tool_names = data.get("tool_names", {})
for name, count in self._mem_tool_names.items():
tool_names[name] = tool_names.get(name, 0) + count
data["tool_names"] = tool_names
# Merge hook timings
hook_timings = data.get("hook_timings", {})
for name, times in self._mem_hook_timings.items():
if name not in hook_timings:
hook_timings[name] = []
hook_timings[name].extend(times)
data["hook_timings"] = hook_timings
self._locked_write(data)
# Reset in-memory accumulators
self._mem_tool_count = 0
self._mem_error_count = 0
self._mem_tool_names = {}
self._mem_hook_timings = {}
self._pending_count = 0
def _merged_data(self) -> Dict[str, Any]:
"""Return disk data merged with in-memory deltas (read-only)."""
data = self._locked_read()
data["tool_count"] = data.get("tool_count", 0) + self._mem_tool_count
data["error_count"] = data.get("error_count", 0) + self._mem_error_count
tool_names = dict(data.get("tool_names", {}))
for name, count in self._mem_tool_names.items():
tool_names[name] = tool_names.get(name, 0) + count
data["tool_names"] = tool_names
# Merge hook timings
hook_timings = dict(data.get("hook_timings", {}))
for name, times in self._mem_hook_timings.items():
if name not in hook_timings:
hook_timings[name] = []
hook_timings[name] = hook_timings[name] + times
data["hook_timings"] = hook_timings
return data
def format_summary(self) -> str:
"""Format a human-readable summary.
Returns:
String like '[CB] Xm | Y tools | Z errors | Bash:N Edit:M'
"""
data = self._merged_data()
duration = time.time() - data.get("started_at", time.time())
minutes = int(duration // 60)
tool_count = data.get("tool_count", 0)
error_count = data.get("error_count", 0)
tool_names = data.get("tool_names", {})
error_word = "error" if error_count == 1 else "errors"
# Top tools sorted by count descending
sorted_tools = sorted(tool_names.items(), key=lambda x: x[1], reverse=True)
tools_str = " ".join(f"{name}:{count}" for name, count in sorted_tools[:5])
summary = f"[CB] {minutes}m | {tool_count} tools | {error_count} {error_word} | {tools_str}"
# Append hook timing report if any timings exist
hook_timings = data.get("hook_timings", {})
if hook_timings:
timing_parts = []
for name, timings in sorted(hook_timings.items()):
avg = sum(timings) / len(timings)
timing_parts.append(f"{name}:{avg:.0f}ms")
summary += f" | \u23f1 {' '.join(timing_parts)}"
return summary
def finalize(self) -> Dict[str, Any]:
"""Finalize session stats, return data, and cleanup file.
Returns:
Dict with session stats.
"""
self.flush()
data = self._locked_read()
duration = time.time() - data.get("started_at", time.time())
data["duration_seconds"] = duration
# Compute hook timing statistics
hook_timings = data.get("hook_timings", {})
if hook_timings:
hook_timing_stats: Dict[str, Any] = {}
for name, timings in hook_timings.items():
sorted_t = sorted(timings)
count = len(sorted_t)
avg_ms = sum(sorted_t) / count
p95_idx = min(int(count * 0.95), count - 1)
hook_timing_stats[name] = {
"count": count,
"avg_ms": round(avg_ms, 2),
"p95_ms": round(sorted_t[p95_idx], 2),
"max_ms": round(sorted_t[-1], 2),
}
data["hook_timing_stats"] = hook_timing_stats
# Cleanup stats file
try:
os.remove(self.stats_file)
except OSError:
pass
# Remove from the live set so the atexit flush does not resurrect
# the file we just removed.
_live_instances.discard(self)
return data
@staticmethod
def cleanup_stale(data_dir: str, max_age_hours: int = 24) -> None:
"""Remove stale stats files older than max_age_hours.
Args:
data_dir: Directory containing stats files.
max_age_hours: Maximum age in hours before cleanup.
"""
if not os.path.isdir(data_dir):
return
cutoff = time.time() - (max_age_hours * 3600)
for filename in os.listdir(data_dir):
if not filename.endswith(".json"):
continue
filepath = os.path.join(data_dir, filename)
try:
with open(filepath, "r", encoding="utf-8") as f:
if HAS_FCNTL:
fcntl.flock(f.fileno(), fcntl.LOCK_SH)
data = json.load(f)
started_at = data.get("started_at", 0)
if started_at < cutoff:
os.remove(filepath)
except (json.JSONDecodeError, OSError, KeyError):
# Corrupted or inaccessible — try to remove
try:
os.remove(filepath)
except OSError:
pass
def _locked_read(self) -> Dict[str, Any]:
"""Read stats file with file locking."""
try:
with open(self.stats_file, "r", encoding="utf-8") as f:
if HAS_FCNTL:
fcntl.flock(f.fileno(), fcntl.LOCK_SH)
return json.load(f)
except (json.JSONDecodeError, OSError):
return {
"session_id": self.session_id,
"started_at": time.time(),
"tool_count": 0,
"error_count": 0,
"tool_names": {},
"hook_timings": {},
}
def _locked_write(self, data: Dict[str, Any]) -> None:
"""Write stats file with file locking."""
with open(self.stats_file, "w", encoding="utf-8") as f:
if HAS_FCNTL:
fcntl.flock(f.fileno(), fcntl.LOCK_EX)
json.dump(data, f)