Skip to content

Commit dd07515

Browse files
artemist, Ericson2314, amaanq
committed
queue-runner: resolve dynamic derivation dependencies
Reverse dependencies now carry an `OutputNameChain` (empty for regular deps, non-empty for dynamic ones), stored in a unified `rdeps` list via `ReverseDep`. This replaces the previous `get_input_drvs` method with `input_drvs`, which walks `SingleDerivedPath` directly, skipping `Opaque` (source) inputs and returning only `Built` derivation dependencies with their relation chain.

When a step completes and has dynamic reverse dependencies (`rdeps` with a non-empty `OutputNameChain`), pop one level from the relation, look up the produced output, and create a new step for the resulting derivation. This resolves dynamic derivation chains one level at a time as each step finishes, with no extra DB round-trip needed.

The dyn-drv tests are re-enabled and updated to verify the full build step chains: 4 steps for simple dynamic derivations, 12 for the non-trivial DAG case.

Co-Authored-By: Artemis Tosini <artemis.tosini@obsidian.systems>
Co-Authored-By: John Ericson <John.Ericson@Obsidian.Systems>
Co-Authored-By: Amaan Qureshi <git@amaanq.com>
1 parent ba888fb commit dd07515

5 files changed

Lines changed: 240 additions & 74 deletions

File tree

subprojects/hydra-queue-runner/src/state/drv.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use std::collections::BTreeSet;
2+
13
use nix_utils::SingleDerivedPath;
24

35
/// Output names of intermediate derivations for a dynamic derivation
@@ -42,3 +44,21 @@ pub fn flatten_chain(
4244
chain.0.push(output_name.clone());
4345
(root, chain)
4446
}
47+
48+
/// Extract `Built` input dependencies from a derivation.
49+
///
50+
/// Returns `(root_drv_path, relation)` pairs. `Opaque` (source) inputs are
51+
/// skipped — only derivation build dependencies are returned. For each
52+
/// `Built` input, the outermost output name (what we consume) is discarded;
53+
/// intermediate output names form the [`OutputNameChain`].
54+
pub fn input_drvs(
55+
drv: &nix_utils::Derivation,
56+
) -> BTreeSet<(nix_utils::StorePath, OutputNameChain)> {
57+
drv.inputs
58+
.iter()
59+
.filter_map(|sdp| match sdp {
60+
SingleDerivedPath::Opaque(_) => None,
61+
SingleDerivedPath::Built { drv_path, .. } => Some(flatten_path(drv_path)),
62+
})
63+
.collect()
64+
}

subprojects/hydra-queue-runner/src/state/mod.rs

