diff --git a/.github/actions/build-docs/action.yml b/.github/actions/build-docs/action.yml deleted file mode 100644 index 9d568d4..0000000 --- a/.github/actions/build-docs/action.yml +++ /dev/null @@ -1,9 +0,0 @@ -name: Build Documentation -description: 'Build Documentation.' - -runs: - using: composite - steps: - - name: Build Documentation - shell: bash - run: cargo doc --no-deps -p eventsource-client diff --git a/.github/actions/ci/action.yml b/.github/actions/ci/action.yml index e45beab..1ef04ff 100644 --- a/.github/actions/ci/action.yml +++ b/.github/actions/ci/action.yml @@ -1,5 +1,11 @@ name: CI Workflow -description: 'Shared CI workflow.' +description: "Shared CI workflow." + +inputs: + cargo-flags: + description: "Flags to pass to cargo commands." + required: false + default: "" runs: using: composite @@ -10,20 +16,24 @@ runs: - name: Run tests shell: bash - run: cargo test --all-features -p eventsource-client + run: cargo test ${{ inputs.cargo-flags }} -p eventsource-client + + - name: Run slower integration tests + shell: bash + run: cargo test ${{ inputs.cargo-flags }} -p eventsource-client --lib -- --ignored - name: Run clippy checks shell: bash - run: cargo clippy --all-features -p eventsource-client -- -D warnings + run: cargo clippy ${{ inputs.cargo-flags }} -p eventsource-client -- -D warnings - name: Build contract tests shell: bash - run: make build-contract-tests + run: CARGO_FLAGS="${{ inputs.cargo-flags }}" make build-contract-tests - name: Start contract test service shell: bash - run: make start-contract-test-service-bg + run: CARGO_FLAGS="${{ inputs.cargo-flags }}" make start-contract-test-service-bg - name: Run contract tests shell: bash - run: make run-contract-tests + run: CARGO_FLAGS="${{ inputs.cargo-flags }}" make run-contract-tests diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 1c733ab..b68e2ef 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -1,21 +1,43 @@ name: Run CI on: push: - branches: [main] + branches: + - "main" + - "feat/**" paths-ignore: - "**.md" # Do not need to run CI for markdown changes. pull_request: - branches: [main] + branches: + - "main" + - "feat/**" paths-ignore: - "**.md" jobs: ci-build: runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + features: + - name: "default" + cargo-flags: "" + - name: "hyper-rustls-native-roots" + cargo-flags: "--no-default-features --features hyper-rustls-native-roots" + - name: "hyper-rustls-webpki-roots" + cargo-flags: "--no-default-features --features hyper-rustls-webpki-roots" + - name: "native-tls" + cargo-flags: "--no-default-features --features native-tls" + + name: CI (${{ matrix.features.name }}) steps: - uses: actions/checkout@v4 + - name: Get Rust version + id: rust-version + run: cat ./.github/variables/rust-versions.env >> $GITHUB_OUTPUT + - name: Get Rust version id: rust-version run: cat ./.github/variables/rust-versions.env >> $GITHUB_OUTPUT @@ -26,4 +48,23 @@ jobs: rustup component add rustfmt clippy - uses: ./.github/actions/ci - - uses: ./.github/actions/build-docs + with: + cargo-flags: ${{ matrix.features.cargo-flags }} + + build-docs: + runs-on: ubuntu-latest + name: Build Documentation (all features) + + steps: + - uses: actions/checkout@v4 + with: + fetch-depth: 0 + + - name: Setup rust tooling + run: rustup override set nightly + + - name: Install cargo-docs-rs + run: cargo install cargo-docs-rs + + - name: Build documentation + run: cargo docs-rs -p eventsource-client diff --git a/.github/workflows/manual-publish.yml b/.github/workflows/manual-publish.yml index b9d1e0b..76c5f30 100644 --- a/.github/workflows/manual-publish.yml +++ b/.github/workflows/manual-publish.yml @@ -27,7 +27,6 @@ jobs: rustup component add rustfmt clippy - uses: ./.github/actions/ci - - uses: ./.github/actions/build-docs - uses: launchdarkly/gh-actions/actions/release-secrets@release-secrets-v1.2.0 name: "Get crates.io token" diff --git a/.github/workflows/release-please.yml b/.github/workflows/release-please.yml index 0bd0335..f2ce658 100644 --- a/.github/workflows/release-please.yml +++ b/.github/workflows/release-please.yml @@ -3,7 +3,8 @@ name: Run Release Please on: push: branches: - - main + - "main" + - "feat/**" jobs: release-package: @@ -26,6 +27,11 @@ jobs: id: rust-version run: cat ./.github/variables/rust-versions.env >> $GITHUB_OUTPUT + - name: Get Rust version + if: ${{ steps.release.outputs['eventsource-client--release_created'] == 'true' }} + id: rust-version + run: cat ./.github/variables/rust-versions.env >> $GITHUB_OUTPUT + - name: Setup rust tooling if: ${{ steps.release.outputs['eventsource-client--release_created'] == 'true' }} run: | @@ -42,9 +48,6 @@ jobs: - uses: ./.github/actions/ci if: ${{ steps.release.outputs['eventsource-client--release_created'] == 'true' }} - - uses: ./.github/actions/build-docs - if: ${{ steps.release.outputs['eventsource-client--release_created'] == 'true' }} - - uses: ./.github/actions/publish if: ${{ steps.release.outputs['eventsource-client--release_created'] == 'true' }} with: diff --git a/.release-please-manifest.json b/.release-please-manifest.json index acf969e..bffb35b 100644 --- a/.release-please-manifest.json +++ b/.release-please-manifest.json @@ -1,3 +1,3 @@ { - "eventsource-client": "0.16.2" + "eventsource-client": "0.16.2" } diff --git a/Makefile b/Makefile index 6201a3f..d3ded8a 100644 --- a/Makefile +++ b/Makefile @@ -1,17 +1,18 @@ TEMP_TEST_OUTPUT=/tmp/contract-test-service.log +CARGO_FLAGS ?= "" build-contract-tests: - @cargo build + @cargo build $(CARGO_FLAGS) start-contract-test-service: build-contract-tests @./target/debug/sse-test-api start-contract-test-service-bg: @echo "Test service output will be captured in $(TEMP_TEST_OUTPUT)" - @make start-contract-test-service >$(TEMP_TEST_OUTPUT) 2>&1 & + @$(MAKE) start-contract-test-service >$(TEMP_TEST_OUTPUT) 2>&1 & run-contract-tests: - @curl -s https://raw.githubusercontent.com/launchdarkly/sse-contract-tests/v2.0.0/downloader/run.sh \ + @curl -s https://raw.githubusercontent.com/launchdarkly/sse-contract-tests/main/downloader/run.sh \ | VERSION=v2 PARAMS="-url http://localhost:8080 -debug -stop-service-at-end $(SKIPFLAGS) $(TEST_HARNESS_PARAMS)" sh contract-tests: build-contract-tests start-contract-test-service-bg run-contract-tests diff --git a/contract-tests/Cargo.toml b/contract-tests/Cargo.toml index b34b0ce..954db76 100644 --- a/contract-tests/Cargo.toml +++ b/contract-tests/Cargo.toml @@ -11,10 +11,21 @@ eventsource-client = { path = "../eventsource-client" } serde_json = { version = "1.0.39"} actix = { version = "0.13.1"} actix-web = { version = "4"} -reqwest = { version = "0.11.6", default-features = false, features = ["json", "rustls-tls"] } +# reqwest is required for callback client (test harness communication) +reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls"] } env_logger = { version = "0.10.0" } -hyper = { version = "0.14.19", features = ["client", "http1", "tcp"] } log = "0.4.6" +http = "1.0" +bytes = "1.5" + +launchdarkly-sdk-transport = { version = "0.1.0" } [[bin]] name = "sse-test-api" + +[features] +default = ["hyper"] +hyper = ["eventsource-client/hyper", "launchdarkly-sdk-transport/hyper"] +native-tls = ["hyper", "eventsource-client/native-tls"] +hyper-rustls-native-roots = ["hyper", "eventsource-client/hyper-rustls-native-roots"] +hyper-rustls-webpki-roots = ["hyper", "eventsource-client/hyper-rustls-webpki-roots"] diff --git a/contract-tests/src/bin/sse-test-api/stream_entity.rs b/contract-tests/src/bin/sse-test-api/stream_entity.rs index 549a12d..5c202f4 100644 --- a/contract-tests/src/bin/sse-test-api/stream_entity.rs +++ b/contract-tests/src/bin/sse-test-api/stream_entity.rs @@ -7,6 +7,7 @@ use std::{ }; use eventsource_client as es; +use launchdarkly_sdk_transport::HyperTransport; use crate::{Config, EventType}; @@ -102,10 +103,6 @@ impl Inner { reconnect_options = reconnect_options.delay(Duration::from_millis(delay_ms)); } - if let Some(read_timeout_ms) = config.read_timeout_ms { - client_builder = client_builder.read_timeout(Duration::from_millis(read_timeout_ms)); - } - if let Some(last_event_id) = &config.last_event_id { client_builder = client_builder.last_event_id(last_event_id.clone()); } @@ -127,8 +124,34 @@ impl Inner { } } + // Build with HyperTransport + let mut transport_builder = HyperTransport::builder(); + + if let Some(timeout_ms) = config.read_timeout_ms { + transport_builder = transport_builder.read_timeout(Duration::from_millis(timeout_ms)); + } + + #[cfg(any( + feature = "hyper-rustls-native-roots", + feature = "hyper-rustls-webpki-roots", + feature = "native-tls" + ))] + let transport = transport_builder + .build_https() + .map_err(|e| format!("Failed to build HTTPS transport: {e:?}"))?; + #[cfg(not(any( + feature = "hyper-rustls-native-roots", + feature = "hyper-rustls-webpki-roots", + feature = "native-tls" + )))] + let transport = transport_builder + .build_http() + .map_err(|e| format!("Failed to build HTTP transport: {e:?}"))?; + Ok(Box::new( - client_builder.reconnect(reconnect_options.build()).build(), + client_builder + .reconnect(reconnect_options.build()) + .build_with_transport(transport), )) } } diff --git a/eventsource-client/Cargo.toml b/eventsource-client/Cargo.toml index 198bd5e..f9c8a75 100644 --- a/eventsource-client/Cargo.toml +++ b/eventsource-client/Cargo.toml @@ -11,16 +11,17 @@ keywords = ["launchdarkly", "feature-flags", "feature-toggles", "eventsource", " exclude = ["CHANGELOG.md"] [dependencies] +bytes = "1.5" futures = "0.3.21" -hyper = { version = "0.14.19", features = ["client", "http1", "tcp"] } -hyper-rustls = { version = "0.24.1", optional = true } +http = "1.0" log = "0.4.6" pin-project = "1.0.10" tokio = { version = "1.17.0", features = ["time"] } -hyper-timeout = "0.4.1" rand = "0.8.5" base64 = "0.22.1" +launchdarkly-sdk-transport = { version = "0.1.0" } + [dev-dependencies] env_logger = "0.10.0" maplit = "1.0.1" @@ -30,11 +31,13 @@ tokio = { version = "1.2.0", features = ["macros", "rt-multi-thread"] } test-case = "3.2.1" proptest = "1.0.0" - -[features] -default = ["rustls"] -rustls = ["hyper-rustls", "hyper-rustls/http2"] - [[example]] name = "tail" -required-features = ["rustls"] +required-features = ["hyper"] + +[features] +default = ["hyper"] +hyper = ["launchdarkly-sdk-transport/hyper"] +native-tls = ["hyper", "launchdarkly-sdk-transport/native-tls"] +hyper-rustls-native-roots = ["hyper", "launchdarkly-sdk-transport/hyper-rustls-native-roots"] +hyper-rustls-webpki-roots = ["hyper", "launchdarkly-sdk-transport/hyper-rustls-webpki-roots"] diff --git a/eventsource-client/README.md b/eventsource-client/README.md index c1e8dd9..99cbc90 100644 --- a/eventsource-client/README.md +++ b/eventsource-client/README.md @@ -4,43 +4,148 @@ Client for the [Server-Sent Events] protocol (aka [EventSource]). +This library provides a complete SSE protocol implementation with a built-in HTTP transport powered by hyper v1. The pluggable transport design also allows you to use your own HTTP client (reqwest, custom, etc.) if needed. + [Server-Sent Events]: https://html.spec.whatwg.org/multipage/server-sent-events.html [EventSource]: https://developer.mozilla.org/en-US/docs/Web/API/EventSource -## Requirements +## Quick Start + +### 1. Add dependencies -Requires tokio. +```toml +[dependencies] +eventsource-client = { version = "0.17", features = ["hyper-rustls-native-roots"] } +futures = "0.3" +tokio = { version = "1", features = ["macros", "rt-multi-thread"] } +``` -## Usage +**Features:** +- `hyper` - Enables the built-in `HyperTransport` for HTTP support (enabled by default) +- `hyper-rustls-native-roots`, `hyper-rustls-webpki-roots`, or `native-tls` - Adds HTTPS support via rustls (optional) -Example that just prints the type of each event received: +### 2. Use the client ```rust -use eventsource_client as es; +use eventsource_client::{ClientBuilder, SSE}; +use launchdarkly_sdk_transport::HyperTransport; +use futures::TryStreamExt; +use std::time::Duration; + +#[tokio::main] +async fn main() -> Result<(), Box> { + // Create HTTP transport with timeouts + let transport = HyperTransport::builder() + .connect_timeout(Duration::from_secs(10)) + .read_timeout(Duration::from_secs(30)) + .build_https(); // or .build_http() for plain HTTP -let mut client = es::ClientBuilder::for_url("https://example.com/stream")? - .header("Authorization", "Basic username:password")? - .build(); + // Build SSE client + let client = ClientBuilder::for_url("https://example.com/stream")? + .header("Authorization", "Bearer token")? + .build_with_transport(transport); -client - .stream() - .map_ok(|event| println!("got event: {:?}", event)) - .map_err(|err| eprintln!("error streaming events: {:?}", err)); + // Stream events + let mut stream = client.stream(); + + while let Some(event) = stream.try_next().await? { + match event { + SSE::Event(evt) => println!("Event: {}", evt.event_type), + SSE::Comment(c) => println!("Comment: {}", c), + SSE::Connected(_) => println!("Connected!"), + } + } + + Ok(()) +} ``` -(Some boilerplate omitted for clarity; see [examples directory] for complete, -working code.) +## Example + +The `tail` example demonstrates a complete SSE client using the built-in `HyperTransport`: + +**Run with HTTP:** +```bash +cargo run --example tail --features hyper -- http://live-test-scores.herokuapp.com/scores "Bearer token" +``` + +**Run with HTTPS:** +```bash +cargo run --example tail --features hyper-rustls-native-roots -- https://live-test-scores.herokuapp.com/scores "Bearer token" +cargo run --example tail --features hyper-rustls-webpki-roots -- https://live-test-scores.herokuapp.com/scores "Bearer token" +cargo run --example tail --features native-tls -- https://live-test-scores.herokuapp.com/scores "Bearer token" +``` + +The example shows: +- Creating a `HyperTransport` with custom timeouts +- Building an SSE client with authentication headers +- Configuring automatic reconnection with exponential backoff +- Handling different SSE event types (events, comments, connection status) +- Proper error handling for HTTPS URLs without the `hyper-rustls-native-roots` feature + +See [`examples/tail.rs`](https://github.com/launchdarkly/rust-eventsource-client/tree/main/eventsource-client/examples/tail.rs) for the complete implementation. -[examples directory]: https://github.com/launchdarkly/rust-eventsource-client/tree/main/eventsource-client/examples ## Features -* tokio-based streaming client. -* Supports setting custom headers on the HTTP request (e.g. for endpoints - requiring authorization). -* Retry for failed connections. -* Reconnection if connection is interrupted, with exponential backoff. +* **Built-in HTTP transport** - Production-ready `HyperTransport` powered by hyper v1 +* **Configurable timeouts** - Connect, read, and write timeout support +* **HTTPS support** - Optional rustls integration via the `hyper-rustls-*` or `native-tls` features +* **Pluggable transport** - Use a custom HTTP client if needed (reqwest, etc.) +* **Tokio-based streaming** - Efficient async/await support +* **Custom headers** - Full control over HTTP requests +* **Automatic reconnection** - Configurable exponential backoff +* **Retry logic** - Handle transient failures gracefully +* **Redirect following** - Automatic handling of HTTP redirects +* **Last-Event-ID** - Resume streams from last received event + +## Custom HTTP Transport + +While the built-in `HyperTransport` works for most use cases, you can implement the `HttpTransport` trait to use your own HTTP client: + +```rust +use launchdarkly_sdk_transport::{HttpTransport, Request, ResponseFuture}; +use bytes::Bytes; + +#[derive(Clone)] +struct MyTransport { + // Your HTTP client here +} + +impl HttpTransport for MyTransport { + fn request(&self, request: Request>) -> ResponseFuture { + // Implement HTTP request handling + // See the HttpTransport trait documentation for details + todo!() + } +} +``` + +This allows you to: +- Use a different HTTP client (reqwest, custom, etc.) +- Implement custom connection pooling or proxy logic +- Add specialized middleware or observability + +## Architecture + +``` +┌─────────────────────────────────────┐ +│ Your Application │ +└─────────────┬───────────────────────┘ + │ + ▼ +┌─────────────────────────────────────┐ +│ eventsource-client │ +│ (SSE Protocol Implementation) │ +└─────────────┬───────────────────────┘ + │ HttpTransport trait + ▼ +┌─────────────────────────────────────┐ +│ HTTP Transport Layer │ +│ • HyperTransport (built-in) │ +│ • Custom (reqwest, etc.) │ +└─────────────────────────────────────┘ +``` ## Stability -Early stage release for feedback purposes. May contain bugs or performance -issues. API subject to change. +This library is actively maintained. The SSE protocol implementation is stable. Breaking changes follow semantic versioning. diff --git a/eventsource-client/examples/tail.rs b/eventsource-client/examples/tail.rs index 9c93598..992031a 100644 --- a/eventsource-client/examples/tail.rs +++ b/eventsource-client/examples/tail.rs @@ -1,23 +1,107 @@ +//! Example SSE client that tails an event stream +//! +//! This example uses the built-in HyperTransport for HTTP/HTTPS support. +//! +//! To run this example with HTTP support: +//! ```bash +//! cargo run --example tail --features hyper -- http://live-test-scores.herokuapp.com/scores "Bearer token" +//! ``` +//! +//! To run this example with HTTPS support: +//! ```bash +//! cargo run --example tail --features hyper-rustls-native-roots -- https://live-test-scores.herokuapp.com/scores "Bearer token" +//! cargo run --example tail --features hyper-rustls-webpki-roots -- https://live-test-scores.herokuapp.com/scores "Bearer token" +//! cargo run --example tail --features native-tls -- https://live-test-scores.herokuapp.com/scores "Bearer token" +//! ``` + use futures::{Stream, TryStreamExt}; use std::{env, process, time::Duration}; use eventsource_client as es; +use launchdarkly_sdk_transport::HyperTransport; #[tokio::main] #[allow(clippy::result_large_err)] -async fn main() -> Result<(), es::Error> { +async fn main() -> Result<(), Box> { env_logger::init(); let args: Vec = env::args().collect(); if args.len() != 3 { eprintln!("Please pass args: "); + eprintln!("Example: cargo run --example tail --features hyper https://live-test-scores.herokuapp.com/scores 'Bearer token'"); process::exit(1); } let url = &args[1]; let auth_header = &args[2]; + // Run the appropriate version based on URL scheme and features + if url.starts_with("https://") { + #[cfg(any( + feature = "hyper-rustls-native-roots", + feature = "hyper-rustls-webpki-roots", + feature = "native-tls" + ))] + { + run_with_https(url, auth_header).await?; + } + #[cfg(not(any( + feature = "hyper-rustls-native-roots", + feature = "hyper-rustls-webpki-roots", + feature = "native-tls" + )))] + { + eprintln!("Error: HTTPS URL requires the 'hyper-rustls-native-roots', 'hyper-rustls-webpki-roots', or 'native-tls' features"); + eprintln!( + "Run with: cargo run --example tail --features hyper-rustls-native-roots -- {} '{}'", + url, auth_header + ); + process::exit(1); + } + } else { + run_with_http(url, auth_header).await?; + } + + Ok(()) +} + +async fn run_with_http(url: &str, auth_header: &str) -> Result<(), Box> { + let transport = HyperTransport::builder() + .connect_timeout(Duration::from_secs(10)) + .read_timeout(Duration::from_secs(30)) + .build_http()?; + + let client = es::ClientBuilder::for_url(url)? + .header("Authorization", auth_header)? + .reconnect( + es::ReconnectOptions::reconnect(true) + .retry_initial(false) + .delay(Duration::from_secs(1)) + .backoff_factor(2) + .delay_max(Duration::from_secs(60)) + .build(), + ) + .build_with_transport(transport); + + let mut stream = tail_events(client); + + while let Ok(Some(_)) = stream.try_next().await {} + + Ok(()) +} + +#[cfg(any( + feature = "hyper-rustls-native-roots", + feature = "hyper-rustls-webpki-roots", + feature = "native-tls" +))] +async fn run_with_https(url: &str, auth_header: &str) -> Result<(), Box> { + let transport = HyperTransport::builder() + .connect_timeout(Duration::from_secs(10)) + .read_timeout(Duration::from_secs(30)) + .build_https()?; + let client = es::ClientBuilder::for_url(url)? .header("Authorization", auth_header)? .reconnect( @@ -28,7 +112,7 @@ async fn main() -> Result<(), es::Error> { .delay_max(Duration::from_secs(60)) .build(), ) - .build(); + .build_with_transport(transport); let mut stream = tail_events(client); diff --git a/eventsource-client/src/client.rs b/eventsource-client/src/client.rs index 0aa708b..2db6b31 100644 --- a/eventsource-client/src/client.rs +++ b/eventsource-client/src/client.rs @@ -1,16 +1,7 @@ use base64::prelude::*; use futures::{ready, Stream}; -use hyper::{ - body::HttpBody, - client::{ - connect::{Connect, Connection}, - ResponseFuture, - }, - header::{HeaderMap, HeaderName, HeaderValue}, - service::Service, - Body, Request, Uri, -}; +use http::{HeaderMap, HeaderName, HeaderValue, Request, Uri}; use log::{debug, info, trace, warn}; use pin_project::pin_project; use std::{ @@ -20,14 +11,12 @@ use std::{ io::ErrorKind, pin::Pin, str::FromStr, + sync::Arc, task::{Context, Poll}, time::{Duration, Instant}, }; -use tokio::{ - io::{AsyncRead, AsyncWrite}, - time::Sleep, -}; +use tokio::time::Sleep; use crate::{ config::ReconnectOptions, @@ -37,9 +26,7 @@ use crate::{ error::{Error, Result}, event_parser::ConnectionDetails, }; - -use hyper::client::HttpConnector; -use hyper_timeout::TimeoutConnector; +use launchdarkly_sdk_transport::{ByteStream, HttpTransport, ResponseFuture}; use crate::event_parser::EventParser; use crate::event_parser::SSE; @@ -47,11 +34,6 @@ use crate::event_parser::SSE; use crate::retry::{BackoffRetry, RetryStrategy}; use std::error::Error as StdError; -#[cfg(feature = "rustls")] -use hyper_rustls::HttpsConnectorBuilder; - -type BoxError = Box; - /// Represents a [`Pin`]'d [`Send`] + [`Sync`] stream, returned by [`Client`]'s stream method. pub type BoxStream = Pin + Send + Sync>>; @@ -75,9 +57,6 @@ pub struct ClientBuilder { url: Uri, headers: HeaderMap, reconnect_opts: ReconnectOptions, - connect_timeout: Option, - read_timeout: Option, - write_timeout: Option, last_event_id: Option, method: String, body: Option, @@ -99,9 +78,6 @@ impl ClientBuilder { url, headers: header_map, reconnect_opts: ReconnectOptions::default(), - connect_timeout: None, - read_timeout: None, - write_timeout: None, last_event_id: None, method: String::from("GET"), max_redirects: None, @@ -148,25 +124,6 @@ impl ClientBuilder { self.header("Authorization", &value) } - /// Set a connect timeout for the underlying connection. There is no connect timeout by - /// default. - pub fn connect_timeout(mut self, connect_timeout: Duration) -> ClientBuilder { - self.connect_timeout = Some(connect_timeout); - self - } - - /// Set a read timeout for the underlying connection. There is no read timeout by default. - pub fn read_timeout(mut self, read_timeout: Duration) -> ClientBuilder { - self.read_timeout = Some(read_timeout); - self - } - - /// Set a write timeout for the underlying connection. There is no write timeout by default. - pub fn write_timeout(mut self, write_timeout: Duration) -> ClientBuilder { - self.write_timeout = Some(write_timeout); - self - } - /// Configure the client's reconnect behaviour according to the supplied /// [`ReconnectOptions`]. /// @@ -184,60 +141,29 @@ impl ClientBuilder { self } - /// Build with a specific client connector. - pub fn build_with_conn(self, conn: C) -> impl Client - where - C: Service + Clone + Send + Sync + 'static, - C::Response: Connection + AsyncRead + AsyncWrite + Send + Unpin, - C::Future: Send + 'static, - C::Error: Into, - { - let mut connector = TimeoutConnector::new(conn); - connector.set_connect_timeout(self.connect_timeout); - connector.set_read_timeout(self.read_timeout); - connector.set_write_timeout(self.write_timeout); - - let client = hyper::Client::builder().build::<_, hyper::Body>(connector); - - ClientImpl { - http: client, - request_props: RequestProps { - url: self.url, - headers: self.headers, - method: self.method, - body: self.body, - reconnect_opts: self.reconnect_opts, - max_redirects: self.max_redirects.unwrap_or(DEFAULT_REDIRECT_LIMIT), - }, - last_event_id: self.last_event_id, - } - } - - /// Build with an HTTP client connector. - pub fn build_http(self) -> impl Client { - self.build_with_conn(HttpConnector::new()) - } - - #[cfg(feature = "rustls")] - /// Build with an HTTPS client connector, using the OS root certificate store. - pub fn build(self) -> impl Client { - let conn = HttpsConnectorBuilder::new() - .with_native_roots() - .https_or_http() - .enable_http1() - .enable_http2() - .build(); - - self.build_with_conn(conn) - } - - /// Build with the given [`hyper::client::Client`]. - pub fn build_with_http_client(self, http: hyper::Client) -> impl Client + /// Build a client with a custom HTTP transport implementation. + /// + /// # Arguments + /// + /// * `transport` - An implementation of the [`HttpTransport`] trait that will handle + /// HTTP requests. See the `examples/` directory for reference implementations. + /// + /// # Example + /// + /// ```ignore + /// use eventsource_client::ClientBuilder; + /// + /// let transport = MyTransport::new(); + /// let client = ClientBuilder::for_url("https://live-test-scores.herokuapp.com/scores") + /// .expect("failed to create client builder") + /// .build_with_transport(transport); + /// ``` + pub fn build_with_transport(self, transport: T) -> impl Client where - C: Connect + Clone + Send + Sync + 'static, + T: HttpTransport, { ClientImpl { - http, + transport: Arc::new(transport), request_props: RequestProps { url: self.url, headers: self.headers, @@ -263,17 +189,13 @@ struct RequestProps { /// A client implementation that connects to a server using the Server-Sent Events protocol /// and consumes the event stream indefinitely. -/// Can be parameterized with different hyper Connectors, such as HTTP or HTTPS. -struct ClientImpl { - http: hyper::Client, +struct ClientImpl { + transport: Arc, request_props: RequestProps, last_event_id: Option, } -impl Client for ClientImpl -where - C: Connect + Clone + Send + Sync + 'static, -{ +impl Client for ClientImpl { /// Connect to the server and begin consuming the stream. Produces a /// [`Stream`] of [`Event`](crate::Event)s wrapped in [`Result`]. /// @@ -283,7 +205,7 @@ where /// reconnect for retryable errors. fn stream(&self) -> BoxStream> { Box::pin(ReconnectingRequest::new( - self.http.clone(), + Arc::clone(&self.transport), self.request_props.clone(), self.last_event_id.clone(), )) @@ -299,7 +221,7 @@ enum State { #[pin] resp: ResponseFuture, }, - Connected(#[pin] hyper::Body), + Connected(#[pin] ByteStream), WaitingToReconnect(#[pin] Sleep), FollowingRedirect(Option), StreamClosed, @@ -327,8 +249,8 @@ impl Debug for State { #[must_use = "streams do nothing unless polled"] #[pin_project] -pub struct ReconnectingRequest { - http: hyper::Client, +pub struct ReconnectingRequest { + transport: Arc, props: RequestProps, #[pin] state: State, @@ -341,12 +263,12 @@ pub struct ReconnectingRequest { initial_connection: bool, } -impl ReconnectingRequest { +impl ReconnectingRequest { fn new( - http: hyper::Client, + transport: Arc, props: RequestProps, last_event_id: Option, - ) -> ReconnectingRequest { + ) -> ReconnectingRequest { let reconnect_delay = props.reconnect_opts.delay; let delay_max = props.reconnect_opts.delay_max; let backoff_factor = props.reconnect_opts.backoff_factor; @@ -354,7 +276,7 @@ impl ReconnectingRequest { let url = props.url.clone(); ReconnectingRequest { props, - http, + transport, state: State::New, retry_strategy: Box::new(BackoffRetry::new( reconnect_delay, @@ -370,10 +292,7 @@ impl ReconnectingRequest { } } - fn send_request(&self) -> Result - where - C: Connect + Clone + Send + Sync + 'static, - { + fn send_request(&self) -> Result { let mut request_builder = Request::builder() .method(self.props.method.as_str()) .uri(&self.current_url); @@ -391,16 +310,13 @@ impl ReconnectingRequest { } } - let body = match &self.props.body { - Some(body) => Body::from(body.to_string()), - None => Body::empty(), - }; - + // Include the request body if set. Most SSE requests use GET and will have None, + // but some implementations (e.g., using REPORT method) may include a body. let request = request_builder - .body(body) + .body(self.props.body.clone().map(|b| b.into())) .map_err(|e| Error::InvalidParameter(Box::new(e)))?; - Ok(self.http.request(request)) + Ok(self.transport.request(request)) } fn reset_redirects(self: Pin<&mut Self>) { @@ -419,10 +335,7 @@ impl ReconnectingRequest { } } -impl Stream for ReconnectingRequest -where - C: Connect + Clone + Send + Sync + 'static, -{ +impl Stream for ReconnectingRequest { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { @@ -477,7 +390,11 @@ where } StateProj::Connecting { retry, resp } => match ready!(resp.poll(cx)) { Ok(resp) => { - debug!("HTTP response: {resp:#?}"); + debug!( + "HTTP response status: {}, headers: {:?}", + resp.status(), + resp.headers() + ); if resp.status().is_success() { self.as_mut().project().retry_strategy.reset(Instant::now()); @@ -504,7 +421,7 @@ where debug!("following redirect {}", self.redirect_count); self.as_mut().project().state.set(State::FollowingRedirect( - resp.headers().get(hyper::header::LOCATION).cloned(), + resp.headers().get("location").cloned(), )); continue; } else { @@ -517,9 +434,13 @@ where } } + let status = resp.status(); + let headers = resp.headers().clone(); + let body = resp.into_body(); + let error = Error::UnexpectedResponse( - Response::new(resp.status(), resp.headers().clone()), - ErrorBody::new(resp.into_body()), + Response::new(status, headers), + ErrorBody::new(body), ); if !*retry { @@ -547,7 +468,7 @@ where warn!("request returned an error: {e}"); if !*retry { self.as_mut().project().state.set(State::StreamClosed); - return Poll::Ready(Some(Err(Error::HttpStream(Box::new(e))))); + return Poll::Ready(Some(Err(Error::Transport(e)))); } let duration = self @@ -572,7 +493,7 @@ where return Poll::Ready(Some(Err(e))); } }, - StateProj::Connected(body) => match ready!(body.poll_data(cx)) { + StateProj::Connected(mut body) => match ready!(body.as_mut().poll_next(cx)) { Some(Ok(result)) => { this.event_parser.process_bytes(result)?; continue; @@ -590,15 +511,16 @@ where .set(State::WaitingToReconnect(delay(duration, "reconnecting"))); } + // Check if the underlying error is a timeout if let Some(cause) = e.source() { if let Some(downcast) = cause.downcast_ref::() { if let std::io::ErrorKind::TimedOut = downcast.kind() { return Poll::Ready(Some(Err(Error::TimedOut))); } } - } else { - return Poll::Ready(Some(Err(Error::HttpStream(Box::new(e))))); } + + return Poll::Ready(Some(Err(Error::Transport(e)))); } None => { let duration = self @@ -651,15 +573,16 @@ fn delay(dur: Duration, description: &str) -> Sleep { mod private { use crate::client::ClientImpl; + use launchdarkly_sdk_transport::HttpTransport; pub trait Sealed {} - impl Sealed for ClientImpl {} + impl Sealed for ClientImpl {} } #[cfg(test)] mod tests { use crate::ClientBuilder; - use hyper::http::HeaderValue; + use http::HeaderValue; use test_case::test_case; #[test_case("user", "pass", "dXNlcjpwYXNz")] @@ -685,17 +608,55 @@ mod tests { assert_eq!(Some(&expected), actual); } - use std::{pin::pin, str::FromStr, time::Duration}; + use std::{pin::pin, sync::Arc, time::Duration}; - use futures::TryStreamExt; - use hyper::{client::HttpConnector, Body, HeaderMap, Request, Uri}; - use hyper_timeout::TimeoutConnector; + use bytes::Bytes; + use futures::{stream, TryStreamExt}; + use http::HeaderMap; use tokio::time::timeout; use crate::{ client::{RequestProps, State}, ReconnectOptionsBuilder, ReconnectingRequest, }; + use launchdarkly_sdk_transport::{ByteStream, HttpTransport, ResponseFuture, TransportError}; + + // Mock transport for testing + #[derive(Clone)] + struct MockTransport { + fail_request: bool, + } + + impl MockTransport { + fn new(_url: String, fail_request: bool) -> Self { + Self { fail_request } + } + } + + impl HttpTransport for MockTransport { + fn request(&self, _request: http::Request>) -> ResponseFuture { + if self.fail_request { + // Simulate a connection error + Box::pin(async { + Err(TransportError::new(std::io::Error::new( + std::io::ErrorKind::ConnectionRefused, + "connection refused", + ))) + }) + } else { + // Return a 404 response + Box::pin(async { + let byte_stream: ByteStream = + Box::pin(stream::iter(vec![Ok(Bytes::from("not found"))])); + let response = http::Response::builder() + .status(404) + .body(byte_stream) + .unwrap(); + Ok(response) + }) + } + } + } const INVALID_URI: &str = "http://mycrazyunexsistenturl.invaliddomainext"; @@ -703,22 +664,15 @@ mod tests { #[test_case(INVALID_URI, true, |state| matches!(state, State::WaitingToReconnect(_)))] #[tokio::test] async fn initial_connection(uri: &str, retry_initial: bool, expected: fn(&State) -> bool) { - let default_timeout = Some(Duration::from_secs(1)); - let conn = HttpConnector::new(); - let mut connector = TimeoutConnector::new(conn); - connector.set_connect_timeout(default_timeout); - connector.set_read_timeout(default_timeout); - connector.set_write_timeout(default_timeout); - let reconnect_opts = ReconnectOptionsBuilder::new(false) .backoff_factor(1) .delay(Duration::from_secs(1)) .retry_initial(retry_initial) .build(); - let http = hyper::Client::builder().build::<_, hyper::Body>(connector); + let transport = Arc::new(MockTransport::new(uri.to_string(), true)); let req_props = RequestProps { - url: Uri::from_str(uri).unwrap(), + url: uri.parse().unwrap(), headers: HeaderMap::new(), method: "GET".to_string(), body: None, @@ -726,16 +680,10 @@ mod tests { max_redirects: 10, }; - let mut reconnecting_request = ReconnectingRequest::new(http, req_props, None); + let mut reconnecting_request = ReconnectingRequest::new(transport.clone(), req_props, None); - // sets initial state - let resp = reconnecting_request.http.request( - Request::builder() - .method("GET") - .uri(uri) - .body(Body::empty()) - .unwrap(), - ); + // sets initial state with a failing request + let resp = transport.request(http::Request::builder().uri(uri).body(None).unwrap()); reconnecting_request.state = State::Connecting { retry: reconnecting_request.props.reconnect_opts.retry_initial, diff --git a/eventsource-client/src/error.rs b/eventsource-client/src/error.rs index b4d60f5..e1c8e39 100644 --- a/eventsource-client/src/error.rs +++ b/eventsource-client/src/error.rs @@ -1,4 +1,5 @@ use crate::response::{ErrorBody, Response}; +use launchdarkly_sdk_transport::TransportError; /// Error type for invalid response headers encountered in ResponseDetails. #[derive(Debug)] @@ -36,6 +37,8 @@ pub enum Error { UnexpectedResponse(Response, ErrorBody), /// An error reading from the HTTP response body. HttpStream(Box), + /// An error from the HTTP transport layer. + Transport(TransportError), /// The HTTP response stream ended Eof, /// The HTTP response stream ended unexpectedly (e.g. in the @@ -61,6 +64,7 @@ impl std::fmt::Display for Error { write!(f, "unexpected response: {status}") } HttpStream(err) => write!(f, "http error: {err}"), + Transport(err) => write!(f, "transport error: {err}"), Eof => write!(f, "eof"), UnexpectedEof => write!(f, "unexpected eof"), InvalidLine(line) => write!(f, "invalid line: {line}"), @@ -96,6 +100,7 @@ impl Error { pub fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { match self { Error::HttpStream(err) => Some(err.as_ref()), + Error::Transport(err) => Some(err), _ => None, } } diff --git a/eventsource-client/src/event_parser.rs b/eventsource-client/src/event_parser.rs index c7db771..d58a99b 100644 --- a/eventsource-client/src/event_parser.rs +++ b/eventsource-client/src/event_parser.rs @@ -1,6 +1,6 @@ use std::{collections::VecDeque, convert::TryFrom, str::from_utf8}; -use hyper::body::Bytes; +use bytes::Bytes; use log::{debug, log_enabled, trace}; use pin_project::pin_project; diff --git a/eventsource-client/src/lib.rs b/eventsource-client/src/lib.rs index f55d204..ad2ac3f 100644 --- a/eventsource-client/src/lib.rs +++ b/eventsource-client/src/lib.rs @@ -2,31 +2,52 @@ #![allow(clippy::result_large_err)] //! Client for the [Server-Sent Events] protocol (aka [EventSource]). //! -//! ``` -//! use futures::{TryStreamExt}; -//! # use eventsource_client::Error; -//! use eventsource_client::{Client, SSE}; -//! # #[tokio::main] -//! # async fn main() -> Result<(), eventsource_client::Error> { -//! let mut client = eventsource_client::ClientBuilder::for_url("https://example.com/stream")? -//! .header("Authorization", "Basic username:password")? -//! .build(); -//! -//! let mut stream = Box::pin(client.stream()) -//! .map_ok(|event| match event { -//! SSE::Comment(comment) => println!("got a comment event: {:?}", comment), -//! SSE::Event(evt) => println!("got an event: {}", evt.event_type), -//! SSE::Connected(_) => println!("got connected") -//! }) -//! .map_err(|e| println!("error streaming events: {:?}", e)); -//! # while let Ok(Some(_)) = stream.try_next().await {} -//! # +//! This library provides SSE protocol support but requires you to bring your own +//! HTTP transport. See the `examples/` directory for reference implementations using +//! popular HTTP clients like hyper and reqwest. +//! +//! # Getting Started +//! +//! ```ignore +//! use futures::TryStreamExt; +//! use eventsource_client::{Client, ClientBuilder, SSE}; +//! +//! # async fn example() -> Result<(), Box> { +//! // You need to implement HttpTransport trait for your HTTP client +//! # struct MyTransport; +//! # impl launchdarkly_sdk_transport::HttpTransport for MyTransport { +//! # fn request(&self, _req: http::Request>) -> eventsource_client::ResponseFuture { +//! # unimplemented!() +//! # } +//! # } +//! let transport = MyTransport::new(); +//! +//! let client = ClientBuilder::for_url("https://example.com/stream")? +//! .header("Authorization", "Bearer token")? +//! .build_with_transport(transport); +//! +//! let mut stream = client.stream(); +//! +//! while let Some(event) = stream.try_next().await? { +//! match event { +//! SSE::Event(evt) => println!("Event: {}", evt.event_type), +//! SSE::Comment(comment) => println!("Comment: {}", comment), +//! SSE::Connected(_) => println!("Connected!"), +//! } +//! } //! # Ok(()) //! # } //! ``` //! -//![Server-Sent Events]: https://html.spec.whatwg.org/multipage/server-sent-events.html -//![EventSource]: https://developer.mozilla.org/en-US/docs/Web/API/EventSource +//! # Implementing a Transport +//! +//! See the [`transport`] module documentation for details on implementing +//! the [`HttpTransport`] trait. +//! +//! [`HttpTransport`]: HttpTransport +//! +//! [Server-Sent Events]: https://html.spec.whatwg.org/multipage/server-sent-events.html +//! [EventSource]: https://developer.mozilla.org/en-US/docs/Web/API/EventSource mod client; mod config; diff --git a/eventsource-client/src/response.rs b/eventsource-client/src/response.rs index 4e2eced..25ececc 100644 --- a/eventsource-client/src/response.rs +++ b/eventsource-client/src/response.rs @@ -1,34 +1,34 @@ -use hyper::body::Buf; -use hyper::{header::HeaderValue, Body, HeaderMap, StatusCode}; +use http::{HeaderMap, HeaderValue, StatusCode}; -use crate::{Error, HeaderError}; +use crate::HeaderError; +use launchdarkly_sdk_transport::ByteStream; +/// Represents an error response body as a stream of bytes. +/// +/// The body is provided as a stream so that users can read error details if needed. +/// For large error responses, the stream allows processing without loading the entire +/// response into memory. pub struct ErrorBody { - body: Body, + body: ByteStream, } impl ErrorBody { - pub fn new(body: Body) -> Self { + /// Create a new ErrorBody from a ByteStream + pub fn new(body: ByteStream) -> Self { Self { body } } - /// Returns the body of the response as a vector of bytes. - /// - /// Caution: This method reads the entire body into memory. You should only use this method if - /// you know the response is of a reasonable size. - pub async fn body_bytes(self) -> Result, Error> { - let buf = match hyper::body::aggregate(self.body).await { - Ok(buf) => buf, - Err(err) => return Err(Error::HttpStream(Box::new(err))), - }; - - Ok(buf.chunk().to_vec()) + /// Consume this ErrorBody and return the underlying ByteStream + pub fn into_stream(self) -> ByteStream { + self.body } } impl std::fmt::Debug for ErrorBody { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("ErrorBody").finish() + f.debug_struct("ErrorBody") + .field("body", &"") + .finish() } }