Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
173 changes: 173 additions & 0 deletions subprojects/crates/db/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,41 @@ impl Connection {
Ok(results)
}

/// Look up a single output of a derivation from the most recent
/// successful buildstep.
pub async fn resolve_drv_output(
&mut self,
store_dir: &StoreDir,
drv_path: &StorePath,
output_name: &OutputName,
) -> sqlx::Result<Option<StorePath>> {
let drv_display = store_dir.display(drv_path).to_string();
let output_name_str: &str = output_name.as_ref();
let row: Option<(String,)> = sqlx::query_as(
r"SELECT o.path
FROM buildsteps s
JOIN buildstepoutputs o
ON s.build = o.build AND s.stepnr = o.stepnr
WHERE s.drvPath = $1
AND o.name = $2
AND o.path IS NOT NULL
AND s.status = 0
ORDER BY s.build DESC
LIMIT 1",
)
.bind(&drv_display)
.bind(output_name_str)
.fetch_optional(&mut *self.conn)
.await?;

row.map(|(path,)| {
store_dir
.parse(&path)
.map_err(|e| sqlx::Error::Decode(Box::new(e)))
})
.transpose()
}

#[tracing::instrument(skip(self), err)]
pub async fn get_status(&mut self) -> sqlx::Result<Option<serde_json::Value>> {
Ok(
Expand Down Expand Up @@ -1461,4 +1496,142 @@ mod tests {
]
);
}

// -- resolve_drv_output (depth-1) tests ------------------------------------

#[tokio::test]
async fn resolve_drv_output_basic() {
let (_pg, mut conn) = setup().await;
insert_step(&mut conn, 1, 1, &sp("foo.drv")).await;
insert_output(&mut conn, 1, 1, "out", &sp("result")).await;

let result = conn
.resolve_drv_output(&test_store_dir(), &sp("foo.drv"), &on("out"))
.await
.unwrap();
assert_eq!(result, Some(sp("result")));
}

#[tokio::test]
async fn resolve_drv_output_missing() {
let (_pg, mut conn) = setup().await;
let result = conn
.resolve_drv_output(&test_store_dir(), &sp("nonexistent.drv"), &on("out"))
.await
.unwrap();
assert_eq!(result, None);
}

#[tokio::test]
async fn resolve_drv_output_picks_latest_build() {
let (_pg, mut conn) = setup().await;
insert_step(&mut conn, 1, 1, &sp("foo.drv")).await;
insert_output(&mut conn, 1, 1, "out", &sp("old-result")).await;
insert_step(&mut conn, 5, 1, &sp("foo.drv")).await;
insert_output(&mut conn, 5, 1, "out", &sp("new-result")).await;

let result = conn
.resolve_drv_output(&test_store_dir(), &sp("foo.drv"), &on("out"))
.await
.unwrap();
assert_eq!(result, Some(sp("new-result")));
}

// -- Simulate the Rust-side loop that replaces the recursive SQL ----------
//
// These mirror the resolved-step tests from the DB-column approach,
// but use resolve_drv_output + an in-memory map instead of
// resolvedDrvPath in the SQL.

/// Helper: resolve a chain one level at a time using `resolve_drv_output`,
/// translating through `resolved_map` between levels.
async fn resolve_chain_with_map(
conn: &mut Connection,
resolved_map: &std::collections::HashMap<StorePath, StorePath>,
root: &StorePath,
outputs: &[&OutputName],
) -> Option<StorePath> {
let sd = test_store_dir();
let mut current = root.clone();
for output_name in outputs {
let translated = resolved_map.get(&current).cloned().unwrap_or(current);
current = conn
.resolve_drv_output(&sd, &translated, output_name)
.await
.unwrap()?;
}
Some(current)
}

/// Depth-1: unresolved.drv was resolved to resolved.drv, which has
/// the outputs. The in-memory map translates before lookup.
#[tokio::test]
async fn resolve_with_map_depth_1() {
let (_pg, mut conn) = setup().await;

// resolved.drv was built successfully
insert_step(&mut conn, 2, 1, &sp("resolved.drv")).await;
insert_output(&mut conn, 2, 1, "out", &sp("result")).await;

let mut map = std::collections::HashMap::new();
map.insert(sp("unresolved.drv"), sp("resolved.drv"));

let result =
resolve_chain_with_map(&mut conn, &map, &sp("unresolved.drv"), &[&on("out")]).await;
assert_eq!(result, Some(sp("result")));
}

