Skip to content

Commit f040eaa

Browse files
committed
Fix two-phase CA derivation resolution and StorePath type safety
Fix several issues with the `ca-resolve-two-phase` branch: - Fix `find_build_step_outputs` SQL: `?` → `$1` (PostgreSQL), add `s.build = o.build` join condition - Fix dep graph propagation: when resolving a CA derivation, remove the original step from each dependent's deps and replace with the resolved step, so downstream steps become runnable - Skip two-phase resolution for CA floating drvs with only `Opaque` inputs (no `Built` deps) — these are already resolved - Replace `resolvedToStep` (integer FK) with `resolvedDrvPath` (text basename) so output lookups work across builds by drv path So it turns out that only scheduled or finishes steps go in the DB with the current desing. Steps which totally pending (blocked on other steps, for example) just live in memory. So we won't get our step number at resolution time to use as a foreign key. Instead, we can just write down the drv path, which is enough for the in-memory step, and only when that step is processsed will something end up in the data base. - Remove eager `create_build_step` for resolved steps — the DB entry is created when the step is actually dispatched very much corresponds to the above - Skip dyn-drv integration test (dynamic derivation resolution not yet implemented in Hydra) Our passing it was very much a lie before. Our failing it now is more honest. - Add unit tests for `resolve_drv_output_chains` with resolved steps Also, did this general cleanup which should be pulled out of this commit and landed on master independently, first. - Refactor `db` crate to use `StorePath` and `OutputName` types instead of raw `&str` for drv paths, output paths, and output names. We want to make the core logic all using nice types, and get away from not-nice types as soon as possible after getting out the database. This does that. - Add `store_dir: &StoreDir` parameter to db functions that need to render `StorePath` to full paths for legacy DB columns
1 parent d9526df commit f040eaa

10 files changed

Lines changed: 527 additions & 358 deletions

File tree

subprojects/crates/db/src/connection.rs

Lines changed: 407 additions & 169 deletions
Large diffs are not rendered by default.

subprojects/crates/db/src/models.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use harmonia_store_core::derived_path::OutputName;
2-
use harmonia_store_core::store_path::{ParseStorePathError, StoreDir};
2+
use harmonia_store_core::store_path::{ParseStorePathError, StoreDir, StorePath};
33
use hashbrown::HashMap;
44

55
pub type BuildID = i32;
@@ -132,7 +132,7 @@ pub struct UpdateBuild<'a> {
132132
pub struct InsertBuildStep<'a> {
133133
pub build_id: BuildID,
134134
pub r#type: BuildType,
135-
pub drv_path: &'a str,
135+
pub drv_path: &'a StorePath,
136136
pub status: BuildStatus,
137137
pub busy: bool,
138138
pub start_time: Option<i32>,
@@ -147,7 +147,7 @@ pub struct InsertBuildStep<'a> {
147147
pub struct InsertBuildStepOutput<StorePath = harmonia_store_core::store_path::StorePath> {
148148
pub build_id: BuildID,
149149
pub step_nr: i32,
150-
pub name: String,
150+
pub name: OutputName,
151151
pub path: Option<StorePath>,
152152
}
153153

subprojects/hydra-queue-runner/src/state/mod.rs

Lines changed: 72 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ pub use step::{Step, Steps};
2020
pub use step_info::StepInfo;
2121

