Skip to content

Commit bf19d43

Browse files
authored
feat: Implement Hyper specific HttpTransport (#105)
1 parent ec12251 commit bf19d43

File tree

12 files changed

+934
-145
lines changed

12 files changed

+934
-145
lines changed

.github/actions/ci/action.yml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,10 @@ runs:
1212
shell: bash
1313
run: cargo test --all-features -p eventsource-client
1414

15+
- name: Run slower integration tests
16+
shell: bash
17+
run: cargo test --all-features -p eventsource-client --lib -- --ignored
18+
1519
- name: Run clippy checks
1620
shell: bash
1721
run: cargo clippy --all-features -p eventsource-client -- -D warnings

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ start-contract-test-service-bg:
1111
@make start-contract-test-service >$(TEMP_TEST_OUTPUT) 2>&1 &
1212

1313
run-contract-tests:
14-
@curl -s https://raw.githubusercontent.com/launchdarkly/sse-contract-tests/v2.0.0/downloader/run.sh \
14+
@curl -s https://raw.githubusercontent.com/launchdarkly/sse-contract-tests/main/downloader/run.sh \
1515
| VERSION=v2 PARAMS="-url http://localhost:8080 -debug -stop-service-at-end $(SKIPFLAGS) $(TEST_HARNESS_PARAMS)" sh
1616

1717
contract-tests: build-contract-tests start-contract-test-service-bg run-contract-tests

contract-tests/Cargo.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,12 @@ license = "Apache-2.0"
77
[dependencies]
88
futures = { version = "0.3.21" }
99
serde = { version = "1.0", features = ["derive"] }
10-
eventsource-client = { path = "../eventsource-client" }
10+
eventsource-client = { path = "../eventsource-client", features = ["hyper", "hyper-rustls"] }
1111
serde_json = { version = "1.0.39"}
1212
actix = { version = "0.13.1"}
1313
actix-web = { version = "4"}
14-
reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls", "stream"] }
14+
# reqwest is required for callback client (test harness communication)
15+
reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls"] }
1516
env_logger = { version = "0.10.0" }
1617
log = "0.4.6"
1718
http = "1.0"

contract-tests/src/bin/sse-test-api/stream_entity.rs

Lines changed: 17 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -1,84 +1,16 @@
11
use actix_web::rt::task::JoinHandle;
2-
use futures::{StreamExt, TryStreamExt};
2+
use futures::TryStreamExt;
33
use log::error;
44
use std::{
55
sync::{Arc, Mutex},
66
time::Duration,
77
};
88

99
use eventsource_client as es;
10-
use eventsource_client::{ByteStream, HttpTransport, ResponseFuture, TransportError};
10+
use eventsource_client::HyperTransport;
1111

1212
use crate::{Config, EventType};
1313

14-
// Simple reqwest-based transport implementation
15-
#[derive(Clone)]
16-
struct ReqwestTransport {
17-
client: reqwest::Client,
18-
}
19-
20-
impl ReqwestTransport {
21-
fn new(timeout: Option<Duration>) -> Result<Self, reqwest::Error> {
22-
let mut builder = reqwest::Client::builder();
23-
24-
if let Some(timeout) = timeout {
25-
builder = builder.timeout(timeout);
26-
}
27-
28-
let client = builder.build()?;
29-
Ok(Self { client })
30-
}
31-
}
32-
33-
impl HttpTransport for ReqwestTransport {
34-
fn request(&self, request: http::Request<Option<String>>) -> ResponseFuture {
35-
let (parts, body_opt) = request.into_parts();
36-
37-
let mut req_builder = self
38-
.client
39-
.request(parts.method.clone(), parts.uri.to_string());
40-
41-
for (name, value) in parts.headers.iter() {
42-
req_builder = req_builder.header(name, value);
43-
}
44-
45-
if let Some(body) = body_opt {
46-
req_builder = req_builder.body(body);
47-
}
48-
49-
let req = match req_builder.build() {
50-
Ok(r) => r,
51-
Err(e) => return Box::pin(async move { Err(TransportError::new(e)) }),
52-
};
53-
54-
let client = self.client.clone();
55-
56-
Box::pin(async move {
57-
let resp = client.execute(req).await.map_err(TransportError::new)?;
58-
59-
let status = resp.status();
60-
let headers = resp.headers().clone();
61-
62-
let byte_stream: ByteStream = Box::pin(
63-
resp.bytes_stream()
64-
.map(|result| result.map_err(TransportError::new)),
65-
);
66-
67-
let mut response_builder = http::Response::builder().status(status);
68-
69-
for (name, value) in headers.iter() {
70-
response_builder = response_builder.header(name, value);
71-
}
72-
73-
let response = response_builder
74-
.body(byte_stream)
75-
.map_err(TransportError::new)?;
76-
77-
Ok(response)
78-
})
79-
}
80-
}
81-
8214
pub(crate) struct Inner {
8315
callback_counter: Mutex<i32>,
8416
callback_url: String,
@@ -116,7 +48,7 @@ impl Inner {
11648
Ok(None) => break,
11749
Err(e) => {
11850
let failure = EventType::Error {
119-
error: format!("Error: {:?}", e),
51+
error: format!("Error: {e:?}"),
12052
};
12153

12254
if !self.send_message(failure, &client).await {
@@ -131,7 +63,7 @@ impl Inner {
13163
let json = match serde_json::to_string(&event_type) {
13264
Ok(s) => s,
13365
Err(e) => {
134-
error!("Failed to json encode event type {:?}", e);
66+
error!("Failed to json encode event type {e:?}");
13567
return false;
13668
}
13769
};
@@ -142,7 +74,7 @@ impl Inner {
14274

14375
match client
14476
.post(format!("{}/{}", self.callback_url, counter_val))
145-
.body(format!("{}\n", json))
77+
.body(format!("{json}\n"))
14678
.send()
14779
.await
14880
{
@@ -151,7 +83,7 @@ impl Inner {
15183
*counter = counter_val + 1
15284
}
15385
Err(e) => {
154-
error!("Failed to send post back to test harness {:?}", e);
86+
error!("Failed to send post back to test harness {e:?}");
15587
return false;
15688
}
15789
};
@@ -162,7 +94,7 @@ impl Inner {
16294
fn build_client(config: &Config) -> Result<Box<dyn es::Client>, String> {
16395
let mut client_builder = match es::ClientBuilder::for_url(&config.stream_url) {
16496
Ok(cb) => cb,
165-
Err(e) => return Err(format!("Failed to create client builder {:?}", e)),
97+
Err(e) => return Err(format!("Failed to create client builder {e:?}")),
16698
};
16799

168100
let mut reconnect_options = es::ReconnectOptions::reconnect(true);
@@ -171,13 +103,6 @@ impl Inner {
171103
reconnect_options = reconnect_options.delay(Duration::from_millis(delay_ms));
172104
}
173105

174-
// Create transport with timeout configuration
175-
let timeout = config.read_timeout_ms.map(Duration::from_millis);
176-
let transport = match ReqwestTransport::new(timeout) {
177-
Ok(t) => t,
178-
Err(e) => return Err(format!("Failed to create transport {:?}", e)),
179-
};
180-
181106
if let Some(last_event_id) = &config.last_event_id {
182107
client_builder = client_builder.last_event_id(last_event_id.clone());
183108
}
@@ -194,11 +119,20 @@ impl Inner {
194119
for (name, value) in headers {
195120
client_builder = match client_builder.header(name, value) {
196121
Ok(cb) => cb,
197-
Err(e) => return Err(format!("Unable to set header {:?}", e)),
122+
Err(e) => return Err(format!("Unable to set header {e:?}")),
198123
};
199124
}
200125
}
201126

127+
// Build with HyperTransport
128+
let mut transport_builder = HyperTransport::builder();
129+
130+
if let Some(timeout_ms) = config.read_timeout_ms {
131+
transport_builder = transport_builder.read_timeout(Duration::from_millis(timeout_ms));
132+
}
133+
134+
let transport = transport_builder.build_https();
135+
202136
Ok(Box::new(
203137
client_builder
204138
.reconnect(reconnect_options.build())

eventsource-client/Cargo.toml

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,16 @@ tokio = { version = "1.17.0", features = ["time"] }
2020
rand = "0.8.5"
2121
base64 = "0.22.1"
2222

23+
#
24+
# Dependencies for hyper transport
25+
#
26+
hyper = { version = "1.0", features = ["client", "http1", "http2"], optional = true }
27+
hyper-util = { version = "0.1", features = ["client-legacy", "http1", "http2", "tokio"], optional = true }
28+
http-body-util = { version = "0.1", optional = true }
29+
hyper-rustls = { version = "0.27", default-features = false, features = ["http1", "http2", "native-tokio", "ring", "webpki-roots"], optional = true }
30+
hyper-timeout = { version = "0.5", optional = true }
31+
tower = { version = "0.4", optional = true }
32+
2333
[dev-dependencies]
2434
env_logger = "0.10.0"
2535
maplit = "1.0.1"
@@ -29,7 +39,7 @@ tokio = { version = "1.2.0", features = ["macros", "rt-multi-thread"] }
2939
test-case = "3.2.1"
3040
proptest = "1.0.0"
3141

32-
33-
3442
[features]
35-
default = []
43+
default = ["hyper"]
44+
hyper = ["dep:hyper", "dep:hyper-util", "dep:http-body-util", "dep:hyper-timeout", "dep:tower"]
45+
hyper-rustls = ["dep:hyper-rustls"]

0 commit comments

Comments
 (0)