Skip to content

Commit 80c123a

Browse files
authored
fix: respect reconnect=false and clamp server-supplied retry: 0 (#135)
## Summary Stacked on top of #134. Addresses three pre-existing reconnect-control gaps surfaced during the multi-agent review of that PR. 1. **`retry: 0` collapses backoff to zero.** A server emitting `retry: 0` set `BackoffRetry::base_delay` to `Duration::ZERO`, after which every reconnect path (`client.rs:474, 525, 547, 612` and the new parse-error path) computed `next_delay() == 0` and reconnected immediately — a tight loop. Clamps `change_base_delay` to a 1 ms floor. 2. **EOF arm ignored `reconnect_opts.reconnect`.** The body-exhausted branch unconditionally scheduled `WaitingToReconnect` even when reconnect was disabled. Now honors the flag and transitions to `StreamClosed`, matching every other error path. 3. **Parse-error arm with `reconnect=false` left the parser poisoned.** No state transition happened, so the next poll drained the broken body to EOF — where (2) above papered over the bug. Now transitions to `StreamClosed` so the documented \"do not use the stream after error\" contract holds. ## Context Stacked PR — base is the `rl/sdk-2345/parser-error-reconnect-state` branch from #134, **not** main. Will retarget once #134 lands. Tracked in [SDK-2347](https://launchdarkly.atlassian.net/browse/SDK-2347). Predecessor: [SDK-2345](https://launchdarkly.atlassian.net/browse/SDK-2345) / #134. Surfaced from the multi-agent review of #134 (findings 1, 2, and the corresponding suggested follow-ups 4 and 5). All three were pre-existing — #134 only made the parse-error path more visible. ## Test plan - [x] `test_change_base_delay_clamps_to_minimum` (`retry.rs`) — pins the 1 ms floor against `change_base_delay(Duration::ZERO)` and a sub-floor value. - [x] `parser_error_closes_stream_when_reconnect_disabled` (`client.rs`) — asserts the stream emits one `InvalidLine` then `None` when reconnect is off. - [x] `eof_closes_stream_when_reconnect_disabled` (`client.rs`) — asserts the stream emits the event, `Eof`, then `None` when reconnect is off. - [x] Existing `parser_error_schedules_reconnect_immediately` from #134 still passes. - [x] `cargo test` — 63 lib tests + 1 doc test pass. - [x] `cargo fmt --check` clean. [SDK-2347]: https://launchdarkly.atlassian.net/browse/SDK-2347?atlOrigin=eyJpIjoiNWRkNTljNzYxNjVmNDY3MDlhMDU5Y2ZhYzA5YTRkZjUiLCJwIjoiZ2l0aHViLWNvbS1KU1cifQ
1 parent 2ac2998 commit 80c123a

2 files changed

Lines changed: 173 additions & 12 deletions

File tree

eventsource-client/src/client.rs

Lines changed: 149 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -506,8 +506,10 @@ impl<T: HttpTransport> Stream for ReconnectingRequest<T> {
506506
StateProj::Connected(mut body) => match ready!(body.as_mut().poll_next(cx)) {
507507
Some(Ok(result)) => {
508508
if let Err(e) = this.event_parser.process_bytes(result) {
509-
// A parse error means the current response body is
510-
// unusable; schedule a reconnect to abandon it.
509+
// The current response body is unusable. Either
510+
// schedule a reconnect or close the stream so a
511+
// caller that disabled reconnect doesn't keep
512+
// reading from a poisoned parser.
511513
if self.props.reconnect_opts.reconnect {
512514
let duration = self
513515
.as_mut()
@@ -517,6 +519,8 @@ impl<T: HttpTransport> Stream for ReconnectingRequest<T> {
517519
self.as_mut().project().state.set(State::WaitingToReconnect(
518520
delay(duration, "reconnecting"),
519521
));
522+
} else {
523+
self.as_mut().project().state.set(State::StreamClosed);
520524
}
521525
return Poll::Ready(Some(Err(e)));
522526
}
@@ -547,15 +551,19 @@ impl<T: HttpTransport> Stream for ReconnectingRequest<T> {
547551
return Poll::Ready(Some(Err(Error::Transport(e))));
548552
}
549553
None => {
550-
let duration = self
551-
.as_mut()
552-
.project()
553-
.retry_strategy
554-
.next_delay(Instant::now());
555-
self.as_mut()
556-
.project()
557-
.state
558-
.set(State::WaitingToReconnect(delay(duration, "retrying")));
554+
if self.props.reconnect_opts.reconnect {
555+
let duration = self
556+
.as_mut()
557+
.project()
558+
.retry_strategy
559+
.next_delay(Instant::now());
560+
self.as_mut()
561+
.project()
562+
.state
563+
.set(State::WaitingToReconnect(delay(duration, "retrying")));
564+
} else {
565+
self.as_mut().project().state.set(State::StreamClosed);
566+
}
559567

560568
if self.event_parser.was_processing() {
561569
return Poll::Ready(Some(Err(Error::UnexpectedEof)));
@@ -799,4 +807,134 @@ mod tests {
799807
items.get(2)
800808
);
801809
}
810+
811+
// With reconnect disabled, a parse error should close the stream so the
812+
// next poll returns `None` rather than continuing to read from a poisoned
813+
// parser or reconnecting via the EOF arm.
814+
#[cfg(feature = "hyper")]
815+
#[tokio::test(flavor = "multi_thread")]
816+
async fn parser_error_closes_stream_when_reconnect_disabled() {
817+
use crate::{Client, ClientBuilder, Error, ReconnectOptionsBuilder, SSE};
818+
use futures::StreamExt;
819+
use launchdarkly_sdk_transport::HyperTransport;
820+
821+
let mut server = mockito::Server::new_async().await;
822+
let _mock = server
823+
.mock("GET", "/")
824+
.with_status(200)
825+
.with_body(b"\xff\xfe:bad\n\n".as_ref())
826+
.create_async()
827+
.await;
828+
829+
let transport = HyperTransport::new().expect("failed to build transport");
830+
let client = ClientBuilder::for_url(&server.url())
831+
.unwrap()
832+
.reconnect(
833+
ReconnectOptionsBuilder::new(false)
834+
.retry_initial(true)
835+
.build(),
836+
)
837+
.build_with_transport(transport);
838+
839+
let mut stream = client.stream();
840+
841+
let mut items = Vec::new();
842+
tokio::time::timeout(Duration::from_secs(2), async {
843+
while items.len() < 3 {
844+
match stream.next().await {
845+
Some(item) => items.push(item),
846+
None => {
847+
items.push(Ok(SSE::Comment("__stream_ended__".into())));
848+
break;
849+
}
850+
}
851+
}
852+
})
853+
.await
854+
.expect("timed out waiting for stream to close");
855+
856+
assert!(
857+
matches!(items.first(), Some(Ok(SSE::Connected(_)))),
858+
"expected initial Connected, got {:?}",
859+
items.first()
860+
);
861+
assert!(
862+
matches!(items.get(1), Some(Err(Error::InvalidLine(_)))),
863+
"expected InvalidLine error, got {:?}",
864+
items.get(1)
865+
);
866+
assert!(
867+
matches!(
868+
items.get(2),
869+
Some(Ok(SSE::Comment(s))) if s == "__stream_ended__"
870+
),
871+
"expected stream to end (None) after parse error with reconnect disabled, got {:?}",
872+
items.get(2)
873+
);
874+
}
875+
876+
// With reconnect disabled, a clean end-of-body should close the stream
877+
// rather than scheduling a reconnect.
878+
#[cfg(feature = "hyper")]
879+
#[tokio::test(flavor = "multi_thread")]
880+
async fn eof_closes_stream_when_reconnect_disabled() {
881+
use crate::{Client, ClientBuilder, Error, ReconnectOptionsBuilder, SSE};
882+
use futures::StreamExt;
883+
use launchdarkly_sdk_transport::HyperTransport;
884+
885+
let mut server = mockito::Server::new_async().await;
886+
let _mock = server
887+
.mock("GET", "/")
888+
.with_status(200)
889+
.with_body("event: hello\ndata: world\n\n")
890+
.create_async()
891+
.await;
892+
893+
let transport = HyperTransport::new().expect("failed to build transport");
894+
let client = ClientBuilder::for_url(&server.url())
895+
.unwrap()
896+
.reconnect(
897+
ReconnectOptionsBuilder::new(false)
898+
.retry_initial(true)
899+
.build(),
900+
)
901+
.build_with_transport(transport);
902+
903+
let mut stream = client.stream();
904+
905+
let mut items: Vec<Option<crate::Result<SSE>>> = Vec::new();
906+
tokio::time::timeout(Duration::from_secs(2), async {
907+
for _ in 0..4 {
908+
let item = stream.next().await;
909+
let is_terminal = item.is_none();
910+
items.push(item);
911+
if is_terminal {
912+
break;
913+
}
914+
}
915+
})
916+
.await
917+
.expect("timed out waiting for stream to close");
918+
919+
assert!(
920+
matches!(items.first(), Some(Some(Ok(SSE::Connected(_))))),
921+
"expected initial Connected, got {:?}",
922+
items.first()
923+
);
924+
assert!(
925+
matches!(items.get(1), Some(Some(Ok(SSE::Event(e)))) if e.event_type == "hello"),
926+
"expected hello event, got {:?}",
927+
items.get(1)
928+
);
929+
assert!(
930+
matches!(items.get(2), Some(Some(Err(Error::Eof)))),
931+
"expected Eof error after body ends, got {:?}",
932+
items.get(2)
933+
);
934+
assert!(
935+
matches!(items.get(3), Some(None)),
936+
"expected stream to end (None) after EOF with reconnect disabled, got {:?}",
937+
items.get(3)
938+
);
939+
}
802940
}

eventsource-client/src/retry.rs

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,11 @@ pub(crate) trait RetryStrategy {
1515

1616
const DEFAULT_RESET_RETRY_INTERVAL: Duration = Duration::from_secs(60);
1717

18+
/// Floor applied to a server-supplied SSE `retry:` value. A server that
19+
/// sends `retry: 0` would otherwise collapse the backoff to zero and
20+
/// reconnect would become a tight loop.
21+
const MINIMUM_BASE_DELAY: Duration = Duration::from_millis(1);
22+
1823
pub(crate) struct BackoffRetry {
1924
base_delay: Duration,
2025
max_delay: Duration,
@@ -66,7 +71,7 @@ impl RetryStrategy for BackoffRetry {
6671
}
6772

6873
fn change_base_delay(&mut self, base_delay: Duration) {
69-
self.base_delay = base_delay;
74+
self.base_delay = std::cmp::max(base_delay, MINIMUM_BASE_DELAY);
7075
self.next_delay = self.base_delay;
7176
}
7277

@@ -111,6 +116,24 @@ mod tests {
111116
assert_eq!(retry.next_delay(start.add(Duration::from_secs(2))), base);
112117
}
113118

119+
#[test]
120+
fn test_change_base_delay_clamps_to_minimum() {
121+
// A server that sends `retry: 0` would otherwise produce a zero-delay
122+
// reconnect loop. The clamp ensures the next delay stays at least
123+
// MINIMUM_BASE_DELAY.
124+
let mut retry =
125+
BackoffRetry::new(Duration::from_secs(10), Duration::from_secs(30), 1, false);
126+
let start = Instant::now();
127+
128+
retry.change_base_delay(Duration::ZERO);
129+
assert!(retry.next_delay(start) >= super::MINIMUM_BASE_DELAY);
130+
131+
// Sub-minimum values are also clamped, not silently accepted.
132+
let below_minimum = super::MINIMUM_BASE_DELAY / 2;
133+
retry.change_base_delay(below_minimum);
134+
assert!(retry.next_delay(start) >= super::MINIMUM_BASE_DELAY);
135+
}
136+
114137
#[test]
115138
fn test_with_backoff() {
116139
let base = Duration::from_secs(10);

0 commit comments

Comments
 (0)