Skip to content

Commit 534066c

Browse files
committed
coalesce concurrent identical GETs
First concurrent caller for a cache key does the relay; subsequent callers subscribe to a broadcast channel and receive the same response. Only applies to cacheable (GET/HEAD) requests without body.
1 parent 52d0031 commit 534066c

1 file changed

Lines changed: 64 additions & 6 deletions

File tree

src/domain_fronter.rs

Lines changed: 64 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@
99
//! TODO: add HTTP/2 multiplexing (`h2` crate) for lower latency.
1010
//! TODO: add parallel range-based downloads.
1111
12-
use std::sync::atomic::{AtomicUsize, Ordering};
12+
use std::collections::HashMap;
13+
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
1314
use std::sync::Arc;
1415
use std::time::{Duration, Instant};
1516

@@ -19,7 +20,7 @@ use serde::{Deserialize, Serialize};
1920
use serde_json::Value;
2021
use tokio::io::{AsyncReadExt, AsyncWriteExt};
2122
use tokio::net::TcpStream;
22-
use tokio::sync::Mutex;
23+
use tokio::sync::{broadcast, Mutex};
2324
use tokio::time::timeout;
2425
use tokio_rustls::client::TlsStream;
2526
use tokio_rustls::TlsConnector;
@@ -69,6 +70,8 @@ pub struct DomainFronter {
6970
tls_connector: TlsConnector,
7071
pool: Arc<Mutex<Vec<PoolEntry>>>,
7172
cache: Arc<ResponseCache>,
73+
inflight: Arc<Mutex<HashMap<String, broadcast::Sender<Vec<u8>>>>>,
74+
coalesced: AtomicU64,
7275
}
7376

7477
/// Request payload sent to Apps Script (single, non-batch).
@@ -129,13 +132,19 @@ impl DomainFronter {
129132
tls_connector,
130133
pool: Arc::new(Mutex::new(Vec::new())),
131134
cache: Arc::new(ResponseCache::with_default()),
135+
inflight: Arc::new(Mutex::new(HashMap::new())),
136+
coalesced: AtomicU64::new(0),
132137
})
133138
}
134139

135140
pub fn cache(&self) -> &ResponseCache {
136141
&self.cache
137142
}
138143

144+
pub fn coalesced_count(&self) -> u64 {
145+
self.coalesced.load(Ordering::Relaxed)
146+
}
147+
139148
fn next_script_id(&self) -> &str {
140149
let idx = self.script_idx.fetch_add(1, Ordering::Relaxed);
141150
&self.script_ids[idx % self.script_ids.len()]
@@ -187,15 +196,64 @@ impl DomainFronter {
187196
headers: &[(String, String)],
188197
body: &[u8],
189198
) -> Vec<u8> {
190-
let cacheable = is_cacheable_method(method) && body.is_empty();
191-
let key = if cacheable { Some(cache_key(method, url)) } else { None };
199+
let coalescible = is_cacheable_method(method) && body.is_empty();
200+
let key = if coalescible { Some(cache_key(method, url)) } else { None };
201+
192202
if let Some(ref k) = key {
193203
if let Some(hit) = self.cache.get(k) {
194204
tracing::debug!("cache hit: {}", url);
195205
return hit;
196206
}
197207
}
198208

209+
// Coalesce concurrent identical requests: only the first caller actually
210+
// hits the relay; waiters subscribe to the same broadcast channel.
211+
let waiter = if let Some(ref k) = key {
212+
let mut inflight = self.inflight.lock().await;
213+
match inflight.get(k) {
214+
Some(tx) => {
215+
let rx = tx.subscribe();
216+
self.coalesced.fetch_add(1, Ordering::Relaxed);
217+
tracing::debug!("coalesced: {}", url);
218+
Some(rx)
219+
}
220+
None => {
221+
let (tx, _) = broadcast::channel(1);
222+
inflight.insert(k.clone(), tx);
223+
None
224+
}
225+
}
226+
} else {
227+
None
228+
};
229+
230+
if let Some(mut rx) = waiter {
231+
match rx.recv().await {
232+
Ok(bytes) => return bytes,
233+
Err(_) => return error_response(502, "coalesced request dropped"),
234+
}
235+
}
236+
237+
let bytes = self.relay_uncoalesced(method, url, headers, body, key.as_deref()).await;
238+
239+
if let Some(ref k) = key {
240+
let mut inflight = self.inflight.lock().await;
241+
if let Some(tx) = inflight.remove(k) {
242+
let _ = tx.send(bytes.clone());
243+
}
244+
}
245+
246+
bytes
247+
}
248+
249+
async fn relay_uncoalesced(
250+
&self,
251+
method: &str,
252+
url: &str,
253+
headers: &[(String, String)],
254+
body: &[u8],
255+
cache_key_opt: Option<&str>,
256+
) -> Vec<u8> {
199257
let bytes = match timeout(
200258
Duration::from_secs(REQUEST_TIMEOUT_SECS),
201259
self.do_relay_with_retry(method, url, headers, body),
@@ -213,10 +271,10 @@ impl DomainFronter {
213271
}
214272
};
215273

216-
if let Some(k) = key {
274+
if let Some(k) = cache_key_opt {
217275
if let Some(ttl) = parse_ttl(&bytes, url) {
218276
tracing::debug!("cache store: {} ttl={}s", url, ttl.as_secs());
219-
self.cache.put(k, bytes.clone(), ttl);
277+
self.cache.put(k.to_string(), bytes.clone(), ttl);
220278
}
221279
}
222280
bytes

0 commit comments

Comments
 (0)