Skip to content

Commit e46458e

Browse files
committed
wip: try make process async + parallel callback
1 parent b647a3f commit e46458e

7 files changed

Lines changed: 67 additions & 56 deletions

File tree

Cargo.lock

Lines changed: 10 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ serde = { version = "1.0.192", features = ["derive"] }
2929
serde_json = { version = "1.0.108", features = ["preserve_order"] }
3030
url = "2.4.1"
3131
sha256 = "1.4.0"
32-
tokio = { version = "1", features = ["macros", "rt"] }
32+
tokio = { version = "1", features = ["macros", "rt","process"] }
3333
tokio-tar = "0.3.1"
3434
md5 = "0.7.0"
3535
base64 = "0.21.0"

src/run/instruments/mongo_tracer.rs

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,10 @@ use std::{
22
env,
33
io::Read,
44
path::{Path, PathBuf},
5-
process::{Child, Command, Stdio},
5+
process::{Child, Stdio},
66
str::FromStr,
7-
thread,
87
};
8+
use tokio::process::Command;
99

1010
use reqwest::Client;
1111
use tokio::fs;
@@ -150,21 +150,22 @@ impl MongoTracer {
150150
} else {
151151
info!("No MongoDB URI provided, user will have to provide it dynamically through the CodSpeed integration");
152152
}
153-
let mut process = command
153+
let process = command
154154
.stdout(Stdio::piped())
155155
.stderr(Stdio::piped())
156156
.spawn()?;
157157

158-
let process_stdout = process.stdout.take().expect("error taking child stdout");
159-
let process_stderr = process.stderr.take().expect("error taking child stderr");
160-
thread::spawn(move || {
161-
dump_tracer_log(process_stdout).expect("error communicating with child stdout")
162-
});
163-
thread::spawn(move || {
164-
dump_tracer_log(process_stderr).expect("error communicating with child stderr")
165-
});
158+
// let process_stdout = process.stdout.take().expect("error taking child stdout");
159+
// let process_stderr = process.stderr.take().expect("error taking child stderr");
160+
// FIXME: Add this again
161+
// thread::spawn(move || {
162+
// dump_tracer_log(process_stdout).expect("error communicating with child stdout")
163+
// });
164+
// thread::spawn(move || {
165+
// dump_tracer_log(process_stderr).expect("error communicating with child stderr")
166+
// });
166167

167-
self.process = Some(process);
168+
// self.process = Some(process);
168169

