Skip to content

Commit d3a0729

Browse files
committed
fix(s5_node): use CancellationToken so rw debounce loop exits on unmount
The rw debounce task spawned by `MountManager::spawn_rw_mount` previously took `std::future::pending::<()>()` as its `until` future, so it had no path to exit. On unmount, the FUSE session cancel oneshot fired and `mount_rw` returned, but the debounce task held its own `WritableFs` clone (received via `on_mount`) and kept waiting on the `Notify` it polls — leaking one task per mount/unmount cycle. Switch the per-mount cancel signal from a `oneshot::Sender<()>` to a `tokio_util::sync::CancellationToken`. One clone goes to `mount_rw`'s `until_fut`; another clone goes to `debounce::run`'s `until_fut`. `unmount()` calls `cancel.cancel()` once, and both branches wake. The unmount path still awaits the FUSE session's join handle, so the actual `umount(2)` is complete by the time the RPC returns; the debounce task exits in the background (it may still be processing a flush callback when cancel fires, in which case that callback completes after the await returns). The module docstring is updated to describe the new shape. Validation: workspace clippy, build, and test sweep are all clean. Assisted-by: claude-opus-4-7
1 parent b36e6b9 commit d3a0729

1 file changed

Lines changed: 34 additions & 27 deletions

File tree

s5_node/src/fuse.rs

Lines changed: 34 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,13 @@
1313
//! config, opens the meta store + each `[store.<name>]` as a
1414
//! `LocalStore`, decrypts the vault root, builds a `Snapshot`,
1515
//! spawns a tokio task that calls `s5_fuse::mount` (or `mount_rw`)
16-
//! with a oneshot-driven cancellation future, and stores the cancel
17-
//! sender + join handle under a fresh `mount_id`.
16+
//! with a `CancellationToken`-driven cancellation future, and stores
17+
//! the token + join handle under a fresh `mount_id`.
1818
//!
19-
//! Unmount: `MountManager::unmount` removes the entry, sends on the
20-
//! cancel oneshot (which makes the mount task return, which drops
21-
//! the `MountHandle`, which performs the actual FUSE unmount), and
22-
//! awaits the join handle.
19+
//! Unmount: `MountManager::unmount` removes the entry, calls
20+
//! `cancel.cancel()` (which wakes both the FUSE session and any
21+
//! attached debounce loop), and awaits the join handle so the actual
22+
//! `umount(2)` has finished by the time the RPC returns.
2323
//!
2424
//! ## rw mounts
2525
//!
@@ -28,8 +28,9 @@
2828
//! hands back. On every debounce: fold the overlay into a fresh
2929
//! snapshot (`flush_overlay`), persist the new vault root, and
3030
//! submit a `Publish` task to the daemon's executor. The debounce
31-
//! task also listens on the same cancel oneshot so it tears down
32-
//! cleanly on unmount.
31+
//! task listens on a clone of the same `CancellationToken` so it
32+
//! exits cleanly on unmount instead of leaking once the FUSE session
33+
//! goes away.
3334
3435
use std::collections::HashMap;
3536
use std::path::PathBuf;
@@ -41,19 +42,20 @@ use s5_core::blob::BlobStore;
4142
use s5_core::{BlobsRead, FallbackBlobsRead};
4243
use s5_fs_v2::snapshot::Snapshot;
4344
use s5_node_api::config::TaskSpec;
44-
use tokio::sync::{RwLock, oneshot};
45+
use tokio::sync::RwLock;
4546
use tokio::task::JoinHandle;
47+
use tokio_util::sync::CancellationToken;
4648

4749
use crate::config::NodeConfigStore;
4850
use crate::tasks::TaskExecutor;
4951
use crate::tasks::vault_persist::{load_vault_root, save_vault_root, vault_root_path};
5052

