Skip to content

Commit 3ae2129

Browse files
fix(exec): propagate skips transitively under keep-going; UTF8-safe display_name; reap ephemeral images; fix cache clean (#130)
1 parent 6347385 commit 3ae2129

7 files changed

Lines changed: 284 additions & 33 deletions

File tree

crates/hm-exec/src/local/runner/mod.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use std::pin::Pin;
1111
use std::sync::Arc;
1212

1313
use anyhow::Result;
14-
use hm_plugin_protocol::{ExecutorInput, StepResult};
14+
use hm_plugin_protocol::{ExecutorInput, SnapshotRef, StepResult};
1515
use tokio_util::sync::CancellationToken;
1616

1717
use crate::local::archive::ArchiveStore;
@@ -55,6 +55,21 @@ pub trait StepRunner: Send + Sync + fmt::Debug {
5555
ctx: &StepContext,
5656
input: ExecutorInput,
5757
) -> Pin<Box<dyn Future<Output = Result<StepResult>> + Send + '_>>;
58+
59+
/// Reap transient snapshots once a run has finished.
60+
///
61+
/// Ephemeral (uncached) leaf steps commit a snapshot purely so a
62+
/// downstream `BuildsIn` child can restore from it; nothing else holds a
63+
/// reference and the cache registry never tracks them. The scheduler
64+
/// collects every such snapshot and calls this at run end (best-effort)
65+
/// so they don't leak in the backend store. The default is a no-op for
66+
/// runners that produce no reapable snapshots.
67+
fn reap_snapshots<'a>(
68+
&'a self,
69+
_snapshots: Vec<SnapshotRef>,
70+
) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
71+
Box::pin(async {})
72+
}
5873
}
5974

6075
/// Maps runner names to [`StepRunner`] implementations.

crates/hm-exec/src/local/runner/vm.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,21 @@ impl StepRunner for VmRunner {
4949
let vm = Arc::clone(&self.vm);
5050
Box::pin(async move { run_step_vm(&vm, &ctx, input).await })
5151
}
52+
53+
fn reap_snapshots<'a>(
54+
&'a self,
55+
snapshots: Vec<SnapshotRef>,
56+
) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
57+
let vm = Arc::clone(&self.vm);
58+
Box::pin(async move {
59+
for snap in snapshots {
60+
let id = SnapshotId::new(snap.0);
61+
if let Err(e) = vm.remove_snapshot(&id).await {
62+
tracing::warn!(snapshot = %id, error = %e, "failed to reap ephemeral snapshot");
63+
}
64+
}
65+
})
66+
}
5267
}
5368

5469
#[tracing::instrument(skip(vm, ctx), fields(step_key = %input.step.key))]

crates/hm-exec/src/local/scheduler.rs

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,12 @@ struct StepOutcome {
6060
/// `None` only for steps short-circuited because a predecessor failed
6161
/// or the build was cancelled before they could run.
6262
summary: Option<StepResultSummary>,
63+
/// Set when this step did not complete successfully — it failed, timed
64+
/// out, was cancelled, or was itself skipped. Descendants gate on this
65+
/// (not on `exit_code`) so a skip propagates transitively: a skipped
66+
/// step reports `exit_code == 0`, so the exit code alone cannot
67+
/// distinguish "passed" from "skipped" and the cascade would break.
68+
failed_or_skipped: bool,
6369
}
6470

