Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,17 @@ pub struct Config {
/// Cadence for `git gc --prune=now --aggressive` over each MEMORY_ROOT.
/// 0 = disabled.
pub git_gc_interval_days: u64,
/// Wall-clock cap for the `lsof +D <memory_root>` probe used to detect
/// live editors. A slow filesystem (sshfs, network mount) could hang the
/// blocking lsof and wedge the whole tick; this kills it instead.
/// 0 = disabled (use the old unbounded blocking call).
pub lsof_timeout_sec: u64,
/// Run housekeepers (tmux_janitor, orphan_node, pressure) in
/// report-only mode — log what *would* be killed without actually
/// sending signals. Same intent as DRY_RUN but scoped to housekeepers
/// so the memory-manager loop can still mutate while housekeepers stay
/// silent.
pub housekeeper_dry_run: bool,
pub claude_bin: String,
pub authmux_bin: String,
pub claude_accounts_dir: PathBuf,
Expand Down Expand Up @@ -96,6 +107,8 @@ impl Config {
history_max_lines: env_u64("HISTORY_MAX_LINES", 10_000),
tick_log_ttl_days: env_u64("TICK_LOG_TTL_DAYS", 7),
git_gc_interval_days: env_u64("GIT_GC_INTERVAL_DAYS", 7),
lsof_timeout_sec: env_u64("LSOF_TIMEOUT_SEC", 5),
housekeeper_dry_run: env_bool("HOUSEKEEPER_DRY_RUN", false),
claude_bin: env_str("CLAUDE_BIN", "claude"),
authmux_bin: env_str("AUTHMUX_BIN", "authmux"),
claude_accounts_dir: env_path(
Expand Down
182 changes: 156 additions & 26 deletions src/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@

use anyhow::{anyhow, Context, Result};
use serde::{Deserialize, Serialize};
use std::path::Path;
use std::os::fd::AsRawFd;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{UnixListener, UnixStream};
Expand Down Expand Up @@ -34,39 +35,113 @@ pub struct TickRecord {
pub exit_code: Option<i32>,
}

pub fn acquire_lock(lock_file: &Path, pid_file: &Path) -> Result<()> {
if lock_file.exists() {
let pid_str = std::fs::read_to_string(lock_file).unwrap_or_default();
if let Ok(pid) = pid_str.trim().parse::<i32>() {
// signal 0 = existence probe
if unsafe { libc_kill(pid, 0) } == 0 {
return Err(anyhow!(
"another daemon holds {} (pid={})",
lock_file.display(),
pid
));
}
}
let _ = std::fs::remove_file(lock_file);
/// RAII guard for the daemon's exclusive process-singleton lock.
///
/// The lock itself is a `flock(LOCK_EX | LOCK_NB)` held through the open
/// file stored in this guard. The lock file's body and the pid file are only
/// informational markers for operators.
///
/// * While the guard is alive the descriptor stays open, so the exclusive
///   lock stays held.
/// * If the process dies without running destructors, the kernel closes the
///   descriptor and the lock is released with it, so a leftover lock file on
///   disk never blocks the next start.
///
/// Dropping the guard removes `lock_file` and `pid_file` (errors are
/// ignored) and then closes the descriptor, which releases the lock.
///
/// NOTE(review): the path is unlinked while the lock is still held. A
/// process that opened the path just before the unlink can lock the now
/// orphaned inode after we close, while a later process creates and locks a
/// fresh file at the same path. Whoever acquires the lock should confirm
/// that the path still names the inode it locked.
#[derive(Debug)]
#[must_use = "the singleton lock is released as soon as the guard is dropped"]
pub struct LockGuard {
    lock_file: PathBuf,
    pid_file: PathBuf,
    // Held for the lifetime of the daemon; closing it releases the flock.
    // The explicit `Drop::drop` below always runs before any field is
    // destroyed, so the marker files are removed before this descriptor is
    // closed no matter where the field sits in the struct.
    _fd: std::fs::File,
}

impl Drop for LockGuard {
    fn drop(&mut self) {
        // Best effort: a marker that is already gone is not an error.
        let _ = std::fs::remove_file(&self.lock_file);
        let _ = std::fs::remove_file(&self.pid_file);
        // `_fd` is closed after this body returns -> kernel releases the flock.
    }
}

/// Acquire the daemon singleton lock.
///
/// Uses `flock(LOCK_EX|LOCK_NB)` on the lock file's fd. If the lock is held
/// by another live daemon, returns an Err that includes the holding PID
/// (read from the file body) for operator clarity. The flock is what makes
/// this race-free; the PID readback is purely cosmetic.
pub fn acquire_lock(lock_file: &Path, pid_file: &Path) -> Result<LockGuard> {
if let Some(parent) = lock_file.parent() {
std::fs::create_dir_all(parent).ok();
}
let me = std::process::id();
std::fs::write(lock_file, me.to_string())
// Open-or-create with read-write so we can flock + truncate + write the
// current pid. We do NOT use create_new(true) because that prevents
// re-acquiring after a clean shutdown left the file behind (intentional —
// the file is a marker, the flock is the actual lock).
let file = std::fs::OpenOptions::new()
.create(true)
.read(true)
.write(true)
.truncate(false)
.open(lock_file)
.with_context(|| format!("open {}", lock_file.display()))?;

let fd = file.as_raw_fd();
// Non-blocking exclusive flock. If anyone else holds it, EWOULDBLOCK.
let rc = unsafe { libc_flock(fd, LOCK_EX | LOCK_NB) };
if rc != 0 {
// Best-effort read of the previous holder's PID for the error message.
// This is purely informational — flock already told us the truth.
let pid_hint = std::fs::read_to_string(lock_file)
.ok()
.and_then(|s| s.trim().parse::<u32>().ok());
return Err(match pid_hint {
Some(p) => anyhow!(
"another daemon holds {} (pid={p})",
lock_file.display(),
),
None => anyhow!(
"another daemon holds {} (pid unknown)",
lock_file.display(),
),
});
}

// We hold the lock. Stamp our pid into both files.
let me = std::process::id().to_string();
use std::io::{Seek, SeekFrom, Write};
let mut f = file;
f.set_len(0)
.with_context(|| format!("truncate {}", lock_file.display()))?;
f.seek(SeekFrom::Start(0))
.with_context(|| format!("seek {}", lock_file.display()))?;
f.write_all(me.as_bytes())
.with_context(|| format!("write {}", lock_file.display()))?;
std::fs::write(pid_file, me.to_string())
f.sync_all()
.with_context(|| format!("sync {}", lock_file.display()))?;

std::fs::write(pid_file, &me)
.with_context(|| format!("write {}", pid_file.display()))?;
Ok(())
}

pub fn release_lock(lock_file: &Path, pid_file: &Path) {
let _ = std::fs::remove_file(lock_file);
let _ = std::fs::remove_file(pid_file);
Ok(LockGuard {
lock_file: lock_file.to_path_buf(),
pid_file: pid_file.to_path_buf(),
_fd: f,
})
}

// The `nix` crate only exposes flock(2) behind its "fs" feature, so the libc
// symbol is declared here directly. The flag values below are the ones used
// by Linux and macOS.
const LOCK_EX: i32 = 2;
const LOCK_NB: i32 = 4;
extern "C" {
    fn flock(fd: i32, operation: i32) -> i32;
}

/// Thin wrapper around the C `flock(2)` call.
///
/// Returns the raw C result: `0` on success, `-1` on failure with the reason
/// left in `errno` (readable via `std::io::Error::last_os_error()`).
///
/// # Safety
///
/// `fd` must be a file descriptor that is open for the duration of the call.
unsafe fn libc_flock(fd: i32, op: i32) -> i32 {
    flock(fd, op)
}

/// Daemon-side handles exposed to clients via the Unix socket.
Expand Down Expand Up @@ -173,3 +248,58 @@ pub async fn query_status(sock_path: &Path) -> Result<DaemonStatus> {
// Builds a throwaway error carrying the message "unused".
// NOTE(review): nothing in the visible code calls this; presumably it exists
// only to keep the `anyhow!` macro import referenced — confirm before
// removing or rewriting it.
fn _suppress() -> anyhow::Error {
anyhow!("unused")
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Per-test scratch directory that is deleted again when it goes out of
    /// scope, so repeated test runs do not pile up directories in the system
    /// temp dir.
    struct TmpDir(std::path::PathBuf);

    impl TmpDir {
        /// Creates an empty directory whose name combines the current
        /// process id with `name`, replacing any leftover of the same name.
        fn new(name: &str) -> Self {
            let p = std::env::temp_dir().join(format!("cmmd-ipc-{}-{}", std::process::id(), name));
            let _ = std::fs::remove_dir_all(&p);
            std::fs::create_dir_all(&p).unwrap();
            TmpDir(p)
        }

        /// Path of `leaf` inside the scratch directory.
        fn join(&self, leaf: &str) -> std::path::PathBuf {
            self.0.join(leaf)
        }
    }

    impl Drop for TmpDir {
        fn drop(&mut self) {
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    #[test]
    fn second_acquire_fails_while_first_held() {
        let dir = TmpDir::new("flock-double");
        let lock = dir.join("daemon.lock");
        let pid = dir.join("daemon.pid");
        let first = acquire_lock(&lock, &pid).expect("first acquire");
        let second = acquire_lock(&lock, &pid);
        assert!(second.is_err(), "second acquire must fail while first held");
        let msg = format!("{}", second.unwrap_err());
        assert!(
            msg.contains("another daemon holds"),
            "error mentions the lock collision: {msg}"
        );
        drop(first);
        // After drop the lock + pid files should be gone.
        assert!(!lock.exists(), "lock file removed on drop");
        assert!(!pid.exists(), "pid file removed on drop");
    }

    #[test]
    fn third_acquire_succeeds_after_first_dropped() {
        let dir = TmpDir::new("flock-reacquire");
        let lock = dir.join("daemon.lock");
        let pid = dir.join("daemon.pid");
        {
            let _g = acquire_lock(&lock, &pid).expect("first");
        }
        // Once the first guard is dropped, a fresh acquire must work — this
        // is the "stale lock file" case that used to require a process probe.
        let _g2 = acquire_lock(&lock, &pid).expect("reacquire after drop");
    }

    #[test]
    fn pid_file_contains_current_process_id() {
        let dir = TmpDir::new("flock-pid");
        let lock = dir.join("daemon.lock");
        let pid = dir.join("daemon.pid");
        let _g = acquire_lock(&lock, &pid).expect("acquire");
        let written = std::fs::read_to_string(&pid).unwrap();
        let want = std::process::id().to_string();
        assert_eq!(written.trim(), want);
    }
}
46 changes: 36 additions & 10 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,9 @@ fn run_orphan_node(action: OrphanNodeAction) -> Result<()> {
Ok(())
}
OrphanNodeAction::Reap { json } => {
let result = orphan_node::reap_orphans();
// CLI invocation = explicit operator action; honor with real kills.
// For dry-run behavior in the daemon tick, see HOUSEKEEPER_DRY_RUN.
let result = orphan_node::reap_orphans(false);
if json {
println!("{}", serde_json::to_string_pretty(&result)?);
} else {
Expand All @@ -237,7 +239,8 @@ fn run_orphan_node(action: OrphanNodeAction) -> Result<()> {
fn run_tmux_janitor(action: TmuxAction) -> Result<()> {
match action {
TmuxAction::Cleanup { json } => {
let result = tmux_janitor::cleanup_unattached();
// CLI invocation = explicit operator action.
let result = tmux_janitor::cleanup_unattached(false);
if json {
println!("{}", serde_json::to_string_pretty(&result)?);
} else {
Expand All @@ -253,7 +256,8 @@ fn run_tmux_janitor(action: TmuxAction) -> Result<()> {

fn run_pressure(respond: bool, json: bool) -> Result<()> {
let result = if respond {
pressure::check_and_respond()
// CLI invocation = explicit operator action.
pressure::check_and_respond(false)
} else {
pressure::PressureResponse {
mem: pressure::read_meminfo(),
Expand Down Expand Up @@ -637,7 +641,12 @@ async fn run_doctor(cfg: config::Config) -> Result<()> {
}

async fn run_daemon(cfg: config::Config, once: bool) -> Result<()> {
ipc::acquire_lock(&cfg.lock_file, &cfg.pid_file)?;
// RAII flock guard — kept alive for the lifetime of run_daemon. The flock
// is what guarantees only one cmmd can pass this point; the path-based
// probe before it was racy (two daemons could both pass the existence
// check). On Drop, the guard removes the lock + pid files; on crash, the
// kernel releases the flock automatically.
let _lock = ipc::acquire_lock(&cfg.lock_file, &cfg.pid_file)?;
info!(
pid = std::process::id(),
memory = %cfg.memory_root.display(),
Expand Down Expand Up @@ -717,7 +726,8 @@ async fn run_daemon(cfg: config::Config, once: bool) -> Result<()> {
)
.await;

ipc::release_lock(&cfg_arc.lock_file, &cfg_arc.pid_file);
// _lock drops here -> removes lock + pid files, releases flock.
drop(_lock);
info!("daemon down");
result
}
Expand Down Expand Up @@ -789,25 +799,41 @@ async fn main_loop(
log_login_summary(&am, &accts);

// --- Housekeeping: run every tick regardless of memory-manager logic ---
// HOUSEKEEPER_DRY_RUN flips all three into report-only mode so an
// operator can watch what *would* be killed before promoting to real
// signals. The counts are recorded into cmmd_housekeeper_actions_total
// (mode="real" vs mode="dry_run") so dashboards can compare.
let hk_dry = cfg.housekeeper_dry_run;
// 1. Kill orphaned tmux sessions
let tmux_result = tmux_janitor::cleanup_unattached();
let tmux_result = tmux_janitor::cleanup_unattached(hk_dry);
if !tmux_result.killed.is_empty() {
info!(killed = ?tmux_result.killed, "tmux janitor");
info!(killed = ?tmux_result.killed, dry_run = hk_dry, "tmux janitor");
}
// 2. Reap orphaned node processes (mcpvault, mcp-server, worker-service)
let orphan_result = orphan_node::reap_orphans();
let orphan_result = orphan_node::reap_orphans(hk_dry);
if !orphan_result.reaped.is_empty() {
info!(count = orphan_result.reaped.len(), "orphan node reaper");
info!(
count = orphan_result.reaped.len(),
dry_run = hk_dry,
"orphan node reaper"
);
}
// 3. RAM pressure response
let pressure_result = pressure::check_and_respond();
let pressure_result = pressure::check_and_respond(hk_dry);
if pressure_result.threshold_exceeded {
info!(
used_pct = pressure_result.mem.used_pct,
actions = ?pressure_result.actions_taken,
dry_run = hk_dry,
"pressure response"
);
}
metrics_handle.record_housekeeper(
hk_dry,
tmux_result.killed.len() as u64,
orphan_result.reaped.len() as u64,
pressure_result.actions_taken.len() as u64,
);

let dry_run_now = *dry_run_runtime.lock().await;
state.write().await.dry_run = dry_run_now;
Expand Down
Loading