Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 146 additions & 0 deletions embed-pool.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/**
* Encapsulates embed-worker lifecycle management.
*
* @param {() => import("node:events").EventEmitter} workerFactory Creates a worker-like object.
* @param {object} [opts]
* @param {number} [opts.embedTimeout=60000] Per-embed timeout in ms.
* @param {number} [opts.restartDelay=2000] Delay before restarting a crashed worker.
* @param {number} [opts.workerReadyTimeout=30000] Max time to wait for a restarting worker.
* @param {number} [opts.maxRestartDelay=60000] Backoff cap for repeated restart failures.
*/
export function createEmbedPool(workerFactory, opts = {}) {
const EMBED_TIMEOUT_MS = opts.embedTimeout ?? 60_000;
const RESTART_DELAY_MS = opts.restartDelay ?? 2000;
const WORKER_READY_TIMEOUT_MS = opts.workerReadyTimeout ?? 30_000;
const MAX_RESTART_DELAY_MS = opts.maxRestartDelay ?? 60_000;

let worker = null;
let workerAlive = false;
let shuttingDown = false;
let embedIdCounter = 0;
let currentRestartDelay = RESTART_DELAY_MS;
const pendingEmbeds = new Map();

let workerReadyResolve = null;
let workerReadyPromise = null;

function rejectAllPending(reason) {
for (const [, { reject, timer }] of pendingEmbeds) {
clearTimeout(timer);
reject(new Error(reason));
}
pendingEmbeds.clear();
}

let restartTimer = null;

function scheduleRestart(code) {
if (shuttingDown) return;
const delay = currentRestartDelay;
process.stderr.write(`[vector-memory] Worker exited (code ${code}) — restarting in ${delay}ms\n`);
workerReadyPromise = new Promise(resolve => { workerReadyResolve = resolve; });
restartTimer = setTimeout(() => {
restartTimer = null;
if (shuttingDown) return;
try {
initWorker();
currentRestartDelay = RESTART_DELAY_MS;
} catch (err) {
process.stderr.write(`[vector-memory] Worker restart failed: ${err.message}\n`);
currentRestartDelay = Math.min(currentRestartDelay * 2, MAX_RESTART_DELAY_MS);
scheduleRestart(code);
return;
}
if (workerReadyResolve) {
workerReadyResolve();
workerReadyResolve = null;
}
}, delay);
}

function initWorker() {
worker = workerFactory();
workerAlive = true;
workerReadyPromise = null;

worker.on("message", (msg) => {
if (msg.type === "ready") return;
if (msg.type === "error") {
process.stderr.write(`[vector-memory] Embedding model error: ${msg.message}\n`);
return;
}
const pending = pendingEmbeds.get(msg.id);
if (pending) {
clearTimeout(pending.timer);
pendingEmbeds.delete(msg.id);
pending.resolve(msg.embedding);
}
});

worker.on("error", (err) => {
process.stderr.write(`[vector-memory] Worker crashed: ${err.message}\n`);
workerAlive = false;
rejectAllPending("Worker crashed: " + err.message);
});

worker.on("exit", (code) => {
workerAlive = false;
rejectAllPending("Worker exited with code " + code);
scheduleRestart(code);
});
}

async function embed(text) {
if (!workerAlive && workerReadyPromise) {
let timeoutId;
const timeout = new Promise((_, reject) => {
timeoutId = setTimeout(
() => reject(new Error("Embed worker restart timed out")),
WORKER_READY_TIMEOUT_MS
);
});
try {
await Promise.race([workerReadyPromise, timeout]);
} finally {
clearTimeout(timeoutId);
}
}

if (!workerAlive) {
throw new Error("Embed worker is not running");
}

return new Promise((resolve, reject) => {
const id = embedIdCounter++;
const timer = setTimeout(() => {
pendingEmbeds.delete(id);
reject(new Error("Embedding timed out after " + EMBED_TIMEOUT_MS + "ms"));
}, EMBED_TIMEOUT_MS);
pendingEmbeds.set(id, { resolve, reject, timer });
try {
worker.postMessage({ id, text });
} catch (err) {
clearTimeout(timer);
pendingEmbeds.delete(id);
reject(err);
}
});
}

function shutdown() {
shuttingDown = true;
clearTimeout(restartTimer);
rejectAllPending("Pool shutting down");
if (worker) {
worker.terminate();
worker = null;
workerAlive = false;
}
if (workerReadyResolve) {
workerReadyResolve();
workerReadyResolve = null;
}
}

return { embed, initWorker, shutdown, isAlive: () => workerAlive };
}
4 changes: 2 additions & 2 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { request } from "http";
import { userInfo } from "os";

const __dirname = dirname(fileURLToPath(import.meta.url));
const COPILOT_DIR = join(homedir(), ".copilot");
const COPILOT_DIR = process.env.VECTOR_MEMORY_DATA_DIR || join(homedir(), ".copilot");
const EXPECTED_USER = userInfo().username;
const PKG = JSON.parse(readFileSync(join(__dirname, "package.json"), "utf-8"));

Expand All @@ -27,7 +27,7 @@ const PID_FILE = join(COPILOT_DIR, "vector-memory.pid");

const SERVER_DEPS_DIR = join(__dirname, ".server");
const SERVER_DEPS_JSON = join(__dirname, "server-deps.json");
const SERVER_FILES = ["vector-memory-server.js", "embed-worker.js", "lib.js"];
const SERVER_FILES = ["vector-memory-server.js", "embed-worker.js", "embed-pool.js", "lib.js"];

/** Check if server deps are available (either in node_modules or .server/) */
function serverDepsInstalled() {
Expand Down
3 changes: 3 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
"index.js",
"vector-memory-server.js",
"embed-worker.js",
"embed-pool.js",
"lib.js",
"server-deps.json",
"LICENSE",
Expand Down Expand Up @@ -50,9 +51,10 @@
"offline"
],
"scripts": {
"lint": "eslint index.js vector-memory-server.js embed-worker.js lib.js",
"lint": "eslint index.js vector-memory-server.js embed-worker.js embed-pool.js lib.js",
"test": "node --test test.js",
"test:coverage": "node --test --experimental-test-coverage --test-coverage-lines=100 --test-coverage-branches=100 --test-coverage-functions=100 --test-coverage-exclude=test.js test.js",
"test:integration": "node --test test-integration.js",
"test:coverage": "node --test --experimental-test-coverage --test-coverage-lines=100 --test-coverage-branches=100 --test-coverage-functions=100 --test-coverage-exclude=test.js --test-coverage-exclude=test-integration.js test.js",
"check": "npm run lint && npm run test:coverage"
},
"dependencies": {
Expand Down
Loading
Loading