Skip to content

Commit 2aa461e

Browse files
authored
Merge pull request #1697 from obsidiansystems/dyn-ingest
queue-runner: resolve dynamic derivation dependencies
2 parents 9cb9e8d + dd07515 commit 2aa461e

7 files changed

Lines changed: 453 additions & 105 deletions

File tree

subprojects/crates/db/src/connection.rs

Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -395,6 +395,41 @@ impl Connection {
395395
Ok(results)
396396
}
397397

398+
/// Look up a single output of a derivation from the most recent
399+
/// successful buildstep.
400+
pub async fn resolve_drv_output(
401+
&mut self,
402+
store_dir: &StoreDir,
403+
drv_path: &StorePath,
404+
output_name: &OutputName,
405+
) -> sqlx::Result<Option<StorePath>> {
406+
let drv_display = store_dir.display(drv_path).to_string();
407+
let output_name_str: &str = output_name.as_ref();
408+
let row: Option<(String,)> = sqlx::query_as(
409+
r"SELECT o.path
410+
FROM buildsteps s
411+
JOIN buildstepoutputs o
412+
ON s.build = o.build AND s.stepnr = o.stepnr
413+
WHERE s.drvPath = $1
414+
AND o.name = $2
415+
AND o.path IS NOT NULL
416+
AND s.status = 0
417+
ORDER BY s.build DESC
418+
LIMIT 1",
419+
)
420+
.bind(&drv_display)
421+
.bind(output_name_str)
422+
.fetch_optional(&mut *self.conn)
423+
.await?;
424+
425+
row.map(|(path,)| {
426+
store_dir
427+
.parse(&path)
428+
.map_err(|e| sqlx::Error::Decode(Box::new(e)))
429+
})
430+
.transpose()
431+
}
432+
398433
#[tracing::instrument(skip(self), err)]
399434
pub async fn get_status(&mut self) -> sqlx::Result<Option<serde_json::Value>> {
400435
Ok(
@@ -1461,4 +1496,142 @@ mod tests {
14611496
]
14621497
);
14631498
}
1499+
1500+
// -- resolve_drv_output (depth-1) tests ------------------------------------
1501+
1502+
#[tokio::test]
1503+
async fn resolve_drv_output_basic() {
1504+
let (_pg, mut conn) = setup().await;
1505+
insert_step(&mut conn, 1, 1, &sp("foo.drv")).await;
1506+
insert_output(&mut conn, 1, 1, "out", &sp("result")).await;
1507+
1508+
let result = conn
1509+
.resolve_drv_output(&test_store_dir(), &sp("foo.drv"), &on("out"))
1510+
.await
1511+
.unwrap();
1512+
assert_eq!(result, Some(sp("result")));
1513+
}
1514+
1515+
#[tokio::test]
1516+
async fn resolve_drv_output_missing() {
1517+
let (_pg, mut conn) = setup().await;
1518+
let result = conn
1519+
.resolve_drv_output(&test_store_dir(), &sp("nonexistent.drv"), &on("out"))
1520+
.await
1521+
.unwrap();
1522+
assert_eq!(result, None);
1523+
}
1524+
1525+
#[tokio::test]
1526+
async fn resolve_drv_output_picks_latest_build() {
1527+
let (_pg, mut conn) = setup().await;
1528+
insert_step(&mut conn, 1, 1, &sp("foo.drv")).await;
1529+
insert_output(&mut conn, 1, 1, "out", &sp("old-result")).await;
1530+
insert_step(&mut conn, 5, 1, &sp("foo.drv")).await;
1531+
insert_output(&mut conn, 5, 1, "out", &sp("new-result")).await;
1532+
1533+
let result = conn
1534+
.resolve_drv_output(&test_store_dir(), &sp("foo.drv"), &on("out"))
1535+
.await
1536+
.unwrap();
1537+
assert_eq!(result, Some(sp("new-result")));
1538+
}
1539+
1540+
// -- Simulate the Rust-side loop that replaces the recursive SQL ----------
1541+
//
1542+
// These mirror the resolved-step tests from the DB-column approach,
1543+
// but use resolve_drv_output + an in-memory map instead of
1544+
// resolvedDrvPath in the SQL.
1545+
1546+
/// Helper: resolve a chain one level at a time using `resolve_drv_output`,
1547+
/// translating through `resolved_map` between levels.
1548+
async fn resolve_chain_with_map(
1549+
conn: &mut Connection,
1550+
resolved_map: &std::collections::HashMap<StorePath, StorePath>,
1551+
root: &StorePath,
1552+
outputs: &[&OutputName],
1553+
) -> Option<StorePath> {
1554+
let sd = test_store_dir();
1555+
let mut current = root.clone();
1556+
for output_name in outputs {
1557+
let translated = resolved_map.get(&current).cloned().unwrap_or(current);
1558+
current = conn
1559+
.resolve_drv_output(&sd, &translated, output_name)
1560+
.await
1561+
.unwrap()?;
1562+
}
1563+
Some(current)
1564+
}
1565+
1566+
/// Depth-1: unresolved.drv was resolved to resolved.drv, which has
1567+
/// the outputs. The in-memory map translates before lookup.
1568+
#[tokio::test]
1569+
async fn resolve_with_map_depth_1() {
1570+
let (_pg, mut conn) = setup().await;
1571+
1572+
// resolved.drv was built successfully
1573+
insert_step(&mut conn, 2, 1, &sp("resolved.drv")).await;
1574+
insert_output(&mut conn, 2, 1, "out", &sp("result")).await;
1575+
1576+
let mut map = std::collections::HashMap::new();
1577+
map.insert(sp("unresolved.drv"), sp("resolved.drv"));
1578+
1579+
let result =
1580+
resolve_chain_with_map(&mut conn, &map, &sp("unresolved.drv"), &[&on("out")]).await;
1581+
assert_eq!(result, Some(sp("result")));
1582+
}
1583+
1584+
/// Depth-2: unresolved.drv was resolved to resolved.drv, whose output
1585+
/// is an intermediate.drv that has the final output.
1586+
#[tokio::test]
1587+
async fn resolve_with_map_depth_2() {
1588+
let (_pg, mut conn) = setup().await;
1589+
1590+
insert_step(&mut conn, 2, 1, &sp("resolved.drv")).await;
1591+
insert_output(&mut conn, 2, 1, "out", &sp("intermediate.drv")).await;
1592+
insert_step(&mut conn, 3, 1, &sp("intermediate.drv")).await;
1593+
insert_output(&mut conn, 3, 1, "out", &sp("final")).await;
1594+
1595+
let mut map = std::collections::HashMap::new();
1596+
map.insert(sp("unresolved.drv"), sp("resolved.drv"));
1597+
1598+
let result = resolve_chain_with_map(
1599+
&mut conn,
1600+
&map,
1601+
&sp("unresolved.drv"),
1602+
&[&on("out"), &on("out")],
1603+
)
1604+
.await;
1605+
assert_eq!(result, Some(sp("final")));
1606+
}
1607+
1608+
/// Depth-2 where the intermediate result was also resolved:
1609+
/// root.drv.drv (not resolved) → intermediate.drv (resolved) → final
1610+
#[tokio::test]
1611+
async fn resolve_with_map_intermediate_resolved() {
1612+
let (_pg, mut conn) = setup().await;
1613+
1614+
// root.drv.drv^out → unresolved-intermediate.drv
1615+
insert_step(&mut conn, 1, 1, &sp("root.drv.drv")).await;
1616+
insert_output(&mut conn, 1, 1, "out", &sp("unresolved-intermediate.drv")).await;
1617+
1618+
// resolved-intermediate.drv^out → final-result
1619+
insert_step(&mut conn, 2, 1, &sp("resolved-intermediate.drv")).await;
1620+
insert_output(&mut conn, 2, 1, "out", &sp("final-result")).await;
1621+
1622+
let mut map = std::collections::HashMap::new();
1623+
map.insert(
1624+
sp("unresolved-intermediate.drv"),
1625+
sp("resolved-intermediate.drv"),
1626+
);
1627+
1628+
let result = resolve_chain_with_map(
1629+
&mut conn,
1630+
&map,
1631+
&sp("root.drv.drv"),
1632+
&[&on("out"), &on("out")],
1633+
)
1634+
.await;
1635+
assert_eq!(result, Some(sp("final-result")));
1636+
}
14641637
}

