-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathclaude_code_spinner.py
More file actions
1641 lines (1393 loc) · 56.4 KB
/
claude_code_spinner.py
File metadata and controls
1641 lines (1393 loc) · 56.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python3
"""
Claude Code Spinner - Cross-platform retro ASCII spinner overlay for Claude Code.
Renders NxM ASCII art spinners directly onto the terminal via /dev/tty writes,
independent of Claude Code's Ink rendering pipeline. Ink redraws its own region;
the spinner occupies space outside that region (or gets naturally overwritten).
Modes:
enable - Register hooks in ~/.claude/settings.json (activates spinner)
disable - Remove hooks from ~/.claude/settings.json (deactivates spinner)
status - Show hook registration state and current configuration
hook - Handle Claude Code lifecycle events (invoked by hooks)
statusline - Render inline status line for Claude Code's status bar
overlay - Run spinner overlay daemon (launched automatically by hook)
kill - Kill running overlay daemon for a session
list - List available spinner variants
preview - Preview a spinner live in the terminal
init-config - Write default config.json
"""
import argparse
import json
import math
import os
import shutil
import signal
import struct
import subprocess
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
# ============================================================
# Spinner Library
# ============================================================
def _fmulti(frames, ms=200):
h = max(len(f) for f in frames) if frames else 1
w = max(len(line) for f in frames for line in f) if frames else 1
normalized = []
for frame in frames:
lines = [line.ljust(w) for line in frame]
if len(lines) < h:
lines += [" " * w] * (h - len(lines))
normalized.append(lines)
return {"w": w, "h": h, "ms": ms, "frames": normalized}
def _f1(frames, ms=120):
"""Helper for single-line spinner frames."""
w = max(len(f) for f in frames) if frames else 1
return {"w": w, "h": 1, "ms": ms, "frames": [[f.ljust(w)] for f in frames]}
SPINNERS = {
# --- 1-line spinners (for statusline) ---
# CRT scanning bar: left-to-right ping-pong sweep
"crt_bar": _f1([
"|> |",
"|=> |",
"|==> |",
"|===> |",
"|====>|",
"| <|",
"| <=|",
"| <==|",
"| <===|",
"|<====|",
], 90),
# Classic pipe spinner
"pipe": _f1(["|", "/", "-", "\\"], 120),
# Braille pulse
"braille": _f1([".", "o", "O", "0", "O", "o"], 100),
# sprite.txt variation #1
"variation_1": _fmulti([
[" ||", " ||", " ||", " ||", " ||"],
[" //", " //", " //", " //", "//"],
["", "", "==========", "", ""],
["\\\\", " \\\\", " \\\\", " \\\\", " \\\\"],
], 140),
# sprite.txt variation #2
"variation_2": _fmulti([
[" ||", " ||", " ||"],
["\\\\", " \\\\", " \\\\"],
["", "======", ""],
[" //", " //", "//"],
], 140),
# sprite.txt variation #3
"variation_3": _fmulti([
[" o * .", " 0 .", "@ .", " . .", " . . ."],
[" * . .", " o .", "0 .", " @ .", " . . ."],
[" . . .", " * .", "o .", " 0 .", " @ . ."],
[" . . .", " . .", "* .", " o .", " 0 @ ."],
[" . . .", " . .", ". .", " * .", " o 0 @"],
[" . . .", " . .", ". .", " . @", " * o 0"],
[" . . .", " . .", ". @", " . 0", " . * o"],
[" . . .", " . @", ". 0", " . o", " . . *"],
[" . . @", " . 0", ". o", " . *", " . . ."],
[" . @ 0", " . o", ". *", " . .", " . . ."],
[" @ 0 o", " . *", ". .", " . .", " . . ."],
[" 0 o *", " @ .", ". .", " . .", " . . ."],
], 120),
# sprite.txt variation #4
"variation_4": _fmulti([
[" 0 o", "@ .", " . ."],
[" o .", "0 .", " @ ."],
[" . .", "o .", " 0 @"],
[" . .", ". @", " o 0"],
[" . @", ". 0", " . o"],
[" @ 0", ". o", " . ."],
], 120),
}
def get_spinner(name):
return SPINNERS.get(name) or SPINNERS["variation_1"]
def get_spinner_for_statusline(name):
"""Return a spinner suitable for 1-line statusline display.
If the named spinner is multi-line, falls back to crt_bar and logs a
warning so the user knows their config needs to be updated.
"""
sp = SPINNERS.get(name)
if sp is None:
return SPINNERS["crt_bar"], f"unknown spinner '{name}', using crt_bar"
if sp["h"] > 1:
return SPINNERS["crt_bar"], f"spinner '{name}' is {sp['h']} lines tall; statusline needs h=1, using crt_bar"
return sp, None
def get_frame_at(spinner, timestamp_ms=None):
"""Time-based frame selection — used by the overlay daemon (continuous loop)."""
if timestamp_ms is None:
timestamp_ms = int(time.time() * 1000)
n = len(spinner["frames"])
idx = int((timestamp_ms / spinner["ms"]) % n)
return spinner["frames"][idx]
def get_next_statusline_frame(spinner, session_id):
"""Call-count-based frame selection — used by the statusline handler.
The statusline is invoked at a fixed refresh interval (e.g. every 1 second)
that rarely aligns with the spinner's ms-per-frame cycle. Using wall-clock
time causes the visible frame to advance by gcd(interval, cycle) steps,
making some frames never appear. Advancing by exactly +1 per call ensures
every frame is shown in order, regardless of the refresh interval.
"""
state = load_state(session_id) or {}
n = len(spinner["frames"])
idx = (state.get("sl_frame", -1) + 1) % n
state["sl_frame"] = idx
save_state(session_id, state)
return spinner["frames"][idx]
def auto_select_spinner(theme_name, avail_w, avail_h):
"""Pick the largest sprite variation that fits avail_w x avail_h."""
fitting = []
for name, sp in SPINNERS.items():
if sp["w"] <= avail_w and sp["h"] <= avail_h:
fitting.append((sp["w"] * sp["h"], sp["h"], sp["w"], name))
if fitting:
_, _, _, chosen = sorted(fitting)[-1]
return get_spinner(chosen), chosen
return SPINNERS["variation_1"], "variation_1"
# ============================================================
# Terminal I/O
# ============================================================
def _is_truthy(value):
return str(value).strip().lower() in {"1", "true", "yes", "on"}
def _run_capture(args, timeout=2, stdin_path=None):
try:
kwargs = {"capture_output": True, "text": True, "timeout": timeout}
if stdin_path:
with open(stdin_path, "r", encoding="utf-8", errors="ignore") as f:
result = subprocess.run(args, stdin=f, **kwargs)
else:
result = subprocess.run(args, **kwargs)
if result.returncode == 0:
return (result.stdout or "").strip()
except Exception:
pass
return ""
def _resolve_tmux_target():
manual_target = os.environ.get("CLAUDE_SPINNER_TMUX_TARGET", "").strip()
if manual_target:
return manual_target
pane = os.environ.get("TMUX_PANE", "").strip()
if pane:
return pane
probe = _run_capture(["tmux", "display-message", "-p", "#{pane_id}"], timeout=1)
if probe:
return probe
listing = _run_capture(
["tmux", "list-panes", "-a", "-F", "#{session_attached} #{pane_active} #{pane_id}"],
timeout=2,
)
if not listing:
return ""
for line in listing.splitlines():
parts = line.strip().split()
if len(parts) >= 3 and parts[0] == "1" and parts[1] == "1":
return parts[2]
for line in listing.splitlines():
parts = line.strip().split()
if len(parts) >= 3 and parts[0] == "1":
return parts[2]
return ""
def _resolve_tmux_tty():
target = _resolve_tmux_target()
if not target:
return ""
tty = _run_capture(["tmux", "display-message", "-p", "-t", target, "#{pane_tty}"], timeout=1)
if tty.startswith("/"):
return tty
return ""
def _walk_process_tree_tty():
"""Walk PPID chain to find the terminal TTY path (Unix only).
When CLAUDE_CODE_SPAWN_BACKEND=tmux, hooks run in subprocesses with no
controlling terminal. Walking up the process tree finds the PTY of the
ancestor terminal session. Mirrors peon-ping's _peon_walk_tty approach.
Uses two separate `ps` calls per iteration (-o tty= and -o ppid=) to
avoid relying on comma-separated field syntax that differs across
GNU ps (Linux), BSD ps (macOS), and BusyBox ps.
"""
if sys.platform == "win32":
return ""
try:
pid = os.getppid()
last_tty = ""
seen = set()
while pid > 1 and pid not in seen:
seen.add(pid)
tty_name = _run_capture(["ps", "-p", str(pid), "-o", "tty="], timeout=1)
ppid_str = _run_capture(["ps", "-p", str(pid), "-o", "ppid="], timeout=1)
if tty_name and tty_name not in ("??", ""):
# ps outputs "pts/0" (Linux) or "s004" (macOS) - prepend /dev/
last_tty = "/dev/" + tty_name
if not ppid_str or not ppid_str.isdigit():
break
pid = int(ppid_str)
return last_tty
except Exception:
return ""
class _Win32ConOut:
"""Direct WriteFile-to-CONOUT$ writer — the ctypes equivalent of C's fwrite to CONOUT$.
Python's text-mode open("CONOUT$") goes through TextIOWrapper -> BufferedWriter
-> FileIO -> WriteFile, adding codec and buffering layers. Using CreateFileW +
WriteFile directly (as a C io_Writer implementation would) removes those layers:
every write() call is a single synchronous WriteFile to the console handle.
VT processing is enabled on the handle at construction time so ANSI cursor-
positioning sequences are honoured even in a daemon spawned with redirected stdio.
"""
GENERIC_READ = 0x80000000
GENERIC_WRITE = 0x40000000
FILE_SHARE_RW = 0x00000003
OPEN_EXISTING = 3
ENABLE_VT = 0x0004
def __init__(self, name="CONOUT$"):
import ctypes
self._ctypes = ctypes
k32 = ctypes.windll.kernel32
k32.CreateFileW.restype = ctypes.c_void_p
handle = k32.CreateFileW(
name,
self.GENERIC_READ | self.GENERIC_WRITE,
self.FILE_SHARE_RW,
None, self.OPEN_EXISTING, 0, None,
)
# CreateFileW returns INVALID_HANDLE_VALUE (cast to signed: -1) on failure
if ctypes.c_void_p(handle).value == ctypes.c_void_p(-1).value:
raise OSError(f"CreateFileW({name!r}) failed")
self._handle = handle
self._k32 = k32
# Enable VT processing so ANSI sequences are honoured
mode = ctypes.c_ulong()
if k32.GetConsoleMode(ctypes.c_void_p(handle), ctypes.byref(mode)):
k32.SetConsoleMode(
ctypes.c_void_p(handle),
mode.value | self.ENABLE_VT,
)
def write(self, text):
data = text.encode("utf-8") if isinstance(text, str) else bytes(text)
if not data:
return
written = self._ctypes.c_ulong()
self._k32.WriteFile(
self._ctypes.c_void_p(self._handle),
data, len(data),
self._ctypes.byref(written),
None,
)
def flush(self):
pass # WriteFile is synchronous; nothing to flush
def close(self):
if self._handle is not None:
self._k32.CloseHandle(self._ctypes.c_void_p(self._handle))
self._handle = None
def _open_tty_path(path):
if not path:
return None
try:
return open(path, "w", encoding="utf-8", buffering=1)
except Exception:
return None
def open_tty_write():
override = os.environ.get("CLAUDE_SPINNER_TTY", "").strip()
if override:
tty = _open_tty_path(override)
if tty:
return tty, override, "env"
if sys.platform == "win32":
# Use _Win32ConOut: CreateFileW("CONOUT$") + WriteFile, with VT processing
# enabled on the handle. This is the direct ctypes equivalent of C's
# fopen("CONOUT$","w") + fwrite — the io_Writer-to-file pattern — and avoids
# Python's TextIOWrapper/BufferedWriter codec layers that sit between our
# ANSI sequences and the actual WriteFile syscall.
for con_name in ("CONOUT$", "CON"):
try:
return _Win32ConOut(con_name), con_name, "native"
except Exception:
pass
return None, "", "none"
tty = _open_tty_path("/dev/tty")
if tty:
return tty, "/dev/tty", "native"
tmux_tty = _resolve_tmux_tty()
if tmux_tty:
tmux_handle = _open_tty_path(tmux_tty)
if tmux_handle:
return tmux_handle, tmux_tty, "tmux"
proc_tty = _walk_process_tree_tty()
if proc_tty:
proc_handle = _open_tty_path(proc_tty)
if proc_handle:
return proc_handle, proc_tty, "proc"
return None, "", "none"
def query_terminal_size_win32():
"""Windows Console API: GetConsoleScreenBufferInfo on CONOUT$.
Works even when the process was spawned with redirected stdio (stdout →
DEVNULL), because we open CONOUT$ explicitly via CreateFileW rather than
relying on the inherited stdout handle. Returns the *visible window* size
(srWindow), not the scroll-back buffer size (dwSize).
"""
if sys.platform != "win32":
return None
try:
import ctypes
import ctypes.wintypes
kernel32 = ctypes.windll.kernel32
GENERIC_READ = 0x80000000
FILE_SHARE_RW = 0x00000003
OPEN_EXISTING = 3
INVALID_HANDLE = ctypes.c_void_p(-1).value
kernel32.CreateFileW.restype = ctypes.c_void_p
handle = kernel32.CreateFileW(
"CONOUT$", GENERIC_READ, FILE_SHARE_RW,
None, OPEN_EXISTING, 0, None,
)
if handle == INVALID_HANDLE:
return None
class _COORD(ctypes.Structure):
_fields_ = [("X", ctypes.c_short), ("Y", ctypes.c_short)]
class _SMALL_RECT(ctypes.Structure):
_fields_ = [
("Left", ctypes.c_short), ("Top", ctypes.c_short),
("Right", ctypes.c_short), ("Bottom", ctypes.c_short),
]
class _CSBI(ctypes.Structure):
_fields_ = [
("dwSize", _COORD),
("dwCursorPosition", _COORD),
("wAttributes", ctypes.c_ushort),
("srWindow", _SMALL_RECT),
("dwMaximumWindowSize", _COORD),
]
info = _CSBI()
ok = kernel32.GetConsoleScreenBufferInfo(handle, ctypes.byref(info))
kernel32.CloseHandle(handle)
if ok:
cols = info.srWindow.Right - info.srWindow.Left + 1
rows = info.srWindow.Bottom - info.srWindow.Top + 1
if rows > 0 and cols > 0:
return rows, cols
except Exception:
pass
return None
def query_terminal_size_ioctl(tty_path):
"""ioctl TIOCGWINSZ → (rows, cols) or None."""
if sys.platform == "win32":
return None
candidate = tty_path or "/dev/tty"
try:
import fcntl
import termios
with open(candidate, "r", encoding="utf-8", errors="ignore") as f:
result = fcntl.ioctl(f.fileno(), termios.TIOCGWINSZ, b'\x00' * 8)
rows, cols = struct.unpack('HHHH', result)[:2]
if rows > 0 and cols > 0:
return rows, cols
except Exception:
pass
return None
def query_terminal_size_tmux():
target = _resolve_tmux_target()
if not target:
return None
out = _run_capture(
["tmux", "display-message", "-p", "-t", target, "#{pane_height} #{pane_width}"],
timeout=1,
)
if not out:
return None
parts = out.split()
if len(parts) != 2:
return None
try:
rows = int(parts[0])
cols = int(parts[1])
if rows > 0 and cols > 0:
return rows, cols
except Exception:
pass
return None
def get_terminal_size(tty_path=""):
"""(rows, cols) via win32 API → ioctl → os → stty → tmux → env → fallback."""
r = query_terminal_size_win32()
if r:
return r
r = query_terminal_size_ioctl(tty_path)
if r:
return r
try:
cols, rows = os.get_terminal_size()
if rows > 0 and cols > 0:
return rows, cols
except Exception:
pass
stty_stdin = tty_path if tty_path else "/dev/tty"
out = _run_capture(["stty", "size"], stdin_path=stty_stdin, timeout=1)
if out:
parts = out.split()
if len(parts) == 2:
try:
return int(parts[0]), int(parts[1])
except Exception:
pass
tmux_size = query_terminal_size_tmux()
if tmux_size:
return tmux_size
try:
rows = int(os.environ.get("LINES", 0))
cols = int(os.environ.get("COLUMNS", 0))
if rows > 0 and cols > 0:
return rows, cols
except Exception:
pass
return 24, 80
# ============================================================
# Config & State
# ============================================================
DEFAULT_CONFIG = {
"spinner": "auto",
"theme": "sprite",
"position": "top-right",
"margin_row": 1,
"margin_col": 2,
"max_width_ratio": 0.25,
"max_height_ratio": 0.33,
"color": "1;36",
"idle_clear": True,
# Screen re-render rate (Hz). Higher = recovers faster after Ink's logUpdate
# erases the overlay with \033[K. Ink redraws at ~20-60Hz during streaming;
# at 100Hz our recovery gap is at most 10ms — imperceptible to the user.
"redraw_hz": 100,
"statusline": {
"enabled": False,
"spinner": "crt_bar",
"label": "CRT",
"show_model": True,
"show_git": True,
"show_cost": True,
"show_elapsed": True,
},
"debug": False,
}
def get_config_dir():
env = os.environ.get("CLAUDE_SPINNER_DIR")
if env:
p = Path(env)
else:
p = Path.home() / ".claude" / "claude-code-spinner"
p.mkdir(parents=True, exist_ok=True)
return p
def load_config():
cfg_path = get_config_dir() / "config.json"
cfg = json.loads(json.dumps(DEFAULT_CONFIG))
if cfg_path.exists():
try:
with open(cfg_path, "r", encoding="utf-8") as f:
user = json.load(f)
for k, v in user.items():
if k == "statusline" and isinstance(v, dict):
cfg["statusline"] = {**DEFAULT_CONFIG["statusline"], **v}
else:
cfg[k] = v
except Exception:
pass
return cfg
def get_state_path(session_id):
if not session_id:
return None
return get_config_dir() / f"{session_id}.state.json"
def save_state(session_id, state):
p = get_state_path(session_id)
if not p:
return
try:
p.write_text(json.dumps(state), encoding="utf-8")
except Exception:
pass
def load_state(session_id):
p = get_state_path(session_id)
if not p or not p.exists():
return None
try:
return json.loads(p.read_text(encoding="utf-8"))
except Exception:
return None
def debug_log(cfg, phase, **fields):
enabled = _is_truthy(os.environ.get("CLAUDE_SPINNER_DEBUG", "0")) or bool(cfg.get("debug", False))
if not enabled:
return
try:
log_path = get_config_dir() / "debug.log"
ts = datetime.now().isoformat(timespec="milliseconds")
pairs = []
for k, v in fields.items():
sval = str(v)
if " " in sval or "=" in sval:
sval = '"' + sval.replace("\\", "\\\\").replace('"', '\\"') + '"'
pairs.append(f"{k}={sval}")
line = f"{ts} [{phase}] " + " ".join(pairs) + "\n"
with open(log_path, "a", encoding="utf-8") as f:
f.write(line)
except Exception:
pass
def get_pid_path(session_id):
return get_config_dir() / f"{session_id}.overlay.pid"
def get_stop_path(session_id):
return get_config_dir() / f"{session_id}.overlay.stop"
def read_stdin_json():
try:
raw = sys.stdin.read()
if not raw or not raw.strip():
return None
return json.loads(raw)
except Exception:
return None
def deep_get(obj, *keys, default=None):
cur = obj
for k in keys:
if isinstance(cur, dict) and k in cur:
cur = cur[k]
else:
return default
return cur
# ============================================================
# Position & Sizing
# ============================================================
def compute_position(cfg, spinner_w, spinner_h, term_rows, term_cols):
"""→ (row, col), 1-indexed."""
position = cfg.get("position", "top-right")
margin_r = cfg.get("margin_row", 1)
margin_c = cfg.get("margin_col", 2)
if isinstance(position, dict):
return position.get("row", 1), position.get("col", 1)
pos = str(position).lower().replace("-", "").replace("_", "")
r = (1 + margin_r) if "top" in pos else max(1, term_rows - spinner_h - margin_r + 1)
c = (1 + margin_c) if "left" in pos else max(1, term_cols - spinner_w - margin_c + 1)
return r, c
def compute_available_space(cfg, term_rows, term_cols):
"""Max spinner dimensions at the configured position."""
margin_r = cfg.get("margin_row", 1)
margin_c = cfg.get("margin_col", 2)
max_w = min(term_cols - margin_c * 2, int(term_cols * cfg.get("max_width_ratio", 0.25)))
max_h = min(term_rows - margin_r * 2, int(term_rows * cfg.get("max_height_ratio", 0.33)))
return max(1, max_w), max(1, max_h)
# ============================================================
# Overlay Daemon
# ============================================================
def render_frame(tty, frame, row, col, color=None, drift_rows=2):
# \033[s / \033[u: save and restore cursor around the overlay write so the
# foreground renderer (Claude Code / Ink) can continue from where it left off.
# This requires sharing the same console (ensured on Windows by the
# FreeConsole + AttachConsole dance in run_overlay).
#
# drift_rows: number of extra blank rows written immediately BELOW the
# spinner after each frame. When the terminal scrolls, the previous
# frame shifts down by one row and becomes a "ghost". Blanking those
# rows on every render erases scroll-drift artifacts without needing a
# separate scroll-region VT state change (which interferes with Ink).
buf = ["\033[s"]
c_on = f"\033[{color}m" if color else ""
c_off = "\033[0m" if color else ""
# Erase drift zone below spinner
blank = " " * (max(len(line) for line in frame) + 2)
for i in range(drift_rows):
buf.append(f"\033[{row + len(frame) + i};{col}H{blank}")
# Draw spinner frame
for i, line in enumerate(frame):
buf.append(f"\033[{row + i};{col}H{c_on}{line}{c_off}")
buf.append("\033[u")
tty.write("".join(buf))
tty.flush()
def clear_area(tty, h, w, row, col):
buf = ["\033[s"]
blank = " " * (w + 2)
for i in range(h):
buf.append(f"\033[{row + i};{col}H{blank}")
buf.append("\033[u")
tty.write("".join(buf))
tty.flush()
def _win_attach_parent_console(cfg, session_id):
"""FreeConsole() then find a console-owning process to attach to.
Strategy (in order):
1. Walk the parent chain (daemon → hook → node → bash → ...).
The hook may have already exited, so the chain can be short.
2. If the chain is exhausted without success, search the full process
list for well-known interactive shell/terminal executables and try
each one. This handles the case where the hook subprocess has exited
before the snapshot is taken, breaking the ancestor walk.
Mirrors the core pattern from process_b.py:
kernel32.FreeConsole()
kernel32.AttachConsole(pid_that_owns_console)
"""
if sys.platform != "win32":
return False
try:
import ctypes
k32 = ctypes.windll.kernel32
k32.FreeConsole()
# Snapshot all running processes → {pid: ppid}, {pid: exe_name}
TH32CS_SNAPPROCESS = 0x00000002
k32.CreateToolhelp32Snapshot.restype = ctypes.c_void_p
snap = k32.CreateToolhelp32Snapshot(TH32CS_SNAPPROCESS, 0)
inv = ctypes.c_void_p(-1).value
if snap == inv or snap is None:
debug_log(cfg, "attach", session=session_id, error="snapshot_failed")
return False
class PROCESSENTRY32W(ctypes.Structure):
_fields_ = [
("dwSize", ctypes.c_ulong),
("cntUsage", ctypes.c_ulong),
("th32ProcessID", ctypes.c_ulong),
("th32DefaultHeapID", ctypes.c_size_t),
("th32ModuleID", ctypes.c_ulong),
("cntThreads", ctypes.c_ulong),
("th32ParentProcessID", ctypes.c_ulong),
("pcPriClassBase", ctypes.c_long),
("dwFlags", ctypes.c_ulong),
("szExeFile", ctypes.c_wchar * 260),
]
entry = PROCESSENTRY32W()
entry.dwSize = ctypes.sizeof(PROCESSENTRY32W)
parent_of = {}
name_of = {}
if k32.Process32FirstW(snap, ctypes.byref(entry)):
while True:
parent_of[entry.th32ProcessID] = entry.th32ParentProcessID
name_of[entry.th32ProcessID] = entry.szExeFile
if not k32.Process32NextW(snap, ctypes.byref(entry)):
break
k32.CloseHandle(ctypes.c_void_p(snap))
seen = set()
# --- Pass 0: try PIDs passed from the hook via env var ---
# start_overlay_daemon() records hook_pid and claude_pid BEFORE the hook
# exits, so we can try them even though hook is gone from the process list.
pids_env = os.environ.get("CLAUDE_SPINNER_ATTACH_PIDS", "")
if pids_env:
for epid_str in pids_env.split(","):
try:
epid = int(epid_str.strip())
except ValueError:
continue
if not epid or epid in seen:
continue
seen.add(epid)
ok = bool(k32.AttachConsole(epid))
debug_log(cfg, "attach_try_env", session=session_id,
pid=epid, exe=name_of.get(epid, "?"), ok=ok)
if ok:
return True
# --- Pass 1: walk ancestor chain from daemon's recorded parent ---
# hook may already be gone; chain truncates after first missing pid.
pid = os.getpid()
while True:
ppid = parent_of.get(pid, 0)
if not ppid or ppid == pid or ppid in seen:
break
seen.add(ppid)
ok = bool(k32.AttachConsole(ppid))
debug_log(cfg, "attach_try", session=session_id,
pid=ppid, exe=name_of.get(ppid, "?"), ok=ok)
if ok:
return True
pid = ppid
# --- Pass 2: search known shell/console hosts, ANCESTORS first ---
# The correct bash is the one that is a process-tree ancestor of this
# daemon (it is the shell the user typed 'claude' in). Subshells
# spawned by Claude Code for tool execution are DESCENDANTS, not
# ancestors. Build the daemon's full ancestor set from the snapshot
# and try ancestor candidates before non-ancestor ones.
CONSOLE_HOSTS = {
"bash.exe", "sh.exe", "zsh.exe",
"mintty.exe",
"powershell.exe", "pwsh.exe",
"cmd.exe",
"conhost.exe",
}
daemon_ancestors: set = set()
_pid = os.getpid()
while True:
_ppid = parent_of.get(_pid, 0)
if not _ppid or _ppid in daemon_ancestors:
break
daemon_ancestors.add(_ppid)
_pid = _ppid
# Sort: ancestors first (key=False sorts before True)
candidates = sorted(
((cpid, exe) for cpid, exe in name_of.items()
if exe.lower() in CONSOLE_HOSTS and cpid not in seen),
key=lambda ce: ce[0] not in daemon_ancestors,
)
for cpid, exe in candidates:
seen.add(cpid)
is_ancestor = cpid in daemon_ancestors
ok = bool(k32.AttachConsole(cpid))
debug_log(cfg, "attach_try_host", session=session_id,
pid=cpid, exe=exe, ancestor=is_ancestor, ok=ok)
if ok:
return True
return False
except Exception as exc:
debug_log(cfg, "attach_error", session=session_id, error=str(exc)[:120])
return False
def run_overlay(session_id, cfg):
# On Windows: the daemon inherits a console from the hook subprocess, but
# that console is a background/invisible console created by Claude Code
# (node.js/Electron) — NOT the visible terminal. Opening CONOUT$ on the
# inherited console succeeds mechanically but writes go to an invisible
# buffer. We MUST FreeConsole + AttachConsole to the shell's visible
# console (bash.exe, conhost.exe, etc.) before opening CONOUT$.
# This mirrors process_b.py: FreeConsole() + AttachConsole(process_a_pid).
if sys.platform == "win32":
ok = _win_attach_parent_console(cfg, session_id)
if not ok:
debug_log(cfg, "overlay", session=session_id,
status="attach_failed_no_console")
tty, tty_path, tty_source = open_tty_write()
if tty is None:
debug_log(cfg, "overlay", session=session_id, status="no_tty")
sys.exit(1)
debug_log(cfg, "overlay", session=session_id, tty=tty_path or "-", source=tty_source)
spinner_name = cfg.get("spinner", "auto")
theme = cfg.get("theme", "sprite")
color = cfg.get("color", "1;36")
term_rows, term_cols = get_terminal_size(tty_path)
avail_w, avail_h = compute_available_space(cfg, term_rows, term_cols)
debug_log(cfg, "overlay", session=session_id,
term=f"{term_cols}x{term_rows}", avail=f"{avail_w}x{avail_h}")
if spinner_name == "auto":
sp, resolved = auto_select_spinner(theme, avail_w, avail_h)
else:
sp = get_spinner(spinner_name)
resolved = spinner_name
row, col = compute_position(cfg, sp["w"], sp["h"], term_rows, term_cols)
debug_log(cfg, "overlay", session=session_id,
spinner=resolved, size=f"{sp['w']}x{sp['h']}", pos=f"{row},{col}")
# Animation speed (ms per frame) is controlled by the spinner definition.
# Screen re-render rate is independent: re-draw at redraw_hz even when the
# frame hasn't changed, so Ink/other renderers can overwrite us at most for
# 1/redraw_hz seconds before we re-appear.
redraw_hz = max(1, cfg.get("redraw_hz", 20))
redraw_interval = 1.0 / redraw_hz
stop_path = get_stop_path(session_id)
# Remove any stale stop file left from a previous crashed session
try:
stop_path.unlink()
except Exception:
pass
last_size = (term_rows, term_cols)
poll_counter = 0
# Resize check every ~2 s; stop-file check every ~100 ms (every 10 frames at 100 hz)
stop_check_frames = max(1, int(0.1 / max(redraw_interval, 0.001)))
_cleanup_done = False
def cleanup(signum=None, _frame=None):
nonlocal _cleanup_done
if _cleanup_done:
sys.exit(0)
_cleanup_done = True
try:
clear_area(tty, sp["h"], sp["w"], row, col)
except Exception:
pass
try:
tty.close()
except Exception:
pass
sys.exit(0)
signal.signal(signal.SIGTERM, cleanup)
signal.signal(signal.SIGINT, cleanup)
def handle_resize(signum=None, _frame=None):
nonlocal term_rows, term_cols, avail_w, avail_h, sp, resolved, row, col
try:
clear_area(tty, sp["h"], sp["w"], row, col)
except Exception:
pass
term_rows, term_cols = get_terminal_size(tty_path)
avail_w, avail_h = compute_available_space(cfg, term_rows, term_cols)
if spinner_name == "auto":
sp, resolved = auto_select_spinner(theme, avail_w, avail_h)
row, col = compute_position(cfg, sp["w"], sp["h"], term_rows, term_cols)
if hasattr(signal, "SIGWINCH"):
signal.signal(signal.SIGWINCH, handle_resize)
render_count = 0
try:
while True:
# get_frame_at is time-based: animation advances at sp["ms"] pace
# regardless of how often we call render_frame.
frame_data = get_frame_at(sp)
render_frame(tty, frame_data, row, col, color)
render_count += 1
if render_count == 1:
debug_log(cfg, "overlay_render1", session=session_id,
pos=f"{row},{col}", frame0=repr(frame_data[0][:20]))
time.sleep(redraw_interval)
poll_counter += 1
# Stop-file check: stop_overlay_daemon() writes this before force-killing
# so the daemon has a chance to clear the screen before termination.
if poll_counter % stop_check_frames == 0:
if stop_path.exists():
try:
stop_path.unlink()
except Exception:
pass
cleanup()
# Poll resize on platforms without SIGWINCH (~every 2 seconds)
if poll_counter >= int(2.0 / max(redraw_interval, 0.01)):
poll_counter = 0
new_size = get_terminal_size(tty_path)
if new_size != last_size:
last_size = new_size
handle_resize()
except (KeyboardInterrupt, SystemExit):
cleanup()
except Exception as exc:
debug_log(cfg, "overlay_error", session=session_id,
error=type(exc).__name__, detail=str(exc)[:120])
cleanup()
def start_overlay_daemon(session_id, cfg):