Skip to content

Commit 75f60c8

Browse files
committed
fix(watcher): stop debounce thread on unwatch to prevent fd leak
WatcherHandle::stop() previously only signalled the async rebuild task but left the std::thread (with its notify::RecommendedWatcher) running forever. Each session switch orphaned a thread holding an OS watcher fd (kqueue/FSEvents). After enough switches, fd exhaustion silently caused RecommendedWatcher::new() or watcher.watch() to fail, so no events were ever delivered and auto-refresh stopped working. Fix: - Add thread_stop_tx/rx (std::sync::mpsc::sync_channel) alongside the existing tokio stop channel - run_debounce_loop checks thread_stop_rx at the top of each 100 ms iteration and breaks when signalled or disconnected - WatcherHandle::stop() now signals both the async task and the thread - watcher drops at thread exit, releasing the OS fd immediately Same fix applied to start_picker_watcher. Tests: debounce_loop_exits_on_stop_signal, debounce_loop_exits_when_stop_sender_dropped, watcher_handle_stop_is_idempotent
1 parent 9ff94a9 commit 75f60c8

1 file changed

Lines changed: 81 additions & 2 deletions

File tree

src-tauri/src/watcher.rs

Lines changed: 81 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,22 @@ const WATCHER_DEBOUNCE: Duration = Duration::from_millis(200);
1616

1717
/// Run a debounced file-change loop: receive notify events, apply `filter`,
1818
/// and send a signal after `WATCHER_DEBOUNCE` of quiet time.
19+
/// Exits when `thread_stop_rx` receives a value or is disconnected.
1920
fn run_debounce_loop(
2021
rx: std::sync::mpsc::Receiver<Result<notify::Event, notify::Error>>,
2122
filter: impl Fn(&notify::Event) -> bool,
2223
signal_tx: mpsc::Sender<()>,
24+
thread_stop_rx: std::sync::mpsc::Receiver<()>,
2325
) {
2426
let mut debounce_timer: Option<std::time::Instant> = None;
2527

2628
loop {
29+
// Check for an explicit stop signal before blocking on notify events.
30+
match thread_stop_rx.try_recv() {
31+
Ok(()) | Err(std::sync::mpsc::TryRecvError::Disconnected) => break,
32+
Err(std::sync::mpsc::TryRecvError::Empty) => {}
33+
}
34+
2735
match rx.recv_timeout(Duration::from_millis(100)) {
2836
Ok(Ok(event)) => {
2937
if filter(&event) {
@@ -46,12 +54,16 @@ fn run_debounce_loop(
4654

4755
/// Handle for stopping a file watcher (session or picker).
4856
pub struct WatcherHandle {
57+
/// Stops the async rebuild task.
4958
stop_tx: mpsc::Sender<()>,
59+
/// Stops the underlying std::thread running notify + debounce, releasing OS watcher fds.
60+
thread_stop_tx: std::sync::mpsc::SyncSender<()>,
5061
}
5162

5263
impl WatcherHandle {
5364
pub fn stop(&self) {
5465
let _ = self.stop_tx.try_send(());
66+
let _ = self.thread_stop_tx.try_send(());
5567
}
5668
}
5769

@@ -69,6 +81,7 @@ struct SessionUpdatePayload {
6981
pub fn start_session_watcher(path: String, app: AppHandle) -> WatcherHandle {
7082
let (stop_tx, mut stop_rx) = mpsc::channel::<()>(1);
7183
let (signal_tx, mut signal_rx) = mpsc::channel::<()>(4);
84+
let (thread_stop_tx, thread_stop_rx) = std::sync::mpsc::sync_channel::<()>(1);
7285

7386
let path_clone = path.clone();
7487
let signal_tx_clone = signal_tx.clone();
@@ -125,7 +138,9 @@ pub fn start_session_watcher(path: String, app: AppHandle) -> WatcherHandle {
125138
})
126139
},
127140
signal_tx,
141+
thread_stop_rx,
128142
);
143+
// watcher dropped here → OS watcher fd released
129144
});
130145

131146
// Spawn the async rebuild loop.
@@ -238,7 +253,10 @@ pub fn start_session_watcher(path: String, app: AppHandle) -> WatcherHandle {
238253
}
239254
});
240255

241-
WatcherHandle { stop_tx }
256+
WatcherHandle {
257+
stop_tx,
258+
thread_stop_tx,
259+
}
242260
}
243261

