@@ -6,11 +6,11 @@ use runner_shared::artifacts::ExecutionTimestamps;
66use runner_shared:: fifo:: { Command as FifoCommand , MarkerType } ;
77use runner_shared:: fifo:: { RUNNER_ACK_FIFO , RUNNER_CTL_FIFO } ;
88use std:: cmp:: Ordering ;
9+ use std:: os:: unix:: fs:: OpenOptionsExt ;
910use std:: path:: { Path , PathBuf } ;
1011use std:: { collections:: HashSet , time:: Duration } ;
1112use tokio:: io:: AsyncWriteExt ;
1213use tokio:: net:: unix:: pid_t;
13- use tokio:: net:: unix:: pipe:: OpenOptions as TokioPipeOpenOptions ;
1414use tokio:: net:: unix:: pipe:: Receiver as TokioPipeReader ;
1515use tokio:: net:: unix:: pipe:: Sender as TokioPipeSender ;
1616use tokio:: time:: error:: Elapsed ;
@@ -38,8 +38,8 @@ impl GenericFifo {
3838 create_fifo ( ctl_fifo) ?;
3939 create_fifo ( ack_fifo) ?;
4040
41- let ctl_sender = get_pipe_open_options ( ) . open_sender ( ctl_fifo) ?;
42- let ack_reader = get_pipe_open_options ( ) . open_receiver ( ack_fifo) ?;
41+ let ctl_sender = open_fifo_sender ( ctl_fifo) ?;
42+ let ack_reader = open_fifo_receiver ( ack_fifo) ?;
4343
4444 Ok ( Self {
4545 ctl_path : ctl_fifo. to_path_buf ( ) ,
@@ -85,12 +85,27 @@ pub struct RunnerFifo {
8585 ctl_reader : FramedRead < TokioPipeReader , LengthDelimitedCodec > ,
8686}
8787
88- fn get_pipe_open_options ( ) -> TokioPipeOpenOptions {
89- #[ cfg_attr( not( target_os = "linux" ) , allow( unused_mut) ) ]
90- let mut options = TokioPipeOpenOptions :: new ( ) ;
91- #[ cfg( target_os = "linux" ) ]
92- options. read_write ( true ) ;
93- options
88+ /// Open a FIFO in O_RDWR | O_NONBLOCK mode.
89+ ///
90+ /// Tokio's `OpenOptions::read_write(true)` is Linux-only, but the underlying O_RDWR
91+ /// trick works on every Unix: opening a FIFO read-write avoids the deadlock where
92+ /// `open(O_WRONLY)` blocks (or returns ENXIO under O_NONBLOCK) until a reader is
93+ /// connected, and vice versa. Since we open both ends before the peer process
94+ /// (the integration) is even spawned, we need this on macOS too.
95+ fn open_fifo_rdwr ( path : & Path ) -> anyhow:: Result < std:: fs:: File > {
96+ Ok ( std:: fs:: OpenOptions :: new ( )
97+ . read ( true )
98+ . write ( true )
99+ . custom_flags ( libc:: O_NONBLOCK )
100+ . open ( path) ?)
101+ }
102+
103+ fn open_fifo_sender ( path : & Path ) -> anyhow:: Result < TokioPipeSender > {
104+ Ok ( TokioPipeSender :: from_file ( open_fifo_rdwr ( path) ?) ?)
105+ }
106+
107+ fn open_fifo_receiver ( path : & Path ) -> anyhow:: Result < TokioPipeReader > {
108+ Ok ( TokioPipeReader :: from_file ( open_fifo_rdwr ( path) ?) ?)
94109}
95110
96111impl RunnerFifo {
@@ -102,8 +117,8 @@ impl RunnerFifo {
102117 create_fifo ( ctl_path) ?;
103118 create_fifo ( ack_path) ?;
104119
105- let ack_fifo = get_pipe_open_options ( ) . open_sender ( ack_path) ?;
106- let ctl_fifo = get_pipe_open_options ( ) . open_receiver ( ctl_path) ?;
120+ let ack_fifo = open_fifo_sender ( ack_path) ?;
121+ let ctl_fifo = open_fifo_receiver ( ctl_path) ?;
107122
108123 let codec = LengthDelimitedCodec :: builder ( )
109124 . length_field_length ( 4 )
@@ -288,7 +303,7 @@ mod tests {
288303 let ack_path = temp_dir. path ( ) . join ( "ack_fifo" ) ;
289304
290305 let mut fifo = RunnerFifo :: open ( & ctl_path, & ack_path) . unwrap ( ) ;
291- let mut writer = get_pipe_open_options ( ) . open_sender ( & ctl_path) . unwrap ( ) ;
306+ let mut writer = open_fifo_sender ( & ctl_path) . unwrap ( ) ;
292307
293308 let cmd = FifoCommand :: Ack ;
294309 let payload = bincode:: serialize ( & cmd) . unwrap ( ) ;
0 commit comments