Skip to content

Commit d89425c

Browse files
MikeyJensenCopilot
and committed
fix: reject in-flight embeds on code-0 exit + exponential backoff for failed restarts
- rejectAllPending now fires on ALL exit codes (code 0 previously orphaned in-flight embeds until the 60s embed timeout)
- Failed workerFactory() calls in scheduleRestart now retry with exponential backoff (delay doubles after each failure, capped at maxRestartDelay)
- Backoff resets to the base delay after a successful restart
- Removed the unreachable shuttingDown guard inside the setTimeout callback (clearTimeout in shutdown() already prevents the callback from firing)

7 new tests (68 total); 100% line/branch/function coverage.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 82f4e72 commit d89425c

File tree

2 files changed

+209
-19
lines changed

2 files changed

+209
-19
lines changed

embed-pool.js

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,19 @@
66
* @param {number} [opts.embedTimeout=60000] Per-embed timeout in ms.
77
* @param {number} [opts.restartDelay=2000] Delay before restarting a crashed worker.
88
* @param {number} [opts.workerReadyTimeout=30000] Max time to wait for a restarting worker.
9+
* @param {number} [opts.maxRestartDelay=60000] Backoff cap for repeated restart failures.
910
*/
1011
export function createEmbedPool(workerFactory, opts = {}) {
1112
const EMBED_TIMEOUT_MS = opts.embedTimeout ?? 60_000;
1213
const RESTART_DELAY_MS = opts.restartDelay ?? 2000;
1314
const WORKER_READY_TIMEOUT_MS = opts.workerReadyTimeout ?? 30_000;
15+
const MAX_RESTART_DELAY_MS = opts.maxRestartDelay ?? 60_000;
1416

1517
let worker = null;
1618
let workerAlive = false;
1719
let shuttingDown = false;
1820
let embedIdCounter = 0;
21+
let currentRestartDelay = RESTART_DELAY_MS;
1922
const pendingEmbeds = new Map();
2023

2124
let workerReadyResolve = null;
@@ -33,20 +36,25 @@ export function createEmbedPool(workerFactory, opts = {}) {
3336

3437
function scheduleRestart(code) {
3538
if (shuttingDown) return;
36-
process.stderr.write(`[vector-memory] Worker exited (code ${code}) — restarting in ${RESTART_DELAY_MS}ms\n`);
39+
const delay = currentRestartDelay;
40+
process.stderr.write(`[vector-memory] Worker exited (code ${code}) — restarting in ${delay}ms\n`);
3741
workerReadyPromise = new Promise(resolve => { workerReadyResolve = resolve; });
3842
restartTimer = setTimeout(() => {
3943
restartTimer = null;
4044
try {
4145
initWorker();
46+
currentRestartDelay = RESTART_DELAY_MS;
4247
} catch (err) {
4348
process.stderr.write(`[vector-memory] Worker restart failed: ${err.message}\n`);
49+
currentRestartDelay = Math.min(currentRestartDelay * 2, MAX_RESTART_DELAY_MS);
50+
scheduleRestart(code);
51+
return;
4452
}
4553
if (workerReadyResolve) {
4654
workerReadyResolve();
4755
workerReadyResolve = null;
4856
}
49-
}, RESTART_DELAY_MS);
57+
}, delay);
5058
}
5159

