Skip to content

Commit 1f38213

Browse files
MagicalTuxclaude
andcommitted
Harden the long-running worker: bounded cache, run backoff, manifest guards
Review pass for latent issues that only surface over long uptimes: - Blob cache grew without bound (only the temp file was ever removed), so a worker filled its disk over time — data blobs run hundreds of MB. Add a size-capped LRU: after each cached download, evict the oldest blobs past --cache-max-gb (default 20). Eviction is safe because fetch_blob loads each blob fully into memory; a running job never needs its file. In-progress .part/.tmp files are spared. - run_loop had no backoff on GPU errors (unlike prefetch_loop), so any persistent fault — OOM, wedged driver, no compatible cubin — spun it, claiming and downloading fragments as fast as the network allowed and burning the work pool. Back off idle_secs after a run error. - A bad manifest could panic the runner thread (block==0 divide-by-zero; oversized tile truncating the u32 grid), and a runner panic quietly exits the daemon. Validate block/record_size and reject grid overflow as handled errors instead. Add unit test for cache eviction. Bump version to 0.1.8. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent 3b921a7 commit 1f38213

4 files changed

Lines changed: 114 additions & 6 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "decryptd"
3-
version = "0.1.7"
3+
version = "0.1.8"
44
edition = "2024"
55
license = "Proprietary"
66
authors = ["Karpeles Lab Inc"]

