Skip to content

Commit 3ce0ea1

Browse files
committed
feat: add fifo ipc
Wee need the multi-threaded runtime for reading/writing to the async FIFOs in a new task. Using a single-threaded runtime is limited and sometimes deadlocks the current thread. The fix is to spawn a new thread and create a new single-threaded runtime there, which is arguably more complicated than enabling the multi-threaded runtime.
1 parent 388622f commit 3ce0ea1

4 files changed

Lines changed: 157 additions & 2 deletions

File tree

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ serde = { version = "1.0.192", features = ["derive"] }
2929
serde_json = { version = "1.0.108", features = ["preserve_order"] }
3030
url = "2.4.1"
3131
sha256 = "1.4.0"
32-
tokio = { version = "1", features = ["macros", "rt"] }
32+
tokio = { version = "1", features = ["macros", "rt", "rt-multi-thread"] }
3333
tokio-tar = "0.3.1"
3434
md5 = "0.7.0"
3535
base64 = "0.21.0"

src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ lazy_static! {
2727
format!("{VALGRIND_CODSPEED_BASE_VERSION}-0codspeed1");
2828
}
2929

30-
#[tokio::main(flavor = "current_thread")]
30+
#[tokio::main(flavor = "multi_thread", worker_threads = 4)]
3131
async fn main() {
3232
let res = crate::app::run().await;
3333
if let Err(err) = res {
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
use super::{FifoCommand, RUNNER_ACK_FIFO, RUNNER_CTL_FIFO};
2+
use std::path::PathBuf;
3+
use tokio::io::{AsyncReadExt, AsyncWriteExt};
4+
use tokio::net::unix::pipe::OpenOptions as TokioPipeOpenOptions;
5+
use tokio::net::unix::pipe::Receiver as TokioPipeReader;
6+
use tokio::net::unix::pipe::Sender as TokioPipeSender;
7+
8+
fn create_fifo<P: AsRef<std::path::Path>>(path: P) -> anyhow::Result<()> {
9+
// Remove the previous FIFO (if it exists)
10+
let _ = nix::unistd::unlink(path.as_ref());
11+
12+
// Create the FIFO with RWX permissions for the owner
13+
nix::unistd::mkfifo(path.as_ref(), nix::sys::stat::Mode::S_IRWXU)?;
14+
15+
Ok(())
16+
}
17+
18+
pub struct RunnerFifo {
19+
ack_fifo: TokioPipeSender,
20+
ctl_fifo: TokioPipeReader,
21+
}
22+
23+
impl RunnerFifo {
24+
pub fn new() -> anyhow::Result<Self> {
25+
create_fifo(RUNNER_CTL_FIFO)?;
26+
create_fifo(RUNNER_ACK_FIFO)?;
27+
28+
let ack_fifo = TokioPipeOpenOptions::new()
29+
.read_write(true)
30+
.open_sender(RUNNER_ACK_FIFO)?;
31+
let ctl_fifo = TokioPipeOpenOptions::new()
32+
.read_write(true)
33+
.open_receiver(RUNNER_CTL_FIFO)?;
34+
35+
Ok(Self { ctl_fifo, ack_fifo })
36+
}
37+
38+
pub async fn recv_cmd(&mut self) -> anyhow::Result<FifoCommand> {
39+
let mut len_buffer = [0u8; 4];
40+
self.ctl_fifo.read_exact(&mut len_buffer).await?;
41+
let message_len = u32::from_le_bytes(len_buffer) as usize;
42+
43+
let mut buffer = vec![0u8; message_len];
44+
loop {
45+
if self.ctl_fifo.read_exact(&mut buffer).await.is_ok() {
46+
break;
47+
}
48+
}
49+
50+
let decoded = bincode::deserialize(&buffer)?;
51+
Ok(decoded)
52+
}
53+
54+
pub async fn send_cmd(&mut self, cmd: FifoCommand) -> anyhow::Result<()> {
55+
let encoded = bincode::serialize(&cmd)?;
56+
57+
self.ack_fifo
58+
.write_all(&(encoded.len() as u32).to_le_bytes())
59+
.await?;
60+
self.ack_fifo.write_all(&encoded).await?;
61+
Ok(())
62+
}
63+
}
64+
65+
pub struct PerfFifo {
66+
ctl_fifo: TokioPipeSender,
67+
ack_fifo: TokioPipeReader,
68+
69+
pub(crate) ctl_fifo_path: PathBuf,
70+
pub(crate) ack_fifo_path: PathBuf,
71+
}
72+
73+
impl PerfFifo {
74+
pub fn new() -> anyhow::Result<Self> {
75+
let fifo_dir = tempfile::tempdir()?.into_path();
76+
77+
let ctl_fifo_path = fifo_dir.join("codspeed_perf.ctl.fifo");
78+
let ack_fifo_path = fifo_dir.join("codspeed_perf.ack.fifo");
79+
80+
create_fifo(&ctl_fifo_path)?;
81+
create_fifo(&ack_fifo_path)?;
82+
83+
let ack_fifo = TokioPipeOpenOptions::new()
84+
.read_write(true)
85+
.open_receiver(&ack_fifo_path)?;
86+
let ctl_fifo = TokioPipeOpenOptions::new()
87+
.read_write(true)
88+
.open_sender(&ctl_fifo_path)?;
89+
90+
Ok(Self {
91+
ctl_fifo,
92+
ack_fifo,
93+
ctl_fifo_path,
94+
ack_fifo_path,
95+
})
96+
}
97+
98+
pub async fn start_events(&mut self) -> anyhow::Result<()> {
99+
self.ctl_fifo.write_all(b"enable\n").await?;
100+
self.wait_for_ack().await;
101+
102+
Ok(())
103+
}
104+
105+
pub async fn stop_events(&mut self) -> anyhow::Result<()> {
106+
self.ctl_fifo.write_all(b"disable\n").await?;
107+
self.wait_for_ack().await;
108+
109+
Ok(())
110+
}
111+
112+
pub async fn ping(&mut self) -> anyhow::Result<()> {
113+
self.ctl_fifo.write_all(b"ping\n").await?;
114+
self.wait_for_ack().await;
115+
116+
Ok(())
117+
}
118+
119+
async fn wait_for_ack(&mut self) {
120+
const ACK: &[u8] = b"ack\n\x00";
121+
122+
loop {
123+
let mut buf: [u8; ACK.len()] = [0; ACK.len()];
124+
if self.ack_fifo.read_exact(&mut buf).await.is_err() {
125+
continue;
126+
}
127+
128+
if buf == ACK {
129+
break;
130+
}
131+
}
132+
}
133+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
// !!!!!!!!!!!!!!!!!!!!!!!!
2+
// !! DO NOT TOUCH BELOW !!
3+
// !!!!!!!!!!!!!!!!!!!!!!!!
4+
// Has to be in sync with `codspeed-rust/codspeed`.
5+
//
6+
pub const RUNNER_CTL_FIFO: &str = "/tmp/runner.ctl.fifo";
7+
pub const RUNNER_ACK_FIFO: &str = "/tmp/runner.ack.fifo";
8+
9+
#[derive(serde::Serialize, serde::Deserialize, Debug, PartialEq)]
10+
pub enum Command {
11+
CurrentBenchmark { pid: u32, uri: String },
12+
StartBenchmark,
13+
StopBenchmark,
14+
Ack,
15+
PingPerf,
16+
SetIntegration { name: String, version: String },
17+
Err,
18+
}
19+
//
20+
// !!!!!!!!!!!!!!!!!!!!!!!!
21+
// !! DO NOT TOUCH ABOVE !!
22+
// !!!!!!!!!!!!!!!!!!!!!!!!

0 commit comments

Comments
 (0)