Skip to content

Commit c179d9b

Browse files
committed
[bitreq]: Add blocking Client with connection pooling
Previously, we added a `Client` allowing for connection reuse behind the `async` feature. Here we do the same for the blocking, i.e., non-`async` path. The client pools are exposed as additive capabilities: the blocking pool is always available when `std` is, and the async pool is available when the `async` feature is enabled. Unlike the async path, the blocking pool keys the cache by `(https, host, port, proxy)` and stores the raw keep-alive stream alongside an expiry `Instant` parsed from the server's `Keep-Alive: timeout=N` header (defaulting to 60s). Stream reuse is gated on both `Connection: keep-alive` and the `BufReader` having no trailing bytes, to avoid corrupting the next response on the socket. Redirect handling (301/302/303/307, including 303 method downgrade) is driven by `Connection::send_pooled`, which consults the provided `&Client` for a pooled connection on each hop rather than duplicating the redirect loop in `client.rs`. Keep-alive header parsing lives in `connection.rs` so that "connection lifetime" concerns stay in one module. Additional divergence between the sync and async paths (pipelining, per-request IDs, dropped-reader tracking in `AsyncConnection`) intentionally remains out of scope. Co-Authored-By: HAL 9000
1 parent b128a97 commit c179d9b

4 files changed

Lines changed: 400 additions & 120 deletions

File tree

bitreq/src/client.rs

Lines changed: 148 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,29 @@
1-
//! Connection pooling client for HTTP requests.
1+
//! Connection pooling [`Client`] for HTTP requests.
22
//!
3-
//! The `Client` caches connections to avoid repeated TCP handshakes and TLS negotiations.
3+
//! The [`Client`] caches connections to avoid repeated TCP handshakes and TLS negotiations.
44
//!
5-
//! Due to std limitations, `Client` currently only supports async requests.
6-
7-
#![cfg(feature = "async")]
5+
//! A blocking connection pool is always available. When the `async` feature is enabled, an
6+
//! additional async connection pool is exposed via [`Client::send_async`] and
7+
//! [`RequestExt::send_async_with_client`]. Both pools share a single idle-cache budget
8+
//! governed by a unified LRU.
89
910
use std::collections::{hash_map, HashMap, VecDeque};
1011
use std::sync::{Arc, Mutex};
12+
use std::time::Instant;
1113

14+
#[cfg(feature = "async")]
1215
use crate::connection::AsyncConnection;
16+
use crate::connection::{Connection, HttpStream};
1317
use crate::request::{OwnedConnectionParams as ConnectionKey, ParsedRequest};
1418
use crate::{Error, Request, Response};
1519

1620
/// A client that caches connections for reuse.
1721
///
18-
/// The client maintains a cache of up to `max_idle` connections, evicting the least
19-
/// recently used entry when the cache is full.
22+
/// The client maintains a cache of up to `max_idle` total connections — shared across
23+
/// the blocking and (when enabled) async paths — evicting the least recently used
24+
/// entry when the cache is full. A cached blocking connection is reused when the
25+
/// server indicated `Connection: keep-alive` and the keep-alive timeout has not yet
26+
/// expired.
2027
///
2128
/// # Bound applies to cached entries, not live connections
2229
///
@@ -27,97 +34,199 @@ use crate::{Error, Request, Response};
2734
/// the cache on put-back once it is full. To bound concurrency — rather than idle
2835
/// reuse — a caller must arrange a separate semaphore on top.
2936
///
30-
/// # Example
37+
/// # Examples
3138
///
39+
/// Blocking:
3240
/// ```no_run
33-
/// # async fn request() {
41+
/// # fn main() -> Result<(), bitreq::Error> {
3442
/// use bitreq::{Client, RequestExt};
3543
///
3644
/// let client = Client::new(10); // Cache up to 10 idle connections
45+
/// let response = bitreq::get("http://example.com").send_with_client(&client)?;
46+
/// # Ok(()) }
47+
/// ```
48+
///
49+
/// Async (requires the `async` feature):
50+
#[cfg_attr(feature = "async", doc = "```no_run")]
51+
#[cfg_attr(not(feature = "async"), doc = "```ignore")]
52+
/// # async fn request() -> Result<(), bitreq::Error> {
53+
/// use bitreq::{Client, RequestExt};
54+
///
55+
/// let client = Client::new(10);
3756
/// let response = bitreq::get("https://example.com")
3857
/// .send_async_with_client(&client)
39-
/// .await;
40-
/// # }
58+
/// .await?;
59+
/// # Ok(()) }
4160
/// ```
4261
#[derive(Clone)]
4362
pub struct Client {
44-
r#async: Arc<Mutex<ClientImpl<AsyncConnection>>>,
63+
state: Arc<Mutex<ClientState>>,
4564
}
4665