/// Depth-2: unresolved.drv was resolved to resolved.drv, whose output
/// is an intermediate.drv that has the final output.
#[tokio::test]
async fn resolve_with_map_depth_2() {
let (_pg, mut conn) = setup().await;

insert_step(&mut conn, 2, 1, &sp("resolved.drv")).await;
insert_output(&mut conn, 2, 1, "out", &sp("intermediate.drv")).await;
insert_step(&mut conn, 3, 1, &sp("intermediate.drv")).await;
insert_output(&mut conn, 3, 1, "out", &sp("final")).await;

let mut map = std::collections::HashMap::new();
map.insert(sp("unresolved.drv"), sp("resolved.drv"));

let result = resolve_chain_with_map(
&mut conn,
&map,
&sp("unresolved.drv"),
&[&on("out"), &on("out")],
)
.await;
assert_eq!(result, Some(sp("final")));
}

/// Depth-2 where the intermediate result was also resolved:
/// root.drv.drv (not resolved) → intermediate.drv (resolved) → final
#[tokio::test]
async fn resolve_with_map_intermediate_resolved() {
let (_pg, mut conn) = setup().await;

// root.drv.drv^out → unresolved-intermediate.drv
insert_step(&mut conn, 1, 1, &sp("root.drv.drv")).await;
insert_output(&mut conn, 1, 1, "out", &sp("unresolved-intermediate.drv")).await;

// resolved-intermediate.drv^out → final-result
insert_step(&mut conn, 2, 1, &sp("resolved-intermediate.drv")).await;
insert_output(&mut conn, 2, 1, "out", &sp("final-result")).await;

let mut map = std::collections::HashMap::new();
map.insert(
sp("unresolved-intermediate.drv"),
sp("resolved-intermediate.drv"),
);

let result = resolve_chain_with_map(
&mut conn,
&map,
&sp("root.drv.drv"),
&[&on("out"), &on("out")],
)
.await;
assert_eq!(result, Some(sp("final-result")));
}
}
20 changes: 20 additions & 0 deletions subprojects/hydra-queue-runner/src/state/drv.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::collections::BTreeSet;

use nix_utils::SingleDerivedPath;

/// Output names of intermediate derivations for a dynamic derivation
Expand Down Expand Up @@ -42,3 +44,21 @@ pub fn flatten_chain(
chain.0.push(output_name.clone());
(root, chain)
}

