Skip to content

Commit 89a71f4

Browse files
committed
feat(relay): steer deployments with rolling quota ledger
Apps Script quota is consumed per relay invocation, but a plain round-robin selector has no memory of how heavily this client has used each deployment inside the recent quota window. When multiple script IDs are configured, continuing to select an already saturated deployment while another configured deployment is still locally underused wastes available capacity and increases the chance of quota-related relay stalls. DomainFronter now keeps a per-script local ledger of selection timestamps in a rolling 24-hour window. Before choosing a script ID, the selector prunes expired observations and prefers non-blacklisted deployments whose local call count remains below the free-tier request budget. Both the single-request selector and the parallel fan-out selector use the same ledger so Apps Script batches and relay fan-out draw from the same local capacity model. The ledger records selections at dispatch time. That deliberately accounts for concurrent fan-out attempts and for requests that may still complete server-side after the Rust future is dropped. The ledger is a local steering signal rather than an authoritative Google quota reading: if every non-blacklisted deployment is locally saturated, the selector still returns a deployment instead of creating a client-side outage. This preserves connectivity for paid Workspace quotas, shared deployments whose external usage is invisible to this process, and cases where the local estimate is conservative. Selection remains decoupled from the existing failure blacklist. Blacklisted deployments are still skipped first; the rolling quota ledger only orders otherwise healthy deployments by locally observed capacity. If all deployments are blacklisted, the existing earliest-cooldown recovery path is preserved and the selected deployment is recorded in the ledger. The guide now describes the local rolling 24-hour ledger in the Full Mode deployment-scaling section, including the fact that it steers away from deployments this client has already driven near the free-tier request budget. Unit coverage exercises saturated deployment skipping, expired observation pruning, all-saturated connectivity fallback, and parallel selection preferring unsaturated deployments.
1 parent 0b49eda commit 89a71f4

3 files changed

Lines changed: 189 additions & 7 deletions

File tree

