Skip to content

Commit 0768499

Browse files
committed
wip
1 parent 49ed9e1 commit 0768499

4 files changed

Lines changed: 86 additions & 28 deletions

File tree

subprojects/hydra-queue-runner/src/state/build.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ pub struct Build {
2525
pub local_priority: i32,
2626
pub global_priority: AtomicI32,
2727

28-
toplevel: arc_swap::ArcSwapOption<Step>,
28+
pub toplevel: arc_swap::ArcSwapOption<Step>,
2929
pub jobset: Arc<Jobset>,
3030

3131
finished_in_db: AtomicBool,

subprojects/hydra-queue-runner/src/state/mod.rs

Lines changed: 70 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ enum CreateStepResult {
4646

4747
enum RealiseStepResult {
4848
None,
49+
/// Created a new resolved BuildStep
50+
Resolved,
4951
Valid(Arc<Machine>),
5052
MaybeCancelled,
5153
CachedFailure,
@@ -275,7 +277,7 @@ impl State {
275277
let drv = step_info.step.get_drv_path();
276278
let mut build_options = nix_utils::BuildOptions::new(None);
277279

278-
let build_id = {
280+
let build = {
279281
let mut dependents = HashSet::new();
280282
let mut steps = HashSet::new();
281283
step_info.step.get_dependents(&mut dependents, &mut steps);
@@ -308,9 +310,11 @@ impl State {
308310
build_options
309311
.set_max_silent_time(biggest_max_silent_time.unwrap_or(build.max_silent_time));
310312
build_options.set_build_timeout(biggest_build_timeout.unwrap_or(build.timeout));
311-
build.id
313+
build.clone()
312314
};
313315

316+
let build_id = build.id;
317+
314318
let mut job = machine::Job::new(build_id, drv.to_owned());
315319
job.result.set_start_time_now();
316320
if self.check_cached_failure(step_info.step.clone()).await {
@@ -403,10 +407,11 @@ impl State {
403407
}
404408
};
405409

406-
let effective_drv = if let Some(resolved_path) = resolved {
410+
if let Some(resolved_path) = resolved {
407411
tracing::info!("resolved CA derivation {drv} -> {resolved_path}");
408412

409413
// Finish original step as "resolved".
414+
step_info.step.set_finished(true);
410415
let mut resolved_result = RemoteBuild::new();
411416
resolved_result.step_status = BuildStatus::Resolved;
412417
resolved_result.set_start_time_now();
@@ -424,30 +429,68 @@ impl State {
424429
.await?;
425430

426431
// Create DB step for the resolved drv under the same build.
427-
let resolved_step_nr = {
428-
let mut tx = db.begin_transaction().await?;
429-
let nr = tx
430-
.create_build_step(
431-
Some(job.result.get_start_time_as_i32()?),
432-
build_id,
433-
&self.store.print_store_path(&resolved_path),
434-
step_info.step.get_system().as_deref(),
435-
machine.hostname.clone(),
436-
BuildStatus::Busy,
437-
None,
438-
None,
439-
vec![],
440-
)
441-
.await?;
432+
let mut tx = db.begin_transaction().await?;
433+
let nr = tx
434+
.create_build_step(
435+
Some(job.result.get_start_time_as_i32()?),
436+
build_id,
437+
&self.store.print_store_path(&resolved_path),
438+
step_info.step.get_system().as_deref(),
439+
machine.hostname.clone(),
440+
BuildStatus::Busy,
441+
None,
442+
None,
443+
vec![],
444+
)
445+
.await?;
442446

443-
tx.set_resolved_to(build_id, step_nr, nr).await?;
444-
tx.commit().await?;
445-
nr
447+
tx.set_resolved_to(build_id, step_nr, nr).await?;
448+
tx.commit().await?;
449+
450+
// Actually schedule the step
451+
let resolved_step = match self
452+
.create_step(
453+
build.clone(),
454+
resolved_path,
455+
Some(build.clone()),
456+
None,
457+
Arc::new(parking_lot::RwLock::new(HashSet::new())),
458+
Arc::new(parking_lot::RwLock::new(HashSet::new())),
459+
Arc::new(parking_lot::RwLock::new(HashSet::new())),
460+
)
461+
.await
462+
{
463+
CreateStepResult::None => {
464+
return Err(anyhow::anyhow!("Could not create resolved build step"));
465+
}
466+
CreateStepResult::Valid(step) => step,
467+
CreateStepResult::PreviousFailure(step) => step,
446468
};
447-
job.step_nr = resolved_step_nr;
448-
resolved_path
449-
} else {
450-
drv.to_owned()
469+
470+
resolved_step.make_runnable();
471+
472+
// Copy rdeps so dependents don't try to start too early
473+
for rdep in step_info.step.clone_rdeps() {
474+
if let Some(rdep) = rdep.upgrade() {
475+
resolved_step.make_rdep(&rdep);
476+
}
477+
}
478+
479+
// If we're the root of a build then we need to become the new root,
480+
// lest the Step get garbage collected.
481+
if *build
482+
.toplevel
483+
.compare_and_swap(&step_info.step, Some(resolved_step))
484+
== Some(step_info.step.clone())
485+
{
486+
build.propagate_priorities();
487+
}
488+
489+
// New steps runnable
490+
self.trigger_dispatch();
491+
492+
// No more work to do, build will happen in another step
493+
return Ok(RealiseStepResult::Resolved);
451494
};
452495

453496
{
@@ -456,7 +499,7 @@ impl State {
456499
tx.commit().await?;
457500
}
458501
tracing::info!(
459-
"Submitting build drv={effective_drv} on machine={} hostname={} build_id={build_id} step_nr={}",
502+
"Submitting build drv={drv} on machine={} hostname={} build_id={build_id} step_nr={}",
460503
machine.id,
461504
machine.hostname,
462505
job.step_nr,
@@ -473,7 +516,7 @@ impl State {
473516
machine
474517
.build_drv(
475518
job,
476-
effective_drv,
519+
drv.clone(),
477520
&build_options,
478521
// TODO: cleanup
479522
if self.config.use_presigned_uploads() {

subprojects/hydra-queue-runner/src/state/queue.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -504,6 +504,7 @@ impl Queues {
504504
nr_waiting += 1;
505505
nr_steps_waiting_all_queues += 1;
506506
}
507+
Ok(crate::state::RealiseStepResult::Resolved) => {}
507508
Ok(
508509
crate::state::RealiseStepResult::MaybeCancelled
509510
| crate::state::RealiseStepResult::CachedFailure,

subprojects/hydra-queue-runner/src/state/step.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -351,6 +351,20 @@ impl Step {
351351
.store(state.deps.len() as u64, Ordering::Relaxed);
352352
}
353353

354+
pub fn make_rdep(self: &Arc<Self>, dep: &Arc<Self>) {
355+
dep.add_dep(self.clone());
356+
let mut state = self.state.write();
357+
state.rdeps.push(Arc::downgrade(dep));
358+
self.atomic_state
359+
.rdeps_len
360+
.store(state.rdeps.len() as u64, Ordering::Relaxed);
361+
}
362+
363+
pub fn clone_rdeps(&self) -> Vec<Weak<Step>> {
364+
let state = self.state.read();
365+
state.rdeps.clone()
366+
}
367+
354368
pub fn add_referring_data(
355369
&self,
356370
referring_build: Option<&Arc<Build>>,

0 commit comments

Comments
 (0)