Skip to content

Commit ae692ac

Browse files
committed
queue-runner: resolve CA derivations at dispatch time
Instead of resolving at StepInfo construction and carrying two drv identities through the gRPC layer, resolve in realise_drv_on_valid_machine once all deps are built. If resolution yields a different drv, the original step is marked Resolved and a new DB step is created for the resolved drv with a resolvedTo FK linking them. The builder only ever sees one drv.
1 parent 628c409 commit ae692ac

10 files changed

Lines changed: 153 additions & 50 deletions

File tree

subprojects/crates/db/src/connection.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -866,6 +866,32 @@ impl Transaction<'_> {
866866
Ok(step_nr)
867867
}
868868

869+
/// Set resolvedToBuild/resolvedToStep on the original (unresolved) step after the
870+
/// resolved step has been created, linking the original step to its resolution.
871+
#[tracing::instrument(skip(self), err)]
872+
pub async fn set_resolved_to(
873+
&mut self,
874+
origin_build_id: crate::models::BuildID,
875+
origin_step_nr: i32,
876+
resolved_build_id: crate::models::BuildID,
877+
resolved_step_nr: i32,
878+
) -> sqlx::Result<()> {
879+
sqlx::query(
880+
r"
881+
UPDATE buildsteps
882+
SET resolvedToBuild = $3, resolvedToStep = $4
883+
WHERE build = $1 AND stepnr = $2
884+
",
885+
)
886+
.bind(origin_build_id)
887+
.bind(origin_step_nr)
888+
.bind(resolved_build_id)
889+
.bind(resolved_step_nr)
890+
.execute(&mut *self.tx)
891+
.await?;
892+
Ok(())
893+
}
894+
869895
#[tracing::instrument(
870896
skip(self, start_time, stop_time, build_id, drv_path, output,),
871897
err,

subprojects/crates/db/src/models.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@ pub enum BuildStatus {
1717
LogLimitExceeded = 10,
1818
NarSizeLimitExceeded = 11,
1919
NotDeterministic = 12,
20-
Busy = 100, // not stored
20+
Resolved = 13, // step was resolved to a CA derivation, see resolvedTo FK
21+
Busy = 100, // not stored
2122
}
2223