47-
struct ClientImpl<T> {
48-
connections: HashMap<ConnectionKey, Arc<T>>,
49-
lru_order: VecDeque<ConnectionKey>,
66+
struct ClientState {
67+
blocking_connections: HashMap<ConnectionKey, PoolEntry>,
68+
#[cfg(feature = "async")]
69+
async_connections: HashMap<ConnectionKey, Arc<AsyncConnection>>,
70+
/// Unified LRU across both pools. The oldest entry is at the front.
71+
lru_order: VecDeque<LruKey>,
5072
max_idle: usize,
5173
}
5274

75+
#[derive(Clone, PartialEq, Eq)]
76+
enum LruKey {
77+
Blocking(ConnectionKey),
78+
#[cfg(feature = "async")]
79+
Async(ConnectionKey),
80+
}
81+
82+
pub(crate) struct PoolEntry {
83+
pub(crate) stream: HttpStream,
84+
pub(crate) expires_at: Instant,
85+
}
86+
5387
impl Client {
54-
/// Creates a new `Client` with the specified idle-cache size.
88+
/// Creates a new `Client` with the specified total idle-cache size.
5589
///
56-
/// # Arguments
57-
///
58-
/// * `max_idle` - Maximum number of cached idle connections. When this limit is
59-
/// reached, the least recently used connection is evicted. See the [type-level
60-
/// docs](Client) for why this does not bound the number of live connections.
90+
/// The cache is shared across the blocking and (when enabled) async paths. When
91+
/// the total number of cached connections exceeds `max_idle`, the least recently
92+
/// used entry is evicted regardless of which pool it lives in. See the
93+
/// [type-level docs](Client) for why this does not bound the number of live
94+
/// connections.
6195
pub fn new(max_idle: usize) -> Self {
6296
Client {
63-
r#async: Arc::new(Mutex::new(ClientImpl {
64-
connections: HashMap::new(),
97+
state: Arc::new(Mutex::new(ClientState {
98+
blocking_connections: HashMap::new(),
99+
#[cfg(feature = "async")]
100+
async_connections: HashMap::new(),
65101
lru_order: VecDeque::new(),
66102
max_idle,
67103
})),
68104
}
69105
}
70106

107+
/// Sends a request using a cached connection if available.
108+
pub fn send(&self, request: Request) -> Result<Response, Error> {
109+
let parsed = ParsedRequest::new(request)?;
110+
let key: ConnectionKey = parsed.connection_params().into();
111+
let connection = match self.take_connection(&key) {
112+
Some(conn) => conn,
113+
None => Connection::new(parsed.connection_params(), parsed.timeout_at)?,
114+
};
115+
connection.send_pooled(self, parsed)
116+
}
117+
118+
/// Takes a pooled [`Connection`] for `key`, if one exists and has not expired.
119+
pub(crate) fn take_connection(&self, key: &ConnectionKey) -> Option<Connection> {
120+
let mut state = self.state.lock().unwrap();
121+
let entry = state.blocking_connections.remove(key)?;
122+
let lru_key = LruKey::Blocking(key.clone());
123+
if let Some(pos) = state.lru_order.iter().position(|k| k == &lru_key) {
124+
state.lru_order.remove(pos);
125+
}
126+
if entry.expires_at > Instant::now() {
127+
Some(Connection::from_stream(entry.stream))
128+
} else {
129+
None
130+
}
131+
}
132+
133+
/// Puts a stream back into the pool under `key`, with the given expiry.
134+
pub(crate) fn put_stream(&self, key: ConnectionKey, stream: HttpStream, expires_at: Instant) {
135+
let mut state = self.state.lock().unwrap();
136+
if let hash_map::Entry::Vacant(entry) = state.blocking_connections.entry(key.clone()) {
137+
entry.insert(PoolEntry { stream, expires_at });
138+
state.lru_order.push_back(LruKey::Blocking(key));
139+
state.evict_if_over_capacity();
140+
}
141+
}
142+
71143
/// Sends a request asynchronously using a cached connection if available.
144+
#[cfg(feature = "async")]
72145
pub async fn send_async(&self, request: Request) -> Result<Response, Error> {
73146
let parsed_request = ParsedRequest::new(request)?;
74147
let key = parsed_request.connection_params();
75-
let owned_key = key.into();
148+
let owned_key: ConnectionKey = key.into();
76149

77-
// Try to get cached connection
78150
let conn_opt = {
79-
let state = self.r#async.lock().unwrap();
80-
81-
if let Some(conn) = state.connections.get(&owned_key) {
82-
Some(Arc::clone(conn))
151+
let mut state = self.state.lock().unwrap();
152+
if let Some(conn) = state.async_connections.get(&owned_key) {
153+
let conn = Arc::clone(conn);
154+
// Refresh LRU position so this hit is treated as the most recent use.
155+
let lru_key = LruKey::Async(owned_key.clone());
156+
if let Some(pos) = state.lru_order.iter().position(|k| k == &lru_key) {
157+
state.lru_order.remove(pos);
158+
state.lru_order.push_back(lru_key);
159+
}
160+
Some(conn)
83161
} else {
84162
None
85163
}
86164
};
165+
87166
let conn = if let Some(conn) = conn_opt {
88167
conn
89168
} else {
90169
let connection = AsyncConnection::new(key, parsed_request.timeout_at).await?;
91170
let connection = Arc::new(connection);
92171

93-
let mut state = self.r#async.lock().unwrap();
94-
if let hash_map::Entry::Vacant(entry) = state.connections.entry(owned_key) {
172+
let mut state = self.state.lock().unwrap();
173+
if let hash_map::Entry::Vacant(entry) = state.async_connections.entry(owned_key.clone())
174+
{
95175
entry.insert(Arc::clone(&connection));
96-
state.lru_order.push_back(key.into());
97-
if state.connections.len() > state.max_idle {
98-
if let Some(oldest_key) = state.lru_order.pop_front() {
99-
state.connections.remove(&oldest_key);
100-
}
101-
}
176+
state.lru_order.push_back(LruKey::Async(owned_key));
177+
state.evict_if_over_capacity();
102178
}
103179
connection
104180
};
105181

106-
// Send the request
107182
conn.send(parsed_request).await
108183
}
109184
}
110185

111-
/// Extension trait for `Request` to use with `Client`.
186+
impl ClientState {
187+
fn total_len(&self) -> usize {
188+
let total = self.blocking_connections.len();
189+
#[cfg(feature = "async")]
190+
let total = total + self.async_connections.len();
191+
total
192+
}
193+
194+
fn evict_if_over_capacity(&mut self) {
195+
while self.total_len() > self.max_idle {
196+
let oldest = match self.lru_order.pop_front() {
197+
Some(k) => k,
198+
None => return,
199+
};
200+
match oldest {
201+
LruKey::Blocking(k) => {
202+
self.blocking_connections.remove(&k);
203+
}
204+
#[cfg(feature = "async")]
205+
LruKey::Async(k) => {
206+
self.async_connections.remove(&k);
207+
}
208+
}
209+
}
210+
}
211+
}
212+
213+
/// Extension trait for [`Request`] to use with [`Client`].
112214
pub trait RequestExt {
215+
/// Sends this request using the provided client's connection pool.
216+
fn send_with_client(self, client: &Client) -> Result<Response, Error>;
217+
113218
/// Sends this request asynchronously using the provided client's connection pool.
219+
#[cfg(feature = "async")]
114220
fn send_async_with_client(
115221
self,
116222
client: &Client,
117223
) -> impl std::future::Future<Output = Result<Response, Error>>;
118224
}
119225

120226
impl RequestExt for Request {
227+
fn send_with_client(self, client: &Client) -> Result<Response, Error> { client.send(self) }
228+
229+
#[cfg(feature = "async")]
121230
fn send_async_with_client(
122231
self,
123232
client: &Client,

0 commit comments

Comments
 (0)