Skip to content

Commit b2d9a6f

Browse files
committed
feat(screenscraper): persist concurrency in redis and scale down on lower maxthreads
1 parent 08512b6 commit b2d9a6f

1 file changed

Lines changed: 127 additions & 16 deletions

File tree

  • service/src/providers/screenscraper

service/src/providers/screenscraper/mod.rs

Lines changed: 127 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,8 @@ struct QuotaSnapshot {
112112
max_requests_per_day: u64,
113113
requests_ko_today: u64,
114114
max_requests_ko_per_day: u64,
115+
#[serde(default)]
116+
concurrency: usize,
115117
}
116118

117119
pub struct ScreenScraperClient {
@@ -405,8 +407,12 @@ impl ScreenScraperClient {
405407
if let Some(resp) = env.response.as_ref()
406408
&& let Some(account_ref) = account.as_ref()
407409
{
410+
if let Some(user) = resp.ssuser.as_ref()
411+
&& let Some(target) = parse_maxthreads(user)
412+
{
413+
apply_concurrency(account_ref, target);
414+
}
408415
self.update_quota_from(account_ref, &resp.ssuser);
409-
self.update_concurrency_from(account_ref, &resp.ssuser);
410416
}
411417
let header_signals_failure = env
412418
.header
@@ -572,14 +578,20 @@ impl ScreenScraperClient {
572578
}
573579
}
574580

581+
let snap_raw: Result<Option<String>, _> = conn.get(&a.quota_redis_key).await;
582+
let snapshot = match snap_raw {
583+
Ok(Some(raw)) => serde_json::from_str::<QuotaSnapshot>(&raw).ok(),
584+
_ => None,
585+
};
586+
if let Some(snapshot) = snapshot.as_ref() {
587+
apply_concurrency(a, snapshot.concurrency);
588+
}
589+
575590
if a.exhausted_until_unix.load(Ordering::Relaxed) > now {
576591
continue;
577592
}
578593

579-
let snap_raw: Result<Option<String>, _> = conn.get(&a.quota_redis_key).await;
580-
if let Ok(Some(raw)) = snap_raw
581-
&& let Ok(snapshot) = serde_json::from_str::<QuotaSnapshot>(&raw)
582-
{
594+
if let Some(snapshot) = snapshot {
583595
let ok_hit = soft_limit_reached(
584596
Some(snapshot.requests_today),
585597
Some(snapshot.max_requests_per_day),
@@ -653,6 +665,7 @@ impl ScreenScraperClient {
653665
max_requests_per_day: parsed_u64(user.maxrequestsperday.as_deref()).unwrap_or(0),
654666
requests_ko_today: parsed_u64(user.requestskotoday.as_deref()).unwrap_or(0),
655667
max_requests_ko_per_day: parsed_u64(user.maxrequestskoperday.as_deref()).unwrap_or(0),
668+
concurrency: account.concurrency.load(Ordering::Relaxed),
656669
};
657670
self.persist_quota_snapshot(account, &snapshot);
658671

@@ -715,24 +728,52 @@ impl ScreenScraperClient {
715728
let _: Result<(), _> = conn.set_ex(&key, value, ttl).await;
716729
});
717730
}
731+
}
718732

719-
fn update_concurrency_from(&self, account: &Account, user: &Option<SsUser>) {
720-
let Some(user) = user else { return };
721-
let Some(target) = parse_maxthreads(user) else {
722-
return;
723-
};
724-
let current = account.concurrency.load(Ordering::Relaxed);
725-
if target > current {
726-
account.permits.add_permits(target - current);
727-
account.concurrency.store(target, Ordering::Relaxed);
728-
crate::metrics::set_screenscraper_concurrency(target as i64);
733+
fn apply_concurrency(account: &Account, target: usize) {
734+
if target == 0 {
735+
return;
736+
}
737+
let target = target.min(MAX_CONCURRENCY);
738+
let current = account.concurrency.load(Ordering::Relaxed);
739+
if target > current {
740+
account.permits.add_permits(target - current);
741+
account.concurrency.store(target, Ordering::Relaxed);
742+
crate::metrics::set_screenscraper_concurrency(target as i64);
743+
info!("screenscraper concurrency raised to {target} (was {current})");
744+
} else if target < current {
745+
let asked = current - target;
746+
let removed = account.permits.forget_permits(asked);
747+
account.concurrency.store(target, Ordering::Relaxed);
748+
crate::metrics::set_screenscraper_concurrency(target as i64);
749+
if removed < asked {
750+
drain_excess_permits(account.permits.clone(), asked - removed);
729751
info!(
730-
"screenscraper concurrency raised to {target} from ssuser.maxthreads (was {current})"
752+
"screenscraper concurrency lowered to {target} (was {current}); forgot {removed} of {asked} immediately, draining remaining {} as in-flight requests finish",
753+
asked - removed
754+
);
755+
} else {
756+
info!(
757+
"screenscraper concurrency lowered to {target} (was {current}); forgot {asked} permits"
731758
);
732759
}
733760
}
734761
}
735762

763+
fn drain_excess_permits(permits: Arc<Semaphore>, mut remaining: usize) {
764+
tokio::spawn(async move {
765+
while remaining > 0 {
766+
match permits.clone().acquire_owned().await {
767+
Ok(p) => {
768+
p.forget();
769+
remaining -= 1;
770+
}
771+
Err(_) => break,
772+
}
773+
}
774+
});
775+
}
776+
736777
enum Outcome {
737778
Done(StatusCode, Option<String>, String),
738779
Backoff429,
@@ -1196,10 +1237,79 @@ mod tests {
11961237
max_requests_per_day: 20_000,
11971238
requests_ko_today: 630,
11981239
max_requests_ko_per_day: 2_000,
1240+
concurrency: 4,
11991241
};
12001242
let raw = serde_json::to_string(&snap).expect("serialize");
12011243
let back: QuotaSnapshot = serde_json::from_str(&raw).expect("deserialize");
12021244
assert_eq!(snap, back);
1245+
assert!(raw.contains("\"concurrency\":4"));
1246+
}
1247+
1248+
#[test]
1249+
fn quota_snapshot_deserialises_legacy_payload_without_concurrency() {
1250+
let legacy = r#"{
1251+
"requests_today": 100,
1252+
"max_requests_per_day": 1000,
1253+
"requests_ko_today": 5,
1254+
"max_requests_ko_per_day": 100
1255+
}"#;
1256+
let snap: QuotaSnapshot = serde_json::from_str(legacy).expect("deserialize");
1257+
assert_eq!(snap.concurrency, 0);
1258+
assert_eq!(snap.requests_today, 100);
1259+
}
1260+
1261+
#[test]
1262+
fn apply_concurrency_scales_up_via_add_permits() {
1263+
let acc = Account::new("u".into(), "p".into());
1264+
assert_eq!(acc.concurrency.load(Ordering::Relaxed), 1);
1265+
assert_eq!(acc.permits.available_permits(), 1);
1266+
apply_concurrency(&acc, 5);
1267+
assert_eq!(acc.concurrency.load(Ordering::Relaxed), 5);
1268+
assert_eq!(acc.permits.available_permits(), 5);
1269+
}
1270+
1271+
#[test]
1272+
fn apply_concurrency_scales_down_via_forget_permits() {
1273+
let acc = Account::new("u".into(), "p".into());
1274+
apply_concurrency(&acc, 5);
1275+
assert_eq!(acc.permits.available_permits(), 5);
1276+
apply_concurrency(&acc, 2);
1277+
assert_eq!(acc.concurrency.load(Ordering::Relaxed), 2);
1278+
assert_eq!(acc.permits.available_permits(), 2);
1279+
}
1280+
1281+
#[test]
1282+
fn apply_concurrency_no_op_when_target_zero() {
1283+
let acc = Account::new("u".into(), "p".into());
1284+
apply_concurrency(&acc, 3);
1285+
apply_concurrency(&acc, 0);
1286+
assert_eq!(acc.concurrency.load(Ordering::Relaxed), 3);
1287+
assert_eq!(acc.permits.available_permits(), 3);
1288+
}
1289+
1290+
#[test]
1291+
fn apply_concurrency_clamps_to_max() {
1292+
let acc = Account::new("u".into(), "p".into());
1293+
apply_concurrency(&acc, MAX_CONCURRENCY + 4);
1294+
assert_eq!(acc.concurrency.load(Ordering::Relaxed), MAX_CONCURRENCY);
1295+
assert_eq!(acc.permits.available_permits(), MAX_CONCURRENCY);
1296+
}
1297+
1298+
#[test]
1299+
fn apply_concurrency_scales_down_while_permit_held_settles_after_drop() {
1300+
let acc = Account::new("u".into(), "p".into());
1301+
apply_concurrency(&acc, 4);
1302+
let held = acc
1303+
.permits
1304+
.clone()
1305+
.try_acquire_owned()
1306+
.expect("permit available");
1307+
assert_eq!(acc.permits.available_permits(), 3);
1308+
apply_concurrency(&acc, 1);
1309+
assert_eq!(acc.concurrency.load(Ordering::Relaxed), 1);
1310+
assert_eq!(acc.permits.available_permits(), 0);
1311+
drop(held);
1312+
assert_eq!(acc.permits.available_permits(), 1);
12031313
}
12041314

12051315
#[test]
@@ -1210,6 +1320,7 @@ mod tests {
12101320
max_requests_per_day: max,
12111321
requests_ko_today: 0,
12121322
max_requests_ko_per_day: 0,
1323+
concurrency: 0,
12131324
};
12141325
let from_snapshot =
12151326
soft_limit_reached(Some(snap.requests_today), Some(snap.max_requests_per_day));

0 commit comments

Comments
 (0)