Skip to content

Commit 0bcddd8

Browse files
committed
feat!: Replace hyper-specific interfaces with generic trait
1 parent 3893466 commit 0bcddd8

13 files changed

Lines changed: 557 additions & 295 deletions

File tree

.github/workflows/ci.yml

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
11
name: Run CI
22
on:
33
push:
4-
branches: [main]
4+
branches:
5+
- "main"
6+
- "feat/**"
57
paths-ignore:
68
- "**.md" # Do not need to run CI for markdown changes.
79
pull_request:
8-
branches: [main]
10+
branches:
11+
- "main"
12+
- "feat/**"
913
paths-ignore:
1014
- "**.md"
1115

.github/workflows/release-please.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@ name: Run Release Please
33
on:
44
push:
55
branches:
6-
- main
6+
- "main"
7+
- "feat/**"
78

89
jobs:
910
release-package:

contract-tests/Cargo.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,11 @@ eventsource-client = { path = "../eventsource-client" }
1111
serde_json = { version = "1.0.39"}
1212
actix = { version = "0.13.1"}
1313
actix-web = { version = "4"}
14-
reqwest = { version = "0.11.6", default-features = false, features = ["json", "rustls-tls"] }
14+
reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls", "stream"] }
1515
env_logger = { version = "0.10.0" }
16-
hyper = { version = "0.14.19", features = ["client", "http1", "tcp"] }
1716
log = "0.4.6"
17+
http = "1.0"
18+
bytes = "1.5"
1819

1920
[[bin]]
2021
name = "sse-test-api"

contract-tests/src/bin/sse-test-api/stream_entity.rs

Lines changed: 94 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,99 @@
11
use actix_web::rt::task::JoinHandle;
2-
use futures::TryStreamExt;
2+
use futures::{StreamExt, TryStreamExt};
33
use log::error;
44
use std::{
5+
future::Future,
6+
pin::Pin,
57
sync::{Arc, Mutex},
68
time::Duration,
79
};
810

911
use eventsource_client as es;
12+
use eventsource_client::{ByteStream, HttpTransport, TransportError};
1013

1114
use crate::{Config, EventType};
1215

16+
// Simple reqwest-based transport implementation
17+
#[derive(Clone)]
18+
struct ReqwestTransport {
19+
client: reqwest::Client,
20+
}
21+
22+
impl ReqwestTransport {
23+
fn new(timeout: Option<Duration>) -> Result<Self, reqwest::Error> {
24+
let mut builder = reqwest::Client::builder();
25+
26+
if let Some(timeout) = timeout {
27+
builder = builder.timeout(timeout);
28+
}
29+
30+
let client = builder.build()?;
31+
Ok(Self { client })
32+
}
33+
}
34+
35+
impl HttpTransport for ReqwestTransport {
36+
fn request(
37+
&self,
38+
request: http::Request<Option<String>>,
39+
) -> Pin<
40+
Box<
41+
dyn Future<Output = Result<http::Response<ByteStream>, TransportError>>
42+
+ Send
43+
+ Sync
44+
+ 'static,
45+
>,
46+
> {
47+
let (parts, body_opt) = request.into_parts();
48+
49+
let mut req_builder = self
50+
.client
51+
.request(parts.method.clone(), parts.uri.to_string());
52+
53+
for (name, value) in parts.headers.iter() {
54+
req_builder = req_builder.header(name, value);
55+
}
56+
57+
if let Some(body) = body_opt {
58+
req_builder = req_builder.body(body);
59+
}
60+
61+
let req = match req_builder.build() {
62+
Ok(r) => r,
63+
Err(e) => return Box::pin(async move { Err(TransportError::new(e)) }),
64+
};
65+
66+
let client = self.client.clone();
67+
68+
Box::pin(async move {
69+
let resp = client
70+
.execute(req)
71+
.await
72+
.map_err(|e| TransportError::new(e))?;
73+
74+
let status = resp.status();
75+
let headers = resp.headers().clone();
76+
77+
let byte_stream: ByteStream = Box::pin(
78+
resp.bytes_stream()
79+
.map(|result| result.map_err(|e| TransportError::new(e))),
80+
);
81+
82+
let mut response_builder = http::Response::builder().status(status);
83+
84+
for (name, value) in headers.iter() {
85+
response_builder = response_builder.header(name, value);
86+
}
87+
88+
let response = response_builder
89+
.body(byte_stream)
90+
.map_err(|e| TransportError::new(e))?;
91+
92+
Ok(response)
93+
})
94+
}
95+
}
96+
1397
pub(crate) struct Inner {
1498
callback_counter: Mutex<i32>,
1599
callback_url: String,
@@ -102,9 +186,12 @@ impl Inner {
102186
reconnect_options = reconnect_options.delay(Duration::from_millis(delay_ms));
103187
}
104188

105-
if let Some(read_timeout_ms) = config.read_timeout_ms {
106-
client_builder = client_builder.read_timeout(Duration::from_millis(read_timeout_ms));
107-
}
189+
// Create transport with timeout configuration
190+
let timeout = config.read_timeout_ms.map(Duration::from_millis);
191+
let transport = match ReqwestTransport::new(timeout) {
192+
Ok(t) => t,
193+
Err(e) => return Err(format!("Failed to create transport {:?}", e)),
194+
};
108195

109196
if let Some(last_event_id) = &config.last_event_id {
110197
client_builder = client_builder.last_event_id(last_event_id.clone());
@@ -128,7 +215,9 @@ impl Inner {
128215
}
129216

130217
Ok(Box::new(
131-
client_builder.reconnect(reconnect_options.build()).build(),
218+
client_builder
219+
.reconnect(reconnect_options.build())
220+
.build_with_transport(transport),
132221
))
133222
}
134223
}

