Skip to content

Commit 4be2bbb

Browse files
amaanqartemist
authored and committed
queue-runner: resolve CA derivations at dispatch time
Instead of resolving at StepInfo construction and carrying two drv identities through the gRPC layer, resolve in realise_drv_on_valid_machine once all deps are built. If resolution yields a different drv, the original step is marked Resolved and a new DB step is created for the resolved drv with a resolvedTo FK linking them. The builder only ever sees one drv. We create a new Step for that resolution and punt it back to the scheduler. This grants us more flexibility in execution, and the method can be used in the future for dynamic derivations, which won't map 1:1 with the original derivations. In order to make tests more consistent, CA derivations will fail if they cannot be fully resolved. Otherwise, there could be inconsistent successes depending on which builder a step was performed on. As part of this, add local outputs to the resolution table. With the current queue-runner design, all dependency outputs of a CAFloating derivation must be recorded in the hydra database. This is true for things built or substituted by hydra, but until now not for things found in the local nix store. This may occur for outputs that are part of the system configuration. Therefore, add all local outputs that are not already in the database to the resolution table upon creating a step. This makes it possible to build derivations from `contentAddressedByDefault` nixpkgs.
1 parent cd235f7 commit 4be2bbb

13 files changed

Lines changed: 414 additions & 59 deletions

File tree

subprojects/crates/db/src/connection.rs

Lines changed: 89 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use std::collections::BTreeMap;
2+
13
use sqlx::Acquire;
24

35
use harmonia_store_core::store_path::StoreDir;
@@ -338,12 +340,14 @@ impl Connection {
338340
CROSS JOIN LATERAL (
339341
SELECT o.path
340342
FROM buildsteps s
343+
LEFT JOIN buildsteps sr
344+
ON s.build = sr.build AND s.resolvedToStep = sr.stepnr
341345
JOIN buildstepoutputs o
342-
ON s.build = o.build AND s.stepnr = o.stepnr
346+
ON s.build = o.build AND (s.stepnr = o.stepnr OR sr.stepnr = o.stepnr)
343347
WHERE s.drvPath = r.drv_path
344348
AND o.name = i.chain[r.step]
345349
AND o.path IS NOT NULL
346-
AND s.status = 0
350+
AND (s.status = 0 OR (s.status = 13 AND sr.status = 0))
347351
ORDER BY s.build DESC
348352
LIMIT 1
349353
) sub
@@ -648,6 +652,16 @@ impl Transaction<'_> {
648652
Ok(())
649653
}
650654

655+
#[tracing::instrument(skip(self), err)]
656+
pub async fn find_build_step_outputs(
657+
&mut self,
658+
drv_path: &str,
659+
) -> sqlx::Result<BTreeMap<String, String>> {
660+
let items: Vec<(String, String)> = sqlx::query_as("SELECT o.name, o.path FROM buildstepoutputs o JOIN buildsteps s ON s.stepnr = o.stepnr WHERE s.drvpath = ? AND o.path IS NOT NULL").bind(drv_path).fetch_all(&mut *self.tx).await?;
661+
662+
Ok(items.into_iter().collect())
663+
}
664+
651665
#[tracing::instrument(skip(self, res), err)]
652666
pub async fn update_build_step_in_finish(
653667
&mut self,
@@ -879,6 +893,79 @@ impl Transaction<'_> {
879893
Ok(step_nr)
880894
}
881895

