Skip to content

Commit 80430bf

Browse files
committed
Create new Steps for resolved derivations
Instead of replacing the current step in place with the resolved one when a CA derivation is resolved, create a new Step for the resolved derivation and hand it back to the scheduler. This gives us more flexibility in execution, and the same mechanism can be used in the future for dynamic derivations, which won't map 1:1 to the original derivations.
1 parent 49ed9e1 commit 80430bf

4 files changed

Lines changed: 96 additions & 28 deletions

File tree

subprojects/hydra-queue-runner/src/state/build.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ pub struct Build {
2525
pub local_priority: i32,
2626
pub global_priority: AtomicI32,
2727

28-
toplevel: arc_swap::ArcSwapOption<Step>,
28+
pub toplevel: arc_swap::ArcSwapOption<Step>,
2929
pub jobset: Arc<Jobset>,
3030

3131
finished_in_db: AtomicBool,

subprojects/hydra-queue-runner/src/state/mod.rs

Lines changed: 80 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ mod step;
1010
mod step_info;
1111
mod uploader;
1212

13+
use anyhow::Context as _;
1314
pub use atomic::AtomicDateTime;
1415
pub use build::{Build, BuildOutput, BuildResultState, BuildTimings, Builds, RemoteBuild};
1516
pub use jobset::{Jobset, JobsetID, Jobsets};
@@ -46,6 +47,8 @@ enum CreateStepResult {
4647

4748
enum RealiseStepResult {
4849
None,
50+
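/// The derivation was resolved: a new Step for the resolved derivation was created and handed to the scheduler; nothing was built by this call.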
51+
Resolved,
4952
Valid(Arc<Machine>),
5053
MaybeCancelled,
5154
CachedFailure,
@@ -275,7 +278,7 @@ impl State {
275278
let drv = step_info.step.get_drv_path();
276279
let mut build_options = nix_utils::BuildOptions::new(None);
277280

278-
let build_id = {
281+
let build = {
279282
let mut dependents = HashSet::new();
280283
let mut steps = HashSet::new();
281284
step_info.step.get_dependents(&mut dependents, &mut steps);
@@ -308,9 +311,11 @@ impl State {
308311
build_options
309312
.set_max_silent_time(biggest_max_silent_time.unwrap_or(build.max_silent_time));
310313
build_options.set_build_timeout(biggest_build_timeout.unwrap_or(build.timeout));
311-
build.id
314+
build.clone()
312315
};
313316

317+
let build_id = build.id;
318+
314319
let mut job = machine::Job::new(build_id, drv.to_owned());
315320
job.result.set_start_time_now();
316321
if self.check_cached_failure(step_info.step.clone()).await {
@@ -403,10 +408,11 @@ impl State {
403408
}
404409
};
405410

406-
let effective_drv = if let Some(resolved_path) = resolved {
411+
if let Some(resolved_path) = resolved {
407412
tracing::info!("resolved CA derivation {drv} -> {resolved_path}");
408413

409414
// Finish original step as "resolved".
415+
step_info.step.set_finished(true);
410416
let mut resolved_result = RemoteBuild::new();
411417
resolved_result.step_status = BuildStatus::Resolved;
412418
resolved_result.set_start_time_now();
@@ -424,30 +430,77 @@ impl State {
424430
.await?;
425431

426432
// Create DB step for the resolved drv under the same build.
427-
let resolved_step_nr = {
428-
let mut tx = db.begin_transaction().await?;
429-
let nr = tx
430-
.create_build_step(
431-
Some(job.result.get_start_time_as_i32()?),
432-
build_id,
433-
&self.store.print_store_path(&resolved_path),
434-
step_info.step.get_system().as_deref(),
435-
machine.hostname.clone(),
436-
BuildStatus::Busy,
437-
None,
438-
None,
439-
vec![],
440-
)
441-
.await?;
433+
let mut tx = db.begin_transaction().await?;
434+
let nr = tx
435+
.create_build_step(
436+
Some(job.result.get_start_time_as_i32()?),
437+
build_id,
438+
&self.store.print_store_path(&resolved_path),
439+
step_info.step.get_system().as_deref(),
440+
machine.hostname.clone(),
441+
BuildStatus::Busy,
442+
None,
443+
None,
444+
vec![],
445+
)
446+
.await?;
442447

443-
tx.set_resolved_to(build_id, step_nr, nr).await?;
444-
tx.commit().await?;
445-
nr
448+
tx.set_resolved_to(build_id, step_nr, nr).await?;
449+
tx.commit().await?;
450+
451+
// Actually schedule the step
452+
let resolved_step = match self
453+
.create_step(
454+
build.clone(),
455+
resolved_path,
456+
Some(build.clone()),
457+
None,
458+
Arc::new(parking_lot::RwLock::new(HashSet::new())),
459+
Arc::new(parking_lot::RwLock::new(HashSet::new())),
460+
Arc::new(parking_lot::RwLock::new(HashSet::new())),
461+
)
462+
.await
463+
{
464+
CreateStepResult::None => {
465+
return Err(anyhow::anyhow!("Could not create resolved build step"));
466+
}
467+
CreateStepResult::Valid(step) => step,
468+
CreateStepResult::PreviousFailure(step) => {
469+
self.handle_previous_failure(build.clone(), step.clone())
470+
.await
471+
.with_context(|| {
472+
format!(
473+
"Failed to handle previous failure in resolved version of {drv}"
474+
)
475+
})?;
476+
return Ok(RealiseStepResult::CachedFailure);
477+
}
446478
};
447-
job.step_nr = resolved_step_nr;
448-
resolved_path
449-
} else {
450-
drv.to_owned()
479+
480+
resolved_step.make_runnable();
481+
482+
// Copy rdeps so dependents don't try to start too early
483+
for rdep in step_info.step.clone_rdeps() {
484+
if let Some(rdep) = rdep.upgrade() {
485+
resolved_step.make_rdep(&rdep);
486+
}
487+
}
488+
489+
// If the original step is the root (toplevel) of the build, the resolved step
490+
// must take its place as the new root, lest the resolved Step get garbage collected.
491+
if *build
492+
.toplevel
493+
.compare_and_swap(&step_info.step, Some(resolved_step))
494+
== Some(step_info.step.clone())
495+
{
496+
build.propagate_priorities();
497+
}
498+
499+
// New steps runnable
500+
self.trigger_dispatch();
501+
502+
// No more work to do, build will happen in another step
503+
return Ok(RealiseStepResult::Resolved);
451504
};
452505

453506
{
@@ -456,7 +509,7 @@ impl State {
456509
tx.commit().await?;
457510
}
458511
tracing::info!(
459-
"Submitting build drv={effective_drv} on machine={} hostname={} build_id={build_id} step_nr={}",
512+
"Submitting build drv={drv} on machine={} hostname={} build_id={build_id} step_nr={}",
460513
machine.id,
461514
machine.hostname,
462515
job.step_nr,
@@ -473,7 +526,7 @@ impl State {
473526
machine
474527
.build_drv(
475528
job,
476-
effective_drv,
529+
drv.clone(),
477530
&build_options,
478531
// TODO: cleanup
479532
if self.config.use_presigned_uploads() {

subprojects/hydra-queue-runner/src/state/queue.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -504,6 +504,7 @@ impl Queues {
504504
nr_waiting += 1;
505505
nr_steps_waiting_all_queues += 1;
506506
}
507+
Ok(crate::state::RealiseStepResult::Resolved) => {}
507508
Ok(
508509
crate::state::RealiseStepResult::MaybeCancelled
509510
| crate::state::RealiseStepResult::CachedFailure,

subprojects/hydra-queue-runner/src/state/step.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -351,6 +351,20 @@ impl Step {
351351
.store(state.deps.len() as u64, Ordering::Relaxed);
352352
}
353353

354+
/// Links `self` and `dep` so that `dep` becomes a reverse dependency of `self`.
///
/// Despite the parameter name, `dep` is the *dependent* step: it is handed
/// `self` through `add_dep`, and a weak reference to it is appended to
/// `self`'s `rdeps` list. The cached `rdeps_len` counter in `atomic_state`
/// is then refreshed from the new list length.
///
/// NOTE(review): `add_dep` is called before `self.state` is write-locked;
/// presumably it locks `dep`'s own state — confirm against `add_dep`.
pub fn make_rdep(self: &Arc<Self>, dep: &Arc<Self>) {
355+
// Register `self` on the dependent's side first.
dep.add_dep(self.clone());
356+
let mut state = self.state.write();
357+
// Only a weak reference is stored for the dependent.
state.rdeps.push(Arc::downgrade(dep));
358+
self.atomic_state
359+
.rdeps_len
360+
.store(state.rdeps.len() as u64, Ordering::Relaxed);
361+
}
362+
363+
/// Returns a snapshot of this step's reverse dependencies.
///
/// Takes the read lock on `self.state` and clones the `rdeps` vector. The
/// entries are `Weak<Step>` handles, so callers must `upgrade()` each one
/// and handle the case where the step no longer exists.
pub fn clone_rdeps(&self) -> Vec<Weak<Step>> {
364+
let state = self.state.read();
365+
state.rdeps.clone()
366+
}
367+
354368
pub fn add_referring_data(
355369
&self,
356370
referring_build: Option<&Arc<Build>>,

0 commit comments

Comments
 (0)