Skip to content

Commit 3e6dae1

Browse files
amaanqartemistEricson2314
committed
queue-runner: resolve CA derivations at dispatch time
Instead of resolving at StepInfo construction and carrying two drv identities through the gRPC layer, resolve in realise_drv_on_valid_machine once all deps are built. If resolution yields a different drv, the original step is marked Resolved and a new DB step is created for the resolved drv with a resolvedTo FK linking them. The builder only ever sees one drv. We create a new Step for that resolution and punt it back to the scheduler. This grants us more flexibility in execution, and the method can be used in the future for dynamic derivations, which won't map 1:1 with the original derivations. We also now only create database steps when we are sure they are necessary, reducing the number of duplicate `BuildStep` rows. In order to make tests more consistent, CA derivations will fail if they cannot be fully resolved. Otherwise, there could be inconsistent successes depending on which builder a step was performed on. As part of this, add local outputs to the resolution table. With the current queue-runner design, all dependency outputs of a CAFloating derivation must be recorded in the hydra database. This is true for things built or substituted by hydra, but until now not for things found in the local Nix store. This may occur for outputs that are part of the system configuration. Therefore, add all local outputs that are not already in the database to the resolution table upon creating a step. This makes it possible to build derivations from `contentAddressedByDefault` nixpkgs. Co-Authored-By: Artemis Tosini <artemis.tosini@obsidian.systems> Co-Authored-By: John Ericson <John.Ericson@Obsidian.Systems> Co-Authored-By: Amaan Qureshi <git@amaanq.com>
1 parent 168c672 commit 3e6dae1

16 files changed

Lines changed: 599 additions & 71 deletions

File tree

subprojects/crates/db/src/connection.rs

Lines changed: 233 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -355,12 +355,20 @@ impl Connection {
355355
CROSS JOIN LATERAL (
356356
SELECT o.path
357357
FROM buildsteps s
358+
-- If this step was resolved, look up outputs from
359+
-- the resolved drv's successful buildstep instead.
360+
-- resolvedDrvPath is a bare basename; drvPath is a
361+
-- full path, so prepend the store dir ($2) to compare.
362+
LEFT JOIN buildsteps sr
363+
ON sr.drvPath = $2 || '/' || s.resolvedDrvPath
364+
AND sr.status = 0
358365
JOIN buildstepoutputs o
359-
ON s.build = o.build AND s.stepnr = o.stepnr
366+
ON o.build = COALESCE(sr.build, s.build)
367+
AND o.stepnr = COALESCE(sr.stepnr, s.stepnr)
360368
WHERE s.drvPath = r.drv_path
361369
AND o.name = i.chain[r.step]
362370
AND o.path IS NOT NULL
363-
AND s.status = 0
371+
AND (s.status = 0 OR s.status = 13)
364372
ORDER BY s.build DESC
365373
LIMIT 1
366374
) sub
@@ -376,6 +384,7 @@ impl Connection {
376384
",
377385
)
378386
.bind(&json_input)
387+
.bind(store_dir.to_str())
379388
.fetch_all(&mut *self.conn)
380389
.await?;
381390

@@ -693,6 +702,36 @@ impl Transaction<'_> {
693702
Ok(())
694703
}
695704

705+
#[tracing::instrument(skip(self), err)]
706+
pub async fn find_build_step_outputs(
707+
&mut self,
708+
store_dir: &StoreDir,
709+
drv_path: &StorePath,
710+
) -> sqlx::Result<BTreeMap<OutputName, StorePath>> {
711+
let drv_path = store_dir.display(drv_path).to_string();
712+
let items: Vec<(String, String)> = sqlx::query_as(
713+
r"SELECT o.name, o.path
714+
FROM buildstepoutputs o
715+
JOIN buildsteps s ON s.build = o.build AND s.stepnr = o.stepnr
716+
WHERE s.drvpath = $1 AND o.path IS NOT NULL",
717+
)
718+
.bind(drv_path)
719+
.fetch_all(&mut *self.tx)
720+
.await?;
721+
722+
items
723+
.into_iter()
724+
.map(|(name, path)| -> anyhow::Result<_> {
725+
let name: OutputName = name.parse().context("invalid output name from DB")?;
726+
let path: StorePath = store_dir
727+
.parse(&path)
728+
.context("invalid store path from DB")?;
729+
Ok((name, path))
730+
})
731+
.collect::<anyhow::Result<_>>()
732+
.map_err(|e| sqlx::Error::Decode(e.into_boxed_dyn_error()))
733+
}
734+
696735
#[tracing::instrument(skip(self, res), err)]
697736
pub async fn update_build_step_in_finish(
698737
&mut self,
@@ -941,6 +980,83 @@ impl Transaction<'_> {
941980
Ok(step_nr)
942981
}
943982

