Skip to content

Commit 2ac2998

Browse files
authored
fix: schedule reconnect after parse error during streaming (#134)
## Summary Every error path in `ReconnectingRequest::poll_next` during the `Connected` state sets state to `WaitingToReconnect` before returning the error — except for parser errors propagated through `process_bytes(...)?`. After a parse error, the state stayed at `Connected` and the next poll continued draining the (already-broken) response body before finally hitting end-of-body and emitting a spurious `UnexpectedEof` before the reconnect actually kicked in. This PR schedules a reconnect on parse errors when reconnect is enabled, matching the pattern used by every other error path. The error is still returned to the caller; the only behavioral change is that the next poll cleanly transitions to reconnect rather than draining the broken body. **Before:** `Connected`, `Err(InvalidLine)`, `Err(UnexpectedEof)`, `Connected`, … **After:** `Connected`, `Err(InvalidLine)`, `Connected`, … ## Context Surfaced while investigating [launchdarkly/rust-server-sdk#116](launchdarkly/rust-server-sdk#116) (`StreamingDataSource` silently shuts down on stream errors). That report has two contributing bugs; this PR addresses the rust-eventsource-client side. The rust-server-sdk side will be fixed in a follow-up PR — together they restore the reconnection contract: the eventsource-client owns reconnection, the SDK keeps polling and trusts it. Tracked in [SDK-2345](https://launchdarkly.atlassian.net/browse/SDK-2345). ## Test plan - [x] New test `parser_error_schedules_reconnect_immediately` (`eventsource-client/src/client.rs`) — verifies the next stream item after a parse error is `Connected` from the reconnect, not a spurious `UnexpectedEof`. - [x] `cargo test` — 60 lib tests + 2 doc tests pass; no regressions. [SDK-2345]: https://launchdarkly.atlassian.net/browse/SDK-2345?atlOrigin=eyJpIjoiNWRkNTljNzYxNjVmNDY3MDlhMDU5Y2ZhYzA5YTRkZjUiLCJwIjoiZ2l0aHViLWNvbS1KU1cifQ <!-- CURSOR_SUMMARY --> --- > [!NOTE] > **Medium Risk** > Changes streaming error-handling semantics by scheduling reconnects on parser failures, which can affect how downstream consumers observe errors and reconnect timing. Scope is small and covered by a new integration-style test, but touches core stream state transitions. > > **Overview** > **Fixes reconnection behavior after SSE parse errors during streaming.** When `EventParser::process_bytes` fails in the `Connected` state, the client now (when `ReconnectOptions::reconnect` is enabled) transitions to `WaitingToReconnect` before yielding the parse error, avoiding continued draining of a broken response body and the resulting follow-on errors. > > Updates `Client::stream` docs to clarify that stream errors are non-terminal and that the stream only ends on `Poll::Ready(None)`, and adds a new test (`parser_error_schedules_reconnect_immediately`) asserting the sequence `Connected -> Err(InvalidLine) -> Connected`. > > <sup>Reviewed by [Cursor Bugbot](https://cursor.com/bugbot) for commit ddea87d. Bugbot is set up for automated code reviews on this repo. Configure [here](https://www.cursor.com/dashboard/bugbot).</sup> <!-- /CURSOR_SUMMARY -->
1 parent 53e64f7 commit 2ac2998

1 file changed

Lines changed: 91 additions & 4 deletions

File tree

eventsource-client/src/client.rs

Lines changed: 91 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -199,10 +199,20 @@ impl<T: HttpTransport> Client for ClientImpl<T> {
199199
/// Connect to the server and begin consuming the stream. Produces a
200200
/// [`Stream`] of [`Event`](crate::Event)s wrapped in [`Result`].
201201
///
202-
/// Do not use the stream after it returned an error!
202+
/// Errors yielded by the stream are not terminal: keep polling.
203+
/// When [`ReconnectOptions::reconnect`] is enabled (the default),
204+
/// the stream schedules a reconnect on retryable errors and the
205+
/// next poll resumes from a fresh connection.
203206
///
204-
/// After the first successful connection, the stream will
205-
/// reconnect for retryable errors.
207+
/// The stream is exhausted only when [`Stream::poll_next`] returns
208+
/// [`Poll::Ready(None)`]. That happens when the underlying state
209+
/// machine reaches `StreamClosed` (e.g. a redirect-limit overrun,
210+
/// a malformed `Location` header, or an error during initial
211+
/// connection while [`ReconnectOptions::retry_initial`] is
212+
/// disabled), or after any error when reconnect is disabled.
213+
///
214+
/// [`Poll::Ready(None)`]: std::task::Poll::Ready
215+
/// [`Stream::poll_next`]: futures::Stream::poll_next
206216
fn stream(&self) -> BoxStream<Result<SSE>> {
207217
Box::pin(ReconnectingRequest::new(
208218
Arc::clone(&self.transport),
@@ -495,7 +505,21 @@ impl<T: HttpTransport> Stream for ReconnectingRequest<T> {
495505
},
496506
StateProj::Connected(mut body) => match ready!(body.as_mut().poll_next(cx)) {
497507
Some(Ok(result)) => {
498-
this.event_parser.process_bytes(result)?;
508+
if let Err(e) = this.event_parser.process_bytes(result) {
509+
// A parse error means the current response body is
510+
// unusable; schedule a reconnect to abandon it.
511+
if self.props.reconnect_opts.reconnect {
512+
let duration = self
513+
.as_mut()
514+
.project()
515+
.retry_strategy
516+
.next_delay(Instant::now());
517+
self.as_mut().project().state.set(State::WaitingToReconnect(
518+
delay(duration, "reconnecting"),
519+
));
520+
}
521+
return Poll::Ready(Some(Err(e)));
522+
}
499523
continue;
500524
}
501525
Some(Err(e)) => {
@@ -712,4 +736,67 @@ mod tests {
712736

713737
initial_connection(&mock_server.url(), retry_initial, expected).await;
714738
}
739+
740+
// When a parse error happens during streaming and reconnect is
741+
// enabled, the next stream item should be a fresh `Connected` from
742+
// the reconnect, not another error from continuing to drain the
743+
// broken response body.
744+
#[cfg(feature = "hyper")]
745+
#[tokio::test(flavor = "multi_thread")]
746+
async fn parser_error_schedules_reconnect_immediately() {
747+
use crate::{Client, ClientBuilder, Error, ReconnectOptionsBuilder, SSE};
748+
use futures::StreamExt;
749+
use launchdarkly_sdk_transport::HyperTransport;
750+
751+
let mut server = mockito::Server::new_async().await;
752+
let _mock = server
753+
.mock("GET", "/")
754+
.with_status(200)
755+
.with_body(b"\xff\xfe:bad\n\n".as_ref())
756+
.create_async()
757+
.await;
758+
759+
let transport = HyperTransport::new().expect("failed to build transport");
760+
let client = ClientBuilder::for_url(&server.url())
761+
.unwrap()
762+
.reconnect(
763+
ReconnectOptionsBuilder::new(true)
764+
.delay(Duration::from_millis(10))
765+
.delay_max(Duration::from_millis(10))
766+
.retry_initial(true)
767+
.build(),
768+
)
769+
.build_with_transport(transport);
770+
771+
let mut stream = client.stream();
772+
773+
// Expected order: Connected, parse error, Connected (reconnect).
774+
let mut items = Vec::new();
775+
tokio::time::timeout(Duration::from_secs(2), async {
776+
while items.len() < 3 {
777+
match stream.next().await {
778+
Some(item) => items.push(item),
779+
None => break,
780+
}
781+
}
782+
})
783+
.await
784+
.expect("timed out waiting for parse error and reconnect");
785+
786+
assert!(
787+
matches!(items.first(), Some(Ok(SSE::Connected(_)))),
788+
"expected initial Connected, got {:?}",
789+
items.first()
790+
);
791+
assert!(
792+
matches!(items.get(1), Some(Err(Error::InvalidLine(_)))),
793+
"expected InvalidLine error after first connection, got {:?}",
794+
items.get(1)
795+
);
796+
assert!(
797+
matches!(items.get(2), Some(Ok(SSE::Connected(_)))),
798+
"expected reconnect (Connected) immediately after parse error, got {:?}",
799+
items.get(2)
800+
);
801+
}
715802
}

0 commit comments

Comments
 (0)