Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 71 additions & 63 deletions src/extensions/idle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use rustls_connector::TlsStream as RustlsStream;
use std::io::{self, Read, Write};
use std::net::TcpStream;
use std::ops::DerefMut;
use std::time::Duration;
use std::time::{Duration, Instant};

/// `Handle` allows a client to block waiting for changes to the remote mailbox.
///
Expand Down Expand Up @@ -58,7 +58,7 @@ pub struct Handle<'a, T: Read + Write> {
session: &'a mut Session<T>,
timeout: Duration,
keepalive: bool,
done: bool,
last_idle: Option<Instant>,
}

/// The result of a wait on a [`Handle`]
Expand Down Expand Up @@ -91,11 +91,13 @@ impl<'a, T: Read + Write + 'a> Handle<'a, T> {
session,
timeout: Duration::from_secs(29 * 60),
keepalive: true,
done: false,
last_idle: None,
}
}

fn init(&mut self) -> Result<()> {
self.last_idle = Some(Instant::now());

// https://tools.ietf.org/html/rfc2177
//
// The IDLE command takes no arguments.
Expand All @@ -108,39 +110,94 @@ impl<'a, T: Read + Write + 'a> Handle<'a, T> {
let mut v = Vec::new();
self.session.readline(&mut v)?;
if v.starts_with(b"+") {
self.done = false;
return Ok(());
}

self.last_idle = None;
self.session.read_response_onto(&mut v)?;
// We should *only* get a continuation on an error (i.e., it gives BAD or NO).
unreachable!();
}

fn terminate(&mut self) -> Result<()> {
if !self.done {
self.done = true;
if let Some(_) = self.last_idle.take() {
self.session.write_line(b"DONE")?;
self.session.read_response().map(|_| ())
} else {
Ok(())
}
}
}

/// Internal helper that doesn't consume self.
impl<'a, T: SetReadTimeout + Read + Write + 'a> Handle<'a, T> {
/// Set the timeout duration on the connection. This will also set the frequency
/// at which the connection is refreshed.
///
/// This is necessary so that we can keep using the inner `Session` in `wait_while`.
fn wait_inner<F>(&mut self, reconnect: bool, mut callback: F) -> Result<WaitOutcome>
/// The interval defaults to 29 minutes as given in RFC 2177.
pub fn timeout(&mut self, interval: Duration) -> &mut Self {
self.timeout = interval;
self
}

/// Do not continuously refresh the IDLE connection in the background.
///
/// By default, connections will periodically be refreshed in the background using the
/// timeout duration set by [`Handle::timeout`]. If you do not want this behaviour, call
/// this function and the connection will simply IDLE until `wait_while` returns or
/// the timeout expires.
pub fn keepalive(&mut self, keepalive: bool) -> &mut Self {
self.keepalive = keepalive;
self
}

