From cc3b079006cfed137ec14830490fd1ed2d1b8c41 Mon Sep 17 00:00:00 2001 From: Test User Date: Tue, 17 Mar 2026 08:24:16 -0700 Subject: [PATCH 1/4] feat(config): add ConfigFileWatcher and ConfigReloadState (#402) Polling-based file watcher for hot-reloading CODEFRAME.md/AGENTS.md/ CLAUDE.md during batch execution. Thread-safe shared state with last-known-good fallback on parse failure. Adds CONFIG_RELOADED and CONFIG_RELOAD_FAILED event types. --- codeframe/core/config_watcher.py | 259 ++++++++++++++++++++++++++++++ codeframe/core/events.py | 4 + tests/core/test_config_watcher.py | 249 ++++++++++++++++++++++++++++ 3 files changed, 512 insertions(+) create mode 100644 codeframe/core/config_watcher.py create mode 100644 tests/core/test_config_watcher.py diff --git a/codeframe/core/config_watcher.py b/codeframe/core/config_watcher.py new file mode 100644 index 00000000..42a0bdad --- /dev/null +++ b/codeframe/core/config_watcher.py @@ -0,0 +1,259 @@ +"""Dynamic configuration reload for batch execution. + +Watches CODEFRAME.md, AGENTS.md, and CLAUDE.md for changes during +long-running batch executions and hot-reloads configuration without +restarting. Inspired by Symphony's dynamic WORKFLOW.md reload pattern. + +Components: +- ConfigReloadState: Thread-safe shared state holding current config +- ConfigFileWatcher: Daemon thread polling file mtimes for changes + +What reloads dynamically: +- Agent prompt / system prompt supplement (always_do, ask_first, never_do, raw_content) +- Tool preferences (tooling, commands) +- Code style preferences (code_style) + +What requires restart (not reloaded): +- Engine selection (react/plan) +- Workspace path +- Tech stack + +This module is headless - no FastAPI or HTTP dependencies. +""" + +import logging +import threading +from datetime import datetime, timezone +from pathlib import Path +from typing import Optional + +from codeframe.core.agents_config import AgentPreferences, load_preferences + +logger = logging.getLogger(__name__) + + +def _utc_now() -> datetime: + return datetime.now(timezone.utc) + + +# Config files to watch, in priority order +_CONFIG_FILES = ("CODEFRAME.md", "AGENTS.md", "CLAUDE.md") + + +class ConfigReloadState: + """Thread-safe shared state for dynamic config reload. + + Holds the last-known-good configuration and tracks reload history. + Accessed by the watcher thread (writes) and the conductor (reads). + """ + + def __init__(self, initial_prefs: AgentPreferences) -> None: + self._prefs = initial_prefs + self._lock = threading.Lock() + self.last_reload_at: Optional[datetime] = None + self.last_error: Optional[str] = None + self.reload_timestamps: list[str] = [] + + def get_prefs(self) -> AgentPreferences: + with self._lock: + return self._prefs + + def apply_reload(self, new_prefs: AgentPreferences, timestamp: datetime) -> None: + with self._lock: + self._prefs = new_prefs + self.last_reload_at = timestamp + self.last_error = None + self.reload_timestamps.append(timestamp.isoformat()) + + def record_error(self, msg: str) -> None: + with self._lock: + self.last_error = msg + + def has_reloaded_since(self, since: datetime) -> bool: + with self._lock: + return self.last_reload_at is not None and self.last_reload_at > since + + +class ConfigFileWatcher: + """Polling-based file watcher for config hot-reload. + + Monitors CODEFRAME.md, AGENTS.md, and CLAUDE.md for mtime changes. + On change, re-parses and validates the config, updating shared state. + + Follows the same daemon-thread pattern as StallMonitor. + + Usage:: + + watcher = ConfigFileWatcher(workspace_path, poll_interval_s=2.0) + state = watcher.start(initial_prefs) + try: + # ... batch execution ... + # Check state.get_prefs() between tasks + finally: + watcher.stop() + """ + + def __init__( + self, + workspace_path: Path, + poll_interval_s: float = 2.0, + ) -> None: + self._workspace_path = workspace_path + self._poll_interval_s = poll_interval_s + self._stop_event = threading.Event() + self._thread: Optional[threading.Thread] = None + self._state: Optional[ConfigReloadState] = None + self._watched_mtimes: dict[Path, float] = {} + + def start(self, initial_prefs: AgentPreferences) -> ConfigReloadState: + """Begin watching config files. + + Args: + initial_prefs: The current agent preferences to use as baseline. + + Returns: + Shared ConfigReloadState that the conductor can query. + """ + self.stop() + + self._state = ConfigReloadState(initial_prefs) + self._stop_event.clear() + + # Snapshot current mtimes + self._watched_mtimes = {} + for name in _CONFIG_FILES: + path = self._workspace_path / name + if path.is_file(): + self._watched_mtimes[path] = path.stat().st_mtime + + self._thread = threading.Thread( + target=self._poll_loop, + name="config-file-watcher", + daemon=True, + ) + self._thread.start() + return self._state + + def stop(self) -> None: + """Stop the watcher thread.""" + self._stop_event.set() + if self._thread is not None: + self._thread.join(timeout=2.0) + self._thread = None + + def _poll_loop(self) -> None: + """Daemon thread: check file mtimes at regular intervals.""" + while not self._stop_event.wait(timeout=self._poll_interval_s): + self._check_for_changes() + + def _check_for_changes(self) -> None: + """Compare current mtimes against snapshots.""" + changed = False + + for name in _CONFIG_FILES: + path = self._workspace_path / name + if not path.is_file(): + # File might have been created since start + if path not in self._watched_mtimes: + continue + # File was deleted — skip + continue + + try: + current_mtime = path.stat().st_mtime + except OSError: + continue + + prev_mtime = self._watched_mtimes.get(path, 0.0) + if current_mtime > prev_mtime: + self._watched_mtimes[path] = current_mtime + changed = True + + # Also detect newly created config files + for name in _CONFIG_FILES: + path = self._workspace_path / name + if path.is_file() and path not in self._watched_mtimes: + self._watched_mtimes[path] = path.stat().st_mtime + changed = True + + if changed: + self._reload() + + def _reload(self) -> None: + """Re-parse config files and update shared state.""" + try: + new_prefs = load_preferences(self._workspace_path) + + if not new_prefs.has_preferences() and not new_prefs.raw_content: + msg = "Reloaded config is empty — retaining previous config" + logger.warning(msg) + if self._state: + self._state.record_error(msg) + self._emit_reload_failed(msg) + return + + now = _utc_now() + if self._state: + old_prefs = self._state.get_prefs() + self._state.apply_reload(new_prefs, now) + diff = self._build_diff_summary(old_prefs, new_prefs) + logger.info("Config reloaded: %s", diff) + self._emit_reload_success(diff) + + except Exception as e: + msg = f"Config reload failed: {e}" + logger.warning(msg) + if self._state: + self._state.record_error(msg) + self._emit_reload_failed(msg) + + def _build_diff_summary( + self, old: AgentPreferences, new: AgentPreferences + ) -> str: + """Build a human-readable summary of what changed.""" + changes = [] + if old.always_do != new.always_do: + changes.append(f"always_do: {len(old.always_do)} -> {len(new.always_do)} items") + if old.never_do != new.never_do: + changes.append(f"never_do: {len(old.never_do)} -> {len(new.never_do)} items") + if old.ask_first != new.ask_first: + changes.append(f"ask_first: {len(old.ask_first)} -> {len(new.ask_first)} items") + if old.tooling != new.tooling: + changes.append("tooling changed") + if old.commands != new.commands: + changes.append("commands changed") + if old.code_style != new.code_style: + changes.append("code_style changed") + return "; ".join(changes) if changes else "content changed" + + def _emit_reload_success(self, diff_summary: str) -> None: + """Emit CONFIG_RELOADED event (best-effort).""" + try: + from codeframe.core import events + from codeframe.core.workspace import get_workspace + + workspace = get_workspace(self._workspace_path) + events.emit_for_workspace( + workspace, + events.EventType.CONFIG_RELOADED, + {"diff_summary": diff_summary}, + print_event=True, + ) + except Exception: + logger.debug("Failed to emit CONFIG_RELOADED event", exc_info=True) + + def _emit_reload_failed(self, error_msg: str) -> None: + """Emit CONFIG_RELOAD_FAILED event (best-effort).""" + try: + from codeframe.core import events + from codeframe.core.workspace import get_workspace + + workspace = get_workspace(self._workspace_path) + events.emit_for_workspace( + workspace, + events.EventType.CONFIG_RELOAD_FAILED, + {"error": error_msg}, + print_event=True, + ) + except Exception: + logger.debug("Failed to emit CONFIG_RELOAD_FAILED event", exc_info=True) diff --git a/codeframe/core/events.py b/codeframe/core/events.py index 36941d2e..260fe370 100644 --- a/codeframe/core/events.py +++ b/codeframe/core/events.py @@ -114,6 +114,10 @@ class EventType: RECONCILIATION_TASK_REQUEUED = "RECONCILIATION_TASK_REQUEUED" RECONCILIATION_ERROR = "RECONCILIATION_ERROR" + # Config reload events + CONFIG_RELOADED = "CONFIG_RELOADED" + CONFIG_RELOAD_FAILED = "CONFIG_RELOAD_FAILED" + @dataclass class Event: diff --git a/tests/core/test_config_watcher.py b/tests/core/test_config_watcher.py new file mode 100644 index 00000000..9aad64c9 --- /dev/null +++ b/tests/core/test_config_watcher.py @@ -0,0 +1,249 @@ +"""Tests for dynamic configuration reload during batch execution. + +Tests cover: +- ConfigReloadState thread-safe shared state +- ConfigFileWatcher mtime-based file watching +- Validation of reloaded config (valid vs invalid) +- Event emission on reload +""" + +import os +import threading +import time +from datetime import datetime, timezone +from pathlib import Path + +import pytest + +from codeframe.core.agents_config import AgentPreferences +from codeframe.core.workspace import create_or_load_workspace + +pytestmark = pytest.mark.v2 + + +@pytest.fixture +def workspace(tmp_path: Path): + repo_path = tmp_path / "test_repo" + repo_path.mkdir() + return create_or_load_workspace(repo_path) + + +@pytest.fixture +def agents_md_path(workspace): + """Create an AGENTS.md file in the workspace.""" + path = workspace.repo_path / "AGENTS.md" + path.write_text( + "# Always Do\n" + "- Run tests after changes\n" + "\n" + "# Never Do\n" + "- Delete production data\n" + ) + return path + + +@pytest.fixture +def initial_prefs(): + return AgentPreferences( + always_do=["Run tests after changes"], + never_do=["Delete production data"], + raw_content="initial config", + ) + + +# ============================================================================= +# ConfigReloadState tests +# ============================================================================= + + +class TestConfigReloadState: + """Tests for thread-safe shared configuration state.""" + + def test_initial_state(self, initial_prefs): + from codeframe.core.config_watcher import ConfigReloadState + + state = ConfigReloadState(initial_prefs) + prefs = state.get_prefs() + assert prefs.always_do == ["Run tests after changes"] + assert state.last_reload_at is None + assert state.last_error is None + + def test_apply_reload_updates_prefs(self, initial_prefs): + from codeframe.core.config_watcher import ConfigReloadState + + state = ConfigReloadState(initial_prefs) + new_prefs = AgentPreferences( + always_do=["Updated action"], + raw_content="updated config", + ) + now = datetime.now(timezone.utc) + state.apply_reload(new_prefs, now) + + assert state.get_prefs().always_do == ["Updated action"] + assert state.last_reload_at == now + assert state.last_error is None + + def test_record_error_preserves_last_good(self, initial_prefs): + from codeframe.core.config_watcher import ConfigReloadState + + state = ConfigReloadState(initial_prefs) + state.record_error("Parse failed: invalid YAML") + + assert state.get_prefs().always_do == ["Run tests after changes"] + assert state.last_error == "Parse failed: invalid YAML" + + def test_thread_safe_access(self, initial_prefs): + from codeframe.core.config_watcher import ConfigReloadState + + state = ConfigReloadState(initial_prefs) + errors = [] + + def reader(): + for _ in range(100): + prefs = state.get_prefs() + if prefs is None: + errors.append("got None") + + def writer(): + for i in range(100): + new_prefs = AgentPreferences( + always_do=[f"action-{i}"], + raw_content=f"config-{i}", + ) + state.apply_reload(new_prefs, datetime.now(timezone.utc)) + + threads = [threading.Thread(target=reader) for _ in range(3)] + threads.append(threading.Thread(target=writer)) + for t in threads: + t.start() + for t in threads: + t.join() + + assert errors == [] + assert state.get_prefs() is not None + + def test_has_reloaded_since(self, initial_prefs): + from codeframe.core.config_watcher import ConfigReloadState + + state = ConfigReloadState(initial_prefs) + before = datetime.now(timezone.utc) + + assert not state.has_reloaded_since(before) + + new_prefs = AgentPreferences(raw_content="new") + state.apply_reload(new_prefs, datetime.now(timezone.utc)) + + assert state.has_reloaded_since(before) + + +# ============================================================================= +# ConfigFileWatcher tests +# ============================================================================= + + +class TestConfigFileWatcher: + """Tests for file-watching daemon thread.""" + + def test_start_and_stop(self, workspace, agents_md_path, initial_prefs): + from codeframe.core.config_watcher import ConfigFileWatcher + + watcher = ConfigFileWatcher(workspace.repo_path, poll_interval_s=0.1) + state = watcher.start(initial_prefs) + assert state is not None + assert state.get_prefs().always_do == ["Run tests after changes"] + watcher.stop() + + def test_detects_file_change(self, workspace, agents_md_path, initial_prefs): + from codeframe.core.config_watcher import ConfigFileWatcher + + watcher = ConfigFileWatcher(workspace.repo_path, poll_interval_s=0.1) + state = watcher.start(initial_prefs) + + try: + # Modify the file + time.sleep(0.3) + agents_md_path.write_text( + "# Always Do\n" + "- Updated action item\n" + "\n" + "# Never Do\n" + "- Updated forbidden action\n" + ) + # Bump mtime to ensure detection + future_time = time.time() + 1 + os.utime(agents_md_path, (future_time, future_time)) + + # Wait for detection + time.sleep(0.5) + + assert state.last_reload_at is not None + prefs = state.get_prefs() + assert "Updated action item" in prefs.always_do + finally: + watcher.stop() + + def test_invalid_config_retains_previous(self, workspace, agents_md_path, initial_prefs): + from unittest.mock import patch + + from codeframe.core.config_watcher import ConfigFileWatcher + + watcher = ConfigFileWatcher(workspace.repo_path, poll_interval_s=0.1) + state = watcher.start(initial_prefs) + + try: + time.sleep(0.3) + # Make load_preferences raise to simulate parse failure + with patch( + "codeframe.core.config_watcher.load_preferences", + side_effect=ValueError("Corrupt config file"), + ): + # Touch the file to trigger mtime change + agents_md_path.write_text("corrupt data") + future_time = time.time() + 1 + os.utime(agents_md_path, (future_time, future_time)) + + time.sleep(0.5) + + # Previous config should be retained + prefs = state.get_prefs() + assert prefs.always_do == ["Run tests after changes"] + assert state.last_error is not None + assert "Corrupt config" in state.last_error + finally: + watcher.stop() + + def test_watches_multiple_files(self, workspace, initial_prefs): + from codeframe.core.config_watcher import ConfigFileWatcher + + # Create both AGENTS.md and CODEFRAME.md + agents_path = workspace.repo_path / "AGENTS.md" + agents_path.write_text("# Always Do\n- Original\n") + + codeframe_path = workspace.repo_path / "CODEFRAME.md" + codeframe_path.write_text("---\nengine: react\n---\n# Always Do\n- From codeframe\n") + + watcher = ConfigFileWatcher(workspace.repo_path, poll_interval_s=0.1) + state = watcher.start(initial_prefs) + + try: + time.sleep(0.3) + # Modify CODEFRAME.md + codeframe_path.write_text( + "---\nengine: react\n---\n# Always Do\n- Updated from codeframe\n" + ) + future_time = time.time() + 1 + os.utime(codeframe_path, (future_time, future_time)) + + time.sleep(0.5) + + assert state.last_reload_at is not None + finally: + watcher.stop() + + def test_stop_is_idempotent(self, workspace, agents_md_path, initial_prefs): + from codeframe.core.config_watcher import ConfigFileWatcher + + watcher = ConfigFileWatcher(workspace.repo_path, poll_interval_s=0.1) + watcher.start(initial_prefs) + watcher.stop() + watcher.stop() # Should not raise From 08f4eb1ef7564a0030567abe1c394dc394ac60dc Mon Sep 17 00:00:00 2001 From: Test User Date: Tue, 17 Mar 2026 08:30:35 -0700 Subject: [PATCH 2/4] feat(config): integrate config watcher into batch execution (#402) Start/stop ConfigFileWatcher in _execute_serial and _execute_parallel. Check for config reloads between task dispatches. Store reload timestamps in batch results. Display in batch status output. --- codeframe/cli/app.py | 12 + codeframe/core/conductor.py | 518 ++++++++++++++++------------ tests/cli/test_config_reload_cli.py | 109 ++++++ 3 files changed, 418 insertions(+), 221 deletions(-) create mode 100644 tests/cli/test_config_reload_cli.py diff --git a/codeframe/cli/app.py b/codeframe/cli/app.py index c1e42934..dc2725dc 100644 --- a/codeframe/cli/app.py +++ b/codeframe/cli/app.py @@ -3796,6 +3796,18 @@ def batch_status( console.print(f" {icon} {tid[:8]} - {title}") + # Show config reloads if any occurred during batch + config_reloads = batch.results.get("__config_reloads__") + if config_reloads: + console.print("\n [bold]Config Reloads:[/bold]") + for ts in config_reloads: + try: + from datetime import datetime as _dt + dt = _dt.fromisoformat(ts) + console.print(f" Config reloaded at {dt.strftime('%H:%M:%S')}") + except (ValueError, TypeError): + console.print(f" Config reloaded at {ts}") + else: # List recent batches batches = conductor.list_batches(workspace, limit=10) diff --git a/codeframe/core/conductor.py b/codeframe/core/conductor.py index 19ea6d43..f2b45d9b 100644 --- a/codeframe/core/conductor.py +++ b/codeframe/core/conductor.py @@ -19,7 +19,10 @@ from datetime import datetime, timezone from pathlib import Path from enum import Enum -from typing import Callable, Optional +from typing import TYPE_CHECKING, Callable, Optional + +if TYPE_CHECKING: + from codeframe.core.config_watcher import ConfigReloadState from codeframe.core.workspace import Workspace, get_db_connection from codeframe.core import events, tasks, blockers @@ -1302,6 +1305,33 @@ def _run_batch_level_validation(workspace: Workspace, batch: BatchRun) -> tuple[ return False, failure_summary +def _apply_pending_config_reload( + batch: BatchRun, + workspace: Workspace, + reload_state: "ConfigReloadState", + last_seen_reload: datetime, +) -> datetime: + """Check for config reloads and record them in batch results. + + Args: + batch: Current batch run. + workspace: Target workspace. + reload_state: Shared reload state from ConfigFileWatcher. + last_seen_reload: Timestamp of the last check. + + Returns: + Updated last_seen_reload timestamp. + """ + if reload_state.has_reloaded_since(last_seen_reload): + now = datetime.now(timezone.utc) + reloads = batch.results.setdefault("__config_reloads__", []) + reloads.append(now.isoformat()) + _save_batch(workspace, batch) + print(f" [config] Configuration reloaded at {now.strftime('%H:%M:%S')}") + return now + return last_seen_reload + + def _execute_serial( workspace: Workspace, batch: BatchRun, @@ -1317,148 +1347,171 @@ def _execute_serial( interval = env_config.reconciliation_interval_seconds if env_config else 30 reconcile_stop = _start_reconciliation_thread(workspace, batch, interval_seconds=interval) + # Start config file watcher (optional — failure must not break batch) + config_watcher = None + reload_state = None + _last_seen_reload = datetime.now(timezone.utc) + try: + from codeframe.core.config_watcher import ConfigFileWatcher + from codeframe.core.agents_config import load_preferences + + config_watcher = ConfigFileWatcher(Path(workspace.repo_path)) + reload_state = config_watcher.start(load_preferences(Path(workspace.repo_path))) + except Exception: + logger.debug("Config watcher failed to start in serial execution", exc_info=True) + completed_count = 0 failed_count = 0 blocked_count = 0 - for i, task_id in enumerate(batch.task_ids): - # Check if batch was cancelled - current_batch = get_batch(workspace, batch.id) - if current_batch and current_batch.status == BatchStatus.CANCELLED: - break - - # Emit task queued event - events.emit_for_workspace( - workspace, - events.EventType.BATCH_TASK_QUEUED, - {"batch_id": batch.id, "task_id": task_id, "position": i + 1}, - print_event=True, - ) - - # Get task info for display - task = tasks.get(workspace, task_id) - task_title = task.title if task else task_id - - print(f"\n[{i + 1}/{len(batch.task_ids)}] Starting task {task_id}: {task_title}") - - # Emit task started event - events.emit_for_workspace( - workspace, - events.EventType.BATCH_TASK_STARTED, - {"batch_id": batch.id, "task_id": task_id}, - print_event=True, - ) - - if on_event: - on_event("batch_task_started", {"task_id": task_id, "position": i + 1}) - - # Execute task via subprocess - result_status = _execute_task_subprocess(workspace, task_id, batch.id, engine=batch.engine, stall_timeout_s=batch.stall_timeout_s, stall_action=batch.stall_action) - - # If task is BLOCKED, try supervisor resolution - if result_status == RunStatus.BLOCKED.value: - supervisor = get_supervisor(workspace) - if supervisor.try_resolve_blocked_task(task_id): - # Supervisor resolved the blocker - retry the task - print(" [Supervisor] Retrying task after auto-resolution...") - result_status = _execute_task_subprocess(workspace, task_id, batch.id, engine=batch.engine, stall_timeout_s=batch.stall_timeout_s, stall_action=batch.stall_action) + try: + for i, task_id in enumerate(batch.task_ids): + # Check if batch was cancelled + current_batch = get_batch(workspace, batch.id) + if current_batch and current_batch.status == BatchStatus.CANCELLED: + break - # Record result - batch.results[task_id] = result_status - _save_batch(workspace, batch) + # Check for config reloads between tasks + if reload_state is not None: + _last_seen_reload = _apply_pending_config_reload( + batch, workspace, reload_state, _last_seen_reload + ) - # Emit appropriate event based on result - if result_status == RunStatus.COMPLETED.value: - completed_count += 1 + # Emit task queued event events.emit_for_workspace( workspace, - events.EventType.BATCH_TASK_COMPLETED, - {"batch_id": batch.id, "task_id": task_id}, + events.EventType.BATCH_TASK_QUEUED, + {"batch_id": batch.id, "task_id": task_id, "position": i + 1}, print_event=True, ) - print(" ✓ Completed") - elif result_status == RunStatus.BLOCKED.value: - blocked_count += 1 + + # Get task info for display + task = tasks.get(workspace, task_id) + task_title = task.title if task else task_id + + print(f"\n[{i + 1}/{len(batch.task_ids)}] Starting task {task_id}: {task_title}") + + # Emit task started event events.emit_for_workspace( workspace, - events.EventType.BATCH_TASK_BLOCKED, + events.EventType.BATCH_TASK_STARTED, {"batch_id": batch.id, "task_id": task_id}, print_event=True, ) - print(" ⊘ Blocked (requires human input)") - else: - failed_count += 1 - events.emit_for_workspace( - workspace, - events.EventType.BATCH_TASK_FAILED, - {"batch_id": batch.id, "task_id": task_id, "status": result_status}, - print_event=True, - ) - print(f" ✗ Failed: {result_status}") - # Check on_failure behavior - if batch.on_failure == OnFailure.STOP: - print("\nStopping batch due to --on-failure=stop") - break + if on_event: + on_event("batch_task_started", {"task_id": task_id, "position": i + 1}) - if on_event: - on_event("batch_task_completed", {"task_id": task_id, "status": result_status}) + # Execute task via subprocess + result_status = _execute_task_subprocess(workspace, task_id, batch.id, engine=batch.engine, stall_timeout_s=batch.stall_timeout_s, stall_action=batch.stall_action) - # Determine final batch status - total = len(batch.task_ids) - completed_count + failed_count + blocked_count + # If task is BLOCKED, try supervisor resolution + if result_status == RunStatus.BLOCKED.value: + supervisor = get_supervisor(workspace) + if supervisor.try_resolve_blocked_task(task_id): + # Supervisor resolved the blocker - retry the task + print(" [Supervisor] Retrying task after auto-resolution...") + result_status = _execute_task_subprocess(workspace, task_id, batch.id, engine=batch.engine, stall_timeout_s=batch.stall_timeout_s, stall_action=batch.stall_action) - if completed_count == total: - batch.status = BatchStatus.COMPLETED - event_type = events.EventType.BATCH_COMPLETED + # Record result + batch.results[task_id] = result_status + _save_batch(workspace, batch) - # Run batch-level validation (full gate sweep) - validation_passed, validation_error = _run_batch_level_validation(workspace, batch) + # Emit appropriate event based on result + if result_status == RunStatus.COMPLETED.value: + completed_count += 1 + events.emit_for_workspace( + workspace, + events.EventType.BATCH_TASK_COMPLETED, + {"batch_id": batch.id, "task_id": task_id}, + print_event=True, + ) + print(" ✓ Completed") + elif result_status == RunStatus.BLOCKED.value: + blocked_count += 1 + events.emit_for_workspace( + workspace, + events.EventType.BATCH_TASK_BLOCKED, + {"batch_id": batch.id, "task_id": task_id}, + print_event=True, + ) + print(" ⊘ Blocked (requires human input)") + else: + failed_count += 1 + events.emit_for_workspace( + workspace, + events.EventType.BATCH_TASK_FAILED, + {"batch_id": batch.id, "task_id": task_id, "status": result_status}, + print_event=True, + ) + print(f" ✗ Failed: {result_status}") - if not validation_passed: - # Gates failed - change status to PARTIAL (tasks done, integration broken) + # Check on_failure behavior + if batch.on_failure == OnFailure.STOP: + print("\nStopping batch due to --on-failure=stop") + break + + if on_event: + on_event("batch_task_completed", {"task_id": task_id, "status": result_status}) + + # Determine final batch status + total = len(batch.task_ids) + completed_count + failed_count + blocked_count + + if completed_count == total: + batch.status = BatchStatus.COMPLETED + event_type = events.EventType.BATCH_COMPLETED + + # Run batch-level validation (full gate sweep) + validation_passed, validation_error = _run_batch_level_validation(workspace, batch) + + if not validation_passed: + # Gates failed - change status to PARTIAL (tasks done, integration broken) + batch.status = BatchStatus.PARTIAL + event_type = events.EventType.BATCH_PARTIAL + print("\n⚠️ Batch marked PARTIAL due to failed batch-level gates") + print(f"Validation error: {validation_error}") + + elif completed_count == 0 and (failed_count > 0 or blocked_count > 0): + batch.status = BatchStatus.FAILED + event_type = events.EventType.BATCH_FAILED + elif completed_count > 0: batch.status = BatchStatus.PARTIAL event_type = events.EventType.BATCH_PARTIAL - print("\n⚠️ Batch marked PARTIAL due to failed batch-level gates") - print(f"Validation error: {validation_error}") - - elif completed_count == 0 and (failed_count > 0 or blocked_count > 0): - batch.status = BatchStatus.FAILED - event_type = events.EventType.BATCH_FAILED - elif completed_count > 0: - batch.status = BatchStatus.PARTIAL - event_type = events.EventType.BATCH_PARTIAL - else: - # Nothing executed (e.g., cancelled before start) - batch.status = BatchStatus.CANCELLED - event_type = events.EventType.BATCH_CANCELLED + else: + # Nothing executed (e.g., cancelled before start) + batch.status = BatchStatus.CANCELLED + event_type = events.EventType.BATCH_CANCELLED - batch.completed_at = _utc_now() - _save_batch(workspace, batch) + batch.completed_at = _utc_now() + _save_batch(workspace, batch) - # Emit batch completion event - events.emit_for_workspace( - workspace, - event_type, - { - "batch_id": batch.id, - "completed": completed_count, - "failed": failed_count, - "blocked": blocked_count, - "total": total, - }, - print_event=True, - ) + # Emit batch completion event + events.emit_for_workspace( + workspace, + event_type, + { + "batch_id": batch.id, + "completed": completed_count, + "failed": failed_count, + "blocked": blocked_count, + "total": total, + }, + print_event=True, + ) - # Stop reconciliation thread - reconcile_stop.set() + # Stop reconciliation thread + reconcile_stop.set() - # Print summary - print(f"\nBatch {batch.status.value.lower()}: {completed_count}/{total} tasks completed") - if failed_count > 0: - print(f" Failed: {failed_count}") - if blocked_count > 0: - print(f" Blocked: {blocked_count}") + # Print summary + print(f"\nBatch {batch.status.value.lower()}: {completed_count}/{total} tasks completed") + if failed_count > 0: + print(f" Failed: {failed_count}") + if blocked_count > 0: + print(f" Blocked: {blocked_count}") + finally: + if config_watcher is not None: + config_watcher.stop() def _execute_parallel( @@ -1494,132 +1547,155 @@ def _execute_parallel( _interval_p = _env_config_p.reconciliation_interval_seconds if _env_config_p else 30 _reconcile_stop_p = _start_reconciliation_thread(workspace, batch, interval_seconds=_interval_p) + # Start config file watcher (optional — failure must not break batch) + config_watcher_p = None + reload_state_p = None + _last_seen_reload_p = datetime.now(timezone.utc) + try: + from codeframe.core.config_watcher import ConfigFileWatcher as _CFW + from codeframe.core.agents_config import load_preferences as _load_prefs + + config_watcher_p = _CFW(Path(workspace.repo_path)) + reload_state_p = config_watcher_p.start(_load_prefs(Path(workspace.repo_path))) + except Exception: + logger.debug("Config watcher failed to start in parallel execution", exc_info=True) + completed_count = 0 failed_count = 0 blocked_count = 0 task_index = 0 # Global task counter for progress display - for group_idx, group in enumerate(plan.groups): - # Check if batch was cancelled - current_batch = get_batch(workspace, batch.id) - if current_batch and current_batch.status == BatchStatus.CANCELLED: - break - - # Check if any previous failure should stop execution - if batch.on_failure == OnFailure.STOP and failed_count > 0: - print("\nStopping batch due to --on-failure=stop") - break - - group_size = len(group) - print(f"\n{'─'*60}") - print(f"Group {group_idx + 1}/{plan.num_groups}: {group_size} task(s)") + try: + for group_idx, group in enumerate(plan.groups): + # Check if batch was cancelled + current_batch = get_batch(workspace, batch.id) + if current_batch and current_batch.status == BatchStatus.CANCELLED: + break - if group_size == 1: - # Single task - run directly - task_id = group[0] - task_index += 1 - result = _execute_single_task( - workspace, batch, task_id, task_index, len(batch.task_ids), on_event - ) - if result == RunStatus.COMPLETED.value: - completed_count += 1 - elif result == RunStatus.BLOCKED.value: - blocked_count += 1 - else: - failed_count += 1 - else: - # Multiple tasks - run in parallel (use per-status limits if configured) - if batch.concurrency.by_status: - group_statuses = [] - for tid in group: - t = tasks.get(workspace, tid) - if t: - group_statuses.append(t.status.value) - effective_workers = batch.concurrency.effective_workers( - statuses=group_statuses, group_size=group_size, global_running=0, + # Check for config reloads between groups + if reload_state_p is not None: + _last_seen_reload_p = _apply_pending_config_reload( + batch, workspace, reload_state_p, _last_seen_reload_p ) - else: - effective_workers = min(group_size, batch.max_parallel) - print(f"Running {group_size} tasks with {effective_workers} workers") - # Execute group in parallel - results = _execute_group_parallel( - workspace, batch, group, task_index, len(batch.task_ids), - effective_workers, on_event - ) + # Check if any previous failure should stop execution + if batch.on_failure == OnFailure.STOP and failed_count > 0: + print("\nStopping batch due to --on-failure=stop") + break + + group_size = len(group) + print(f"\n{'─'*60}") + print(f"Group {group_idx + 1}/{plan.num_groups}: {group_size} task(s)") - # Process results - for task_id, result_status in results.items(): + if group_size == 1: + # Single task - run directly + task_id = group[0] task_index += 1 - if result_status == RunStatus.COMPLETED.value: + result = _execute_single_task( + workspace, batch, task_id, task_index, len(batch.task_ids), on_event + ) + if result == RunStatus.COMPLETED.value: completed_count += 1 - elif result_status == RunStatus.BLOCKED.value: + elif result == RunStatus.BLOCKED.value: blocked_count += 1 else: failed_count += 1 + else: + # Multiple tasks - run in parallel (use per-status limits if configured) + if batch.concurrency.by_status: + group_statuses = [] + for tid in group: + t = tasks.get(workspace, tid) + if t: + group_statuses.append(t.status.value) + effective_workers = batch.concurrency.effective_workers( + statuses=group_statuses, group_size=group_size, global_running=0, + ) + else: + effective_workers = min(group_size, batch.max_parallel) + print(f"Running {group_size} tasks with {effective_workers} workers") - # Check stop on failure within parallel group - if batch.on_failure == OnFailure.STOP and failed_count > 0: - # Can't stop mid-group, but will stop after group completes - pass - - # Determine final batch status - total = len(batch.task_ids) - completed_count + failed_count + blocked_count - - if completed_count == total: - batch.status = BatchStatus.COMPLETED - event_type = events.EventType.BATCH_COMPLETED - - # Run batch-level validation (full gate sweep) - validation_passed, validation_error = _run_batch_level_validation(workspace, batch) + # Execute group in parallel + results = _execute_group_parallel( + workspace, batch, group, task_index, len(batch.task_ids), + effective_workers, on_event + ) - if not validation_passed: - # Gates failed - change status to PARTIAL (tasks done, integration broken) + # Process results + for task_id, result_status in results.items(): + task_index += 1 + if result_status == RunStatus.COMPLETED.value: + completed_count += 1 + elif result_status == RunStatus.BLOCKED.value: + blocked_count += 1 + else: + failed_count += 1 + + # Check stop on failure within parallel group + if batch.on_failure == OnFailure.STOP and failed_count > 0: + # Can't stop mid-group, but will stop after group completes + pass + + # Determine final batch status + total = len(batch.task_ids) + completed_count + failed_count + blocked_count + + if completed_count == total: + batch.status = BatchStatus.COMPLETED + event_type = events.EventType.BATCH_COMPLETED + + # Run batch-level validation (full gate sweep) + validation_passed, validation_error = _run_batch_level_validation(workspace, batch) + + if not validation_passed: + # Gates failed - change status to PARTIAL (tasks done, integration broken) + batch.status = BatchStatus.PARTIAL + event_type = events.EventType.BATCH_PARTIAL + print("\n⚠️ Batch marked PARTIAL due to failed batch-level gates") + print(f"Validation error: {validation_error}") + + elif completed_count == 0 and (failed_count > 0 or blocked_count > 0): + batch.status = BatchStatus.FAILED + event_type = events.EventType.BATCH_FAILED + elif completed_count > 0: batch.status = BatchStatus.PARTIAL event_type = events.EventType.BATCH_PARTIAL - print("\n⚠️ Batch marked PARTIAL due to failed batch-level gates") - print(f"Validation error: {validation_error}") - - elif completed_count == 0 and (failed_count > 0 or blocked_count > 0): - batch.status = BatchStatus.FAILED - event_type = events.EventType.BATCH_FAILED - elif completed_count > 0: - batch.status = BatchStatus.PARTIAL - event_type = events.EventType.BATCH_PARTIAL - else: - batch.status = BatchStatus.CANCELLED - event_type = events.EventType.BATCH_CANCELLED - - batch.completed_at = _utc_now() - _save_batch(workspace, batch) + else: + batch.status = BatchStatus.CANCELLED + event_type = events.EventType.BATCH_CANCELLED - # Emit batch completion event - events.emit_for_workspace( - workspace, - event_type, - { - "batch_id": batch.id, - "completed": completed_count, - "failed": failed_count, - "blocked": blocked_count, - "total": total, - "strategy": "parallel", - "groups": plan.num_groups, - }, - print_event=True, - ) + batch.completed_at = _utc_now() + _save_batch(workspace, batch) - # Stop reconciliation thread - _reconcile_stop_p.set() + # Emit batch completion event + events.emit_for_workspace( + workspace, + event_type, + { + "batch_id": batch.id, + "completed": completed_count, + "failed": failed_count, + "blocked": blocked_count, + "total": total, + "strategy": "parallel", + "groups": plan.num_groups, + }, + print_event=True, + ) - # Print summary - print(f"\nBatch {batch.status.value.lower()}: {completed_count}/{total} tasks completed") - print(f" Execution: {plan.num_groups} groups (parallel strategy)") - if failed_count > 0: - print(f" Failed: {failed_count}") - if blocked_count > 0: - print(f" Blocked: {blocked_count}") + # Stop reconciliation thread + _reconcile_stop_p.set() + + # Print summary + print(f"\nBatch {batch.status.value.lower()}: {completed_count}/{total} tasks completed") + print(f" Execution: {plan.num_groups} groups (parallel strategy)") + if failed_count > 0: + print(f" Failed: {failed_count}") + if blocked_count > 0: + print(f" Blocked: {blocked_count}") + finally: + if config_watcher_p is not None: + config_watcher_p.stop() def _start_reconciliation_thread( diff --git a/tests/cli/test_config_reload_cli.py b/tests/cli/test_config_reload_cli.py new file mode 100644 index 00000000..9866828c --- /dev/null +++ b/tests/cli/test_config_reload_cli.py @@ -0,0 +1,109 @@ +"""Tests for config reload display in batch status CLI output. + +Verifies that config reload timestamps stored in batch results +are displayed in the batch status command output. +""" + +from datetime import datetime, timezone + +import pytest +from typer.testing import CliRunner + +from codeframe.cli.app import app +from codeframe.core import conductor +from codeframe.core.workspace import create_or_load_workspace + +pytestmark = pytest.mark.v2 + +runner = CliRunner() + + +@pytest.fixture() +def workspace_with_batch(tmp_path): + """Create a workspace with a batch that has config reload timestamps.""" + workspace = create_or_load_workspace(tmp_path) + + # Create a batch record directly via the database + batch = conductor.BatchRun( + id="test-batch-reload-001", + workspace_id=workspace.id, + task_ids=["task-1"], + status=conductor.BatchStatus.COMPLETED, + strategy="serial", + max_parallel=1, + on_failure=conductor.OnFailure.CONTINUE, + started_at=datetime(2026, 3, 17, 10, 0, 0, tzinfo=timezone.utc), + completed_at=datetime(2026, 3, 17, 10, 5, 0, tzinfo=timezone.utc), + results={ + "task-1": "COMPLETED", + "__config_reloads__": [ + "2026-03-17T10:01:30+00:00", + "2026-03-17T10:03:45+00:00", + ], + }, + ) + conductor._save_batch(workspace, batch) + return tmp_path + + +@pytest.fixture() +def workspace_with_batch_no_reloads(tmp_path): + """Create a workspace with a batch that has no config reloads.""" + workspace = create_or_load_workspace(tmp_path) + + batch = conductor.BatchRun( + id="test-batch-noreload-001", + workspace_id=workspace.id, + task_ids=["task-1"], + status=conductor.BatchStatus.COMPLETED, + strategy="serial", + max_parallel=1, + on_failure=conductor.OnFailure.CONTINUE, + started_at=datetime(2026, 3, 17, 10, 0, 0, tzinfo=timezone.utc), + completed_at=datetime(2026, 3, 17, 10, 5, 0, tzinfo=timezone.utc), + results={"task-1": "COMPLETED"}, + ) + conductor._save_batch(workspace, batch) + return tmp_path + + +class TestBatchStatusConfigReloads: + """Tests for config reload display in batch status output.""" + + def test_batch_status_shows_config_reloads(self, workspace_with_batch): + """Batch status should show config reload timestamps when present.""" + result = runner.invoke( + app, + [ + "work", "batch", "status", "test-batch-reload-001", + "-w", str(workspace_with_batch), + ], + ) + assert result.exit_code == 0 + assert "Config reloaded at 10:01:30" in result.output + assert "Config reloaded at 10:03:45" in result.output + + def test_batch_status_shows_config_reloads_header(self, workspace_with_batch): + """Batch status should show the Config Reloads section header.""" + result = runner.invoke( + app, + [ + "work", "batch", "status", "test-batch-reload-001", + "-w", str(workspace_with_batch), + ], + ) + assert result.exit_code == 0 + assert "Config Reloads" in result.output + + def test_batch_status_no_reloads_no_section(self, workspace_with_batch_no_reloads): + """Batch status should not show config reload section when none occurred.""" + result = runner.invoke( + app, + [ + "work", "batch", "status", "test-batch-noreload-001", + "-w", str(workspace_with_batch_no_reloads), + ], + ) + assert result.exit_code == 0 + assert "Config reloaded at" not in result.output + assert "Config Reloads" not in result.output From cbc757c88b244b392a6dac41b380a6e1b919e1c3 Mon Sep 17 00:00:00 2001 From: Test User Date: Tue, 17 Mar 2026 08:33:25 -0700 Subject: [PATCH 3/4] test(config): add integration tests for config reload lifecycle (#402) End-to-end tests: modify config during watcher execution, verify reload detection, multiple reloads, invalid config preservation, has_reloaded_since tracking, and clean thread shutdown. --- tests/core/test_config_reload_integration.py | 191 +++++++++++++++++++ 1 file changed, 191 insertions(+) create mode 100644 tests/core/test_config_reload_integration.py diff --git a/tests/core/test_config_reload_integration.py b/tests/core/test_config_reload_integration.py new file mode 100644 index 00000000..1bc02687 --- /dev/null +++ b/tests/core/test_config_reload_integration.py @@ -0,0 +1,191 @@ +"""Integration tests for dynamic config reload during batch execution. + +Tests the full flow: ConfigFileWatcher detects file changes and the +conductor applies reloaded config to subsequent task dispatches. +""" + +import os +import time +from datetime import datetime, timezone +from pathlib import Path + +import pytest + +from codeframe.core.agents_config import load_preferences +from codeframe.core.config_watcher import ConfigFileWatcher +from codeframe.core.workspace import create_or_load_workspace + +pytestmark = pytest.mark.v2 + + +@pytest.fixture +def workspace(tmp_path: Path): + repo_path = tmp_path / "test_repo" + repo_path.mkdir() + return create_or_load_workspace(repo_path) + + +class TestConfigReloadLifecycle: + """End-to-end: write config → detect change → reload → verify new prefs.""" + + def test_full_reload_cycle(self, workspace): + """Simulate a batch execution where config is modified mid-run.""" + # Step 1: Create initial config + agents_path = workspace.repo_path / "AGENTS.md" + agents_path.write_text( + "# Always Do\n" + "- Run tests after changes\n" + "- Use type hints\n" + "\n" + "# Never Do\n" + "- Delete production data\n" + ) + + # Step 2: Load initial preferences + initial_prefs = load_preferences(workspace.repo_path) + assert "Run tests after changes" in initial_prefs.always_do + + # Step 3: Start watcher + watcher = ConfigFileWatcher(workspace.repo_path, poll_interval_s=0.1) + state = watcher.start(initial_prefs) + + try: + # Step 4: Verify initial state + prefs = state.get_prefs() + assert "Run tests after changes" in prefs.always_do + assert state.last_reload_at is None + + # Step 5: Modify config (simulating operator change during batch) + time.sleep(0.3) + agents_path.write_text( + "# Always Do\n" + "- Run tests after changes\n" + "- Use type hints\n" + "- Log all API calls\n" + "\n" + "# Never Do\n" + "- Delete production data\n" + "- Modify database schema directly\n" + ) + future_time = time.time() + 1 + os.utime(agents_path, (future_time, future_time)) + + # Step 6: Wait for detection (poll_interval=0.1s, should detect within 0.5s) + time.sleep(0.5) + + # Step 7: Verify reload happened + assert state.last_reload_at is not None + new_prefs = state.get_prefs() + assert "Log all API calls" in new_prefs.always_do + assert "Modify database schema directly" in new_prefs.never_do + assert len(state.reload_timestamps) == 1 + + finally: + watcher.stop() + + def test_multiple_reloads(self, workspace): + """Config changed twice during execution → two reload events.""" + agents_path = workspace.repo_path / "AGENTS.md" + agents_path.write_text("# Always Do\n- Original\n") + + initial_prefs = load_preferences(workspace.repo_path) + watcher = ConfigFileWatcher(workspace.repo_path, poll_interval_s=0.1) + state = watcher.start(initial_prefs) + + try: + # First change + time.sleep(0.3) + agents_path.write_text("# Always Do\n- First update\n") + os.utime(agents_path, (time.time() + 1, time.time() + 1)) + time.sleep(0.5) + + assert len(state.reload_timestamps) == 1 + assert "First update" in state.get_prefs().always_do + + # Second change + agents_path.write_text("# Always Do\n- Second update\n") + os.utime(agents_path, (time.time() + 2, time.time() + 2)) + time.sleep(0.5) + + assert len(state.reload_timestamps) == 2 + assert "Second update" in state.get_prefs().always_do + + finally: + watcher.stop() + + def test_invalid_reload_preserves_last_good(self, workspace): + """Invalid config change doesn't overwrite good config.""" + from unittest.mock import patch + + agents_path = workspace.repo_path / "AGENTS.md" + agents_path.write_text("# Always Do\n- Original good config\n") + + initial_prefs = load_preferences(workspace.repo_path) + watcher = ConfigFileWatcher(workspace.repo_path, poll_interval_s=0.1) + state = watcher.start(initial_prefs) + + try: + time.sleep(0.3) + + # Simulate a parse failure on reload + with patch( + "codeframe.core.config_watcher.load_preferences", + side_effect=RuntimeError("Parse failed"), + ): + agents_path.write_text("corrupt") + os.utime(agents_path, (time.time() + 1, time.time() + 1)) + time.sleep(0.5) + + # Original config preserved + assert "Original good config" in state.get_prefs().always_do + assert state.last_error is not None + assert len(state.reload_timestamps) == 0 # No successful reloads + + finally: + watcher.stop() + + def test_has_reloaded_since_tracks_correctly(self, workspace): + """has_reloaded_since correctly reports reload relative to checkpoint.""" + agents_path = workspace.repo_path / "AGENTS.md" + agents_path.write_text("# Always Do\n- Initial\n") + + initial_prefs = load_preferences(workspace.repo_path) + watcher = ConfigFileWatcher(workspace.repo_path, poll_interval_s=0.1) + state = watcher.start(initial_prefs) + + try: + checkpoint = datetime.now(timezone.utc) + assert not state.has_reloaded_since(checkpoint) + + time.sleep(0.3) + agents_path.write_text("# Always Do\n- Changed\n") + os.utime(agents_path, (time.time() + 1, time.time() + 1)) + time.sleep(0.5) + + assert state.has_reloaded_since(checkpoint) + + # New checkpoint after the reload + new_checkpoint = datetime.now(timezone.utc) + assert not state.has_reloaded_since(new_checkpoint) + + finally: + watcher.stop() + + def test_watcher_stops_cleanly(self, workspace): + """Watcher thread terminates within reasonable time on stop().""" + agents_path = workspace.repo_path / "AGENTS.md" + agents_path.write_text("# Always Do\n- Test\n") + + initial_prefs = load_preferences(workspace.repo_path) + watcher = ConfigFileWatcher(workspace.repo_path, poll_interval_s=0.1) + watcher.start(initial_prefs) + + import threading + + active_before = threading.active_count() + watcher.stop() + + # Thread should have stopped (or at most still cleaning up) + time.sleep(0.2) + active_after = threading.active_count() + assert active_after <= active_before From c501ceb331326dd74fa1763dd17e6d4bb45570a8 Mon Sep 17 00:00:00 2001 From: Test User Date: Tue, 17 Mar 2026 15:30:57 -0700 Subject: [PATCH 4/4] =?UTF-8?q?fix:=20address=20review=20feedback=20?= =?UTF-8?q?=E2=80=94=20OSError=20race,=20unbounded=20list,=20cached=20work?= =?UTF-8?q?space=20(#402)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Wrap newly-created file detection in try/except OSError - Cap reload_timestamps at 100 entries - Cache workspace reference to avoid DB access on every emit - Move import outside loop in batch_status --- codeframe/cli/app.py | 3 ++- codeframe/core/config_watcher.py | 27 ++++++++++++++++++--------- 2 files changed, 20 insertions(+), 10 deletions(-) diff --git a/codeframe/cli/app.py b/codeframe/cli/app.py index dc2725dc..ac115766 100644 --- a/codeframe/cli/app.py +++ b/codeframe/cli/app.py @@ -3799,10 +3799,11 @@ def batch_status( # Show config reloads if any occurred during batch config_reloads = batch.results.get("__config_reloads__") if config_reloads: + from datetime import datetime as _dt + console.print("\n [bold]Config Reloads:[/bold]") for ts in config_reloads: try: - from datetime import datetime as _dt dt = _dt.fromisoformat(ts) console.print(f" Config reloaded at {dt.strftime('%H:%M:%S')}") except (ValueError, TypeError): diff --git a/codeframe/core/config_watcher.py b/codeframe/core/config_watcher.py index 42a0bdad..4254562e 100644 --- a/codeframe/core/config_watcher.py +++ b/codeframe/core/config_watcher.py @@ -64,6 +64,8 @@ def apply_reload(self, new_prefs: AgentPreferences, timestamp: datetime) -> None self.last_reload_at = timestamp self.last_error = None self.reload_timestamps.append(timestamp.isoformat()) + if len(self.reload_timestamps) > 100: + self.reload_timestamps = self.reload_timestamps[-100:] def record_error(self, msg: str) -> None: with self._lock: @@ -104,6 +106,7 @@ def __init__( self._thread: Optional[threading.Thread] = None self._state: Optional[ConfigReloadState] = None self._watched_mtimes: dict[Path, float] = {} + self._workspace_ref: Optional[object] = None # Cached workspace for event emission def start(self, initial_prefs: AgentPreferences) -> ConfigReloadState: """Begin watching config files. @@ -172,9 +175,12 @@ def _check_for_changes(self) -> None: # Also detect newly created config files for name in _CONFIG_FILES: path = self._workspace_path / name - if path.is_file() and path not in self._watched_mtimes: - self._watched_mtimes[path] = path.stat().st_mtime - changed = True + try: + if path.is_file() and path not in self._watched_mtimes: + self._watched_mtimes[path] = path.stat().st_mtime + changed = True + except OSError: + continue if changed: self._reload() @@ -226,15 +232,20 @@ def _build_diff_summary( changes.append("code_style changed") return "; ".join(changes) if changes else "content changed" + def _get_workspace(self): + """Get cached workspace reference for event emission.""" + if self._workspace_ref is None: + from codeframe.core.workspace import get_workspace + self._workspace_ref = get_workspace(self._workspace_path) + return self._workspace_ref + def _emit_reload_success(self, diff_summary: str) -> None: """Emit CONFIG_RELOADED event (best-effort).""" try: from codeframe.core import events - from codeframe.core.workspace import get_workspace - workspace = get_workspace(self._workspace_path) events.emit_for_workspace( - workspace, + self._get_workspace(), events.EventType.CONFIG_RELOADED, {"diff_summary": diff_summary}, print_event=True, @@ -246,11 +257,9 @@ def _emit_reload_failed(self, error_msg: str) -> None: """Emit CONFIG_RELOAD_FAILED event (best-effort).""" try: from codeframe.core import events - from codeframe.core.workspace import get_workspace - workspace = get_workspace(self._workspace_path) events.emit_for_workspace( - workspace, + self._get_workspace(), events.EventType.CONFIG_RELOAD_FAILED, {"error": error_msg}, print_event=True,