-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathworker-pool.mjs
More file actions
59 lines (50 loc) · 2.21 KB
/
Copy pathworker-pool.mjs
File metadata and controls
59 lines (50 loc) · 2.21 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
// Worker pool over node:worker_threads. Lifecycle manager: spawns workers,
// sends them the scheduling SAB, forwards output/error/mainTaskReady
// messages to the scheduler, and terminates workers on destroy. Workers pull
// tasks from the SAB; the pool has no dispatch or queue logic.
import { Worker } from "node:worker_threads";
export class WorkerPool {
constructor(size, workerUrl) {
this._workerUrl = workerUrl;
this.bootTimings = [];
// Incremented each time sendInit() ships a fresh SAB to the workers.
// runBuild() reads `_buildCount > 0` to detect rebuilds when serve.mjs
// reuses the pool across builds.
this._buildCount = 0;
// Callbacks wired by the caller after construction.
this.onWorkerDone = null; // ({ done, output, timing, lane }) => void
this.onWorkerError = null; // ({ taskFailed, message, stack }) => void
this.onPerWorkerTiming = null; // ({ perWorkerTiming, taskIdx, timing, lane }) => void
this.onMainTaskReady = null; // () => void
this._workers = Array.from({ length: size }, (_, i) => this._spawn(i));
}
_spawn(lane) {
const spawnTime = Date.now();
const w = new Worker(this._workerUrl, { workerData: { lane, spawnTime } });
w.on("message", (msg) => {
if (msg.coldBoot) { this.bootTimings.push({ lane, type: "cold", ...msg.coldBoot }); return; }
if (msg.perWorkerTiming) { this.onPerWorkerTiming?.(msg); return; }
if (msg.done != null) { this.onWorkerDone?.(msg); return; }
if (msg.taskFailed != null) { this.onWorkerError?.(msg); return; }
if (msg.mainTaskReady || msg.triggerMainTask != null) { this.onMainTaskReady?.(); return; }
});
w.on("error", (err) => {
this.onWorkerError?.({ taskFailed: -1, message: err.message, stack: err.stack });
});
return w;
}
sendInit(sab, ctx, idMapping) {
for (const w of this._workers) {
w.postMessage({ init: true, sab, ctx, idMapping });
}
this._buildCount++;
}
broadcastDynamicData(payloadSAB, sharedSAB) {
for (const w of this._workers) {
w.postMessage({ dynamicData: true, payloadSAB, sharedSAB });
}
}
destroy() {
return Promise.all(this._workers.map(w => w.terminate()));
}
}