Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
name: Run CI
on:
push:
branches: [ main ]
branches: [main]
paths-ignore:
- '**.md' # Do not need to run CI for markdown changes.
- "**.md" # Do not need to run CI for markdown changes.
pull_request:
branches: [ main ]
branches: [main]
paths-ignore:
- '**.md'
- "**.md"

jobs:
ci-build:
Expand All @@ -20,7 +20,7 @@ jobs:

- name: Setup rust tooling
run: |
rustup override set 1.82
rustup override set 1.83
rustup component add rustfmt clippy

- uses: ./.github/actions/ci
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/manual-publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jobs:

- name: Setup rust tooling
run: |
rustup override set 1.82
rustup override set 1.83
rustup component add rustfmt clippy

- uses: ./.github/actions/ci
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/release-please.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ jobs:
- name: Setup rust tooling
if: ${{ steps.release.outputs['eventsource-client--release_created'] == 'true' }}
run: |
rustup override set 1.82
rustup override set 1.83
rustup component add rustfmt clippy

- uses: launchdarkly/gh-actions/actions/release-secrets@release-secrets-v1.2.0
Expand Down
7 changes: 1 addition & 6 deletions contract-tests/src/bin/sse-test-api/stream_entity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl Inner {
break;
}
}
Ok(None) => continue,
Ok(None) => break,
Err(e) => {
let failure = EventType::Error {
error: format!("Error: {:?}", e),
Expand All @@ -53,11 +53,6 @@ impl Inner {
if !self.send_message(failure, &client).await {
break;
}

match e {
es::Error::StreamClosed => break,
_ => continue,
}
}
};
}
Expand Down
3 changes: 2 additions & 1 deletion eventsource-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ description = "Client for the Server-Sent Events protocol (aka EventSource)"
repository = "https://github.com/launchdarkly/rust-eventsource-client"
authors = ["LaunchDarkly"]
edition = "2021"
rust-version = "1.82.0"
rust-version = "1.83.0"
license = "Apache-2.0"
keywords = ["launchdarkly", "feature-flags", "feature-toggles", "eventsource", "server-sent-events"]
exclude = ["CHANGELOG.md"]
Expand All @@ -24,6 +24,7 @@ base64 = "0.22.1"
[dev-dependencies]
env_logger = "0.10.0"
maplit = "1.0.1"
mockito = "1.7.1"
simplelog = "0.12.1"
tokio = { version = "1.2.0", features = ["macros", "rt-multi-thread"] }
test-case = "3.2.1"
Expand Down
117 changes: 108 additions & 9 deletions eventsource-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::{
fmt::{self, Debug, Formatter},
future::Future,
io::ErrorKind,
pin::Pin,
pin::{pin, Pin},
str::FromStr,
task::{Context, Poll},
time::{Duration, Instant},
Expand Down Expand Up @@ -450,7 +450,7 @@ where

let state = this.state.project();
match state {
StateProj::StreamClosed => return Poll::Ready(Some(Err(Error::StreamClosed))),
StateProj::StreamClosed => return Poll::Ready(None),
Comment thread
flejz marked this conversation as resolved.
// New immediately transitions to Connecting, and exists only
// to ensure that we only connect when polled.
StateProj::New => {
Expand Down Expand Up @@ -517,30 +517,49 @@ where
}
}

self.as_mut().reset_redirects();
self.as_mut().project().state.set(State::New);

return Poll::Ready(Some(Err(Error::UnexpectedResponse(
let error = Error::UnexpectedResponse(
Comment thread
flejz marked this conversation as resolved.
Response::new(resp.status(), resp.headers().clone()),
ErrorBody::new(resp.into_body()),
))));
);

if !*retry {
self.as_mut().project().state.set(State::StreamClosed);
Comment thread
flejz marked this conversation as resolved.
return Poll::Ready(Some(Err(error)));
}

self.as_mut().reset_redirects();

let duration = self
.as_mut()
.project()
.retry_strategy
.next_delay(Instant::now());

self.as_mut()
.project()
.state
.set(State::WaitingToReconnect(delay(duration, "retrying")));

return Poll::Ready(Some(Err(error)));
}
Err(e) => {
// This happens when the server is unreachable, e.g. connection refused.
warn!("request returned an error: {}", e);
if !*retry {
self.as_mut().project().state.set(State::New);
self.as_mut().project().state.set(State::StreamClosed);
return Poll::Ready(Some(Err(Error::HttpStream(Box::new(e)))));
}

let duration = self
.as_mut()
.project()
.retry_strategy
.next_delay(Instant::now());

self.as_mut()
.project()
.state
.set(State::WaitingToReconnect(delay(duration, "retrying")))
.set(State::WaitingToReconnect(delay(duration, "retrying")));
}
},
StateProj::FollowingRedirect(maybe_header) => match uri_from_header(maybe_header) {
Expand Down Expand Up @@ -665,4 +684,84 @@ mod tests {

assert_eq!(Some(&expected), actual);
}

use std::{pin::pin, str::FromStr, time::Duration};

use futures::TryStreamExt;
use hyper::{client::HttpConnector, Body, HeaderMap, Request, Uri};
use hyper_timeout::TimeoutConnector;
use tokio::time::timeout;

use crate::{
client::{RequestProps, State},
ReconnectOptionsBuilder, ReconnectingRequest,
};

const INVALID_URI: &'static str = "http://mycrazyunexsistenturl.invaliddomainext";

#[test_case(INVALID_URI, false, |state| matches!(state, State::StreamClosed))]
#[test_case(INVALID_URI, true, |state| matches!(state, State::WaitingToReconnect(_)))]
#[tokio::test]
async fn initial_connection(uri: &str, retry_initial: bool, expected: fn(&State) -> bool) {
let default_timeout = Some(Duration::from_secs(1));
let conn = HttpConnector::new();
let mut connector = TimeoutConnector::new(conn);
connector.set_connect_timeout(default_timeout);
connector.set_read_timeout(default_timeout);
connector.set_write_timeout(default_timeout);

let reconnect_opts = ReconnectOptionsBuilder::new(false)
.backoff_factor(1)
.delay(Duration::from_secs(1))
.retry_initial(retry_initial)
.build();

let http = hyper::Client::builder().build::<_, hyper::Body>(connector);
let req_props = RequestProps {
url: Uri::from_str(uri).unwrap(),
headers: HeaderMap::new(),
method: "GET".to_string(),
body: None,
reconnect_opts,
max_redirects: 10,
};

let mut reconnecting_request = ReconnectingRequest::new(http, req_props, None);

// sets initial state
let resp = reconnecting_request.http.request(
Request::builder()
.method("GET")
.uri(uri)
.body(Body::empty())
.unwrap(),
);

reconnecting_request.state = State::Connecting {
retry: reconnecting_request.props.reconnect_opts.retry_initial,
resp,
};

let mut reconnecting_request = pin!(reconnecting_request);

timeout(Duration::from_millis(500), reconnecting_request.try_next())
.await
.ok();

assert!(expected(&reconnecting_request.state));
}

#[test_case(false, |state| matches!(state, State::StreamClosed))]
#[test_case(true, |state| matches!(state, State::WaitingToReconnect(_)))]
#[tokio::test]
async fn initial_connection_mocked_server(retry_initial: bool, expected: fn(&State) -> bool) {
let mut mock_server = mockito::Server::new_async().await;
let _mock = mock_server
.mock("GET", "/")
.with_status(404)
.create_async()
.await;

initial_connection(&mock_server.url(), retry_initial, expected).await;
}
}
2 changes: 0 additions & 2 deletions eventsource-client/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ impl std::error::Error for HeaderError {
#[derive(Debug)]
pub enum Error {
TimedOut,
StreamClosed,
/// An invalid request parameter
InvalidParameter(Box<dyn std::error::Error + Send + Sync + 'static>),
/// The HTTP response could not be handled.
Expand All @@ -56,7 +55,6 @@ impl std::fmt::Display for Error {
use Error::*;
match self {
TimedOut => write!(f, "timed out"),
StreamClosed => write!(f, "stream closed"),
InvalidParameter(err) => write!(f, "invalid parameter: {err}"),
UnexpectedResponse(r, _) => {
let status = r.status();
Expand Down