5260
function initWorker() {
@@ -76,15 +84,12 @@ export function createEmbedPool(workerFactory, opts = {}) {
7684

7785
worker.on("exit", (code) => {
7886
workerAlive = false;
79-
if (code !== 0) {
80-
rejectAllPending("Worker exited with code " + code);
81-
}
87+
rejectAllPending("Worker exited with code " + code);
8288
scheduleRestart(code);
8389
});
8490
}
8591

8692
async function embed(text) {
87-
// If the worker is down but a restart is pending, wait for it
8893
if (!workerAlive && workerReadyPromise) {
8994
let timeoutId;
9095
const timeout = new Promise((_, reject) => {

test.js

Lines changed: 198 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -664,32 +664,27 @@ describe("createEmbedPool", () => {
664664
pool.shutdown();
665665
});
666666

667-
it("BUG: workerFactory() throwing in scheduleRestart leaves pool permanently hung", async () => {
667+
it("BUG: workerFactory() throwing in scheduleRestart retries with backoff", async () => {
668668
let callCount = 0;
669669
const workers = [];
670670
const factory = () => {
671671
callCount++;
672-
if (callCount >= 2) throw new Error("Worker constructor exploded");
672+
if (callCount === 2) throw new Error("Worker constructor exploded");
673673
const w = new MockWorker();
674674
workers.push(w);
675675
return w;
676676
};
677-
const pool = createEmbedPool(factory, { restartDelay: 50, workerReadyTimeout: 200 });
677+
const pool = createEmbedPool(factory, { restartDelay: 50, workerReadyTimeout: 2000, maxRestartDelay: 200 });
678678
pool.initWorker();
679679

680-
// Worker exits cleanly — scheduleRestart fires, but second initWorker() throws
680+
// Worker exits cleanly — first restart throws, second should succeed via backoff
681681
workers[0].emit("exit", 0);
682682

683-
// Wait for the restart attempt to fire and throw
684-
await new Promise(r => setTimeout(r, 100));
685-
686-
// Now try to embed — should fail fast, NOT hang until workerReadyTimeout
687-
const start = Date.now();
688-
await assert.rejects(() => pool.embed("hello"), /not running|constructor exploded/i);
689-
const elapsed = Date.now() - start;
683+
// Wait for retry
684+
await new Promise(r => setTimeout(r, 500));
690685

691-
// If this takes close to workerReadyTimeout (200ms), the promise was stuck
692-
assert.ok(elapsed < 150, `embed() took ${elapsed}ms — pool is hung on a never-resolving promise`);
686+
assert.ok(callCount >= 3, `Expected at least 3 factory calls, got ${callCount}`);
687+
assert.equal(pool.isAlive(), true, "pool should recover via backoff retry");
693688
pool.shutdown();
694689
});
695690

@@ -730,4 +725,194 @@ describe("createEmbedPool", () => {
730725
assert.ok(result instanceof Error);
731726
assert.match(result.message, /not running|shutting down/i);
732727
});
728+
729+
// === BUG: code-0 exit orphans in-flight embeds ===
730+
731+
it("BUG: code-0 exit rejects in-flight embeds (not orphaned for 60s)", async () => {
732+
const factory = mockWorkerFactory();
733+
const pool = createEmbedPool(factory, { embedTimeout: 5000, restartDelay: 50 });
734+
pool.initWorker();
735+
736+
// Start an embed (worker won't respond)
737+
const embedPromise = pool.embed("hello").catch(e => e);
738+
await new Promise(r => setTimeout(r, 10));
739+
740+
// Worker exits cleanly — in-flight embed should be rejected promptly
741+
const start = Date.now();
742+
factory.workers[0].emit("exit", 0);
743+
744+
const result = await embedPromise;
745+
const elapsed = Date.now() - start;
746+
747+
assert.ok(result instanceof Error, "in-flight embed should have been rejected");
748+
assert.ok(elapsed < 1000,
749+
`embed took ${elapsed}ms to reject — orphaned until embedTimeout instead of rejected on exit`);
750+
pool.shutdown();
751+
});
752+
753+
// === BUG: failed restart = permanent death (no retry) ===
754+
755+
it("BUG: failed restart retries with backoff instead of dying permanently", async () => {
756+
let callCount = 0;
757+
const workers = [];
758+
const factory = () => {
759+
callCount++;
760+
// Fail on attempts 2 and 3, succeed on attempt 4
761+
if (callCount >= 2 && callCount <= 3) throw new Error("ONNX load failed");
762+
const w = new MockWorker();
763+
workers.push(w);
764+
return w;
765+
};
766+
const pool = createEmbedPool(factory, {
767+
restartDelay: 30,
768+
workerReadyTimeout: 5000,
769+
maxRestartDelay: 200,
770+
});
771+
pool.initWorker();
772+
773+
// Worker exits — first restart attempt will fail, second will fail, third should succeed
774+
workers[0].emit("exit", 0);
775+
776+
// Wait for backoff retries to play out
777+
await new Promise(r => setTimeout(r, 1500));
778+
779+
assert.ok(callCount >= 4,
780+
`Expected at least 4 factory calls (1 init + 2 failures + 1 success), got ${callCount}`);
781+
assert.equal(pool.isAlive(), true, "pool should have recovered after transient failures");
782+
pool.shutdown();
783+
});
784+
785+
it("POSITIVE: backoff delay increases on consecutive failures", async () => {
786+
let callCount = 0;
787+
const timestamps = [];
788+
const workers = [];
789+
const factory = () => {
790+
callCount++;
791+
timestamps.push(Date.now());
792+
if (callCount >= 2) throw new Error("still broken");
793+
const w = new MockWorker();
794+
workers.push(w);
795+
return w;
796+
};
797+
const pool = createEmbedPool(factory, {
798+
restartDelay: 50,
799+
workerReadyTimeout: 5000,
800+
maxRestartDelay: 400,
801+
});
802+
pool.initWorker();
803+
804+
workers[0].emit("exit", 0);
805+
// Wait for several backoff attempts
806+
await new Promise(r => setTimeout(r, 2000));
807+
808+
// Should have multiple attempts with increasing gaps
809+
assert.ok(callCount >= 4, `Expected at least 4 attempts, got ${callCount}`);
810+
811+
// Verify delays are increasing (backoff)
812+
for (let i = 2; i < timestamps.length - 1; i++) {
813+
const gap1 = timestamps[i] - timestamps[i - 1];
814+
const gap2 = timestamps[i + 1] - timestamps[i];
815+
assert.ok(gap2 >= gap1 * 0.8, // allow 20% timing jitter
816+
`Expected increasing delays but gap ${i}: ${gap1}ms, gap ${i+1}: ${gap2}ms`);
817+
}
818+
pool.shutdown();
819+
});
820+
821+
it("POSITIVE: backoff resets after a successful restart", async () => {
822+
let callCount = 0;
823+
const workers = [];
824+
const factory = () => {
825+
callCount++;
826+
// Fail on second call, succeed on all others
827+
if (callCount === 2) throw new Error("transient failure");
828+
const w = new MockWorker();
829+
workers.push(w);
830+
return w;
831+
};
832+
const pool = createEmbedPool(factory, {
833+
restartDelay: 30,
834+
workerReadyTimeout: 5000,
835+
maxRestartDelay: 500,
836+
});
837+
pool.initWorker();
838+
839+
// First exit → restart fails → retries → succeeds
840+
workers[0].emit("exit", 0);
841+
await new Promise(r => setTimeout(r, 500));
842+
843+
assert.equal(pool.isAlive(), true, "pool should have recovered");
844+
const secondWorkerIdx = workers.length - 1;
845+
846+
// Second exit → should restart with initial delay (not backoff from previous failure)
847+
workers[secondWorkerIdx].emit("exit", 0);
848+
await new Promise(r => setTimeout(r, 200));
849+
850+
assert.equal(pool.isAlive(), true, "pool should restart at base delay after prior success");
851+
pool.shutdown();
852+
});
853+
854+
it("POSITIVE: backoff caps at maxRestartDelay", async () => {
855+
let callCount = 0;
856+
const timestamps = [];
857+
const workers = [];
858+
const factory = () => {
859+
callCount++;
860+
timestamps.push(Date.now());
861+
if (callCount >= 2) throw new Error("permanently broken");
862+
const w = new MockWorker();
863+
workers.push(w);
864+
return w;
865+
};
866+
const pool = createEmbedPool(factory, {
867+
restartDelay: 50,
868+
maxRestartDelay: 150,
869+
});
870+
pool.initWorker();
871+
workers[0].emit("exit", 0);
872+
873+
await new Promise(r => setTimeout(r, 2000));
874+
875+
// All gaps after the first few should be capped at ~150ms
876+
const gaps = [];
877+
for (let i = 1; i < timestamps.length; i++) {
878+
gaps.push(timestamps[i] - timestamps[i - 1]);
879+
}
880+
// The last few gaps should all be ≤ maxRestartDelay + jitter
881+
const lastGaps = gaps.slice(-3);
882+
for (const gap of lastGaps) {
883+
assert.ok(gap <= 250,
884+
`Gap ${gap}ms exceeds maxRestartDelay (150ms) + reasonable jitter`);
885+
}
886+
pool.shutdown();
887+
});
888+
889+
it("shutdown cancels pending restart timer", async () => {
890+
const factory = mockWorkerFactory();
891+
const pool = createEmbedPool(factory, { restartDelay: 50 });
892+
pool.initWorker();
893+
894+
// Trigger exit — restart timer starts
895+
factory.workers[0].emit("exit", 1);
896+
897+
// Shutdown immediately — should cancel the restart
898+
pool.shutdown();
899+
900+
// Wait for timer that would have fired
901+
await new Promise(r => setTimeout(r, 100));
902+
903+
// Should NOT have created a second worker
904+
assert.equal(factory.workers.length, 1, "shutdown should cancel pending restart");
905+
});
906+
907+
it("uses default options when none provided", async () => {
908+
const factory = mockWorkerFactory();
909+
const pool = createEmbedPool(factory);
910+
pool.initWorker();
911+
912+
const p = pool.embed("test");
913+
const msg = factory.workers[0].messages[0];
914+
factory.workers[0].emit("message", { id: msg.id, embedding: Buffer.from([1]) });
915+
await p;
916+
pool.shutdown();
917+
});
733918
});

0 commit comments

Comments
 (0)