896+
/// Set resolvedToBuild/resolvedToStep on a dependency step after the
897+
/// resolved step has been created, linking the dependency to its resolution.
898+
#[tracing::instrument(skip(self), err)]
899+
pub async fn set_resolved_to(
900+
&mut self,
901+
origin_build_id: crate::models::BuildID,
902+
origin_step_nr: i32,
903+
resolved_step_nr: i32,
904+
) -> sqlx::Result<()> {
905+
sqlx::query(
906+
r"
907+
UPDATE buildsteps
908+
SET resolvedToStep = $3
909+
WHERE build = $1 AND stepnr = $2
910+
",
911+
)
912+
.bind(origin_build_id)
913+
.bind(origin_step_nr)
914+
.bind(resolved_step_nr)
915+
.execute(&mut *self.tx)
916+
.await?;
917+
Ok(())
918+
}
919+
920+
#[tracing::instrument(
921+
skip(self, start_time, stop_time, build_id, drv_path, outputs,),
922+
err,
923+
ret
924+
)]
925+
pub async fn create_local_step(
926+
&mut self,
927+
start_time: i32,
928+
stop_time: i32,
929+
build_id: crate::models::BuildID,
930+
drv_path: &str,
931+
outputs: BTreeMap<String, String>,
932+
) -> anyhow::Result<i32> {
933+
let step_nr = loop {
934+
if let Some(step_nr) = self
935+
.insert_build_step(InsertBuildStep {
936+
build_id,
937+
r#type: crate::models::BuildType::Substitution,
938+
drv_path,
939+
status: BuildStatus::Success,
940+
busy: false,
941+
start_time: Some(start_time),
942+
stop_time: Some(stop_time),
943+
platform: None,
944+
propagated_from: None,
945+
error_msg: None,
946+
machine: "",
947+
})
948+
.await?
949+
{
950+
break step_nr;
951+
}
952+
};
953+
954+
let output_items: Vec<_> = outputs
955+
.into_iter()
956+
.map(|(name, path)| InsertBuildStepOutput::<String> {
957+
build_id,
958+
step_nr,
959+
name,
960+
path: Some(path),
961+
})
962+
.collect();
963+
964+
self.insert_build_step_outputs(&output_items).await?;
965+
966+
Ok(step_nr)
967+
}
968+
882969
#[tracing::instrument(
883970
skip(self, start_time, stop_time, build_id, drv_path, output,),
884971
err,

subprojects/crates/db/src/models.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ pub enum BuildStatus {
1919
LogLimitExceeded = 10,
2020
NarSizeLimitExceeded = 11,
2121
NotDeterministic = 12,
22-
Busy = 100, // not stored
22+
Resolved = 13, // step was resolved to a CA derivation, see resolvedTo FK
23+
Busy = 100, // not stored
2324
}
2425

