diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 043425d..a524f84 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -1,11 +1,15 @@ name: Run CI on: push: - branches: [main] + branches: + - "main" + - "feat/**" paths-ignore: - "**.md" # Do not need to run CI for markdown changes. pull_request: - branches: [main] + branches: + - "main" + - "feat/**" paths-ignore: - "**.md" diff --git a/.github/workflows/release-please.yml b/.github/workflows/release-please.yml index ffaf013..18f0e9c 100644 --- a/.github/workflows/release-please.yml +++ b/.github/workflows/release-please.yml @@ -3,7 +3,8 @@ name: Run Release Please on: push: branches: - - main + - "main" + - "feat/**" jobs: release-package: diff --git a/contract-tests/Cargo.toml b/contract-tests/Cargo.toml index b34b0ce..5092214 100644 --- a/contract-tests/Cargo.toml +++ b/contract-tests/Cargo.toml @@ -11,10 +11,11 @@ eventsource-client = { path = "../eventsource-client" } serde_json = { version = "1.0.39"} actix = { version = "0.13.1"} actix-web = { version = "4"} -reqwest = { version = "0.11.6", default-features = false, features = ["json", "rustls-tls"] } +reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls", "stream"] } env_logger = { version = "0.10.0" } -hyper = { version = "0.14.19", features = ["client", "http1", "tcp"] } log = "0.4.6" +http = "1.0" +bytes = "1.5" [[bin]] name = "sse-test-api" diff --git a/contract-tests/src/bin/sse-test-api/stream_entity.rs b/contract-tests/src/bin/sse-test-api/stream_entity.rs index a46e189..118f12a 100644 --- a/contract-tests/src/bin/sse-test-api/stream_entity.rs +++ b/contract-tests/src/bin/sse-test-api/stream_entity.rs @@ -1,5 +1,5 @@ use actix_web::rt::task::JoinHandle; -use futures::TryStreamExt; +use futures::{StreamExt, TryStreamExt}; use log::error; use std::{ sync::{Arc, Mutex}, @@ -7,9 +7,78 @@ use std::{ }; use eventsource_client as es; +use eventsource_client::{ByteStream, HttpTransport, ResponseFuture, TransportError}; use crate::{Config, EventType}; +// Simple reqwest-based transport implementation +#[derive(Clone)] +struct ReqwestTransport { + client: reqwest::Client, +} + +impl ReqwestTransport { + fn new(timeout: Option) -> Result { + let mut builder = reqwest::Client::builder(); + + if let Some(timeout) = timeout { + builder = builder.timeout(timeout); + } + + let client = builder.build()?; + Ok(Self { client }) + } +} + +impl HttpTransport for ReqwestTransport { + fn request(&self, request: http::Request>) -> ResponseFuture { + let (parts, body_opt) = request.into_parts(); + + let mut req_builder = self + .client + .request(parts.method.clone(), parts.uri.to_string()); + + for (name, value) in parts.headers.iter() { + req_builder = req_builder.header(name, value); + } + + if let Some(body) = body_opt { + req_builder = req_builder.body(body); + } + + let req = match req_builder.build() { + Ok(r) => r, + Err(e) => return Box::pin(async move { Err(TransportError::new(e)) }), + }; + + let client = self.client.clone(); + + Box::pin(async move { + let resp = client.execute(req).await.map_err(TransportError::new)?; + + let status = resp.status(); + let headers = resp.headers().clone(); + + let byte_stream: ByteStream = Box::pin( + resp.bytes_stream() + .map(|result| result.map_err(TransportError::new)), + ); + + let mut response_builder = http::Response::builder().status(status); + + for (name, value) in headers.iter() { + response_builder = response_builder.header(name, value); + } + + let response = response_builder + .body(byte_stream) + .map_err(TransportError::new)?; + + Ok(response) + }) + } +} + pub(crate) struct Inner { callback_counter: Mutex, callback_url: String, @@ -102,9 +171,12 @@ impl Inner { reconnect_options = reconnect_options.delay(Duration::from_millis(delay_ms)); } - if let Some(read_timeout_ms) = config.read_timeout_ms { - client_builder = client_builder.read_timeout(Duration::from_millis(read_timeout_ms)); - } + // Create transport with timeout configuration + let timeout = config.read_timeout_ms.map(Duration::from_millis); + let transport = match ReqwestTransport::new(timeout) { + Ok(t) => t, + Err(e) => return Err(format!("Failed to create transport {:?}", e)), + }; if let Some(last_event_id) = &config.last_event_id { client_builder = client_builder.last_event_id(last_event_id.clone()); @@ -128,7 +200,9 @@ impl Inner { } Ok(Box::new( - client_builder.reconnect(reconnect_options.build()).build(), + client_builder + .reconnect(reconnect_options.build()) + .build_with_transport(transport), )) } } diff --git a/eventsource-client/Cargo.toml b/eventsource-client/Cargo.toml index 09c0f84..aa5d1e5 100644 --- a/eventsource-client/Cargo.toml +++ b/eventsource-client/Cargo.toml @@ -11,13 +11,12 @@ keywords = ["launchdarkly", "feature-flags", "feature-toggles", "eventsource", " exclude = ["CHANGELOG.md"] [dependencies] +bytes = "1.5" futures = "0.3.21" -hyper = { version = "0.14.19", features = ["client", "http1", "tcp"] } -hyper-rustls = { version = "0.24.1", optional = true } +http = "1.0" log = "0.4.6" pin-project = "1.0.10" tokio = { version = "1.17.0", features = ["time"] } -hyper-timeout = "0.4.1" rand = "0.8.5" base64 = "0.22.1" @@ -31,10 +30,6 @@ test-case = "3.2.1" proptest = "1.0.0" -[features] -default = ["rustls"] -rustls = ["hyper-rustls", "hyper-rustls/http2"] -[[example]] -name = "tail" -required-features = ["rustls"] +[features] +default = [] diff --git a/eventsource-client/README.md b/eventsource-client/README.md index c1e8dd9..2d1c312 100644 --- a/eventsource-client/README.md +++ b/eventsource-client/README.md @@ -4,41 +4,125 @@ Client for the [Server-Sent Events] protocol (aka [EventSource]). +This library focuses on the SSE protocol implementation. You provide the HTTP transport layer (hyper, reqwest, etc.), giving you full control over HTTP configuration like timeouts, TLS, and connection pooling. + [Server-Sent Events]: https://html.spec.whatwg.org/multipage/server-sent-events.html [EventSource]: https://developer.mozilla.org/en-US/docs/Web/API/EventSource ## Requirements -Requires tokio. +* Tokio async runtime +* An HTTP client library (hyper, reqwest, or custom) + +## Quick Start + +### 1. Add dependencies -## Usage +```toml +[dependencies] +eventsource-client = "0.17" +reqwest = { version = "0.12", features = ["stream"] } # or hyper v1 +futures = "0.3" +tokio = { version = "1", features = ["macros", "rt-multi-thread"] } +``` + +### 2. Implement HttpTransport -Example that just prints the type of each event received: +Use one of our example implementations: ```rust -use eventsource_client as es; +// See examples/reqwest_transport.rs for complete implementation +use eventsource_client::{HttpTransport, ResponseFuture}; -let mut client = es::ClientBuilder::for_url("https://example.com/stream")? - .header("Authorization", "Basic username:password")? - .build(); +struct ReqwestTransport { + client: reqwest::Client, +} -client - .stream() - .map_ok(|event| println!("got event: {:?}", event)) - .map_err(|err| eprintln!("error streaming events: {:?}", err)); +impl HttpTransport for ReqwestTransport { + fn request(&self, request: http::Request<()>) -> ResponseFuture { + // Convert request and call HTTP client + // See examples/ for full implementation + } +} ``` -(Some boilerplate omitted for clarity; see [examples directory] for complete, -working code.) +### 3. Use the client + +```rust +use eventsource_client::{ClientBuilder, SSE}; +use futures::TryStreamExt; + +#[tokio::main] +async fn main() -> Result<(), Box> { + // Create HTTP transport + let transport = ReqwestTransport::new()?; + + // Build SSE client + let client = ClientBuilder::for_url("https://example.com/stream")? + .header("Authorization", "Bearer token")? + .build_with_transport(transport); + + // Stream events + let mut stream = client.stream(); + + while let Some(event) = stream.try_next().await? { + match event { + SSE::Event(evt) => println!("Event: {}", evt.event_type), + SSE::Comment(c) => println!("Comment: {}", c), + SSE::Connected(_) => println!("Connected!"), + } + } + + Ok(()) +} +``` -[examples directory]: https://github.com/launchdarkly/rust-eventsource-client/tree/main/eventsource-client/examples ## Features -* tokio-based streaming client. -* Supports setting custom headers on the HTTP request (e.g. for endpoints - requiring authorization). -* Retry for failed connections. -* Reconnection if connection is interrupted, with exponential backoff. +* **Pluggable HTTP transport** - Use any HTTP client (hyper, reqwest, or custom) +* **Tokio-based streaming** - Efficient async/await support +* **Custom headers** - Full control over HTTP requests +* **Automatic reconnection** - Configurable exponential backoff +* **Retry logic** - Handle transient failures gracefully +* **Redirect following** - Automatic handling of HTTP redirects +* **Last-Event-ID** - Resume streams from last received event + +## Migration from v0.16 + +If you're upgrading from v0.16 (which used hyper 0.14 internally), see [MIGRATION.md](MIGRATION.md) for a detailed migration guide. + +Key changes: +- You must now provide an HTTP transport implementation +- Removed `build()`, `build_http()`, and other hyper-specific methods +- Use `build_with_transport(transport)` instead +- Timeout configuration moved to your HTTP transport + +## Why Pluggable Transport? + +1. **Use latest HTTP clients** - Not locked to a specific HTTP library version +2. **Full control** - Configure timeouts, TLS, proxies, etc. exactly as needed +3. **Smaller library** - Focused on SSE protocol, not HTTP implementation +4. **Flexibility** - Swap HTTP clients without changing SSE code + +## Architecture + +``` +┌─────────────────────────────────────┐ +│ Your Application │ +└─────────────┬───────────────────────┘ + │ + ▼ +┌─────────────────────────────────────┐ +│ eventsource-client │ +│ (SSE Protocol Implementation) │ +└─────────────┬───────────────────────┘ + │ HttpTransport trait + ▼ +┌─────────────────────────────────────┐ +│ Your HTTP Client │ +│ (hyper, reqwest, custom, etc.) │ +└─────────────────────────────────────┘ +``` ## Stability diff --git a/eventsource-client/examples/tail.rs b/eventsource-client/examples/tail.rs deleted file mode 100644 index 44fd6c3..0000000 --- a/eventsource-client/examples/tail.rs +++ /dev/null @@ -1,54 +0,0 @@ -use futures::{Stream, TryStreamExt}; -use std::{env, process, time::Duration}; - -use eventsource_client as es; - -#[tokio::main] -async fn main() -> Result<(), es::Error> { - env_logger::init(); - - let args: Vec = env::args().collect(); - - if args.len() != 3 { - eprintln!("Please pass args: "); - process::exit(1); - } - - let url = &args[1]; - let auth_header = &args[2]; - - let client = es::ClientBuilder::for_url(url)? - .header("Authorization", auth_header)? - .reconnect( - es::ReconnectOptions::reconnect(true) - .retry_initial(false) - .delay(Duration::from_secs(1)) - .backoff_factor(2) - .delay_max(Duration::from_secs(60)) - .build(), - ) - .build(); - - let mut stream = tail_events(client); - - while let Ok(Some(_)) = stream.try_next().await {} - - Ok(()) -} - -fn tail_events(client: impl es::Client) -> impl Stream> { - client - .stream() - .map_ok(|event| match event { - es::SSE::Connected(connection) => { - println!("got connected: \nstatus={}", connection.response().status()) - } - es::SSE::Event(ev) => { - println!("got an event: {}\n{}", ev.event_type, ev.data) - } - es::SSE::Comment(comment) => { - println!("got a comment: \n{}", comment) - } - }) - .map_err(|err| eprintln!("error streaming events: {:?}", err)) -} diff --git a/eventsource-client/src/client.rs b/eventsource-client/src/client.rs index 565f66f..a7f3f26 100644 --- a/eventsource-client/src/client.rs +++ b/eventsource-client/src/client.rs @@ -1,16 +1,7 @@ use base64::prelude::*; use futures::{ready, Stream}; -use hyper::{ - body::HttpBody, - client::{ - connect::{Connect, Connection}, - ResponseFuture, - }, - header::{HeaderMap, HeaderName, HeaderValue}, - service::Service, - Body, Request, Uri, -}; +use http::{HeaderMap, HeaderName, HeaderValue, Request, Uri}; use log::{debug, info, trace, warn}; use pin_project::pin_project; use std::{ @@ -18,40 +9,31 @@ use std::{ fmt::{self, Debug, Formatter}, future::Future, io::ErrorKind, - pin::{pin, Pin}, + pin::Pin, str::FromStr, + sync::Arc, task::{Context, Poll}, time::{Duration, Instant}, }; -use tokio::{ - io::{AsyncRead, AsyncWrite}, - time::Sleep, -}; +use tokio::time::Sleep; use crate::{ config::ReconnectOptions, response::{ErrorBody, Response}, + {ByteStream, HttpTransport, ResponseFuture}, }; use crate::{ error::{Error, Result}, event_parser::ConnectionDetails, }; -use hyper::client::HttpConnector; -use hyper_timeout::TimeoutConnector; - use crate::event_parser::EventParser; use crate::event_parser::SSE; use crate::retry::{BackoffRetry, RetryStrategy}; use std::error::Error as StdError; -#[cfg(feature = "rustls")] -use hyper_rustls::HttpsConnectorBuilder; - -type BoxError = Box; - /// Represents a [`Pin`]'d [`Send`] + [`Sync`] stream, returned by [`Client`]'s stream method. pub type BoxStream = Pin + Send + Sync>>; @@ -75,9 +57,6 @@ pub struct ClientBuilder { url: Uri, headers: HeaderMap, reconnect_opts: ReconnectOptions, - connect_timeout: Option, - read_timeout: Option, - write_timeout: Option, last_event_id: Option, method: String, body: Option, @@ -99,9 +78,6 @@ impl ClientBuilder { url, headers: header_map, reconnect_opts: ReconnectOptions::default(), - connect_timeout: None, - read_timeout: None, - write_timeout: None, last_event_id: None, method: String::from("GET"), max_redirects: None, @@ -148,25 +124,6 @@ impl ClientBuilder { self.header("Authorization", &value) } - /// Set a connect timeout for the underlying connection. There is no connect timeout by - /// default. - pub fn connect_timeout(mut self, connect_timeout: Duration) -> ClientBuilder { - self.connect_timeout = Some(connect_timeout); - self - } - - /// Set a read timeout for the underlying connection. There is no read timeout by default. - pub fn read_timeout(mut self, read_timeout: Duration) -> ClientBuilder { - self.read_timeout = Some(read_timeout); - self - } - - /// Set a write timeout for the underlying connection. There is no write timeout by default. - pub fn write_timeout(mut self, write_timeout: Duration) -> ClientBuilder { - self.write_timeout = Some(write_timeout); - self - } - /// Configure the client's reconnect behaviour according to the supplied /// [`ReconnectOptions`]. /// @@ -184,60 +141,28 @@ impl ClientBuilder { self } - /// Build with a specific client connector. - pub fn build_with_conn(self, conn: C) -> impl Client - where - C: Service + Clone + Send + Sync + 'static, - C::Response: Connection + AsyncRead + AsyncWrite + Send + Unpin, - C::Future: Send + 'static, - C::Error: Into, - { - let mut connector = TimeoutConnector::new(conn); - connector.set_connect_timeout(self.connect_timeout); - connector.set_read_timeout(self.read_timeout); - connector.set_write_timeout(self.write_timeout); - - let client = hyper::Client::builder().build::<_, hyper::Body>(connector); - - ClientImpl { - http: client, - request_props: RequestProps { - url: self.url, - headers: self.headers, - method: self.method, - body: self.body, - reconnect_opts: self.reconnect_opts, - max_redirects: self.max_redirects.unwrap_or(DEFAULT_REDIRECT_LIMIT), - }, - last_event_id: self.last_event_id, - } - } - - /// Build with an HTTP client connector. - pub fn build_http(self) -> impl Client { - self.build_with_conn(HttpConnector::new()) - } - - #[cfg(feature = "rustls")] - /// Build with an HTTPS client connector, using the OS root certificate store. - pub fn build(self) -> impl Client { - let conn = HttpsConnectorBuilder::new() - .with_native_roots() - .https_or_http() - .enable_http1() - .enable_http2() - .build(); - - self.build_with_conn(conn) - } - - /// Build with the given [`hyper::client::Client`]. - pub fn build_with_http_client(self, http: hyper::Client) -> impl Client + /// Build a client with a custom HTTP transport implementation. + /// + /// # Arguments + /// + /// * `transport` - An implementation of the [`HttpTransport`] trait that will handle + /// HTTP requests. See the `examples/` directory for reference implementations. + /// + /// # Example + /// + /// ```ignore + /// use eventsource_client::ClientBuilder; + /// + /// let transport = MyTransport::new(); + /// let client = ClientBuilder::for_url("https://example.com/events")? + /// .build_with_transport(transport); + /// ``` + pub fn build_with_transport(self, transport: T) -> impl Client where - C: Connect + Clone + Send + Sync + 'static, + T: HttpTransport, { ClientImpl { - http, + transport: Arc::new(transport), request_props: RequestProps { url: self.url, headers: self.headers, @@ -263,17 +188,13 @@ struct RequestProps { /// A client implementation that connects to a server using the Server-Sent Events protocol /// and consumes the event stream indefinitely. -/// Can be parameterized with different hyper Connectors, such as HTTP or HTTPS. -struct ClientImpl { - http: hyper::Client, +struct ClientImpl { + transport: Arc, request_props: RequestProps, last_event_id: Option, } -impl Client for ClientImpl -where - C: Connect + Clone + Send + Sync + 'static, -{ +impl Client for ClientImpl { /// Connect to the server and begin consuming the stream. Produces a /// [`Stream`] of [`Event`](crate::Event)s wrapped in [`Result`]. /// @@ -283,7 +204,7 @@ where /// reconnect for retryable errors. fn stream(&self) -> BoxStream> { Box::pin(ReconnectingRequest::new( - self.http.clone(), + Arc::clone(&self.transport), self.request_props.clone(), self.last_event_id.clone(), )) @@ -299,7 +220,7 @@ enum State { #[pin] resp: ResponseFuture, }, - Connected(#[pin] hyper::Body), + Connected(#[pin] ByteStream), WaitingToReconnect(#[pin] Sleep), FollowingRedirect(Option), StreamClosed, @@ -327,8 +248,8 @@ impl Debug for State { #[must_use = "streams do nothing unless polled"] #[pin_project] -pub struct ReconnectingRequest { - http: hyper::Client, +pub struct ReconnectingRequest { + transport: Arc, props: RequestProps, #[pin] state: State, @@ -341,12 +262,12 @@ pub struct ReconnectingRequest { initial_connection: bool, } -impl ReconnectingRequest { +impl ReconnectingRequest { fn new( - http: hyper::Client, + transport: Arc, props: RequestProps, last_event_id: Option, - ) -> ReconnectingRequest { + ) -> ReconnectingRequest { let reconnect_delay = props.reconnect_opts.delay; let delay_max = props.reconnect_opts.delay_max; let backoff_factor = props.reconnect_opts.backoff_factor; @@ -354,7 +275,7 @@ impl ReconnectingRequest { let url = props.url.clone(); ReconnectingRequest { props, - http, + transport, state: State::New, retry_strategy: Box::new(BackoffRetry::new( reconnect_delay, @@ -370,10 +291,7 @@ impl ReconnectingRequest { } } - fn send_request(&self) -> Result - where - C: Connect + Clone + Send + Sync + 'static, - { + fn send_request(&self) -> Result { let mut request_builder = Request::builder() .method(self.props.method.as_str()) .uri(&self.current_url); @@ -391,16 +309,13 @@ impl ReconnectingRequest { } } - let body = match &self.props.body { - Some(body) => Body::from(body.to_string()), - None => Body::empty(), - }; - + // Include the request body if set. Most SSE requests use GET and will have None, + // but some implementations (e.g., using REPORT method) may include a body. let request = request_builder - .body(body) + .body(self.props.body.clone()) .map_err(|e| Error::InvalidParameter(Box::new(e)))?; - Ok(self.http.request(request)) + Ok(self.transport.request(request)) } fn reset_redirects(self: Pin<&mut Self>) { @@ -419,10 +334,7 @@ impl ReconnectingRequest { } } -impl Stream for ReconnectingRequest -where - C: Connect + Clone + Send + Sync + 'static, -{ +impl Stream for ReconnectingRequest { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { @@ -477,7 +389,11 @@ where } StateProj::Connecting { retry, resp } => match ready!(resp.poll(cx)) { Ok(resp) => { - debug!("HTTP response: {:#?}", resp); + debug!( + "HTTP response status: {}, headers: {:?}", + resp.status(), + resp.headers() + ); if resp.status().is_success() { self.as_mut().project().retry_strategy.reset(Instant::now()); @@ -504,7 +420,7 @@ where debug!("following redirect {}", self.redirect_count); self.as_mut().project().state.set(State::FollowingRedirect( - resp.headers().get(hyper::header::LOCATION).cloned(), + resp.headers().get("location").cloned(), )); continue; } else { @@ -517,9 +433,13 @@ where } } + let status = resp.status(); + let headers = resp.headers().clone(); + let body = resp.into_body(); + let error = Error::UnexpectedResponse( - Response::new(resp.status(), resp.headers().clone()), - ErrorBody::new(resp.into_body()), + Response::new(status, headers), + ErrorBody::new(body), ); if !*retry { @@ -547,7 +467,7 @@ where warn!("request returned an error: {}", e); if !*retry { self.as_mut().project().state.set(State::StreamClosed); - return Poll::Ready(Some(Err(Error::HttpStream(Box::new(e))))); + return Poll::Ready(Some(Err(Error::Transport(e)))); } let duration = self @@ -572,7 +492,7 @@ where return Poll::Ready(Some(Err(e))); } }, - StateProj::Connected(body) => match ready!(body.poll_data(cx)) { + StateProj::Connected(mut body) => match ready!(body.as_mut().poll_next(cx)) { Some(Ok(result)) => { this.event_parser.process_bytes(result)?; continue; @@ -590,15 +510,16 @@ where .set(State::WaitingToReconnect(delay(duration, "reconnecting"))); } + // Check if the underlying error is a timeout if let Some(cause) = e.source() { if let Some(downcast) = cause.downcast_ref::() { if let std::io::ErrorKind::TimedOut = downcast.kind() { return Poll::Ready(Some(Err(Error::TimedOut))); } } - } else { - return Poll::Ready(Some(Err(Error::HttpStream(Box::new(e))))); } + + return Poll::Ready(Some(Err(Error::Transport(e)))); } None => { let duration = self @@ -651,15 +572,16 @@ fn delay(dur: Duration, description: &str) -> Sleep { mod private { use crate::client::ClientImpl; + use crate::HttpTransport; pub trait Sealed {} - impl Sealed for ClientImpl {} + impl Sealed for ClientImpl {} } #[cfg(test)] mod tests { use crate::ClientBuilder; - use hyper::http::HeaderValue; + use http::HeaderValue; use test_case::test_case; #[test_case("user", "pass", "dXNlcjpwYXNz")] @@ -685,40 +607,71 @@ mod tests { assert_eq!(Some(&expected), actual); } - use std::{pin::pin, str::FromStr, time::Duration}; + use std::{pin::pin, sync::Arc, time::Duration}; - use futures::TryStreamExt; - use hyper::{client::HttpConnector, Body, HeaderMap, Request, Uri}; - use hyper_timeout::TimeoutConnector; + use bytes::Bytes; + use futures::{stream, TryStreamExt}; + use http::HeaderMap; use tokio::time::timeout; use crate::{ client::{RequestProps, State}, ReconnectOptionsBuilder, ReconnectingRequest, + {ByteStream, HttpTransport, ResponseFuture, TransportError}, }; - const INVALID_URI: &'static str = "http://mycrazyunexsistenturl.invaliddomainext"; + // Mock transport for testing + #[derive(Clone)] + struct MockTransport { + fail_request: bool, + } + + impl MockTransport { + fn new(_url: String, fail_request: bool) -> Self { + Self { fail_request } + } + } + + impl HttpTransport for MockTransport { + fn request(&self, _request: http::Request>) -> ResponseFuture { + if self.fail_request { + // Simulate a connection error + Box::pin(async { + Err(TransportError::new(std::io::Error::new( + std::io::ErrorKind::ConnectionRefused, + "connection refused", + ))) + }) + } else { + // Return a 404 response + Box::pin(async { + let byte_stream: ByteStream = + Box::pin(stream::iter(vec![Ok(Bytes::from("not found"))])); + let response = http::Response::builder() + .status(404) + .body(byte_stream) + .unwrap(); + Ok(response) + }) + } + } + } + + const INVALID_URI: &str = "http://mycrazyunexsistenturl.invaliddomainext"; #[test_case(INVALID_URI, false, |state| matches!(state, State::StreamClosed))] #[test_case(INVALID_URI, true, |state| matches!(state, State::WaitingToReconnect(_)))] #[tokio::test] async fn initial_connection(uri: &str, retry_initial: bool, expected: fn(&State) -> bool) { - let default_timeout = Some(Duration::from_secs(1)); - let conn = HttpConnector::new(); - let mut connector = TimeoutConnector::new(conn); - connector.set_connect_timeout(default_timeout); - connector.set_read_timeout(default_timeout); - connector.set_write_timeout(default_timeout); - let reconnect_opts = ReconnectOptionsBuilder::new(false) .backoff_factor(1) .delay(Duration::from_secs(1)) .retry_initial(retry_initial) .build(); - let http = hyper::Client::builder().build::<_, hyper::Body>(connector); + let transport = Arc::new(MockTransport::new(uri.to_string(), true)); let req_props = RequestProps { - url: Uri::from_str(uri).unwrap(), + url: uri.parse().unwrap(), headers: HeaderMap::new(), method: "GET".to_string(), body: None, @@ -726,16 +679,10 @@ mod tests { max_redirects: 10, }; - let mut reconnecting_request = ReconnectingRequest::new(http, req_props, None); + let mut reconnecting_request = ReconnectingRequest::new(transport.clone(), req_props, None); - // sets initial state - let resp = reconnecting_request.http.request( - Request::builder() - .method("GET") - .uri(uri) - .body(Body::empty()) - .unwrap(), - ); + // sets initial state with a failing request + let resp = transport.request(http::Request::builder().uri(uri).body(None).unwrap()); reconnecting_request.state = State::Connecting { retry: reconnecting_request.props.reconnect_opts.retry_initial, diff --git a/eventsource-client/src/error.rs b/eventsource-client/src/error.rs index b4d60f5..7a1e08b 100644 --- a/eventsource-client/src/error.rs +++ b/eventsource-client/src/error.rs @@ -1,4 +1,5 @@ use crate::response::{ErrorBody, Response}; +use crate::TransportError; /// Error type for invalid response headers encountered in ResponseDetails. #[derive(Debug)] @@ -36,6 +37,8 @@ pub enum Error { UnexpectedResponse(Response, ErrorBody), /// An error reading from the HTTP response body. HttpStream(Box), + /// An error from the HTTP transport layer. + Transport(TransportError), /// The HTTP response stream ended Eof, /// The HTTP response stream ended unexpectedly (e.g. in the @@ -61,6 +64,7 @@ impl std::fmt::Display for Error { write!(f, "unexpected response: {status}") } HttpStream(err) => write!(f, "http error: {err}"), + Transport(err) => write!(f, "transport error: {err}"), Eof => write!(f, "eof"), UnexpectedEof => write!(f, "unexpected eof"), InvalidLine(line) => write!(f, "invalid line: {line}"), @@ -96,6 +100,7 @@ impl Error { pub fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { match self { Error::HttpStream(err) => Some(err.as_ref()), + Error::Transport(err) => Some(err), _ => None, } } diff --git a/eventsource-client/src/event_parser.rs b/eventsource-client/src/event_parser.rs index 79a67d3..8856a49 100644 --- a/eventsource-client/src/event_parser.rs +++ b/eventsource-client/src/event_parser.rs @@ -1,6 +1,6 @@ use std::{collections::VecDeque, convert::TryFrom, str::from_utf8}; -use hyper::body::Bytes; +use bytes::Bytes; use log::{debug, log_enabled, trace}; use pin_project::pin_project; diff --git a/eventsource-client/src/lib.rs b/eventsource-client/src/lib.rs index 52e9611..e1a5111 100644 --- a/eventsource-client/src/lib.rs +++ b/eventsource-client/src/lib.rs @@ -1,31 +1,53 @@ #![warn(rust_2018_idioms)] //! Client for the [Server-Sent Events] protocol (aka [EventSource]). //! -//! ``` -//! use futures::{TryStreamExt}; -//! # use eventsource_client::Error; -//! use eventsource_client::{Client, SSE}; -//! # #[tokio::main] -//! # async fn main() -> Result<(), eventsource_client::Error> { -//! let mut client = eventsource_client::ClientBuilder::for_url("https://example.com/stream")? -//! .header("Authorization", "Basic username:password")? -//! .build(); -//! -//! let mut stream = Box::pin(client.stream()) -//! .map_ok(|event| match event { -//! SSE::Comment(comment) => println!("got a comment event: {:?}", comment), -//! SSE::Event(evt) => println!("got an event: {}", evt.event_type), -//! SSE::Connected(_) => println!("got connected") -//! }) -//! .map_err(|e| println!("error streaming events: {:?}", e)); -//! # while let Ok(Some(_)) = stream.try_next().await {} -//! # +//! This library provides SSE protocol support but requires you to bring your own +//! HTTP transport. See the `examples/` directory for reference implementations using +//! popular HTTP clients like hyper and reqwest. +//! +//! # Getting Started +//! +//! ```ignore +//! use futures::TryStreamExt; +//! use eventsource_client::{Client, ClientBuilder, SSE}; +//! +//! # async fn example() -> Result<(), Box> { +//! // You need to implement HttpTransport trait for your HTTP client +//! // See examples/hyper_transport.rs or examples/reqwest_transport.rs for reference implementations +//! # struct MyTransport; +//! # impl eventsource_client::HttpTransport for MyTransport { +//! # fn request(&self, _req: http::Request>) -> eventsource_client::ResponseFuture { +//! # unimplemented!() +//! # } +//! # } +//! let transport = MyTransport::new(); +//! +//! let client = ClientBuilder::for_url("https://example.com/stream")? +//! .header("Authorization", "Bearer token")? +//! .build_with_transport(transport); +//! +//! let mut stream = client.stream(); +//! +//! while let Some(event) = stream.try_next().await? { +//! match event { +//! SSE::Event(evt) => println!("Event: {}", evt.event_type), +//! SSE::Comment(comment) => println!("Comment: {}", comment), +//! SSE::Connected(_) => println!("Connected!"), +//! } +//! } //! # Ok(()) //! # } //! ``` //! -//![Server-Sent Events]: https://html.spec.whatwg.org/multipage/server-sent-events.html -//![EventSource]: https://developer.mozilla.org/en-US/docs/Web/API/EventSource +//! # Implementing a Transport +//! +//! See the [`transport`] module documentation for details on implementing +//! the [`HttpTransport`] trait. +//! +//! [`HttpTransport`]: HttpTransport +//! +//! [Server-Sent Events]: https://html.spec.whatwg.org/multipage/server-sent-events.html +//! [EventSource]: https://developer.mozilla.org/en-US/docs/Web/API/EventSource mod client; mod config; @@ -33,6 +55,7 @@ mod error; mod event_parser; mod response; mod retry; +mod transport; pub use client::*; pub use config::*; @@ -40,3 +63,4 @@ pub use error::*; pub use event_parser::Event; pub use event_parser::SSE; pub use response::Response; +pub use transport::*; diff --git a/eventsource-client/src/response.rs b/eventsource-client/src/response.rs index 4e2eced..07991bc 100644 --- a/eventsource-client/src/response.rs +++ b/eventsource-client/src/response.rs @@ -1,34 +1,33 @@ -use hyper::body::Buf; -use hyper::{header::HeaderValue, Body, HeaderMap, StatusCode}; +use http::{HeaderMap, HeaderValue, StatusCode}; -use crate::{Error, HeaderError}; +use crate::{ByteStream, HeaderError}; +/// Represents an error response body as a stream of bytes. +/// +/// The body is provided as a stream so that users can read error details if needed. +/// For large error responses, the stream allows processing without loading the entire +/// response into memory. pub struct ErrorBody { - body: Body, + body: ByteStream, } impl ErrorBody { - pub fn new(body: Body) -> Self { + /// Create a new ErrorBody from a ByteStream + pub fn new(body: ByteStream) -> Self { Self { body } } - /// Returns the body of the response as a vector of bytes. - /// - /// Caution: This method reads the entire body into memory. You should only use this method if - /// you know the response is of a reasonable size. - pub async fn body_bytes(self) -> Result, Error> { - let buf = match hyper::body::aggregate(self.body).await { - Ok(buf) => buf, - Err(err) => return Err(Error::HttpStream(Box::new(err))), - }; - - Ok(buf.chunk().to_vec()) + /// Consume this ErrorBody and return the underlying ByteStream + pub fn into_stream(self) -> ByteStream { + self.body } } impl std::fmt::Debug for ErrorBody { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("ErrorBody").finish() + f.debug_struct("ErrorBody") + .field("body", &"") + .finish() } } diff --git a/eventsource-client/src/transport.rs b/eventsource-client/src/transport.rs new file mode 100644 index 0000000..7867a74 --- /dev/null +++ b/eventsource-client/src/transport.rs @@ -0,0 +1,124 @@ +//! HTTP transport abstraction for Server-Sent Events client. +//! +//! This module defines the [`HttpTransport`] trait which allows users to plug in +//! their own HTTP client implementation (hyper, reqwest, or custom). +//! +//! # Example +//! +//! See the `examples/` directory for reference implementations using popular HTTP clients. + +use bytes::Bytes; +use futures::Stream; +use std::error::Error as StdError; +use std::fmt; +use std::future::Future; +use std::pin::Pin; + +// Re-export http crate types for convenience +pub use http::{HeaderMap, HeaderValue, Request, Response, StatusCode, Uri}; + +/// A pinned, boxed stream of bytes returned by HTTP transports. +/// +/// This represents the streaming response body from an HTTP request. +pub type ByteStream = Pin> + Send + Sync>>; + +/// A pinned, boxed future for an HTTP response. +/// +/// This represents the future returned by [`HttpTransport::request`]. +pub type ResponseFuture = + Pin, TransportError>> + Send + Sync>>; + +/// Error type for HTTP transport operations. +/// +/// This wraps transport-specific errors (network failures, timeouts, etc.) in a +/// common error type that the SSE client can handle uniformly. +#[derive(Debug)] +pub struct TransportError { + inner: Box, +} + +impl TransportError { + /// Create a new transport error from any error type. + pub fn new(err: impl StdError + Send + Sync + 'static) -> Self { + Self { + inner: Box::new(err), + } + } + + /// Get a reference to the inner error. + pub fn inner(&self) -> &(dyn StdError + Send + Sync + 'static) { + &*self.inner + } +} + +impl fmt::Display for TransportError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "transport error: {}", self.inner) + } +} + +impl StdError for TransportError { + fn source(&self) -> Option<&(dyn StdError + 'static)> { + Some(&*self.inner) + } +} + +/// Trait for pluggable HTTP transport implementations. +/// +/// Implement this trait to provide HTTP request/response functionality for the +/// SSE client. The transport is responsible for: +/// - Establishing HTTP connections (with TLS if needed) +/// - Sending HTTP requests +/// - Returning streaming HTTP responses +/// - Handling timeouts (if desired) +/// +/// # Example +/// +/// ```ignore +/// use eventsource_client::{HttpTransport, ByteStream, TransportError}; +/// use std::pin::Pin; +/// use std::future::Future; +/// +/// struct MyTransport { +/// // Your HTTP client here +/// } +/// +/// impl HttpTransport for MyTransport { +/// fn request( +/// &self, +/// request: http::Request>, +/// ) -> Pin, TransportError>> + Send>> { +/// // Extract body from request +/// // Convert request to your HTTP client's format +/// // Make the request +/// // Return streaming response +/// todo!() +/// } +/// } +/// ``` +pub trait HttpTransport: Send + Sync + 'static { + /// Execute an HTTP request and return a streaming response. + /// + /// # Arguments + /// + /// * `request` - The HTTP request to execute. The body type is `Option` + /// to support methods like REPORT that may include a request body. Most SSE + /// requests use GET and will have `None` as the body. + /// + /// # Returns + /// + /// A future that resolves to an HTTP response with a streaming body, or a + /// transport error if the request fails. + /// + /// The response should include: + /// - Status code + /// - Response headers + /// - A stream of body bytes + /// + /// # Notes + /// + /// - The transport should NOT follow redirects - the SSE client handles this + /// - The transport should NOT retry requests - the SSE client handles this + /// - The transport MAY implement timeouts as desired + fn request(&self, request: Request>) -> ResponseFuture; +}