Skip to content

Commit 4570ddf

Browse files
authored
feat: improve stdio handling with async writers (#160)
## Summary Extends the existing "stdin suggestion" system to a full "stdio" system. Previously, only stdin was conditionally inherited vs null. Now, all three stdio FDs (stdin, stdout, stderr) are conditionally inherited vs piped, and the reporter provides `Box<dyn AsyncWrite + Unpin>` streams for receiving child process output. ## Changes ### New types and API - **`StdioSuggestion`** replaces `StdinSuggestion` — two modes: `Piped` (stdin=null, stdout/stderr piped into AsyncWrite streams) and `Inherited` (all three inherited from parent) - **`StdioConfig`** returned by `LeafExecutionReporter::start()` — contains the suggestion plus `stdout_writer` and `stderr_writer` async streams - Removed `stdin_suggestion()` and `output(kind, content)` from `LeafExecutionReporter` trait - Removed `OutputKind` from `event.rs` (only `spawn.rs` keeps its own `OutputKind`) ### Reporter changes - Removed the `W` generic from all reporters (`PlainReporter`, `LabeledReporterBuilder`, `LabeledGraphReporter`, `LabeledLeafReporter`, `SharedReporterState`) - Display output (command lines, summaries, errors) written directly to `std::io::stdout()` using sync writes - `tokio::io::stdout()` / `tokio::io::stderr()` provided as the `AsyncWrite` streams in `StdioConfig` - `PlainReporter` returns `tokio::io::sink()` writers when `silent_if_cache_hit && is_cache_hit` ### Execution flow - `spawn_with_tracking()` — stdin is always `Stdio::null()`, takes `&mut dyn AsyncWrite` for stdout/stderr instead of an `on_output` callback - New `spawn_inherited()` — inherits all three stdio FDs, no fspy tracking, just `cmd.spawn()` + `wait()` - Decision matrix: inherited mode only when suggestion is `Inherited` AND `cache_metadata.is_none()` (caching fully disabled) - Cache replay writes directly to `StdioConfig` writers - In-process execution writes to `stdout_writer` from `StdioConfig` ### Test updates - Updated 8 stdio suggestion unit tests to use `start()` + assert `StdioConfig.suggestion` - Updated `task-list` E2E "vp run in script" test: inner `vp run` now sees a terminal (inherited stdout) and shows interactive selector — test uses `expect-milestone` + Enter to select and run a task
1 parent a6c49eb commit 4570ddf

File tree

71 files changed

+712
-343
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

71 files changed

+712
-343
lines changed

crates/vite_task/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ serde = { workspace = true, features = ["derive", "rc"] }
3131
serde_json = { workspace = true }
3232
smallvec.workspace = true
3333
thiserror = { workspace = true }
34-
tokio = { workspace = true, features = ["rt-multi-thread", "io-std", "macros", "sync"] }
34+
tokio = { workspace = true, features = ["rt-multi-thread", "io-std", "io-util", "macros", "sync"] }
3535
tracing = { workspace = true }
3636
twox-hash = { workspace = true }
3737
vite_glob = { workspace = true }

crates/vite_task/src/session/event.rs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,6 @@ pub enum ExecutionError {
4343
PostRunFingerprint(#[source] anyhow::Error),
4444
}
4545

46-
#[derive(Debug)]
47-
pub enum OutputKind {
48-
Stdout,
49-
Stderr,
50-
}
51-
5246
#[derive(Debug)]
5347
pub enum CacheDisabledReason {
5448
InProcessExecution,

crates/vite_task/src/session/execute/mod.rs

Lines changed: 88 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -4,22 +4,25 @@ pub mod spawn;
44
use std::{process::Stdio, sync::Arc};
55

66
use futures_util::FutureExt;
7+
use tokio::io::AsyncWriteExt as _;
78
use vite_path::AbsolutePath;
8-
use vite_task_plan::{ExecutionGraph, ExecutionItemKind, LeafExecutionKind, SpawnExecution};
9+
use vite_task_plan::{
10+
ExecutionGraph, ExecutionItemKind, LeafExecutionKind, SpawnCommand, SpawnExecution,
11+
};
912

1013
use self::{
1114
fingerprint::PostRunFingerprint,
12-
spawn::{OutputKind as SpawnOutputKind, spawn_with_tracking},
15+
spawn::{SpawnResult, spawn_with_tracking},
1316
};
1417
use super::{
1518
cache::{CommandCacheValue, ExecutionCache},
1619
event::{
1720
CacheDisabledReason, CacheErrorKind, CacheNotUpdatedReason, CacheStatus, CacheUpdateStatus,
18-
ExecutionError, OutputKind,
21+
ExecutionError,
1922
},
2023
reporter::{
2124
ExitStatus, GraphExecutionReporter, GraphExecutionReporterBuilder, LeafExecutionPath,
22-
LeafExecutionReporter, StdinSuggestion,
25+
LeafExecutionReporter, StdioSuggestion,
2326
},
2427
};
2528
use crate::{Session, session::execute::spawn::SpawnTrackResult};
@@ -116,10 +119,13 @@ impl ExecutionContext<'_> {
116119
match leaf_execution_kind {
117120
LeafExecutionKind::InProcess(in_process_execution) => {
118121
// In-process (built-in) commands: caching is disabled, execute synchronously
119-
leaf_reporter.start(CacheStatus::Disabled(CacheDisabledReason::InProcessExecution));
122+
let mut stdio_config = leaf_reporter
123+
.start(CacheStatus::Disabled(CacheDisabledReason::InProcessExecution));
120124

121125
let execution_output = in_process_execution.execute();
122-
leaf_reporter.output(OutputKind::Stdout, execution_output.stdout.into());
126+
// Write output to the stdout writer from StdioConfig
127+
let _ = stdio_config.stdout_writer.write_all(&execution_output.stdout).await;
128+
let _ = stdio_config.stdout_writer.flush().await;
123129

124130
leaf_reporter.finish(
125131
None,
@@ -147,9 +153,10 @@ impl ExecutionContext<'_> {
147153
///
148154
/// The full lifecycle is:
149155
/// 1. Cache lookup (determines cache status)
150-
/// 2. `leaf_reporter.start(cache_status)`
151-
/// 3. If cache hit: replay cached outputs → finish
152-
/// 4. If cache miss/disabled: spawn process → stream output → update cache → finish
156+
/// 2. `leaf_reporter.start(cache_status)` → `StdioConfig`
157+
/// 3. If cache hit: replay cached outputs via `StdioConfig` writers → finish
158+
/// 4. If `Inherited` suggestion AND caching disabled: `spawn_inherited()` → finish
159+
/// 5. Else (piped): `spawn_with_tracking()` with writers → cache update → finish
153160
///
154161
/// Errors (cache lookup failure, spawn failure, cache update failure) are reported
155162
/// through `leaf_reporter.finish()` and do not abort the caller.
@@ -197,20 +204,20 @@ pub async fn execute_spawn(
197204
(CacheStatus::Disabled(CacheDisabledReason::NoCacheMetadata), None)
198205
};
199206

200-
// 2. Report execution start with the determined cache status
201-
leaf_reporter.start(cache_status);
207+
// 2. Report execution start with the determined cache status.
208+
// Returns StdioConfig with the reporter's suggestion and async writers.
209+
let mut stdio_config = leaf_reporter.start(cache_status);
202210

203-
// 3. If cache hit, replay outputs and finish early.
211+
// 3. If cache hit, replay outputs via the StdioConfig writers and finish early.
204212
// No need to actually execute the command — just replay what was cached.
205213
if let Some(cached) = cached_value {
206214
for output in cached.std_outputs.iter() {
207-
leaf_reporter.output(
208-
match output.kind {
209-
SpawnOutputKind::StdOut => OutputKind::Stdout,
210-
SpawnOutputKind::StdErr => OutputKind::Stderr,
211-
},
212-
output.content.clone().into(),
213-
);
215+
let writer: &mut (dyn tokio::io::AsyncWrite + Unpin) = match output.kind {
216+
spawn::OutputKind::StdOut => &mut stdio_config.stdout_writer,
217+
spawn::OutputKind::StdErr => &mut stdio_config.stderr_writer,
218+
};
219+
let _ = writer.write_all(&output.content).await;
220+
let _ = writer.flush().await;
214221
}
215222
leaf_reporter.finish(
216223
None,
@@ -220,41 +227,53 @@ pub async fn execute_spawn(
220227
return SpawnOutcome::CacheHit;
221228
}
222229

223-
// 4. Execute spawn (cache miss or disabled).
224-
// Track file system access if caching is enabled (for future cache updates).
230+
// 4. Determine actual stdio mode based on the suggestion AND cache state.
231+
// Inherited stdio is only used when the reporter suggests it AND caching is
232+
// completely disabled (no cache_metadata). If caching is enabled but missed,
233+
// we still need piped mode to capture output for the cache update.
234+
let use_inherited =
235+
stdio_config.suggestion == StdioSuggestion::Inherited && cache_metadata.is_none();
236+
237+
if use_inherited {
238+
// Inherited mode: all three stdio FDs (stdin, stdout, stderr) are inherited
239+
// from the parent process. No fspy tracking, no output capture.
240+
// Drop the StdioConfig writers before spawning to avoid holding tokio::io::Stdout
241+
// while the child also writes to the same FD.
242+
drop(stdio_config);
243+
244+
match spawn_inherited(&spawn_execution.spawn_command).await {
245+
Ok(result) => {
246+
leaf_reporter.finish(
247+
Some(result.exit_status),
248+
CacheUpdateStatus::NotUpdated(CacheNotUpdatedReason::CacheDisabled),
249+
None,
250+
);
251+
return SpawnOutcome::Spawned(result.exit_status);
252+
}
253+
Err(err) => {
254+
leaf_reporter.finish(
255+
None,
256+
CacheUpdateStatus::NotUpdated(CacheNotUpdatedReason::CacheDisabled),
257+
Some(ExecutionError::Spawn(err)),
258+
);
259+
return SpawnOutcome::Failed;
260+
}
261+
}
262+
}
263+
264+
// 5. Piped mode: execute spawn with tracking, streaming output to writers.
225265
let mut track_result_with_cache_metadata =
226266
cache_metadata.map(|cache_metadata| (SpawnTrackResult::default(), cache_metadata));
227267

228-
// Determine the child process's stdin mode based on:
229-
// - The reporter's suggestion (inherited only when appropriate, e.g., single task)
230-
// - Whether caching is disabled (inherited stdin would make output non-deterministic,
231-
// breaking cache semantics)
232-
let stdin = if leaf_reporter.stdin_suggestion() == StdinSuggestion::Inherited
233-
&& cache_metadata.is_none()
234-
{
235-
Stdio::inherit()
236-
} else {
237-
Stdio::null()
238-
};
239-
240-
// Execute command with tracking, streaming output in real-time via the reporter
241268
#[expect(
242269
clippy::large_futures,
243270
reason = "spawn_with_tracking manages process I/O and creates a large future"
244271
)]
245272
let result = match spawn_with_tracking(
246273
&spawn_execution.spawn_command,
247274
cache_base_path,
248-
stdin,
249-
|kind, content| {
250-
leaf_reporter.output(
251-
match kind {
252-
SpawnOutputKind::StdOut => OutputKind::Stdout,
253-
SpawnOutputKind::StdErr => OutputKind::Stderr,
254-
},
255-
content,
256-
);
257-
},
275+
&mut stdio_config.stdout_writer,
276+
&mut stdio_config.stderr_writer,
258277
track_result_with_cache_metadata.as_mut().map(|(track_result, _)| track_result),
259278
)
260279
.await
@@ -270,7 +289,7 @@ pub async fn execute_spawn(
270289
}
271290
};
272291

273-
// 5. Update cache if successful and determine cache update status.
292+
// 6. Update cache if successful and determine cache update status.
274293
// Errors during cache update are terminal (reported through finish).
275294
let (cache_update_status, cache_error) = if let Some((track_result, cache_metadata)) =
276295
track_result_with_cache_metadata
@@ -315,14 +334,37 @@ pub async fn execute_spawn(
315334
(CacheUpdateStatus::NotUpdated(CacheNotUpdatedReason::CacheDisabled), None)
316335
};
317336

318-
// 6. Finish the leaf execution with the result and optional cache error.
337+
// 7. Finish the leaf execution with the result and optional cache error.
319338
// Cache update/fingerprint failures are reported but do not affect the outcome —
320339
// the process ran, so we return its actual exit status.
321340
leaf_reporter.finish(Some(result.exit_status), cache_update_status, cache_error);
322341

323342
SpawnOutcome::Spawned(result.exit_status)
324343
}
325344

345+
/// Spawn a command with all three stdio file descriptors inherited from the parent.
346+
///
347+
/// Used when the reporter suggests inherited stdio AND caching is disabled.
348+
/// All three FDs (stdin, stdout, stderr) are inherited, allowing interactive input
349+
/// and direct terminal output. No fspy tracking is performed since there's no
350+
/// cache to update.
351+
///
352+
/// The child process will see `is_terminal() == true` for stdout/stderr when the
353+
/// parent is running in a terminal. This is expected behavior.
354+
async fn spawn_inherited(spawn_command: &SpawnCommand) -> anyhow::Result<SpawnResult> {
355+
let mut cmd = fspy::Command::new(spawn_command.program_path.as_path());
356+
cmd.args(spawn_command.args.iter().map(vite_str::Str::as_str));
357+
cmd.envs(spawn_command.all_envs.iter());
358+
cmd.current_dir(&*spawn_command.cwd);
359+
cmd.stdin(Stdio::inherit()).stdout(Stdio::inherit()).stderr(Stdio::inherit());
360+
361+
let start = std::time::Instant::now();
362+
let mut child = cmd.into_tokio_command().spawn()?;
363+
let exit_status = child.wait().await?;
364+
365+
Ok(SpawnResult { exit_status, duration: start.elapsed() })
366+
}
367+
326368
impl Session<'_> {
327369
/// Execute an execution graph, reporting events through the provided reporter builder.
328370
///

crates/vite_task/src/session/execute/spawn.rs

Lines changed: 42 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,10 @@ use std::{
77
};
88

99
use bincode::{Decode, Encode};
10-
use bstr::BString;
1110
use fspy::AccessMode;
1211
use rustc_hash::FxHashSet;
1312
use serde::Serialize;
14-
use tokio::io::AsyncReadExt as _;
13+
use tokio::io::{AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _};
1514
use vite_path::{AbsolutePath, RelativePathBuf};
1615
use vite_task_plan::SpawnCommand;
1716

@@ -57,28 +56,26 @@ pub struct SpawnTrackResult {
5756
pub path_writes: FxHashSet<RelativePathBuf>,
5857
}
5958

60-
/// Spawn a command with file system tracking via fspy.
59+
/// Spawn a command with file system tracking via fspy, using piped stdio.
6160
///
6261
/// Returns the execution result including captured outputs, exit status,
6362
/// and tracked file accesses.
6463
///
65-
/// - `stdin` controls the child process's stdin (typically `Stdio::null()` or `Stdio::inherit()`).
66-
/// - `on_output` is called in real-time as stdout/stderr data arrives.
64+
/// - stdin is always `/dev/null` (piped mode is for non-interactive execution).
65+
/// - `stdout_writer`/`stderr_writer` receive the child's stdout/stderr output in real-time.
6766
/// - `track_result` if provided, will be populated with captured outputs and path accesses for caching. If `None`, tracking is disabled.
67+
#[expect(clippy::future_not_send, reason = "uses !Send dyn AsyncWrite writers internally")]
6868
#[expect(
6969
clippy::too_many_lines,
7070
reason = "spawn logic is inherently sequential and splitting would reduce clarity"
7171
)]
72-
pub async fn spawn_with_tracking<F>(
72+
pub async fn spawn_with_tracking(
7373
spawn_command: &SpawnCommand,
7474
workspace_root: &AbsolutePath,
75-
stdin: Stdio,
76-
mut on_output: F,
75+
stdout_writer: &mut (dyn AsyncWrite + Unpin),
76+
stderr_writer: &mut (dyn AsyncWrite + Unpin),
7777
track_result: Option<&mut SpawnTrackResult>,
78-
) -> anyhow::Result<SpawnResult>
79-
where
80-
F: FnMut(OutputKind, BString),
81-
{
78+
) -> anyhow::Result<SpawnResult> {
8279
/// The tracking state of the spawned process
8380
enum TrackingState<'a> {
8481
/// Tacking is enabled, with the tracked child and result reference
@@ -92,7 +89,7 @@ where
9289
cmd.args(spawn_command.args.iter().map(vite_str::Str::as_str));
9390
cmd.envs(spawn_command.all_envs.iter());
9491
cmd.current_dir(&*spawn_command.cwd);
95-
cmd.stdin(stdin).stdout(Stdio::piped()).stderr(Stdio::piped());
92+
cmd.stdin(Stdio::null()).stdout(Stdio::piped()).stderr(Stdio::piped());
9693

9794
let mut tracking_state = if let Some(track_result) = track_result {
9895
// track_result is Some. Spawn with tracking enabled
@@ -122,37 +119,49 @@ where
122119

123120
let start = Instant::now();
124121

125-
// Helper closure to process output chunks
126-
let mut process_output = |kind: OutputKind, content: Vec<u8>| {
127-
// Emit event immediately
128-
on_output(kind, content.clone().into());
129-
130-
// Store outputs for caching
131-
if let Some(outputs) = &mut outputs {
132-
// Merge consecutive outputs of the same kind for caching
133-
if let Some(last) = outputs.last_mut()
134-
&& last.kind == kind
135-
{
136-
last.content.extend(&content);
137-
} else {
138-
outputs.push(StdOutput { kind, content });
139-
}
140-
}
141-
};
142-
143122
// Read from both stdout and stderr concurrently using select!
144123
loop {
145124
tokio::select! {
146125
result = child_stdout.read(&mut stdout_buf), if !stdout_done => {
147126
match result? {
148127
0 => stdout_done = true,
149-
n => process_output(OutputKind::StdOut, stdout_buf[..n].to_vec()),
128+
n => {
129+
let content = stdout_buf[..n].to_vec();
130+
// Write to the async writer immediately
131+
stdout_writer.write_all(&content).await?;
132+
stdout_writer.flush().await?;
133+
// Store outputs for caching
134+
if let Some(outputs) = &mut outputs {
135+
if let Some(last) = outputs.last_mut()
136+
&& last.kind == OutputKind::StdOut
137+
{
138+
last.content.extend(&content);
139+
} else {
140+
outputs.push(StdOutput { kind: OutputKind::StdOut, content });
141+
}
142+
}
143+
}
150144
}
151145
}
152146
result = child_stderr.read(&mut stderr_buf), if !stderr_done => {
153147
match result? {
154148
0 => stderr_done = true,
155-
n => process_output(OutputKind::StdErr, stderr_buf[..n].to_vec()),
149+
n => {
150+
let content = stderr_buf[..n].to_vec();
151+
// Write to the async writer immediately
152+
stderr_writer.write_all(&content).await?;
153+
stderr_writer.flush().await?;
154+
// Store outputs for caching
155+
if let Some(outputs) = &mut outputs {
156+
if let Some(last) = outputs.last_mut()
157+
&& last.kind == OutputKind::StdErr
158+
{
159+
last.content.extend(&content);
160+
} else {
161+
outputs.push(StdOutput { kind: OutputKind::StdErr, content });
162+
}
163+
}
164+
}
156165
}
157166
}
158167
else => break,

crates/vite_task/src/session/mod.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -246,8 +246,7 @@ impl<'a> Session<'a> {
246246
.await
247247
}
248248
Ok(graph) => {
249-
let builder =
250-
LabeledReporterBuilder::new(std::io::stdout(), self.workspace_path());
249+
let builder = LabeledReporterBuilder::new(self.workspace_path());
251250
Ok(self
252251
.execute_graph(graph, Box::new(builder))
253252
.await
@@ -392,7 +391,7 @@ impl<'a> Session<'a> {
392391

393392
let cwd = Arc::clone(&self.cwd);
394393
let graph = self.plan_from_cli_run(cwd, run_command).await?;
395-
let builder = LabeledReporterBuilder::new(std::io::stdout(), self.workspace_path());
394+
let builder = LabeledReporterBuilder::new(self.workspace_path());
396395
Ok(self.execute_graph(graph, Box::new(builder)).await.err().unwrap_or(ExitStatus::SUCCESS))
397396
}
398397

@@ -468,7 +467,7 @@ impl<'a> Session<'a> {
468467
let cache = self.cache()?;
469468

470469
// Create a plain (standalone) reporter — no graph awareness, no summary
471-
let plain_reporter = reporter::PlainReporter::new(std::io::stdout(), silent_if_cache_hit);
470+
let plain_reporter = reporter::PlainReporter::new(silent_if_cache_hit);
472471

473472
// Execute the spawn directly using the free function, bypassing the graph pipeline
474473
match execute::execute_spawn(

0 commit comments

Comments
 (0)