Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions eventsource-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ base64 = "0.22.1"
[dev-dependencies]
env_logger = "0.10.0"
maplit = "1.0.1"
mockito = "1.7.1"
simplelog = "0.12.1"
tokio = { version = "1.2.0", features = ["macros", "rt-multi-thread"] }
test-case = "3.2.1"
Expand Down
117 changes: 108 additions & 9 deletions eventsource-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::{
fmt::{self, Debug, Formatter},
future::Future,
io::ErrorKind,
pin::Pin,
pin::{pin, Pin},
str::FromStr,
task::{Context, Poll},
time::{Duration, Instant},
Expand Down Expand Up @@ -450,7 +450,7 @@ where

let state = this.state.project();
match state {
StateProj::StreamClosed => return Poll::Ready(Some(Err(Error::StreamClosed))),
StateProj::StreamClosed => return Poll::Ready(None),
Comment thread
flejz marked this conversation as resolved.
// New immediately transitions to Connecting, and exists only
// to ensure that we only connect when polled.
StateProj::New => {
Expand Down Expand Up @@ -517,30 +517,49 @@ where
}
}

self.as_mut().reset_redirects();
self.as_mut().project().state.set(State::New);

return Poll::Ready(Some(Err(Error::UnexpectedResponse(
let error = Error::UnexpectedResponse(
Comment thread
flejz marked this conversation as resolved.
Response::new(resp.status(), resp.headers().clone()),
ErrorBody::new(resp.into_body()),
))));
);

if !*retry {
self.as_mut().project().state.set(State::StreamClosed);
Comment thread
flejz marked this conversation as resolved.
return Poll::Ready(Some(Err(error)));
}

self.as_mut().reset_redirects();

let duration = self
.as_mut()
.project()
.retry_strategy
.next_delay(Instant::now());

self.as_mut()
.project()
.state
.set(State::WaitingToReconnect(delay(duration, "retrying")));

return Poll::Ready(Some(Err(error)));
}
Err(e) => {
// This happens when the server is unreachable, e.g. connection refused.
warn!("request returned an error: {}", e);
if !*retry {
self.as_mut().project().state.set(State::New);
self.as_mut().project().state.set(State::StreamClosed);
return Poll::Ready(Some(Err(Error::HttpStream(Box::new(e)))));
}

let duration = self
.as_mut()
.project()
.retry_strategy
.next_delay(Instant::now());

self.as_mut()
.project()
.state
.set(State::WaitingToReconnect(delay(duration, "retrying")))
.set(State::WaitingToReconnect(delay(duration, "retrying")));
}
},
StateProj::FollowingRedirect(maybe_header) => match uri_from_header(maybe_header) {
Expand Down Expand Up @@ -665,4 +684,84 @@ mod tests {

assert_eq!(Some(&expected), actual);
}

use std::{pin::pin, str::FromStr, time::Duration};

use futures::TryStreamExt;
use hyper::{client::HttpConnector, Body, HeaderMap, Request, Uri};
use hyper_timeout::TimeoutConnector;
use tokio::time::timeout;

use crate::{
client::{RequestProps, State},
ReconnectOptionsBuilder, ReconnectingRequest,
};

const INVALID_URI: &'static str = "http://mycrazyunexsistenturl.invaliddomainext";

#[test_case(INVALID_URI, false, |state| matches!(state, State::StreamClosed))]
#[test_case(INVALID_URI, true, |state| matches!(state, State::WaitingToReconnect(_)))]
#[tokio::test]
async fn initial_connection(uri: &str, retry_initial: bool, expected: fn(&State) -> bool) {
let default_timeout = Some(Duration::from_secs(1));
let conn = HttpConnector::new();
let mut connector = TimeoutConnector::new(conn);
connector.set_connect_timeout(default_timeout);
connector.set_read_timeout(default_timeout);
connector.set_write_timeout(default_timeout);

let reconnect_opts = ReconnectOptionsBuilder::new(false)
.backoff_factor(1)
.delay(Duration::from_secs(1))
.retry_initial(retry_initial)
.build();

let http = hyper::Client::builder().build::<_, hyper::Body>(connector);
let req_props = RequestProps {
url: Uri::from_str(uri).unwrap(),
headers: HeaderMap::new(),
method: "GET".to_string(),
body: None,
reconnect_opts,
max_redirects: 10,
};

let mut reconnecting_request = ReconnectingRequest::new(http, req_props, None);

// sets initial state
let resp = reconnecting_request.http.request(
Request::builder()
.method("GET")
.uri(uri)
.body(Body::empty())
.unwrap(),
);

reconnecting_request.state = State::Connecting {
retry: reconnecting_request.props.reconnect_opts.retry_initial,
resp,
};

let mut reconnecting_request = pin!(reconnecting_request);

timeout(Duration::from_millis(500), reconnecting_request.try_next())
.await
.ok();

expected(&reconnecting_request.state);
}

#[test_case(false, |state| matches!(state, State::StreamClosed))]
#[test_case(true, |state| matches!(state, State::WaitingToReconnect(_)))]
#[tokio::test]
async fn initial_connection_mocked_server(retry_initial: bool, expected: fn(&State) -> bool) {
let mut mock_server = mockito::Server::new_async().await;
let _mock = mock_server
.mock("GET", "/")
.with_status(404)
.create_async()
.await;

initial_connection(&mock_server.url(), retry_initial, expected).await;
}
}
Loading