subprojects/hydra-queue-runner/src/state/drv.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use std::collections::BTreeSet;
2+
13
use nix_utils::SingleDerivedPath;
24

35
/// Output names of intermediate derivations for a dynamic derivation
@@ -42,3 +44,21 @@ pub fn flatten_chain(
4244
chain.0.push(output_name.clone());
4345
(root, chain)
4446
}
47+
48+
/// Extract `Built` input dependencies from a derivation.
49+
///
50+
/// Returns `(root_drv_path, relation)` pairs. `Opaque` (source) inputs are
51+
/// skipped — only derivation build dependencies are returned. For each
52+
/// `Built` input, the outermost output name (what we consume) is discarded;
53+
/// intermediate output names form the [`OutputNameChain`].
54+
pub fn input_drvs(
55+
drv: &nix_utils::Derivation,
56+
) -> BTreeSet<(nix_utils::StorePath, OutputNameChain)> {
57+
drv.inputs
58+
.iter()
59+
.filter_map(|sdp| match sdp {
60+
SingleDerivedPath::Opaque(_) => None,
61+
SingleDerivedPath::Built { drv_path, .. } => Some(flatten_path(drv_path)),
62+
})
63+
.collect()
64+
}

subprojects/hydra-queue-runner/src/state/mod.rs