2324
impl BuildStatus {
@@ -36,6 +37,7 @@ impl BuildStatus {
3637
10 => Some(Self::LogLimitExceeded),
3738
11 => Some(Self::NarSizeLimitExceeded),
3839
12 => Some(Self::NotDeterministic),
40+
13 => Some(Self::Resolved),
3941
100 => Some(Self::Busy),
4042
_ => None,
4143
}

subprojects/hydra-builder/src/state.rs

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -400,11 +400,6 @@ impl State {
400400

401401
let machine_id = self.id;
402402
let drv = nix_utils::parse_store_path(&m.drv);
403-
let resolved_drv = m
404-
.resolved_drv
405-
.as_ref()
406-
.map(|v| nix_utils::parse_store_path(v));
407-
let maybe_resolved_drv = resolved_drv.as_ref().unwrap_or(&drv);
408403

409404
let before_import = Instant::now();
410405
let gcroot_prefix = uuid::Uuid::new_v4().to_string();
@@ -422,7 +417,7 @@ impl State {
422417
.await;
423418
let requisites = client
424419
.fetch_drv_requisites(FetchRequisitesRequest {
425-
path: maybe_resolved_drv.to_string().to_owned(),
420+
path: drv.to_string().to_owned(),
426421
include_outputs: false,
427422
})
428423
.await
@@ -435,7 +430,7 @@ impl State {
435430
store.clone(),
436431
self.metrics.clone(),
437432
&gcroot,
438-
maybe_resolved_drv,
433+
&drv,
439434
requisites
440435
.into_iter()
441436
.map(|s| nix_utils::parse_store_path(&s)),
@@ -456,7 +451,7 @@ impl State {
456451
let before_build = Instant::now();
457452
let (mut child, stdout, mut stderr) = nix_utils::realise_drv(
458453
&store,
459-
maybe_resolved_drv,
454+
&drv,
460455
&nix_utils::BuildOptions::complete(m.max_log_size, m.max_silent_time, m.build_timeout),
461456
true,
462457
)
@@ -516,7 +511,7 @@ impl State {
516511
.store_dir()
517512
.parse(&output_raw[0].drv_path)
518513
.map_err(|e: nix_utils::ParseStorePathError| JobFailure::PostProcessing(e.into()))?;
519-
if &actual_out_drv != maybe_resolved_drv {
514+
if actual_out_drv != drv {
520515
return Err(JobFailure::PostProcessing(anyhow::anyhow!(
521516
"Nix returned outputs for {actual_out_drv} when we expected {drv}"
522517
)));

subprojects/hydra-queue-runner/src/state/machine.rs

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -491,22 +491,16 @@ impl Machines {
491491
pub struct Job {
492492
pub internal_build_id: uuid::Uuid,
493493
pub path: nix_utils::StorePath,
494-
pub resolved_drv: Option<nix_utils::StorePath>,
495494
pub build_id: BuildID,
496495
pub step_nr: i32,
497496
pub result: RemoteBuild,
498497
}
499498

500499
impl Job {
501-
pub fn new(
502-
build_id: BuildID,
503-
path: nix_utils::StorePath,
504-
resolved_drv: Option<nix_utils::StorePath>,
505-
) -> Self {
500+
pub fn new(build_id: BuildID, path: nix_utils::StorePath) -> Self {
506501
Self {
507502
internal_build_id: uuid::Uuid::new_v4(),
508503
path,
509-
resolved_drv,
510504
build_id,
511505
step_nr: 0,
512506
result: RemoteBuild::new(),
@@ -538,7 +532,6 @@ pub enum Message {
538532
BuildMessage {
539533
build_id: uuid::Uuid,
540534
drv: nix_utils::StorePath,
541-
resolved_drv: Option<nix_utils::StorePath>,
542535
max_log_size: u64,
543536
max_silent_time: i32,
544537
build_timeout: i32,
@@ -560,15 +553,13 @@ impl Message {
560553
Self::BuildMessage {
561554
build_id,
562555
drv,
563-
resolved_drv,
564556
max_log_size,
565557
max_silent_time,
566558
build_timeout,
567559
presigned_url_opts,
568560
} => runner_request::Message::Build(BuildMessage {
569561
build_id: build_id.to_string(),
570562
drv: drv.to_string(),
571-
resolved_drv: resolved_drv.map(|p| p.to_string()),
572563
max_log_size,
573564
max_silent_time,
574565
build_timeout,
@@ -694,15 +685,15 @@ impl Machine {
694685
pub async fn build_drv(
695686
&self,
696687
job: Job,
688+
effective_drv: nix_utils::StorePath,
697689
opts: &nix_utils::BuildOptions,
698690
presigned_url_opts: Option<PresignedUrlOpts>,
699691
) -> anyhow::Result<()> {
700-
let drv = job.path.clone();
692+
let drv = effective_drv;
701693
self.msg_queue
702694
.send(Message::BuildMessage {
703695
build_id: job.internal_build_id,
704696
drv,
705-
resolved_drv: job.resolved_drv.clone(),
706697
max_log_size: opts.get_max_log_size(),
707698
max_silent_time: opts.get_max_silent_time(),
708699
build_timeout: opts.get_build_timeout(),

subprojects/hydra-queue-runner/src/state/mod.rs

Lines changed: 72 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -317,11 +317,7 @@ impl State {
317317
build.id
318318
};
319319

320-
let mut job = machine::Job::new(
321-
build_id,
322-
drv.to_owned(),
323-
step_info.resolved_drv_path.clone(),
324-
);
320+
let mut job = machine::Job::new(build_id, drv.to_owned());
325321
job.result.set_start_time_now();
326322
if self.check_cached_failure(step_info.step.clone()).await {
327323
job.result.step_status = BuildStatus::CachedFailure;
@@ -363,33 +359,99 @@ impl State {
363359
.collect(),
364360
)
365361
.await?;
362+
366363
tx.commit().await?;
367364
step_nr
368365
};
369366
job.step_nr = step_nr;
370367

368+
// Try to resolve CA derivation inputs. If resolution yields a
369+
// different drv, mark this step as Resolved in the DB and create a new
370+
// step for the resolved drv that the builder will actually build.
371+
let effective_drv = if let Some(guard) = step_info.step.get_drv() {
372+
let drv_ref = guard.as_ref().unwrap();
373+
match StepInfo::try_resolve(self.store.store_dir(), &self.db, drv_ref).await {
374+
Some(basic_drv) => {
375+
let resolved_path = self.store.write_derivation(&basic_drv).await?;
376+
if &resolved_path != drv {
377+
tracing::info!("resolved CA derivation {drv} -> {resolved_path}");
378+
379+
// Finish original step as "resolved".
380+
let mut resolved_result = RemoteBuild::new();
381+
resolved_result.step_status = BuildStatus::Resolved;
382+
resolved_result.set_start_time_now();
383+
resolved_result.set_stop_time_now();
384+
resolved_result.log_file.clone_from(&job.result.log_file);
385+
finish_build_step(
386+
&self.db,
387+
&self.store,
388+
build_id,
389+
step_nr,
390+
&resolved_result,
391+
Some(&machine.hostname),
392+
None,
393+
)
394+
.await?;
395+
396+
// Create DB step for the resolved drv under the same build.
397+
let resolved_step_nr = {
398+
let mut tx = db.begin_transaction().await?;
399+
let nr = tx
400+
.create_build_step(
401+
Some(job.result.get_start_time_as_i32()?),
402+
build_id,
403+
&self.store.print_store_path(&resolved_path),
404+
step_info.step.get_system().as_deref(),
405+
machine.hostname.clone(),
406+
BuildStatus::Busy,
407+
None,
408+
None,
409+
vec![],
410+
)
411+
.await?;
412+
413+
// Link the original step to the resolved step.
414+
tx.set_resolved_to(build_id, step_nr, build_id, nr).await?;
415+
416+
tx.commit().await?;
417+
nr
418+
};
419+
job.step_nr = resolved_step_nr;
420+
resolved_path
421+
} else {
422+
drv.to_owned()
423+
}
424+
}
425+
None => drv.to_owned(),
426+
}
427+
} else {
428+
drv.to_owned()
429+
};
430+
371431
{
372432
let mut tx = db.begin_transaction().await?;
373433
tx.notify_build_started(build_id).await?;
374434
tx.commit().await?;
375435
}
376436
tracing::info!(
377-
"Submitting build drv={drv} on machine={} hostname={} build_id={build_id} step_nr={step_nr}",
437+
"Submitting build drv={effective_drv} on machine={} hostname={} build_id={build_id} step_nr={}",
378438
machine.id,
379-
machine.hostname
439+
machine.hostname,
440+
job.step_nr,
380441
);
381442
self.db
382443
.get()
383444
.await?
384445
.update_build_step(db::models::UpdateBuildStep {
385446
build_id,
386-
step_nr,
447+
step_nr: job.step_nr,
387448
status: db::models::StepStatus::Connecting,
388449
})
389450
.await?;
390451
machine
391452
.build_drv(
392453
job,
454+
effective_drv,
393455
&build_options,
394456
// TODO: cleanup
395457
if self.config.use_presigned_uploads() {
@@ -962,7 +1024,7 @@ impl State {
9621024
if r.atomic_state.tries.load(Ordering::Relaxed) > 0 {
9631025
continue;
9641026
}
965-
let step_info = StepInfo::new(&self.store, &self.db, r.clone()).await;
1027+
let step_info = StepInfo::new(r.clone());
9661028

9671029
new_queues
9681030
.entry(system)
@@ -2090,7 +2152,7 @@ impl State {
20902152
continue;
20912153
};
20922154

2093-
let mut job = machine::Job::new(build.id, drv.to_owned(), None);
2155+
let mut job = machine::Job::new(build.id, drv.to_owned());
20942156
job.result.set_start_and_stop(now);
20952157
job.result.step_status = BuildStatus::Unsupported;
20962158
job.result.error_msg = Some(format!(

subprojects/hydra-queue-runner/src/state/step.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -525,3 +525,33 @@ impl Steps {
525525
steps.remove(drv_path);
526526
}
527527
}
528+
529+
#[cfg(test)]
530+
mod tests {
531+
use super::*;
532+
533+
fn drv(name: &str) -> nix_utils::StorePath {
534+
nix_utils::parse_store_path(&format!("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa-{name}.drv"))
535+
}
536+
537+
#[test]
538+
fn steps_create_and_remove() {
539+
let steps = Steps::new();
540+
let (step, is_new) = steps.create(&drv("test"), None, None);
541+
assert!(is_new);
542+
assert_eq!(steps.len(), 1);
543+
544+
steps.remove(step.get_drv_path());
545+
assert_eq!(steps.len(), 0);
546+
}
547+
548+
#[test]
549+
fn steps_weak_ref_dies_without_strong_ref() {
550+
let steps = Steps::new();
551+
let (step, _) = steps.create(&drv("ephemeral"), None, None);
552+
assert_eq!(steps.len(), 1);
553+
554+
drop(step);
555+
assert_eq!(steps.len(), 0);
556+
}
557+
}

subprojects/hydra-queue-runner/src/state/step_info.rs

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ use std::sync::Arc;
22
use std::sync::atomic::{AtomicBool, Ordering};
33

44
use db::models::BuildID;
5-
use nix_utils::BaseStore as _;
65
use nix_utils::SingleDerivedPath;
76

87
use super::Step;
@@ -37,27 +36,15 @@ fn flatten_chain(
3736
#[derive(Debug)]
3837
pub struct StepInfo {
3938
pub step: Arc<Step>,
40-
pub resolved_drv_path: Option<nix_utils::StorePath>,
4139
already_scheduled: AtomicBool,
4240
cancelled: AtomicBool,
4341
pub runnable_since: jiff::Timestamp,
4442
lowest_share_used: atomic_float::AtomicF64,
4543
}
4644

4745
impl StepInfo {
48-
pub async fn new(store: &nix_utils::LocalStore, db: &db::Database, step: Arc<Step>) -> Self {
46+
pub fn new(step: Arc<Step>) -> Self {
4947
Self {
50-
resolved_drv_path: match step.get_drv() {
51-
Some(guard) => {
52-
let resolved =
53-
Self::try_resolve(store.store_dir(), db, guard.as_ref().unwrap()).await;
54-
match resolved {
55-
Some(ref basic_drv) => store.write_derivation(basic_drv).await.ok(),
56-
None => None,
57-
}
58-
}
59-
None => None,
60-
},
6148
already_scheduled: false.into(),
6249
cancelled: false.into(),
6350
runnable_since: step.get_runnable_since(),
@@ -77,7 +64,7 @@ impl StepInfo {
7764
///
7865
/// We only need a store dir, not a store, because all the info we need comes from the Hydra
7966
/// database.
80-
async fn try_resolve(
67+
pub(super) async fn try_resolve(
8168
store_dir: &nix_utils::StoreDir,
8269
db: &db::Database,
8370
drv: &nix_utils::Derivation,
@@ -297,7 +284,6 @@ mod tests {
297284

298285
StepInfo {
299286
step,
300-
resolved_drv_path: None,
301287
already_scheduled: false.into(),
302288
cancelled: false.into(),
303289
runnable_since: jiff::Timestamp::now(),

0 commit comments

Comments
 (0)