docs/guide.fa.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ HTTP / HTTPS مثل قبل از Apps Script می‌رود (تغییری نمی
225225
| ۶ | ۱۸۰ | توصیه‌شده برای استفادهٔ سنگین |
226226
| ۱۲ | ۳۶۰ | چند حساب — حداکثر توان |
227227

228-
بیشتر Deployment = همزمانی بیشتر = تأخیر کمتر هر سشن. هر بَچ بین IDها چرخش می‌کند و بار به‌طور یکنواخت توزیع می‌شود، احتمال رسیدن به سقف سهمیهٔ یک Deployment کاهش می‌یابد.
228+
بیشتر Deployment = همزمانی بیشتر = تأخیر کمتر هر سشن. انتخاب هر بَچ از بین IDهای تنظیم‌شده با یک ledger محلی rolling 24-hour انجام می‌شود؛ بار پخش می‌شود و کلاینت از Deploymentهایی که همین دستگاه نزدیک سقف request سهمیهٔ رایگان برده دوری می‌کند.
229229

230230
**محافظ‌های منابع:**
231231
- **حداکثر ۵۰ op** در هر بَچ — اگر سشن‌های فعال بیشتر باشند، مالتی‌پلکسر چند بَچ می‌فرستد

docs/guide.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ max_concurrent = 30 × number_of_deployment_ids
225225
| 6 | 180 | Recommended for heavy use |
226226
| 12 | 360 | Multi-account power setup |
227227

228-
More deployments = more total concurrency = lower per-session latency. Each batch round-robins across your IDs, spreading load and reducing the chance of hitting any single deployment's quota ceiling.
228+
More deployments = more total concurrency = lower per-session latency. Each batch is selected from the configured IDs with a local rolling 24-hour ledger, spreading load and steering away from deployments this client has already driven near the free-tier request budget.
229229

230230
**Resource guards:**
231231
- **50 ops max** per batch — if more sessions are active, the mux splits into multiple batches

src/domain_fronter.rs

Lines changed: 187 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
//! buffered `relay_parallel_range` compatibility wrapper for callers that
1414
//! want a `Vec<u8>` back.
1515
16-
use std::collections::HashMap;
16+
use std::collections::{HashMap, VecDeque};
1717
// AtomicU64 via portable-atomic: native on 64-bit / armv7, spinlock-
1818
// backed on mipsel (MIPS32 has no 64-bit atomic instructions). API
1919
// is identical to std::sync::atomic::AtomicU64 so call sites need
@@ -147,6 +147,12 @@ const H1_OPEN_TIMEOUT_SECS: u64 = 8;
147147
/// request to wake back up — most painful on YouTube / streaming where
148148
/// the first chunk after a quiet pause stalls the player.
149149
const H1_KEEPALIVE_INTERVAL_SECS: u64 = 240;
150+
/// Conservative local estimate of the Apps Script UrlFetchApp free-tier
151+
/// request budget per deployment account. This is not an authoritative Google
152+
/// quota read; it is a client-side selector guard that avoids concentrating
153+
/// traffic on a deployment this process has already used heavily.
154+
const SCRIPT_QUOTA_FREE_TIER_CALLS: usize = 20_000;
155+
const SCRIPT_QUOTA_WINDOW: Duration = Duration::from_secs(24 * 60 * 60);
150156
/// Largest response body Apps Script's `UrlFetchApp` will deliver before
151157
/// the script gets killed mid-execution. The hard wire ceiling is ~50 MiB;
152158
/// after base64 / envelope overhead and edge variance, the practical raw
@@ -357,6 +363,11 @@ pub struct DomainFronter {
357363
inflight: Arc<Mutex<HashMap<String, broadcast::Sender<Vec<u8>>>>>,
358364
coalesced: AtomicU64,
359365
blacklist: Arc<std::sync::Mutex<HashMap<String, Instant>>>,
366+
/// Per-deployment local call ledger used by `next_script_id` /
367+
/// `next_script_ids` to avoid selecting an already saturated deployment
368+
/// while another configured script still has locally-observed capacity.
369+
/// Entries are pruned on selection against a rolling 24-hour window.
370+
script_quota_ledger: Arc<std::sync::Mutex<HashMap<String, VecDeque<Instant>>>>,
360371
/// Per-deployment rolling timeout counter. Maps `script_id` →
361372
/// `(window_start, strike_count)`. Reset when the window expires
362373
/// or when a batch succeeds. Triggers a short-cooldown blacklist
@@ -620,6 +631,7 @@ impl DomainFronter {
620631
inflight: Arc::new(Mutex::new(HashMap::new())),
621632
coalesced: AtomicU64::new(0),
622633
blacklist: Arc::new(std::sync::Mutex::new(HashMap::new())),
634+
script_quota_ledger: Arc::new(std::sync::Mutex::new(HashMap::new())),
623635
script_timeouts: Arc::new(std::sync::Mutex::new(HashMap::new())),
624636
relay_calls: AtomicU64::new(0),
625637
relay_failures: AtomicU64::new(0),
@@ -802,21 +814,39 @@ impl DomainFronter {
802814
let mut bl = self.blacklist.lock().unwrap();
803815
let now = Instant::now();
804816
bl.retain(|_, until| *until > now);
817+
let mut quota = self.script_quota_ledger.lock().unwrap();
818+
prune_script_quota_ledger(&mut quota, now);
805819

820+
let mut saturated_fallback: Option<String> = None;
806821
for _ in 0..n {
807822
let idx = self.script_idx.fetch_add(1, Ordering::Relaxed);
808823
let sid = &self.script_ids[idx % n];
809824
if !bl.contains_key(sid) {
810-
return sid.clone();
825+
if script_has_local_quota_capacity(&quota, sid) {
826+
record_script_quota_call_locked(&mut quota, sid, now);
827+
return sid.clone();
828+
}
829+
saturated_fallback.get_or_insert_with(|| sid.clone());
811830
}
812831
}
832+
// If every non-blacklisted deployment is locally saturated, preserve
833+
// connectivity instead of hard-failing. Paid Workspace quotas and
834+
// traffic from other clients are not visible to this process, so this
835+
// ledger is a steering signal, not an authoritative quota gate.
836+
if let Some(sid) = saturated_fallback {
837+
record_script_quota_call_locked(&mut quota, &sid, now);
838+
return sid;
839+
}
813840
// All blacklisted: pick whichever comes off cooldown soonest.
814841
if let Some((sid, _)) = bl.iter().min_by_key(|(_, t)| **t) {
815842
let sid = sid.clone();
816843
bl.remove(&sid);
844+
record_script_quota_call_locked(&mut quota, &sid, now);
817845
return sid;
818846
}
819-
self.script_ids[0].clone()
847+
let sid = self.script_ids[0].clone();
848+
record_script_quota_call_locked(&mut quota, &sid, now);
849+
sid
820850
}
821851

822852
/// Pick `want` distinct non-blacklisted script IDs for a parallel fan-out
@@ -831,20 +861,34 @@ impl DomainFronter {
831861
let mut bl = self.blacklist.lock().unwrap();
832862
let now = Instant::now();
833863
bl.retain(|_, until| *until > now);
864+
let mut quota = self.script_quota_ledger.lock().unwrap();
865+
prune_script_quota_ledger(&mut quota, now);
834866

835867
let mut picked: Vec<String> = Vec::with_capacity(want);
868+
let mut saturated_fallback: Vec<String> = Vec::with_capacity(want);
836869
for _ in 0..n {
837870
if picked.len() >= want {
838871
break;
839872
}
840873
let idx = self.script_idx.fetch_add(1, Ordering::Relaxed);
841874
let sid = &self.script_ids[idx % n];
842875
if !bl.contains_key(sid) && !picked.iter().any(|p| p == sid) {
843-
picked.push(sid.clone());
876+
if script_has_local_quota_capacity(&quota, sid) {
877+
picked.push(sid.clone());
878+
} else if !saturated_fallback.iter().any(|p| p == sid) {
879+
saturated_fallback.push(sid.clone());
880+
}
844881
}
845882
}
846883
if picked.is_empty() {
847-
picked.push(self.script_ids[0].clone());
884+
if let Some(sid) = saturated_fallback.into_iter().next() {
885+
picked.push(sid);
886+
} else {
887+
picked.push(self.script_ids[0].clone());
888+
}
889+
}
890+
for sid in &picked {
891+
record_script_quota_call_locked(&mut quota, sid, now);
848892
}
849893
picked
850894
}
@@ -3879,6 +3923,40 @@ fn add_random_pad(map: &mut serde_json::Map<String, Value>) {
38793923
map.insert("_pad".into(), Value::String(B64.encode(&buf)));
38803924
}
38813925

3926+
fn prune_script_quota_ledger(
3927+
ledger: &mut HashMap<String, VecDeque<Instant>>,
3928+
now: Instant,
3929+
) {
3930+
let cutoff = now.checked_sub(SCRIPT_QUOTA_WINDOW).unwrap_or(now);
3931+
ledger.retain(|_, calls| {
3932+
while calls.front().map(|ts| *ts <= cutoff).unwrap_or(false) {
3933+
calls.pop_front();
3934+
}
3935+
!calls.is_empty()
3936+
});
3937+
}
3938+
3939+
fn script_has_local_quota_capacity(
3940+
ledger: &HashMap<String, VecDeque<Instant>>,
3941+
script_id: &str,
3942+
) -> bool {
3943+
ledger
3944+
.get(script_id)
3945+
.map(|calls| calls.len() < SCRIPT_QUOTA_FREE_TIER_CALLS)
3946+
.unwrap_or(true)
3947+
}
3948+
3949+
fn record_script_quota_call_locked(
3950+
ledger: &mut HashMap<String, VecDeque<Instant>>,
3951+
script_id: &str,
3952+
now: Instant,
3953+
) {
3954+
ledger
3955+
.entry(script_id.to_string())
3956+
.or_default()
3957+
.push_back(now);
3958+
}
3959+
38823960
/// "YYYY-MM-DD" of the current Pacific Time date. Used as the daily-reset
38833961
/// boundary for `today_calls` / `today_bytes` because **Apps Script's
38843962
/// quota counter resets at midnight Pacific Time, not UTC** — that's
@@ -6606,6 +6684,110 @@ hello";
66066684
DomainFronter::new(&cfg).expect("test fronter must construct")
66076685
}
66086686

6687+
fn fronter_for_script_ids(script_ids: &[&str]) -> DomainFronter {
6688+
let script_ids_json = serde_json::to_string(script_ids).unwrap();
6689+
let json = format!(
6690+
r#"{{
6691+
"mode": "apps_script",
6692+
"google_ip": "127.0.0.1",
6693+
"front_domain": "www.google.com",
6694+
"script_id": {},
6695+
"auth_key": "test_auth_key",
6696+
"listen_host": "127.0.0.1",
6697+
"listen_port": 8085,
6698+
"log_level": "info",
6699+
"verify_ssl": true
6700+
}}"#,
6701+
script_ids_json
6702+
);
6703+
let cfg: Config = serde_json::from_str(&json).unwrap();
6704+
DomainFronter::new(&cfg).expect("test fronter must construct")
6705+
}
6706+
6707+
fn seed_script_quota(fronter: &DomainFronter, script_id: &str, count: usize, at: Instant) {
6708+
let mut ledger = fronter.script_quota_ledger.lock().unwrap();
6709+
ledger
6710+
.entry(script_id.to_string())
6711+
.or_default()
6712+
.extend(std::iter::repeat(at).take(count));
6713+
}
6714+
6715+
#[test]
6716+
fn next_script_id_skips_locally_saturated_deployment() {
6717+
let fronter = fronter_for_script_ids(&["SCRIPT_A", "SCRIPT_B"]);
6718+
seed_script_quota(
6719+
&fronter,
6720+
"SCRIPT_A",
6721+
SCRIPT_QUOTA_FREE_TIER_CALLS,
6722+
Instant::now(),
6723+
);
6724+
6725+
let selected = fronter.next_script_id();
6726+
6727+
assert_eq!(selected, "SCRIPT_B");
6728+
let ledger = fronter.script_quota_ledger.lock().unwrap();
6729+
assert_eq!(
6730+
ledger.get("SCRIPT_B").map(|calls| calls.len()),
6731+
Some(1),
6732+
"selection must be recorded in the local rolling ledger"
6733+
);
6734+
}
6735+
6736+
#[test]
6737+
fn script_quota_prune_removes_expired_observations() {
6738+
let recorded_at = Instant::now();
6739+
let prune_at = recorded_at
6740+
.checked_add(SCRIPT_QUOTA_WINDOW + Duration::from_secs(1))
6741+
.expect("test clock must support a 24h monotonic addition");
6742+
let mut ledger = HashMap::new();
6743+
ledger.insert(
6744+
"SCRIPT_A".to_string(),
6745+
std::iter::repeat(recorded_at)
6746+
.take(SCRIPT_QUOTA_FREE_TIER_CALLS)
6747+
.collect::<VecDeque<_>>(),
6748+
);
6749+
6750+
prune_script_quota_ledger(&mut ledger, prune_at);
6751+
6752+
assert!(
6753+
ledger.is_empty(),
6754+
"rolling quota ledger should discard observations outside the 24h window"
6755+
);
6756+
}
6757+
6758+
#[test]
6759+
fn next_script_id_preserves_connectivity_when_all_scripts_are_locally_saturated() {
6760+
let fronter = fronter_for_script_ids(&["SCRIPT_A", "SCRIPT_B"]);
6761+
let now = Instant::now();
6762+
seed_script_quota(&fronter, "SCRIPT_A", SCRIPT_QUOTA_FREE_TIER_CALLS, now);
6763+
seed_script_quota(&fronter, "SCRIPT_B", SCRIPT_QUOTA_FREE_TIER_CALLS, now);
6764+
6765+
let selected = fronter.next_script_id();
6766+
6767+
assert_eq!(selected, "SCRIPT_A");
6768+
let ledger = fronter.script_quota_ledger.lock().unwrap();
6769+
assert_eq!(
6770+
ledger.get("SCRIPT_A").map(|calls| calls.len()),
6771+
Some(SCRIPT_QUOTA_FREE_TIER_CALLS + 1),
6772+
"local saturation is a steering signal, not a hard outage trigger"
6773+
);
6774+
}
6775+
6776+
#[test]
6777+
fn parallel_script_selection_prefers_unsaturated_deployments() {
6778+
let fronter = fronter_for_script_ids(&["SCRIPT_A", "SCRIPT_B", "SCRIPT_C"]);
6779+
seed_script_quota(
6780+
&fronter,
6781+
"SCRIPT_A",
6782+
SCRIPT_QUOTA_FREE_TIER_CALLS,
6783+
Instant::now(),
6784+
);
6785+
6786+
let selected = fronter.next_script_ids(2);
6787+
6788+
assert_eq!(selected, vec!["SCRIPT_B".to_string(), "SCRIPT_C".to_string()]);
6789+
}
6790+
66096791
#[tokio::test(flavor = "current_thread")]
66106792
async fn force_http1_disables_h2_at_construction() {
66116793
// The kill switch: force_http1=true must mark the fronter as

0 commit comments

Comments
 (0)