51-
/// Per-mount state the manager keeps. `cancel` triggers shutdown of
52-
/// both the FUSE session and (for rw) its attached debounce loop;
53-
/// `join` is the mount task itself, awaited on unmount so the actual
54-
/// `umount(2)` has finished by the time the RPC returns.
53+
/// Per-mount state the manager keeps. `cancel.cancel()` wakes both
54+
/// the FUSE session and (for rw) its attached debounce loop in a
55+
/// single shot; `join` is the mount task itself, awaited on unmount
56+
/// so the actual `umount(2)` has finished by the time the RPC returns.
5557
struct ActiveMount {
56-
cancel: oneshot::Sender<()>,
58+
cancel: CancellationToken,
5759
join: JoinHandle<Result<()>>,
5860
#[allow(dead_code)] // kept for diagnostics + future ListMounts RPC
5961
mountpoint: PathBuf,
@@ -109,10 +111,7 @@ impl MountManager {
109111
resolved.root_plaintext_hash,
110112
);
111113

112-
let (cancel_tx, cancel_rx) = oneshot::channel::<()>();
113-
let cancel_fut = async move {
114-
let _ = cancel_rx.await;
115-
};
114+
let cancel = CancellationToken::new();
116115

117116
let mountpoint_for_task = mountpoint.clone();
118117
let join: JoinHandle<Result<()>> = if rw {
@@ -122,10 +121,12 @@ impl MountManager {
122121
snapshot,
123122
mountpoint_for_task,
124123
debounce_ms,
125-
cancel_fut,
124+
cancel.clone(),
126125
)
127126
} else {
127+
let cancel_for_task = cancel.clone();
128128
tokio::spawn(async move {
129+
let cancel_fut = async move { cancel_for_task.cancelled().await };
129130
s5_fuse::mount(&mountpoint_for_task, snapshot, false, true, cancel_fut)
130131
.await
131132
.with_context(|| format!("FUSE mount at {}", mountpoint_for_task.display()))
@@ -137,7 +138,7 @@ impl MountManager {
137138
mounts.insert(
138139
mount_id,
139140
ActiveMount {
140-
cancel: cancel_tx,
141+
cancel,
141142
join,
142143
mountpoint,
143144
},
@@ -155,10 +156,12 @@ impl MountManager {
155156
.remove(&mount_id)
156157
.ok_or_else(|| anyhow!("unknown mount_id {mount_id}"))?
157158
};
158-
// Best effort: sending the cancel triggers the mount task to
159-
// return; if it already exited (kernel-side eject) the receiver
160-
// is gone and `send` errors — that's fine, we still join below.
161-
let _ = active.cancel.send(());
159+
// Single signal wakes both the FUSE session and (for rw) the
160+
// debounce loop. Awaiting `join` ensures the FUSE session has
161+
// finished its `umount(2)` before we return — debounce exit is
162+
// best-effort (it may still be processing a flush after cancel
163+
// fires; that completes in the background).
164+
active.cancel.cancel();
162165
match active.join.await {
163166
Ok(Ok(())) => Ok(()),
164167
Ok(Err(e)) => Err(e.context(format!("mount task for id {mount_id}"))),
@@ -173,7 +176,7 @@ impl MountManager {
173176
snapshot: Snapshot,
174177
mountpoint: PathBuf,
175178
debounce_ms: u64,
176-
cancel_fut: impl std::future::Future<Output = ()> + Send + 'static,
179+
cancel: CancellationToken,
177180
) -> JoinHandle<Result<()>> {
178181
let executor = self.executor.clone();
179182
let primary_store = resolved.primary_store.clone();
@@ -184,14 +187,17 @@ impl MountManager {
184187
tokio::spawn(async move {
185188
// The on_mount callback fires once the FS is built but
186189
// before the kernel sees it; we use it to spawn the
187-
// debounce task with a clone of the WritableFs.
190+
// debounce task with a clone of the WritableFs and the
191+
// same cancellation token so unmount tears it down.
188192
let executor_for_cb = executor.clone();
189193
let vault_for_cb = vault.clone();
190194
let recipient_key_names_for_cb = recipient_key_names.clone();
191195
let recipient_pubkeys_for_cb = recipient_pubkeys.clone();
192196
let vault_root_file_for_cb = vault_root_file.clone();
197+
let cancel_for_debounce = cancel.clone();
193198
let on_mount = move |fs: s5_fuse::WritableFs| {
194199
tokio::spawn(async move {
200+
let cancel_fut = async move { cancel_for_debounce.cancelled().await };
195201
s5_fuse::debounce::run(
196202
fs,
197203
std::time::Duration::from_millis(debounce_ms),
@@ -213,12 +219,13 @@ impl MountManager {
213219
.await
214220
}
215221
},
216-
std::future::pending::<()>(),
222+
cancel_fut,
217223
)
218224
.await;
219225
});
220226
};
221227

228+
let cancel_fut = async move { cancel.cancelled().await };
222229
s5_fuse::mount_rw(
223230
&mountpoint,
224231
snapshot,

0 commit comments

Comments
 (0)