Skip to content

Commit 88e796f

Browse files
authored
feat: concurrent task execution with DAG scheduler (#288)
## Summary - Replace sequential task execution with a concurrent DAG scheduler using `FuturesUnordered` + `tokio::sync::Semaphore` (limit 10 per graph level) - Each nested `Expanded` graph gets its own semaphore for independent concurrency limits - Failure propagation via `CancellationToken` — any task failure immediately kills all in-flight processes - Add `barrier` test tool (cross-platform, `fs.watch`-based) for concurrency testing - Add e2e tests proving concurrent execution and kill-on-failure behavior ## Test plan - [x] All existing e2e snapshot tests pass (10 consecutive runs) - [x] New `concurrent-execution` fixture proves independent tasks run concurrently (barrier with 2 participants — would timeout if sequential) - [x] Failure test proves cancellation kills concurrent tasks (task b hangs on stdin after barrier, killed when task a fails) - [x] `cargo clippy` clean - [x] `cargo test -p vite_task` unit tests pass 🤖 Generated with [Claude Code](https://claude.com/claude-code)
1 parent 2663222 commit 88e796f

File tree

35 files changed

+705
-169
lines changed

35 files changed

+705
-169
lines changed

Cargo.lock

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/fspy/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ rustc-hash = { workspace = true }
2020
tempfile = { workspace = true }
2121
thiserror = { workspace = true }
2222
tokio = { workspace = true, features = ["net", "process", "io-util", "sync", "rt"] }
23+
tokio-util = { workspace = true }
2324
which = { workspace = true, features = ["tracing"] }
2425
xxhash-rust = { workspace = true }
2526

crates/fspy/examples/cli.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ async fn main() -> anyhow::Result<()> {
1818
let mut command = fspy::Command::new(program);
1919
command.envs(std::env::vars_os()).args(args);
2020

21-
let child = command.spawn().await?;
21+
let child = command.spawn(tokio_util::sync::CancellationToken::new()).await?;
2222
let termination = child.wait_handle.await?;
2323

2424
let mut path_count = 0usize;

crates/fspy/src/command.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use std::{
88
use fspy_shared_unix::exec::Exec;
99
use rustc_hash::FxHashMap;
1010
use tokio::process::Command as TokioCommand;
11+
use tokio_util::sync::CancellationToken;
1112

1213
use crate::{SPY_IMPL, TrackedChild, error::SpawnError};
1314

@@ -167,9 +168,12 @@ impl Command {
167168
/// # Errors
168169
///
169170
/// Returns [`SpawnError`] if program resolution fails or the process cannot be spawned.
170-
pub async fn spawn(mut self) -> Result<TrackedChild, SpawnError> {
171+
pub async fn spawn(
172+
mut self,
173+
cancellation_token: CancellationToken,
174+
) -> Result<TrackedChild, SpawnError> {
171175
self.resolve_program()?;
172-
SPY_IMPL.spawn(self).await
176+
SPY_IMPL.spawn(self, cancellation_token).await
173177
}
174178

175179
/// Resolve program name to full path using `PATH` and cwd.

crates/fspy/src/lib.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,13 @@ pub struct TrackedChild {
5454

5555
/// The future that resolves to exit status and path accesses when the process exits.
5656
pub wait_handle: BoxFuture<'static, io::Result<ChildTermination>>,
57+
58+
/// A duplicated process handle of the child, captured before the tokio `Child`
59+
/// is moved into the background wait task. This is an independently owned handle
60+
/// (via `DuplicateHandle`) so it remains valid even after tokio closes its copy.
61+
/// Callers can use this to assign the process to a Win32 Job Object.
62+
#[cfg(windows)]
63+
pub process_handle: std::os::windows::io::OwnedHandle,
5764
}
5865

5966
pub(crate) static SPY_IMPL: LazyLock<SpyImpl> = LazyLock::new(|| {

crates/fspy/src/unix/mod.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use futures_util::FutureExt;
2222
#[cfg(target_os = "linux")]
2323
use syscall_handler::SyscallHandler;
2424
use tokio::task::spawn_blocking;
25+
use tokio_util::sync::CancellationToken;
2526

2627
use crate::{
2728
ChildTermination, Command, TrackedChild,
@@ -80,7 +81,11 @@ impl SpyImpl {
8081
})
8182
}
8283

83-
pub(crate) async fn spawn(&self, mut command: Command) -> Result<TrackedChild, SpawnError> {
84+
pub(crate) async fn spawn(
85+
&self,
86+
mut command: Command,
87+
cancellation_token: CancellationToken,
88+
) -> Result<TrackedChild, SpawnError> {
8489
#[cfg(target_os = "linux")]
8590
let supervisor = supervise::<SyscallHandler>().map_err(SpawnError::Supervisor)?;
8691

@@ -143,7 +148,13 @@ impl SpyImpl {
143148
// Keep polling for the child to exit in the background even if `wait_handle` is not awaited,
144149
// because we need to stop the supervisor and lock the channel as soon as the child exits.
145150
wait_handle: tokio::spawn(async move {
146-
let status = child.wait().await?;
151+
let status = tokio::select! {
152+
status = child.wait() => status?,
153+
() = cancellation_token.cancelled() => {
154+
child.start_kill()?;
155+
child.wait().await?
156+
}
157+
};
147158

148159
let arenas = std::iter::once(exec_resolve_accesses);
149160
// Stop the supervisor and collect path accesses from it.

crates/fspy/src/windows/mod.rs

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use fspy_shared::{
1313
windows::{PAYLOAD_ID, Payload},
1414
};
1515
use futures_util::FutureExt;
16+
use tokio_util::sync::CancellationToken;
1617
use winapi::{
1718
shared::minwindef::TRUE,
1819
um::{processthreadsapi::ResumeThread, winbase::CREATE_SUSPENDED},
@@ -73,7 +74,11 @@ impl SpyImpl {
7374
}
7475

7576
#[expect(clippy::unused_async, reason = "async signature required by SpyImpl trait")]
76-
pub(crate) async fn spawn(&self, mut command: Command) -> Result<TrackedChild, SpawnError> {
77+
pub(crate) async fn spawn(
78+
&self,
79+
mut command: Command,
80+
cancellation_token: CancellationToken,
81+
) -> Result<TrackedChild, SpawnError> {
7782
let ansi_dll_path_with_nul = Arc::clone(&self.ansi_dll_path_with_nul);
7883
command.env("FSPY", "1");
7984
let mut command = command.into_tokio_command();
@@ -135,14 +140,32 @@ impl SpyImpl {
135140
if *spawn_success { SpawnError::OsSpawn(err) } else { SpawnError::Injection(err) }
136141
})?;
137142

143+
// Duplicate the process handle before the child is moved into the background
144+
// task. The duplicate is independently owned (its own ref count), so it stays
145+
// valid even after tokio closes its copy when the process exits.
146+
let process_handle = {
147+
use std::os::windows::io::BorrowedHandle;
148+
// SAFETY: The child was just spawned and hasn't been moved yet, so its
149+
// raw handle is valid. `borrow_raw` creates a temporary borrow.
150+
let borrowed = unsafe { BorrowedHandle::borrow_raw(child.raw_handle().unwrap()) };
151+
borrowed.try_clone_to_owned().map_err(SpawnError::OsSpawn)?
152+
};
153+
138154
Ok(TrackedChild {
139155
stdin: child.stdin.take(),
140156
stdout: child.stdout.take(),
141157
stderr: child.stderr.take(),
158+
process_handle,
142159
// Keep polling for the child to exit in the background even if `wait_handle` is not awaited,
143160
// because we need to stop the supervisor and lock the channel as soon as the child exits.
144161
wait_handle: tokio::spawn(async move {
145-
let status = child.wait().await?;
162+
let status = tokio::select! {
163+
status = child.wait() => status?,
164+
() = cancellation_token.cancelled() => {
165+
child.start_kill()?;
166+
child.wait().await?
167+
}
168+
};
146169
// Lock the ipc channel after the child has exited.
147170
// We are not interested in path accesses from descendants after the main child has exited.
148171
let ipc_receiver_lock_guard = OwnedReceiverLockGuard::lock_async(receiver).await?;

crates/fspy/tests/cancellation.rs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
use std::process::Stdio;
2+
3+
use tokio::io::AsyncReadExt as _;
4+
use tokio_util::sync::CancellationToken;
5+
6+
#[test_log::test(tokio::test)]
7+
async fn cancellation_kills_tracked_child() -> anyhow::Result<()> {
8+
let cmd = subprocess_test::command_for_fn!((), |()| {
9+
use std::io::Write as _;
10+
// Signal readiness via stdout
11+
std::io::stdout().write_all(b"ready\n").unwrap();
12+
std::io::stdout().flush().unwrap();
13+
// Block on stdin — will be killed by cancellation
14+
let _ = std::io::stdin().read_line(&mut String::new());
15+
});
16+
let token = CancellationToken::new();
17+
let mut fspy_cmd = fspy::Command::from(cmd);
18+
fspy_cmd.stdout(Stdio::piped()).stdin(Stdio::piped());
19+
let mut child = fspy_cmd.spawn(token.clone()).await?;
20+
21+
// Wait for child to signal readiness
22+
let mut stdout = child.stdout.take().unwrap();
23+
let mut buf = vec![0u8; 64];
24+
let n = stdout.read(&mut buf).await?;
25+
assert!(std::str::from_utf8(&buf[..n])?.contains("ready"));
26+
27+
// Cancel — fspy background task calls start_kill
28+
token.cancel();
29+
let termination = child.wait_handle.await?;
30+
assert!(!termination.status.success());
31+
Ok(())
32+
}

crates/fspy/tests/node_fs.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ async fn track_node_script(script: &str, args: &[&OsStr]) -> anyhow::Result<Path
1616
.envs(vars_os()) // https://github.com/jdx/mise/discussions/5968
1717
.arg(script)
1818
.args(args);
19-
let child = command.spawn().await?;
19+
let child = command.spawn(tokio_util::sync::CancellationToken::new()).await?;
2020
let termination = child.wait_handle.await?;
2121
assert!(termination.status.success());
2222
Ok(termination.path_accesses)

crates/fspy/tests/oxlint.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ async fn track_oxlint(dir: &std::path::Path, args: &[&str]) -> anyhow::Result<Pa
4545
.env("PATH", new_path)
4646
.current_dir(dir);
4747

48-
let child = command.spawn().await?;
48+
let child = command.spawn(tokio_util::sync::CancellationToken::new()).await?;
4949
let termination = child.wait_handle.await?;
5050
// oxlint may return non-zero if it finds lint errors, that's OK
5151
Ok(termination.path_accesses)

0 commit comments

Comments
 (0)