diff --git a/src/config.rs b/src/config.rs index 21cb95f..4373c17 100644 --- a/src/config.rs +++ b/src/config.rs @@ -39,6 +39,17 @@ pub struct Config { /// Cadence for `git gc --prune=now --aggressive` over each MEMORY_ROOT. /// 0 = disabled. pub git_gc_interval_days: u64, + /// Wall-clock cap for the `lsof +D ` probe used to detect + /// live editors. A slow filesystem (sshfs, network mount) could hang the + /// blocking lsof and wedge the whole tick; this kills it instead. + /// 0 = disabled (use the old unbounded blocking call). + pub lsof_timeout_sec: u64, + /// Run housekeepers (tmux_janitor, orphan_node, pressure) in + /// report-only mode — log what *would* be killed without actually + /// sending signals. Same intent as DRY_RUN but scoped to housekeepers + /// so the memory-manager loop can still mutate while housekeepers stay + /// silent. + pub housekeeper_dry_run: bool, pub claude_bin: String, pub authmux_bin: String, pub claude_accounts_dir: PathBuf, @@ -96,6 +107,8 @@ impl Config { history_max_lines: env_u64("HISTORY_MAX_LINES", 10_000), tick_log_ttl_days: env_u64("TICK_LOG_TTL_DAYS", 7), git_gc_interval_days: env_u64("GIT_GC_INTERVAL_DAYS", 7), + lsof_timeout_sec: env_u64("LSOF_TIMEOUT_SEC", 5), + housekeeper_dry_run: env_bool("HOUSEKEEPER_DRY_RUN", false), claude_bin: env_str("CLAUDE_BIN", "claude"), authmux_bin: env_str("AUTHMUX_BIN", "authmux"), claude_accounts_dir: env_path( diff --git a/src/ipc.rs b/src/ipc.rs index eb4138e..34ab573 100644 --- a/src/ipc.rs +++ b/src/ipc.rs @@ -5,7 +5,8 @@ use anyhow::{anyhow, Context, Result}; use serde::{Deserialize, Serialize}; -use std::path::Path; +use std::os::fd::AsRawFd; +use std::path::{Path, PathBuf}; use std::sync::Arc; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::{UnixListener, UnixStream}; @@ -34,39 +35,113 @@ pub struct TickRecord { pub exit_code: Option, } -pub fn acquire_lock(lock_file: &Path, pid_file: &Path) -> Result<()> { - if lock_file.exists() { - let pid_str = std::fs::read_to_string(lock_file).unwrap_or_default(); - if let Ok(pid) = pid_str.trim().parse::() { - // signal 0 = existence probe - if unsafe { libc_kill(pid, 0) } == 0 { - return Err(anyhow!( - "another daemon holds {} (pid={})", - lock_file.display(), - pid - )); - } - } - let _ = std::fs::remove_file(lock_file); +/// RAII guard for the daemon's exclusive process-singleton lock. +/// +/// Two-line summary of how this defeats the old race: +/// 1. The lock is acquired with `flock(LOCK_EX|LOCK_NB)` on an *fd we hold*. +/// Kernel guarantees only one fd at a time can hold the exclusive lock, +/// so two cmmd processes can't both pass startup, even if they call +/// `acquire_lock` at the same microsecond. +/// 2. On crash, the kernel closes our fd and releases the lock for us. No +/// stale-lock-file false positives, no PID-recycle ambiguity. +/// +/// On Drop the guard removes the lock_file and pid_file. The fd is dropped +/// last, which is when the kernel releases the actual lock. +#[derive(Debug)] +pub struct LockGuard { + lock_file: PathBuf, + pid_file: PathBuf, + // Held for the lifetime of the daemon. The fd's close releases the flock. + // Keep this *after* the path fields so Drop runs lock_file/pid_file + // removal before the fd close — that ordering avoids a tiny window in + // which another daemon could see the file gone and re-acquire while our + // fd is still in close(). + _fd: std::fs::File, +} + +impl Drop for LockGuard { + fn drop(&mut self) { + let _ = std::fs::remove_file(&self.lock_file); + let _ = std::fs::remove_file(&self.pid_file); + // _fd dropped here -> kernel releases the flock. + } +} + +/// Acquire the daemon singleton lock. +/// +/// Uses `flock(LOCK_EX|LOCK_NB)` on the lock file's fd. If the lock is held +/// by another live daemon, returns an Err that includes the holding PID +/// (read from the file body) for operator clarity. The flock is what makes +/// this race-free; the PID readback is purely cosmetic. +pub fn acquire_lock(lock_file: &Path, pid_file: &Path) -> Result { + if let Some(parent) = lock_file.parent() { + std::fs::create_dir_all(parent).ok(); } - let me = std::process::id(); - std::fs::write(lock_file, me.to_string()) + // Open-or-create with read-write so we can flock + truncate + write the + // current pid. We do NOT use create_new(true) because that prevents + // re-acquiring after a clean shutdown left the file behind (intentional — + // the file is a marker, the flock is the actual lock). + let file = std::fs::OpenOptions::new() + .create(true) + .read(true) + .write(true) + .truncate(false) + .open(lock_file) + .with_context(|| format!("open {}", lock_file.display()))?; + + let fd = file.as_raw_fd(); + // Non-blocking exclusive flock. If anyone else holds it, EWOULDBLOCK. + let rc = unsafe { libc_flock(fd, LOCK_EX | LOCK_NB) }; + if rc != 0 { + // Best-effort read of the previous holder's PID for the error message. + // This is purely informational — flock already told us the truth. + let pid_hint = std::fs::read_to_string(lock_file) + .ok() + .and_then(|s| s.trim().parse::().ok()); + return Err(match pid_hint { + Some(p) => anyhow!( + "another daemon holds {} (pid={p})", + lock_file.display(), + ), + None => anyhow!( + "another daemon holds {} (pid unknown)", + lock_file.display(), + ), + }); + } + + // We hold the lock. Stamp our pid into both files. + let me = std::process::id().to_string(); + use std::io::{Seek, SeekFrom, Write}; + let mut f = file; + f.set_len(0) + .with_context(|| format!("truncate {}", lock_file.display()))?; + f.seek(SeekFrom::Start(0)) + .with_context(|| format!("seek {}", lock_file.display()))?; + f.write_all(me.as_bytes()) .with_context(|| format!("write {}", lock_file.display()))?; - std::fs::write(pid_file, me.to_string()) + f.sync_all() + .with_context(|| format!("sync {}", lock_file.display()))?; + + std::fs::write(pid_file, &me) .with_context(|| format!("write {}", pid_file.display()))?; - Ok(()) -} -pub fn release_lock(lock_file: &Path, pid_file: &Path) { - let _ = std::fs::remove_file(lock_file); - let _ = std::fs::remove_file(pid_file); + Ok(LockGuard { + lock_file: lock_file.to_path_buf(), + pid_file: pid_file.to_path_buf(), + _fd: f, + }) } +// flock(2) is not in libc-side `nix` crate without the "fs" feature, so we +// declare the syscall directly. This is portable Linux/macOS — no surprises. +const LOCK_EX: i32 = 2; +const LOCK_NB: i32 = 4; extern "C" { - fn kill(pid: i32, sig: i32) -> i32; + fn flock(fd: i32, operation: i32) -> i32; } -unsafe fn libc_kill(pid: i32, sig: i32) -> i32 { - kill(pid, sig) +unsafe fn libc_flock(fd: i32, op: i32) -> i32 { + flock(fd, op) } /// Daemon-side handles exposed to clients via the Unix socket. @@ -173,3 +248,58 @@ pub async fn query_status(sock_path: &Path) -> Result { fn _suppress() -> anyhow::Error { anyhow!("unused") } + +#[cfg(test)] +mod tests { + use super::*; + + fn unique_tmp(name: &str) -> std::path::PathBuf { + let p = std::env::temp_dir().join(format!("cmmd-ipc-{}-{}", std::process::id(), name)); + let _ = std::fs::remove_dir_all(&p); + std::fs::create_dir_all(&p).unwrap(); + p + } + + #[test] + fn second_acquire_fails_while_first_held() { + let dir = unique_tmp("flock-double"); + let lock = dir.join("daemon.lock"); + let pid = dir.join("daemon.pid"); + let first = acquire_lock(&lock, &pid).expect("first acquire"); + let second = acquire_lock(&lock, &pid); + assert!(second.is_err(), "second acquire must fail while first held"); + let msg = format!("{}", second.unwrap_err()); + assert!( + msg.contains("another daemon holds"), + "error mentions the lock collision: {msg}" + ); + drop(first); + // After drop the lock + pid files should be gone. + assert!(!lock.exists(), "lock file removed on drop"); + assert!(!pid.exists(), "pid file removed on drop"); + } + + #[test] + fn third_acquire_succeeds_after_first_dropped() { + let dir = unique_tmp("flock-reacquire"); + let lock = dir.join("daemon.lock"); + let pid = dir.join("daemon.pid"); + { + let _g = acquire_lock(&lock, &pid).expect("first"); + } + // Once the first guard is dropped, a fresh acquire must work — this + // is the "stale lock file" case that used to require a process probe. + let _g2 = acquire_lock(&lock, &pid).expect("reacquire after drop"); + } + + #[test] + fn pid_file_contains_current_process_id() { + let dir = unique_tmp("flock-pid"); + let lock = dir.join("daemon.lock"); + let pid = dir.join("daemon.pid"); + let _g = acquire_lock(&lock, &pid).expect("acquire"); + let written = std::fs::read_to_string(&pid).unwrap(); + let want = std::process::id().to_string(); + assert_eq!(written.trim(), want); + } +} diff --git a/src/main.rs b/src/main.rs index 491e07d..8ef6ad8 100644 --- a/src/main.rs +++ b/src/main.rs @@ -216,7 +216,9 @@ fn run_orphan_node(action: OrphanNodeAction) -> Result<()> { Ok(()) } OrphanNodeAction::Reap { json } => { - let result = orphan_node::reap_orphans(); + // CLI invocation = explicit operator action; honor with real kills. + // For dry-run behavior in the daemon tick, see HOUSEKEEPER_DRY_RUN. + let result = orphan_node::reap_orphans(false); if json { println!("{}", serde_json::to_string_pretty(&result)?); } else { @@ -237,7 +239,8 @@ fn run_orphan_node(action: OrphanNodeAction) -> Result<()> { fn run_tmux_janitor(action: TmuxAction) -> Result<()> { match action { TmuxAction::Cleanup { json } => { - let result = tmux_janitor::cleanup_unattached(); + // CLI invocation = explicit operator action. + let result = tmux_janitor::cleanup_unattached(false); if json { println!("{}", serde_json::to_string_pretty(&result)?); } else { @@ -253,7 +256,8 @@ fn run_tmux_janitor(action: TmuxAction) -> Result<()> { fn run_pressure(respond: bool, json: bool) -> Result<()> { let result = if respond { - pressure::check_and_respond() + // CLI invocation = explicit operator action. + pressure::check_and_respond(false) } else { pressure::PressureResponse { mem: pressure::read_meminfo(), @@ -637,7 +641,12 @@ async fn run_doctor(cfg: config::Config) -> Result<()> { } async fn run_daemon(cfg: config::Config, once: bool) -> Result<()> { - ipc::acquire_lock(&cfg.lock_file, &cfg.pid_file)?; + // RAII flock guard — kept alive for the lifetime of run_daemon. The flock + // is what guarantees only one cmmd can pass this point; the path-based + // probe before it was racy (two daemons could both pass the existence + // check). On Drop, the guard removes the lock + pid files; on crash, the + // kernel releases the flock automatically. + let _lock = ipc::acquire_lock(&cfg.lock_file, &cfg.pid_file)?; info!( pid = std::process::id(), memory = %cfg.memory_root.display(), @@ -717,7 +726,8 @@ async fn run_daemon(cfg: config::Config, once: bool) -> Result<()> { ) .await; - ipc::release_lock(&cfg_arc.lock_file, &cfg_arc.pid_file); + // _lock drops here -> removes lock + pid files, releases flock. + drop(_lock); info!("daemon down"); result } @@ -789,25 +799,41 @@ async fn main_loop( log_login_summary(&am, &accts); // --- Housekeeping: run every tick regardless of memory-manager logic --- + // HOUSEKEEPER_DRY_RUN flips all three into report-only mode so an + // operator can watch what *would* be killed before promoting to real + // signals. The counts are recorded into cmmd_housekeeper_actions_total + // (mode="real" vs mode="dry_run") so dashboards can compare. + let hk_dry = cfg.housekeeper_dry_run; // 1. Kill orphaned tmux sessions - let tmux_result = tmux_janitor::cleanup_unattached(); + let tmux_result = tmux_janitor::cleanup_unattached(hk_dry); if !tmux_result.killed.is_empty() { - info!(killed = ?tmux_result.killed, "tmux janitor"); + info!(killed = ?tmux_result.killed, dry_run = hk_dry, "tmux janitor"); } // 2. Reap orphaned node processes (mcpvault, mcp-server, worker-service) - let orphan_result = orphan_node::reap_orphans(); + let orphan_result = orphan_node::reap_orphans(hk_dry); if !orphan_result.reaped.is_empty() { - info!(count = orphan_result.reaped.len(), "orphan node reaper"); + info!( + count = orphan_result.reaped.len(), + dry_run = hk_dry, + "orphan node reaper" + ); } // 3. RAM pressure response - let pressure_result = pressure::check_and_respond(); + let pressure_result = pressure::check_and_respond(hk_dry); if pressure_result.threshold_exceeded { info!( used_pct = pressure_result.mem.used_pct, actions = ?pressure_result.actions_taken, + dry_run = hk_dry, "pressure response" ); } + metrics_handle.record_housekeeper( + hk_dry, + tmux_result.killed.len() as u64, + orphan_result.reaped.len() as u64, + pressure_result.actions_taken.len() as u64, + ); let dry_run_now = *dry_run_runtime.lock().await; state.write().await.dry_run = dry_run_now; diff --git a/src/metrics.rs b/src/metrics.rs index ce52a5d..0e45ba6 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -31,6 +31,17 @@ pub struct Metrics { pub ticks_blocked_daily_cap_total: AtomicU64, pub ticks_reverted_fix_cap_total: AtomicU64, pub day_ticks_ran: AtomicU64, + // Housekeeper visibility. One counter per kind, summed across ticks. + // The render layer emits them as a single Prometheus metric with a + // `kind` label so dashboards can filter cleanly. A "dry_run" variant + // is published separately so an operator can see what *would* have + // been killed without enabling kills. + pub housekeeper_kills_tmux_total: AtomicU64, + pub housekeeper_kills_orphan_node_total: AtomicU64, + pub housekeeper_actions_pressure_total: AtomicU64, + pub housekeeper_dryrun_tmux_total: AtomicU64, + pub housekeeper_dryrun_orphan_node_total: AtomicU64, + pub housekeeper_dryrun_pressure_total: AtomicU64, } impl Metrics { @@ -90,6 +101,32 @@ impl Metrics { self.day_ticks_ran.store(n, Ordering::Relaxed); } + /// Record housekeeper kill/action counts for this tick. Pass the count + /// for each kind; pass 0 for kinds that did nothing. + pub fn record_housekeeper( + &self, + dry_run: bool, + tmux: u64, + orphan_node: u64, + pressure: u64, + ) { + if dry_run { + self.housekeeper_dryrun_tmux_total + .fetch_add(tmux, Ordering::Relaxed); + self.housekeeper_dryrun_orphan_node_total + .fetch_add(orphan_node, Ordering::Relaxed); + self.housekeeper_dryrun_pressure_total + .fetch_add(pressure, Ordering::Relaxed); + } else { + self.housekeeper_kills_tmux_total + .fetch_add(tmux, Ordering::Relaxed); + self.housekeeper_kills_orphan_node_total + .fetch_add(orphan_node, Ordering::Relaxed); + self.housekeeper_actions_pressure_total + .fetch_add(pressure, Ordering::Relaxed); + } + } + /// `tick_interval_sec` lets the exporter publish a derived gauge for /// "is the daemon stuck?" — staleness > 2× interval is unhealthy. pub fn render_prometheus(&self, tick_interval_sec: u64) -> String { @@ -197,6 +234,40 @@ impl Metrics { "cmmd_day_ticks_ran {}\n", self.day_ticks_ran.load(Ordering::Relaxed) )); + // One labeled metric per housekeeper kind. Two label dimensions: + // `kind` (which housekeeper) and `mode` (real / dry_run). + out.push_str( + "# HELP cmmd_housekeeper_actions_total Kills/actions taken by housekeepers. Labels: kind, mode.\n", + ); + out.push_str("# TYPE cmmd_housekeeper_actions_total counter\n"); + for (kind, real, dryrun) in [ + ( + "tmux", + self.housekeeper_kills_tmux_total.load(Ordering::Relaxed), + self.housekeeper_dryrun_tmux_total.load(Ordering::Relaxed), + ), + ( + "orphan_node", + self.housekeeper_kills_orphan_node_total + .load(Ordering::Relaxed), + self.housekeeper_dryrun_orphan_node_total + .load(Ordering::Relaxed), + ), + ( + "pressure", + self.housekeeper_actions_pressure_total + .load(Ordering::Relaxed), + self.housekeeper_dryrun_pressure_total + .load(Ordering::Relaxed), + ), + ] { + out.push_str(&format!( + "cmmd_housekeeper_actions_total{{kind=\"{kind}\",mode=\"real\"}} {real}\n" + )); + out.push_str(&format!( + "cmmd_housekeeper_actions_total{{kind=\"{kind}\",mode=\"dry_run\"}} {dryrun}\n" + )); + } out } } @@ -293,9 +364,43 @@ mod tests { "cmmd_history_appends_total", "cmmd_last_tick_unix", "cmmd_tick_staleness_seconds", + "cmmd_history_rotations_total", + "cmmd_tick_logs_swept_total", + "cmmd_git_gc_runs_total", + "cmmd_ticks_blocked_daily_cap_total", + "cmmd_ticks_reverted_fix_cap_total", + "cmmd_day_ticks_ran", + "cmmd_housekeeper_actions_total", ] { assert!(out.contains(name), "missing metric {name} in:\n{out}"); } + // Housekeeper metric publishes both real and dry_run modes for each kind. + for kind in ["tmux", "orphan_node", "pressure"] { + for mode in ["real", "dry_run"] { + let needle = format!("kind=\"{kind}\",mode=\"{mode}\""); + assert!( + out.contains(&needle), + "missing housekeeper label combo {needle}" + ); + } + } + } + + #[test] + fn housekeeper_counters_route_by_mode() { + let m = Metrics::default(); + m.record_housekeeper(false, 2, 3, 0); + m.record_housekeeper(true, 1, 1, 5); + assert_eq!(m.housekeeper_kills_tmux_total.load(Ordering::Relaxed), 2); + assert_eq!( + m.housekeeper_kills_orphan_node_total.load(Ordering::Relaxed), + 3 + ); + assert_eq!(m.housekeeper_dryrun_tmux_total.load(Ordering::Relaxed), 1); + assert_eq!( + m.housekeeper_dryrun_pressure_total.load(Ordering::Relaxed), + 5 + ); } #[test] diff --git a/src/orphan_node.rs b/src/orphan_node.rs index b73c256..f8ccdcf 100644 --- a/src/orphan_node.rs +++ b/src/orphan_node.rs @@ -105,12 +105,28 @@ pub fn find_orphans() -> Vec { } /// Kill orphaned node processes. Returns what was reaped. -pub fn reap_orphans() -> ReapResult { +/// +/// `dry_run=true` reports candidates without sending SIGTERM. The returned +/// `reaped` list represents what *would* have been killed — operators can +/// inspect it via the new `cmmd_housekeeper_kills_total{kind="orphan_node"}` +/// counter and the daemon log to validate the pattern set before enabling +/// real kills. +pub fn reap_orphans(dry_run: bool) -> ReapResult { let orphans = find_orphans(); let scanned = orphans.len(); let mut reaped = Vec::new(); for proc in orphans { + if dry_run { + info!( + pid = proc.pid, + rss_kb = proc.rss_kb, + cmd = %proc.cmdline.chars().take(80).collect::(), + "dry-run: would reap orphan node process" + ); + reaped.push(proc); + continue; + } let pid = Pid::from_raw(proc.pid as i32); if kill(pid, Signal::SIGTERM).is_ok() { info!( diff --git a/src/pressure.rs b/src/pressure.rs index 3c0010e..cbab4c5 100644 --- a/src/pressure.rs +++ b/src/pressure.rs @@ -58,7 +58,11 @@ fn parse_kb(line: &str) -> u64 { } /// Check RAM pressure and take actions if thresholds exceeded. -pub fn check_and_respond() -> PressureResponse { +/// +/// When `dry_run` is true the function still reports the actions it *would* +/// take in `actions_taken`, but skips the side effects: no `docker container +/// prune`, no `drop_caches`, no writes to `/proc/sys/vm/...`. +pub fn check_and_respond(dry_run: bool) -> PressureResponse { let mem = read_meminfo(); let mut actions = Vec::new(); @@ -70,32 +74,50 @@ pub fn check_and_respond() -> PressureResponse { }; } - info!(used_pct = mem.used_pct, "RAM pressure detected"); + info!(used_pct = mem.used_pct, dry_run, "RAM pressure detected"); // Level 1: Prune stopped docker containers - if run_cmd("docker", &["container", "prune", "-f"]) { + if dry_run { + actions.push("docker container prune (dry-run)".into()); + } else if run_cmd("docker", &["container", "prune", "-f"]) { actions.push("docker container prune".into()); } // Level 2: Drop page cache (safe — kernel rebuilds as needed) if mem.used_pct >= CRITICAL_THRESHOLD_PCT { - // sync first to flush dirty pages - let _ = Command::new("sync").status(); - if fs::write("/proc/sys/vm/drop_caches", "1").is_ok() { - actions.push("drop_caches=1".into()); + if dry_run { + actions.push("drop_caches=1 (dry-run)".into()); } else { - // Try via sudo - if run_cmd("sudo", &["sh", "-c", "echo 1 > /proc/sys/vm/drop_caches"]) { - actions.push("drop_caches=1 (sudo)".into()); + // sync first to flush dirty pages + let _ = Command::new("sync").status(); + if fs::write("/proc/sys/vm/drop_caches", "1").is_ok() { + actions.push("drop_caches=1".into()); + } else { + // Try via sudo + if run_cmd("sudo", &["sh", "-c", "echo 1 > /proc/sys/vm/drop_caches"]) { + actions.push("drop_caches=1 (sudo)".into()); + } } } } // Level 3: Enforce swap tuning (in case it was reset) - enforce_swap_tuning(&mut actions); + if dry_run { + // Probe the current values without writing so the operator can see + // what the housekeeper would have changed. + for path in ["/proc/sys/vm/swappiness", "/proc/sys/vm/page-cluster"] { + let current = fs::read_to_string(path) + .unwrap_or_default() + .trim() + .to_string(); + actions.push(format!("{path}={current} (dry-run, no write)")); + } + } else { + enforce_swap_tuning(&mut actions); + } if !actions.is_empty() { - info!(actions = ?actions, "pressure response complete"); + info!(actions = ?actions, dry_run, "pressure response complete"); } PressureResponse { diff --git a/src/process.rs b/src/process.rs index d9ef824..07ba32f 100644 --- a/src/process.rs +++ b/src/process.rs @@ -78,8 +78,10 @@ pub struct MemoryHolder { /// Implementation: shells out to `lsof +D `. If lsof is missing /// or fails, returns `Err` so the caller can decide whether to fall back to /// the conservative process-name guard. +/// +/// Sync variant — kept for non-async callers (`cmmd doctor`, tests). The +/// hot path (`tick::run`) uses the async timeout-wrapped version below. pub fn memory_holders(memory_root: &Path) -> Result, String> { - let me = std::process::id(); let out = Command::new("lsof") .arg("-F") .arg("pcn") // machine-readable: p=pid c=cmd n=name @@ -88,8 +90,48 @@ pub fn memory_holders(memory_root: &Path) -> Result, String> { .output() .map_err(|e| format!("lsof spawn failed: {e}"))?; // lsof returns 1 when there are no matching files — that's success-with-zero-results. - let stdout = String::from_utf8_lossy(&out.stdout); + Ok(parse_lsof_pcn(&String::from_utf8_lossy(&out.stdout))) +} +/// Same as [`memory_holders`] but spawns lsof under a wall-clock timeout. If +/// lsof doesn't return within `timeout_secs`, the spawned process is killed +/// and `Err("lsof timeout")` is returned so the caller can fall back to the +/// conservative process-name guard. +/// +/// This exists because the blocking lsof recursively walks `memory_root`; +/// on a slow filesystem (sshfs, network mount) it can hang for the lifetime +/// of the tick and wedge every subsequent iteration. The 0-timeout variant +/// is preserved for parity with the sync function. +pub async fn memory_holders_with_timeout( + memory_root: &Path, + timeout_secs: u64, +) -> Result, String> { + if timeout_secs == 0 { + // Opt-out path: behave exactly like the sync version. Useful for + // operators who can't tolerate the kill-on-deadline behavior. + return memory_holders(memory_root); + } + let dur = std::time::Duration::from_secs(timeout_secs); + let mut cmd = tokio::process::Command::new("lsof"); + cmd.arg("-F") + .arg("pcn") + .arg("+D") + .arg(memory_root) + .kill_on_drop(true); + let fut = cmd.output(); + let out = match tokio::time::timeout(dur, fut).await { + Ok(Ok(o)) => o, + Ok(Err(e)) => return Err(format!("lsof spawn failed: {e}")), + Err(_) => return Err(format!("lsof timeout after {timeout_secs}s")), + }; + Ok(parse_lsof_pcn(&String::from_utf8_lossy(&out.stdout))) +} + +/// Parse the `lsof -F pcn` machine-readable format. Lifted out so both the +/// sync and async variants share the same parser — easier to test, easier to +/// keep in sync if the lsof flags change. +fn parse_lsof_pcn(stdout: &str) -> Vec { + let me = std::process::id(); let mut holders: Vec = Vec::new(); let mut cur_pid: Option = None; let mut cur_name: Option = None; @@ -109,5 +151,39 @@ pub fn memory_holders(memory_root: &Path) -> Result, String> { } } } - Ok(holders) + holders +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parse_lsof_skips_self_pid() { + let me = std::process::id(); + let other = me + 1; + let body = format!("p{me}\nccmd-self\nn/x\np{other}\nccmd-other\nn/y\n"); + let h = parse_lsof_pcn(&body); + assert_eq!(h.len(), 1, "self pid filtered: got {:?}", h); + assert_eq!(h[0].pid, other); + assert_eq!(h[0].name, "cmd-other"); + } + + #[test] + fn parse_lsof_dedupes_repeated_pid() { + let other = std::process::id() + 99; + let body = format!( + "p{other}\ncfoo\nn/a\np{other}\ncfoo\nn/b\np{other}\ncfoo\nn/c\n" + ); + let h = parse_lsof_pcn(&body); + assert_eq!(h.len(), 1, "duplicate pid coalesced"); + } + + #[tokio::test] + async fn memory_holders_zero_timeout_falls_back_to_sync() { + // Smoke test: zero timeout means "behave like the sync call". + // We pass /tmp which always exists; we don't care about the result, + // only that the function returns without hanging. + let _ = memory_holders_with_timeout(std::path::Path::new("/tmp"), 0).await; + } } diff --git a/src/tick.rs b/src/tick.rs index bff422f..9f87f2f 100644 --- a/src/tick.rs +++ b/src/tick.rs @@ -47,7 +47,11 @@ pub async fn run( // Strategy: ask lsof which (foreign) PIDs currently hold a file under // memory_root open. If lsof is unavailable, fall back to the conservative // process-name guard (any other claude/kiro process aborts the tick). - match crate::process::memory_holders(memory_root) { + // + // The lsof call is bounded by `cfg.lsof_timeout_sec` — on a slow FS it + // would otherwise hang the whole tick. On timeout, treat it the same as + // "lsof unavailable" and fall back to the name-only guard. + match crate::process::memory_holders_with_timeout(memory_root, cfg.lsof_timeout_sec).await { Ok(holders) if !holders.is_empty() => { let names: Vec = holders .iter() diff --git a/src/tmux_janitor.rs b/src/tmux_janitor.rs index a757193..fff97d5 100644 --- a/src/tmux_janitor.rs +++ b/src/tmux_janitor.rs @@ -50,12 +50,21 @@ fn list_sessions() -> Vec { /// Kill unattached sessions matching the `term-*` pattern. /// These are auto-created by .bashrc and should die when the window closes. -pub fn cleanup_unattached() -> CleanupResult { +/// +/// When `dry_run` is true, the function still enumerates matching sessions +/// and reports them via the returned `killed` list (operator-facing: "would +/// have killed these") but does NOT actually send the kill-session command. +pub fn cleanup_unattached(dry_run: bool) -> CleanupResult { let sessions = list_sessions(); let mut killed = Vec::new(); for sess in &sessions { if !sess.attached && sess.name.starts_with("term-") { + if dry_run { + info!(session = %sess.name, "dry-run: would kill unattached tmux session"); + killed.push(sess.name.clone()); + continue; + } let status = Command::new("tmux") .args(["kill-session", "-t", &sess.name]) .status();