Skip to content

Commit d0febc0

Browse files
authored
Merge pull request #1629 from obsidiansystems/ca-resolve-two-phase
queue-runner: resolve CA derivations after dependency outputs land
2 parents 6db3647 + 3cbcdbe commit d0febc0

15 files changed

Lines changed: 481 additions & 99 deletions

File tree

subprojects/crates/db/src/connection.rs

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -693,6 +693,36 @@ impl Transaction<'_> {
693693
Ok(())
694694
}
695695

696+
#[tracing::instrument(skip(self), err)]
697+
pub async fn find_build_step_outputs(
698+
&mut self,
699+
store_dir: &StoreDir,
700+
drv_path: &StorePath,
701+
) -> sqlx::Result<BTreeMap<OutputName, StorePath>> {
702+
let drv_path = store_dir.display(drv_path).to_string();
703+
let items: Vec<(String, String)> = sqlx::query_as(
704+
r"SELECT o.name, o.path
705+
FROM buildstepoutputs o
706+
JOIN buildsteps s ON s.build = o.build AND s.stepnr = o.stepnr
707+
WHERE s.drvpath = $1 AND o.path IS NOT NULL",
708+
)
709+
.bind(drv_path)
710+
.fetch_all(&mut *self.tx)
711+
.await?;
712+
713+
items
714+
.into_iter()
715+
.map(|(name, path)| -> anyhow::Result<_> {
716+
let name: OutputName = name.parse().context("invalid output name from DB")?;
717+
let path: StorePath = store_dir
718+
.parse(&path)
719+
.context("invalid store path from DB")?;
720+
Ok((name, path))
721+
})
722+
.collect::<anyhow::Result<_>>()
723+
.map_err(|e| sqlx::Error::Decode(e.into_boxed_dyn_error()))
724+
}
725+
696726
#[tracing::instrument(skip(self, res), err)]
697727
pub async fn update_build_step_in_finish(
698728
&mut self,
@@ -941,6 +971,60 @@ impl Transaction<'_> {
941971
Ok(step_nr)
942972
}
943973

974+
#[tracing::instrument(
975+
skip(self, start_time, stop_time, build_id, drv_path, outputs,),
976+
err,
977+
ret
978+
)]
979+
pub async fn create_local_step(
980+
&mut self,
981+
store_dir: &StoreDir,
982+
start_time: i32,
983+
stop_time: i32,
984+
build_id: crate::models::BuildID,
985+
drv_path: &StorePath,
986+
outputs: BTreeMap<OutputName, StorePath>,
987+
) -> anyhow::Result<i32> {
988+
let step_nr = loop {
989+
if let Some(step_nr) = self
990+
.insert_build_step(
991+
store_dir,
992+
InsertBuildStep {
993+
build_id,
994+
r#type: crate::models::BuildType::Substitution,
995+
drv_path,
996+
status: BuildStatus::Success,
997+
busy: false,
998+
start_time: Some(start_time),
999+
stop_time: Some(stop_time),
1000+
platform: None,
1001+
propagated_from: None,
1002+
error_msg: None,
1003+
machine: "",
1004+
},
1005+
)
1006+
.await?
1007+
{
1008+
break step_nr;
1009+
}
1010+
};
1011+
1012+
let output_items: Vec<_> = outputs
1013+
.into_iter()
1014+
.map(|(name, path)| InsertBuildStepOutput {
1015+
build_id,
1016+
step_nr,
1017+
name,
1018+
path: Some(path),
1019+
})
1020+
.collect();
1021+
1022+
self.insert_build_step_outputs(store_dir, &output_items)
1023+
.await?;
1024+
1025+
Ok(step_nr)
1026+
}
1027+
9441028
#[tracing::instrument(
9451029
skip(self, start_time, stop_time, build_id, drv_path, output,),
9461030
err,

