Skip to content

Commit a5beef7

Browse files
MagicalTuxclaude
andcommitted
Send a persistent UUIDv7 worker id with every pullOne
Generate a UUIDv7 on first run, persist it to <workdir>/worker-id, and pass it as worker=<id> in the pullOne body on every claim so the platform can attribute a worker's fragments across restarts. An existing non-empty file is honored verbatim (an operator can pin a chosen id); only a missing or blank file triggers generation. The id is minted once per process and shared by every GPU's prefetch thread. Unit test covers generation (v7), reuse, and pinned-id passthrough. Verified end-to-end: id minted + persisted and pullOne accepted with the param. Bump version to 0.1.11. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent 9da0ea8 commit a5beef7

3 files changed

Lines changed: 63 additions & 8 deletions

File tree

Cargo.lock

Lines changed: 2 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "decryptd"
3-
version = "0.1.10"
3+
version = "0.1.11"
44
edition = "2024"
55
license = "Proprietary"
66
authors = ["Karpeles Lab Inc"]
@@ -25,6 +25,9 @@ base64 = "0.22"
2525
clap = { version = "4", features = ["derive", "env"] }
2626
serde = { version = "1", features = ["derive"] }
2727
serde_json = "1"
28+
# Persistent per-worker id (UUIDv7, time-ordered) sent with every pullOne so the
29+
# platform can attribute a worker's work across restarts.
30+
uuid = { version = "1", features = ["v7"] }
2831
purecrypto = { version = "0.6.7", features = ["hazmat-secp256k1"] }
2932
# Pinned to the same major as klbfw's rsurl (0.1) — two rsurl versions in one binary
3033
# collide on its C FFI symbols (duplicate-symbol link error).

src/main.rs

Lines changed: 57 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ use clap::Parser;
4747
use klbfw::{Config, RestContext};
4848
use serde::Deserialize;
4949
use serde_json::{Value, json};
50+
use uuid::Uuid;
5051