Lines changed: 102 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -521,7 +521,7 @@ impl State {
521521
// original step's drv path differs from the resolved one, so
522522
// completing the resolved step wouldn't clear the dep).
523523
for rdep in step_info.step.clone_rdeps() {
524-
if let Some(rdep) = rdep.upgrade() {
524+
if let Some(rdep) = rdep.step.upgrade() {
525525
rdep.remove_dep(&step_info.step);
526526
resolved_step.make_rdep(&rdep);
527527
}
@@ -1388,6 +1388,74 @@ impl State {
13881388
tx.commit().await?;
13891389
}
13901390

1391+
// Process dynamic rdeps first, as we must add new step dependencies for dynamically
1392+
// generated derivations
1393+
{
1394+
for (dep_step, output_name, relation) in item.step_info.step.pop_dynamic_rdeps() {
1395+
let Some(dependent_step) = dep_step.upgrade() else {
1396+
continue;
1397+
};
1398+
1399+
let resolved_drv = output.outputs.get(&output_name).cloned().ok_or_else(|| {
1400+
anyhow::anyhow!(
1401+
"Dynamic rdep references output `{output_name}` not produced by {drv_path}"
1402+
)
1403+
})?;
1404+
1405+
// Find a build associated with this step. For intermediate steps
1406+
// (not top-level), `direct` is empty, so we walk the dependency
1407+
// chain via `get_dependents` to find the owning build.
1408+
let build = if let Some(b) = direct.get(0) {
1409+
b.clone()
1410+
} else {
1411+
let mut dependents = HashSet::new();
1412+
let mut visited_steps = HashSet::new();
1413+
item.step_info
1414+
.step
1415+
.get_dependents(&mut dependents, &mut visited_steps);
1416+
let Some(b) = dependents.into_iter().next() else {
1417+
tracing::warn!("Finished step does not have associated build");
1418+
continue;
1419+
};
1420+
b
1421+
};
1422+
1423+
// Create the actual step for the new derivation.
1424+
// finished_drvs is not necessary as it is only a memoization table to reduce
1425+
// checks if a dependency is finished from the database.
1426+
// new_steps is not necessary either as
1427+
let new_runnable: Arc<parking_lot::RwLock<HashSet<Arc<Step>>>> = Default::default();
1428+
let new_step = match self
1429+
.create_step(
1430+
build.clone(),
1431+
resolved_drv.clone(),
1432+
None,
1433+
Some((dependent_step.clone(), relation)),
1434+
Default::default(),
1435+
Default::default(),
1436+
new_runnable.clone(),
1437+
)
1438+
.await
1439+
{
1440+
CreateStepResult::None => continue,
1441+
CreateStepResult::Valid(step) => step,
1442+
CreateStepResult::PreviousFailure(step) => {
1443+
if let Err(e) = self.handle_previous_failure(build.clone(), step).await {
1444+
tracing::error!("Failed to handle previous failure: {e}");
1445+
}
1446+
// TODO: figure out what to do here
1447+
continue;
1448+
}
1449+
};
1450+
1451+
for r in new_runnable.read().iter() {
1452+
r.make_runnable();
1453+
}
1454+
1455+
// create_step already added rdeps, but we need to add a forward dep as well
1456+
dependent_step.add_dep(new_step);
1457+
}
1458+
}
13911459
item.step_info.step.make_rdeps_runnable();
13921460

13931461
// always trigger dispatch, as we now might have a free machine again
@@ -1933,7 +2001,7 @@ impl State {
19332001
build: Arc<Build>,
19342002
drv_path: nix_utils::StorePath,
19352003
referring_build: Option<Arc<Build>>,
1936-
referring_step: Option<Arc<Step>>,
2004+
referring_step: Option<(Arc<Step>, drv::OutputNameChain)>,
19372005
finished_drvs: Arc<parking_lot::RwLock<HashSet<nix_utils::StorePath>>>,
19382006
new_steps: Arc<parking_lot::RwLock<HashSet<Arc<Step>>>>,
19392007
new_runnable: Arc<parking_lot::RwLock<HashSet<Arc<Step>>>>,
@@ -1947,9 +2015,13 @@ impl State {
19472015
}
19482016
}
19492017

1950-
let (step, is_new) =
1951-
self.steps
1952-
.create(&drv_path, referring_build.as_ref(), referring_step.as_ref());
2018+
let (step, is_new) = self.steps.create(
2019+
&drv_path,
2020+
referring_build.as_ref(),
2021+
referring_step
2022+
.as_ref()
2023+
.map(|(step, relation)| (step, relation.clone())),
2024+
);
19532025
if !is_new {
19542026
return CreateStepResult::Valid(step);
19552027
}
@@ -2045,6 +2117,7 @@ impl State {
20452117
self.store.query_missing_outputs(output_paths).await
20462118
};
20472119

2120+
let input_drvs = drv::input_drvs(&drv);
20482121
step.set_drv(drv);
20492122

20502123
if self.check_cached_failure(step.clone()).await {
@@ -2102,35 +2175,32 @@ impl State {
21022175
}
21032176

21042177
tracing::debug!("creating build step '{drv_path}");
2105-
let Some(input_drvs) = step.get_input_drvs() else {
2106-
// this should never happen because we always a a drv set at this point in time
2107-
return CreateStepResult::None;
2108-
};
21092178

21102179
let step2 = step.clone();
2111-
let mut stream = futures::StreamExt::map(tokio_stream::iter(input_drvs), |i| {
2112-
let build = build.clone();
2113-
let step = step2.clone();
2114-
let finished_drvs = finished_drvs.clone();
2115-
let new_steps = new_steps.clone();
2116-
let new_runnable = new_runnable.clone();
2117-
async move {
2118-
Box::pin(self.create_step(
2119-
// conn,
2120-
build,
2121-
i,
2122-
None,
2123-
Some(step),
2124-
finished_drvs,
2125-
new_steps,
2126-
new_runnable,
2127-
))
2128-
.await
2129-
}
2130-
})
2131-
.buffered(25);
2132-
while let Some(v) = tokio_stream::StreamExt::next(&mut stream).await {
2133-
match v {
2180+
let mut stream =
2181+
futures::StreamExt::map(tokio_stream::iter(input_drvs), |(input_path, relation)| {
2182+
let build = build.clone();
2183+
let step = step2.clone();
2184+
let finished_drvs = finished_drvs.clone();
2185+
let new_steps = new_steps.clone();
2186+
let new_runnable = new_runnable.clone();
2187+
2188+
async move {
2189+
Box::pin(self.create_step(
2190+
build,
2191+
input_path,
2192+
None,
2193+
Some((step, relation)),
2194+
finished_drvs,
2195+
new_steps,
2196+
new_runnable,
2197+
))
2198+
.await
2199+
}
2200+
})
2201+
.buffered(25);
2202+
while let Some(result) = tokio_stream::StreamExt::next(&mut stream).await {
2203+
match result {
21342204
CreateStepResult::None => (),
21352205
CreateStepResult::Valid(dep) => {
21362206
if !dep.get_finished() && !dep.get_previous_failure() {

0 commit comments

Comments
 (0)