eventsource-client/Cargo.toml

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,12 @@ keywords = ["launchdarkly", "feature-flags", "feature-toggles", "eventsource", "
1111
exclude = ["CHANGELOG.md"]
1212

1313
[dependencies]
14+
bytes = "1.5"
1415
futures = "0.3.21"
15-
hyper = { version = "0.14.19", features = ["client", "http1", "tcp"] }
16-
hyper-rustls = { version = "0.24.1", optional = true }
16+
http = "1.0"
1717
log = "0.4.6"
1818
pin-project = "1.0.10"
1919
tokio = { version = "1.17.0", features = ["time"] }
20-
hyper-timeout = "0.4.1"
2120
rand = "0.8.5"
2221
base64 = "0.22.1"
2322

@@ -31,10 +30,6 @@ test-case = "3.2.1"
3130
proptest = "1.0.0"
3231

3332

34-
[features]
35-
default = ["rustls"]
36-
rustls = ["hyper-rustls", "hyper-rustls/http2"]
3733

38-
[[example]]
39-
name = "tail"
40-
required-features = ["rustls"]
34+
[features]
35+
default = []

eventsource-client/README.md

Lines changed: 105 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -4,41 +4,127 @@
44

55
Client for the [Server-Sent Events] protocol (aka [EventSource]).
66

7+
This library focuses on the SSE protocol implementation. You provide the HTTP transport layer (hyper, reqwest, etc.), giving you full control over HTTP configuration like timeouts, TLS, and connection pooling.
8+
79
[Server-Sent Events]: https://html.spec.whatwg.org/multipage/server-sent-events.html
810
[EventSource]: https://developer.mozilla.org/en-US/docs/Web/API/EventSource
911

1012
## Requirements
1113

12-
Requires tokio.
14+
* Tokio async runtime
15+
* An HTTP client library (hyper, reqwest, or custom)
16+
17+
## Quick Start
18+
19+
### 1. Add dependencies
1320

14-
## Usage
21+
```toml
22+
[dependencies]
23+
eventsource-client = "0.17"
24+
reqwest = { version = "0.12", features = ["stream"] } # or hyper v1
25+
futures = "0.3"
26+
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
27+
```
28+
29+
### 2. Implement HttpTransport
1530

16-
Example that just prints the type of each event received:
31+
Use one of our example implementations:
1732

1833
```rust
19-
use eventsource_client as es;
34+
// See examples/reqwest_transport.rs for complete implementation
35+
use eventsource_client::{HttpTransport, ByteStream, TransportError};
2036

21-
let mut client = es::ClientBuilder::for_url("https://example.com/stream")?
22-
.header("Authorization", "Basic username:password")?
23-
.build();
37+
struct ReqwestTransport {
38+
client: reqwest::Client,
39+
}
2440

25-
client
26-
.stream()
27-
.map_ok(|event| println!("got event: {:?}", event))
28-
.map_err(|err| eprintln!("error streaming events: {:?}", err));
41+
impl HttpTransport for ReqwestTransport {
42+
fn request(&self, request: http::Request<()>)
43+
-> Pin<Box<dyn Future<Output = Result<http::Response<ByteStream>, TransportError>> + Send + Sync>>
44+
{
45+
// Convert request and call HTTP client
46+
// See examples/ for full implementation
47+
}
48+
}
2949
```
3050

31-
(Some boilerplate omitted for clarity; see [examples directory] for complete,
32-
working code.)
51+
### 3. Use the client
52+
53+
```rust
54+
use eventsource_client::{ClientBuilder, SSE};
55+
use futures::TryStreamExt;
56+
57+
#[tokio::main]
58+
async fn main() -> Result<(), Box<dyn std::error::Error>> {
59+
// Create HTTP transport
60+
let transport = ReqwestTransport::new()?;
61+
62+
// Build SSE client
63+
let client = ClientBuilder::for_url("https://example.com/stream")?
64+
.header("Authorization", "Bearer token")?
65+
.build_with_transport(transport);
66+
67+
// Stream events
68+
let mut stream = client.stream();
69+
70+
while let Some(event) = stream.try_next().await? {
71+
match event {
72+
SSE::Event(evt) => println!("Event: {}", evt.event_type),
73+
SSE::Comment(c) => println!("Comment: {}", c),
74+
SSE::Connected(_) => println!("Connected!"),
75+
}
76+
}
77+
78+
Ok(())
79+
}
80+
```
3381

34-
[examples directory]: https://github.com/launchdarkly/rust-eventsource-client/tree/main/eventsource-client/examples
3582
## Features
3683

37-
* tokio-based streaming client.
38-
* Supports setting custom headers on the HTTP request (e.g. for endpoints
39-
requiring authorization).
40-
* Retry for failed connections.
41-
* Reconnection if connection is interrupted, with exponential backoff.
84+
* **Pluggable HTTP transport** - Use any HTTP client (hyper, reqwest, or custom)
85+
* **Tokio-based streaming** - Efficient async/await support
86+
* **Custom headers** - Full control over HTTP requests
87+
* **Automatic reconnection** - Configurable exponential backoff
88+
* **Retry logic** - Handle transient failures gracefully
89+
* **Redirect following** - Automatic handling of HTTP redirects
90+
* **Last-Event-ID** - Resume streams from last received event
91+
92+
## Migration from v0.16
93+
94+
If you're upgrading from v0.16 (which used hyper 0.14 internally), see [MIGRATION.md](MIGRATION.md) for a detailed migration guide.
95+
96+
Key changes:
97+
- You must now provide an HTTP transport implementation
98+
- Removed `build()`, `build_http()`, and other hyper-specific methods
99+
- Use `build_with_transport(transport)` instead
100+
- Timeout configuration moved to your HTTP transport
101+
102+
## Why Pluggable Transport?
103+
104+
1. **Use latest HTTP clients** - Not locked to a specific HTTP library version
105+
2. **Full control** - Configure timeouts, TLS, proxies, etc. exactly as needed
106+
3. **Smaller library** - Focused on SSE protocol, not HTTP implementation
107+
4. **Flexibility** - Swap HTTP clients without changing SSE code
108+
109+
## Architecture
110+
111+
```
112+
┌─────────────────────────────────────┐
113+
│ Your Application │
114+
└─────────────┬───────────────────────┘
115+
116+
117+
┌─────────────────────────────────────┐
118+
│ eventsource-client │
119+
│ (SSE Protocol Implementation) │
120+
└─────────────┬───────────────────────┘
121+
│ HttpTransport trait
122+
123+
┌─────────────────────────────────────┐
124+
│ Your HTTP Client │
125+
│ (hyper, reqwest, custom, etc.) │
126+
└─────────────────────────────────────┘
127+
```
42128

43129
## Stability
44130

0 commit comments

Comments
 (0)