6571
type StepFuture = futures::future::Shared<BoxFuture<'static, StepOutcome>>;
@@ -186,8 +192,12 @@ pub(crate) async fn run(
186192
let pred_outcomes: Vec<StepOutcome> =
187193
join_all(preds.iter().map(|(_, f)| f.clone())).await;
188194

189-
// Early exit if any predecessor failed or the build was cancelled.
190-
if cancel.is_cancelled() || pred_outcomes.iter().any(|o| o.exit_code != 0) {
195+
// Early exit if any predecessor failed/was skipped, or the build
196+
// was cancelled. Gating on `failed_or_skipped` (not `exit_code`)
197+
// is what makes the skip propagate transitively: a skipped
198+
// predecessor reports `exit_code == 0`, so an exit-code-only gate
199+
// would let a skipped step's descendants run anyway.
200+
if cancel.is_cancelled() || pred_outcomes.iter().any(|o| o.failed_or_skipped) {
191201
let status = if cancel.is_cancelled() {
192202
StepStatus::Canceled
193203
} else {
@@ -203,6 +213,7 @@ pub(crate) async fn run(
203213
exit_code: None,
204214
duration_ms: 0,
205215
}),
216+
failed_or_skipped: true,
206217
};
207218
}
208219

@@ -249,6 +260,7 @@ pub(crate) async fn run(
249260
exit_code: Some(1),
250261
duration_ms: 0,
251262
}),
263+
failed_or_skipped: true,
252264
}
253265
}
254266
}
@@ -285,6 +297,22 @@ pub(crate) async fn run(
285297
let outcomes: Vec<StepOutcome> = join_all(pending).await;
286298
let any_failed = outcomes.iter().any(|o| o.exit_code != 0);
287299

300+
// Reap ephemeral leaf snapshots. Uncached steps commit an `ephemeral:*`
301+
// image for downstream container lineage; the cache registry never tracks
302+
// them, so once the run is over nothing else will. Collect every such
303+
// snapshot the steps produced and ask the default runner to remove them
304+
// (best-effort — failures are logged, not fatal).
305+
let ephemeral: Vec<SnapshotRef> = outcomes
306+
.iter()
307+
.filter_map(|o| o.snapshot.clone())
308+
.filter(|s| s.0.starts_with("ephemeral:"))
309+
.collect();
310+
if !ephemeral.is_empty()
311+
&& let Some(runner) = runner_registry.resolve(None)
312+
{
313+
runner.reap_snapshots(ephemeral).await;
314+
}
315+
288316
// Derive the overall verdict. Timeout wins (it also fired cancellation);
289317
// then cancellation; then any failed step; otherwise the build passed.
290318
let status = if timed_out {
@@ -363,10 +391,13 @@ async fn execute_step(
363391
let step_key = step_wire.key.clone();
364392
let display_name = step_wire.label.clone().unwrap_or_else(|| {
365393
let cmd = step_wire.cmd.trim();
366-
if cmd.len() <= 40 {
394+
if cmd.chars().count() <= 40 {
367395
cmd.to_owned()
368396
} else {
369-
format!("{}…", &cmd[..39])
397+
// Truncate on a char boundary, not a byte offset: `&cmd[..39]`
398+
// panics if byte 39 falls inside a multibyte UTF-8 sequence.
399+
let truncated: String = cmd.chars().take(39).collect();
400+
format!("{truncated}…")
370401
}
371402
});
372403
let env_map = transition.env;
@@ -476,6 +507,7 @@ async fn execute_step(
476507
exit_code: Some(124),
477508
duration_ms: dur_ms,
478509
}),
510+
failed_or_skipped: true,
479511
});
480512
}
481513
}
@@ -522,6 +554,7 @@ async fn execute_step(
522554
exit_code: Some(sr.exit_code),
523555
duration_ms: dur_ms,
524556
}),
557+
failed_or_skipped: sr.exit_code != 0,
525558
})
526559
}
527560
Err(e) => {

crates/hm-vm/src/registry.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,25 @@ impl ImageRegistry {
154154
snapshot.map(SnapshotId::new)
155155
}
156156

157+
/// Return every stored snapshot ID.
158+
///
159+
/// Used by `hm cache clean` to remove the backing backend images before
160+
/// the registry DB is deleted — without this the images orphan beyond
161+
/// recovery by key. Order is unspecified.
162+
#[must_use]
163+
pub fn all_snapshot_ids(&self) -> Vec<SnapshotId> {
164+
let Ok(conn) = self.conn.lock() else {
165+
return Vec::new();
166+
};
167+
let Ok(mut stmt) = conn.prepare("SELECT snapshot_id FROM snapshots") else {
168+
return Vec::new();
169+
};
170+
stmt.query_map([], |row| row.get::<_, String>(0).map(SnapshotId::new))
171+
.ok()
172+
.map(|rows| rows.filter_map(Result::ok).collect())
173+
.unwrap_or_default()
174+
}
175+
157176
/// Returns the number of cached entries.
158177
#[must_use]
159178
pub fn len(&self) -> u64 {
@@ -315,6 +334,23 @@ mod tests {
315334
assert_eq!(got, Some(SnapshotId::new("snap-persist")));
316335
}
317336

337+
#[test]
338+
fn all_snapshot_ids_returns_every_entry() {
339+
let (reg, _dir) = open_temp(10);
340+
assert!(reg.all_snapshot_ids().is_empty());
341+
342+
reg.put("k1", &SnapshotId::new("forever-a"));
343+
reg.put("k2", &SnapshotId::new("forever-b"));
344+
345+
let mut ids: Vec<String> = reg
346+
.all_snapshot_ids()
347+
.into_iter()
348+
.map(|s| s.to_string())
349+
.collect();
350+
ids.sort();
351+
assert_eq!(ids, vec!["forever-a".to_string(), "forever-b".to_string()]);
352+
}
353+
318354
#[test]
319355
fn invalidate_returns_removed_snapshot() {
320356
let (reg, _dir) = open_temp(10);

crates/hm-vm/src/vm.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@ use tracing::{instrument, warn};
88
use crate::backend::VmBackend;
99
use crate::registry::ImageRegistry;
1010
use crate::types::{
11-
Action, CachingPolicy, ExecutionResult, ImageSource, OutputSink, SnapshotLabel, VmConfig,
11+
Action, CachingPolicy, ExecutionResult, ImageSource, OutputSink, SnapshotId, SnapshotLabel,
12+
VmConfig,
1213
};
1314

1415
/// High-level orchestrator that drives the VM lifecycle.
@@ -81,6 +82,20 @@ impl HmVm {
8182
result
8283
}
8384

85+
/// Remove a snapshot from the backend store.
86+
///
87+
/// Used to reap ephemeral (uncached) leaf snapshots once a run finishes —
88+
/// `CachingPolicy::None` commits a transient `ephemeral:*` image purely for
89+
/// downstream container lineage, and nothing in the registry ever evicts
90+
/// it. The scheduler reaps these explicitly at run end.
91+
///
92+
/// # Errors
93+
///
94+
/// Returns an error if the backend fails to remove the snapshot.
95+
pub async fn remove_snapshot(&self, snapshot: &SnapshotId) -> Result<()> {
96+
self.backend.remove_snapshot(snapshot).await
97+
}
98+
8499
/// Inner lifecycle: inject, exec, snapshot. Separated so the caller
85100
/// can guarantee `vm.destroy()` runs regardless of outcome.
86101
async fn run_in_vm(

crates/hm/src/commands/cache/clean.rs

Lines changed: 57 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use anyhow::Result;
2+
use hm_vm::VmBackend as _;
23

34
/// # Errors
45
/// Returns an error if workspace cache removal fails.
@@ -21,6 +22,15 @@ pub async fn handle_clean() -> Result<i32> {
2122
let db_cleaned = if let Some(cache_dir) = hm_util::dirs::hm_cache_dir() {
2223
let db_path = cache_dir.join("registry.db");
2324
if db_path.exists() {
25+
// Remove the backing Docker images BEFORE deleting registry.db.
26+
// The registry is the only index from a cache key to its tagged
27+
// image (`forever-*`, etc.); once the DB is gone the images can't
28+
// be located by key, and `docker image prune` only reclaims
29+
// *dangling* images, so a tagged snapshot survives it. So we
30+
// enumerate the registry, remove each image via the Docker
31+
// backend (best-effort), then drop the DB.
32+
remove_registered_images(&db_path).await;
33+
2434
std::fs::remove_file(&db_path)?;
2535
tracing::info!(path = %db_path.display(), "removed VM image registry");
2636
true
@@ -31,19 +41,60 @@ pub async fn handle_clean() -> Result<i32> {
3141
false
3242
};
3343

34-
if db_cleaned {
35-
tracing::warn!(
36-
"Docker images from previous runs may still exist — run `docker image prune` to reclaim disk"
37-
);
38-
}
39-
4044
if !ws_cleaned && !db_cleaned {
4145
tracing::info!("nothing to clean");
4246
}
4347

4448
Ok(0)
4549
}
4650

51+
/// Remove every Docker image tracked by the registry at `db_path`.
52+
///
53+
/// Best-effort: a missing Docker daemon or an already-deleted image is logged
54+
/// and skipped, never fatal — `clean` must still delete the registry DB so the
55+
/// cache index is reset.
56+
async fn remove_registered_images(db_path: &std::path::Path) {
57+
// Capacity here is irrelevant — we only read existing rows, never insert.
58+
let registry = match hm_vm::ImageRegistry::open(db_path, u64::MAX) {
59+
Ok(r) => r,
60+
Err(e) => {
61+
tracing::warn!(error = %e, "could not open image registry; skipping image removal");
62+
return;
63+
}
64+
};
65+
66+
let snapshots = registry.all_snapshot_ids();
67+
if snapshots.is_empty() {
68+
return;
69+
}
70+
71+
let backend = match hm_vm::docker::DockerBackend::connect() {
72+
Ok(b) => b,
73+
Err(e) => {
74+
tracing::warn!(
75+
error = %e,
76+
"could not connect to Docker; {} cached image(s) may remain — remove them with `docker image rm`",
77+
snapshots.len(),
78+
);
79+
return;
80+
}
81+
};
82+
83+
let mut removed = 0usize;
84+
for snap in &snapshots {
85+
match backend.remove_snapshot(snap).await {
86+
Ok(()) => removed += 1,
87+
Err(e) => {
88+
tracing::warn!(image = %snap, error = %e, "failed to remove cached image");
89+
}
90+
}
91+
}
92+
tracing::info!(
93+
"removed {removed} of {} cached Docker image(s)",
94+
snapshots.len()
95+
);
96+
}
97+
4798
fn dir_size(path: &std::path::Path) -> u64 {
4899
fn walk(p: &std::path::Path) -> u64 {
49100
std::fs::read_dir(p)

0 commit comments

Comments
 (0)