Lines changed: 102 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -521,7 +521,7 @@ impl State {
521521
// original step's drv path differs from the resolved one, so
522522
// completing the resolved step wouldn't clear the dep).
523523
for rdep in step_info.step.clone_rdeps() {
524-
if let Some(rdep) = rdep.upgrade() {
524+
if let Some(rdep) = rdep.step.upgrade() {
525525
rdep.remove_dep(&step_info.step);
526526
resolved_step.make_rdep(&rdep);
527527
}
@@ -1388,6 +1388,74 @@ impl State {
13881388
tx.commit().await?;
13891389
}
13901390

1391+
// Process dynamic rdeps first, as we must add new step dependencies for dynamically
1392+
// generated derivations
1393+
{
1394+
for (dep_step, output_name, relation) in item.step_info.step.pop_dynamic_rdeps() {
1395+
let Some(dependent_step) = dep_step.upgrade() else {
1396+
continue;
1397+
};
1398+
1399+
let resolved_drv = output.outputs.get(&output_name).cloned().ok_or_else(|| {
1400+
anyhow::anyhow!(
1401+
"Dynamic rdep references output `{output_name}` not produced by {drv_path}"
1402+
)
1403+
})?;
1404+
1405+
// Find a build associated with this step. For intermediate steps
1406+
// (not top-level), `direct` is empty, so we walk the dependency
1407+
// chain via `get_dependents` to find the owning build.
1408+
let build = if let Some(b) = direct.get(0) {
1409+
b.clone()
1410+
} else {
1411+
let mut dependents = HashSet::new();
1412+
let mut visited_steps = HashSet::new();
1413+
item.step_info
1414+
.step
1415+
.get_dependents(&mut dependents, &mut visited_steps);
1416+
let Some(b) = dependents.into_iter().next() else {
1417+
tracing::warn!("Finished step does not have associated build");
1418+
continue;
1419+
};
1420+
b
1421+
};
1422+
1423+
// Create the actual step for the new derivation.
1424+
// finished_drvs is not necessary as it is only a memoization table to reduce
1425+
// checks if a dependency is finished from the database.
1426+
// new_steps is not necessary either. (TODO: the original comment was left unfinished; document the reason.)
1427+
let new_runnable: Arc<parking_lot::RwLock<HashSet<Arc<Step>>>> = Default::default();
1428+
let new_step = match self
1429+
.create_step(
1430+
build.clone(),
1431+
resolved_drv.clone(),
1432+
None,
1433+
Some((dependent_step.clone(), relation)),
1434+
Default::default(),
1435+
Default::default(),
1436+
new_runnable.clone(),
1437+
)
1438+
.await
1439+
{
1440+
CreateStepResult::None => continue,
1441+
CreateStepResult::Valid(step) => step,
1442+
CreateStepResult::PreviousFailure(step) => {
1443+
if let Err(e) = self.handle_previous_failure(build.clone(), step).await {
1444+
tracing::error!("Failed to handle previous failure: {e}");
1445+
}
1446+
// TODO: figure out what to do here
1447+
continue;
1448+
}
1449+
};
1450+
1451+
for r in new_runnable.read().iter() {
1452+
r.make_runnable();
1453+
}
1454+
1455+
// create_step already added rdeps, but we need to add a forward dep as well
1456+
dependent_step.add_dep(new_step);
1457+
}
1458+
}
13911459
item.step_info.step.make_rdeps_runnable();
13921460

13931461
// always trigger dispatch, as we now might have a free machine again
@@ -1933,7 +2001,7 @@ impl State {
19332001
build: Arc<Build>,
19342002
drv_path: nix_utils::StorePath,
19352003
referring_build: Option<Arc<Build>>,
1936-
referring_step: Option<Arc<Step>>,
2004+
referring_step: Option<(Arc<Step>, drv::OutputNameChain)>,
19372005
finished_drvs: Arc<parking_lot::RwLock<HashSet<nix_utils::StorePath>>>,
19382006
new_steps: Arc<parking_lot::RwLock<HashSet<Arc<Step>>>>,
19392007
new_runnable: Arc<parking_lot::RwLock<HashSet<Arc<Step>>>>,
@@ -1947,9 +2015,13 @@ impl State {
19472015
}
19482016
}
19492017

1950-
let (step, is_new) =
1951-
self.steps
1952-
.create(&drv_path, referring_build.as_ref(), referring_step.as_ref());
2018+
let (step, is_new) = self.steps.create(
2019+
&drv_path,
2020+
referring_build.as_ref(),
2021+
referring_step
2022+
.as_ref()
2023+
.map(|(step, relation)| (step, relation.clone())),
2024+
);
19532025
if !is_new {
19542026
return CreateStepResult::Valid(step);
19552027
}
@@ -2045,6 +2117,7 @@ impl State {
20452117
self.store.query_missing_outputs(output_paths).await
20462118
};
20472119

2120+
let input_drvs = drv::input_drvs(&drv);
20482121
step.set_drv(drv);
20492122

20502123
if self.check_cached_failure(step.clone()).await {
@@ -2102,35 +2175,32 @@ impl State {
21022175
}
21032176

21042177
tracing::debug!("creating build step '{drv_path}");
2105-
let Some(input_drvs) = step.get_input_drvs() else {
2106-
// this should never happen because we always have a drv set at this point in time
2107-
return CreateStepResult::None;
2108-
};
21092178

21102179
let step2 = step.clone();
2111-
let mut stream = futures::StreamExt::map(tokio_stream::iter(input_drvs), |i| {
2112-
let build = build.clone();
2113-
let step = step2.clone();
2114-
let finished_drvs = finished_drvs.clone();
2115-
let new_steps = new_steps.clone();
2116-
let new_runnable = new_runnable.clone();
2117-
async move {
2118-
Box::pin(self.create_step(
2119-
// conn,
2120-
build,
2121-
i,
2122-
None,
2123-
Some(step),
2124-
finished_drvs,
2125-
new_steps,
2126-
new_runnable,
2127-
))
2128-
.await
2129-
}
2130-
})
2131-
.buffered(25);
2132-
while let Some(v) = tokio_stream::StreamExt::next(&mut stream).await {
2133-
match v {
2180+
let mut stream =
2181+
futures::StreamExt::map(tokio_stream::iter(input_drvs), |(input_path, relation)| {
2182+
let build = build.clone();
2183+
let step = step2.clone();
2184+
let finished_drvs = finished_drvs.clone();
2185+
let new_steps = new_steps.clone();
2186+
let new_runnable = new_runnable.clone();
2187+
2188+
async move {
2189+
Box::pin(self.create_step(
2190+
build,
2191+
input_path,
2192+
None,
2193+
Some((step, relation)),
2194+
finished_drvs,
2195+
new_steps,
2196+
new_runnable,
2197+
))
2198+
.await
2199+
}
2200+
})
2201+
.buffered(25);
2202+
while let Some(result) = tokio_stream::StreamExt::next(&mut stream).await {
2203+
match result {
21342204
CreateStepResult::None => (),
21352205
CreateStepResult::Valid(dep) => {
21362206
if !dep.get_finished() && !dep.get_previous_failure() {

subprojects/hydra-queue-runner/src/state/step.rs

Lines changed: 49 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,15 @@ use hashbrown::{HashMap, HashSet};
88
use super::{Build, Jobset};
99
use db::models::BuildID;
1010

11+
use super::drv::OutputNameChain;
12+
13+
#[derive(Debug, Clone)]
14+
pub struct ReverseDep {
15+
/// The step that depends on us
16+
pub step: Weak<Step>,
17+
pub relation: OutputNameChain,
18+
}
19+
1120
#[derive(Debug)]
1221
pub struct StepAtomicState {
1322
/// Whether the step has finished initialisation.
@@ -66,10 +75,11 @@ impl StepAtomicState {
6675

6776
#[derive(Debug)]
6877
pub(super) struct StepState {
69-
/// The build steps on which this step depends.
78+
/// The resolved build steps on which this step depends
7079
deps: HashSet<Arc<Step>>,
7180
/// The build steps that depend on this step.
72-
rdeps: Vec<Weak<Step>>,
81+
/// An empty `relation` signifies a regular (non-dynamic) reverse dependency.
82+
rdeps: Vec<ReverseDep>,
7383
/// Builds that have this step as the top-level derivation.
7484
builds: Vec<Weak<Build>>,
7585
/// Jobsets to which this step belongs. Used for determining scheduling priority.
@@ -186,16 +196,6 @@ impl Step {
186196
})
187197
}
188198

189-
pub fn get_input_drvs(&self) -> Option<Vec<nix_utils::StorePath>> {
190-
let drv = self.drv.load_full();
191-
drv.as_ref().map(|drv| {
192-
harmonia_store_core::derivation::DerivationInputs::from(&drv.inputs)
193-
.drvs
194-
.into_keys()
195-
.collect::<Vec<_>>()
196-
})
197-
}
198-
199199
pub fn get_after(&self) -> jiff::Timestamp {
200200
self.atomic_state.after.load()
201201
}
@@ -270,7 +270,9 @@ impl Step {
270270
};
271271

272272
for rdep in rdeps {
273-
let Some(rdep) = rdep.upgrade() else { continue };
273+
let Some(rdep) = rdep.step.upgrade() else {
274+
continue;
275+
};
274276
rdep.get_dependents(builds, steps);
275277
}
276278
}
@@ -286,7 +288,7 @@ impl Step {
286288

287289
let mut state = self.state.write();
288290
state.rdeps.retain(|rdep| {
289-
let Some(rdep) = rdep.upgrade() else {
291+
let Some(rdep) = rdep.step.upgrade() else {
290292
return false;
291293
};
292294

@@ -374,21 +376,45 @@ impl Step {
374376
pub fn make_rdep(self: &Arc<Self>, dep: &Arc<Self>) {
375377
dep.add_dep(self.clone());
376378
let mut state = self.state.write();
377-
state.rdeps.push(Arc::downgrade(dep));
379+
state.rdeps.push(ReverseDep {
380+
step: Arc::downgrade(dep),
381+
relation: OutputNameChain::default(),
382+
});
378383
self.atomic_state
379384
.rdeps_len
380385
.store(state.rdeps.len() as u64, Ordering::Relaxed);
381386
}
382387

383-
pub fn clone_rdeps(&self) -> Vec<Weak<Step>> {
388+
pub fn clone_rdeps(&self) -> Vec<ReverseDep> {
384389
let state = self.state.read();
385390
state.rdeps.clone()
386391
}
387392

393+
/// Pop one level of dynamic indirection from each dynamic rdep,
394+
/// returning `(dependent_step, popped_output_name, remaining_relation)` triples.
395+
///
396+
/// The rdep entries remain in the list (with shortened relations) so that
397+
/// `make_rdeps_runnable` can still clean up forward deps.
398+
///
399+
/// We collect into a `Vec` rather than returning an iterator because the
400+
/// write lock on the step's state must be released before the caller can
401+
/// do async work (e.g. `create_step`) with the results.
402+
pub fn pop_dynamic_rdeps(&self) -> Vec<(Weak<Step>, nix_utils::OutputName, OutputNameChain)> {
403+
let mut state = self.state.write();
404+
state
405+
.rdeps
406+
.iter_mut()
407+
.filter_map(|rdep| {
408+
let output_name = rdep.relation.pop()?;
409+
Some((rdep.step.clone(), output_name, rdep.relation.clone()))
410+
})
411+
.collect()
412+
}
413+
388414
pub fn add_referring_data(
389415
&self,
390416
referring_build: Option<&Arc<Build>>,
391-
referring_step: Option<&Arc<Self>>,
417+
referring_step: Option<(&Arc<Self>, OutputNameChain)>,
392418
) {
393419
if referring_build.is_none() && referring_step.is_none() {
394420
return;
@@ -398,8 +424,11 @@ impl Step {
398424
if let Some(referring_build) = referring_build {
399425
state.builds.push(Arc::downgrade(referring_build));
400426
}
401-
if let Some(referring_step) = referring_step {
402-
state.rdeps.push(Arc::downgrade(referring_step));
427+
if let Some((referring_step, relation)) = referring_step {
428+
state.rdeps.push(ReverseDep {
429+
step: Arc::downgrade(referring_step),
430+
relation,
431+
});
403432
self.atomic_state
404433
.rdeps_len
405434
.store(state.rdeps.len() as u64, Ordering::Relaxed);
@@ -531,7 +560,7 @@ impl Steps {
531560
&self,
532561
drv_path: &nix_utils::StorePath,
533562
referring_build: Option<&Arc<Build>>,
534-
referring_step: Option<&Arc<Step>>,
563+
referring_step: Option<(&Arc<Step>, OutputNameChain)>,
535564
) -> (Arc<Step>, bool) {
536565
let mut is_new = false;
537566
let mut steps = self.inner.write();

subprojects/hydra-tests/content-addressed/dyn-drv-non-trivial.t

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,6 @@ use warnings;
44
use Setup;
55
use Test2::V0;
66

7-
# FIXME now that we're properly resolving things in Hydra rather than Nix,
8-
# dynamic derivations stopped-fake working
9-
plan skip_all => 'dynamic derivation resolution not yet implemented';
10-
117
# Adapted from https://github.com/NixOS/nix/blob/master/tests/functional/dyn-drv/non-trivial.nix
128
#
139
# A single derivation uses recursive-nix to dynamically create a DAG of
@@ -43,6 +39,38 @@ if ($wrapper) {
4339
$wrapper->discard_changes;
4440
is($wrapper->finished, 1, "wrapper should be finished");
4541
is($wrapper->buildstatus, 0, "wrapper should succeed");
42+
43+
# Full dynamic derivation chain: 12 steps total
44+
# 1. make-derivations.drv.drv (status=0, build makeDerivations)
45+
# 2. build-a.drv (status=0, build a)
46+
# 3. build-c.drv (status=13, resolve c)
47+
# 4. build-b.drv (status=13, resolve b)
48+
# 5. build-b.drv (status=0, build resolved b)
49+
# 6. build-c.drv (status=0, build resolved c)
50+
# 7. build-d.drv (status=13, resolve d)
51+
# 8. build-d.drv (status=0, build resolved d)
52+
# 9. make-derivations.drv (status=13, resolve e — named after makeDerivations output)
53+
# 10. make-derivations.drv (status=0, build resolved e)
54+
# 11. wrapper.drv (status=13, resolve wrapper)
55+
# 12. wrapper.drv (status=0, build resolved wrapper)
56+
my @steps = $wrapper->buildsteps->search({}, { order_by => 'stepnr' })->all;
57+
is(scalar @steps, 12, "wrapper should have 12 build steps");
58+
59+
# Check that derivations a-d each got a successful (status=0) build step.
60+
# build-e is named make-derivations.drv (the output of makeDerivations),
61+
# so we check for it separately.
62+
my @built = sort map {
63+
my $drv = $_->drvpath // "";
64+
(defined $_->status && $_->status == 0 && $drv =~ m{-build-([a-d])\.drv$}) ? $1 : ()
65+
} @steps;
66+
is(\@built, [qw(a b c d)], "derivations a-d should each have a successful build step");
67+
68+
# build-e is the make-derivations.drv step (status=0, not the .drv.drv)
69+
my @build_e = grep {
70+
my $drv = $_->drvpath // "";
71+
defined $_->status && $_->status == 0 && $drv =~ m{-make-derivations\.drv$}
72+
} @steps;
73+
is(scalar @build_e, 1, "build-e (make-derivations.drv) should have a successful build step");
4674
}
4775

4876
done_testing;

0 commit comments

Comments
 (0)