diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 18c2241..043425d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -1,13 +1,13 @@ name: Run CI on: push: - branches: [ main ] + branches: [main] paths-ignore: - - '**.md' # Do not need to run CI for markdown changes. + - "**.md" # Do not need to run CI for markdown changes. pull_request: - branches: [ main ] + branches: [main] paths-ignore: - - '**.md' + - "**.md" jobs: ci-build: @@ -20,7 +20,7 @@ jobs: - name: Setup rust tooling run: | - rustup override set 1.82 + rustup override set 1.83 rustup component add rustfmt clippy - uses: ./.github/actions/ci diff --git a/.github/workflows/manual-publish.yml b/.github/workflows/manual-publish.yml index e884301..9f350b0 100644 --- a/.github/workflows/manual-publish.yml +++ b/.github/workflows/manual-publish.yml @@ -19,7 +19,7 @@ jobs: - name: Setup rust tooling run: | - rustup override set 1.82 + rustup override set 1.83 rustup component add rustfmt clippy - uses: ./.github/actions/ci diff --git a/.github/workflows/release-please.yml b/.github/workflows/release-please.yml index 603a2f2..ffaf013 100644 --- a/.github/workflows/release-please.yml +++ b/.github/workflows/release-please.yml @@ -24,7 +24,7 @@ jobs: - name: Setup rust tooling if: ${{ steps.release.outputs['eventsource-client--release_created'] == 'true' }} run: | - rustup override set 1.82 + rustup override set 1.83 rustup component add rustfmt clippy - uses: launchdarkly/gh-actions/actions/release-secrets@release-secrets-v1.2.0 diff --git a/contract-tests/src/bin/sse-test-api/stream_entity.rs b/contract-tests/src/bin/sse-test-api/stream_entity.rs index 7e6361a..a46e189 100644 --- a/contract-tests/src/bin/sse-test-api/stream_entity.rs +++ b/contract-tests/src/bin/sse-test-api/stream_entity.rs @@ -44,7 +44,7 @@ impl Inner { break; } } - Ok(None) => continue, + Ok(None) => break, Err(e) => { let failure = EventType::Error { error: format!("Error: {:?}", e), @@ -53,11 +53,6 @@ impl Inner { if !self.send_message(failure, &client).await { break; } - - match e { - es::Error::StreamClosed => break, - _ => continue, - } } }; } diff --git a/eventsource-client/Cargo.toml b/eventsource-client/Cargo.toml index b33d647..49bedba 100644 --- a/eventsource-client/Cargo.toml +++ b/eventsource-client/Cargo.toml @@ -5,7 +5,7 @@ description = "Client for the Server-Sent Events protocol (aka EventSource)" repository = "https://github.com/launchdarkly/rust-eventsource-client" authors = ["LaunchDarkly"] edition = "2021" -rust-version = "1.82.0" +rust-version = "1.83.0" license = "Apache-2.0" keywords = ["launchdarkly", "feature-flags", "feature-toggles", "eventsource", "server-sent-events"] exclude = ["CHANGELOG.md"] @@ -24,6 +24,7 @@ base64 = "0.22.1" [dev-dependencies] env_logger = "0.10.0" maplit = "1.0.1" +mockito = "1.7.1" simplelog = "0.12.1" tokio = { version = "1.2.0", features = ["macros", "rt-multi-thread"] } test-case = "3.2.1" diff --git a/eventsource-client/src/client.rs b/eventsource-client/src/client.rs index 03827d2..565f66f 100644 --- a/eventsource-client/src/client.rs +++ b/eventsource-client/src/client.rs @@ -18,7 +18,7 @@ use std::{ fmt::{self, Debug, Formatter}, future::Future, io::ErrorKind, - pin::Pin, + pin::{pin, Pin}, str::FromStr, task::{Context, Poll}, time::{Duration, Instant}, @@ -450,7 +450,7 @@ where let state = this.state.project(); match state { - StateProj::StreamClosed => return Poll::Ready(Some(Err(Error::StreamClosed))), + StateProj::StreamClosed => return Poll::Ready(None), // New immediately transitions to Connecting, and exists only // to ensure that we only connect when polled. StateProj::New => { @@ -517,30 +517,49 @@ where } } - self.as_mut().reset_redirects(); - self.as_mut().project().state.set(State::New); - - return Poll::Ready(Some(Err(Error::UnexpectedResponse( + let error = Error::UnexpectedResponse( Response::new(resp.status(), resp.headers().clone()), ErrorBody::new(resp.into_body()), - )))); + ); + + if !*retry { + self.as_mut().project().state.set(State::StreamClosed); + return Poll::Ready(Some(Err(error))); + } + + self.as_mut().reset_redirects(); + + let duration = self + .as_mut() + .project() + .retry_strategy + .next_delay(Instant::now()); + + self.as_mut() + .project() + .state + .set(State::WaitingToReconnect(delay(duration, "retrying"))); + + return Poll::Ready(Some(Err(error))); } Err(e) => { // This happens when the server is unreachable, e.g. connection refused. warn!("request returned an error: {}", e); if !*retry { - self.as_mut().project().state.set(State::New); + self.as_mut().project().state.set(State::StreamClosed); return Poll::Ready(Some(Err(Error::HttpStream(Box::new(e))))); } + let duration = self .as_mut() .project() .retry_strategy .next_delay(Instant::now()); + self.as_mut() .project() .state - .set(State::WaitingToReconnect(delay(duration, "retrying"))) + .set(State::WaitingToReconnect(delay(duration, "retrying"))); } }, StateProj::FollowingRedirect(maybe_header) => match uri_from_header(maybe_header) { @@ -665,4 +684,84 @@ mod tests { assert_eq!(Some(&expected), actual); } + + use std::{pin::pin, str::FromStr, time::Duration}; + + use futures::TryStreamExt; + use hyper::{client::HttpConnector, Body, HeaderMap, Request, Uri}; + use hyper_timeout::TimeoutConnector; + use tokio::time::timeout; + + use crate::{ + client::{RequestProps, State}, + ReconnectOptionsBuilder, ReconnectingRequest, + }; + + const INVALID_URI: &'static str = "http://mycrazyunexsistenturl.invaliddomainext"; + + #[test_case(INVALID_URI, false, |state| matches!(state, State::StreamClosed))] + #[test_case(INVALID_URI, true, |state| matches!(state, State::WaitingToReconnect(_)))] + #[tokio::test] + async fn initial_connection(uri: &str, retry_initial: bool, expected: fn(&State) -> bool) { + let default_timeout = Some(Duration::from_secs(1)); + let conn = HttpConnector::new(); + let mut connector = TimeoutConnector::new(conn); + connector.set_connect_timeout(default_timeout); + connector.set_read_timeout(default_timeout); + connector.set_write_timeout(default_timeout); + + let reconnect_opts = ReconnectOptionsBuilder::new(false) + .backoff_factor(1) + .delay(Duration::from_secs(1)) + .retry_initial(retry_initial) + .build(); + + let http = hyper::Client::builder().build::<_, hyper::Body>(connector); + let req_props = RequestProps { + url: Uri::from_str(uri).unwrap(), + headers: HeaderMap::new(), + method: "GET".to_string(), + body: None, + reconnect_opts, + max_redirects: 10, + }; + + let mut reconnecting_request = ReconnectingRequest::new(http, req_props, None); + + // sets initial state + let resp = reconnecting_request.http.request( + Request::builder() + .method("GET") + .uri(uri) + .body(Body::empty()) + .unwrap(), + ); + + reconnecting_request.state = State::Connecting { + retry: reconnecting_request.props.reconnect_opts.retry_initial, + resp, + }; + + let mut reconnecting_request = pin!(reconnecting_request); + + timeout(Duration::from_millis(500), reconnecting_request.try_next()) + .await + .ok(); + + assert!(expected(&reconnecting_request.state)); + } + + #[test_case(false, |state| matches!(state, State::StreamClosed))] + #[test_case(true, |state| matches!(state, State::WaitingToReconnect(_)))] + #[tokio::test] + async fn initial_connection_mocked_server(retry_initial: bool, expected: fn(&State) -> bool) { + let mut mock_server = mockito::Server::new_async().await; + let _mock = mock_server + .mock("GET", "/") + .with_status(404) + .create_async() + .await; + + initial_connection(&mock_server.url(), retry_initial, expected).await; + } } diff --git a/eventsource-client/src/error.rs b/eventsource-client/src/error.rs index f84f891..b4d60f5 100644 --- a/eventsource-client/src/error.rs +++ b/eventsource-client/src/error.rs @@ -30,7 +30,6 @@ impl std::error::Error for HeaderError { #[derive(Debug)] pub enum Error { TimedOut, - StreamClosed, /// An invalid request parameter InvalidParameter(Box), /// The HTTP response could not be handled. @@ -56,7 +55,6 @@ impl std::fmt::Display for Error { use Error::*; match self { TimedOut => write!(f, "timed out"), - StreamClosed => write!(f, "stream closed"), InvalidParameter(err) => write!(f, "invalid parameter: {err}"), UnexpectedResponse(r, _) => { let status = r.status();