983+
/// Record which resolved drv path this step was resolved to.
984+
#[tracing::instrument(skip(self), err)]
985+
pub async fn set_resolved_to(
986+
&mut self,
987+
origin_build_id: crate::models::BuildID,
988+
origin_step_nr: i32,
989+
resolved_drv_path: &StorePath,
990+
) -> sqlx::Result<()> {
991+
sqlx::query(
992+
r"
993+
UPDATE buildsteps
994+
SET resolvedDrvPath = $3
995+
WHERE build = $1 AND stepnr = $2
996+
",
997+
)
998+
.bind(origin_build_id)
999+
.bind(origin_step_nr)
1000+
.bind(resolved_drv_path.to_string())
1001+
.execute(&mut *self.tx)
1002+
.await?;
1003+
Ok(())
1004+
}
1005+
1006+
#[tracing::instrument(
1007+
skip(self, start_time, stop_time, build_id, drv_path, outputs,),
1008+
err,
1009+
ret
1010+
)]
1011+
pub async fn create_local_step(
1012+
&mut self,
1013+
store_dir: &StoreDir,
1014+
start_time: i32,
1015+
stop_time: i32,
1016+
build_id: crate::models::BuildID,
1017+
drv_path: &StorePath,
1018+
outputs: BTreeMap<OutputName, StorePath>,
1019+
) -> anyhow::Result<i32> {
1020+
let step_nr = loop {
1021+
if let Some(step_nr) = self
1022+
.insert_build_step(
1023+
store_dir,
1024+
InsertBuildStep {
1025+
build_id,
1026+
r#type: crate::models::BuildType::Substitution,
1027+
drv_path,
1028+
status: BuildStatus::Success,
1029+
busy: false,
1030+
start_time: Some(start_time),
1031+
stop_time: Some(stop_time),
1032+
platform: None,
1033+
propagated_from: None,
1034+
error_msg: None,
1035+
machine: "",
1036+
},
1037+
)
1038+
.await?
1039+
{
1040+
break step_nr;
1041+
}
1042+
};
1043+
1044+
let output_items: Vec<_> = outputs
1045+
.into_iter()
1046+
.map(|(name, path)| InsertBuildStepOutput {
1047+
build_id,
1048+
step_nr,
1049+
name,
1050+
path: Some(path),
1051+
})
1052+
.collect();
1053+
1054+
self.insert_build_step_outputs(store_dir, &output_items)
1055+
.await?;
1056+
1057+
Ok(step_nr)
1058+
}
1059+
9441060
#[tracing::instrument(
9451061
skip(self, start_time, stop_time, build_id, drv_path, output,),
9461062
err,
@@ -1191,11 +1307,24 @@ mod tests {
11911307
}
11921308

11931309
async fn insert_step(conn: &mut Connection, build: i32, stepnr: i32, drv_path: &StorePath) {
1310+
insert_step_with_status(conn, build, stepnr, drv_path, 0, None).await;
1311+
}
1312+
1313+
async fn insert_step_with_status(
1314+
conn: &mut Connection,
1315+
build: i32,
1316+
stepnr: i32,
1317+
drv_path: &StorePath,
1318+
status: i32,
1319+
resolved_drv_path: Option<&StorePath>,
1320+
) {
11941321
let sd = test_store_dir();
1195-
sqlx::query("INSERT INTO BuildSteps (build, stepnr, type, busy, drvPath, status) VALUES ($1, $2, 0, 0, $3, 0)")
1322+
sqlx::query("INSERT INTO BuildSteps (build, stepnr, type, busy, drvPath, status, resolvedDrvPath) VALUES ($1, $2, 0, 0, $3, $4, $5)")
11961323
.bind(build)
11971324
.bind(stepnr)
11981325
.bind(sd.display(drv_path).to_string())
1326+
.bind(status)
1327+
.bind(resolved_drv_path.map(|p| p.to_string()))
11991328
.execute(&mut *conn.conn)
12001329
.await
12011330
.unwrap();
@@ -1333,6 +1462,70 @@ mod tests {
13331462
);
13341463
}
13351464

