Skip to content

Commit 84358c6

Browse files
committed
feat(codspeed): add fifo ipc method
1 parent 3cd8045 commit 84358c6

4 files changed

Lines changed: 241 additions & 8 deletions

File tree

Cargo.lock

Lines changed: 37 additions & 8 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/codspeed/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@ categories = [
1818
keywords = ["codspeed", "benchmark"]
1919

2020
[dependencies]
21+
bincode = "1.3.3"
2122
colored = "2.0.0"
2223
libc = "^0.2"
24+
nix = { version = "0.29.0", features = ["fs"] }
2325
serde = { workspace = true }
2426
serde_json = { workspace = true }
2527
uuid = { version = "1.12.1", features = ["v4"] }

crates/codspeed/src/fifo.rs

Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
use nix::libc::O_NONBLOCK;
2+
use nix::sys::stat;
3+
use nix::unistd::{self, unlink};
4+
use serde::{Deserialize, Serialize};
5+
use std::fs::{File, OpenOptions};
6+
use std::io::{Read, Write};
7+
use std::os::unix::fs::OpenOptionsExt;
8+
use std::path::PathBuf;
9+
10+
pub const RUNNER_CTL_FIFO: &str = "/tmp/runner.ctl.fifo";
11+
pub const RUNNER_ACK_FIFO: &str = "/tmp/runner.ack.fifo";
12+
13+
pub struct PerfGuard {
14+
ctl_fifo: FifoIpc,
15+
ack_fifo: FifoIpc,
16+
}
17+
18+
impl PerfGuard {
19+
pub fn new(ctl_fifo: &str, ack_fifo: &str) -> Option<Self> {
20+
let mut instance = Self {
21+
ctl_fifo: FifoIpc::connect(ctl_fifo)?.with_writer().ok()?,
22+
ack_fifo: FifoIpc::connect(ack_fifo)?.with_reader().ok()?,
23+
};
24+
instance.send_cmd(Command::StartBenchmark)?;
25+
Some(instance)
26+
}
27+
28+
fn send_cmd(&mut self, cmd: Command) -> Option<()> {
29+
self.ctl_fifo.send_cmd(cmd)?;
30+
self.ack_fifo.wait_for_ack()?;
31+
32+
Some(())
33+
}
34+
}
35+
36+
impl Drop for PerfGuard {
37+
fn drop(&mut self) {
38+
self.send_cmd(Command::StopBenchmark);
39+
}
40+
}
41+
42+
pub struct FifoIpc {
43+
path: PathBuf,
44+
reader: Option<File>,
45+
writer: Option<File>,
46+
}
47+
48+
impl FifoIpc {
49+
pub fn connect<P: Into<PathBuf>>(path: P) -> Option<Self> {
50+
let path = path.into();
51+
52+
if !path.exists() {
53+
return None;
54+
}
55+
56+
Some(Self {
57+
path,
58+
reader: None,
59+
writer: None,
60+
})
61+
}
62+
63+
pub fn create(path: &str) -> Option<Self> {
64+
// Remove the previous FIFO (if it exists)
65+
let _ = unlink(path);
66+
67+
// Create the FIFO with RWX permissions for the owner
68+
unistd::mkfifo(path, stat::Mode::S_IRWXU).unwrap();
69+
70+
Self::connect(path)
71+
}
72+
73+
pub fn with_reader(mut self) -> std::io::Result<Self> {
74+
self.reader = Some(
75+
OpenOptions::new()
76+
.write(true)
77+
.read(true)
78+
.custom_flags(O_NONBLOCK)
79+
.open(&self.path)?,
80+
);
81+
Ok(self)
82+
}
83+
84+
/// WARNING: Writer must be opened _AFTER_ the reader.
85+
pub fn with_writer(mut self) -> std::io::Result<Self> {
86+
self.writer = Some(
87+
OpenOptions::new()
88+
.write(true)
89+
.custom_flags(O_NONBLOCK)
90+
.open(&self.path)?,
91+
);
92+
Ok(self)
93+
}
94+
95+
pub fn recv_cmd(&mut self) -> Option<Command> {
96+
// First read the length (u32 = 4 bytes)
97+
let mut len_buffer = [0u8; 4];
98+
self.read_exact(&mut len_buffer).ok()?;
99+
let message_len = u32::from_le_bytes(len_buffer) as usize;
100+
101+
// Try to read the message
102+
let mut buffer = vec![0u8; message_len];
103+
loop {
104+
if self.read_exact(&mut buffer).is_ok() {
105+
break;
106+
}
107+
}
108+
109+
let decoded = bincode::deserialize(&buffer).ok()?;
110+
Some(decoded)
111+
}
112+
113+
pub fn send_cmd(&mut self, cmd: Command) -> Option<()> {
114+
let encoded = bincode::serialize(&cmd).ok()?;
115+
self.write_all(&(encoded.len() as u32).to_le_bytes()).ok()?;
116+
self.write_all(&encoded).ok()?;
117+
Some(())
118+
}
119+
120+
pub fn wait_for_ack(&mut self) -> Option<()> {
121+
// Wait for ACK command
122+
loop {
123+
if let Some(Command::Ack) = self.recv_cmd() {
124+
break;
125+
}
126+
}
127+
128+
Some(())
129+
}
130+
}
131+
132+
impl std::io::Write for FifoIpc {
133+
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
134+
if let Some(writer) = self.writer.as_mut() {
135+
writer.write(buf)
136+
} else {
137+
Err(std::io::Error::new(
138+
std::io::ErrorKind::NotConnected,
139+
"Writer not initialized",
140+
))
141+
}
142+
}
143+
144+
fn flush(&mut self) -> std::io::Result<()> {
145+
Ok(())
146+
}
147+
}
148+
149+
impl std::io::Read for FifoIpc {
150+
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
151+
if let Some(reader) = self.reader.as_mut() {
152+
reader.read(buf)
153+
} else {
154+
Err(std::io::Error::new(
155+
std::io::ErrorKind::NotConnected,
156+
"Reader not initialized",
157+
))
158+
}
159+
}
160+
}
161+
162+
#[derive(Serialize, Deserialize, Debug, PartialEq)]
163+
pub enum Command {
164+
StartBenchmark,
165+
StopBenchmark,
166+
Ack,
167+
}
168+
169+
#[cfg(test)]
170+
mod tests {
171+
use super::*;
172+
173+
#[test]
174+
fn test_ipc_write_read() {
175+
let mut fifo = FifoIpc::create("/tmp/test1.fifo")
176+
.unwrap()
177+
.with_reader()
178+
.unwrap()
179+
.with_writer()
180+
.unwrap();
181+
182+
fifo.write_all(b"Hello").unwrap();
183+
let mut buffer = [0; 5];
184+
fifo.read_exact(&mut buffer).unwrap();
185+
assert_eq!(&buffer, b"Hello");
186+
}
187+
188+
#[test]
189+
fn test_ipc_send_recv_cmd() {
190+
let mut fifo = FifoIpc::create("/tmp/test2.fifo")
191+
.unwrap()
192+
.with_reader()
193+
.unwrap()
194+
.with_writer()
195+
.unwrap();
196+
197+
fifo.send_cmd(Command::StartBenchmark).unwrap();
198+
let cmd = fifo.recv_cmd().unwrap();
199+
assert_eq!(cmd, Command::StartBenchmark);
200+
}
201+
}

crates/codspeed/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
pub mod codspeed;
2+
pub mod fifo;
23
mod macros;
34
mod measurement;
45
mod request;

0 commit comments

Comments
 (0)