src/cuda.rs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,17 @@ pub fn run_job(
238238
tile: u64,
239239
mut progress: impl FnMut(u64, u64),
240240
) -> Result<Vec<u8>, String> {
241+
// Validate the publisher-supplied launch params up front: a bad manifest is a
242+
// handled error, never a panic (a panic here unwinds the runner thread and
243+
// takes the whole daemon down). `block == 0` would divide-by-zero below;
244+
// `record_size == 0` makes the output layout meaningless.
245+
if block == 0 {
246+
return Err("manifest block size is 0".into());
247+
}
248+
if record_size == 0 {
249+
return Err("manifest record_size is 0".into());
250+
}
251+
241252
let func = gpu.function(entry)?;
242253
let d_data = DeviceBuf::from_slice(data)?;
243254
let d_out = DeviceBuf::alloc(record_size as usize * out_cap as usize)?;
@@ -263,7 +274,12 @@ pub fn run_job(
263274
&mut a_oc as *mut _ as *mut c_void,
264275
&mut a_cap as *mut _ as *mut c_void,
265276
];
266-
let grid = count.div_ceil(block as u64) as u32;
277+
// A too-large tile relative to block can overflow the u32 grid dimension;
278+
// reject it rather than silently truncating (which would under-compute).
279+
let grid_u64 = count.div_ceil(block as u64);
280+
let grid = u32::try_from(grid_u64).map_err(|_| {
281+
format!("grid {grid_u64} exceeds u32 (tile too large for block {block})")
282+
})?;
267283
check(
268284
unsafe {
269285
cuLaunchKernel(

src/main.rs

Lines changed: 95 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ mod gui;
3333

3434
use std::collections::HashMap;
3535
use std::io::{Cursor, Read};
36-
use std::path::PathBuf;
36+
use std::path::{Path, PathBuf};
3737
use std::sync::atomic::{AtomicUsize, Ordering};
3838
use std::sync::mpsc::{Receiver, SyncSender, sync_channel};
3939
use std::sync::{Arc, Mutex};
@@ -66,6 +66,11 @@ struct RunArgs {
6666
/// the next job and upload of finished results always overlap the GPU run.
6767
#[arg(long, default_value_t = 1)]
6868
jobs: usize,
69+
/// Maximum on-disk blob cache size, in GB. Once exceeded, the oldest cached
70+
/// blobs are evicted. Eviction is safe: a running job holds its blob in memory,
71+
/// so the only cost of dropping a file is a re-download on a later cache miss.
72+
#[arg(long, default_value_t = 20)]
73+
cache_max_gb: u64,
6974
}
7075

7176
// ------------------------------------------------------------- pullOne response
@@ -245,10 +250,53 @@ fn fetch_blob(args: &RunArgs, d: &DataRef) -> Result<Vec<u8>> {
245250
let bytes = std::fs::read(&target)?;
246251
if is_temp {
247252
let _ = std::fs::remove_file(&target);
253+
} else {
254+
// We just added a finalized blob; keep the cache under its size cap.
255+
prune_cache(&cache, args.cache_max_gb.saturating_mul(1 << 30));
248256
}
249257
Ok(bytes)
250258
}
251259

260+
/// Keep the blob cache under `max_bytes` by evicting the oldest finalized entries
261+
/// (by mtime). Best-effort — any IO error just leaves that file in place. Skips
262+
/// rsurl's in-progress `.part`/`.tmp` files so an active download is never
263+
/// disturbed; everything else is fair game, since `fetch_blob` reads each blob
264+
/// fully into memory and no running job depends on its file surviving.
265+
fn prune_cache(cache: &Path, max_bytes: u64) {
266+
let Ok(rd) = std::fs::read_dir(cache) else {
267+
return;
268+
};
269+
let mut entries: Vec<(std::time::SystemTime, u64, PathBuf)> = Vec::new();
270+
let mut total: u64 = 0;
271+
for e in rd.flatten() {
272+
let path = e.path();
273+
let name = path.file_name().and_then(|n| n.to_str()).unwrap_or("");
274+
if name.ends_with(".part") || name.ends_with(".tmp") {
275+
continue; // an in-progress download rsurl still owns
276+
}
277+
let Ok(meta) = e.metadata() else { continue };
278+
if !meta.is_file() {
279+
continue;
280+
}
281+
let mtime = meta.modified().unwrap_or(std::time::UNIX_EPOCH);
282+
total = total.saturating_add(meta.len());
283+
entries.push((mtime, meta.len(), path));
284+
}
285+
if total <= max_bytes {
286+
return;
287+
}
288+
entries.sort_by_key(|(mtime, _, _)| *mtime); // oldest first
289+
for (_, len, path) in entries {
290+
if total <= max_bytes {
291+
break;
292+
}
293+
if std::fs::remove_file(&path).is_ok() {
294+
total = total.saturating_sub(len);
295+
eprintln!("[decryptd] cache: evicted {} ({len} B)", path.display());
296+
}
297+
}
298+
}
299+
252300
/// Decode an RFC 2397 `data:` URI into its raw bytes. Handles the two payload
253301
/// encodings: `;base64` (the platform's case — base64 over the gzip/xz blob) and
254302
/// the default percent-encoding. The media type in the header is ignored; the
@@ -654,6 +702,7 @@ fn prefetch_loop(
654702
/// GPU stage: the serialized step. One per `--jobs`; each takes a ready job, runs it,
655703
/// and hands the result to the upload stage.
656704
fn run_loop(
705+
args: Arc<RunArgs>,
657706
ready: Arc<Mutex<Receiver<ReadyJob>>>,
658707
inflight: InFlight,
659708
done: SyncSender<FinishedJob>,
@@ -677,6 +726,11 @@ fn run_loop(
677726
Err(e) => {
678727
eprintln!("[decryptd] run error: {e:#}");
679728
inflight.lock().unwrap().remove(&frag_id);
729+
// Back off before taking the next fragment. Without this a
730+
// persistent GPU fault (OOM, driver wedged, no compatible cubin)
731+
// spins here, claiming + downloading fragments as fast as the
732+
// network allows and burning the work pool for nothing.
733+
thread::sleep(Duration::from_secs(args.idle_secs));
680734
}
681735
}
682736
}
@@ -798,10 +852,15 @@ fn run_worker(args: RunArgs, status: Status) -> Result<()> {
798852
}
799853
let mut runners = Vec::new();
800854
for _ in 0..jobs {
801-
let (ready_rx, inflight, done_tx) = (ready_rx.clone(), inflight.clone(), done_tx.clone());
855+
let (args, ready_rx, inflight, done_tx) = (
856+
args.clone(),
857+
ready_rx.clone(),
858+
inflight.clone(),
859+
done_tx.clone(),
860+
);
802861
let status = status.clone();
803862
runners.push(thread::spawn(move || {
804-
run_loop(ready_rx, inflight, done_tx, status)
863+
run_loop(args, ready_rx, inflight, done_tx, status)
805864
}));
806865
}
807866
drop(done_tx);
@@ -844,4 +903,37 @@ mod tests {
844903
decode_data_url("data:application/octet-stream;BASE64,aGVs\nbG8=").expect("decode");
845904
assert_eq!(bytes, b"hello");
846905
}
906+
907+
#[test]
908+
fn prune_cache_evicts_down_to_cap_and_spares_in_progress() {
909+
// Unique scratch dir so parallel test runs don't collide.
910+
let dir = std::env::temp_dir().join(format!("decryptd-prune-{}", std::process::id()));
911+
let _ = std::fs::remove_dir_all(&dir);
912+
std::fs::create_dir_all(&dir).unwrap();
913+
914+
// Three finalized 10-byte blobs (30 B) plus an in-progress .part that
915+
// eviction must never touch even though we're over the cap.
916+
for name in ["aaa", "bbb", "ccc"] {
917+
std::fs::write(dir.join(name), [0u8; 10]).unwrap();
918+
}
919+
std::fs::write(dir.join("pending-download.tmp.part"), [0u8; 10]).unwrap();
920+
921+
// Cap at 15 B: must evict finalized blobs until <= 15 (keeps exactly one),
922+
// leaving the .part alone.
923+
prune_cache(&dir, 15);
924+
925+
let finalized = std::fs::read_dir(&dir)
926+
.unwrap()
927+
.flatten()
928+
.filter_map(|e| e.file_name().into_string().ok())
929+
.filter(|n| !n.ends_with(".part"))
930+
.count();
931+
assert_eq!(finalized, 1, "should evict down to one finalized blob");
932+
assert!(
933+
dir.join("pending-download.tmp.part").exists(),
934+
"in-progress .part must survive eviction"
935+
);
936+
937+
let _ = std::fs::remove_dir_all(&dir);
938+
}
847939
}

0 commit comments

Comments
 (0)