/// Extract `Built` input dependencies from a derivation.
///
/// Returns `(root_drv_path, relation)` pairs. `Opaque` (source) inputs are
/// skipped — only derivation build dependencies are returned. For each
/// `Built` input, the outermost output name (what we consume) is discarded;
/// intermediate output names form the [`OutputNameChain`].
pub fn input_drvs(
drv: &nix_utils::Derivation,
) -> BTreeSet<(nix_utils::StorePath, OutputNameChain)> {
drv.inputs
.iter()
.filter_map(|sdp| match sdp {
SingleDerivedPath::Opaque(_) => None,
SingleDerivedPath::Built { drv_path, .. } => Some(flatten_path(drv_path)),
})
.collect()
}
134 changes: 102 additions & 32 deletions subprojects/hydra-queue-runner/src/state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,7 @@ impl State {
// original step's drv path differs from the resolved one, so
// completing the resolved step wouldn't clear the dep).
for rdep in step_info.step.clone_rdeps() {
if let Some(rdep) = rdep.upgrade() {
if let Some(rdep) = rdep.step.upgrade() {
rdep.remove_dep(&step_info.step);
resolved_step.make_rdep(&rdep);
}
Expand Down Expand Up @@ -1388,6 +1388,74 @@ impl State {
tx.commit().await?;
}

// Process dynamic rdeps first, as we must add new step dependencies for dynamically
// generated derivations
{
for (dep_step, output_name, relation) in item.step_info.step.pop_dynamic_rdeps() {
let Some(dependent_step) = dep_step.upgrade() else {
continue;
};

let resolved_drv = output.outputs.get(&output_name).cloned().ok_or_else(|| {
anyhow::anyhow!(
"Dynamic rdep references output `{output_name}` not produced by {drv_path}"
)
})?;

// Find a build associated with this step. For intermediate steps
// (not top-level), `direct` is empty, so we walk the dependency
// chain via `get_dependents` to find the owning build.
let build = if let Some(b) = direct.get(0) {
b.clone()
} else {
let mut dependents = HashSet::new();
let mut visited_steps = HashSet::new();
item.step_info
.step
.get_dependents(&mut dependents, &mut visited_steps);
let Some(b) = dependents.into_iter().next() else {
tracing::warn!("Finished step does not have associated build");
continue;
};
b
};

// Create the actual step for the new derivation.
// finished_drvs is not necessary as it is only a memoization table to reduce
// checks if a dependency is finished from the database.
// new_steps is not necessary either as
let new_runnable: Arc<parking_lot::RwLock<HashSet<Arc<Step>>>> = Default::default();
let new_step = match self
.create_step(
build.clone(),
resolved_drv.clone(),
None,
Some((dependent_step.clone(), relation)),
Default::default(),
Default::default(),
new_runnable.clone(),
)
.await
{
CreateStepResult::None => continue,
CreateStepResult::Valid(step) => step,
CreateStepResult::PreviousFailure(step) => {
if let Err(e) = self.handle_previous_failure(build.clone(), step).await {
tracing::error!("Failed to handle previous failure: {e}");
}
// TODO: figure out what to do here
continue;
}
};

for r in new_runnable.read().iter() {
r.make_runnable();
}

// create_step already added rdeps, but we need to add a forward dep as well
dependent_step.add_dep(new_step);
}
}
item.step_info.step.make_rdeps_runnable();

// always trigger dispatch, as we now might have a free machine again
Expand Down Expand Up @@ -1933,7 +2001,7 @@ impl State {
build: Arc<Build>,
drv_path: nix_utils::StorePath,
referring_build: Option<Arc<Build>>,
referring_step: Option<Arc<Step>>,
referring_step: Option<(Arc<Step>, drv::OutputNameChain)>,
finished_drvs: Arc<parking_lot::RwLock<HashSet<nix_utils::StorePath>>>,
new_steps: Arc<parking_lot::RwLock<HashSet<Arc<Step>>>>,
new_runnable: Arc<parking_lot::RwLock<HashSet<Arc<Step>>>>,
Expand All @@ -1947,9 +2015,13 @@ impl State {
}
}

let (step, is_new) =
self.steps
.create(&drv_path, referring_build.as_ref(), referring_step.as_ref());
let (step, is_new) = self.steps.create(
&drv_path,
referring_build.as_ref(),
referring_step
.as_ref()
.map(|(step, relation)| (step, relation.clone())),
);
if !is_new {
return CreateStepResult::Valid(step);
}
Expand Down Expand Up @@ -2045,6 +2117,7 @@ impl State {
self.store.query_missing_outputs(output_paths).await
};

let input_drvs = drv::input_drvs(&drv);
step.set_drv(drv);

if self.check_cached_failure(step.clone()).await {
Expand Down Expand Up @@ -2102,35 +2175,32 @@ impl State {
}

tracing::debug!("creating build step '{drv_path}");
let Some(input_drvs) = step.get_input_drvs() else {
// this should never happen because we always a a drv set at this point in time
return CreateStepResult::None;
};

let step2 = step.clone();
let mut stream = futures::StreamExt::map(tokio_stream::iter(input_drvs), |i| {
let build = build.clone();
let step = step2.clone();
let finished_drvs = finished_drvs.clone();
let new_steps = new_steps.clone();
let new_runnable = new_runnable.clone();
async move {
Box::pin(self.create_step(
// conn,
build,
i,
None,
Some(step),
finished_drvs,
new_steps,
new_runnable,
))
.await
}
})
.buffered(25);
while let Some(v) = tokio_stream::StreamExt::next(&mut stream).await {
match v {
let mut stream =
futures::StreamExt::map(tokio_stream::iter(input_drvs), |(input_path, relation)| {
let build = build.clone();
let step = step2.clone();
let finished_drvs = finished_drvs.clone();
let new_steps = new_steps.clone();
let new_runnable = new_runnable.clone();

async move {
Box::pin(self.create_step(
build,
input_path,
None,
Some((step, relation)),
finished_drvs,
new_steps,
new_runnable,
))
.await
}
})
.buffered(25);
while let Some(result) = tokio_stream::StreamExt::next(&mut stream).await {
match result {
CreateStepResult::None => (),
CreateStepResult::Valid(dep) => {
if !dep.get_finished() && !dep.get_previous_failure() {
Expand Down
Loading