169170
Ok(())
170171
}
@@ -243,7 +244,7 @@ pub async fn install_mongodb_tracer() -> Result<()> {
243244
let output = Command::new("bash")
244245
.arg(installer_path.to_str().unwrap())
245246
.stdout(Stdio::piped())
246-
.output()
247+
.output().await
247248
.map_err(|_| anyhow!("Failed to install mongo-tracer"))?;
248249

249250
if !output.status.success() {

src/run/runner/helpers/run_command_with_log_pipe.rs

Lines changed: 38 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,7 @@
1-
use crate::local_logger::suspend_progress_bar;
21
use crate::prelude::*;
3-
use crate::run::runner::EXECUTOR_TARGET;
42
use std::future::Future;
5-
use std::io::{Read, Write};
6-
use std::process::Command;
73
use std::process::ExitStatus;
8-
use std::thread;
4+
use tokio::process::Command;
95

106
/// Run a command and log its output to stdout and stderr
117
///
@@ -25,49 +21,52 @@ where
2521
F: FnOnce(u32) -> Fut,
2622
Fut: Future<Output = anyhow::Result<()>>,
2723
{
28-
fn log_tee(
29-
mut reader: impl Read,
30-
mut writer: impl Write,
31-
log_prefix: Option<&str>,
32-
) -> Result<()> {
33-
let prefix = log_prefix.unwrap_or("");
34-
let mut buffer = [0; 1024];
35-
loop {
36-
let bytes_read = reader.read(&mut buffer)?;
37-
if bytes_read == 0 {
38-
break;
39-
}
40-
suspend_progress_bar(|| {
41-
writer.write_all(&buffer[..bytes_read]).unwrap();
42-
trace!(
43-
target: EXECUTOR_TARGET,
44-
"{}{}",
45-
prefix,
46-
String::from_utf8_lossy(&buffer[..bytes_read])
47-
);
48-
});
49-
}
50-
Ok(())
51-
}
24+
// fn log_tee(
25+
// mut reader: impl Read,
26+
// mut writer: impl Write,
27+
// log_prefix: Option<&str>,
28+
// ) -> Result<()> {
29+
// let prefix = log_prefix.unwrap_or("");
30+
// let mut buffer = [0; 1024];
31+
// loop {
32+
// let bytes_read = reader.read(&mut buffer)?;
33+
// if bytes_read == 0 {
34+
// break;
35+
// }
36+
// suspend_progress_bar(|| {
37+
// writer.write_all(&buffer[..bytes_read]).unwrap();
38+
// trace!(
39+
// target: EXECUTOR_TARGET,
40+
// "{}{}",
41+
// prefix,
42+
// String::from_utf8_lossy(&buffer[..bytes_read])
43+
// );
44+
// });
45+
// }
46+
// Ok(())
47+
// }
5248

5349
let mut process = cmd
5450
.stdout(std::process::Stdio::piped())
5551
.stderr(std::process::Stdio::piped())
5652
.spawn()
5753
.context("failed to spawn the process")?;
58-
let stdout = process.stdout.take().expect("unable to get stdout");
59-
let stderr = process.stderr.take().expect("unable to get stderr");
60-
thread::spawn(move || {
61-
log_tee(stdout, std::io::stdout(), None).unwrap();
62-
});
54+
// let stdout = process.stdout.take().expect("unable to get stdout");
55+
// let stderr = process.stderr.take().expect("unable to get stderr");
56+
// thread::spawn(move || {
57+
// log_tee(stdout, std::io::stdout(), None).unwrap();
58+
// });
6359

64-
thread::spawn(move || {
65-
log_tee(stderr, std::io::stderr(), Some("[stderr]")).unwrap();
66-
});
60+
// thread::spawn(move || {
61+
// log_tee(stderr, std::io::stderr(), Some("[stderr]")).unwrap();
62+
// });
6763

68-
cb(process.id()).await?;
64+
let (_, exit_status) = tokio::join!(
65+
cb(process.id().unwrap()),
66+
process.wait()
67+
);
6968

70-
process.wait().context("failed to wait for the process")
69+
exit_status.context("failed to wait for the process")
7170
}
7271

7372
pub async fn run_command_with_log_pipe(cmd: Command) -> Result<ExitStatus> {

src/run/runner/valgrind/measure.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ use std::io::Write;
1313
use std::os::unix::fs::PermissionsExt;
1414
use std::os::unix::process::ExitStatusExt;
1515
use std::path::Path;
16-
use std::{env::consts::ARCH, process::Command};
16+
use std::{env::consts::ARCH};
17+
use tokio::process::Command;
1718
use tempfile::TempPath;
1819

1920
lazy_static! {

src/run/runner/wall_time/executor.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use crate::run::runner::{ExecutorName, RunData, RunnerMode};
99
use crate::run::{check_system::SystemInfo, config::Config};
1010
use async_trait::async_trait;
1111
use std::fs::canonicalize;
12-
use std::process::Command;
12+
use tokio::process::Command;
1313

1414
pub struct WallTimeExecutor {
1515
perf: Option<PerfRunner>,

src/run/runner/wall_time/perf/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use perf_map::ProcessSymbols;
1313
use shared::Command as FifoCommand;
1414
use std::collections::HashSet;
1515
use std::path::PathBuf;
16-
use std::process::Command;
16+
use tokio::process::Command;
1717
use std::time::Duration;
1818
use std::{cell::OnceCell, collections::HashMap, process::ExitStatus};
1919
use tempfile::TempDir;

0 commit comments

Comments
 (0)