@@ -13,7 +13,7 @@ use rustls_connector::TlsStream as RustlsStream;
1313use std:: io:: { self , Read , Write } ;
1414use std:: net:: TcpStream ;
1515use std:: ops:: DerefMut ;
16- use std:: time:: Duration ;
16+ use std:: time:: { Duration , Instant } ;
1717
1818/// `Handle` allows a client to block waiting for changes to the remote mailbox.
1919///
@@ -58,7 +58,7 @@ pub struct Handle<'a, T: Read + Write> {
5858 session : & ' a mut Session < T > ,
5959 timeout : Duration ,
6060 keepalive : bool ,
61- done : bool ,
61+ last_idle : Option < Instant > ,
6262}
6363
6464/// The result of a wait on a [`Handle`]
@@ -91,11 +91,14 @@ impl<'a, T: Read + Write + 'a> Handle<'a, T> {
9191 session,
9292 timeout : Duration :: from_secs ( 29 * 60 ) ,
9393 keepalive : true ,
94- done : false ,
94+ last_idle : None ,
9595 }
9696 }
9797
9898 fn init ( & mut self ) -> Result < ( ) > {
99+ let last_idle = Instant :: now ( ) ;
100+ self . last_idle = Some ( last_idle) ;
101+
99102 // https://tools.ietf.org/html/rfc2177
100103 //
101104 // The IDLE command takes no arguments.
@@ -108,39 +111,97 @@ impl<'a, T: Read + Write + 'a> Handle<'a, T> {
108111 let mut v = Vec :: new ( ) ;
109112 self . session . readline ( & mut v) ?;
110113 if v. starts_with ( b"+" ) {
111- self . done = false ;
112114 return Ok ( ( ) ) ;
113115 }
114116
117+ self . last_idle = None ;
115118 self . session . read_response_onto ( & mut v) ?;
116119 // We should *only* get a continuation on an error (i.e., it gives BAD or NO).
117120 unreachable ! ( ) ;
118121 }
119122
120123 fn terminate ( & mut self ) -> Result < ( ) > {
121- if !self . done {
122- self . done = true ;
124+ if let Some ( _) = self . last_idle . take ( ) {
123125 self . session . write_line ( b"DONE" ) ?;
124126 self . session . read_response ( ) . map ( |_| ( ) )
125127 } else {
126128 Ok ( ( ) )
127129 }
128130 }
131+ }
129132
130- /// Internal helper that doesn't consume self.
133+ impl < ' a , T : SetReadTimeout + Read + Write + ' a > Handle < ' a , T > {
134+ /// Set the timeout duration on the connection. This will also set the frequency
135+ /// at which the connection is refreshed.
131136 ///
132- /// This is necessary so that we can keep using the inner `Session` in `wait_while`.
133- fn wait_inner < F > ( & mut self , reconnect : bool , mut callback : F ) -> Result < WaitOutcome >
137+ /// The interval defaults to 29 minutes as given in RFC 2177.
138+ pub fn timeout ( & mut self , interval : Duration ) -> & mut Self {
139+ self . timeout = interval;
140+ self
141+ }
142+
143+ /// Do not continuously refresh the IDLE connection in the background.
144+ ///
145+ /// By default, connections will periodically be refreshed in the background using the
146+ /// timeout duration set by [`Handle::timeout`]. If you do not want this behaviour, call
147+ /// this function and the connection will simply IDLE until `wait_while` returns or
148+ /// the timeout expires.
149+ pub fn keepalive ( & mut self , keepalive : bool ) -> & mut Self {
150+ self . keepalive = keepalive;
151+ self
152+ }
153+
154+ /// Block until the given callback returns `false`, or until a response
155+ /// arrives that is not explicitly handled by [`UnsolicitedResponse`].
156+ pub fn wait_while < F > ( & mut self , mut callback : F ) -> Result < WaitOutcome >
134157 where
135158 F : FnMut ( UnsolicitedResponse ) -> bool ,
136159 {
137160 let mut v = Vec :: new ( ) ;
138161 let result = loop {
139- match self . session . readline ( & mut v) {
162+ match {
163+ // The server MAY consider a client inactive if it has an IDLE command
164+ // running, and if such a server has an inactivity timeout it MAY log
165+ // the client off implicitly at the end of its timeout period. Because
166+ // of that, clients using IDLE are advised to terminate the IDLE and
167+ // re-issue it at least every 29 minutes to avoid being logged off.
168+ // This still allows a client to receive immediate mailbox updates even
169+ // though it need only "poll" at half hour intervals.
170+ self . last_idle
171+ // Check if the time since last_idle has exceeded the timeout.
172+ . map ( |t| {
173+ let time_since_idle = t. elapsed ( ) ;
174+ if time_since_idle >= self . timeout {
175+ Err ( Error :: Io ( io:: ErrorKind :: TimedOut . into ( ) ) )
176+ } else {
177+ Ok ( time_since_idle)
178+ }
179+ } )
180+ // If there's no self.last_idle, initialize the connection (and return a 0 time since idle).
181+ . unwrap_or_else ( || self . init ( ) . map ( |( ) | Duration :: ZERO ) )
182+ // Finally, if no error occurred, read from the stream.
183+ . map ( |time_since_idle| {
184+ self . session
185+ . stream
186+ . get_mut ( )
187+ . set_read_timeout ( Some ( self . timeout - time_since_idle) )
188+ . expect ( "cannot be Some(0) since time is monotonically increasing" ) ;
189+ self . session . readline ( & mut v)
190+ } )
191+ } {
140192 Err ( Error :: Io ( ref e) )
141193 if e. kind ( ) == io:: ErrorKind :: TimedOut
142194 || e. kind ( ) == io:: ErrorKind :: WouldBlock =>
143195 {
196+ if self . keepalive {
197+ match self . terminate ( ) {
198+ Ok ( ( ) ) => {
199+ // The connection gets initialized again on the next iteration.
200+ continue ,
201+ }
202+ Err ( e) => break Err ( e) ,
203+ }
204+ }
144205 break Ok ( WaitOutcome :: TimedOut ) ;
145206 }
146207 Ok ( _len) => {
@@ -183,60 +244,11 @@ impl<'a, T: Read + Write + 'a> Handle<'a, T> {
183244 } ;
184245 } ;
185246
186- // Reconnect on timeout if needed
187- match ( reconnect, result) {
188- ( true , Ok ( WaitOutcome :: TimedOut ) ) => {
189- self . terminate ( ) ?;
190- self . init ( ) ?;
191- self . wait_inner ( reconnect, callback)
192- }
193- ( _, result) => result,
194- }
195- }
196- }
197-
198- impl < ' a , T : SetReadTimeout + Read + Write + ' a > Handle < ' a , T > {
199- /// Set the timeout duration on the connection. This will also set the frequency
200- /// at which the connection is refreshed.
201- ///
202- /// The interval defaults to 29 minutes as given in RFC 2177.
203- pub fn timeout ( & mut self , interval : Duration ) -> & mut Self {
204- self . timeout = interval;
205- self
206- }
207-
208- /// Do not continuously refresh the IDLE connection in the background.
209- ///
210- /// By default, connections will periodically be refreshed in the background using the
211- /// timeout duration set by [`Handle::timeout`]. If you do not want this behaviour, call
212- /// this function and the connection will simply IDLE until `wait_while` returns or
213- /// the timeout expires.
214- pub fn keepalive ( & mut self , keepalive : bool ) -> & mut Self {
215- self . keepalive = keepalive;
216- self
217- }
247+ // set_read_timeout() can fail if the argument is Some(0), which can never be the
248+ // case here.
249+ self . session . stream . get_mut ( ) . set_read_timeout ( None ) . unwrap ( ) ;
218250
219- /// Block until the given callback returns `false`, or until a response
220- /// arrives that is not explicitly handled by [`UnsolicitedResponse`].
221- pub fn wait_while < F > ( & mut self , callback : F ) -> Result < WaitOutcome >
222- where
223- F : FnMut ( UnsolicitedResponse ) -> bool ,
224- {
225- self . init ( ) ?;
226- // The server MAY consider a client inactive if it has an IDLE command
227- // running, and if such a server has an inactivity timeout it MAY log
228- // the client off implicitly at the end of its timeout period. Because
229- // of that, clients using IDLE are advised to terminate the IDLE and
230- // re-issue it at least every 29 minutes to avoid being logged off.
231- // This still allows a client to receive immediate mailbox updates even
232- // though it need only "poll" at half hour intervals.
233- self . session
234- . stream
235- . get_mut ( )
236- . set_read_timeout ( Some ( self . timeout ) ) ?;
237- let res = self . wait_inner ( self . keepalive , callback) ;
238- let _ = self . session . stream . get_mut ( ) . set_read_timeout ( None ) . is_ok ( ) ;
239- res
251+ result
240252 }
241253}
242254
0 commit comments