Skip to content

Commit 3d65628

Browse files
BrainSlugs83Copilot
andcommitted
fix: embed pool waits for worker restart, restarts on code-0 exit
Extract worker lifecycle management into embed-pool.js with:
- embed() awaits a pending restart instead of rejecting immediately
- Worker restarts on ALL exit codes, not just non-zero
- shuttingDown guard prevents restart after explicit shutdown
- Configurable workerReadyTimeout for restart wait (default 30s)
- Timer cleanup in rejectAllPending to prevent leaks

Includes 8 new unit tests covering both bugs and fix behavior.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 07890df commit 3d65628

File tree

6 files changed

+381
-77
lines changed

6 files changed

+381
-77
lines changed

embed-pool.js

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
/**
 * Encapsulates embed-worker lifecycle management.
 *
 * The pool owns a single worker created by `workerFactory`. Embed requests are
 * posted to the worker as `{ id, text }` and settled when the worker answers
 * with a message carrying the same `id`. When the worker exits (with any exit
 * code) a replacement is created after `restartDelay` ms, unless `shutdown()`
 * has been called.
 *
 * @param {() => import("node:events").EventEmitter} workerFactory Creates a worker-like object.
 *   Besides emitting "message", "error" and "exit", the object must provide
 *   `postMessage(msg)` and `terminate()`.
 * @param {object} [opts]
 * @param {number} [opts.embedTimeout=60000] Per-embed timeout in ms.
 * @param {number} [opts.restartDelay=2000] Delay before restarting a crashed worker.
 * @param {number} [opts.workerReadyTimeout=30000] Max time to wait for a restarting worker.
 * @returns {{
 *   embed: (text: string) => Promise<*>,
 *   initWorker: () => void,
 *   shutdown: () => void,
 *   isAlive: () => boolean
 * }} Pool handle. `initWorker()` must be called once before the first `embed()`.
 */
