Skip to content

Commit ed20ce4

Browse files
tmt/test: Sort tests by time taken, use mpsc channels
Sort tests in descending order of time taken for completion so longer tests get scheduled together. Also, update to use mpsc channels for communication between threads Signed-off-by: Pragyan Poudyal <pragyanpoudyal41999@gmail.com>
1 parent 65a094b commit ed20ce4

1 file changed

Lines changed: 66 additions & 17 deletions

File tree

crates/xtask/src/tmt.rs

Lines changed: 66 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1-
use std::thread::JoinHandle;
1+
use std::sync::mpsc;
22
use std::time::Duration;
3+
use std::usize;
34

45
use anyhow::{Context, Result};
56
use camino::{Utf8Path, Utf8PathBuf};
@@ -39,6 +40,36 @@ const ENV_BOOTC_UPGRADE_IMAGE: &str = "BOOTC_upgrade_image";
3940
// Distro identifiers
4041
const DISTRO_CENTOS_9: &str = "centos-9";
4142

43+
// Tests sorted by time taken (descending)
44+
const TESTS_SORTED_BY_TIME: [&str; 23] = [
45+
// 10+ mins
46+
"multi-device-esp",
47+
"composefs-gc-uki",
48+
"composefs-gc",
49+
// 5+ mins
50+
"loader-entries-source",
51+
"download-only-upgrade",
52+
"bib-build",
53+
"rollback",
54+
"logically-bound-switch",
55+
"soft-reboot",
56+
"switch-to-unified",
57+
"image-pushpull-upgrade",
58+
"install-no-boot-dir",
59+
"upgrade-tag",
60+
"custom-selinux-policy",
61+
"factory-reset",
62+
// 3+ mins
63+
"upgrade-check-status",
64+
"soft-reboot-selinux-policy",
65+
"install-bootloader-none",
66+
"install-outside-container",
67+
"install-unified-flag",
68+
"usroverlay",
69+
"image-upgrade-reboot",
70+
"install-karg-delete",
71+
];
72+
4273
// Import the argument types from xtask.rs
4374
use crate::bcvk::BcvkInstallOpts;
4475
use crate::{RunTmtArgs, SealState, TmtProvisionArgs};
@@ -653,6 +684,13 @@ pub(crate) fn run_tmt(sh: &Shell, args: &RunTmtArgs) -> Result<()> {
653684
return Ok(());
654685
}
655686

687+
plans.sort_by_key(|full_plan_name| {
688+
TESTS_SORTED_BY_TIME
689+
.iter()
690+
.position(|test_time| full_plan_name.contains(test_time))
691+
.unwrap_or(usize::MAX)
692+
});
693+
656694
println!("Found {} test plan(s): {:?}", plans.len(), plans);
657695

658696
let mut install_opts = Vec::new();
@@ -696,7 +734,7 @@ pub(crate) fn run_tmt(sh: &Shell, args: &RunTmtArgs) -> Result<()> {
696734
// Environment variables to pass to tmt (in addition to args.env)
697735
let mut tmt_env_vars = Vec::new();
698736

699-
let mut handles: Vec<JoinHandle<RunPlanResult>> = vec![];
737+
let mut active_threads = 0;
700738

701739
let num_cpu = std::thread::available_parallelism()
702740
.map(|c| c.get())
@@ -715,6 +753,8 @@ pub(crate) fn run_tmt(sh: &Shell, args: &RunTmtArgs) -> Result<()> {
715753

716754
println!("parallel_vms: {parallel_vms}");
717755

756+
let (tx, rx) = mpsc::channel::<RunPlanResult>();
757+
718758
// Run each plan in its own VM
719759
for plan in plans {
720760
let plan_name = sanitize_plan_name(plan);
@@ -773,8 +813,9 @@ pub(crate) fn run_tmt(sh: &Shell, args: &RunTmtArgs) -> Result<()> {
773813
let vm_mem = vm_mem.to_string();
774814
let vm_cpu = vm_cpu.to_string();
775815

776-
let handle = std::thread::spawn(move || {
777-
run_plan(
816+
let tx_clone = tx.clone();
817+
std::thread::spawn(move || {
818+
let result = run_plan(
778819
cloned_plan,
779820
cloned_vm_name,
780821
image,
@@ -786,36 +827,44 @@ pub(crate) fn run_tmt(sh: &Shell, args: &RunTmtArgs) -> Result<()> {
786827
preserve_vm,
787828
vm_cpu,
788829
vm_mem,
789-
)
790-
});
830+
);
791831

792-
handles.push(handle);
832+
if let Err(e) = tx_clone.send(result) {
833+
eprintln!("Failed to send result through channel: {}", e);
834+
}
835+
});
793836

794-
if handles.len() >= parallel_vms {
795-
let e = handles.remove(0).join();
837+
active_threads += 1;
796838

797-
match e {
839+
// wait for a thread to complete if we've reached the parallel limit
840+
if active_threads >= parallel_vms {
841+
match rx.recv() {
798842
Ok(plan_result) => {
799843
test_results.push(plan_result);
844+
active_threads -= 1;
800845
}
801-
802846
Err(e) => {
803-
eprintln!("Join failed: {e:?}");
847+
eprintln!("Failed to receive result from channel: {}", e);
848+
// still decrement to avoid infinite loop
849+
// in theory this shouldn't happen as we loop over plans, but
850+
// for sanity
851+
active_threads -= 1;
804852
}
805853
}
806854
}
807855
}
808856

809-
for h in handles {
810-
let e = h.join();
857+
// drop the sender to signal no more messages
858+
drop(tx);
811859

812-
match e {
860+
// remaining results from channel
861+
for _ in 0..active_threads {
862+
match rx.recv() {
813863
Ok(plan_result) => {
814864
test_results.push(plan_result);
815865
}
816-
817866
Err(e) => {
818-
eprintln!("Join failed: {e:?}");
867+
eprintln!("Failed to receive remaining result from channel: {}", e);
819868
}
820869
}
821870
}

0 commit comments

Comments
 (0)