Skip to content

Commit 1b3b544

Browse files
committed
refactor(walltime): port PerfRunner to Profiler trait
PerfRunner becomes PerfProfiler implementing the Profiler trait. The RunnerFifo loop moves out of the perf module and into WallTimeExecutor, which now drives the trait callbacks (on_start_benchmark / on_stop_benchmark / on_ping / GetIntegrationMode) generically. Profiler::wrap stashes the perf control fifo and output path on the profiler instance; finalize harvests the perf.data artifacts and writes walltime.metadata. The OnceCell<BenchmarkData> bridge is gone — fifo data and timestamps are passed directly into finalize. Executor::run now takes &mut self to allow profilers to hold per-run state on the trait object. This cascades through run_executor, orchestrator, and the Memory/Valgrind executors (no behavioral change for those — they don't mutate self).
1 parent d4e32d1 commit 1b3b544

7 files changed

Lines changed: 189 additions & 163 deletions

File tree

src/executor/memory/executor.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ impl Executor for MemoryExecutor {
9393
}
9494

9595
async fn run(
96-
&self,
96+
&mut self,
9797
execution_context: &ExecutionContext,
9898
_mongo_tracer: &Option<MongoTracer>,
9999
) -> Result<()> {

src/executor/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ pub trait Executor {
106106

107107
/// Runs the executor
108108
async fn run(
109-
&self,
109+
&mut self,
110110
execution_context: &ExecutionContext,
111111
// TODO: use Instruments instead of directly passing the mongodb tracer
112112
mongo_tracer: &Option<MongoTracer>,
@@ -118,7 +118,7 @@ pub trait Executor {
118118
/// Run a single executor: setup → run → teardown → persist logs.
119119
/// Does NOT upload.
120120
pub async fn run_executor(
121-
executor: &dyn Executor,
121+
executor: &mut dyn Executor,
122122
orchestrator: &Orchestrator,
123123
execution_context: &ExecutionContext,
124124
setup_cache_dir: Option<&Path>,

src/executor/orchestrator.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ impl Orchestrator {
157157
let config = self
158158
.config
159159
.executor_config_for_command(part.command, !part.uses_exec_harness);
160-
let executor = get_executor_from_mode(part.mode);
160+
let mut executor = get_executor_from_mode(part.mode);
161161
let profile_folder =
162162
self.resolve_profile_folder(&executor.name(), run_part_index, total_parts)?;
163163

@@ -167,7 +167,7 @@ impl Orchestrator {
167167
activate_rolling_buffer(&part.label);
168168
}
169169

170-
run_executor(executor.as_ref(), self, &ctx, setup_cache_dir).await?;
170+
run_executor(executor.as_mut(), self, &ctx, setup_cache_dir).await?;
171171

172172
if !self.config.show_full_output {
173173
deactivate_rolling_buffer();

src/executor/tests.rs

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -157,20 +157,19 @@ mod valgrind {
157157
use super::helpers::*;
158158
use crate::executor::valgrind::executor::ValgrindExecutor;
159159

160-
async fn get_valgrind_executor() -> (SemaphorePermit<'static>, &'static ValgrindExecutor) {
161-
static VALGRIND_EXECUTOR: OnceCell<ValgrindExecutor> = OnceCell::const_new();
160+
async fn get_valgrind_executor() -> (SemaphorePermit<'static>, ValgrindExecutor) {
161+
static VALGRIND_SETUP: OnceCell<()> = OnceCell::const_new();
162162

163-
let executor = VALGRIND_EXECUTOR
163+
VALGRIND_SETUP
164164
.get_or_init(|| async {
165165
let executor = ValgrindExecutor;
166166
let system_info = SystemInfo::new().unwrap();
167167
executor.setup(&system_info, None).await.unwrap();
168-
executor
169168
})
170169
.await;
171170
let _lock = acquire_bpf_instrumentation_lock().await;
172171

173-
(_lock, executor)
172+
(_lock, ValgrindExecutor)
174173
}
175174

176175
fn valgrind_config(command: &str) -> ExecutorConfig {
@@ -183,7 +182,7 @@ mod valgrind {
183182
#[apply(test_cases)]
184183
#[test_log::test(tokio::test)]
185184
async fn test_valgrind_executor(#[case] cmd: &str) {
186-
let (_lock, executor) = get_valgrind_executor().await;
185+
let (_lock, mut executor) = get_valgrind_executor().await;
187186

188187
let config = valgrind_config(cmd);
189188
// Unset GITHUB_ACTIONS to force LocalProvider which supports repository_override
@@ -197,7 +196,7 @@ mod valgrind {
197196
#[apply(env_test_cases)]
198197
#[test_log::test(tokio::test)]
199198
async fn test_valgrind_executor_with_env(#[case] env_case: (&str, &str)) {
200-
let (_lock, executor) = get_valgrind_executor().await;
199+
let (_lock, mut executor) = get_valgrind_executor().await;
201200

202201
let (env_var, env_value) = env_case;
203202
temp_env::async_with_vars(
@@ -252,7 +251,7 @@ mod walltime {
252251
#[rstest::rstest]
253252
#[test_log::test(tokio::test)]
254253
async fn test_walltime_executor(#[case] cmd: &str, #[values(false, true)] enable_perf: bool) {
255-
let (_permit, executor) = get_walltime_executor().await;
254+
let (_permit, mut executor) = get_walltime_executor().await;
256255

257256
let config = walltime_config(cmd, enable_perf);
258257
// Unset GITHUB_ACTIONS to force LocalProvider which supports repository_override
@@ -270,7 +269,7 @@ mod walltime {
270269
#[case] env_case: (&str, &str),
271270
#[values(false, true)] enable_perf: bool,
272271
) {
273-
let (_permit, executor) = get_walltime_executor().await;
272+
let (_permit, mut executor) = get_walltime_executor().await;
274273

275274
let (env_var, env_value) = env_case;
276275
temp_env::async_with_vars(
@@ -289,7 +288,7 @@ mod walltime {
289288
#[rstest::rstest]
290289
#[test_log::test(tokio::test)]
291290
async fn test_walltime_executor_in_working_dir(#[values(false, true)] enable_perf: bool) {
292-
let (_permit, executor) = get_walltime_executor().await;
291+
let (_permit, mut executor) = get_walltime_executor().await;
293292

294293
let cmd = r#"
295294
if [ "$(basename "$(pwd)")" != "within_sub_directory" ]; then
@@ -321,7 +320,7 @@ fi
321320
#[rstest::rstest]
322321
#[test_log::test(tokio::test)]
323322
async fn test_walltime_executor_fails(#[values(false, true)] enable_perf: bool) {
324-
let (_permit, executor) = get_walltime_executor().await;
323+
let (_permit, mut executor) = get_walltime_executor().await;
325324

326325
let config = walltime_config("exit 1", enable_perf);
327326
// Unset GITHUB_ACTIONS to force LocalProvider which supports repository_override
@@ -361,7 +360,7 @@ fi
361360
async fn test_exec_harness(#[case] cmd: &str) {
362361
use exec_harness::walltime::WalltimeExecutionArgs;
363362

364-
let (_permit, executor) = get_walltime_executor().await;
363+
let (_permit, mut executor) = get_walltime_executor().await;
365364

366365
let walltime_args = WalltimeExecutionArgs {
367366
warmup_time: Some("0s".to_string()),
@@ -427,7 +426,7 @@ mod memory {
427426
#[apply(test_cases)]
428427
#[test_log::test(tokio::test)]
429428
async fn test_memory_executor(#[case] cmd: &str) {
430-
let (_permit, _lock, executor) = get_memory_executor().await;
429+
let (_permit, _lock, mut executor) = get_memory_executor().await;
431430

432431
// Unset GITHUB_ACTIONS to force LocalProvider which supports repository_override
433432
temp_env::async_with_vars(&[("GITHUB_ACTIONS", None::<&str>)], async {
@@ -441,7 +440,7 @@ mod memory {
441440
#[apply(env_test_cases)]
442441
#[test_log::test(tokio::test)]
443442
async fn test_memory_executor_with_env(#[case] env_case: (&str, &str)) {
444-
let (_permit, _lock, executor) = get_memory_executor().await;
443+
let (_permit, _lock, mut executor) = get_memory_executor().await;
445444

446445
let (env_var, env_value) = env_case;
447446
temp_env::async_with_vars(
@@ -473,7 +472,7 @@ fi
473472
);
474473
let config = memory_config(&cmd);
475474
let (execution_context, _temp_dir) = create_test_setup(config).await;
476-
let (_permit, _lock, executor) = get_memory_executor().await;
475+
let (_permit, _lock, mut executor) = get_memory_executor().await;
477476

478477
temp_env::async_with_vars(&[("PATH", Some(&modified_path))], async {
479478
executor.run(&execution_context, &None).await.unwrap();

src/executor/valgrind/executor.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ impl Executor for ValgrindExecutor {
5050
}
5151

5252
async fn run(
53-
&self,
53+
&mut self,
5454
execution_context: &ExecutionContext,
5555
mongo_tracer: &Option<MongoTracer>,
5656
) -> Result<()> {

src/executor/wall_time/executor.rs

Lines changed: 77 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,26 @@
11
use super::helpers::validate_walltime_results;
22
use super::isolation::wrap_with_isolation;
3-
use super::profiler::perf::PerfRunner;
3+
use super::profiler::Profiler;
4+
use super::profiler::perf::PerfProfiler;
45
use crate::executor::Executor;
56
use crate::executor::ExecutorConfig;
67
use crate::executor::ToolStatus;
78
use crate::executor::helpers::command::CommandBuilder;
89
use crate::executor::helpers::env::{build_path_env, get_base_injected_env};
910
use crate::executor::helpers::get_bench_command::get_bench_command;
1011
use crate::executor::helpers::run_command_with_log_pipe::run_command_with_log_pipe;
12+
use crate::executor::helpers::run_command_with_log_pipe::run_command_with_log_pipe_and_callback;
1113
use crate::executor::helpers::run_with_env::wrap_with_env;
1214
use crate::executor::helpers::run_with_sudo::wrap_with_sudo;
15+
use crate::executor::shared::fifo::RunnerFifo;
1316
use crate::executor::{ExecutionContext, ExecutorName, ExecutorSupport};
1417
use crate::instruments::mongo_tracer::MongoTracer;
1518
use crate::prelude::*;
1619
use crate::runner_mode::RunnerMode;
1720
use crate::system::{SupportedOs, SystemInfo};
1821
use async_trait::async_trait;
22+
use runner_shared::fifo::Command as FifoCommand;
23+
use runner_shared::fifo::IntegrationMode;
1924
use std::fs::canonicalize;
2025
use std::io::Write;
2126
use std::path::Path;
@@ -72,14 +77,17 @@ impl Drop for HookScriptsGuard {
7277
}
7378

7479
pub struct WallTimeExecutor {
75-
perf: Option<PerfRunner>,
80+
profiler: Option<Box<dyn Profiler>>,
7681
}
7782

7883
impl WallTimeExecutor {
7984
pub fn new() -> Self {
80-
Self {
81-
perf: cfg!(target_os = "linux").then(PerfRunner::new),
82-
}
85+
let profiler: Option<Box<dyn Profiler>> = if cfg!(target_os = "linux") {
86+
Some(Box::new(PerfProfiler::new()))
87+
} else {
88+
None
89+
};
90+
Self { profiler }
8391
}
8492

8593
fn walltime_bench_cmd(
@@ -122,9 +130,7 @@ impl Executor for WallTimeExecutor {
122130
}
123131

124132
fn tool_status(&self) -> Option<ToolStatus> {
125-
self.perf
126-
.as_ref()
127-
.map(|_| super::profiler::perf::setup::get_perf_status())
133+
self.profiler.as_ref().and_then(|p| p.tool_status())
128134
}
129135

130136
fn support_level(&self, system_info: &SystemInfo) -> ExecutorSupport {
@@ -136,33 +142,33 @@ impl Executor for WallTimeExecutor {
136142
}
137143

138144
async fn setup(&self, system_info: &SystemInfo, setup_cache_dir: Option<&Path>) -> Result<()> {
139-
if self.perf.is_some() {
140-
return PerfRunner::setup_environment(system_info, setup_cache_dir).await;
145+
if let Some(profiler) = &self.profiler {
146+
profiler.setup(system_info, setup_cache_dir).await?;
141147
}
142-
143148
Ok(())
144149
}
145150

146151
async fn run(
147-
&self,
152+
&mut self,
148153
execution_context: &ExecutionContext,
149154
_mongo_tracer: &Option<MongoTracer>,
150155
) -> Result<()> {
151-
let status = {
152-
let _guard = HookScriptsGuard::setup();
153-
154-
let (_env_file, _script_file, cmd_builder) =
155-
WallTimeExecutor::walltime_bench_cmd(&execution_context.config, execution_context)?;
156-
if let Some(perf) = &self.perf
157-
&& execution_context.config.enable_perf
158-
{
159-
perf.run(
156+
let _guard = HookScriptsGuard::setup();
157+
158+
let (_env_file, _script_file, cmd_builder) =
159+
WallTimeExecutor::walltime_bench_cmd(&execution_context.config, execution_context)?;
160+
161+
let status = match self.profiler.as_mut() {
162+
Some(profiler) if execution_context.config.enable_perf => {
163+
run_with_profiler(
164+
profiler.as_mut(),
160165
cmd_builder,
161166
&execution_context.config,
162167
&execution_context.profile_folder,
163168
)
164169
.await
165-
} else {
170+
}
171+
_ => {
166172
let cmd_builder = if cfg!(target_os = "linux") {
167173
wrap_with_sudo(cmd_builder)?
168174
} else {
@@ -187,13 +193,6 @@ impl Executor for WallTimeExecutor {
187193
async fn teardown(&self, execution_context: &ExecutionContext) -> Result<()> {
188194
debug!("Copying files to the profile folder");
189195

190-
if let Some(perf) = &self.perf
191-
&& execution_context.config.enable_perf
192-
{
193-
perf.save_files_to(&execution_context.profile_folder)
194-
.await?;
195-
}
196-
197196
validate_walltime_results(
198197
&execution_context.profile_folder,
199198
execution_context.config.allow_empty,
@@ -203,6 +202,55 @@ impl Executor for WallTimeExecutor {
203202
}
204203
}
205204

205+
/// Drive a single benchmark run through a [`Profiler`]: wrap the command,
206+
/// spawn it, dispatch FIFO commands from the integration into the profiler's
207+
/// hooks, and finalize once the child exits.
208+
async fn run_with_profiler(
209+
profiler: &mut dyn Profiler,
210+
cmd_builder: CommandBuilder,
211+
config: &ExecutorConfig,
212+
profile_folder: &Path,
213+
) -> Result<std::process::ExitStatus> {
214+
let wrapped = profiler.wrap(cmd_builder, config, profile_folder).await?;
215+
let cmd = wrapped.build();
216+
debug!("cmd: {cmd:?}");
217+
218+
let mut runner_fifo = RunnerFifo::new()?;
219+
220+
run_command_with_log_pipe_and_callback(cmd, async move |mut child| {
221+
let on_cmd = async |c: &FifoCommand| match c {
222+
FifoCommand::StartBenchmark => {
223+
profiler.on_start_benchmark().await?;
224+
Ok(None)
225+
}
226+
FifoCommand::StopBenchmark => {
227+
profiler.on_stop_benchmark().await?;
228+
Ok(None)
229+
}
230+
#[allow(deprecated)]
231+
FifoCommand::PingProfiler => Ok(Some(if profiler.on_ping().await? {
232+
FifoCommand::Ack
233+
} else {
234+
FifoCommand::Err
235+
})),
236+
FifoCommand::GetIntegrationMode => Ok(Some(FifoCommand::IntegrationModeResponse(
237+
IntegrationMode::Walltime,
238+
))),
239+
_ => Ok(None),
240+
};
241+
242+
let (timestamps, fifo_data, exit_status) =
243+
runner_fifo.handle_fifo_messages(&mut child, on_cmd).await?;
244+
245+
profiler
246+
.finalize(fifo_data, timestamps, profile_folder)
247+
.await?;
248+
249+
Ok(exit_status)
250+
})
251+
.await
252+
}
253+
206254
#[cfg(test)]
207255
mod tests {
208256
use tempfile::NamedTempFile;

0 commit comments

Comments
 (0)