subprojects/crates/db/src/models.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ pub enum BuildStatus {
2222
LogLimitExceeded = 10,
2323
NarSizeLimitExceeded = 11,
2424
NotDeterministic = 12,
25+
/// step was resolved to a CA derivation
26+
Resolved = 13,
2527
/// not stored
2628
Busy = 100,
2729
}
@@ -42,6 +44,7 @@ impl BuildStatus {
4244
10 => Some(Self::LogLimitExceeded),
4345
11 => Some(Self::NarSizeLimitExceeded),
4446
12 => Some(Self::NotDeterministic),
47+
13 => Some(Self::Resolved),
4548
100 => Some(Self::Busy),
4649
_ => None,
4750
}

subprojects/hydra-builder/src/state.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -408,8 +408,6 @@ impl State {
408408
.drv
409409
.ok_or(JobFailure::Preparing(anyhow::anyhow!("missing drv")))?
410410
.0;
411-
let resolved_drv = m.resolved_drv.map(|v| v.0);
412-
let maybe_resolved_drv = resolved_drv.as_ref().unwrap_or(&drv);
413411

414412
let before_import = Instant::now();
415413
let gcroot_prefix = uuid::Uuid::new_v4().to_string();
@@ -427,7 +425,7 @@ impl State {
427425
.await;
428426
let requisites = client
429427
.fetch_drv_requisites(FetchRequisitesRequest {
430-
path: Some(ProtoStorePath::from(maybe_resolved_drv.clone())),
428+
path: Some(ProtoStorePath::from(drv.clone())),
431429
include_outputs: false,
432430
})
433431
.await
@@ -440,7 +438,7 @@ impl State {
440438
store.clone(),
441439
self.metrics.clone(),
442440
&gcroot,
443-
maybe_resolved_drv,
441+
&drv,
444442
requisites.into_iter().map(|s| s.0),
445443
usize::try_from(self.max_concurrent_downloads.load(Ordering::Relaxed)).unwrap_or(5),
446444
self.config.use_substitutes,
@@ -459,7 +457,7 @@ impl State {
459457
let before_build = Instant::now();
460458
let (mut child, stdout, mut stderr) = nix_utils::realise_drv(
461459
&store,
462-
maybe_resolved_drv,
460+
&drv,
463461
&nix_utils::BuildOptions::complete(m.max_log_size, m.max_silent_time, m.build_timeout),
464462
true,
465463
)
@@ -536,7 +534,7 @@ impl State {
536534
.store_dir()
537535
.parse(&output_raw[0].drv_path)
538536
.map_err(|e: nix_utils::ParseStorePathError| JobFailure::PostProcessing(e.into()))?;
539-
if &actual_out_drv != maybe_resolved_drv {
537+
if actual_out_drv != drv {
540538
return Err(JobFailure::PostProcessing(anyhow::anyhow!(
541539
"Nix returned outputs for {actual_out_drv} when we expected {drv}"
542540
)));

subprojects/hydra-queue-runner/src/state/build.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ pub struct Build {
2525
pub local_priority: i32,
2626
pub global_priority: AtomicI32,
2727

28-
toplevel: arc_swap::ArcSwapOption<Step>,
28+
pub toplevel: arc_swap::ArcSwapOption<Step>,
2929
pub jobset: Arc<Jobset>,
3030

3131
finished_in_db: AtomicBool,
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
use nix_utils::SingleDerivedPath;
2+
3+
/// Output names of intermediate derivations for a dynamic derivation
4+
/// dependency, stored in reverse order so that the next level to resolve
5+
/// can be cheaply `pop()`-ed.
6+
///
7+
/// e.g. for a derivation input that is `aaaa-dyn.drv^foo^bar^out`, the final
8+
/// derivation would be `aaaa-dyn.drv^foo^bar` (no final `^out`). The output
9+
/// names are stored as `["bar", "foo"]` (reversed).
10+
///
11+
/// In the common case of depending on a static derivation, this is empty.
12+
#[derive(Debug, Clone, Default, PartialEq, Eq, PartialOrd, Ord)]
13+
pub struct OutputNameChain(pub Vec<nix_utils::OutputName>);
14+
15+
impl OutputNameChain {
16+
pub fn pop(&mut self) -> Option<nix_utils::OutputName> {
17+
self.0.pop()
18+
}
19+
}
20+
21+
/// Flatten a [`SingleDerivedPath`] into `(root_drv_path, chain)`.
22+
///
23+
/// The output chain is in stack order (outermost first) matching
24+
/// [`OutputNameChain`]'s convention. For `Built { Opaque(A), "foo" }`,
25+
/// returns `(A, ["foo"])`. For `Opaque(A)`, returns `(A, [])`.
26+
pub fn flatten_path(sdp: &SingleDerivedPath) -> (nix_utils::StorePath, OutputNameChain) {
27+
match sdp {
28+
SingleDerivedPath::Opaque(p) => (p.clone(), OutputNameChain::default()),
29+
SingleDerivedPath::Built { drv_path, output } => flatten_chain(drv_path, output),
30+
}
31+
}
32+
33+
/// Like [`flatten_path`] but appends an additional output name.
34+
///
35+
/// For `Built { Opaque(A), "foo" }` with output `"bar"`,
36+
/// returns `(A, ["bar", "foo"])`.
37+
pub fn flatten_chain(
38+
drv_path: &SingleDerivedPath,
39+
output_name: &nix_utils::OutputName,
40+
) -> (nix_utils::StorePath, OutputNameChain) {
41+
let (root, mut chain) = flatten_path(drv_path);
42+
chain.0.push(output_name.clone());
43+
(root, chain)
44+
}

subprojects/hydra-queue-runner/src/state/machine.rs

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -491,22 +491,16 @@ impl Machines {
491491
pub struct Job {
492492
pub internal_build_id: uuid::Uuid,
493493
pub path: nix_utils::StorePath,
494-
pub resolved_drv: Option<nix_utils::StorePath>,
495494
pub build_id: BuildID,
496495
pub step_nr: i32,
497496
pub result: RemoteBuild,
498497
}
499498

500499
impl Job {
501-
pub fn new(
502-
build_id: BuildID,
503-
path: nix_utils::StorePath,
504-
resolved_drv: Option<nix_utils::StorePath>,
505-
) -> Self {
500+
pub fn new(build_id: BuildID, path: nix_utils::StorePath) -> Self {
506501
Self {
507502
internal_build_id: uuid::Uuid::new_v4(),
508503
path,
509-
resolved_drv,
510504
build_id,
511505
step_nr: 0,
512506
result: RemoteBuild::new(),
@@ -538,7 +532,6 @@ pub enum Message {
538532
BuildMessage {
539533
build_id: uuid::Uuid,
540534
drv: nix_utils::StorePath,
541-
resolved_drv: Option<nix_utils::StorePath>,
542535
max_log_size: u64,
543536
max_silent_time: i32,
544537
build_timeout: i32,
@@ -560,15 +553,13 @@ impl Message {
560553
Self::BuildMessage {
561554
build_id,
562555
drv,
563-
resolved_drv,
564556
max_log_size,
565557
max_silent_time,
566558
build_timeout,
567559
presigned_url_opts,
568560
} => runner_request::Message::Build(BuildMessage {
569561
build_id: build_id.to_string(),
570562
drv: Some(shared::proto::ProtoStorePath::from(drv)),
571-
resolved_drv: resolved_drv.map(shared::proto::ProtoStorePath::from),
572563
max_log_size,
573564
max_silent_time,
574565
build_timeout,
@@ -696,15 +687,15 @@ impl Machine {
696687
pub async fn build_drv(
697688
&self,
698689
job: Job,
690+
effective_drv: nix_utils::StorePath,
699691
opts: &nix_utils::BuildOptions,
700692
presigned_url_opts: Option<PresignedUrlOpts>,
701693
) -> anyhow::Result<()> {
702-
let drv = job.path.clone();
694+
let drv = effective_drv;
703695
self.msg_queue
704696
.send(Message::BuildMessage {
705697
build_id: job.internal_build_id,
706698
drv,
707-
resolved_drv: job.resolved_drv.clone(),
708699
max_log_size: opts.get_max_log_size(),
709700
max_silent_time: opts.get_max_silent_time(),
710701
build_timeout: opts.get_build_timeout(),

0 commit comments

Comments
 (0)