Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/actions/ci/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ runs:
shell: bash
run: cargo test --all-features -p eventsource-client

- name: Run slower integration tests
shell: bash
run: cargo test --all-features -p eventsource-client --lib -- --ignored

- name: Run clippy checks
shell: bash
run: cargo clippy --all-features -p eventsource-client -- -D warnings
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ start-contract-test-service-bg:
@make start-contract-test-service >$(TEMP_TEST_OUTPUT) 2>&1 &

run-contract-tests:
@curl -s https://raw.githubusercontent.com/launchdarkly/sse-contract-tests/v2.0.0/downloader/run.sh \
@curl -s https://raw.githubusercontent.com/launchdarkly/sse-contract-tests/main/downloader/run.sh \
| VERSION=v2 PARAMS="-url http://localhost:8080 -debug -stop-service-at-end $(SKIPFLAGS) $(TEST_HARNESS_PARAMS)" sh

contract-tests: build-contract-tests start-contract-test-service-bg run-contract-tests
Expand Down
5 changes: 3 additions & 2 deletions contract-tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@ license = "Apache-2.0"
[dependencies]
futures = { version = "0.3.21" }
serde = { version = "1.0", features = ["derive"] }
eventsource-client = { path = "../eventsource-client" }
eventsource-client = { path = "../eventsource-client", features = ["hyper", "hyper-rustls"] }
serde_json = { version = "1.0.39"}
actix = { version = "0.13.1"}
actix-web = { version = "4"}
reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls", "stream"] }
# reqwest is required for callback client (test harness communication)
reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls"] }
env_logger = { version = "0.10.0" }
log = "0.4.6"
http = "1.0"
Expand Down
100 changes: 17 additions & 83 deletions contract-tests/src/bin/sse-test-api/stream_entity.rs
Original file line number Diff line number Diff line change
@@ -1,84 +1,16 @@
use actix_web::rt::task::JoinHandle;
use futures::{StreamExt, TryStreamExt};
use futures::TryStreamExt;
use log::error;
use std::{
sync::{Arc, Mutex},
time::Duration,
};

use eventsource_client as es;
use eventsource_client::{ByteStream, HttpTransport, ResponseFuture, TransportError};
use eventsource_client::HyperTransport;

use crate::{Config, EventType};

// Simple reqwest-based transport implementation
#[derive(Clone)]
struct ReqwestTransport {
client: reqwest::Client,
}

impl ReqwestTransport {
fn new(timeout: Option<Duration>) -> Result<Self, reqwest::Error> {
let mut builder = reqwest::Client::builder();

if let Some(timeout) = timeout {
builder = builder.timeout(timeout);
}

let client = builder.build()?;
Ok(Self { client })
}
}

impl HttpTransport for ReqwestTransport {
fn request(&self, request: http::Request<Option<String>>) -> ResponseFuture {
let (parts, body_opt) = request.into_parts();

let mut req_builder = self
.client
.request(parts.method.clone(), parts.uri.to_string());

for (name, value) in parts.headers.iter() {
req_builder = req_builder.header(name, value);
}

if let Some(body) = body_opt {
req_builder = req_builder.body(body);
}

let req = match req_builder.build() {
Ok(r) => r,
Err(e) => return Box::pin(async move { Err(TransportError::new(e)) }),
};

let client = self.client.clone();

Box::pin(async move {
let resp = client.execute(req).await.map_err(TransportError::new)?;

let status = resp.status();
let headers = resp.headers().clone();

let byte_stream: ByteStream = Box::pin(
resp.bytes_stream()
.map(|result| result.map_err(TransportError::new)),
);

let mut response_builder = http::Response::builder().status(status);

for (name, value) in headers.iter() {
response_builder = response_builder.header(name, value);
}

let response = response_builder
.body(byte_stream)
.map_err(TransportError::new)?;

Ok(response)
})
}
}