/// Block until the given callback returns `false`, or until a response
/// arrives that is not explicitly handled by [`UnsolicitedResponse`].
pub fn wait_while<F>(&mut self, mut callback: F) -> Result<WaitOutcome>
where
F: FnMut(UnsolicitedResponse) -> bool,
{
let mut v = Vec::new();
let result = loop {
match self.session.readline(&mut v) {
match {
// The server MAY consider a client inactive if it has an IDLE command
// running, and if such a server has an inactivity timeout it MAY log
// the client off implicitly at the end of its timeout period. Because
// of that, clients using IDLE are advised to terminate the IDLE and
// re-issue it at least every 29 minutes to avoid being logged off.
// This still allows a client to receive immediate mailbox updates even
// though it need only "poll" at half hour intervals.
self.last_idle
.map_or_else(
// If there's no self.last_idle, initialize the connection (and return a 0 time since idle).
|| self.init().map(|()| Duration::ZERO),
|last_idle| Ok(last_idle.elapsed()),
)
// If no error occurred, read from the stream.
.map(|time_since_idle| {
if self.timeout <= time_since_idle {
return Err(Error::Io(io::ErrorKind::TimedOut.into()));
}
self.session
.stream
.get_mut()
.set_read_timeout(Some(self.timeout - time_since_idle))
.expect("cannot be Some(0) since that is guarded against");
self.session.readline(&mut v)
})
} {
Comment on lines 160 to +187
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about changing to this flavor? The nesting is too deep.

        let result = loop {
            // The server MAY consider a client inactive if it has an IDLE command
            // running, and if such a server has an inactivity timeout it MAY log
            // the client off implicitly at the end of its timeout period.  Because
            // of that, clients using IDLE are advised to terminate the IDLE and
            // re-issue it at least every 29 minutes to avoid being logged off.
            // This still allows a client to receive immediate mailbox updates even
            // though it need only "poll" at half hour intervals.

            // If there's no self.last_idle, initialize the connection.
            if self.last_idle.is_none() {
                self.init()?;
            }

            let time_since_idle = last_idle.elapsed();
            if self.timeout <= time_since_idle {
                return Err(Error::Io(io::ErrorKind::TimedOut.into()));
            }

            self.session
                .stream
                .get_mut()
                .set_read_timeout(Some(self.timeout - time_since_idle))
                .expect("cannot be Some(0) since that is guarded against");

            // If no error occurred, read from the stream.
            match self.session.readline(&mut v) {

Err(Error::Io(ref e))
if e.kind() == io::ErrorKind::TimedOut
|| e.kind() == io::ErrorKind::WouldBlock =>
{
if self.keepalive {
match self.terminate() {
Ok(()) => {
// The connection gets initialized again on the next iteration.
continue;
}
Err(e) => break Err(e),
}
}
break Ok(WaitOutcome::TimedOut);
}
Ok(_len) => {
Expand Down Expand Up @@ -183,60 +240,11 @@ impl<'a, T: Read + Write + 'a> Handle<'a, T> {
};
};

// Reconnect on timeout if needed
match (reconnect, result) {
(true, Ok(WaitOutcome::TimedOut)) => {
self.terminate()?;
self.init()?;
self.wait_inner(reconnect, callback)
}
(_, result) => result,
}
}
}

impl<'a, T: SetReadTimeout + Read + Write + 'a> Handle<'a, T> {
/// Set the timeout duration on the connection. This will also set the frequency
/// at which the connection is refreshed.
///
/// The interval defaults to 29 minutes as given in RFC 2177.
pub fn timeout(&mut self, interval: Duration) -> &mut Self {
self.timeout = interval;
self
}

/// Do not continuously refresh the IDLE connection in the background.
///
/// By default, connections will periodically be refreshed in the background using the
/// timeout duration set by [`Handle::timeout`]. If you do not want this behaviour, call
/// this function and the connection will simply IDLE until `wait_while` returns or
/// the timeout expires.
pub fn keepalive(&mut self, keepalive: bool) -> &mut Self {
self.keepalive = keepalive;
self
}
// set_read_timeout() can fail if the argument is Some(0), which can never be the
// case here.
self.session.stream.get_mut().set_read_timeout(None).unwrap();
Comment on lines +243 to +245
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to restore the original read_timeout ? When building a Client from custom stream, its timeout may have been customized.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see a reason why it wouldn't be possible.
The previous code overwrote the timeout without attempting to restore the original value as well, so that is a separate change that I'm not planning to implement.


/// Block until the given callback returns `false`, or until a response
/// arrives that is not explicitly handled by [`UnsolicitedResponse`].
pub fn wait_while<F>(&mut self, callback: F) -> Result<WaitOutcome>
where
F: FnMut(UnsolicitedResponse) -> bool,
{
self.init()?;
// The server MAY consider a client inactive if it has an IDLE command
// running, and if such a server has an inactivity timeout it MAY log
// the client off implicitly at the end of its timeout period. Because
// of that, clients using IDLE are advised to terminate the IDLE and
// re-issue it at least every 29 minutes to avoid being logged off.
// This still allows a client to receive immediate mailbox updates even
// though it need only "poll" at half hour intervals.
self.session
.stream
.get_mut()
.set_read_timeout(Some(self.timeout))?;
let res = self.wait_inner(self.keepalive, callback);
let _ = self.session.stream.get_mut().set_read_timeout(None).is_ok();
res
result
}
}

Expand Down