1465+
/// A step that was resolved (status=13) with `resolvedDrvPath` pointing
1466+
/// to a different drv whose successful buildstep has the outputs.
1467+
#[tokio::test]
1468+
async fn resolve_through_resolved_step() {
1469+
let (_pg, mut conn) = setup().await;
1470+
1471+
// Step 1: unresolved ca-depending-on-ca.drv, status=Resolved(13),
1472+
// resolvedDrvPath points to the resolved drv
1473+
insert_step_with_status(
1474+
&mut conn,
1475+
1,
1476+
1,
1477+
&sp("unresolved.drv"),
1478+
13,
1479+
Some(&sp("resolved.drv")),
1480+
)
1481+
.await;
1482+
// A successful buildstep for the resolved drv (could be any build)
1483+
insert_step(&mut conn, 2, 1, &sp("resolved.drv")).await;
1484+
insert_output(&mut conn, 2, 1, "out", &sp("result")).await;
1485+
1486+
// Looking up via the unresolved drv path should find the output
1487+
// through the resolvedDrvPath chain.
1488+
let results = conn
1489+
.resolve_drv_output_chains(&test_store_dir(), &[(&sp("unresolved.drv"), &[&on("out")])])
1490+
.await
1491+
.unwrap();
1492+
assert_eq!(results, vec![Some(sp("result"))]);
1493+
}
1494+
1495+
/// A depth-2 chain where the first step was resolved:
1496+
/// unresolved.drv (status=13, resolvedDrvPath→resolved.drv) →
1497+
/// resolved.drv outputs a .drv path → that .drv has the final output.
1498+
#[tokio::test]
1499+
async fn resolve_depth_2_through_resolved_step() {
1500+
let (_pg, mut conn) = setup().await;
1501+
1502+
// Build 1: unresolved step, resolved to bbb-resolved.drv
1503+
insert_step_with_status(
1504+
&mut conn,
1505+
1,
1506+
1,
1507+
&sp("unresolved.drv"),
1508+
13,
1509+
Some(&sp("resolved.drv")),
1510+
)
1511+
.await;
1512+
insert_step(&mut conn, 2, 1, &sp("resolved.drv")).await;
1513+
insert_output(&mut conn, 2, 1, "out", &sp("intermediate.drv")).await;
1514+
1515+
// Build 3: the intermediate drv
1516+
insert_step(&mut conn, 3, 1, &sp("intermediate.drv")).await;
1517+
insert_output(&mut conn, 3, 1, "out", &sp("final")).await;
1518+
1519+
let results = conn
1520+
.resolve_drv_output_chains(
1521+
&test_store_dir(),
1522+
&[(&sp("unresolved.drv"), &[&on("out"), &on("out")])],
1523+
)
1524+
.await
1525+
.unwrap();
1526+
assert_eq!(results, vec![Some(sp("final"))]);
1527+
}
1528+
13361529
/// Batch with ragged depths: one depth-1 (Opaque), one depth-2 (Built),
13371530
/// one depth-3 (Built(Built(...))).
13381531
#[tokio::test]
@@ -1377,4 +1570,41 @@ mod tests {
13771570
]
13781571
);
13791572
}
1573+
1574+
/// Depth-1 lookup where the only buildstep for the drv has
1575+
/// status=Resolved(13) with `resolvedDrvPath` pointing to
1576+
/// a different drv whose successful buildstep has the outputs.
1577+
/// This matches the production scenario: ca-depending-on-ca.drv
1578+
/// was resolved to a different drv, and ca-depending-on-ca-
1579+
/// depending-on-ca needs to look up its output.
1580+
#[tokio::test]
1581+
async fn resolve_depth_1_via_resolved_step() {
1582+
let (_pg, mut conn) = setup().await;
1583+
1584+
// Build 1, step 1: unresolved ca-depending-on-ca.drv
1585+
// status=13 (Resolved), resolvedDrvPath points to the resolved drv
1586+
insert_step_with_status(
1587+
&mut conn,
1588+
1,
1589+
1,
1590+
&sp("unresolved-ca-dep.drv"),
1591+
13,
1592+
Some(&sp("resolved-ca-dep.drv")),
1593+
)
1594+
.await;
1595+
// Build 2: the resolved drv was built successfully
1596+
insert_step(&mut conn, 2, 1, &sp("resolved-ca-dep.drv")).await;
1597+
insert_output(&mut conn, 2, 1, "out", &sp("ca-dep-output")).await;
1598+
1599+
// Depth-1 chain: look up "out" of the unresolved drv path.
1600+
// The query should follow resolvedDrvPath to find the output.
1601+
let results = conn
1602+
.resolve_drv_output_chains(
1603+
&test_store_dir(),
1604+
&[(&sp("unresolved-ca-dep.drv"), &[&on("out")])],
1605+
)
1606+
.await
1607+
.unwrap();
1608+
assert_eq!(results, vec![Some(sp("ca-dep-output"))]);
1609+
}
13801610
}