5152
/// Run as a worker: claim Decrypt/Job fragments, run the kernel, submit results.
5253
#[derive(Parser)]
@@ -533,11 +534,16 @@ impl Drop for RunGuard {
533534
fn claim_and_fetch(
534535
args: &RunArgs,
535536
downloads: &Downloads,
537+
worker_id: &str,
536538
ctx: &RestContext,
537539
inflight: &InFlight,
538540
) -> Result<Option<ReadyJob>> {
539541
let resp = ctx
540-
.do_request("Decrypt/Job:pullOne", "POST", json!({}))
542+
.do_request(
543+
"Decrypt/Job:pullOne",
544+
"POST",
545+
json!({ "worker": worker_id }),
546+
)
541547
.map_err(|e| anyhow!("Decrypt/Job:pullOne: {e}"))?;
542548
// pullOne returns null data when there are no open jobs with fragments to issue.
543549
let Some(data) = resp.raw().filter(|v| !v.is_null()) else {
@@ -760,6 +766,7 @@ fn submit_job(ctx: &RestContext, response_key: &str, job: &FinishedJob) -> Resul
760766
fn prefetch_loop(
761767
args: Arc<RunArgs>,
762768
downloads: Downloads,
769+
worker_id: String,
763770
ctx: RestContext,
764771
inflight: InFlight,
765772
ready: SyncSender<ReadyJob>,
@@ -768,7 +775,7 @@ fn prefetch_loop(
768775
loop {
769776
// Don't claim new work while paused.
770777
status.wait_while_paused();
771-
match claim_and_fetch(&args, &downloads, &ctx, &inflight) {
778+
match claim_and_fetch(&args, &downloads, &worker_id, &ctx, &inflight) {
772779
Ok(Some(job)) => {
773780
if ready.send(job).is_err() {
774781
return; // pipeline shut down
@@ -925,11 +932,31 @@ fn select_gpus(spec: &Option<String>, count: i32) -> Result<Vec<i32>> {
925932
Ok(ords)
926933
}
927934

935+
/// Load this worker's persistent id from `<workdir>/worker-id`, or mint a new
936+
/// UUIDv7 and save it. Sent as `worker=<id>` on every pullOne so the platform can
937+
/// attribute a worker's fragments across restarts. Any non-empty existing content
938+
/// is kept verbatim (so an operator can pin a chosen id); only a missing or blank
939+
/// file triggers generation.
940+
fn load_or_create_worker_id(workdir: &Path) -> Result<String> {
941+
let path = workdir.join("worker-id");
942+
if let Ok(existing) = std::fs::read_to_string(&path) {
943+
let existing = existing.trim();
944+
if !existing.is_empty() {
945+
return Ok(existing.to_string());
946+
}
947+
}
948+
let id = Uuid::now_v7().to_string();
949+
std::fs::write(&path, &id).with_context(|| format!("writing {}", path.display()))?;
950+
eprintln!("[decryptd] generated worker id {id} ({})", path.display());
951+
Ok(id)
952+
}
953+
928954
fn run_worker(args: RunArgs, status: Status) -> Result<()> {
929955
std::fs::create_dir_all(&args.workdir)?;
930956
let ctx = RestContext::with_config(Config::new("https".to_string(), args.host.clone()))
931957
.with_debug(std::env::var("DECRYPTD_DEBUG").is_ok());
932958
let jobs = args.jobs.max(1);
959+
let worker_id = load_or_create_worker_id(&args.workdir)?;
933960

934961
let count = cuda::device_count().map_err(|e| anyhow!("enumerating GPUs: {e}"))?;
935962
if count < 1 {
@@ -943,7 +970,7 @@ fn run_worker(args: RunArgs, status: Status) -> Result<()> {
943970
if args.once {
944971
// Single fragment on the first selected GPU.
945972
let ord = gpus[0];
946-
return match claim_and_fetch(&args, &downloads, &ctx, &inflight)? {
973+
return match claim_and_fetch(&args, &downloads, &worker_id, &ctx, &inflight)? {
947974
Some(job) => {
948975
let key = inflight.lock().unwrap().get(&job.frag_id).cloned();
949976
let key = key.ok_or_else(|| anyhow!("fragment lost its response key"))?;
@@ -957,7 +984,7 @@ fn run_worker(args: RunArgs, status: Status) -> Result<()> {
957984
}
958985

959986
eprintln!(
960-
"[decryptd] host={} GPUs={gpus:?} jobs={jobs}/GPU ({} runner(s) total)",
987+
"[decryptd] host={} worker={worker_id} GPUs={gpus:?} jobs={jobs}/GPU ({} runner(s) total)",
961988
args.host,
962989
gpus.len() * jobs
963990
);
@@ -976,14 +1003,17 @@ fn run_worker(args: RunArgs, status: Status) -> Result<()> {
9761003
let done_rx = Arc::new(Mutex::new(done_rx));
9771004

9781005
{
979-
let (args, downloads, ctx, inflight, status) = (
1006+
let (args, downloads, ctx, inflight, status, worker_id) = (
9801007
args.clone(),
9811008
downloads.clone(),
9821009
ctx.clone(),
9831010
inflight.clone(),
9841011
status.clone(),
1012+
worker_id.clone(),
9851013
);
986-
thread::spawn(move || prefetch_loop(args, downloads, ctx, inflight, ready_tx, status));
1014+
thread::spawn(move || {
1015+
prefetch_loop(args, downloads, worker_id, ctx, inflight, ready_tx, status)
1016+
});
9871017
}
9881018
for _ in 0..jobs {
9891019
let (ctx, inflight, done_rx) = (ctx.clone(), inflight.clone(), done_rx.clone());
@@ -1043,6 +1073,27 @@ mod tests {
10431073
assert_eq!(bytes, b"hello");
10441074
}
10451075

1076+
#[test]
1077+
fn worker_id_persists_and_is_reused() {
1078+
let dir = std::env::temp_dir().join(format!("decryptd-wid-{}", std::process::id()));
1079+
let _ = std::fs::remove_dir_all(&dir);
1080+
std::fs::create_dir_all(&dir).unwrap();
1081+
1082+
// First call mints a UUIDv7 and writes it.
1083+
let id1 = load_or_create_worker_id(&dir).unwrap();
1084+
assert_eq!(id1.len(), 36, "UUID string form");
1085+
assert_eq!(Uuid::parse_str(&id1).unwrap().get_version_num(), 7);
1086+
// Second call returns the same persisted id (no regeneration).
1087+
let id2 = load_or_create_worker_id(&dir).unwrap();
1088+
assert_eq!(id1, id2);
1089+
1090+
// A pinned (non-UUID) id is honored verbatim, trimmed of whitespace.
1091+
std::fs::write(dir.join("worker-id"), " my-pinned-id\n").unwrap();
1092+
assert_eq!(load_or_create_worker_id(&dir).unwrap(), "my-pinned-id");
1093+
1094+
let _ = std::fs::remove_dir_all(&dir);
1095+
}
1096+
10461097
#[test]
10471098
fn pause_gate_blocks_until_resumed() {
10481099
let status = Status::default();

0 commit comments

Comments
 (0)