From f2cf2c5d97fa7d3285a7c32cfe740fe7e891b90d Mon Sep 17 00:00:00 2001 From: flejz Date: Sat, 13 Dec 2025 10:58:44 +0100 Subject: [PATCH 1/7] feat: transition to an error state when stream initial connection fails withouth retry --- eventsource-client/Cargo.toml | 1 + eventsource-client/src/client.rs | 122 +++++++++++++++++++++++++++++-- eventsource-client/src/error.rs | 3 + 3 files changed, 121 insertions(+), 5 deletions(-) diff --git a/eventsource-client/Cargo.toml b/eventsource-client/Cargo.toml index b33d647..344f5a4 100644 --- a/eventsource-client/Cargo.toml +++ b/eventsource-client/Cargo.toml @@ -24,6 +24,7 @@ base64 = "0.22.1" [dev-dependencies] env_logger = "0.10.0" maplit = "1.0.1" +mockito = "1.7.1" simplelog = "0.12.1" tokio = { version = "1.2.0", features = ["macros", "rt-multi-thread"] } test-case = "3.2.1" diff --git a/eventsource-client/src/client.rs b/eventsource-client/src/client.rs index 03827d2..1be24dc 100644 --- a/eventsource-client/src/client.rs +++ b/eventsource-client/src/client.rs @@ -18,7 +18,7 @@ use std::{ fmt::{self, Debug, Formatter}, future::Future, io::ErrorKind, - pin::Pin, + pin::{pin, Pin}, str::FromStr, task::{Context, Poll}, time::{Duration, Instant}, @@ -300,6 +300,7 @@ enum State { resp: ResponseFuture, }, Connected(#[pin] hyper::Body), + InitializationError(Error), WaitingToReconnect(#[pin] Sleep), FollowingRedirect(Option), StreamClosed, @@ -312,6 +313,10 @@ impl State { State::Connecting { retry: false, .. } => "connecting(no-retry)", State::Connecting { retry: true, .. } => "connecting(retry)", State::Connected(_) => "connected", + State::InitializationError(Error::StreamInitializationError) => { + "initializatoin-error(stream)" + } + State::InitializationError(_) => "initializatoin-error", State::WaitingToReconnect(_) => "waiting-to-reconnect", State::FollowingRedirect(_) => "following-redirect", State::StreamClosed => "closed", @@ -517,8 +522,26 @@ where } } + if !*retry { + self.as_mut() + .project() + .state + .set(State::InitializationError(Error::StreamInitializationError)); + return Poll::Ready(Some(Err(Error::StreamInitializationError))); + } + self.as_mut().reset_redirects(); - self.as_mut().project().state.set(State::New); + + let duration = self + .as_mut() + .project() + .retry_strategy + .next_delay(Instant::now()); + + self.as_mut() + .project() + .state + .set(State::WaitingToReconnect(delay(duration, "retrying"))); return Poll::Ready(Some(Err(Error::UnexpectedResponse( Response::new(resp.status(), resp.headers().clone()), @@ -529,18 +552,23 @@ where // This happens when the server is unreachable, e.g. connection refused. warn!("request returned an error: {}", e); if !*retry { - self.as_mut().project().state.set(State::New); - return Poll::Ready(Some(Err(Error::HttpStream(Box::new(e))))); + self.as_mut() + .project() + .state + .set(State::InitializationError(Error::StreamInitializationError)); + return Poll::Ready(Some(Err(Error::StreamInitializationError))); } + let duration = self .as_mut() .project() .retry_strategy .next_delay(Instant::now()); + self.as_mut() .project() .state - .set(State::WaitingToReconnect(delay(duration, "retrying"))) + .set(State::WaitingToReconnect(delay(duration, "retrying"))); } }, StateProj::FollowingRedirect(maybe_header) => match uri_from_header(maybe_header) { @@ -603,6 +631,10 @@ where info!("Reconnecting"); self.as_mut().project().state.set(State::New); } + StateProj::InitializationError(_) => { + ready!(pin!(delay(Duration::from_secs(60), "errored")).poll(cx)); + return Poll::Ready(Some(Err(Error::StreamInitializationError))); + } }; } } @@ -665,4 +697,84 @@ mod tests { assert_eq!(Some(&expected), actual); } + + use std::{pin::pin, str::FromStr, time::Duration}; + + use futures::TryStreamExt; + use hyper::{client::HttpConnector, Body, HeaderMap, Request, Uri}; + use hyper_timeout::TimeoutConnector; + use tokio::time::timeout; + + use crate::{ + client::{RequestProps, State}, + Error, ReconnectOptionsBuilder, ReconnectingRequest, + }; + + const INVALID_URI: &'static str = "http://mycrazyunexsistenturl.invaliddomainext"; + + #[test_case(INVALID_URI, false, |state| matches!(state, State::InitializationError(Error::StreamInitializationError)))] + #[test_case(INVALID_URI, true, |state| matches!(state, State::WaitingToReconnect(_)))] + #[tokio::test] + async fn initial_connection(uri: &str, retry_initial: bool, expected: fn(&State) -> bool) { + let default_timeout = Some(Duration::from_secs(1)); + let conn = HttpConnector::new(); + let mut connector = TimeoutConnector::new(conn); + connector.set_connect_timeout(default_timeout); + connector.set_read_timeout(default_timeout); + connector.set_write_timeout(default_timeout); + + let reconnect_opts = ReconnectOptionsBuilder::new(false) + .backoff_factor(1) + .delay(Duration::from_secs(1)) + .retry_initial(retry_initial) + .build(); + + let http = hyper::Client::builder().build::<_, hyper::Body>(connector); + let req_props = RequestProps { + url: Uri::from_str(uri).unwrap(), + headers: HeaderMap::new(), + method: "GET".to_string(), + body: None, + reconnect_opts, + max_redirects: 10, + }; + + let mut reconnecting_request = ReconnectingRequest::new(http, req_props, None); + + // sets initial state + let resp = reconnecting_request.http.request( + Request::builder() + .method("GET") + .uri(uri) + .body(Body::empty()) + .unwrap(), + ); + + reconnecting_request.state = State::Connecting { + retry: reconnecting_request.props.reconnect_opts.retry_initial, + resp, + }; + + let mut reconnecting_request = pin!(reconnecting_request); + + timeout(Duration::from_millis(500), reconnecting_request.try_next()) + .await + .ok(); + + expected(&reconnecting_request.state); + } + + #[test_case(false, |state| matches!(state, State::InitializationError(Error::StreamInitializationError)))] + #[test_case(true, |state| matches!(state, State::WaitingToReconnect(_)))] + #[tokio::test] + async fn initial_connection_mocked_server(retry_initial: bool, expected: fn(&State) -> bool) { + let mut mock_server = mockito::Server::new_async().await; + let _mock = mock_server + .mock("GET", "/") + .with_status(404) + .create_async() + .await; + + initial_connection(&mock_server.url(), retry_initial, expected).await; + } } diff --git a/eventsource-client/src/error.rs b/eventsource-client/src/error.rs index f84f891..d72dc67 100644 --- a/eventsource-client/src/error.rs +++ b/eventsource-client/src/error.rs @@ -31,6 +31,8 @@ impl std::error::Error for HeaderError { pub enum Error { TimedOut, StreamClosed, + /// Error during stream intialization + StreamInitializationError, /// An invalid request parameter InvalidParameter(Box), /// The HTTP response could not be handled. @@ -57,6 +59,7 @@ impl std::fmt::Display for Error { match self { TimedOut => write!(f, "timed out"), StreamClosed => write!(f, "stream closed"), + StreamInitializationError => write!(f, "stream initialization error"), InvalidParameter(err) => write!(f, "invalid parameter: {err}"), UnexpectedResponse(r, _) => { let status = r.status(); From 87232c0ff32b2e6720d0e94d3e1008a3d5e5efcc Mon Sep 17 00:00:00 2001 From: Matthew Keeler Date: Mon, 15 Dec 2025 13:12:55 -0500 Subject: [PATCH 2/7] Remove initialization error state --- eventsource-client/src/client.rs | 27 ++++++--------------------- 1 file changed, 6 insertions(+), 21 deletions(-) diff --git a/eventsource-client/src/client.rs b/eventsource-client/src/client.rs index 1be24dc..5d7e5e9 100644 --- a/eventsource-client/src/client.rs +++ b/eventsource-client/src/client.rs @@ -300,7 +300,6 @@ enum State { resp: ResponseFuture, }, Connected(#[pin] hyper::Body), - InitializationError(Error), WaitingToReconnect(#[pin] Sleep), FollowingRedirect(Option), StreamClosed, @@ -313,10 +312,6 @@ impl State { State::Connecting { retry: false, .. } => "connecting(no-retry)", State::Connecting { retry: true, .. } => "connecting(retry)", State::Connected(_) => "connected", - State::InitializationError(Error::StreamInitializationError) => { - "initializatoin-error(stream)" - } - State::InitializationError(_) => "initializatoin-error", State::WaitingToReconnect(_) => "waiting-to-reconnect", State::FollowingRedirect(_) => "following-redirect", State::StreamClosed => "closed", @@ -455,7 +450,7 @@ where let state = this.state.project(); match state { - StateProj::StreamClosed => return Poll::Ready(Some(Err(Error::StreamClosed))), + StateProj::StreamClosed => return Poll::Ready(None), // New immediately transitions to Connecting, and exists only // to ensure that we only connect when polled. StateProj::New => { @@ -523,10 +518,7 @@ where } if !*retry { - self.as_mut() - .project() - .state - .set(State::InitializationError(Error::StreamInitializationError)); + self.as_mut().project().state.set(State::StreamClosed); return Poll::Ready(Some(Err(Error::StreamInitializationError))); } @@ -552,10 +544,7 @@ where // This happens when the server is unreachable, e.g. connection refused. warn!("request returned an error: {}", e); if !*retry { - self.as_mut() - .project() - .state - .set(State::InitializationError(Error::StreamInitializationError)); + self.as_mut().project().state.set(State::StreamClosed); return Poll::Ready(Some(Err(Error::StreamInitializationError))); } @@ -631,10 +620,6 @@ where info!("Reconnecting"); self.as_mut().project().state.set(State::New); } - StateProj::InitializationError(_) => { - ready!(pin!(delay(Duration::from_secs(60), "errored")).poll(cx)); - return Poll::Ready(Some(Err(Error::StreamInitializationError))); - } }; } } @@ -707,12 +692,12 @@ mod tests { use crate::{ client::{RequestProps, State}, - Error, ReconnectOptionsBuilder, ReconnectingRequest, + ReconnectOptionsBuilder, ReconnectingRequest, }; const INVALID_URI: &'static str = "http://mycrazyunexsistenturl.invaliddomainext"; - #[test_case(INVALID_URI, false, |state| matches!(state, State::InitializationError(Error::StreamInitializationError)))] + #[test_case(INVALID_URI, false, |state| matches!(state, State::StreamClosed))] #[test_case(INVALID_URI, true, |state| matches!(state, State::WaitingToReconnect(_)))] #[tokio::test] async fn initial_connection(uri: &str, retry_initial: bool, expected: fn(&State) -> bool) { @@ -764,7 +749,7 @@ mod tests { expected(&reconnecting_request.state); } - #[test_case(false, |state| matches!(state, State::InitializationError(Error::StreamInitializationError)))] + #[test_case(false, |state| matches!(state, State::StreamClosed))] #[test_case(true, |state| matches!(state, State::WaitingToReconnect(_)))] #[tokio::test] async fn initial_connection_mocked_server(retry_initial: bool, expected: fn(&State) -> bool) { From b1cd0b6c8d5a19f36b01aff26e60487f5c8e23f6 Mon Sep 17 00:00:00 2001 From: Matthew Keeler Date: Mon, 15 Dec 2025 14:26:43 -0500 Subject: [PATCH 3/7] Remove stream initialization error --- eventsource-client/src/client.rs | 14 ++++++++------ eventsource-client/src/error.rs | 3 --- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/eventsource-client/src/client.rs b/eventsource-client/src/client.rs index 5d7e5e9..b53ad60 100644 --- a/eventsource-client/src/client.rs +++ b/eventsource-client/src/client.rs @@ -517,9 +517,14 @@ where } } + let error = Error::UnexpectedResponse( + Response::new(resp.status(), resp.headers().clone()), + ErrorBody::new(resp.into_body()), + ); + if !*retry { self.as_mut().project().state.set(State::StreamClosed); - return Poll::Ready(Some(Err(Error::StreamInitializationError))); + return Poll::Ready(Some(Err(error))); } self.as_mut().reset_redirects(); @@ -535,17 +540,14 @@ where .state .set(State::WaitingToReconnect(delay(duration, "retrying"))); - return Poll::Ready(Some(Err(Error::UnexpectedResponse( - Response::new(resp.status(), resp.headers().clone()), - ErrorBody::new(resp.into_body()), - )))); + return Poll::Ready(Some(Err(error))); } Err(e) => { // This happens when the server is unreachable, e.g. connection refused. warn!("request returned an error: {}", e); if !*retry { self.as_mut().project().state.set(State::StreamClosed); - return Poll::Ready(Some(Err(Error::StreamInitializationError))); + return Poll::Ready(Some(Err(Error::HttpStream(Box::new(e))))); } let duration = self diff --git a/eventsource-client/src/error.rs b/eventsource-client/src/error.rs index d72dc67..f84f891 100644 --- a/eventsource-client/src/error.rs +++ b/eventsource-client/src/error.rs @@ -31,8 +31,6 @@ impl std::error::Error for HeaderError { pub enum Error { TimedOut, StreamClosed, - /// Error during stream intialization - StreamInitializationError, /// An invalid request parameter InvalidParameter(Box), /// The HTTP response could not be handled. @@ -59,7 +57,6 @@ impl std::fmt::Display for Error { match self { TimedOut => write!(f, "timed out"), StreamClosed => write!(f, "stream closed"), - StreamInitializationError => write!(f, "stream initialization error"), InvalidParameter(err) => write!(f, "invalid parameter: {err}"), UnexpectedResponse(r, _) => { let status = r.status(); From 338648b65f04e4092680703423f35913c8fb7f65 Mon Sep 17 00:00:00 2001 From: Matthew Keeler Date: Mon, 15 Dec 2025 14:47:18 -0500 Subject: [PATCH 4/7] Bump to msrv to 1.83.0 --- .github/workflows/ci.yml | 10 +++++----- .github/workflows/manual-publish.yml | 2 +- .github/workflows/release-please.yml | 2 +- eventsource-client/Cargo.toml | 2 +- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 18c2241..043425d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -1,13 +1,13 @@ name: Run CI on: push: - branches: [ main ] + branches: [main] paths-ignore: - - '**.md' # Do not need to run CI for markdown changes. + - "**.md" # Do not need to run CI for markdown changes. pull_request: - branches: [ main ] + branches: [main] paths-ignore: - - '**.md' + - "**.md" jobs: ci-build: @@ -20,7 +20,7 @@ jobs: - name: Setup rust tooling run: | - rustup override set 1.82 + rustup override set 1.83 rustup component add rustfmt clippy - uses: ./.github/actions/ci diff --git a/.github/workflows/manual-publish.yml b/.github/workflows/manual-publish.yml index e884301..9f350b0 100644 --- a/.github/workflows/manual-publish.yml +++ b/.github/workflows/manual-publish.yml @@ -19,7 +19,7 @@ jobs: - name: Setup rust tooling run: | - rustup override set 1.82 + rustup override set 1.83 rustup component add rustfmt clippy - uses: ./.github/actions/ci diff --git a/.github/workflows/release-please.yml b/.github/workflows/release-please.yml index 603a2f2..ffaf013 100644 --- a/.github/workflows/release-please.yml +++ b/.github/workflows/release-please.yml @@ -24,7 +24,7 @@ jobs: - name: Setup rust tooling if: ${{ steps.release.outputs['eventsource-client--release_created'] == 'true' }} run: | - rustup override set 1.82 + rustup override set 1.83 rustup component add rustfmt clippy - uses: launchdarkly/gh-actions/actions/release-secrets@release-secrets-v1.2.0 diff --git a/eventsource-client/Cargo.toml b/eventsource-client/Cargo.toml index 344f5a4..49bedba 100644 --- a/eventsource-client/Cargo.toml +++ b/eventsource-client/Cargo.toml @@ -5,7 +5,7 @@ description = "Client for the Server-Sent Events protocol (aka EventSource)" repository = "https://github.com/launchdarkly/rust-eventsource-client" authors = ["LaunchDarkly"] edition = "2021" -rust-version = "1.82.0" +rust-version = "1.83.0" license = "Apache-2.0" keywords = ["launchdarkly", "feature-flags", "feature-toggles", "eventsource", "server-sent-events"] exclude = ["CHANGELOG.md"] From 75353b7bc049fb55a74c0e681dfe4709391bb25b Mon Sep 17 00:00:00 2001 From: Matthew Keeler Date: Tue, 16 Dec 2025 13:10:28 -0500 Subject: [PATCH 5/7] Fix contract-test usage --- contract-tests/src/bin/sse-test-api/stream_entity.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/contract-tests/src/bin/sse-test-api/stream_entity.rs b/contract-tests/src/bin/sse-test-api/stream_entity.rs index 7e6361a..7ba9aa4 100644 --- a/contract-tests/src/bin/sse-test-api/stream_entity.rs +++ b/contract-tests/src/bin/sse-test-api/stream_entity.rs @@ -44,7 +44,7 @@ impl Inner { break; } } - Ok(None) => continue, + Ok(None) => break, Err(e) => { let failure = EventType::Error { error: format!("Error: {:?}", e), From f2d95f05cbdce7291a29dbf2328f7e1652487b5f Mon Sep 17 00:00:00 2001 From: Matthew Keeler Date: Tue, 16 Dec 2025 14:39:12 -0500 Subject: [PATCH 6/7] Remove stream closed error --- contract-tests/src/bin/sse-test-api/stream_entity.rs | 5 ----- eventsource-client/src/error.rs | 2 -- 2 files changed, 7 deletions(-) diff --git a/contract-tests/src/bin/sse-test-api/stream_entity.rs b/contract-tests/src/bin/sse-test-api/stream_entity.rs index 7ba9aa4..a46e189 100644 --- a/contract-tests/src/bin/sse-test-api/stream_entity.rs +++ b/contract-tests/src/bin/sse-test-api/stream_entity.rs @@ -53,11 +53,6 @@ impl Inner { if !self.send_message(failure, &client).await { break; } - - match e { - es::Error::StreamClosed => break, - _ => continue, - } } }; } diff --git a/eventsource-client/src/error.rs b/eventsource-client/src/error.rs index f84f891..b4d60f5 100644 --- a/eventsource-client/src/error.rs +++ b/eventsource-client/src/error.rs @@ -30,7 +30,6 @@ impl std::error::Error for HeaderError { #[derive(Debug)] pub enum Error { TimedOut, - StreamClosed, /// An invalid request parameter InvalidParameter(Box), /// The HTTP response could not be handled. @@ -56,7 +55,6 @@ impl std::fmt::Display for Error { use Error::*; match self { TimedOut => write!(f, "timed out"), - StreamClosed => write!(f, "stream closed"), InvalidParameter(err) => write!(f, "invalid parameter: {err}"), UnexpectedResponse(r, _) => { let status = r.status(); From 9c59184b9f3335f2b10d2bcf4165d390792d7b0c Mon Sep 17 00:00:00 2001 From: flejz Date: Wed, 17 Dec 2025 14:33:10 +0100 Subject: [PATCH 7/7] test: assert reconncetion tests --- eventsource-client/src/client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eventsource-client/src/client.rs b/eventsource-client/src/client.rs index b53ad60..565f66f 100644 --- a/eventsource-client/src/client.rs +++ b/eventsource-client/src/client.rs @@ -748,7 +748,7 @@ mod tests { .await .ok(); - expected(&reconnecting_request.state); + assert!(expected(&reconnecting_request.state)); } #[test_case(false, |state| matches!(state, State::StreamClosed))]