subprojects/crates/db/src/models.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ pub enum BuildStatus {
1919
LogLimitExceeded = 10,
2020
NarSizeLimitExceeded = 11,
2121
NotDeterministic = 12,
22-
Busy = 100, // not stored
22+
Resolved = 13, // step was resolved to a CA derivation, see buildsteps.resolvedDrvPath
23+
Busy = 100, // not stored
2324
}
2425

2526
impl BuildStatus {
@@ -38,6 +39,7 @@ impl BuildStatus {
3839
10 => Some(Self::LogLimitExceeded),
3940
11 => Some(Self::NarSizeLimitExceeded),
4041
12 => Some(Self::NotDeterministic),
42+
13 => Some(Self::Resolved),
4143
100 => Some(Self::Busy),
4244
_ => None,
4345
}

subprojects/hydra-builder/src/state.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -408,8 +408,6 @@ impl State {
408408
.drv
409409
.ok_or(JobFailure::Preparing(anyhow::anyhow!("missing drv")))?
410410
.0;
411-
let resolved_drv = m.resolved_drv.map(|v| v.0);
412-
let maybe_resolved_drv = resolved_drv.as_ref().unwrap_or(&drv);
413411

414412
let before_import = Instant::now();
415413
let gcroot_prefix = uuid::Uuid::new_v4().to_string();
@@ -427,7 +425,7 @@ impl State {
427425
.await;
428426
let requisites = client
429427
.fetch_drv_requisites(FetchRequisitesRequest {
430-
path: Some(ProtoStorePath::from(maybe_resolved_drv.clone())),
428+
path: Some(ProtoStorePath::from(drv.clone())),
431429
include_outputs: false,
432430
})
433431
.await
@@ -440,7 +438,7 @@ impl State {
440438
store.clone(),
441439
self.metrics.clone(),
442440
&gcroot,
443-
maybe_resolved_drv,
441+
&drv,
444442
requisites.into_iter().map(|s| s.0),
445443
usize::try_from(self.max_concurrent_downloads.load(Ordering::Relaxed)).unwrap_or(5),
446444
self.config.use_substitutes,
@@ -459,7 +457,7 @@ impl State {
459457
let before_build = Instant::now();
460458
let (mut child, stdout, mut stderr) = nix_utils::realise_drv(
461459
&store,
462-
maybe_resolved_drv,
460+
&drv,
463461
&nix_utils::BuildOptions::complete(m.max_log_size, m.max_silent_time, m.build_timeout),
464462
true,
465463
)
@@ -536,7 +534,7 @@ impl State {
536534
.store_dir()
537535
.parse(&output_raw[0].drv_path)
538536
.map_err(|e: nix_utils::ParseStorePathError| JobFailure::PostProcessing(e.into()))?;
539-
if &actual_out_drv != maybe_resolved_drv {
537+
if actual_out_drv != drv {
540538
return Err(JobFailure::PostProcessing(anyhow::anyhow!(
541539
"Nix returned outputs for {actual_out_drv} when we expected {drv}"
542540
)));

subprojects/hydra-queue-runner/src/state/build.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ pub struct Build {
2525
pub local_priority: i32,
2626
pub global_priority: AtomicI32,
2727

28-
toplevel: arc_swap::ArcSwapOption<Step>,
28+
pub toplevel: arc_swap::ArcSwapOption<Step>,
2929
pub jobset: Arc<Jobset>,
3030

3131
finished_in_db: AtomicBool,

0 commit comments

Comments
 (0)