pub(crate) struct Inner {
callback_counter: Mutex<i32>,
callback_url: String,
Expand Down Expand Up @@ -116,7 +48,7 @@ impl Inner {
Ok(None) => break,
Err(e) => {
let failure = EventType::Error {
error: format!("Error: {:?}", e),
error: format!("Error: {e:?}"),
};

if !self.send_message(failure, &client).await {
Expand All @@ -131,7 +63,7 @@ impl Inner {
let json = match serde_json::to_string(&event_type) {
Ok(s) => s,
Err(e) => {
error!("Failed to json encode event type {:?}", e);
error!("Failed to json encode event type {e:?}");
return false;
}
};
Expand All @@ -142,7 +74,7 @@ impl Inner {

match client
.post(format!("{}/{}", self.callback_url, counter_val))
.body(format!("{}\n", json))
.body(format!("{json}\n"))
.send()
.await
{
Expand All @@ -151,7 +83,7 @@ impl Inner {
*counter = counter_val + 1
}
Err(e) => {
error!("Failed to send post back to test harness {:?}", e);
error!("Failed to send post back to test harness {e:?}");
return false;
}
};
Expand All @@ -162,7 +94,7 @@ impl Inner {
fn build_client(config: &Config) -> Result<Box<dyn es::Client>, String> {
let mut client_builder = match es::ClientBuilder::for_url(&config.stream_url) {
Ok(cb) => cb,
Err(e) => return Err(format!("Failed to create client builder {:?}", e)),
Err(e) => return Err(format!("Failed to create client builder {e:?}")),
};

let mut reconnect_options = es::ReconnectOptions::reconnect(true);
Expand All @@ -171,13 +103,6 @@ impl Inner {
reconnect_options = reconnect_options.delay(Duration::from_millis(delay_ms));
}

// Create transport with timeout configuration
let timeout = config.read_timeout_ms.map(Duration::from_millis);
let transport = match ReqwestTransport::new(timeout) {
Ok(t) => t,
Err(e) => return Err(format!("Failed to create transport {:?}", e)),
};

if let Some(last_event_id) = &config.last_event_id {
client_builder = client_builder.last_event_id(last_event_id.clone());
}
Expand All @@ -194,11 +119,20 @@ impl Inner {
for (name, value) in headers {
client_builder = match client_builder.header(name, value) {
Ok(cb) => cb,
Err(e) => return Err(format!("Unable to set header {:?}", e)),
Err(e) => return Err(format!("Unable to set header {e:?}")),
};
}
}

// Build with HyperTransport
let mut transport_builder = HyperTransport::builder();

if let Some(timeout_ms) = config.read_timeout_ms {
transport_builder = transport_builder.read_timeout(Duration::from_millis(timeout_ms));
}

let transport = transport_builder.build_https();

Ok(Box::new(
client_builder
.reconnect(reconnect_options.build())
Expand Down
16 changes: 13 additions & 3 deletions eventsource-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,16 @@ tokio = { version = "1.17.0", features = ["time"] }
rand = "0.8.5"
base64 = "0.22.1"

#
# Dependencies for hyper transport
#
hyper = { version = "1.0", features = ["client", "http1", "http2"], optional = true }
hyper-util = { version = "0.1", features = ["client-legacy", "http1", "http2", "tokio"], optional = true }
http-body-util = { version = "0.1", optional = true }
hyper-rustls = { version = "0.27", default-features = false, features = ["http1", "http2", "native-tokio", "ring", "webpki-roots"], optional = true }
hyper-timeout = { version = "0.5", optional = true }
tower = { version = "0.4", optional = true }

[dev-dependencies]
env_logger = "0.10.0"
maplit = "1.0.1"
Expand All @@ -29,7 +39,7 @@ tokio = { version = "1.2.0", features = ["macros", "rt-multi-thread"] }
test-case = "3.2.1"
proptest = "1.0.0"



[features]
default = []
default = ["hyper"]
hyper = ["dep:hyper", "dep:hyper-util", "dep:http-body-util", "dep:hyper-timeout", "dep:tower"]
hyper-rustls = ["dep:hyper-rustls"]
Loading