Skip to content

Commit 1d0e9cf

Browse files
committed
chore: remove codspeed dep, switch to tokio unix pipe, add runner fifo, enable multi-threaded tokio, add bench_order
1 parent b6df390 commit 1d0e9cf

7 files changed

Lines changed: 204 additions & 144 deletions

File tree

Cargo.lock

Lines changed: 6 additions & 70 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,13 +46,13 @@ sysinfo = { version = "0.33.1", features = ["serde"] }
4646
indicatif = "0.17.8"
4747
console = "0.15.8"
4848
async-trait = "0.1.82"
49-
codspeed = { git = "https://github.com/CodspeedHQ/codspeed-rust", branch = "cod-674-collect-profiles-while-benchmarks-are-running", version = "=2.9.1" }
5049
libc = "0.2.171"
5150
bincode = "1.3.3"
5251
object = "0.36.7"
5352
linux-perf-data = "0.11.0"
5453
debugid = "0.8.0"
5554
memmap2 = "0.9.5"
55+
nix = { version = "0.29.0", features = ["fs"] }
5656

5757
[dev-dependencies]
5858
temp-env = { version = "0.3.6", features = ["async_closure"] }

src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ lazy_static! {
2727
format!("{VALGRIND_CODSPEED_BASE_VERSION}-0codspeed1");
2828
}
2929