244262
/// Serializable picker refresh event.
@@ -251,6 +269,7 @@ struct PickerRefreshPayload {
251269
pub fn start_picker_watcher(project_dirs: Vec<String>, app: AppHandle) -> WatcherHandle {
252270
let (stop_tx, mut stop_rx) = mpsc::channel::<()>(1);
253271
let (signal_tx, mut signal_rx) = mpsc::channel::<()>(4);
272+
let (thread_stop_tx, thread_stop_rx) = std::sync::mpsc::sync_channel::<()>(1);
254273

255274
// Derive unique parent directories (e.g. ~/.claude/projects) from the
256275
// individual project dirs. Watching the parent instead of individual
@@ -295,7 +314,9 @@ pub fn start_picker_watcher(project_dirs: Vec<String>, app: AppHandle) -> Watche
295314
})
296315
},
297316
signal_tx,
317+
thread_stop_rx,
298318
);
319+
// watcher dropped here → OS watcher fd released
299320
});
300321

301322
// Spawn the async refresh loop.
@@ -356,5 +377,63 @@ pub fn start_picker_watcher(project_dirs: Vec<String>, app: AppHandle) -> Watche
356377
}
357378
});
358379

359-
WatcherHandle { stop_tx }
380+
WatcherHandle {
381+
stop_tx,
382+
thread_stop_tx,
383+
}
384+
}
385+
386+
#[cfg(test)]
387+
mod tests {
388+
use super::*;
389+
use tokio::sync::mpsc;
390+
391+
/// run_debounce_loop must exit when the stop signal is sent.
392+
#[test]
393+
fn debounce_loop_exits_on_stop_signal() {
394+
let (signal_tx, _signal_rx) = mpsc::channel::<()>(4);
395+
let (thread_stop_tx, thread_stop_rx) = std::sync::mpsc::sync_channel::<()>(1);
396+
let (_notify_tx, notify_rx) = std::sync::mpsc::channel();
397+
398+
thread_stop_tx.send(()).unwrap();
399+
400+
let handle = std::thread::spawn(move || {
401+
run_debounce_loop(notify_rx, |_| false, signal_tx, thread_stop_rx);
402+
});
403+
404+
handle
405+
.join()
406+
.expect("debounce thread should exit after stop signal");
407+
}
408+
409+
/// run_debounce_loop must exit when the stop sender is dropped (Disconnected).
410+
#[test]
411+
fn debounce_loop_exits_when_stop_sender_dropped() {
412+
let (signal_tx, _signal_rx) = mpsc::channel::<()>(4);
413+
let (thread_stop_tx, thread_stop_rx) = std::sync::mpsc::sync_channel::<()>(1);
414+
let (_notify_tx, notify_rx) = std::sync::mpsc::channel();
415+
416+
drop(thread_stop_tx);
417+
418+
let handle = std::thread::spawn(move || {
419+
run_debounce_loop(notify_rx, |_| false, signal_tx, thread_stop_rx);
420+
});
421+
422+
handle
423+
.join()
424+
.expect("debounce thread should exit when stop sender is dropped");
425+
}
426+
427+
/// WatcherHandle::stop() must not panic when called multiple times on a closed channel.
428+
#[test]
429+
fn watcher_handle_stop_is_idempotent() {
430+
let (stop_tx, _stop_rx) = mpsc::channel::<()>(1);
431+
let (thread_stop_tx, _thread_stop_rx) = std::sync::mpsc::sync_channel::<()>(1);
432+
let handle = WatcherHandle {
433+
stop_tx,
434+
thread_stop_tx,
435+
};
436+
handle.stop();
437+
handle.stop();
438+
}
360439
}

0 commit comments

Comments
 (0)