Skip to content

Commit 1ef4e2f

Browse files
branchseerclaude
andauthored
feat: --concurrency-limit and --parallel (#309)
## Summary - Add `--concurrency-limit` flag to control the number of tasks running at the same time. - Add `--parallel` flag to ignore task dependencies and run all tasks at once with unlimited concurrency (unless `--concurrency-limit` is also specified) 🤖 Generated with [Claude Code](https://claude.com/claude-code) --------- Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent cf2a748 commit 1ef4e2f

File tree

124 files changed

+5439
-3355
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

124 files changed

+5439
-3355
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
# Changelog
22

3+
- **Fixed** Ctrl-C now prevents future tasks from being scheduled and prevents caching of in-flight task results ([#309](https://github.com/voidzero-dev/vite-task/pull/309))
4+
- **Added** `--concurrency-limit` flag to limit the number of tasks running at the same time (defaults to 4) ([#288](https://github.com/voidzero-dev/vite-task/pull/288), [#309](https://github.com/voidzero-dev/vite-task/pull/309))
5+
- **Added** `--parallel` flag to ignore task dependencies and run all tasks at once with unlimited concurrency (unless `--concurrency-limit` is also specified) ([#309](https://github.com/voidzero-dev/vite-task/pull/309))
36
- **Added** object form for `input` entries: `{ "pattern": "...", "base": "workspace" | "package" }` to resolve glob patterns relative to the workspace root instead of the package directory ([#295](https://github.com/voidzero-dev/vite-task/pull/295))
47
- **Fixed** arguments after the task name being consumed by `vp` instead of passed through to the task ([#286](https://github.com/voidzero-dev/vite-task/pull/286), [#290](https://github.com/voidzero-dev/vite-task/pull/290))
58
- **Changed** default untracked env patterns to align with Turborepo, covering more CI and platform-specific variables ([#262](https://github.com/voidzero-dev/vite-task/pull/262))

CONTRIBUTING.md

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,9 @@ just lint # Clippy linting
3636
just doc # Generate documentation
3737
```
3838

39-
### Running Specific Tests
39+
## Testing
40+
41+
### Running Tests
4042

4143
```bash
4244
cargo test # All tests
@@ -55,6 +57,17 @@ Integration tests (e2e, plan, fspy) require `pnpm install` in `packages/tools` f
5557

5658
See individual crate READMEs for crate-specific testing details.
5759

60+
### Playground
61+
62+
The `playground/` directory is a small workspace for manually testing the task runner. It has three packages (`app → lib → utils`) with cached tasks (`build`, `test`, `lint`, `typecheck`) and an uncached `dev` script.
63+
64+
```bash
65+
cargo run --bin vt -- run -r build # run build across all packages
66+
cargo run --bin vt -- run -r --parallel dev # start all dev scripts in parallel
67+
```
68+
69+
See `playground/README.md` for the full task list and dependency structure.
70+
5871
## Cross-Platform Development
5972

6073
This project must work on macOS, Linux, and Windows. Skipping tests on any platform is not acceptable.

crates/vite_task/src/cli/mod.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,15 @@ pub struct RunFlags {
5151
/// How task output is displayed.
5252
#[clap(long, default_value = "interleaved")]
5353
pub log: LogMode,
54+
55+
/// Maximum number of tasks to run concurrently. Defaults to 4.
56+
#[clap(long)]
57+
pub concurrency_limit: Option<usize>,
58+
59+
/// Run tasks without dependency ordering. Sets concurrency to unlimited
60+
/// unless `--concurrency-limit` is also specified.
61+
#[clap(long, default_value = "false")]
62+
pub parallel: bool,
5463
}
5564

5665
impl RunFlags {
@@ -206,6 +215,8 @@ impl ResolvedRunCommand {
206215

207216
let cache_override = self.flags.cache_override();
208217
let include_explicit_deps = !self.flags.ignore_depends_on;
218+
let concurrency_limit = self.flags.concurrency_limit.map(|n| n.max(1));
219+
let parallel = self.flags.parallel;
209220

210221
let (package_query, is_cwd_only) =
211222
self.flags.package_query.into_package_query(task_specifier.package_name, cwd)?;
@@ -220,6 +231,8 @@ impl ResolvedRunCommand {
220231
plan_options: PlanOptions {
221232
extra_args: self.additional_args.into(),
222233
cache_override,
234+
concurrency_limit,
235+
parallel,
223236
},
224237
},
225238
is_cwd_only,

crates/vite_task/src/session/event.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,13 @@ pub enum CacheNotUpdatedReason {
5959
CacheDisabled,
6060
/// Execution exited with non-zero status
6161
NonZeroExitStatus,
62+
/// Execution was cancelled before the result could be trusted.
63+
/// Two possible causes:
64+
/// - Ctrl-C: the user interrupted execution; the task may have
65+
/// exited successfully but without completing its intended work.
66+
/// - Fast-fail: a sibling task failed, triggering cancellation
67+
/// while this task was still running.
68+
Cancelled,
6269
/// Task modified files it read during execution (read-write overlap detected by fspy).
6370
/// Caching such tasks is unsound because the prerun input hashes become stale.
6471
InputModified {

crates/vite_task/src/session/execute/mod.rs

Lines changed: 49 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,6 @@ pub enum SpawnOutcome {
5050
Failed,
5151
}
5252

53-
/// Maximum number of tasks that can execute concurrently within a single
54-
/// execution graph level.
55-
const CONCURRENCY_LIMIT: usize = 10;
56-
5753
/// Holds shared references needed during graph execution.
5854
///
5955
/// The `reporter` field is wrapped in `RefCell` because concurrent futures
@@ -71,34 +67,48 @@ struct ExecutionContext<'a> {
7167
/// Base path for resolving relative paths in cache entries.
7268
/// Typically the workspace root.
7369
cache_base_path: &'a Arc<AbsolutePath>,
74-
/// Token for cancelling in-flight child processes.
75-
cancellation_token: CancellationToken,
70+
/// Token cancelled when a task fails. Kills in-flight child processes
71+
/// (via `start_kill` in spawn.rs), prevents scheduling new tasks, and
72+
/// prevents caching results of concurrently-running tasks.
73+
fast_fail_token: CancellationToken,
74+
/// Token cancelled by Ctrl-C. Unlike `fast_fail_token` (which kills
75+
/// children), this only prevents scheduling new tasks and caching
76+
/// results — running processes are left to handle SIGINT naturally.
77+
interrupt_token: CancellationToken,
7678
}
7779

7880
impl ExecutionContext<'_> {
81+
/// Returns true if execution has been cancelled, either by a task
82+
/// failure (fast-fail) or by Ctrl-C (interrupt).
83+
fn cancelled(&self) -> bool {
84+
self.fast_fail_token.is_cancelled() || self.interrupt_token.is_cancelled()
85+
}
86+
7987
/// Execute all tasks in an execution graph concurrently, respecting dependencies.
8088
///
8189
/// Uses a DAG scheduler: tasks whose dependencies have all completed are scheduled
8290
/// onto a `FuturesUnordered`, bounded by a per-graph `Semaphore` with
83-
/// [`CONCURRENCY_LIMIT`] permits. Each recursive `Expanded` graph creates its own
91+
/// `concurrency_limit` permits. Each recursive `Expanded` graph creates its own
8492
/// semaphore, so nested graphs have independent concurrency limits.
8593
///
86-
/// Fast-fail: if any task fails, `execute_leaf` cancels the `CancellationToken`
87-
/// (killing in-flight child processes). This method detects the cancellation,
88-
/// closes the semaphore, drains remaining futures, and returns.
94+
/// Fast-fail: if any task fails, `execute_leaf` cancels the `fast_fail_token`
95+
/// (killing in-flight child processes). Ctrl-C cancels the `interrupt_token`.
96+
/// Either cancellation causes this method to close the semaphore, drain
97+
/// remaining futures, and return.
8998
#[tracing::instrument(level = "debug", skip_all)]
9099
async fn execute_expanded_graph(&self, graph: &ExecutionGraph) {
91-
if graph.node_count() == 0 {
100+
if graph.graph.node_count() == 0 {
92101
return;
93102
}
94103

95-
let semaphore = Arc::new(Semaphore::new(CONCURRENCY_LIMIT));
104+
let semaphore =
105+
Arc::new(Semaphore::new(graph.concurrency_limit.min(Semaphore::MAX_PERMITS)));
96106

97107
// Compute dependency count for each node.
98108
// Edge A→B means "A depends on B", so A's dependency count = outgoing edge count.
99109
let mut dep_count: FxHashMap<ExecutionNodeIndex, usize> = FxHashMap::default();
100-
for node_ix in graph.node_indices() {
101-
dep_count.insert(node_ix, graph.neighbors(node_ix).count());
110+
for node_ix in graph.graph.node_indices() {
111+
dep_count.insert(node_ix, graph.graph.neighbors(node_ix).count());
102112
}
103113

104114
let mut futures = FuturesUnordered::new();
@@ -114,7 +124,7 @@ impl ExecutionContext<'_> {
114124
// On failure, `execute_leaf` cancels the token — we detect it here, close
115125
// the semaphore (so pending acquires fail immediately), and drain.
116126
while let Some(completed_ix) = futures.next().await {
117-
if self.cancellation_token.is_cancelled() {
127+
if self.cancelled() {
118128
semaphore.close();
119129
while futures.next().await.is_some() {}
120130
return;
@@ -123,7 +133,7 @@ impl ExecutionContext<'_> {
123133
// Find dependents of the completed node (nodes that depend on it).
124134
// Edge X→completed means "X depends on completed", so X is a predecessor
125135
// in graph direction = neighbor in Incoming direction.
126-
for dependent in graph.neighbors_directed(completed_ix, Direction::Incoming) {
136+
for dependent in graph.graph.neighbors_directed(completed_ix, Direction::Incoming) {
127137
let count = dep_count.get_mut(&dependent).expect("all nodes are in dep_count");
128138
*count -= 1;
129139
if *count == 0 {
@@ -135,7 +145,7 @@ impl ExecutionContext<'_> {
135145

136146
/// Create a future that acquires a semaphore permit, then executes a graph node.
137147
///
138-
/// On failure, `execute_node` cancels the `CancellationToken` — the caller
148+
/// On failure, `execute_node` cancels the `fast_fail_token` — the caller
139149
/// detects this after the future completes. On semaphore closure or prior
140150
/// cancellation, the node is skipped.
141151
fn spawn_node<'a>(
@@ -147,7 +157,7 @@ impl ExecutionContext<'_> {
147157
let sem = semaphore.clone();
148158
async move {
149159
if let Ok(_permit) = sem.acquire_owned().await
150-
&& !self.cancellation_token.is_cancelled()
160+
&& !self.cancelled()
151161
{
152162
self.execute_node(graph, node_ix).await;
153163
}
@@ -159,13 +169,13 @@ impl ExecutionContext<'_> {
159169
/// Execute a single node's items sequentially.
160170
///
161171
/// A node may have multiple items (from `&&`-split commands). Items are executed
162-
/// in order; if any item fails, `execute_leaf` cancels the `CancellationToken`
172+
/// in order; if any item fails, `execute_leaf` cancels the `fast_fail_token`
163173
/// and remaining items are skipped (preserving `&&` semantics).
164174
async fn execute_node(&self, graph: &ExecutionGraph, node_ix: ExecutionNodeIndex) {
165-
let task_execution = &graph[node_ix];
175+
let task_execution = &graph.graph[node_ix];
166176

167177
for item in &task_execution.items {
168-
if self.cancellation_token.is_cancelled() {
178+
if self.cancelled() {
169179
return;
170180
}
171181
match &item.kind {
@@ -183,7 +193,7 @@ impl ExecutionContext<'_> {
183193
///
184194
/// Creates a [`LeafExecutionReporter`] from the graph reporter and delegates
185195
/// to the appropriate execution method. On failure (non-zero exit or
186-
/// infrastructure error), cancels the `CancellationToken`.
196+
/// infrastructure error), cancels the `fast_fail_token`.
187197
#[tracing::instrument(level = "debug", skip_all)]
188198
async fn execute_leaf(&self, display: &ExecutionItemDisplay, leaf_kind: &LeafExecutionKind) {
189199
// Borrow the reporter briefly to create the leaf reporter, then drop
@@ -218,7 +228,8 @@ impl ExecutionContext<'_> {
218228
spawn_execution,
219229
self.cache,
220230
self.cache_base_path,
221-
self.cancellation_token.clone(),
231+
self.fast_fail_token.clone(),
232+
self.interrupt_token.clone(),
222233
)
223234
.await;
224235
match outcome {
@@ -229,7 +240,7 @@ impl ExecutionContext<'_> {
229240
}
230241
};
231242
if failed {
232-
self.cancellation_token.cancel();
243+
self.fast_fail_token.cancel();
233244
}
234245
}
235246
}
@@ -258,7 +269,8 @@ pub async fn execute_spawn(
258269
spawn_execution: &SpawnExecution,
259270
cache: &ExecutionCache,
260271
cache_base_path: &Arc<AbsolutePath>,
261-
cancellation_token: CancellationToken,
272+
fast_fail_token: CancellationToken,
273+
interrupt_token: CancellationToken,
262274
) -> SpawnOutcome {
263275
let cache_metadata = spawn_execution.cache_metadata.as_ref();
264276

@@ -351,7 +363,7 @@ pub async fn execute_spawn(
351363
// while the child also writes to the same FD.
352364
drop(stdio_config);
353365

354-
match spawn_inherited(&spawn_execution.spawn_command, cancellation_token).await {
366+
match spawn_inherited(&spawn_execution.spawn_command, fast_fail_token).await {
355367
Ok(result) => {
356368
leaf_reporter.finish(
357369
Some(result.exit_status),
@@ -422,7 +434,7 @@ pub async fn execute_spawn(
422434
std_outputs.as_mut(),
423435
path_accesses.as_mut(),
424436
&resolved_negatives,
425-
cancellation_token,
437+
fast_fail_token.clone(),
426438
)
427439
.await
428440
{
@@ -442,7 +454,11 @@ pub async fn execute_spawn(
442454
let (cache_update_status, cache_error) = if let Some((cache_metadata, globbed_inputs)) =
443455
cache_metadata_and_inputs
444456
{
445-
if result.exit_status.success() {
457+
let cancelled = fast_fail_token.is_cancelled() || interrupt_token.is_cancelled();
458+
if cancelled {
459+
// Cancelled (Ctrl-C or sibling failure) — result is untrustworthy
460+
(CacheUpdateStatus::NotUpdated(CacheNotUpdatedReason::Cancelled), None)
461+
} else if result.exit_status.success() {
446462
// Check for read-write overlap: if the task wrote to any file it also
447463
// read, the inputs were modified during execution — don't cache.
448464
// Note: this only checks fspy-inferred reads, not globbed_inputs keys.
@@ -522,7 +538,7 @@ pub async fn execute_spawn(
522538
#[tracing::instrument(level = "debug", skip_all)]
523539
async fn spawn_inherited(
524540
spawn_command: &SpawnCommand,
525-
cancellation_token: CancellationToken,
541+
fast_fail_token: CancellationToken,
526542
) -> anyhow::Result<SpawnResult> {
527543
let mut cmd = fspy::Command::new(spawn_command.program_path.as_path());
528544
cmd.args(spawn_command.args.iter().map(vite_str::Str::as_str));
@@ -582,7 +598,7 @@ async fn spawn_inherited(
582598

583599
let exit_status = tokio::select! {
584600
status = child.wait() => status?,
585-
() = cancellation_token.cancelled() => {
601+
() = fast_fail_token.cancelled() => {
586602
child.start_kill()?;
587603
child.wait().await?
588604
}
@@ -697,6 +713,7 @@ impl Session<'_> {
697713
&self,
698714
execution_graph: ExecutionGraph,
699715
builder: Box<dyn GraphExecutionReporterBuilder>,
716+
interrupt_token: CancellationToken,
700717
) -> Result<(), ExitStatus> {
701718
// Initialize cache before building the reporter. Cache errors are reported
702719
// directly to stderr and cause an early exit, keeping the reporter flow clean
@@ -716,7 +733,8 @@ impl Session<'_> {
716733
reporter: &reporter,
717734
cache,
718735
cache_base_path: &self.workspace_path,
719-
cancellation_token: CancellationToken::new(),
736+
fast_fail_token: CancellationToken::new(),
737+
interrupt_token,
720738
};
721739

722740
// Execute the graph with fast-fail: if any task fails, remaining tasks

crates/vite_task/src/session/execute/spawn.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ pub async fn spawn_with_tracking(
9292
std_outputs: Option<&mut Vec<StdOutput>>,
9393
path_accesses: Option<&mut TrackedPathAccesses>,
9494
resolved_negatives: &[wax::Glob<'static>],
95-
cancellation_token: CancellationToken,
95+
fast_fail_token: CancellationToken,
9696
) -> anyhow::Result<SpawnResult> {
9797
let mut cmd = fspy::Command::new(spawn_command.program_path.as_path());
9898
cmd.args(spawn_command.args.iter().map(vite_str::Str::as_str));
@@ -108,7 +108,7 @@ pub async fn spawn_with_tracking(
108108
let (mut child_stdout, mut child_stderr, mut child_wait) = if path_accesses.is_some() {
109109
// fspy tracking enabled — fspy manages cancellation internally via a clone
110110
// of the token. We keep the original for the pipe read loop.
111-
let mut tracked_child = cmd.spawn(cancellation_token.clone()).await?;
111+
let mut tracked_child = cmd.spawn(fast_fail_token.clone()).await?;
112112
let stdout = tracked_child.stdout.take().unwrap();
113113
let stderr = tracked_child.stderr.take().unwrap();
114114
#[cfg(windows)]
@@ -193,7 +193,7 @@ pub async fn spawn_with_tracking(
193193
}
194194
}
195195
}
196-
() = cancellation_token.cancelled() => {
196+
() = fast_fail_token.cancelled() => {
197197
// Kill the direct child (no-op for fspy which handles it internally).
198198
if let ChildWait::Tokio(ref mut child) = child_wait {
199199
let _ = child.start_kill();
@@ -291,7 +291,7 @@ pub async fn spawn_with_tracking(
291291
ChildWait::Tokio(mut child) => {
292292
let exit_status = tokio::select! {
293293
status = child.wait() => status?,
294-
() = cancellation_token.cancelled() => {
294+
() = fast_fail_token.cancelled() => {
295295
child.start_kill()?;
296296
child.wait().await?
297297
}

0 commit comments

Comments
 (0)