export function createEmbedPool(workerFactory, opts = {}) {
  const EMBED_TIMEOUT_MS = opts.embedTimeout ?? 60_000;
  const RESTART_DELAY_MS = opts.restartDelay ?? 2000;
  const WORKER_READY_TIMEOUT_MS = opts.workerReadyTimeout ?? 30_000;

  let worker = null;
  let workerAlive = false;
  let shuttingDown = false;
  let embedIdCounter = 0;
  // id -> { resolve, reject, timer } for every request posted but not yet settled.
  const pendingEmbeds = new Map();

  // Set while a restart is scheduled so embed() callers can wait for it.
  let workerReadyResolve = null;
  let workerReadyPromise = null;
  // Handle of the scheduled restart timer, kept so shutdown() can cancel it.
  let restartTimer = null;

  /** Wakes every embed() call that is waiting for a restart (no-op if none). */
  function releaseRestartWaiters() {
    if (workerReadyResolve) {
      workerReadyResolve();
      workerReadyResolve = null;
    }
  }

  /**
   * Rejects every in-flight embed with `reason` and clears its timeout timer
   * so no timer outlives the request it belongs to.
   */
  function rejectAllPending(reason) {
    for (const [, { reject, timer }] of pendingEmbeds) {
      clearTimeout(timer);
      reject(new Error(reason));
    }
    pendingEmbeds.clear();
  }

  /**
   * Schedules creation of a replacement worker after RESTART_DELAY_MS.
   * Does nothing once the pool is shutting down.
   */
  function scheduleRestart(code) {
    if (shuttingDown) return;
    process.stderr.write(`[vector-memory] Worker exited (code ${code}) — restarting in ${RESTART_DELAY_MS}ms\n`);
    workerReadyPromise = new Promise((resolve) => { workerReadyResolve = resolve; });
    restartTimer = setTimeout(() => {
      restartTimer = null;
      // shutdown() may have been called while the delay was running; never
      // bring a worker back to life after an explicit shutdown.
      if (shuttingDown) return;
      initWorker();
      releaseRestartWaiters();
    }, RESTART_DELAY_MS);
  }

  /** Creates the worker via the factory and wires up its event handlers. */
  function initWorker() {
    worker = workerFactory();
    workerAlive = true;
    // Callers already waiting hold their own reference to the old promise.
    workerReadyPromise = null;

    worker.on("message", (msg) => {
      if (msg.type === "ready") return;
      if (msg.type === "error") {
        process.stderr.write(`[vector-memory] Embedding model error: ${msg.message}\n`);
        return;
      }
      const pending = pendingEmbeds.get(msg.id);
      if (pending) {
        clearTimeout(pending.timer);
        pendingEmbeds.delete(msg.id);
        pending.resolve(msg.embedding);
      }
    });

    worker.on("error", (err) => {
      process.stderr.write(`[vector-memory] Worker crashed: ${err.message}\n`);
      workerAlive = false;
      rejectAllPending("Worker crashed: " + err.message);
    });

    worker.on("exit", (code) => {
      workerAlive = false;
      // Requests posted to the exited worker can never be answered by its
      // replacement, so fail them now (for every exit code, including 0)
      // instead of letting them hang until the per-embed timeout.
      rejectAllPending("Worker exited with code " + code);
      scheduleRestart(code);
    });
  }

  /**
   * Requests an embedding for `text`.
   *
   * If the worker is down but a restart is scheduled, waits up to
   * WORKER_READY_TIMEOUT_MS for the replacement before posting the request.
   *
   * @param {string} text Text to embed.
   * @returns {Promise<*>} Resolves with the `embedding` field of the worker's reply.
   * @throws {Error} If the restart wait times out, the worker is not running,
   *   posting the message fails, the worker crashes/exits, the pool shuts
   *   down, or no reply arrives within EMBED_TIMEOUT_MS.
   */
  async function embed(text) {
    // If the worker is down but a restart is pending, wait for it
    if (!workerAlive && workerReadyPromise) {
      let timeoutId;
      const timeout = new Promise((_, reject) => {
        timeoutId = setTimeout(
          () => reject(new Error("Embed worker restart timed out")),
          WORKER_READY_TIMEOUT_MS
        );
      });
      try {
        await Promise.race([workerReadyPromise, timeout]);
      } finally {
        clearTimeout(timeoutId);
      }
    }

    if (!workerAlive) {
      throw new Error("Embed worker is not running");
    }

    return new Promise((resolve, reject) => {
      const id = embedIdCounter++;
      const timer = setTimeout(() => {
        pendingEmbeds.delete(id);
        reject(new Error("Embedding timed out after " + EMBED_TIMEOUT_MS + "ms"));
      }, EMBED_TIMEOUT_MS);
      pendingEmbeds.set(id, { resolve, reject, timer });
      try {
        worker.postMessage({ id, text });
      } catch (err) {
        clearTimeout(timer);
        pendingEmbeds.delete(id);
        reject(err);
      }
    });
  }

  /**
   * Stops the pool: rejects in-flight embeds, cancels any scheduled restart,
   * terminates the worker and releases callers waiting for a restart (they
   * then fail with "Embed worker is not running").
   */
  function shutdown() {
    shuttingDown = true;
    rejectAllPending("Pool shutting down");
    if (restartTimer !== null) {
      clearTimeout(restartTimer);
      restartTimer = null;
    }
    if (worker) {
      worker.terminate();
      worker = null;
      workerAlive = false;
    }
    releaseRestartWaiters();
  }

  return { embed, initWorker, shutdown, isAlive: () => workerAlive };
}

index.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ const PID_FILE = join(COPILOT_DIR, "vector-memory.pid");
2727

2828
const SERVER_DEPS_DIR = join(__dirname, ".server");
2929
const SERVER_DEPS_JSON = join(__dirname, "server-deps.json");
30-
const SERVER_FILES = ["vector-memory-server.js", "embed-worker.js", "lib.js"];
30+
const SERVER_FILES = ["vector-memory-server.js", "embed-worker.js", "embed-pool.js", "lib.js"];
3131

3232
/** Check if server deps are available (either in node_modules or .server/) */
3333
function serverDepsInstalled() {

package-lock.json

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
"index.js",
1111
"vector-memory-server.js",
1212
"embed-worker.js",
13+
"embed-pool.js",
1314
"lib.js",
1415
"server-deps.json",
1516
"LICENSE",
@@ -50,7 +51,7 @@
5051
"offline"
5152
],
5253
"scripts": {
53-
"lint": "eslint index.js vector-memory-server.js embed-worker.js lib.js",
54+
"lint": "eslint index.js vector-memory-server.js embed-worker.js embed-pool.js lib.js",
5455
"test": "node --test test.js",
5556
"test:coverage": "node --test --experimental-test-coverage --test-coverage-lines=100 --test-coverage-branches=100 --test-coverage-functions=100 --test-coverage-exclude=test.js test.js",
5657
"check": "npm run lint && npm run test:coverage"

0 commit comments

Comments
 (0)