From b128a9758c5e143e2141db1aee6f74e9b1795ad7 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Mon, 20 Apr 2026 13:07:36 +0200 Subject: [PATCH 1/3] [bitreq]: Rename `Client` capacity to `max_idle` and document its semantics MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The previous `capacity` parameter name suggested a hard cap on the number of connections a `Client` may keep open, but it never was one: the cache only bounds *idle* entries that have already been put back. Under any burst of concurrent requests, the cache is bypassed entirely — each in-flight request that does not find a cached connection opens a fresh socket. Only when those requests complete and try to return their connection to the cache does the limit kick in, and surplus entries are simply dropped. This means the cache is inherently racy with respect to capacity: we do not (and this commit does not attempt to) gate concurrent connection establishment, so the number of live sockets is unbounded regardless of what the user passes. Rename the parameter to `max_idle` and spell out this nuance in the type-level docs, so that callers understand the bound applies to the idle cache — not to overall concurrency — and can layer their own semaphore if they need to cap live connections. Co-Authored-By: HAL 9000 --- bitreq/src/client.rs | 30 ++++++++++++++++++++---------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/bitreq/src/client.rs b/bitreq/src/client.rs index b5de6f2fb..204d80b84 100644 --- a/bitreq/src/client.rs +++ b/bitreq/src/client.rs @@ -15,8 +15,17 @@ use crate::{Error, Request, Response}; /// A client that caches connections for reuse. /// -/// The client maintains a pool of up to `capacity` connections, evicting -/// the least recently used connection when the cache is full. +/// The client maintains a cache of up to `max_idle` connections, evicting the least +/// recently used entry when the cache is full. +/// +/// # Bound applies to cached entries, not live connections +/// +/// `max_idle` bounds the number of connections held in the cache, not the number of +/// connections the client may have open at any one time. Concurrent requests whose +/// cached connection is absent (or currently checked out for another in-flight +/// request) each open a fresh socket; any surplus streams simply fail to re-enter +/// the cache on put-back once it is full. To bound concurrency — rather than idle +/// reuse — a caller must arrange a separate semaphore on top. /// /// # Example /// @@ -24,7 +33,7 @@ use crate::{Error, Request, Response}; /// # async fn request() { /// use bitreq::{Client, RequestExt}; /// -/// let client = Client::new(10); // Cache up to 10 connections +/// let client = Client::new(10); // Cache up to 10 idle connections /// let response = bitreq::get("https://example.com") /// .send_async_with_client(&client) /// .await; @@ -38,22 +47,23 @@ pub struct Client { struct ClientImpl { connections: HashMap>, lru_order: VecDeque, - capacity: usize, + max_idle: usize, } impl Client { - /// Creates a new `Client` with the specified connection cache capacity. + /// Creates a new `Client` with the specified idle-cache size. /// /// # Arguments /// - /// * `capacity` - Maximum number of cached connections. When this limit is - /// reached, the least recently used connection is evicted. - pub fn new(capacity: usize) -> Self { + /// * `max_idle` - Maximum number of cached idle connections. When this limit is + /// reached, the least recently used connection is evicted. See the [type-level + /// docs](Client) for why this does not bound the number of live connections. + pub fn new(max_idle: usize) -> Self { Client { r#async: Arc::new(Mutex::new(ClientImpl { connections: HashMap::new(), lru_order: VecDeque::new(), - capacity, + max_idle, })), } } @@ -84,7 +94,7 @@ impl Client { if let hash_map::Entry::Vacant(entry) = state.connections.entry(owned_key) { entry.insert(Arc::clone(&connection)); state.lru_order.push_back(key.into()); - if state.connections.len() > state.capacity { + if state.connections.len() > state.max_idle { if let Some(oldest_key) = state.lru_order.pop_front() { state.connections.remove(&oldest_key); } From c179d9b83e2ca63016995e6f0e3a6241c3d602fc Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Mon, 20 Apr 2026 12:22:54 +0200 Subject: [PATCH 2/3] [bitreq]: Add blocking `Client` with connection pooling Previously, we added a `Client` allowing for connection reuse behind the `async` feature. Here we do the same for the blocking, i.e., non-`async` path. The client pools are exposed as additive capabilities: the blocking pool is always available when `std` is, and the async pool is available when the `async` feature is enabled. Unlike the async path, the blocking pool keys the cache by `(https, host, port, proxy)` and stores the raw keep-alive stream alongside an expiry `Instant` parsed from the server's `Keep-Alive: timeout=N` header (defaulting to 60s). Stream reuse is gated on both `Connection: keep-alive` and the `BufReader` having no trailing bytes, to avoid corrupting the next response on the socket. Redirect handling (301/302/303/307, including 303 method downgrade) is driven by `Connection::send_pooled`, which consults the provided `&Client` for a pooled connection on each hop rather than duplicating the redirect loop in `client.rs`. Keep-alive header parsing lives in `connection.rs` so that "connection lifetime" concerns stay in one module. Additional divergence between the sync and async paths (pipelining, per-request IDs, dropped-reader tracking in `AsyncConnection`) intentionally remains out of scope. Co-Authored-By: HAL 9000 --- bitreq/src/client.rs | 187 ++++++++++++++++++++++++------- bitreq/src/connection.rs | 235 +++++++++++++++++++++++++++++---------- bitreq/src/lib.rs | 7 +- bitreq/src/response.rs | 91 +++++++++++---- 4 files changed, 400 insertions(+), 120 deletions(-) diff --git a/bitreq/src/client.rs b/bitreq/src/client.rs index 204d80b84..402280383 100644 --- a/bitreq/src/client.rs +++ b/bitreq/src/client.rs @@ -1,22 +1,29 @@ -//! Connection pooling client for HTTP requests. +//! Connection pooling [`Client`] for HTTP requests. //! -//! The `Client` caches connections to avoid repeated TCP handshakes and TLS negotiations. +//! The [`Client`] caches connections to avoid repeated TCP handshakes and TLS negotiations. //! -//! Due to std limitations, `Client` currently only supports async requests. - -#![cfg(feature = "async")] +//! A blocking connection pool is always available. When the `async` feature is enabled, an +//! additional async connection pool is exposed via [`Client::send_async`] and +//! [`RequestExt::send_async_with_client`]. Both pools share a single idle-cache budget +//! governed by a unified LRU. use std::collections::{hash_map, HashMap, VecDeque}; use std::sync::{Arc, Mutex}; +use std::time::Instant; +#[cfg(feature = "async")] use crate::connection::AsyncConnection; +use crate::connection::{Connection, HttpStream}; use crate::request::{OwnedConnectionParams as ConnectionKey, ParsedRequest}; use crate::{Error, Request, Response}; /// A client that caches connections for reuse. /// -/// The client maintains a cache of up to `max_idle` connections, evicting the least -/// recently used entry when the cache is full. +/// The client maintains a cache of up to `max_idle` total connections — shared across +/// the blocking and (when enabled) async paths — evicting the least recently used +/// entry when the cache is full. A cached blocking connection is reused when the +/// server indicated `Connection: keep-alive` and the keep-alive timeout has not yet +/// expired. /// /// # Bound applies to cached entries, not live connections /// @@ -27,90 +34,189 @@ use crate::{Error, Request, Response}; /// the cache on put-back once it is full. To bound concurrency — rather than idle /// reuse — a caller must arrange a separate semaphore on top. /// -/// # Example +/// # Examples /// +/// Blocking: /// ```no_run -/// # async fn request() { +/// # fn main() -> Result<(), bitreq::Error> { /// use bitreq::{Client, RequestExt}; /// /// let client = Client::new(10); // Cache up to 10 idle connections +/// let response = bitreq::get("http://example.com").send_with_client(&client)?; +/// # Ok(()) } +/// ``` +/// +/// Async (requires the `async` feature): +#[cfg_attr(feature = "async", doc = "```no_run")] +#[cfg_attr(not(feature = "async"), doc = "```ignore")] +/// # async fn request() -> Result<(), bitreq::Error> { +/// use bitreq::{Client, RequestExt}; +/// +/// let client = Client::new(10); /// let response = bitreq::get("https://example.com") /// .send_async_with_client(&client) -/// .await; -/// # } +/// .await?; +/// # Ok(()) } /// ``` #[derive(Clone)] pub struct Client { - r#async: Arc>>, + state: Arc>, } -struct ClientImpl { - connections: HashMap>, - lru_order: VecDeque, +struct ClientState { + blocking_connections: HashMap, + #[cfg(feature = "async")] + async_connections: HashMap>, + /// Unified LRU across both pools. The oldest entry is at the front. + lru_order: VecDeque, max_idle: usize, } +#[derive(Clone, PartialEq, Eq)] +enum LruKey { + Blocking(ConnectionKey), + #[cfg(feature = "async")] + Async(ConnectionKey), +} + +pub(crate) struct PoolEntry { + pub(crate) stream: HttpStream, + pub(crate) expires_at: Instant, +} + impl Client { - /// Creates a new `Client` with the specified idle-cache size. + /// Creates a new `Client` with the specified total idle-cache size. /// - /// # Arguments - /// - /// * `max_idle` - Maximum number of cached idle connections. When this limit is - /// reached, the least recently used connection is evicted. See the [type-level - /// docs](Client) for why this does not bound the number of live connections. + /// The cache is shared across the blocking and (when enabled) async paths. When + /// the total number of cached connections exceeds `max_idle`, the least recently + /// used entry is evicted regardless of which pool it lives in. See the + /// [type-level docs](Client) for why this does not bound the number of live + /// connections. pub fn new(max_idle: usize) -> Self { Client { - r#async: Arc::new(Mutex::new(ClientImpl { - connections: HashMap::new(), + state: Arc::new(Mutex::new(ClientState { + blocking_connections: HashMap::new(), + #[cfg(feature = "async")] + async_connections: HashMap::new(), lru_order: VecDeque::new(), max_idle, })), } } + /// Sends a request using a cached connection if available. + pub fn send(&self, request: Request) -> Result { + let parsed = ParsedRequest::new(request)?; + let key: ConnectionKey = parsed.connection_params().into(); + let connection = match self.take_connection(&key) { + Some(conn) => conn, + None => Connection::new(parsed.connection_params(), parsed.timeout_at)?, + }; + connection.send_pooled(self, parsed) + } + + /// Takes a pooled [`Connection`] for `key`, if one exists and has not expired. + pub(crate) fn take_connection(&self, key: &ConnectionKey) -> Option { + let mut state = self.state.lock().unwrap(); + let entry = state.blocking_connections.remove(key)?; + let lru_key = LruKey::Blocking(key.clone()); + if let Some(pos) = state.lru_order.iter().position(|k| k == &lru_key) { + state.lru_order.remove(pos); + } + if entry.expires_at > Instant::now() { + Some(Connection::from_stream(entry.stream)) + } else { + None + } + } + + /// Puts a stream back into the pool under `key`, with the given expiry. + pub(crate) fn put_stream(&self, key: ConnectionKey, stream: HttpStream, expires_at: Instant) { + let mut state = self.state.lock().unwrap(); + if let hash_map::Entry::Vacant(entry) = state.blocking_connections.entry(key.clone()) { + entry.insert(PoolEntry { stream, expires_at }); + state.lru_order.push_back(LruKey::Blocking(key)); + state.evict_if_over_capacity(); + } + } + /// Sends a request asynchronously using a cached connection if available. + #[cfg(feature = "async")] pub async fn send_async(&self, request: Request) -> Result { let parsed_request = ParsedRequest::new(request)?; let key = parsed_request.connection_params(); - let owned_key = key.into(); + let owned_key: ConnectionKey = key.into(); - // Try to get cached connection let conn_opt = { - let state = self.r#async.lock().unwrap(); - - if let Some(conn) = state.connections.get(&owned_key) { - Some(Arc::clone(conn)) + let mut state = self.state.lock().unwrap(); + if let Some(conn) = state.async_connections.get(&owned_key) { + let conn = Arc::clone(conn); + // Refresh LRU position so this hit is treated as the most recent use. + let lru_key = LruKey::Async(owned_key.clone()); + if let Some(pos) = state.lru_order.iter().position(|k| k == &lru_key) { + state.lru_order.remove(pos); + state.lru_order.push_back(lru_key); + } + Some(conn) } else { None } }; + let conn = if let Some(conn) = conn_opt { conn } else { let connection = AsyncConnection::new(key, parsed_request.timeout_at).await?; let connection = Arc::new(connection); - let mut state = self.r#async.lock().unwrap(); - if let hash_map::Entry::Vacant(entry) = state.connections.entry(owned_key) { + let mut state = self.state.lock().unwrap(); + if let hash_map::Entry::Vacant(entry) = state.async_connections.entry(owned_key.clone()) + { entry.insert(Arc::clone(&connection)); - state.lru_order.push_back(key.into()); - if state.connections.len() > state.max_idle { - if let Some(oldest_key) = state.lru_order.pop_front() { - state.connections.remove(&oldest_key); - } - } + state.lru_order.push_back(LruKey::Async(owned_key)); + state.evict_if_over_capacity(); } connection }; - // Send the request conn.send(parsed_request).await } } -/// Extension trait for `Request` to use with `Client`. +impl ClientState { + fn total_len(&self) -> usize { + let total = self.blocking_connections.len(); + #[cfg(feature = "async")] + let total = total + self.async_connections.len(); + total + } + + fn evict_if_over_capacity(&mut self) { + while self.total_len() > self.max_idle { + let oldest = match self.lru_order.pop_front() { + Some(k) => k, + None => return, + }; + match oldest { + LruKey::Blocking(k) => { + self.blocking_connections.remove(&k); + } + #[cfg(feature = "async")] + LruKey::Async(k) => { + self.async_connections.remove(&k); + } + } + } + } +} + +/// Extension trait for [`Request`] to use with [`Client`]. pub trait RequestExt { + /// Sends this request using the provided client's connection pool. + fn send_with_client(self, client: &Client) -> Result; + /// Sends this request asynchronously using the provided client's connection pool. + #[cfg(feature = "async")] fn send_async_with_client( self, client: &Client, @@ -118,6 +224,9 @@ pub trait RequestExt { } impl RequestExt for Request { + fn send_with_client(self, client: &Client) -> Result { client.send(self) } + + #[cfg(feature = "async")] fn send_async_with_client( self, client: &Client, diff --git a/bitreq/src/connection.rs b/bitreq/src/connection.rs index f8b98c133..08e3bd80c 100644 --- a/bitreq/src/connection.rs +++ b/bitreq/src/connection.rs @@ -1,3 +1,4 @@ +use alloc::collections::BTreeMap; use core::time::Duration; #[cfg(feature = "async")] use std::future::Future; @@ -23,9 +24,7 @@ use tokio::net::TcpStream as AsyncTcpStream; use tokio::sync::Mutex as AsyncMutex; use crate::request::{ConnectionParams, OwnedConnectionParams, ParsedRequest}; -#[cfg(feature = "async")] -use crate::Response; -use crate::{Error, Method, ResponseLazy}; +use crate::{Error, Method, Response, ResponseLazy}; type UnsecuredStream = TcpStream; @@ -51,6 +50,17 @@ impl HttpStream { pub(crate) fn create_buffer(buffer: Vec) -> HttpStream { HttpStream::Buffer(std::io::Cursor::new(buffer)) } + + /// Updates the timeout deadline used for read/write operations on this stream. + pub(crate) fn set_timeout_at(&mut self, timeout_at: Option) { + match self { + HttpStream::Unsecured(_, t) => *t = timeout_at, + #[cfg(feature = "rustls")] + HttpStream::Secured(_, t) => *t = timeout_at, + #[cfg(feature = "async")] + HttpStream::Buffer(_) => {} + } + } } fn timeout_err() -> io::Error { @@ -569,43 +579,18 @@ impl AsyncConnection { conn.readable_request_id.fetch_add(1, Ordering::Release); } - if let Some(header) = response.headers.get("keep-alive") { - for param in header.split(',') { - if let Some((k, v)) = param.trim().split_once('=') { - if let Ok(v) = v.parse::() { - match k.trim() { - "timeout" => { - let timeout_secs = (v as u64).saturating_sub(1); - *conn.socket_new_requests_timeout.lock().unwrap() = - Instant::now() - .checked_add(Duration::from_secs(timeout_secs)) - .unwrap_or(Instant::now()); - } - "max" => { - conn.next_request_id.fetch_max( - usize::MAX.saturating_sub(v), - Ordering::AcqRel, - ); - } - _ => { - // If we can't parse the keep-alive header, don't send any - // new requests over this socket, but don't give up on - // reading pending responses. - conn.next_request_id.store(usize::MAX, Ordering::Release); - } - } - } else { - // If we can't parse the keep-alive header, don't send any new - // requests over this socket, but don't give up on reading pending - // responses. - conn.next_request_id.store(usize::MAX, Ordering::Release); - } - } else { - // If we can't parse the keep-alive header, don't send any new requests - // over this socket, but don't give up on reading pending responses. - conn.next_request_id.store(usize::MAX, Ordering::Release); - } - } + let ka = parse_keep_alive(&response.headers); + if let Some(timeout) = ka.timeout { + *conn.socket_new_requests_timeout.lock().unwrap() = timeout; + } + if let Some(max) = ka.max_requests { + conn.next_request_id + .fetch_max(usize::MAX.saturating_sub(max), Ordering::AcqRel); + } + if ka.malformed { + // If we can't parse the keep-alive header, don't send any new requests + // over this socket, but don't give up on reading pending responses. + conn.next_request_id.store(usize::MAX, Ordering::Release); } // Now that we've processed the response, if the future is cancelled there's no @@ -748,27 +733,165 @@ impl Connection { Self::tcp_connect(params.host, params.port, timeout_at) } + /// Creates a `Connection` from an existing [`HttpStream`]. + /// + /// Used by [`Client`](crate::Client) to wrap a pooled stream for reuse. + pub(crate) fn from_stream(stream: HttpStream) -> Connection { Connection { stream } } + + /// Writes the request and reads the response metadata, consuming this connection. + /// + /// The returned [`ResponseLazy`] owns the underlying stream and iterates over the body + /// on demand. Shared by [`Connection::send`] and [`Connection::send_pooled`]. + fn write_and_read_lazy(mut self, request: &ParsedRequest) -> Result { + self.stream.set_timeout_at(request.timeout_at); + + #[cfg(feature = "log")] + log::trace!("Writing HTTP request."); + self.stream.write_all(&request.as_bytes())?; + + #[cfg(feature = "log")] + log::trace!("Reading HTTP response."); + ResponseLazy::from_stream( + self.stream, + request.config.max_headers_size, + request.config.max_status_line_len, + request.config.max_body_size, + ) + } + /// Sends the [`Request`](struct.Request.html), consumes this - /// connection, and returns a [`Response`](struct.Response.html). - pub(crate) fn send(mut self, request: ParsedRequest) -> Result { + /// connection, and returns a [`ResponseLazy`](struct.ResponseLazy.html). + pub(crate) fn send(self, request: ParsedRequest) -> Result { enforce_timeout(request.timeout_at, move || { - // Send request - #[cfg(feature = "log")] - log::trace!("Writing HTTP request."); - self.stream.write_all(&request.as_bytes())?; - - // Receive response - #[cfg(feature = "log")] - log::trace!("Reading HTTP response."); - let response = ResponseLazy::from_stream( - self.stream, - request.config.max_headers_size, - request.config.max_status_line_len, - request.config.max_body_size, - )?; + let response = self.write_and_read_lazy(&request)?; handle_redirects(request, response) }) } + + /// Sends the [`Request`](struct.Request.html) using `client`'s connection pool, + /// following redirects and returning a buffered [`Response`]. + /// + /// The underlying stream is returned to the client's pool on keep-alive. On + /// redirect, a pooled connection to the new host is reused if available; + /// otherwise a fresh connection is created. + pub(crate) fn send_pooled( + self, + client: &crate::Client, + mut request: ParsedRequest, + ) -> Result { + let mut connection = self; + loop { + let (response, recovered, req) = connection.send_and_buffer(request)?; + request = req; + + if let Some(stream) = recovered { + let key: OwnedConnectionParams = request.connection_params().into(); + // If the server returned a malformed `Keep-Alive:` header, don't + // reuse the connection — match the async path's policy. + let ka = parse_keep_alive(&response.headers); + if !ka.malformed { + let expires_at = + ka.timeout.unwrap_or_else(|| Instant::now() + Duration::from_secs(60)); + client.put_stream(key, stream, expires_at); + } + } + + let status_code = response.status_code; + match get_redirect(request, status_code, response.headers.get("location")) { + NextHop::Redirect(result) => { + let (next_request, _needs_new_conn) = result?; + request = next_request; + let key: OwnedConnectionParams = request.connection_params().into(); + connection = match client.take_connection(&key) { + Some(c) => c, + None => Connection::new(request.connection_params(), request.timeout_at)?, + }; + } + NextHop::Destination(_) => return Ok(response), + } + } + } + + /// Writes the request, reads and drains the response body, and recovers the + /// underlying stream when the server indicated `Connection: keep-alive` and the + /// buffered reader has no trailing bytes. + fn send_and_buffer( + self, + request: ParsedRequest, + ) -> Result<(Response, Option, ParsedRequest), Error> { + enforce_timeout(request.timeout_at, move || { + let is_head = request.config.method == Method::Head; + let max_body_size = request.config.max_body_size; + + let mut response_lazy = self.write_and_read_lazy(&request)?; + request.url.write_base_url_to(&mut response_lazy.url).unwrap(); + request.url.write_resource_to(&mut response_lazy.url).unwrap(); + + let keep_alive = response_lazy + .headers + .get("connection") + .is_some_and(|h| h.eq_ignore_ascii_case("keep-alive")); + let status_code = response_lazy.status_code; + // We only read the body for responses that carry one. For HEAD/204/304 we skip + // the body entirely, so we cannot know whether the socket itself is clean — a + // non-compliant server could have sent bytes we never drained. Don't pool the + // stream in that case. + let body_was_drained = !is_head && status_code != 204 && status_code != 304; + let (response, buf_reader) = response_lazy.drain_with_stream(is_head, max_body_size)?; + // Additionally require that the `BufReader` has no prefetched bytes so the next + // response read from this connection starts on a clean byte boundary. + let recovered = if keep_alive && body_was_drained && buf_reader.buffer().is_empty() { + Some(buf_reader.into_inner()) + } else { + None + }; + Ok((response, recovered, request)) + }) + } +} + +/// Parsed values from a `Keep-Alive:` response header. +#[derive(Default)] +pub(crate) struct KeepAlive { + /// The absolute [`Instant`] corresponding to the header's `timeout=N` value, if + /// present and valid. `None` means the header was absent or had no `timeout=N`. + pub(crate) timeout: Option, + /// The `max=N` value, if present and valid. + pub(crate) max_requests: Option, + /// `true` if the header was present but contained a parameter we could not parse + /// (missing `=`, unparseable number, or unknown key). Callers should treat the + /// connection as not-reusable for new requests in this case. + pub(crate) malformed: bool, +} + +/// Parses the `Keep-Alive:` response header. Shared between the blocking and async +/// paths so that both agree on the meaning of `timeout=N`, `max=N`, and malformed +/// parameters. +pub(crate) fn parse_keep_alive(headers: &BTreeMap) -> KeepAlive { + let mut result = KeepAlive::default(); + let header = match headers.get("keep-alive") { + Some(h) => h, + None => return result, + }; + for param in header.split(',') { + let Some((k, v)) = param.trim().split_once('=') else { + result.malformed = true; + continue; + }; + match (k.trim(), v.parse::()) { + ("timeout", Ok(secs)) => { + let timeout_secs = (secs as u64).saturating_sub(1); + result.timeout = Instant::now().checked_add(Duration::from_secs(timeout_secs)); + } + ("max", Ok(n)) => { + result.max_requests = Some(n); + } + _ => { + result.malformed = true; + } + } + } + result } fn handle_redirects( diff --git a/bitreq/src/lib.rs b/bitreq/src/lib.rs index 1f214dfa3..5537959a3 100644 --- a/bitreq/src/lib.rs +++ b/bitreq/src/lib.rs @@ -58,8 +58,9 @@ //! [`send_lazy_async()`](struct.Request.html#method.send_lazy_async) methods //! that return futures for non-blocking operation. //! -//! It also enables [`Client`](struct.Client.html) to reuse TCP connections -//! across requests. +//! When this feature is enabled, [`Client`](struct.Client.html) also exposes an +//! async connection pool via [`send_async_with_client`](trait.RequestExt.html#method.send_async_with_client) +//! alongside its always-available blocking pool. //! //! ## `async-https` or `async-https-rustls` //! @@ -263,7 +264,7 @@ mod request; mod response; mod url; -#[cfg(feature = "async")] +#[cfg(feature = "std")] pub use client::{Client, RequestExt}; pub use error::*; #[cfg(feature = "proxy")] diff --git a/bitreq/src/response.rs b/bitreq/src/response.rs index b234de107..5134aff9d 100644 --- a/bitreq/src/response.rs +++ b/bitreq/src/response.rs @@ -3,7 +3,7 @@ use core::str; #[cfg(feature = "async")] use std::future::Future; #[cfg(feature = "std")] -use std::io::{self, BufReader, Bytes, Read}; +use std::io::{self, BufReader, Read}; #[cfg(feature = "async")] use tokio::io::{AsyncRead, AsyncReadExt}; @@ -53,25 +53,12 @@ pub struct Response { impl Response { #[cfg(feature = "std")] pub(crate) fn create( - mut parent: ResponseLazy, + parent: ResponseLazy, is_head: bool, max_body_size: Option, ) -> Result { - let mut body = Vec::new(); - if !is_head && parent.status_code != 204 && parent.status_code != 304 { - for byte in &mut parent { - let (byte, length) = byte?; - if max_body_size.is_some_and(|max| body.len().saturating_add(length) > max) { - return Err(Error::BodyOverflow); - } - body.reserve(length); - body.push(byte); - } - } - - let ResponseLazy { status_code, reason_phrase, headers, url, .. } = parent; - - Ok(Response { status_code, reason_phrase, headers, url, body }) + let (response, _stream) = parent.drain_with_stream(is_head, max_body_size)?; + Ok(response) } #[cfg(feature = "async")] @@ -308,15 +295,50 @@ pub struct ResponseLazy { /// ). pub url: String, - stream: HttpStreamBytes, + stream: StreamBytes, state: HttpStreamState, max_trailing_headers_size: Option, max_body_size: Option, bytes_read: usize, } +/// A byte iterator over an [`HttpStream`] that allows recovering the inner stream. +/// +/// This is equivalent to [`std::io::Bytes`] but provides [`into_buf_reader`] to +/// extract the underlying [`BufReader`] (and ultimately the [`HttpStream`]) after +/// the response has been fully read, enabling connection reuse. +/// +/// [`into_buf_reader`]: StreamBytes::into_buf_reader #[cfg(feature = "std")] -type HttpStreamBytes = Bytes>; +struct StreamBytes { + inner: BufReader, +} + +#[cfg(feature = "std")] +impl StreamBytes { + fn new(stream: HttpStream, capacity: usize) -> Self { + StreamBytes { inner: BufReader::with_capacity(capacity, stream) } + } + + fn into_buf_reader(self) -> BufReader { self.inner } +} + +#[cfg(feature = "std")] +impl Iterator for StreamBytes { + type Item = Result; + + fn next(&mut self) -> Option { + let mut byte = 0; + loop { + return match self.inner.read(core::slice::from_mut(&mut byte)) { + Ok(0) => None, + Ok(..) => Some(Ok(byte)), + Err(ref e) if e.kind() == io::ErrorKind::Interrupted => continue, + Err(e) => Some(Err(e)), + }; + } + } +} #[cfg(feature = "std")] impl ResponseLazy { @@ -326,7 +348,7 @@ impl ResponseLazy { max_status_line_len: Option, max_body_size: Option, ) -> Result { - let mut stream = BufReader::with_capacity(BACKING_READ_BUFFER_LENGTH, stream).bytes(); + let mut stream = StreamBytes::new(stream, BACKING_READ_BUFFER_LENGTH); let ResponseMetadata { status_code, reason_phrase, @@ -356,7 +378,7 @@ impl ResponseLazy { reason_phrase: response.reason_phrase, headers: response.headers, url: response.url, - stream: BufReader::with_capacity(1, http_stream).bytes(), + stream: StreamBytes::new(http_stream, 1), state: HttpStreamState::EndOnClose, max_trailing_headers_size: None, // Body was already fully loaded and size-checked by send_async @@ -366,6 +388,31 @@ impl ResponseLazy { } } +#[cfg(feature = "std")] +impl ResponseLazy { + /// Drains the body and assembles a [`Response`], also returning the underlying + /// [`BufReader`] for potential connection reuse. + pub(crate) fn drain_with_stream( + mut self, + is_head: bool, + max_body_size: Option, + ) -> Result<(Response, BufReader), Error> { + let mut body = Vec::new(); + if !is_head && self.status_code != 204 && self.status_code != 304 { + for byte in &mut self { + let (byte, length) = byte?; + if max_body_size.is_some_and(|max| body.len().saturating_add(length) > max) { + return Err(Error::BodyOverflow); + } + body.reserve(length); + body.push(byte); + } + } + let ResponseLazy { status_code, reason_phrase, headers, url, stream, .. } = self; + Ok((Response { status_code, reason_phrase, headers, url, body }, stream.into_buf_reader())) + } +} + #[cfg(feature = "std")] impl Iterator for ResponseLazy { type Item = Result<(u8, usize), Error>; @@ -700,7 +747,7 @@ macro_rules! define_read_methods { } #[cfg(feature = "std")] -define_read_methods!((read_until_closed, read_with_content_length, read_trailers, read_chunked, read_metadata, read_line)<>, HttpStreamBytes); +define_read_methods!((read_until_closed, read_with_content_length, read_trailers, read_chunked, read_metadata, read_line)<>, StreamBytes); #[cfg(feature = "async")] define_read_methods!((read_until_closed_async, read_with_content_length_async, read_trailers_async, read_chunked_async, read_metadata_async, read_line_async), R, async, await); From 66a33f670cd6cfd6ba68cd58128b0039a8c16595 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Mon, 20 Apr 2026 12:22:58 +0200 Subject: [PATCH 3/3] [bitreq]: Test blocking `Client` path in integration tests Exercise the blocking `Client::send` path alongside the existing one-shot `send` / `send_lazy` comparisons, so that every integration test asserts that the pooled and non-pooled blocking paths produce identical responses. The assertion block now runs regardless of the `async` feature, since the blocking client is always available. Co-Authored-By: HAL 9000 --- bitreq/tests/setup.rs | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/bitreq/tests/setup.rs b/bitreq/tests/setup.rs index 234da30af..645555c30 100644 --- a/bitreq/tests/setup.rs +++ b/bitreq/tests/setup.rs @@ -201,7 +201,6 @@ pub fn setup() { pub fn url(req: &str) -> String { format!("http://localhost:35562{}", req) } -#[cfg(feature = "async")] static CLIENT: std::sync::OnceLock = std::sync::OnceLock::new(); #[cfg(feature = "async")] static RUNTIME: std::sync::OnceLock = std::sync::OnceLock::new(); @@ -224,6 +223,25 @@ pub async fn maybe_make_request( (res, lazy_res) => panic!("{res:?} != {}", lazy_res.is_err()), } + // Test blocking Client path + { + let client = CLIENT.get_or_init(|| bitreq::Client::new(100)); + let client_response = client.send(request.clone()); + match (&response, client_response) { + (Ok(resp), Ok(client_resp)) => { + assert_eq!(client_resp.status_code, resp.status_code); + assert_eq!(client_resp.reason_phrase, resp.reason_phrase); + assert_eq!(client_resp.as_bytes(), resp.as_bytes()); + } + (Err(e), Err(client_e)) => { + assert_eq!(format!("{e:?}"), format!("{client_e:?}")); + } + (res, client_res) => { + panic!("{res:?} != {client_res:?}"); + } + } + } + #[cfg(feature = "async")] { if let Ok(resp) = &response {