diff --git a/.github/actions/ci/action.yml b/.github/actions/ci/action.yml index e45beab..63050cc 100644 --- a/.github/actions/ci/action.yml +++ b/.github/actions/ci/action.yml @@ -12,6 +12,10 @@ runs: shell: bash run: cargo test --all-features -p eventsource-client + - name: Run slower integration tests + shell: bash + run: cargo test --all-features -p eventsource-client --lib -- --ignored + - name: Run clippy checks shell: bash run: cargo clippy --all-features -p eventsource-client -- -D warnings diff --git a/Makefile b/Makefile index 6201a3f..dc672d7 100644 --- a/Makefile +++ b/Makefile @@ -11,7 +11,7 @@ start-contract-test-service-bg: @make start-contract-test-service >$(TEMP_TEST_OUTPUT) 2>&1 & run-contract-tests: - @curl -s https://raw.githubusercontent.com/launchdarkly/sse-contract-tests/v2.0.0/downloader/run.sh \ + @curl -s https://raw.githubusercontent.com/launchdarkly/sse-contract-tests/main/downloader/run.sh \ | VERSION=v2 PARAMS="-url http://localhost:8080 -debug -stop-service-at-end $(SKIPFLAGS) $(TEST_HARNESS_PARAMS)" sh contract-tests: build-contract-tests start-contract-test-service-bg run-contract-tests diff --git a/contract-tests/Cargo.toml b/contract-tests/Cargo.toml index 5092214..a6ae094 100644 --- a/contract-tests/Cargo.toml +++ b/contract-tests/Cargo.toml @@ -7,11 +7,12 @@ license = "Apache-2.0" [dependencies] futures = { version = "0.3.21" } serde = { version = "1.0", features = ["derive"] } -eventsource-client = { path = "../eventsource-client" } +eventsource-client = { path = "../eventsource-client", features = ["hyper", "hyper-rustls"] } serde_json = { version = "1.0.39"} actix = { version = "0.13.1"} actix-web = { version = "4"} -reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls", "stream"] } +# reqwest is required for callback client (test harness communication) +reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls"] } env_logger = { version = "0.10.0" } log = "0.4.6" http = "1.0" diff --git a/contract-tests/src/bin/sse-test-api/stream_entity.rs b/contract-tests/src/bin/sse-test-api/stream_entity.rs index 118f12a..ba36a91 100644 --- a/contract-tests/src/bin/sse-test-api/stream_entity.rs +++ b/contract-tests/src/bin/sse-test-api/stream_entity.rs @@ -1,5 +1,5 @@ use actix_web::rt::task::JoinHandle; -use futures::{StreamExt, TryStreamExt}; +use futures::TryStreamExt; use log::error; use std::{ sync::{Arc, Mutex}, @@ -7,78 +7,10 @@ use std::{ }; use eventsource_client as es; -use eventsource_client::{ByteStream, HttpTransport, ResponseFuture, TransportError}; +use eventsource_client::HyperTransport; use crate::{Config, EventType}; -// Simple reqwest-based transport implementation -#[derive(Clone)] -struct ReqwestTransport { - client: reqwest::Client, -} - -impl ReqwestTransport { - fn new(timeout: Option) -> Result { - let mut builder = reqwest::Client::builder(); - - if let Some(timeout) = timeout { - builder = builder.timeout(timeout); - } - - let client = builder.build()?; - Ok(Self { client }) - } -} - -impl HttpTransport for ReqwestTransport { - fn request(&self, request: http::Request>) -> ResponseFuture { - let (parts, body_opt) = request.into_parts(); - - let mut req_builder = self - .client - .request(parts.method.clone(), parts.uri.to_string()); - - for (name, value) in parts.headers.iter() { - req_builder = req_builder.header(name, value); - } - - if let Some(body) = body_opt { - req_builder = req_builder.body(body); - } - - let req = match req_builder.build() { - Ok(r) => r, - Err(e) => return Box::pin(async move { Err(TransportError::new(e)) }), - }; - - let client = self.client.clone(); - - Box::pin(async move { - let resp = client.execute(req).await.map_err(TransportError::new)?; - - let status = resp.status(); - let headers = resp.headers().clone(); - - let byte_stream: ByteStream = Box::pin( - resp.bytes_stream() - .map(|result| result.map_err(TransportError::new)), - ); - - let mut response_builder = http::Response::builder().status(status); - - for (name, value) in headers.iter() { - response_builder = response_builder.header(name, value); - } - - let response = response_builder - .body(byte_stream) - .map_err(TransportError::new)?; - - Ok(response) - }) - } -} - pub(crate) struct Inner { callback_counter: Mutex, callback_url: String, @@ -116,7 +48,7 @@ impl Inner { Ok(None) => break, Err(e) => { let failure = EventType::Error { - error: format!("Error: {:?}", e), + error: format!("Error: {e:?}"), }; if !self.send_message(failure, &client).await { @@ -131,7 +63,7 @@ impl Inner { let json = match serde_json::to_string(&event_type) { Ok(s) => s, Err(e) => { - error!("Failed to json encode event type {:?}", e); + error!("Failed to json encode event type {e:?}"); return false; } }; @@ -142,7 +74,7 @@ impl Inner { match client .post(format!("{}/{}", self.callback_url, counter_val)) - .body(format!("{}\n", json)) + .body(format!("{json}\n")) .send() .await { @@ -151,7 +83,7 @@ impl Inner { *counter = counter_val + 1 } Err(e) => { - error!("Failed to send post back to test harness {:?}", e); + error!("Failed to send post back to test harness {e:?}"); return false; } }; @@ -162,7 +94,7 @@ impl Inner { fn build_client(config: &Config) -> Result, String> { let mut client_builder = match es::ClientBuilder::for_url(&config.stream_url) { Ok(cb) => cb, - Err(e) => return Err(format!("Failed to create client builder {:?}", e)), + Err(e) => return Err(format!("Failed to create client builder {e:?}")), }; let mut reconnect_options = es::ReconnectOptions::reconnect(true); @@ -171,13 +103,6 @@ impl Inner { reconnect_options = reconnect_options.delay(Duration::from_millis(delay_ms)); } - // Create transport with timeout configuration - let timeout = config.read_timeout_ms.map(Duration::from_millis); - let transport = match ReqwestTransport::new(timeout) { - Ok(t) => t, - Err(e) => return Err(format!("Failed to create transport {:?}", e)), - }; - if let Some(last_event_id) = &config.last_event_id { client_builder = client_builder.last_event_id(last_event_id.clone()); } @@ -194,11 +119,20 @@ impl Inner { for (name, value) in headers { client_builder = match client_builder.header(name, value) { Ok(cb) => cb, - Err(e) => return Err(format!("Unable to set header {:?}", e)), + Err(e) => return Err(format!("Unable to set header {e:?}")), }; } } + // Build with HyperTransport + let mut transport_builder = HyperTransport::builder(); + + if let Some(timeout_ms) = config.read_timeout_ms { + transport_builder = transport_builder.read_timeout(Duration::from_millis(timeout_ms)); + } + + let transport = transport_builder.build_https(); + Ok(Box::new( client_builder .reconnect(reconnect_options.build()) diff --git a/eventsource-client/Cargo.toml b/eventsource-client/Cargo.toml index aa5d1e5..7a32be4 100644 --- a/eventsource-client/Cargo.toml +++ b/eventsource-client/Cargo.toml @@ -20,6 +20,16 @@ tokio = { version = "1.17.0", features = ["time"] } rand = "0.8.5" base64 = "0.22.1" +# +# Dependencies for hyper transport +# +hyper = { version = "1.0", features = ["client", "http1", "http2"], optional = true } +hyper-util = { version = "0.1", features = ["client-legacy", "http1", "http2", "tokio"], optional = true } +http-body-util = { version = "0.1", optional = true } +hyper-rustls = { version = "0.27", default-features = false, features = ["http1", "http2", "native-tokio", "ring", "webpki-roots"], optional = true } +hyper-timeout = { version = "0.5", optional = true } +tower = { version = "0.4", optional = true } + [dev-dependencies] env_logger = "0.10.0" maplit = "1.0.1" @@ -29,7 +39,7 @@ tokio = { version = "1.2.0", features = ["macros", "rt-multi-thread"] } test-case = "3.2.1" proptest = "1.0.0" - - [features] -default = [] +default = ["hyper"] +hyper = ["dep:hyper", "dep:hyper-util", "dep:http-body-util", "dep:hyper-timeout", "dep:tower"] +hyper-rustls = ["dep:hyper-rustls"] diff --git a/eventsource-client/README.md b/eventsource-client/README.md index 2d1c312..b022699 100644 --- a/eventsource-client/README.md +++ b/eventsource-client/README.md @@ -4,7 +4,7 @@ Client for the [Server-Sent Events] protocol (aka [EventSource]). -This library focuses on the SSE protocol implementation. You provide the HTTP transport layer (hyper, reqwest, etc.), giving you full control over HTTP configuration like timeouts, TLS, and connection pooling. +This library provides a complete SSE protocol implementation with a built-in HTTP transport powered by hyper v1. The pluggable transport design also allows you to use your own HTTP client (reqwest, custom, etc.) if needed. [Server-Sent Events]: https://html.spec.whatwg.org/multipage/server-sent-events.html [EventSource]: https://developer.mozilla.org/en-US/docs/Web/API/EventSource @@ -12,7 +12,8 @@ This library focuses on the SSE protocol implementation. You provide the HTTP tr ## Requirements * Tokio async runtime -* An HTTP client library (hyper, reqwest, or custom) +* Enable the `hyper` feature for the built-in HTTP transport (enabled by default) +* Optionally enable `hyper-rustls` for HTTPS support ## Quick Start @@ -20,42 +21,29 @@ This library focuses on the SSE protocol implementation. You provide the HTTP tr ```toml [dependencies] -eventsource-client = "0.17" -reqwest = { version = "0.12", features = ["stream"] } # or hyper v1 +eventsource-client = { version = "0.17", features = ["hyper", "hyper-rustls"] } futures = "0.3" tokio = { version = "1", features = ["macros", "rt-multi-thread"] } ``` -### 2. Implement HttpTransport +**Features:** +- `hyper` - Enables the built-in `HyperTransport` for HTTP support (enabled by default) +- `hyper-rustls` - Adds HTTPS support via rustls (optional) -Use one of our example implementations: +### 2. Use the client ```rust -// See examples/reqwest_transport.rs for complete implementation -use eventsource_client::{HttpTransport, ResponseFuture}; - -struct ReqwestTransport { - client: reqwest::Client, -} - -impl HttpTransport for ReqwestTransport { - fn request(&self, request: http::Request<()>) -> ResponseFuture { - // Convert request and call HTTP client - // See examples/ for full implementation - } -} -``` - -### 3. Use the client - -```rust -use eventsource_client::{ClientBuilder, SSE}; +use eventsource_client::{ClientBuilder, HyperTransport, SSE}; use futures::TryStreamExt; +use std::time::Duration; #[tokio::main] async fn main() -> Result<(), Box> { - // Create HTTP transport - let transport = ReqwestTransport::new()?; + // Create HTTP transport with timeouts + let transport = HyperTransport::builder() + .connect_timeout(Duration::from_secs(10)) + .read_timeout(Duration::from_secs(30)) + .build_https(); // or .build_http() for plain HTTP // Build SSE client let client = ClientBuilder::for_url("https://example.com/stream")? @@ -77,9 +65,35 @@ async fn main() -> Result<(), Box> { } ``` +## Example + +The `tail` example demonstrates a complete SSE client using the built-in `HyperTransport`: + +**Run with HTTP:** +```bash +cargo run --example tail --features hyper -- http://sse.dev/test "Bearer token" +``` + +**Run with HTTPS:** +```bash +cargo run --example tail --features hyper,hyper-rustls -- https://sse.dev/test "Bearer token" +``` + +The example shows: +- Creating a `HyperTransport` with custom timeouts +- Building an SSE client with authentication headers +- Configuring automatic reconnection with exponential backoff +- Handling different SSE event types (events, comments, connection status) +- Proper error handling for HTTPS URLs without the `hyper-rustls` feature + +See [`examples/tail.rs`](https://github.com/launchdarkly/rust-eventsource-client/tree/main/eventsource-client/examples/tail.rs) for the complete implementation. + ## Features -* **Pluggable HTTP transport** - Use any HTTP client (hyper, reqwest, or custom) +* **Built-in HTTP transport** - Production-ready `HyperTransport` powered by hyper v1 +* **Configurable timeouts** - Connect, read, and write timeout support +* **HTTPS support** - Optional rustls integration via the `hyper-rustls` feature +* **Pluggable transport** - Use a custom HTTP client if needed (reqwest, etc.) * **Tokio-based streaming** - Efficient async/await support * **Custom headers** - Full control over HTTP requests * **Automatic reconnection** - Configurable exponential backoff @@ -87,22 +101,36 @@ async fn main() -> Result<(), Box> { * **Redirect following** - Automatic handling of HTTP redirects * **Last-Event-ID** - Resume streams from last received event -## Migration from v0.16 +## Custom HTTP Transport -If you're upgrading from v0.16 (which used hyper 0.14 internally), see [MIGRATION.md](MIGRATION.md) for a detailed migration guide. +While the built-in `HyperTransport` works for most use cases, you can implement the `HttpTransport` trait to use your own HTTP client: -Key changes: -- You must now provide an HTTP transport implementation -- Removed `build()`, `build_http()`, and other hyper-specific methods -- Use `build_with_transport(transport)` instead -- Timeout configuration moved to your HTTP transport +```rust +use eventsource_client::{HttpTransport, ByteStream, TransportError}; +use std::pin::Pin; +use std::future::Future; -## Why Pluggable Transport? +#[derive(Clone)] +struct MyTransport { + // Your HTTP client here +} + +impl HttpTransport for MyTransport { + fn request( + &self, + request: http::Request>, + ) -> Pin, TransportError>> + Send + Sync + 'static>> { + // Implement HTTP request handling + // See the HttpTransport trait documentation for details + todo!() + } +} +``` -1. **Use latest HTTP clients** - Not locked to a specific HTTP library version -2. **Full control** - Configure timeouts, TLS, proxies, etc. exactly as needed -3. **Smaller library** - Focused on SSE protocol, not HTTP implementation -4. **Flexibility** - Swap HTTP clients without changing SSE code +This allows you to: +- Use a different HTTP client (reqwest, custom, etc.) +- Implement custom connection pooling or proxy logic +- Add specialized middleware or observability ## Architecture @@ -119,12 +147,12 @@ Key changes: │ HttpTransport trait ▼ ┌─────────────────────────────────────┐ -│ Your HTTP Client │ -│ (hyper, reqwest, custom, etc.) │ +│ HTTP Transport Layer │ +│ • HyperTransport (built-in) │ +│ • Custom (reqwest, etc.) │ └─────────────────────────────────────┘ ``` ## Stability -Early stage release for feedback purposes. May contain bugs or performance -issues. API subject to change. +This library is actively maintained. The SSE protocol implementation is stable. Breaking changes follow semantic versioning. diff --git a/eventsource-client/examples/tail.rs b/eventsource-client/examples/tail.rs new file mode 100644 index 0000000..c2368ca --- /dev/null +++ b/eventsource-client/examples/tail.rs @@ -0,0 +1,123 @@ +//! Example SSE client that tails an event stream +//! +//! This example uses the built-in HyperTransport for HTTP/HTTPS support. +//! +//! To run this example with HTTP support: +//! ```bash +//! cargo run --example tail --features hyper -- http://example.com/events "Bearer token" +//! ``` +//! +//! To run this example with HTTPS support: +//! ```bash +//! cargo run --example tail --features hyper,hyper-rustls -- https://example.com/events "Bearer token" +//! ``` + +use futures::{Stream, TryStreamExt}; +use std::{env, process, time::Duration}; + +use eventsource_client as es; + +#[tokio::main] +async fn main() -> Result<(), Box> { + env_logger::init(); + + let args: Vec = env::args().collect(); + + if args.len() != 3 { + eprintln!("Please pass args: "); + eprintln!("Example: cargo run --example tail --features hyper https://sse.dev/test 'Bearer token'"); + process::exit(1); + } + + let url = &args[1]; + let auth_header = &args[2]; + + // Run the appropriate version based on URL scheme and features + if url.starts_with("https://") { + #[cfg(feature = "hyper-rustls")] + { + run_with_https(url, auth_header).await?; + } + #[cfg(not(feature = "hyper-rustls"))] + { + eprintln!("Error: HTTPS URL requires the 'hyper-rustls' feature"); + eprintln!( + "Run with: cargo run --example tail --features hyper,hyper-rustls -- {} '{}'", + url, auth_header + ); + process::exit(1); + } + } else { + run_with_http(url, auth_header).await?; + } + + Ok(()) +} + +async fn run_with_http(url: &str, auth_header: &str) -> Result<(), Box> { + let transport = es::HyperTransport::builder() + .connect_timeout(Duration::from_secs(10)) + .read_timeout(Duration::from_secs(30)) + .build_http(); + + let client = es::ClientBuilder::for_url(url)? + .header("Authorization", auth_header)? + .reconnect( + es::ReconnectOptions::reconnect(true) + .retry_initial(false) + .delay(Duration::from_secs(1)) + .backoff_factor(2) + .delay_max(Duration::from_secs(60)) + .build(), + ) + .build_with_transport(transport); + + let mut stream = tail_events(client); + + while let Ok(Some(_)) = stream.try_next().await {} + + Ok(()) +} + +#[cfg(feature = "hyper-rustls")] +async fn run_with_https(url: &str, auth_header: &str) -> Result<(), Box> { + let transport = es::HyperTransport::builder() + .connect_timeout(Duration::from_secs(10)) + .read_timeout(Duration::from_secs(30)) + .build_https(); + + let client = es::ClientBuilder::for_url(url)? + .header("Authorization", auth_header)? + .reconnect( + es::ReconnectOptions::reconnect(true) + .retry_initial(false) + .delay(Duration::from_secs(1)) + .backoff_factor(2) + .delay_max(Duration::from_secs(60)) + .build(), + ) + .build_with_transport(transport); + + let mut stream = tail_events(client); + + while let Ok(Some(_)) = stream.try_next().await {} + + Ok(()) +} + +fn tail_events(client: impl es::Client) -> impl Stream> { + client + .stream() + .map_ok(|event| match event { + es::SSE::Connected(connection) => { + println!("got connected: \nstatus={}", connection.response().status()) + } + es::SSE::Event(ev) => { + println!("got an event: {}\n{}", ev.event_type, ev.data) + } + es::SSE::Comment(comment) => { + println!("got a comment: \n{comment}") + } + }) + .map_err(|err| eprintln!("error streaming events: {err:?}")) +} diff --git a/eventsource-client/src/client.rs b/eventsource-client/src/client.rs index a7f3f26..b679105 100644 --- a/eventsource-client/src/client.rs +++ b/eventsource-client/src/client.rs @@ -117,9 +117,9 @@ impl ClientBuilder { /// Set the Authorization header with the calculated basic authentication value. pub fn basic_auth(self, username: &str, password: &str) -> Result { - let auth = format!("{}:{}", username, password); + let auth = format!("{username}:{password}"); let encoded = BASE64_STANDARD.encode(auth); - let value = format!("Basic {}", encoded); + let value = format!("Basic {encoded}"); self.header("Authorization", &value) } @@ -154,7 +154,8 @@ impl ClientBuilder { /// use eventsource_client::ClientBuilder; /// /// let transport = MyTransport::new(); - /// let client = ClientBuilder::for_url("https://example.com/events")? + /// let client = ClientBuilder::for_url("https://sse.dev/test") + /// .expect("failed to create client builder") /// .build_with_transport(transport); /// ``` pub fn build_with_transport(self, transport: T) -> impl Client @@ -464,7 +465,7 @@ impl Stream for ReconnectingRequest { } Err(e) => { // This happens when the server is unreachable, e.g. connection refused. - warn!("request returned an error: {}", e); + warn!("request returned an error: {e}"); if !*retry { self.as_mut().project().state.set(State::StreamClosed); return Poll::Ready(Some(Err(Error::Transport(e)))); @@ -566,7 +567,7 @@ fn uri_from_header(maybe_header: &Option) -> Result { } fn delay(dur: Duration, description: &str) -> Sleep { - info!("Waiting {:?} before {}", dur, description); + info!("Waiting {dur:?} before {description}"); tokio::time::sleep(dur) } @@ -601,7 +602,7 @@ mod tests { .expect("failed to add authentication"); let actual = builder.headers.get("Authorization"); - let expected = HeaderValue::from_str(format!("Basic {}", expected).as_str()) + let expected = HeaderValue::from_str(format!("Basic {expected}").as_str()) .expect("unable to create expected header"); assert_eq!(Some(&expected), actual); diff --git a/eventsource-client/src/event_parser.rs b/eventsource-client/src/event_parser.rs index 8856a49..8e705fb 100644 --- a/eventsource-client/src/event_parser.rs +++ b/eventsource-client/src/event_parser.rs @@ -186,7 +186,7 @@ impl EventParser { } pub fn process_bytes(&mut self, bytes: Bytes) -> Result<()> { - trace!("Parsing bytes {:?}", bytes); + trace!("Parsing bytes {bytes:?}"); // We get bytes from the underlying stream in chunks. Decoding a chunk has two phases: // decode the chunk into lines, and decode the lines into events. // @@ -255,7 +255,7 @@ impl EventParser { Ok(retry) => { event_data.retry = Some(retry); } - _ => debug!("Failed to parse {:?} into retry value", value), + _ => debug!("Failed to parse {value:?} into retry value"), }; } } @@ -424,7 +424,7 @@ mod tests { match parse_field(b"\x80: invalid UTF-8") { Err(InvalidLine(msg)) => assert!(msg.contains("Utf8Error")), - res => panic!("expected InvalidLine error, got {:?}", res), + res => panic!("expected InvalidLine error, got {res:?}"), } } @@ -719,8 +719,8 @@ mod tests { } fn read_contents_from_file(name: &str) -> Vec { - std::fs::read(format!("test-data/{}", name)) - .unwrap_or_else(|_| panic!("couldn't read {}", name)) + std::fs::read(format!("test-data/{name}")) + .unwrap_or_else(|_| panic!("couldn't read {name}")) } proptest! { diff --git a/eventsource-client/src/lib.rs b/eventsource-client/src/lib.rs index e1a5111..545a46f 100644 --- a/eventsource-client/src/lib.rs +++ b/eventsource-client/src/lib.rs @@ -56,6 +56,8 @@ mod event_parser; mod response; mod retry; mod transport; +#[cfg(feature = "hyper")] +mod transport_hyper; pub use client::*; pub use config::*; @@ -64,3 +66,5 @@ pub use event_parser::Event; pub use event_parser::SSE; pub use response::Response; pub use transport::*; +#[cfg(feature = "hyper")] +pub use transport_hyper::*; diff --git a/eventsource-client/src/transport.rs b/eventsource-client/src/transport.rs index 7867a74..afa5825 100644 --- a/eventsource-client/src/transport.rs +++ b/eventsource-client/src/transport.rs @@ -96,7 +96,7 @@ impl StdError for TransportError { /// } /// } /// ``` -pub trait HttpTransport: Send + Sync + 'static { +pub trait HttpTransport: Send + Sync + Clone + 'static { /// Execute an HTTP request and return a streaming response. /// /// # Arguments diff --git a/eventsource-client/src/transport_hyper.rs b/eventsource-client/src/transport_hyper.rs new file mode 100644 index 0000000..ef52b86 --- /dev/null +++ b/eventsource-client/src/transport_hyper.rs @@ -0,0 +1,684 @@ +//! Hyper v1 transport implementation for eventsource-client +//! +//! This crate provides a production-ready [`HyperTransport`] implementation that +//! integrates hyper v1 with the eventsource-client library. +//! +//! # Example +//! +//! ```no_run +//! use eventsource_client::{ClientBuilder, HyperTransport}; +//! +//! # async fn example() -> Result<(), Box> { +//! let transport = HyperTransport::new(); +//! let client = ClientBuilder::for_url("https://example.com/stream")? +//! .build_with_transport(transport); +//! # Ok(()) +//! # } +//! ``` +//! +//! # Features +//! +//! - `hyper-rustls`: Enable HTTPS support using rustls (via [`HyperTransport::builder().https()`]) +//! +//! # Timeout Configuration +//! +//! ```no_run +//! use eventsource_client::{ClientBuilder, HyperTransport}; +//! use std::time::Duration; +//! +//! # async fn example() -> Result<(), Box> { +//! let transport = HyperTransport::builder() +//! .connect_timeout(Duration::from_secs(10)) +//! .read_timeout(Duration::from_secs(30)) +//! .build_http(); +//! +//! let client = ClientBuilder::for_url("https://example.com/stream")? +//! .build_with_transport(transport); +//! # Ok(()) +//! # } +//! ``` + +use crate::{ByteStream, HttpTransport, TransportError}; +use bytes::Bytes; +use http_body_util::{combinators::BoxBody, BodyExt, Empty, Full}; +use hyper::body::Incoming; +use hyper_timeout::TimeoutConnector; +use hyper_util::client::legacy::Client as HyperClient; +use hyper_util::rt::TokioExecutor; +use std::future::Future; +use std::pin::Pin; +use std::time::Duration; + +/// A transport implementation using hyper v1.x +/// +/// This struct wraps a hyper client and implements the [`HttpTransport`] trait +/// for use with eventsource-client. +/// +/// # Timeout Support +/// +/// All three timeout types are fully supported via `hyper-timeout`: +/// - `connect_timeout` - Timeout for establishing the TCP connection +/// - `read_timeout` - Timeout for reading data from the connection +/// - `write_timeout` - Timeout for writing data to the connection +/// +/// Timeouts are configured using the builder pattern. See [`HyperTransportBuilder`] for details. +/// +/// # Example +/// +/// ```no_run +/// use eventsource_client::{ClientBuilder, HyperTransport}; +/// +/// # async fn example() -> Result<(), Box> { +/// // Create transport with default HTTP connector +/// let transport = HyperTransport::new(); +/// +/// // Build SSE client +/// let client = ClientBuilder::for_url("https://example.com/stream")? +/// .build_with_transport(transport); +/// # Ok(()) +/// # } +/// ``` +#[derive(Clone)] +pub struct HyperTransport> +{ + client: HyperClient>>, +} + +/// Builder for configuring a [`HyperTransport`]. +/// +/// This builder allows you to configure timeouts and choose between HTTP and HTTPS connectors. +/// +/// # Example +/// +/// ```no_run +/// use eventsource_client::HyperTransport; +/// use std::time::Duration; +/// +/// let transport = HyperTransport::builder() +/// .connect_timeout(Duration::from_secs(10)) +/// .read_timeout(Duration::from_secs(30)) +/// .build_http(); +/// ``` +#[derive(Default)] +pub struct HyperTransportBuilder { + connect_timeout: Option, + read_timeout: Option, + write_timeout: Option, +} + +impl HyperTransport { + /// Create a new HyperTransport with default HTTP connector and no timeouts + /// + /// This creates a basic HTTP-only client that supports both HTTP/1 and HTTP/2. + /// For HTTPS support or timeout configuration, use [`HyperTransport::builder()`]. + pub fn new() -> Self { + let connector = hyper_util::client::legacy::connect::HttpConnector::new(); + let timeout_connector = TimeoutConnector::new(connector); + let client = HyperClient::builder(TokioExecutor::new()).build(timeout_connector); + + Self { client } + } + + /// Create a new HyperTransport with HTTPS support using rustls + /// + /// This creates an HTTPS client that supports both HTTP/1 and HTTP/2 protocols. + /// This method is only available when the `hyper-rustls` feature is enabled. + /// For timeout configuration, use [`HyperTransport::builder().build_https()`] instead. + /// + /// # Example + /// + /// ```no_run + /// # #[cfg(feature = "hyper-rustls")] + /// # { + /// use eventsource_client::{ClientBuilder, HyperTransport}; + /// + /// # async fn example() -> Result<(), Box> { + /// let transport = HyperTransport::new_https(); + /// let client = ClientBuilder::for_url("https://example.com/stream")? + /// .build_with_transport(transport); + /// # Ok(()) + /// # } + /// # } + /// ``` + #[cfg(feature = "hyper-rustls")] + pub fn new_https() -> HyperTransport< + TimeoutConnector< + hyper_rustls::HttpsConnector, + >, + > { + use hyper_rustls::HttpsConnectorBuilder; + + let connector = HttpsConnectorBuilder::new() + .with_webpki_roots() + .https_or_http() + .enable_http1() + .enable_http2() + .build(); + + let timeout_connector = TimeoutConnector::new(connector); + let client = HyperClient::builder(TokioExecutor::new()).build(timeout_connector); + + HyperTransport { client } + } + + /// Create a new builder for configuring HyperTransport + /// + /// The builder allows you to configure timeouts and choose between HTTP and HTTPS connectors. + /// + /// # Example + /// + /// ```no_run + /// use eventsource_client::HyperTransport; + /// use std::time::Duration; + /// + /// let transport = HyperTransport::builder() + /// .connect_timeout(Duration::from_secs(10)) + /// .read_timeout(Duration::from_secs(30)) + /// .build_http(); + /// ``` + pub fn builder() -> HyperTransportBuilder { + HyperTransportBuilder::default() + } +} + +impl HyperTransportBuilder { + /// Set a connect timeout for establishing connections + /// + /// This timeout applies when establishing the TCP connection to the server. + /// There is no connect timeout by default. + pub fn connect_timeout(mut self, timeout: Duration) -> Self { + self.connect_timeout = Some(timeout); + self + } + + /// Set a read timeout for reading from connections + /// + /// This timeout applies when reading data from the connection. + /// There is no read timeout by default. + pub fn read_timeout(mut self, timeout: Duration) -> Self { + self.read_timeout = Some(timeout); + self + } + + /// Set a write timeout for writing to connections + /// + /// This timeout applies when writing data to the connection. + /// There is no write timeout by default. + pub fn write_timeout(mut self, timeout: Duration) -> Self { + self.write_timeout = Some(timeout); + self + } + + /// Build with an HTTP connector + /// + /// Creates a transport that supports HTTP/1 and HTTP/2 over plain HTTP. + pub fn build_http(self) -> HyperTransport { + let connector = hyper_util::client::legacy::connect::HttpConnector::new(); + self.build_with_connector(connector) + } + + /// Build with an HTTPS connector using rustls + /// + /// Creates a transport that supports HTTP/1 and HTTP/2 over HTTPS using rustls. + /// This method is only available when the `hyper-rustls` feature is enabled. + /// + /// # Example + /// + /// ```no_run + /// # #[cfg(feature = "hyper-rustls")] + /// # { + /// use eventsource_client::HyperTransport; + /// use std::time::Duration; + /// + /// let transport = HyperTransport::builder() + /// .connect_timeout(Duration::from_secs(10)) + /// .build_https(); + /// # } + /// ``` + #[cfg(feature = "hyper-rustls")] + pub fn build_https( + self, + ) -> HyperTransport< + TimeoutConnector< + hyper_rustls::HttpsConnector, + >, + > { + use hyper_rustls::HttpsConnectorBuilder; + + let connector = HttpsConnectorBuilder::new() + .with_webpki_roots() + .https_or_http() + .enable_http1() + .enable_http2() + .build(); + + self.build_with_connector(connector) + } + + /// Build with a custom connector + /// + /// This allows you to provide your own connector implementation, which is useful for: + /// - Custom TLS configuration + /// - Proxy support + /// - Connection pooling customization + /// - Custom DNS resolution + /// + /// The connector will be automatically wrapped with a `TimeoutConnector` that applies + /// the configured timeout settings. + /// + /// # Example + /// + /// ```no_run + /// use eventsource_client::HyperTransport; + /// use hyper_util::client::legacy::connect::HttpConnector; + /// use std::time::Duration; + /// + /// let mut connector = HttpConnector::new(); + /// // Configure the connector as needed + /// connector.set_nodelay(true); + /// + /// let transport = HyperTransport::builder() + /// .read_timeout(Duration::from_secs(30)) + /// .build_with_connector(connector); + /// ``` + pub fn build_with_connector(self, connector: C) -> HyperTransport> + where + C: tower::Service + Clone + Send + Sync + 'static, + C::Response: hyper_util::client::legacy::connect::Connection + + hyper::rt::Read + + hyper::rt::Write + + Send + + Unpin, + C::Future: Send + 'static, + C::Error: Into>, + { + let mut timeout_connector = TimeoutConnector::new(connector); + timeout_connector.set_connect_timeout(self.connect_timeout); + timeout_connector.set_read_timeout(self.read_timeout); + timeout_connector.set_write_timeout(self.write_timeout); + + let client = HyperClient::builder(TokioExecutor::new()).build(timeout_connector); + + HyperTransport { client } + } +} + +impl Default for HyperTransport { + fn default() -> Self { + Self::new() + } +} + +impl HttpTransport for HyperTransport +where + C: hyper_util::client::legacy::connect::Connect + Clone + Send + Sync + 'static, +{ + fn request( + &self, + request: http::Request>, + ) -> Pin< + Box< + dyn Future, TransportError>> + + Send + + Sync + + 'static, + >, + > { + // Convert http::Request> to hyper::Request + let (parts, body_opt) = request.into_parts(); + + // Convert Option to BoxBody + let body: BoxBody> = match body_opt { + Some(body_str) => { + // Use Full for non-empty bodies + Full::new(Bytes::from(body_str)) + .map_err(|e| Box::new(e) as Box) + .boxed() + } + None => { + // Use Empty for no body + Empty::::new() + .map_err(|e| Box::new(e) as Box) + .boxed() + } + }; + + let hyper_req = hyper::Request::from_parts(parts, body); + + let client = self.client.clone(); + + Box::pin(async move { + // Make the request - timeouts are handled by TimeoutConnector + let resp = client + .request(hyper_req) + .await + .map_err(TransportError::new)?; + + let (parts, body) = resp.into_parts(); + + // Convert hyper's Incoming body to ByteStream + let byte_stream: ByteStream = Box::pin(body_to_stream(body)); + + Ok(http::Response::from_parts(parts, byte_stream)) + }) + } +} + +/// Convert hyper's Incoming body to a Stream of Bytes +fn body_to_stream( + body: Incoming, +) -> impl futures::Stream> + Send + Sync { + futures::stream::unfold(body, |mut body| async move { + match body.frame().await { + Some(Ok(frame)) => { + if let Ok(data) = frame.into_data() { + Some((Ok(data), body)) + } else { + // Skip non-data frames (trailers, etc.) + Some(( + Err(TransportError::new(std::io::Error::other("non-data frame"))), + body, + )) + } + } + Some(Err(e)) => Some((Err(TransportError::new(e)), body)), + None => None, + } + }) +} + +#[cfg(test)] +mod tests { + use super::*; + use futures::StreamExt; + use http::{Method, Request}; + use std::time::Duration; + + #[test] + fn test_hyper_transport_new() { + let transport = HyperTransport::new(); + // If we can create it without panic, the test passes + // This verifies the default HTTP connector is set up correctly + drop(transport); + } + + #[test] + fn test_hyper_transport_default() { + let transport = HyperTransport::default(); + // Verify Default trait implementation + drop(transport); + } + + #[cfg(feature = "hyper-rustls")] + #[test] + fn test_hyper_transport_new_https() { + let transport = HyperTransport::new_https(); + // If we can create it without panic, the test passes + // This verifies the HTTPS connector with rustls is set up correctly + drop(transport); + } + + #[test] + fn test_builder_default() { + let builder = HyperTransport::builder(); + let transport = builder.build_http(); + // Verify we can build with default settings + drop(transport); + } + + #[test] + fn test_builder_with_connect_timeout() { + let transport = HyperTransport::builder() + .connect_timeout(Duration::from_secs(5)) + .build_http(); + // Verify we can build with connect timeout + drop(transport); + } + + #[test] + fn test_builder_with_read_timeout() { + let transport = HyperTransport::builder() + .read_timeout(Duration::from_secs(10)) + .build_http(); + // Verify we can build with read timeout + drop(transport); + } + + #[test] + fn test_builder_with_write_timeout() { + let transport = HyperTransport::builder() + .write_timeout(Duration::from_secs(10)) + .build_http(); + // Verify we can build with write timeout + drop(transport); + } + + #[test] + fn test_builder_with_all_timeouts() { + let transport = HyperTransport::builder() + .connect_timeout(Duration::from_secs(5)) + .read_timeout(Duration::from_secs(30)) + .write_timeout(Duration::from_secs(10)) + .build_http(); + // Verify we can build with all timeouts configured + drop(transport); + } + + #[cfg(feature = "hyper-rustls")] + #[test] + fn test_builder_https() { + let transport = HyperTransport::builder() + .connect_timeout(Duration::from_secs(5)) + .read_timeout(Duration::from_secs(30)) + .build_https(); + // Verify we can build HTTPS transport with timeouts + drop(transport); + } + + #[test] + fn test_builder_with_custom_connector() { + let mut connector = hyper_util::client::legacy::connect::HttpConnector::new(); + connector.set_nodelay(true); + + let transport = HyperTransport::builder() + .read_timeout(Duration::from_secs(30)) + .build_with_connector(connector); + // Verify we can build with a custom connector + drop(transport); + } + + #[test] + fn test_transport_is_clone() { + let transport = HyperTransport::new(); + let _cloned = transport.clone(); + // Verify HyperTransport implements Clone + } + + #[tokio::test] + async fn test_http_transport_trait_implemented() { + let transport = HyperTransport::new(); + + // Create a basic request + let request = Request::builder() + .method(Method::GET) + .uri("http://httpbin.org/get") + .body(None) + .expect("failed to build request"); + + // Verify the trait is implemented by attempting to call it + // We're not actually making the request here, just verifying the types work + let _future = transport.request(request); + // The future exists and has the correct type signature + } + + #[tokio::test] + async fn test_request_with_empty_body() { + // This test verifies that we can construct a request with no body + let transport = HyperTransport::new(); + + let request = Request::builder() + .method(Method::GET) + .uri("http://httpbin.org/get") + .body(None) + .expect("failed to build request"); + + // Just verify we can create the future - not actually making network call + let _future = transport.request(request); + } + + #[tokio::test] + async fn test_request_with_string_body() { + // This test verifies that we can construct a request with a string body + let transport = HyperTransport::new(); + + let request = Request::builder() + .method(Method::POST) + .uri("http://httpbin.org/post") + .body(Some(String::from("test body"))) + .expect("failed to build request"); + + // Just verify we can create the future - not actually making network call + let _future = transport.request(request); + } + + #[tokio::test] + async fn test_body_to_stream_empty() { + // Create an empty incoming body for testing + // This is a bit tricky since Incoming is not easily constructible + // We'll test the integration through the full request path instead + + // For now, this is a placeholder showing the test structure + // A full implementation would require setting up a test HTTP server + } + + // Integration tests that actually make HTTP requests + // These require a running HTTP server, so they're marked as ignored by default + + #[tokio::test] + #[ignore] // Run with: cargo test -- --ignored + async fn test_integration_http_request() { + let transport = HyperTransport::builder() + .connect_timeout(Duration::from_secs(10)) + .read_timeout(Duration::from_secs(30)) + .build_http(); + + let request = Request::builder() + .method(Method::GET) + .uri("http://httpbin.org/get") + .body(None) + .expect("failed to build request"); + + let response = transport.request(request).await; + assert!(response.is_ok(), "Request should succeed"); + + let response = response.unwrap(); + assert!(response.status().is_success(), "Status should be success"); + + // Verify we can read from the stream + let mut stream = response.into_body(); + let mut received_data = false; + while let Some(result) = stream.next().await { + assert!(result.is_ok(), "Stream chunk should not error"); + received_data = true; + } + assert!(received_data, "Should have received some data"); + } + + #[cfg(feature = "hyper-rustls")] + #[tokio::test] + #[ignore] // Run with: cargo test -- --ignored + async fn test_integration_https_request() { + let transport = HyperTransport::builder() + .connect_timeout(Duration::from_secs(10)) + .read_timeout(Duration::from_secs(30)) + .build_https(); + + // Using example.com as it's highly reliable and well-maintained + let request = Request::builder() + .method(Method::GET) + .uri("https://example.com/") + .body(None) + .expect("failed to build request"); + + let response = transport.request(request).await; + assert!( + response.is_ok(), + "HTTPS request should succeed: {:?}", + response.as_ref().err() + ); + + let response = response.unwrap(); + assert!( + response.status().is_success(), + "Status should be success: {}", + response.status() + ); + } + + #[tokio::test] + #[ignore] // Run with: cargo test -- --ignored + async fn test_integration_request_with_body() { + let transport = HyperTransport::new(); + + let body_content = r#"{"test": "data"}"#; + let request = Request::builder() + .method(Method::POST) + .uri("http://httpbin.org/post") + .header("Content-Type", "application/json") + .body(Some(body_content.to_string())) + .expect("failed to build request"); + + let response = transport.request(request).await; + assert!(response.is_ok(), "POST request should succeed"); + + let response = response.unwrap(); + assert!(response.status().is_success(), "Status should be success"); + } + + #[tokio::test] + #[ignore] // Run with: cargo test -- --ignored + async fn test_integration_streaming_response() { + let transport = HyperTransport::new(); + + let request = Request::builder() + .method(Method::GET) + .uri("http://httpbin.org/stream/10") + .body(None) + .expect("failed to build request"); + + let response = transport.request(request).await; + assert!(response.is_ok(), "Streaming request should succeed"); + + let response = response.unwrap(); + assert!(response.status().is_success(), "Status should be success"); + + // Verify we receive multiple chunks + let mut stream = response.into_body(); + let mut chunk_count = 0; + while let Some(result) = stream.next().await { + assert!(result.is_ok(), "Stream chunk should not error"); + let chunk = result.unwrap(); + assert!(!chunk.is_empty(), "Chunk should not be empty"); + chunk_count += 1; + } + assert!(chunk_count > 0, "Should have received multiple chunks"); + } + + #[tokio::test] + #[ignore] // Run with: cargo test -- --ignored + async fn test_integration_connect_timeout() { + // Use a non-routable IP to test connect timeout + let transport = HyperTransport::builder() + .connect_timeout(Duration::from_millis(100)) + .build_http(); + + let request = Request::builder() + .method(Method::GET) + .uri("http://10.255.255.1/") + .body(None) + .expect("failed to build request"); + + let response = transport.request(request).await; + assert!(response.is_err(), "Request should timeout"); + } +}