1+ use anyhow:: bail;
12use nix:: libc:: O_NONBLOCK ;
23use nix:: sys:: stat;
34use nix:: unistd:: { self , unlink} ;
45use serde:: { Deserialize , Serialize } ;
56use std:: fs:: { File , OpenOptions } ;
67use std:: io:: { Read , Write } ;
78use std:: os:: unix:: fs:: OpenOptionsExt ;
8- use std:: path:: PathBuf ;
9+ use std:: path:: { Path , PathBuf } ;
910
1011pub const RUNNER_CTL_FIFO : & str = "/tmp/runner.ctl.fifo" ;
1112pub const RUNNER_ACK_FIFO : & str = "/tmp/runner.ack.fifo" ;
@@ -16,26 +17,27 @@ pub struct PerfGuard {
1617}
1718
1819impl PerfGuard {
19- pub fn new ( ctl_fifo : & str , ack_fifo : & str ) -> Option < Self > {
20+ pub fn new ( ctl_fifo : & str , ack_fifo : & str ) -> anyhow :: Result < Self > {
2021 let mut instance = Self {
21- ctl_fifo : FifoIpc :: connect ( ctl_fifo) ?. with_writer ( ) . ok ( ) ?,
22- ack_fifo : FifoIpc :: connect ( ack_fifo) ?. with_reader ( ) . ok ( ) ?,
22+ ctl_fifo : FifoIpc :: connect ( ctl_fifo) ?. with_writer ( ) ?,
23+ ack_fifo : FifoIpc :: connect ( ack_fifo) ?. with_reader ( ) ?,
2324 } ;
2425 instance. send_cmd ( Command :: StartBenchmark ) ?;
25- Some ( instance)
26+ Ok ( instance)
2627 }
2728
28- fn send_cmd ( & mut self , cmd : Command ) -> Option < ( ) > {
29+ fn send_cmd ( & mut self , cmd : Command ) -> anyhow :: Result < ( ) > {
2930 self . ctl_fifo . send_cmd ( cmd) ?;
30- self . ack_fifo . wait_for_ack ( ) ? ;
31+ self . ack_fifo . wait_for_ack ( ) ;
3132
32- Some ( ( ) )
33+ Ok ( ( ) )
3334 }
3435}
3536
3637impl Drop for PerfGuard {
3738 fn drop ( & mut self ) {
38- self . send_cmd ( Command :: StopBenchmark ) ;
39+ self . send_cmd ( Command :: StopBenchmark )
40+ . expect ( "Failed to send stop command" ) ;
3941 }
4042}
4143
@@ -46,31 +48,47 @@ pub struct FifoIpc {
4648}
4749
4850impl FifoIpc {
49- pub fn connect < P : Into < PathBuf > > ( path : P ) -> Option < Self > {
51+ /// Creates a new FIFO at the specified path and connects to it.
52+ ///
53+ /// ```rust
54+ /// use codspeed::fifo::{FifoIpc, Command};
55+ ///
56+ /// // Create the reader before the writer (required!):
57+ /// let mut read_fifo = FifoIpc::create("/tmp/doctest.fifo").unwrap().with_reader().unwrap();
58+ ///
59+ /// // Connect to the FIFO and send a command
60+ /// let mut fifo = FifoIpc::connect("/tmp/doctest.fifo").unwrap().with_writer().unwrap();
61+ /// fifo.send_cmd(Command::StartBenchmark).unwrap();
62+ ///
63+ /// // Receive the command in the reader
64+ /// let cmd = read_fifo.recv_cmd().unwrap();
65+ /// assert_eq!(cmd, Command::StartBenchmark);
66+ /// ```
67+ pub fn create < P : AsRef < Path > > ( path : P ) -> anyhow:: Result < Self > {
68+ // Remove the previous FIFO (if it exists)
69+ let _ = unlink ( path. as_ref ( ) ) ;
70+
71+ // Create the FIFO with RWX permissions for the owner
72+ unistd:: mkfifo ( path. as_ref ( ) , stat:: Mode :: S_IRWXU ) ?;
73+
74+ Self :: connect ( path. as_ref ( ) )
75+ }
76+
77+ pub fn connect < P : Into < PathBuf > > ( path : P ) -> anyhow:: Result < Self > {
5078 let path = path. into ( ) ;
5179
5280 if !path. exists ( ) {
53- return None ;
81+ bail ! ( "FIFO does not exist: {}" , path . display ( ) ) ;
5482 }
5583
56- Some ( Self {
84+ Ok ( Self {
5785 path,
5886 reader : None ,
5987 writer : None ,
6088 } )
6189 }
6290
63- pub fn create ( path : & str ) -> Option < Self > {
64- // Remove the previous FIFO (if it exists)
65- let _ = unlink ( path) ;
66-
67- // Create the FIFO with RWX permissions for the owner
68- unistd:: mkfifo ( path, stat:: Mode :: S_IRWXU ) . unwrap ( ) ;
69-
70- Self :: connect ( path)
71- }
72-
73- pub fn with_reader ( mut self ) -> std:: io:: Result < Self > {
91+ pub fn with_reader ( mut self ) -> anyhow:: Result < Self > {
7492 self . reader = Some (
7593 OpenOptions :: new ( )
7694 . write ( true )
@@ -82,7 +100,7 @@ impl FifoIpc {
82100 }
83101
84102 /// WARNING: Writer must be opened _AFTER_ the reader.
85- pub fn with_writer ( mut self ) -> std :: io :: Result < Self > {
103+ pub fn with_writer ( mut self ) -> anyhow :: Result < Self > {
86104 self . writer = Some (
87105 OpenOptions :: new ( )
88106 . write ( true )
@@ -92,10 +110,10 @@ impl FifoIpc {
92110 Ok ( self )
93111 }
94112
95- pub fn recv_cmd ( & mut self ) -> Option < Command > {
113+ pub fn recv_cmd ( & mut self ) -> anyhow :: Result < Command > {
96114 // First read the length (u32 = 4 bytes)
97115 let mut len_buffer = [ 0u8 ; 4 ] ;
98- self . read_exact ( & mut len_buffer) . ok ( ) ?;
116+ self . read_exact ( & mut len_buffer) ?;
99117 let message_len = u32:: from_le_bytes ( len_buffer) as usize ;
100118
101119 // Try to read the message
@@ -106,26 +124,23 @@ impl FifoIpc {
106124 }
107125 }
108126
109- let decoded = bincode:: deserialize ( & buffer) . ok ( ) ?;
110- Some ( decoded)
127+ let decoded = bincode:: deserialize ( & buffer) ?;
128+ Ok ( decoded)
111129 }
112130
113- pub fn send_cmd ( & mut self , cmd : Command ) -> Option < ( ) > {
114- let encoded = bincode:: serialize ( & cmd) . ok ( ) ?;
115- self . write_all ( & ( encoded. len ( ) as u32 ) . to_le_bytes ( ) ) . ok ( ) ?;
116- self . write_all ( & encoded) . ok ( ) ?;
117- Some ( ( ) )
131+ pub fn send_cmd ( & mut self , cmd : Command ) -> anyhow :: Result < ( ) > {
132+ let encoded = bincode:: serialize ( & cmd) ?;
133+ self . write_all ( & ( encoded. len ( ) as u32 ) . to_le_bytes ( ) ) ?;
134+ self . write_all ( & encoded) ?;
135+ Ok ( ( ) )
118136 }
119137
120- pub fn wait_for_ack ( & mut self ) -> Option < ( ) > {
121- // Wait for ACK command
138+ pub fn wait_for_ack ( & mut self ) {
122139 loop {
123- if let Some ( Command :: Ack ) = self . recv_cmd ( ) {
140+ if let Ok ( Command :: Ack ) = self . recv_cmd ( ) {
124141 break ;
125142 }
126143 }
127-
128- Some ( ( ) )
129144 }
130145}
131146
0 commit comments