diff --git a/packages/claude-code-plugin/hooks/lib/adaptive_perf.py b/packages/claude-code-plugin/hooks/lib/adaptive_perf.py new file mode 100644 index 00000000..84837c59 --- /dev/null +++ b/packages/claude-code-plugin/hooks/lib/adaptive_perf.py @@ -0,0 +1,301 @@ +"""Adaptive performance mode for CodingBuddy hooks (#1002). + +Monitors hook execution times and auto-switches to lightweight mode +when the threshold is exceeded. In lightweight mode, heavy analysis +(conflict prediction, full project scan) is skipped while core +functions (mode detection, basic validation) are preserved. +""" +import sys +import time +from typing import Any, Dict, List, Optional + + +# Default threshold in milliseconds +DEFAULT_TIMEOUT_MS = 10000 + +# Operations classified by weight +HEAVY_OPERATIONS = frozenset({ + "conflict_prediction", + "full_project_scan", + "agent_recommendations", + "file_watcher", +}) + +CORE_OPERATIONS = frozenset({ + "mode_detection", + "basic_validation", + "stats_recording", + "history_recording", + "prompt_injection", + "hook_install", +}) + + +class AdaptivePerformanceMonitor: + """Monitors hook performance and controls lightweight mode transitions. + + Tracks execution times per hook and switches to lightweight mode + when recent execution times consistently exceed the configured + threshold. The monitor uses a sliding window of recent timings + to make decisions. + """ + + _instance: Optional["AdaptivePerformanceMonitor"] = None + + def __init__(self, timeout_ms: int = DEFAULT_TIMEOUT_MS) -> None: + self._timeout_ms = timeout_ms + self._lightweight_mode = False + self._timings: Dict[str, List[float]] = {} + self._active_timers: Dict[str, float] = {} + self._switch_count = 0 + self._window_size = 5 + + @classmethod + def get_instance( + cls, config: Optional[Dict[str, Any]] = None + ) -> "AdaptivePerformanceMonitor": + """Get or create the singleton monitor instance. + + Args: + config: Optional config dict; reads hooks.timeoutMs if present. + + Returns: + The singleton AdaptivePerformanceMonitor. + """ + if cls._instance is None: + timeout_ms = DEFAULT_TIMEOUT_MS + if config: + hooks_cfg = config.get("hooks", {}) + timeout_ms = hooks_cfg.get("timeoutMs", DEFAULT_TIMEOUT_MS) + cls._instance = cls(timeout_ms=timeout_ms) + return cls._instance + + @classmethod + def reset_instance(cls) -> None: + """Reset the singleton (for testing).""" + cls._instance = None + + @property + def is_lightweight(self) -> bool: + """Whether lightweight mode is currently active.""" + return self._lightweight_mode + + @property + def timeout_ms(self) -> int: + """Current timeout threshold in milliseconds.""" + return self._timeout_ms + + @property + def switch_count(self) -> int: + """Number of times mode has been switched.""" + return self._switch_count + + def start_timing(self, hook_name: str) -> None: + """Start timing a hook execution. + + Args: + hook_name: Identifier for the hook being timed. + """ + self._active_timers[hook_name] = time.monotonic() + + def stop_timing(self, hook_name: str) -> float: + """Stop timing and record the duration. Evaluates mode switch. + + Args: + hook_name: Identifier for the hook to stop timing. + + Returns: + Elapsed time in milliseconds. + + Raises: + ValueError: If no active timer for the given hook. + """ + if hook_name not in self._active_timers: + raise ValueError(f"No active timer for: {hook_name}") + + elapsed_ms = (time.monotonic() - self._active_timers.pop(hook_name)) * 1000 + + if hook_name not in self._timings: + self._timings[hook_name] = [] + self._timings[hook_name].append(elapsed_ms) + + # Keep only the sliding window + if len(self._timings[hook_name]) > self._window_size: + self._timings[hook_name] = self._timings[hook_name][-self._window_size:] + + self._evaluate_mode() + return elapsed_ms + + def record_timing(self, hook_name: str, elapsed_ms: float) -> None: + """Record a timing directly (when start/stop is not used). + + Args: + hook_name: Identifier for the hook. + elapsed_ms: Elapsed time in milliseconds. + """ + if hook_name not in self._timings: + self._timings[hook_name] = [] + self._timings[hook_name].append(elapsed_ms) + + if len(self._timings[hook_name]) > self._window_size: + self._timings[hook_name] = self._timings[hook_name][-self._window_size:] + + self._evaluate_mode() + + def should_skip(self, operation: str) -> bool: + """Check if an operation should be skipped in current mode. + + Args: + operation: Operation name to check. + + Returns: + True if the operation should be skipped (lightweight mode + active and operation is heavy). + """ + if not self._lightweight_mode: + return False + return operation in HEAVY_OPERATIONS + + def get_status(self) -> Dict[str, Any]: + """Get current performance monitor status. + + Returns: + Dict with mode, threshold, timings summary, and switch count. + """ + timing_summary: Dict[str, Dict[str, float]] = {} + for hook_name, timings in self._timings.items(): + if timings: + timing_summary[hook_name] = { + "count": len(timings), + "avg_ms": round(sum(timings) / len(timings), 2), + "max_ms": round(max(timings), 2), + } + return { + "lightweight_mode": self._lightweight_mode, + "timeout_ms": self._timeout_ms, + "switch_count": self._switch_count, + "timings": timing_summary, + } + + def _evaluate_mode(self) -> None: + """Evaluate whether to switch modes based on recent timings. + + Switches to lightweight mode if any hook's average recent + execution time exceeds the threshold. Switches back to normal + if all hooks are under 60% of the threshold. + """ + any_over_threshold = False + all_under_recovery = True + recovery_threshold = self._timeout_ms * 0.6 + + for timings in self._timings.values(): + if not timings: + continue + avg = sum(timings) / len(timings) + if avg >= self._timeout_ms: + any_over_threshold = True + if avg >= recovery_threshold: + all_under_recovery = False + + if not self._lightweight_mode and any_over_threshold: + self._lightweight_mode = True + self._switch_count += 1 + elif self._lightweight_mode and all_under_recovery and not any_over_threshold: + self._lightweight_mode = False + self._switch_count += 1 + + +def get_monitor(config: Optional[Dict[str, Any]] = None) -> AdaptivePerformanceMonitor: + """Convenience function to get the singleton monitor. + + Args: + config: Optional codingbuddy config dict. + + Returns: + The AdaptivePerformanceMonitor singleton. + """ + return AdaptivePerformanceMonitor.get_instance(config) + + +def format_lightweight_notice(language: str = "en") -> str: + """Format user-facing notice when switching to lightweight mode. + + Args: + language: Language code (en, ko, ja, zh, es). + + Returns: + Localized notice string. + """ + notices = { + "en": ( + "[CodingBuddy] Switched to lightweight mode — " + "hooks were exceeding time threshold. " + "Heavy analysis (conflict prediction, full project scan) " + "will be skipped. Core functions remain active." + ), + "ko": ( + "[CodingBuddy] 경량 모드로 전환됨 — " + "훅 실행 시간이 임계값을 초과했습니다. " + "무거운 분석(충돌 예측, 전체 프로젝트 스캔)을 건너뜁니다. " + "핵심 기능은 유지됩니다." + ), + "ja": ( + "[CodingBuddy] 軽量モードに切り替えました — " + "フックの実行時間がしきい値を超えました。" + "重い分析(コンフリクト予測、フルプロジェクトスキャン)は" + "スキップされます。コア機能は維持されます。" + ), + "zh": ( + "[CodingBuddy] 已切换到轻量模式 — " + "钩子执行时间超过阈值。" + "将跳过重度分析(冲突预测、完整项目扫描)。" + "核心功能保持活跃。" + ), + "es": ( + "[CodingBuddy] Cambiado a modo ligero — " + "los hooks excedieron el umbral de tiempo. " + "Se omitirá el análisis pesado (predicción de conflictos, " + "escaneo completo del proyecto). Las funciones principales " + "permanecen activas." + ), + } + return notices.get(language, notices["en"]) + + +def format_normal_notice(language: str = "en") -> str: + """Format user-facing notice when returning to normal mode. + + Args: + language: Language code (en, ko, ja, zh, es). + + Returns: + Localized notice string. + """ + notices = { + "en": ( + "[CodingBuddy] Returned to normal mode — " + "hook performance has recovered. " + "Full analysis is now active." + ), + "ko": ( + "[CodingBuddy] 일반 모드로 복귀 — " + "훅 성능이 회복되었습니다. " + "전체 분석이 다시 활성화됩니다." + ), + "ja": ( + "[CodingBuddy] 通常モードに復帰 — " + "フックのパフォーマンスが回復しました。" + "完全な分析が再びアクティブです。" + ), + "zh": ( + "[CodingBuddy] 已返回正常模式 — " + "钩子性能已恢复。" + "完整分析已重新激活。" + ), + "es": ( + "[CodingBuddy] Vuelto al modo normal — " + "el rendimiento de los hooks se ha recuperado. " + "El análisis completo está activo nuevamente." + ), + } + return notices.get(language, notices["en"]) diff --git a/packages/claude-code-plugin/hooks/pre-tool-use.py b/packages/claude-code-plugin/hooks/pre-tool-use.py index cafe10e0..8c168c62 100644 --- a/packages/claude-code-plugin/hooks/pre-tool-use.py +++ b/packages/claude-code-plugin/hooks/pre-tool-use.py @@ -24,6 +24,7 @@ from safe_main import safe_main from config import get_config from agent_status import build_status_message +from adaptive_perf import get_monitor # Pattern to detect git commit in a command string _GIT_COMMIT_RE = re.compile(r"\bgit\s+commit\b") @@ -181,12 +182,17 @@ def _handle(data: dict) -> Optional[dict]: tool_name = data.get("tool_name", "") contexts = [] + # Initialize adaptive performance monitor (#1002) + perf_config = _get_hook_config() + perf_monitor = get_monitor(perf_config) + # Bash-specific checks if tool_name == "Bash": - # Check for file changes (#823) - file_change_msg = _check_file_changes(data) - if file_change_msg: - contexts.append(file_change_msg) + # Check for file changes (#823) — skip in lightweight mode (#1002) + if not perf_monitor.should_skip("file_watcher"): + file_change_msg = _check_file_changes(data) + if file_change_msg: + contexts.append(file_change_msg) command = data.get("tool_input", {}).get("command", "") diff --git a/packages/claude-code-plugin/hooks/session-start.py b/packages/claude-code-plugin/hooks/session-start.py index 4d6ccc77..0568c68d 100644 --- a/packages/claude-code-plugin/hooks/session-start.py +++ b/packages/claude-code-plugin/hooks/session-start.py @@ -542,27 +542,38 @@ def main(): # Step 6: Buddy greeting + project scan + agent recommendations (#968) # Enhanced with returning session context (#975) + # Adaptive performance mode (#1002): skip heavy scan in lightweight mode try: _ensure_lib_path() from config import get_config as _get_config - from project_scanner import scan_project, get_agent_recommendations from buddy_renderer import render_session_start + from adaptive_perf import get_monitor, format_lightweight_notice cwd = os.environ.get("CLAUDE_PROJECT_DIR", str(Path.cwd())) cfg = _get_config(cwd) tone = cfg.get("tone", "casual") language = cfg.get("language", "en") - # Scan project - scan_data = scan_project(cwd) + perf_monitor = get_monitor(cfg) - # Load agent visuals - agents_dir = _find_agents_dir() - agents = load_agent_visuals(agents_dir) if agents_dir else {} + scan_data = {} + recommendations = [] - # Generate recommendations - recommendations = get_agent_recommendations(scan_data, agents) + if not perf_monitor.should_skip("full_project_scan"): + from project_scanner import scan_project + perf_monitor.start_timing("project_scan") + scan_data = scan_project(cwd) + elapsed = perf_monitor.stop_timing("project_scan") + + if not perf_monitor.should_skip("agent_recommendations"): + agents_dir = _find_agents_dir() + agents = load_agent_visuals(agents_dir) if agents_dir else {} + from project_scanner import get_agent_recommendations + recommendations = get_agent_recommendations(scan_data, agents) + else: + # Lightweight mode: notify user + print(format_lightweight_notice(language), file=sys.stderr) # Detect returning session (#975) previous_session = None diff --git a/packages/claude-code-plugin/hooks/tests/test_adaptive_perf.py b/packages/claude-code-plugin/hooks/tests/test_adaptive_perf.py new file mode 100644 index 00000000..8c0a2fb9 --- /dev/null +++ b/packages/claude-code-plugin/hooks/tests/test_adaptive_perf.py @@ -0,0 +1,204 @@ +"""Tests for adaptive_perf module (#1002).""" +import os +import sys +import unittest + +# Ensure hooks/lib is on sys.path +_hooks_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +_lib_dir = os.path.join(_hooks_dir, "lib") +if _lib_dir not in sys.path: + sys.path.insert(0, _lib_dir) + +from adaptive_perf import ( + AdaptivePerformanceMonitor, + HEAVY_OPERATIONS, + CORE_OPERATIONS, + DEFAULT_TIMEOUT_MS, + format_lightweight_notice, + format_normal_notice, + get_monitor, +) + + +class TestAdaptivePerformanceMonitor(unittest.TestCase): + """Tests for AdaptivePerformanceMonitor.""" + + def setUp(self): + AdaptivePerformanceMonitor.reset_instance() + self.monitor = AdaptivePerformanceMonitor(timeout_ms=1000) + + def tearDown(self): + AdaptivePerformanceMonitor.reset_instance() + + def test_initial_state_is_normal_mode(self): + self.assertFalse(self.monitor.is_lightweight) + self.assertEqual(self.monitor.switch_count, 0) + + def test_default_timeout(self): + m = AdaptivePerformanceMonitor() + self.assertEqual(m.timeout_ms, DEFAULT_TIMEOUT_MS) + + def test_custom_timeout(self): + self.assertEqual(self.monitor.timeout_ms, 1000) + + def test_record_timing_below_threshold_stays_normal(self): + for _ in range(5): + self.monitor.record_timing("hook_a", 500) + self.assertFalse(self.monitor.is_lightweight) + + def test_record_timing_above_threshold_switches_to_lightweight(self): + for _ in range(5): + self.monitor.record_timing("hook_a", 1500) + self.assertTrue(self.monitor.is_lightweight) + self.assertEqual(self.monitor.switch_count, 1) + + def test_recovery_back_to_normal(self): + # Trigger lightweight mode + for _ in range(5): + self.monitor.record_timing("hook_a", 1500) + self.assertTrue(self.monitor.is_lightweight) + + # Recover: all under 60% of threshold (600ms) + for _ in range(5): + self.monitor.record_timing("hook_a", 400) + self.assertFalse(self.monitor.is_lightweight) + self.assertEqual(self.monitor.switch_count, 2) + + def test_should_skip_heavy_in_lightweight_mode(self): + # Force lightweight + for _ in range(5): + self.monitor.record_timing("hook_a", 2000) + self.assertTrue(self.monitor.is_lightweight) + + for op in HEAVY_OPERATIONS: + self.assertTrue(self.monitor.should_skip(op), f"{op} should be skipped") + + def test_should_not_skip_core_in_lightweight_mode(self): + # Force lightweight + for _ in range(5): + self.monitor.record_timing("hook_a", 2000) + self.assertTrue(self.monitor.is_lightweight) + + for op in CORE_OPERATIONS: + self.assertFalse(self.monitor.should_skip(op), f"{op} should NOT be skipped") + + def test_should_not_skip_anything_in_normal_mode(self): + for op in HEAVY_OPERATIONS | CORE_OPERATIONS: + self.assertFalse(self.monitor.should_skip(op)) + + def test_start_stop_timing(self): + self.monitor.start_timing("hook_x") + elapsed = self.monitor.stop_timing("hook_x") + self.assertGreaterEqual(elapsed, 0) + + def test_stop_without_start_raises(self): + with self.assertRaises(ValueError): + self.monitor.stop_timing("nonexistent") + + def test_sliding_window_limits_timings(self): + for i in range(20): + self.monitor.record_timing("hook_a", 100 + i) + status = self.monitor.get_status() + self.assertEqual(status["timings"]["hook_a"]["count"], 5) + + def test_get_status(self): + self.monitor.record_timing("hook_a", 500) + self.monitor.record_timing("hook_a", 600) + status = self.monitor.get_status() + + self.assertFalse(status["lightweight_mode"]) + self.assertEqual(status["timeout_ms"], 1000) + self.assertEqual(status["switch_count"], 0) + self.assertIn("hook_a", status["timings"]) + self.assertEqual(status["timings"]["hook_a"]["count"], 2) + self.assertEqual(status["timings"]["hook_a"]["avg_ms"], 550.0) + self.assertEqual(status["timings"]["hook_a"]["max_ms"], 600.0) + + +class TestSingleton(unittest.TestCase): + """Tests for singleton behavior.""" + + def setUp(self): + AdaptivePerformanceMonitor.reset_instance() + + def tearDown(self): + AdaptivePerformanceMonitor.reset_instance() + + def test_get_instance_returns_same_object(self): + m1 = AdaptivePerformanceMonitor.get_instance() + m2 = AdaptivePerformanceMonitor.get_instance() + self.assertIs(m1, m2) + + def test_get_instance_reads_config_timeout(self): + config = {"hooks": {"timeoutMs": 5000}} + m = AdaptivePerformanceMonitor.get_instance(config) + self.assertEqual(m.timeout_ms, 5000) + + def test_get_instance_default_without_config(self): + m = AdaptivePerformanceMonitor.get_instance() + self.assertEqual(m.timeout_ms, DEFAULT_TIMEOUT_MS) + + def test_get_monitor_convenience(self): + m = get_monitor({"hooks": {"timeoutMs": 3000}}) + self.assertEqual(m.timeout_ms, 3000) + self.assertIs(m, get_monitor()) + + def test_reset_instance(self): + m1 = AdaptivePerformanceMonitor.get_instance() + AdaptivePerformanceMonitor.reset_instance() + m2 = AdaptivePerformanceMonitor.get_instance() + self.assertIsNot(m1, m2) + + +class TestNoticeFormatting(unittest.TestCase): + """Tests for notification messages.""" + + def test_lightweight_notice_english(self): + notice = format_lightweight_notice("en") + self.assertIn("lightweight mode", notice) + self.assertIn("CodingBuddy", notice) + + def test_lightweight_notice_korean(self): + notice = format_lightweight_notice("ko") + self.assertIn("경량 모드", notice) + + def test_lightweight_notice_unknown_lang_falls_back(self): + notice = format_lightweight_notice("xx") + self.assertIn("lightweight mode", notice) + + def test_normal_notice_english(self): + notice = format_normal_notice("en") + self.assertIn("normal mode", notice) + + def test_normal_notice_korean(self): + notice = format_normal_notice("ko") + self.assertIn("일반 모드", notice) + + def test_all_supported_languages(self): + for lang in ("en", "ko", "ja", "zh", "es"): + lw = format_lightweight_notice(lang) + nm = format_normal_notice(lang) + self.assertTrue(len(lw) > 0, f"Empty lightweight notice for {lang}") + self.assertTrue(len(nm) > 0, f"Empty normal notice for {lang}") + + +class TestOperationSets(unittest.TestCase): + """Tests for operation classification.""" + + def test_heavy_and_core_are_disjoint(self): + overlap = HEAVY_OPERATIONS & CORE_OPERATIONS + self.assertEqual(len(overlap), 0, f"Overlap: {overlap}") + + def test_expected_heavy_operations(self): + self.assertIn("conflict_prediction", HEAVY_OPERATIONS) + self.assertIn("full_project_scan", HEAVY_OPERATIONS) + self.assertIn("agent_recommendations", HEAVY_OPERATIONS) + self.assertIn("file_watcher", HEAVY_OPERATIONS) + + def test_expected_core_operations(self): + self.assertIn("mode_detection", CORE_OPERATIONS) + self.assertIn("basic_validation", CORE_OPERATIONS) + + +if __name__ == "__main__": + unittest.main()