2222
use std::collections::BTreeMap;
23-
use std::str::FromStr as _;
2423
use std::sync::Arc;
2524
use std::sync::atomic::{AtomicI64, Ordering};
2625
use std::time::Instant;
@@ -338,9 +337,10 @@ impl State {
338337

339338
let step_nr = tx
340339
.create_build_step(
340+
self.store.store_dir(),
341341
Some(job.result.get_start_time_as_i32()?),
342342
build_id,
343-
&self.store.print_store_path(step_info.step.get_drv_path()),
343+
step_info.step.get_drv_path(),
344344
step_info.step.get_system().as_deref(),
345345
machine.hostname.clone(),
346346
BuildStatus::Busy,
@@ -351,12 +351,6 @@ impl State {
351351
.get_output_paths(self.store.store_dir())
352352
.unwrap_or_default()
353353
.into_iter()
354-
.map(|(name, path)| {
355-
(
356-
name.to_string(),
357-
path.map(|p| self.store.print_store_path(&p)),
358-
)
359-
})
360354
.collect(),
361355
)
362356
.await?;
@@ -376,59 +370,58 @@ impl State {
376370
};
377371
let drv_ref = guard.as_ref().unwrap();
378372

379-
let is_ca_floating = drv_ref
380-
.outputs
381-
.iter()
382-
.all(|output| matches!(output.1, nix_utils::DerivationOutput::CAFloating(_)));
383-
384-
let needs_resolution = is_ca_floating
385-
&& drv_ref.inputs.iter().any(|input| {
386-
matches!(
387-
input,
388-
nix_utils::SingleDerivedPath::Built {
389-
drv_path: _,
390-
output: _
391-
}
392-
)
393-
});
394-
395-
if needs_resolution {
396-
let resolved_path = if let Some(basic_drv) =
397-
StepInfo::try_resolve(self.store.store_dir(), &self.db, drv_ref).await
398-
{
399-
let resolved_path = self.store.write_derivation(&basic_drv).await?;
400-
(&resolved_path != drv).then_some(resolved_path)
401-
} else {
402-
None
403-
};
373+
// `Some(path)` if this CA derivation was resolved to a
374+
// different drv; `None` if resolution is not needed or is
375+
// a no-op (same drv path).
376+
let resolved_path = (|| async {
377+
// Only CA floating derivations need resolution, and only
378+
// when they have `Built` inputs (dynamic derivation deps).
379+
// `Opaque`-only inputs are already resolved.
380+
let is_ca_with_built_inputs =
381+
drv_ref.outputs.iter().all(|output| {
382+
matches!(output.1, nix_utils::DerivationOutput::CAFloating(_))
383+
}) && drv_ref
384+
.inputs
385+
.iter()
386+
.any(|input| matches!(input, nix_utils::SingleDerivedPath::Built { .. }));
387+
tracing::info!(
388+
is_ca_with_built_inputs,
389+
ca_floating = drv_ref
390+
.outputs
391+
.iter()
392+
.all(|o| matches!(o.1, nix_utils::DerivationOutput::CAFloating(_))),
393+
has_built_inputs = drv_ref
394+
.inputs
395+
.iter()
396+
.any(|i| matches!(i, nix_utils::SingleDerivedPath::Built { .. })),
397+
"resolution check for {drv}"
398+
);
399+
if !is_ca_with_built_inputs {
400+
return Ok::<_, anyhow::Error>(None);
401+
}
404402

405-
// If we try to build an unresolved CA derivation it might work, but it might not.
406-
// Make sure it always fails for consistency.
407-
let Some(resolved_path) = resolved_path else {
408-
return Err(anyhow::anyhow!(
409-
"Failed to resolve CAFloating derivation {drv}"
410-
));
411-
};
403+
// Resolve `Built` input placeholders to concrete store
404+
// paths using outputs recorded in the DB.
405+
let basic_drv = StepInfo::try_resolve(self.store.store_dir(), &self.db, drv_ref)
406+
.await
407+
.ok_or_else(|| {
408+
anyhow::anyhow!("Failed to resolve CAFloating derivation {drv}")
409+
})?;
410+
let resolved_path = self.store.write_derivation(&basic_drv).await?;
411+
// If resolution changed the drv, we need a two-phase
412+
// build; otherwise just build the original directly.
413+
Ok((&resolved_path != drv).then_some(resolved_path))
414+
})()
415+
.await?;
412416

417+
if let Some(resolved_path) = resolved_path {
413418
tracing::info!("resolved CA derivation {drv} -> {resolved_path}");
414419

415-
// Create DB step for the resolved drv under the same build.
420+
// Record the resolved drv path on the original step so
421+
// future output lookups can follow the chain.
416422
let mut tx = db.begin_transaction().await?;
417-
let nr = tx
418-
.create_build_step(
419-
Some(job.result.get_start_time_as_i32()?),
420-
build_id,
421-
&self.store.print_store_path(&resolved_path),
422-
step_info.step.get_system().as_deref(),
423-
machine.hostname.clone(),
424-
BuildStatus::Busy,
425-
None,
426-
None,
427-
vec![],
428-
)
423+
tx.set_resolved_to(build_id, step_nr, &resolved_path)
429424
.await?;
430-
431-
tx.set_resolved_to(build_id, step_nr, nr).await?;
432425
tx.commit().await?;
433426

434427
// Finish original step as "resolved" in the DB and in-memory
@@ -499,11 +492,15 @@ impl State {
499492
// dependencies and, if it was the root derivation, as the new root
500493
// derivation of the build.
501494

502-
// Copy rdeps so the resolved step is kept around.
503-
// This also prevents reverse dependencies from being kept
504-
// around for too long.
495+
// Replace the original step with the resolved step in the
496+
// dependency graph. Each step that depended on the original
497+
// (unresolved) step must now depend on the resolved step
498+
// instead, otherwise it will never become runnable (the
499+
// original step's drv path differs from the resolved one, so
500+
// completing the resolved step wouldn't clear the dep).
505501
for rdep in step_info.step.clone_rdeps() {
506502
if let Some(rdep) = rdep.upgrade() {
503+
rdep.remove_dep(&step_info.step);
507504
resolved_step.make_rdep(&rdep);
508505
}
509506
}
@@ -717,8 +714,9 @@ impl State {
717714
.await?
718715
.ok_or_else(|| anyhow::anyhow!("drv not found"))?;
719716
db.insert_debug_build(
717+
self.store.store_dir(),
720718
jobset_id,
721-
&self.store.print_store_path(drv_path),
719+
drv_path,
722720
std::str::from_utf8(&drv.platform).expect("platform must be valid UTF-8"),
723721
)
724722
.await?;
@@ -1560,9 +1558,10 @@ impl State {
15601558
}
15611559

15621560
tx.create_build_step(
1561+
self.store.store_dir(),
15631562
None,
15641563
b.id,
1565-
&self.store.print_store_path(step.get_drv_path()),
1564+
step.get_drv_path(),
15661565
step.get_system().as_deref(),
15671566
machine
15681567
.as_deref()
@@ -1578,12 +1577,6 @@ impl State {
15781577
step.get_output_paths(self.store.store_dir())
15791578
.unwrap_or_default()
15801579
.into_iter()
1581-
.map(|(name, path)| {
1582-
(
1583-
name.to_string(),
1584-
path.map(|p| self.store.print_store_path(&p)),
1585-
)
1586-
})
15871580
.collect(),
15881581
)
15891582
.await?;
@@ -1622,7 +1615,7 @@ impl State {
16221615
.unwrap_or_default()
16231616
{
16241617
if let Some(path) = path {
1625-
tx.insert_failed_paths(&self.store.print_store_path(&path))
1618+
tx.insert_failed_paths(self.store.store_dir(), &path)
16261619
.await?;
16271620
}
16281621
}
@@ -1698,7 +1691,7 @@ impl State {
16981691
// Find the previous build step record, first by derivation path, then by output
16991692
// path.
17001693
let mut propagated_from = tx
1701-
.get_last_build_step_id(&self.store.print_store_path(step.get_drv_path()))
1694+
.get_last_build_step_id(self.store.store_dir(), step.get_drv_path())
17021695
.await?
17031696
.unwrap_or_default();
17041697

@@ -1711,11 +1704,12 @@ impl State {
17111704
.unwrap_or_default();
17121705
for (name, path) in &outputs {
17131706
let res = if let Some(path) = path {
1714-
tx.get_last_build_step_id_for_output_path(&self.store.print_store_path(path))
1707+
tx.get_last_build_step_id_for_output_path(self.store.store_dir(), path)
17151708
.await
17161709
} else {
17171710
tx.get_last_build_step_id_for_output_with_drv(
1718-
&self.store.print_store_path(step.get_drv_path()),
1711+
self.store.store_dir(),
1712+
step.get_drv_path(),
17191713
name.as_ref(),
17201714
)
17211715
.await
@@ -1728,9 +1722,10 @@ impl State {
17281722
}
17291723

17301724
tx.create_build_step(
1725+
self.store.store_dir(),
17311726
None,
17321727
build.id,
1733-
&self.store.print_store_path(step.get_drv_path()),
1728+
step.get_drv_path(),
17341729
step.get_system().as_deref(),
17351730
String::new(),
17361731
BuildStatus::CachedFailure,
@@ -1739,12 +1734,6 @@ impl State {
17391734
step.get_output_paths(self.store.store_dir())
17401735
.unwrap_or_default()
17411736
.into_iter()
1742-
.map(|(name, path)| {
1743-
(
1744-
name.to_string(),
1745-
path.map(|p| self.store.print_store_path(&p)),
1746-
)
1747-
})
17481737
.collect(),
17491738
)
17501739
.await?;
@@ -2155,17 +2144,9 @@ impl State {
21552144
) -> anyhow::Result<BTreeMap<nix_utils::OutputName, nix_utils::StorePath>> {
21562145
let mut db = self.db.get().await?;
21572146
let mut tx = db.begin_transaction().await?;
2158-
tx.find_build_step_outputs(&self.store.print_store_path(drv_path))
2159-
.await?
2160-
.iter()
2161-
.map(|(name, path)| -> anyhow::Result<_> {
2162-
Ok((
2163-
nix_utils::OutputName::from_str(name)?,
2164-
self.store.get_store_dir().parse(path)?,
2165-
))
2166-
})
2167-
.collect::<anyhow::Result<_>>()
2168-
.into()
2147+
Ok(tx
2148+
.find_build_step_outputs(self.store.store_dir(), drv_path)
2149+
.await?)
21692150
}
21702151

21712152
#[tracing::instrument(skip(self, step), ret, level = "debug")]
@@ -2179,9 +2160,10 @@ impl State {
21792160
};
21802161

21812162
conn.check_if_paths_failed(
2163+
self.store.store_dir(),
21822164
&drv_outputs
21832165
.iter()
2184-
.filter_map(|(_, path)| path.as_ref().map(|p| self.store.print_store_path(p)))
2166+
.filter_map(|(_, path)| path.as_ref().cloned())
21852167
.collect::<Vec<_>>(),
21862168
)
21872169
.await
@@ -2233,7 +2215,7 @@ impl State {
22332215
continue;
22342216
};
22352217
let Some(db_build_output) = db
2236-
.get_build_output_for_path(&self.store.print_store_path(out_path))
2218+
.get_build_output_for_path(self.store.store_dir(), out_path)
22372219
.await?
22382220
else {
22392221
continue;

subprojects/hydra-queue-runner/src/state/step.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -351,6 +351,14 @@ impl Step {
351351
.store(state.deps.len() as u64, Ordering::Relaxed);
352352
}
353353

354+
pub fn remove_dep(&self, dep: &Arc<Self>) {
355+
let mut state = self.state.write();
356+
state.deps.remove(dep);
357+
self.atomic_state
358+
.deps_len
359+
.store(state.deps.len() as u64, Ordering::Relaxed);
360+
}
361+
354362
pub fn make_rdep(self: &Arc<Self>, dep: &Arc<Self>) {
355363
dep.add_dep(self.clone());
356364
let mut state = self.state.write();

0 commit comments

Comments
 (0)