-
Notifications
You must be signed in to change notification settings - Fork 21
Expand file tree
/
Copy pathmod.rs
More file actions
370 lines (341 loc) · 15.4 KB
/
mod.rs
File metadata and controls
370 lines (341 loc) · 15.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
pub mod fingerprint;
pub mod spawn;
use std::sync::Arc;
use futures_util::FutureExt;
use petgraph::{algo::toposort, stable_graph::StableGraph};
use vite_path::AbsolutePath;
use vite_task_plan::{
ExecutionItemKind, ExecutionPlan, LeafExecutionKind, SpawnExecution, TaskExecution,
execution_graph::ExecutionIx,
};
use self::{
fingerprint::PostRunFingerprint,
spawn::{OutputKind as SpawnOutputKind, spawn_with_tracking},
};
use super::{
cache::{CommandCacheValue, ExecutionCache},
event::{
CacheDisabledReason, CacheStatus, ExecutionEvent, ExecutionEventKind, ExecutionId,
ExecutionItemDisplay, OutputKind,
},
reporter::{ExitStatus, Reporter},
};
use crate::{Session, session::execute::spawn::SpawnTrackResult};
/// Internal error type used to abort execution when errors occur.
///
/// This error is swallowed in `Session::execute` and never exposed externally.
/// It carries no payload on purpose: by the time it is returned, the failure
/// details have already been reported through the `Reporter` as an
/// `ExecutionEventKind::Error` event, so this type only signals "stop now".
#[derive(Debug)]
struct ExecutionAborted;
/// Mutable state threaded through a single `Session::execute` call.
struct ExecutionContext<'a> {
    /// Sink for all execution events (Start / Output / Error / Finish).
    event_handler: &'a mut dyn Reporter,
    /// Next id to hand out; advanced via `ExecutionId::next` each time a new
    /// execution (a leaf, or a cycle-detection error placeholder) begins.
    current_execution_id: ExecutionId,
    /// Command cache consulted before spawning and updated after a
    /// successful tracked run.
    cache: &'a ExecutionCache,
    /// All relative paths in cache are relative to this base path
    cache_base_path: &'a Arc<AbsolutePath>,
}
impl ExecutionContext<'_> {
    /// Recursively execute one execution item.
    ///
    /// An `Expanded` item holds a dependency graph of task executions, which is
    /// topologically sorted and executed sequentially; a `Leaf` item is
    /// delegated to [`Self::execute_leaf`].
    ///
    /// `display` is `None` for the top-level execution (no parent task) and
    /// `Some` for a nested execution within a parent task.
    ///
    /// Returns `Err(ExecutionAborted)` only after the failure has already been
    /// reported through `event_handler`.
    async fn execute_item_kind(
        &mut self,
        display: Option<&ExecutionItemDisplay>,
        item_kind: &ExecutionItemKind,
    ) -> Result<(), ExecutionAborted> {
        match item_kind {
            ExecutionItemKind::Expanded(graph) => {
                // Use StableGraph to preserve node indices during removal
                let mut graph: StableGraph<&TaskExecution, (), _, ExecutionIx> =
                    graph.map(|_, task_execution| task_execution, |_, ()| ()).into();
                // To be consistent with the package graph in vite_package_manager and the dependency graph definition in Wikipedia
                // https://en.wikipedia.org/wiki/Dependency_graph, we construct the graph with edges from dependents to dependencies
                // e.g. A -> B means A depends on B
                //
                // For execution we need to reverse the edges first before topological sorting,
                // so that tasks without dependencies are executed first
                graph.reverse(); // Run tasks without dependencies first
                // Always use topological sort to ensure the correct order of execution
                // or the task dependencies declaration is meaningless
                let node_indices = match toposort(&graph, None) {
                    Ok(ok) => ok,
                    Err(cycle) => {
                        // Follow standard error pattern: Start event, then Error event
                        let execution_id = self.current_execution_id;
                        self.current_execution_id = self.current_execution_id.next();
                        // Emit Start event for cycle detection error
                        // display is None for top-level execution (no parent task)
                        // display is Some for nested execution (within a parent task)
                        // Caching is disabled when cycle dependencies are detected
                        self.event_handler.handle_event(ExecutionEvent {
                            execution_id,
                            kind: ExecutionEventKind::Start {
                                display: display.cloned(),
                                cache_status: CacheStatus::Disabled(
                                    CacheDisabledReason::CycleDetected,
                                ),
                            },
                        });
                        self.event_handler.handle_event(ExecutionEvent {
                            execution_id,
                            kind: ExecutionEventKind::Error {
                                message: format!("Cycle dependencies detected: {cycle:?}"),
                            },
                        });
                        return Err(ExecutionAborted);
                    }
                };
                // unwrap is safe: toposort yields each node index exactly once,
                // and StableGraph keeps indices valid across removals.
                let ordered_executions =
                    node_indices.into_iter().map(|id| graph.remove_node(id).unwrap());
                for task_execution in ordered_executions {
                    for item in task_execution.items.iter() {
                        match &item.kind {
                            ExecutionItemKind::Leaf(leaf_kind) => {
                                self.execute_leaf(Some(&item.execution_item_display), leaf_kind)
                                    .boxed_local()
                                    .await?;
                            }
                            ExecutionItemKind::Expanded(_) => {
                                // boxed_local breaks the infinitely-sized future
                                // that direct async recursion would otherwise create.
                                self.execute_item_kind(
                                    Some(&item.execution_item_display),
                                    &item.kind,
                                )
                                .boxed_local()
                                .await?;
                            }
                        }
                    }
                }
            }
            ExecutionItemKind::Leaf(leaf_execution_kind) => {
                self.execute_leaf(display, leaf_execution_kind).await?;
            }
        }
        Ok(())
    }

    /// Execute a single leaf item, allocating a fresh [`ExecutionId`] for it.
    ///
    /// In-process (built-in) commands run with caching disabled; spawned
    /// commands are delegated to [`Self::execute_spawn`], which handles the
    /// cache hit/miss/update lifecycle.
    async fn execute_leaf(
        &mut self,
        display: Option<&ExecutionItemDisplay>,
        leaf_execution_kind: &LeafExecutionKind,
    ) -> Result<(), ExecutionAborted> {
        let execution_id = self.current_execution_id;
        self.current_execution_id = self.current_execution_id.next();
        match leaf_execution_kind {
            LeafExecutionKind::InProcess(in_process_execution) => {
                // Emit Start event with cache_status for in-process (built-in) commands
                // Caching is disabled for built-in commands
                self.event_handler.handle_event(ExecutionEvent {
                    execution_id,
                    kind: ExecutionEventKind::Start {
                        display: display.cloned(),
                        cache_status: CacheStatus::Disabled(
                            CacheDisabledReason::InProcessExecution,
                        ),
                    },
                });
                // Execute the in-process command
                let execution_output = in_process_execution.execute().await;
                self.event_handler.handle_event(ExecutionEvent {
                    execution_id,
                    kind: ExecutionEventKind::Output {
                        kind: OutputKind::Stdout,
                        content: execution_output.stdout.into(),
                    },
                });
                // Emit Finish WITHOUT cache_status (already in Start event)
                // NOTE(review): in-process commands always report exit status 0 —
                // confirm they are infallible, otherwise their failures are
                // silently reported as success here.
                self.event_handler.handle_event(ExecutionEvent {
                    execution_id,
                    kind: ExecutionEventKind::Finish { status: Some(0) },
                });
            }
            LeafExecutionKind::Spawn(spawn_execution) => {
                self.execute_spawn(execution_id, display, spawn_execution).await?;
            }
        }
        Ok(())
    }

    /// Execute an external command with caching.
    ///
    /// Lifecycle: (1) probe the cache, (2) emit Start carrying the cache
    /// status, (3) on a hit replay cached outputs and finish early, (4) on a
    /// miss spawn the process with file-access tracking while streaming its
    /// output as events, (5) on success store a new cache entry, (6) emit
    /// Finish with the real exit status.
    async fn execute_spawn(
        &mut self,
        execution_id: ExecutionId,
        display: Option<&ExecutionItemDisplay>,
        spawn_execution: &SpawnExecution,
    ) -> Result<(), ExecutionAborted> {
        let cache_metadata = spawn_execution.cache_metadata.as_ref();
        // 1. Determine cache status FIRST by trying cache hit
        // We need to know the status before emitting Start event so users
        // see cache status immediately when execution begins
        let (cache_status, cached_value) = if let Some(cache_metadata) = cache_metadata {
            match self.cache.try_hit(cache_metadata, &*self.cache_base_path).await {
                Ok(Ok(cached)) => (
                    // Cache hit - we can replay the cached outputs
                    CacheStatus::Hit { replayed_duration: cached.duration },
                    Some(cached),
                ),
                Ok(Err(cache_miss)) => (
                    // Cache miss - includes detailed reason (NotFound or FingerprintMismatch)
                    CacheStatus::Miss(cache_miss),
                    None,
                ),
                Err(err) => {
                    // Cache lookup error - emit error and abort
                    // NOTE(review): this path emits Error with no preceding Start
                    // event, unlike the cycle-detection path which follows the
                    // "Start event, then Error event" pattern — confirm reporters
                    // tolerate an Error for an id they never saw started.
                    self.event_handler.handle_event(ExecutionEvent {
                        execution_id,
                        kind: ExecutionEventKind::Error {
                            message: format!("Cache lookup failed: {err}"),
                        },
                    });
                    return Err(ExecutionAborted);
                }
            }
        } else {
            // No cache metadata provided - caching is disabled for this task
            (CacheStatus::Disabled(CacheDisabledReason::NoCacheMetadata), None)
        };
        // 2. NOW emit Start event with cache_status (ALWAYS emit Start)
        // This ensures all spawn executions emit Start, including cache hits
        // (previously cache hits didn't emit Start at all)
        self.event_handler.handle_event(ExecutionEvent {
            execution_id,
            kind: ExecutionEventKind::Start { display: display.cloned(), cache_status },
        });
        // 3. If cache hit, replay outputs and return early
        // No need to actually execute the command - just replay what was cached
        if let Some(cached) = cached_value {
            for output in cached.std_outputs.iter() {
                self.event_handler.handle_event(ExecutionEvent {
                    execution_id,
                    kind: ExecutionEventKind::Output {
                        kind: match output.kind {
                            SpawnOutputKind::StdOut => OutputKind::Stdout,
                            SpawnOutputKind::StdErr => OutputKind::Stderr,
                        },
                        content: output.content.clone().into(),
                    },
                });
            }
            // Emit Finish without cache_status (status already in Start event)
            // Cached entries only exist for successful runs (see step 5), so the
            // replayed exit status is always 0.
            self.event_handler.handle_event(ExecutionEvent {
                execution_id,
                kind: ExecutionEventKind::Finish { status: Some(0) },
            });
            return Ok(());
        }
        // 4. Execute spawn (cache miss or disabled)
        // Track file system access if caching is enabled (for future cache updates)
        let mut track_result_with_cache_metadata = if let Some(cache_metadata) = cache_metadata {
            Some((SpawnTrackResult::default(), cache_metadata))
        } else {
            None
        };
        // Execute command with tracking, emitting output events in real-time
        let result = match spawn_with_tracking(
            &spawn_execution.spawn_command,
            &*self.cache_base_path,
            |kind, content| {
                self.event_handler.handle_event(ExecutionEvent {
                    execution_id,
                    kind: ExecutionEventKind::Output {
                        kind: match kind {
                            SpawnOutputKind::StdOut => OutputKind::Stdout,
                            SpawnOutputKind::StdErr => OutputKind::Stderr,
                        },
                        content,
                    },
                });
            },
            track_result_with_cache_metadata.as_mut().map(|(track_result, _)| track_result),
        )
        .await
        {
            Ok(result) => result,
            Err(err) => {
                self.event_handler.handle_event(ExecutionEvent {
                    execution_id,
                    kind: ExecutionEventKind::Error {
                        message: format!("Failed to spawn process: {err}"),
                    },
                });
                return Err(ExecutionAborted);
            }
        };
        // 5. Update cache if successful
        // Only update cache if: (a) tracking was enabled, and (b) execution succeeded
        if let Some((track_result, cache_metadata)) = track_result_with_cache_metadata
            && result.exit_status.success()
        {
            let fingerprint_ignores =
                cache_metadata.spawn_fingerprint.fingerprint_ignores().map(|v| v.as_slice());
            match PostRunFingerprint::create(
                &track_result.path_reads,
                &*self.cache_base_path,
                fingerprint_ignores,
            ) {
                Ok(post_run_fingerprint) => {
                    let cache_value = CommandCacheValue {
                        post_run_fingerprint,
                        std_outputs: track_result.std_outputs.clone().into(),
                        duration: result.duration,
                    };
                    if let Err(err) = self.cache.update(cache_metadata, cache_value).await {
                        self.event_handler.handle_event(ExecutionEvent {
                            execution_id,
                            kind: ExecutionEventKind::Error {
                                message: format!("Failed to update cache: {err}"),
                            },
                        });
                        return Err(ExecutionAborted);
                    }
                }
                Err(err) => {
                    self.event_handler.handle_event(ExecutionEvent {
                        execution_id,
                        kind: ExecutionEventKind::Error {
                            message: format!("Failed to create post-run fingerprint: {err}"),
                        },
                    });
                    return Err(ExecutionAborted);
                }
            }
        }
        // 6. Emit finish WITHOUT cache_status
        // Cache status was already emitted in Start event
        self.event_handler.handle_event(ExecutionEvent {
            execution_id,
            kind: ExecutionEventKind::Finish { status: result.exit_status.code() },
        });
        Ok(())
    }
}
impl<'a, CustomSubcommand> Session<'a, CustomSubcommand> {
/// Execute an execution plan, reporting events to the provided reporter.
///
/// Returns Err(ExitStatus) to suggest the caller to abort and exit the process with the given exit status.
///
/// The return type isn't just ExitStatus because we want to distinguish between normal successful execution,
/// and execution that failed and needs to exit with a specific code which can be zero.
pub async fn execute(
&self,
plan: ExecutionPlan,
mut reporter: Box<dyn Reporter>,
) -> Result<(), ExitStatus> {
// Lazily initialize the cache on first execution
let cache = match self.cache() {
Ok(cache) => cache,
Err(err) => {
reporter.handle_event(ExecutionEvent {
execution_id: ExecutionId::zero(),
kind: ExecutionEventKind::Error {
message: format!("Failed to initialize cache: {err}"),
},
});
return Err(ExitStatus(1));
}
};
let mut execution_context = ExecutionContext {
event_handler: &mut *reporter,
current_execution_id: ExecutionId::zero(),
cache,
cache_base_path: &self.workspace_path,
};
// Execute and swallow ExecutionAborted error
// display is None for top-level execution
let _ = execution_context.execute_item_kind(None, plan.root_node()).await;
// Always call post_execution, whether execution succeeded or failed
reporter.post_execution()
}
}