30-
#[tokio::main(flavor = "current_thread")]
30+
#[tokio::main(flavor = "multi_thread", worker_threads = 4)]
3131
async fn main() {
3232
let res = crate::app::run().await;
3333
if let Err(err) = res {

src/run/runner/wall_time/executor.rs

Lines changed: 72 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,11 @@ use crate::run::runner::helpers::run_command_with_log_pipe::{
88
};
99
use crate::run::runner::wall_time::perf;
1010
use crate::run::runner::wall_time::perf::fifo::PerfFifo;
11+
use crate::run::runner::wall_time::perf::fifo::RunnerFifo;
1112
use crate::run::runner::{ExecutorName, RunData, RunnerMode};
1213
use crate::run::{check_system::SystemInfo, config::Config};
1314
use async_trait::async_trait;
14-
use codspeed::fifo::FifoIpc;
15-
use codspeed::fifo::RUNNER_ACK_FIFO;
16-
use codspeed::fifo::RUNNER_CTL_FIFO;
15+
use std::cell::OnceCell;
1716
use std::fs::canonicalize;
1817
use std::process::Command;
1918
use tempfile::TempDir;
@@ -24,13 +23,20 @@ use super::perf::unwind_data::UnwindDataLoader;
2423
const PERF_DATA_PREFIX: &str = "perf.data.";
2524

2625
pub struct WallTimeExecutor {
26+
use_perf: bool,
2727
perf_dir: TempDir,
28+
bench_order: OnceCell<Vec<String>>,
2829
}
2930

3031
impl WallTimeExecutor {
3132
pub fn new() -> Self {
33+
let use_perf = std::env::var("USE_PERF").map(|v| v == "1").unwrap_or(true);
34+
debug!("Running the cmd with perf: {}", use_perf);
35+
3236
Self {
33-
perf_dir: tempfile::tempdir().unwrap(),
37+
use_perf,
38+
perf_dir: tempfile::tempdir().expect("Failed to create temporary directory"),
39+
bench_order: OnceCell::new(),
3440
}
3541
}
3642
}
@@ -63,12 +69,9 @@ impl Executor for WallTimeExecutor {
6369
cmd.current_dir(abs_cwd);
6470
}
6571

66-
let use_perf = std::env::var("USE_PERF").map(|v| v == "1").unwrap_or(true);
67-
debug!("Running the cmd with perf: {}", use_perf);
68-
69-
let status = if use_perf {
72+
let status = if self.use_perf {
7073
let perf_fifo = PerfFifo::new()?;
71-
let ctl_fifo = FifoIpc::create(RUNNER_CTL_FIFO)?.with_reader()?;
74+
let runner_fifo = RunnerFifo::new()?;
7275

7376
// We have to pass a file to perf, which will create `perf.data.<timestamp>` files
7477
// when the output is split.
@@ -80,7 +83,7 @@ impl Executor for WallTimeExecutor {
8083
cmd.args([
8184
"-c",
8285
&format!(
83-
"perf record --data --freq=1000 --switch-output --control=fifo:{},{} --delay=-1 -g --call-graph=dwarf --output={} -- {}",
86+
"perf record --data --freq=max --switch-output --control=fifo:{},{} --delay=-1 -g --call-graph=dwarf --output={} -- {}",
8487
perf_fifo.ctl_fifo_path.to_string_lossy(),
8588
perf_fifo.ack_fifo_path.to_string_lossy(),
8689
perf_file.path().to_string_lossy(),
@@ -89,28 +92,28 @@ impl Executor for WallTimeExecutor {
8992
]);
9093
debug!("cmd: {:?}", cmd);
9194

95+
let mut thread_handle = None;
9296
let on_process_started = |pid: u32| -> anyhow::Result<()> {
9397
debug!("Process id: {}", pid);
9498

95-
let ack_fifo = FifoIpc::create(RUNNER_ACK_FIFO)?
96-
.with_reader()? // FIFO needs a reader to be opened with writer
97-
.with_writer()?;
98-
99-
std::thread::spawn(move || {
100-
let rt = tokio::runtime::Runtime::new().unwrap();
101-
rt.block_on(async move {
102-
if let Err(error) =
103-
perf::fifo::handle_fifo(pid, ctl_fifo, ack_fifo, perf_fifo).await
104-
{
105-
error!("Error handling FIFO: {}", error);
106-
}
107-
});
99+
let handle = tokio::task::spawn(async move {
100+
perf::fifo::handle_fifo(pid, runner_fifo, perf_fifo).await
108101
});
102+
thread_handle = Some(handle);
109103

110104
Ok(())
111105
};
106+
let status = run_command_with_log_pipe_and_callback(cmd, on_process_started);
107+
info!("Process finished with status: {:?}", status);
108+
109+
// Write the bench_order to the perf directory
110+
let bench_order = thread_handle
111+
.context("No thread found")?
112+
.await
113+
.map_err(|e| anyhow!("failed to join thread: {:?}", e))??;
114+
let _ = self.bench_order.set(bench_order);
112115

113-
run_command_with_log_pipe_and_callback(cmd, on_process_started)
116+
status
114117
} else {
115118
cmd.args(["-c", get_bench_command(config)?.as_str()]);
116119
run_command_with_log_pipe(cmd)
@@ -132,30 +135,53 @@ impl Executor for WallTimeExecutor {
132135
_system_info: &SystemInfo,
133136
run_data: &RunData,
134137
) -> Result<()> {
135-
// Copy the perf data files to the profile folder
136-
let map_files = std::fs::read_dir(&self.perf_dir)?
137-
.filter_map(|entry| entry.ok())
138-
.map(|entry| entry.path())
139-
.filter(|path| {
140-
path.file_name()
141-
.map(|name| name.to_string_lossy().starts_with(PERF_DATA_PREFIX))
142-
.unwrap_or(false)
143-
});
144-
for entry in map_files {
145-
let perf_map = SyntheticPerfMap::from_perf_file(entry.as_path());
146-
let _ = perf_map.save_to(&run_data.profile_folder);
147-
148-
if let Some(data) = UnwindDataLoader::from_perf_file(entry.as_path()) {
149-
data.save_to(&run_data.profile_folder)?;
138+
debug!("Copying files to the profile folder");
139+
140+
if self.use_perf {
141+
let bench_order = self
142+
.bench_order
143+
.get()
144+
.expect("Benchmark order is not available");
145+
146+
// Copy the perf data files to the profile folder
147+
let map_files = std::fs::read_dir(&self.perf_dir)?
148+
.filter_map(|entry| entry.ok())
149+
.map(|entry| entry.path())
150+
.filter(|path| {
151+
path.file_name()
152+
.map(|name| name.to_string_lossy().starts_with(PERF_DATA_PREFIX))
153+
.unwrap_or(false)
154+
})
155+
.collect::<Vec<_>>();
156+
157+
assert_eq!(
158+
map_files.len() - 1, // First perf.data is empty
159+
bench_order.len(),
160+
"Number of perf data files does not match the number of benchmarks"
161+
);
162+
163+
for entry in map_files {
164+
let perf_map = SyntheticPerfMap::from_perf_file(entry.as_path());
165+
let _ = perf_map.save_to(&run_data.profile_folder);
166+
167+
if let Some(data) = UnwindDataLoader::from_perf_file(entry.as_path()) {
168+
data.save_to(&run_data.profile_folder)?;
169+
}
170+
171+
let src_path = &entry;
172+
let dst_file_name = format!(
173+
"{}.perf",
174+
entry.file_name().unwrap_or_default().to_string_lossy()
175+
);
176+
let dst_path = run_data.profile_folder.join(dst_file_name);
177+
std::fs::copy(src_path, dst_path)?;
150178
}
151179

152-
let src_path = &entry;
153-
let dst_file_name = format!(
154-
"{}.perf",
155-
entry.file_name().unwrap_or_default().to_string_lossy()
156-
);
157-
let dst_path = run_data.profile_folder.join(dst_file_name);
158-
std::fs::copy(src_path, dst_path)?;
180+
// Copy bench_order.txt to the profile folder
181+
std::fs::write(
182+
run_data.profile_folder.join("metadata.bench_order"),
183+
bench_order.join("\n"),
184+
)?;
159185
}
160186

161187
Ok(())

0 commit comments

Comments
 (0)