Skip to content

Commit ec12251

Browse files
authored
feat!: Replace hyper-specific interfaces with generic trait (#104)
1 parent 3893466 commit ec12251

13 files changed

Lines changed: 497 additions & 293 deletions

File tree

.github/workflows/ci.yml

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
11
name: Run CI
22
on:
33
push:
4-
branches: [main]
4+
branches:
5+
- "main"
6+
- "feat/**"
57
paths-ignore:
68
- "**.md" # Do not need to run CI for markdown changes.
79
pull_request:
8-
branches: [main]
10+
branches:
11+
- "main"
12+
- "feat/**"
913
paths-ignore:
1014
- "**.md"
1115

.github/workflows/release-please.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@ name: Run Release Please
33
on:
44
push:
55
branches:
6-
- main
6+
- "main"
7+
- "feat/**"
78

89
jobs:
910
release-package:

contract-tests/Cargo.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,11 @@ eventsource-client = { path = "../eventsource-client" }
1111
serde_json = { version = "1.0.39"}
1212
actix = { version = "0.13.1"}
1313
actix-web = { version = "4"}
14-
reqwest = { version = "0.11.6", default-features = false, features = ["json", "rustls-tls"] }
14+
reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls", "stream"] }
1515
env_logger = { version = "0.10.0" }
16-
hyper = { version = "0.14.19", features = ["client", "http1", "tcp"] }
1716
log = "0.4.6"
17+
http = "1.0"
18+
bytes = "1.5"
1819

1920
[[bin]]
2021
name = "sse-test-api"

contract-tests/src/bin/sse-test-api/stream_entity.rs

Lines changed: 79 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,84 @@
11
use actix_web::rt::task::JoinHandle;
2-
use futures::TryStreamExt;
2+
use futures::{StreamExt, TryStreamExt};
33
use log::error;
44
use std::{
55
sync::{Arc, Mutex},
66
time::Duration,
77
};
88

99
use eventsource_client as es;
10+
use eventsource_client::{ByteStream, HttpTransport, ResponseFuture, TransportError};
1011

1112
use crate::{Config, EventType};
1213

14+
// Simple reqwest-based transport implementation
15+
#[derive(Clone)]
16+
struct ReqwestTransport {
17+
client: reqwest::Client,
18+
}
19+
20+
impl ReqwestTransport {
21+
fn new(timeout: Option<Duration>) -> Result<Self, reqwest::Error> {
22+
let mut builder = reqwest::Client::builder();
23+
24+
if let Some(timeout) = timeout {
25+
builder = builder.timeout(timeout);
26+
}
27+
28+
let client = builder.build()?;
29+
Ok(Self { client })
30+
}
31+
}
32+
33+
impl HttpTransport for ReqwestTransport {
34+
fn request(&self, request: http::Request<Option<String>>) -> ResponseFuture {
35+
let (parts, body_opt) = request.into_parts();
36+
37+
let mut req_builder = self
38+
.client
39+
.request(parts.method.clone(), parts.uri.to_string());
40+
41+
for (name, value) in parts.headers.iter() {
42+
req_builder = req_builder.header(name, value);
43+
}
44+
45+
if let Some(body) = body_opt {
46+
req_builder = req_builder.body(body);
47+
}
48+
49+
let req = match req_builder.build() {
50+
Ok(r) => r,
51+
Err(e) => return Box::pin(async move { Err(TransportError::new(e)) }),
52+
};
53+
54+
let client = self.client.clone();
55+
56+
Box::pin(async move {
57+
let resp = client.execute(req).await.map_err(TransportError::new)?;
58+
59+
let status = resp.status();
60+
let headers = resp.headers().clone();
61+
62+
let byte_stream: ByteStream = Box::pin(
63+
resp.bytes_stream()
64+
.map(|result| result.map_err(TransportError::new)),
65+
);
66+
67+
let mut response_builder = http::Response::builder().status(status);
68+
69+
for (name, value) in headers.iter() {
70+
response_builder = response_builder.header(name, value);
71+
}
72+
73+
let response = response_builder
74+
.body(byte_stream)
75+
.map_err(TransportError::new)?;
76+
77+
Ok(response)
78+
})
79+
}
80+
}
81+
1382
pub(crate) struct Inner {
1483
callback_counter: Mutex<i32>,
1584
callback_url: String,
@@ -102,9 +171,12 @@ impl Inner {
102171
reconnect_options = reconnect_options.delay(Duration::from_millis(delay_ms));
103172
}
104173

105-
if let Some(read_timeout_ms) = config.read_timeout_ms {
106-
client_builder = client_builder.read_timeout(Duration::from_millis(read_timeout_ms));
107-
}
174+
// Create transport with timeout configuration
175+
let timeout = config.read_timeout_ms.map(Duration::from_millis);
176+
let transport = match ReqwestTransport::new(timeout) {
177+
Ok(t) => t,
178+
Err(e) => return Err(format!("Failed to create transport {:?}", e)),
179+
};
108180

109181
if let Some(last_event_id) = &config.last_event_id {
110182
client_builder = client_builder.last_event_id(last_event_id.clone());
@@ -128,7 +200,9 @@ impl Inner {
128200
}
129201

130202
Ok(Box::new(
131-
client_builder.reconnect(reconnect_options.build()).build(),
203+
client_builder
204+
.reconnect(reconnect_options.build())
205+
.build_with_transport(transport),
132206
))
133207
}
134208
}

eventsource-client/Cargo.toml

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,12 @@ keywords = ["launchdarkly", "feature-flags", "feature-toggles", "eventsource", "
1111
exclude = ["CHANGELOG.md"]
1212

1313
[dependencies]
14+
bytes = "1.5"
1415
futures = "0.3.21"
15-
hyper = { version = "0.14.19", features = ["client", "http1", "tcp"] }
16-
hyper-rustls = { version = "0.24.1", optional = true }
16+
http = "1.0"
1717
log = "0.4.6"
1818
pin-project = "1.0.10"
1919
tokio = { version = "1.17.0", features = ["time"] }
20-
hyper-timeout = "0.4.1"
2120
rand = "0.8.5"
2221
base64 = "0.22.1"
2322

@@ -31,10 +30,6 @@ test-case = "3.2.1"
3130
proptest = "1.0.0"
3231

3332

34-
[features]
35-
default = ["rustls"]
36-
rustls = ["hyper-rustls", "hyper-rustls/http2"]
3733

38-
[[example]]
39-
name = "tail"
40-
required-features = ["rustls"]
34+
[features]
35+
default = []

eventsource-client/README.md

Lines changed: 103 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -4,41 +4,125 @@
44

55
Client for the [Server-Sent Events] protocol (aka [EventSource]).
66

7+
This library focuses on the SSE protocol implementation. You provide the HTTP transport layer (hyper, reqwest, etc.), giving you full control over HTTP configuration like timeouts, TLS, and connection pooling.
8+
79
[Server-Sent Events]: https://html.spec.whatwg.org/multipage/server-sent-events.html
810
[EventSource]: https://developer.mozilla.org/en-US/docs/Web/API/EventSource
911

1012
## Requirements
1113

12-
Requires tokio.
14+
* Tokio async runtime
15+
* An HTTP client library (hyper, reqwest, or custom)
16+
17+
## Quick Start
18+
19+
### 1. Add dependencies
1320

14-
## Usage
21+
```toml
22+
[dependencies]
23+
eventsource-client = "0.17"
24+
reqwest = { version = "0.12", features = ["stream"] } # or hyper v1
25+
futures = "0.3"
26+
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
27+
```
28+
29+
### 2. Implement HttpTransport
1530

16-
Example that just prints the type of each event received:
31+
Use one of our example implementations:
1732

1833
```rust
19-
use eventsource_client as es;
34+
// See examples/reqwest_transport.rs for complete implementation
35+
use eventsource_client::{HttpTransport, ResponseFuture};
2036

21-
let mut client = es::ClientBuilder::for_url("https://example.com/stream")?
22-
.header("Authorization", "Basic username:password")?
23-
.build();
37+
struct ReqwestTransport {
38+
client: reqwest::Client,
39+
}
2440

25-
client
26-
.stream()
27-
.map_ok(|event| println!("got event: {:?}", event))
28-
.map_err(|err| eprintln!("error streaming events: {:?}", err));
41+
impl HttpTransport for ReqwestTransport {
42+
fn request(&self, request: http::Request<()>) -> ResponseFuture {
43+
// Convert request and call HTTP client
44+
// See examples/ for full implementation
45+
}
46+
}
2947
```
3048

31-
(Some boilerplate omitted for clarity; see [examples directory] for complete,
32-
working code.)
49+
### 3. Use the client
50+
51+
```rust
52+
use eventsource_client::{ClientBuilder, SSE};
53+
use futures::TryStreamExt;
54+
55+
#[tokio::main]
56+
async fn main() -> Result<(), Box<dyn std::error::Error>> {
57+
// Create HTTP transport
58+
let transport = ReqwestTransport::new()?;
59+
60+
// Build SSE client
61+
let client = ClientBuilder::for_url("https://example.com/stream")?
62+
.header("Authorization", "Bearer token")?
63+
.build_with_transport(transport);
64+
65+
// Stream events
66+
let mut stream = client.stream();
67+
68+
while let Some(event) = stream.try_next().await? {
69+
match event {
70+
SSE::Event(evt) => println!("Event: {}", evt.event_type),
71+
SSE::Comment(c) => println!("Comment: {}", c),
72+
SSE::Connected(_) => println!("Connected!"),
73+
}
74+
}
75+
76+
Ok(())
77+
}
78+
```
3379

34-
[examples directory]: https://github.com/launchdarkly/rust-eventsource-client/tree/main/eventsource-client/examples
3580
## Features
3681

37-
* tokio-based streaming client.
38-
* Supports setting custom headers on the HTTP request (e.g. for endpoints
39-
requiring authorization).
40-
* Retry for failed connections.
41-
* Reconnection if connection is interrupted, with exponential backoff.
82+
* **Pluggable HTTP transport** - Use any HTTP client (hyper, reqwest, or custom)
83+
* **Tokio-based streaming** - Efficient async/await support
84+
* **Custom headers** - Full control over HTTP requests
85+
* **Automatic reconnection** - Configurable exponential backoff
86+
* **Retry logic** - Handle transient failures gracefully
87+
* **Redirect following** - Automatic handling of HTTP redirects
88+
* **Last-Event-ID** - Resume streams from last received event
89+
90+
## Migration from v0.16
91+
92+
If you're upgrading from v0.16 (which used hyper 0.14 internally), see [MIGRATION.md](MIGRATION.md) for a detailed migration guide.
93+
94+
Key changes:
95+
- You must now provide an HTTP transport implementation
96+
- Removed `build()`, `build_http()`, and other hyper-specific methods
97+
- Use `build_with_transport(transport)` instead
98+
- Timeout configuration moved to your HTTP transport
99+
100+
## Why Pluggable Transport?
101+
102+
1. **Use latest HTTP clients** - Not locked to a specific HTTP library version
103+
2. **Full control** - Configure timeouts, TLS, proxies, etc. exactly as needed
104+
3. **Smaller library** - Focused on SSE protocol, not HTTP implementation
105+
4. **Flexibility** - Swap HTTP clients without changing SSE code
106+
107+
## Architecture
108+
109+
```
110+
┌─────────────────────────────────────┐
111+
│ Your Application │
112+
└─────────────┬───────────────────────┘
113+
114+
115+
┌─────────────────────────────────────┐
116+
│ eventsource-client │
117+
│ (SSE Protocol Implementation) │
118+
└─────────────┬───────────────────────┘
119+
│ HttpTransport trait
120+
121+
┌─────────────────────────────────────┐
122+
│ Your HTTP Client │
123+
│ (hyper, reqwest, custom, etc.) │
124+
└─────────────────────────────────────┘
125+
```
42126

43127
## Stability
44128

eventsource-client/examples/tail.rs

Lines changed: 0 additions & 54 deletions
This file was deleted.

0 commit comments

Comments
 (0)