2526
impl BuildStatus {
@@ -38,6 +39,7 @@ impl BuildStatus {
3839
10 => Some(Self::LogLimitExceeded),
3940
11 => Some(Self::NarSizeLimitExceeded),
4041
12 => Some(Self::NotDeterministic),
42+
13 => Some(Self::Resolved),
4143
100 => Some(Self::Busy),
4244
_ => None,
4345
}

subprojects/hydra-builder/src/state.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -408,8 +408,6 @@ impl State {
408408
.drv
409409
.ok_or(JobFailure::Preparing(anyhow::anyhow!("missing drv")))?
410410
.0;
411-
let resolved_drv = m.resolved_drv.map(|v| v.0);
412-
let maybe_resolved_drv = resolved_drv.as_ref().unwrap_or(&drv);
413411

414412
let before_import = Instant::now();
415413
let gcroot_prefix = uuid::Uuid::new_v4().to_string();
@@ -427,7 +425,7 @@ impl State {
427425
.await;
428426
let requisites = client
429427
.fetch_drv_requisites(FetchRequisitesRequest {
430-
path: Some(ProtoStorePath::from(maybe_resolved_drv.clone())),
428+
path: Some(ProtoStorePath::from(drv.clone())),
431429
include_outputs: false,
432430
})
433431
.await
@@ -440,7 +438,7 @@ impl State {
440438
store.clone(),
441439
self.metrics.clone(),
442440
&gcroot,
443-
maybe_resolved_drv,
441+
&drv,
444442
requisites.into_iter().map(|s| s.0),
445443
usize::try_from(self.max_concurrent_downloads.load(Ordering::Relaxed)).unwrap_or(5),
446444
self.config.use_substitutes,
@@ -459,7 +457,7 @@ impl State {
459457
let before_build = Instant::now();
460458
let (mut child, stdout, mut stderr) = nix_utils::realise_drv(
461459
&store,
462-
maybe_resolved_drv,
460+
&drv,
463461
&nix_utils::BuildOptions::complete(m.max_log_size, m.max_silent_time, m.build_timeout),
464462
true,
465463
)
@@ -519,7 +517,7 @@ impl State {
519517
.store_dir()
520518
.parse(&output_raw[0].drv_path)
521519
.map_err(|e: nix_utils::ParseStorePathError| JobFailure::PostProcessing(e.into()))?;
522-
if &actual_out_drv != maybe_resolved_drv {
520+
if actual_out_drv != drv {
523521
return Err(JobFailure::PostProcessing(anyhow::anyhow!(
524522
"Nix returned outputs for {actual_out_drv} when we expected {drv}"
525523
)));

subprojects/hydra-queue-runner/src/state/build.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ pub struct Build {
2525
pub local_priority: i32,
2626
pub global_priority: AtomicI32,
2727

28-
toplevel: arc_swap::ArcSwapOption<Step>,
28+
pub toplevel: arc_swap::ArcSwapOption<Step>,
2929
pub jobset: Arc<Jobset>,
3030

3131
finished_in_db: AtomicBool,

subprojects/hydra-queue-runner/src/state/machine.rs

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -491,22 +491,16 @@ impl Machines {
491491
pub struct Job {
492492
pub internal_build_id: uuid::Uuid,
493493
pub path: nix_utils::StorePath,
494-
pub resolved_drv: Option<nix_utils::StorePath>,
495494
pub build_id: BuildID,
496495
pub step_nr: i32,
497496
pub result: RemoteBuild,
498497
}
499498

500499
impl Job {
501-
pub fn new(
502-
build_id: BuildID,
503-
path: nix_utils::StorePath,
504-
resolved_drv: Option<nix_utils::StorePath>,
505-
) -> Self {
500+
pub fn new(build_id: BuildID, path: nix_utils::StorePath) -> Self {
506501
Self {
507502
internal_build_id: uuid::Uuid::new_v4(),
508503
path,
509-
resolved_drv,
510504
build_id,
511505
step_nr: 0,
512506
result: RemoteBuild::new(),
@@ -538,7 +532,6 @@ pub enum Message {
538532
BuildMessage {
539533
build_id: uuid::Uuid,
540534
drv: nix_utils::StorePath,
541-
resolved_drv: Option<nix_utils::StorePath>,
542535
max_log_size: u64,
543536
max_silent_time: i32,
544537
build_timeout: i32,
@@ -560,15 +553,13 @@ impl Message {
560553
Self::BuildMessage {
561554
build_id,
562555
drv,
563-
resolved_drv,
564556
max_log_size,
565557
max_silent_time,
566558
build_timeout,
567559
presigned_url_opts,
568560
} => runner_request::Message::Build(BuildMessage {
569561
build_id: build_id.to_string(),
570562
drv: Some(shared::proto::ProtoStorePath::from(drv)),
571-
resolved_drv: resolved_drv.map(shared::proto::ProtoStorePath::from),
572563
max_log_size,
573564
max_silent_time,
574565
build_timeout,
@@ -694,15 +685,15 @@ impl Machine {
694685
pub async fn build_drv(
695686
&self,
696687
job: Job,
688+
effective_drv: nix_utils::StorePath,
697689
opts: &nix_utils::BuildOptions,
698690
presigned_url_opts: Option<PresignedUrlOpts>,
699691
) -> anyhow::Result<()> {
700-
let drv = job.path.clone();
692+
let drv = effective_drv;
701693
self.msg_queue
702694
.send(Message::BuildMessage {
703695
build_id: job.internal_build_id,
704696
drv,
705-
resolved_drv: job.resolved_drv.clone(),
706697
max_log_size: opts.get_max_log_size(),
707698
max_silent_time: opts.get_max_silent_time(),
708699
build_timeout: opts.get_build_timeout(),

0 commit comments

Comments
 (0)