-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathembed-pool.js
More file actions
146 lines (132 loc) · 4.4 KB
/
embed-pool.js
File metadata and controls
146 lines (132 loc) · 4.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
/**
 * Encapsulates embed-worker lifecycle management.
 *
 * The pool owns a single worker created by `workerFactory`, forwards embed
 * requests to it via `postMessage({ id, text })`, and resolves each request
 * when a message carrying the same `id` comes back. When the worker exits,
 * every in-flight request is rejected and a restart is scheduled; repeated
 * restart failures back off exponentially up to `maxRestartDelay`.
 *
 * @param {() => import("node:events").EventEmitter} workerFactory Creates a worker-like object.
 *   The object must support `on(event, handler)`, `postMessage(msg)` and `terminate()`.
 * @param {object} [opts]
 * @param {number} [opts.embedTimeout=60000] Per-embed timeout in ms.
 * @param {number} [opts.restartDelay=2000] Delay before restarting a crashed worker.
 * @param {number} [opts.workerReadyTimeout=30000] Max time to wait for a restarting worker.
 * @param {number} [opts.maxRestartDelay=60000] Backoff cap for repeated restart failures.
 * @returns {{
 *   embed: (text: string) => Promise<*>,
 *   initWorker: () => void,
 *   shutdown: () => void,
 *   isAlive: () => boolean
 * }} Pool handle. `embed` resolves with the `embedding` field of the worker's reply.
 */
export function createEmbedPool(workerFactory, opts = {}) {
  const EMBED_TIMEOUT_MS = opts.embedTimeout ?? 60_000;
  const RESTART_DELAY_MS = opts.restartDelay ?? 2000;
  const WORKER_READY_TIMEOUT_MS = opts.workerReadyTimeout ?? 30_000;
  const MAX_RESTART_DELAY_MS = opts.maxRestartDelay ?? 60_000;

  let worker = null;
  let workerAlive = false;
  let shuttingDown = false;
  let embedIdCounter = 0;
  let currentRestartDelay = RESTART_DELAY_MS;
  let restartTimer = null;

  // id -> { resolve, reject, timer } for every request sent but not yet answered.
  const pendingEmbeds = new Map();

  // Set together while a restart is pending; embed() callers await the promise.
  let workerReadyResolve = null;
  let workerReadyPromise = null;

  /** Rejects every in-flight embed with `reason` and cancels its timeout. */
  function rejectAllPending(reason) {
    for (const [, { reject, timer }] of pendingEmbeds) {
      clearTimeout(timer);
      reject(new Error(reason));
    }
    pendingEmbeds.clear();
  }

  /** Releases every embed() call waiting on a restart and clears the wait state. */
  function releaseReadyWaiters() {
    const resolve = workerReadyResolve;
    workerReadyResolve = null;
    workerReadyPromise = null;
    if (resolve) resolve();
  }

  /**
   * Schedules a worker restart after the current backoff delay.
   * On failure the delay doubles (capped) and another attempt is scheduled.
   */
  function scheduleRestart(code) {
    if (shuttingDown) return;
    const delay = currentRestartDelay;
    process.stderr.write(`[vector-memory] Worker exited (code ${code}) — restarting in ${delay}ms\n`);

    // Reuse an unresolved readiness promise across retries. Creating a fresh
    // one on every attempt would orphan callers that began waiting before a
    // failed attempt, leaving them to time out even after a later success.
    if (!workerReadyResolve) {
      workerReadyPromise = new Promise((resolve) => {
        workerReadyResolve = resolve;
      });
    }

    restartTimer = setTimeout(() => {
      restartTimer = null;
      if (shuttingDown) return;
      try {
        initWorker(); // releases waiters on success
        currentRestartDelay = RESTART_DELAY_MS;
      } catch (err) {
        process.stderr.write(`[vector-memory] Worker restart failed: ${err.message}\n`);
        currentRestartDelay = Math.min(currentRestartDelay * 2, MAX_RESTART_DELAY_MS);
        scheduleRestart(code);
      }
    }, delay);
  }

  /**
   * Creates a worker via `workerFactory` and wires its event handlers.
   * Pool state is only updated once the worker is fully set up, so a throwing
   * factory leaves the pool (and any pending readiness wait) untouched.
   *
   * @throws Whatever `workerFactory` throws.
   */
  function initWorker() {
    const next = workerFactory();

    next.on("message", (msg) => {
      if (msg.type === "ready") return;
      if (msg.type === "error") {
        process.stderr.write(`[vector-memory] Embedding model error: ${msg.message}\n`);
        return;
      }
      // Any other message is treated as a reply; unknown ids are ignored
      // (e.g. the request already timed out).
      const pending = pendingEmbeds.get(msg.id);
      if (pending) {
        clearTimeout(pending.timer);
        pendingEmbeds.delete(msg.id);
        pending.resolve(msg.embedding);
      }
    });

    next.on("error", (err) => {
      process.stderr.write(`[vector-memory] Worker crashed: ${err.message}\n`);
      workerAlive = false;
      rejectAllPending(`Worker crashed: ${err.message}`);
    });

    next.on("exit", (code) => {
      workerAlive = false;
      rejectAllPending(`Worker exited with code ${code}`);
      scheduleRestart(code);
    });

    worker = next;
    workerAlive = true;
    releaseReadyWaiters();
  }

  /**
   * Requests an embedding for `text` from the worker.
   *
   * If a restart is pending, waits up to `workerReadyTimeout` for it first.
   *
   * @param {string} text Payload forwarded to the worker.
   * @returns {Promise<*>} Resolves with the `embedding` field of the worker's reply.
   * @throws {Error} If the restart wait times out, the worker is not running,
   *   posting the message fails, the request times out, or the worker
   *   crashes/exits/shuts down while the request is in flight.
   */
  async function embed(text) {
    if (!workerAlive && workerReadyPromise) {
      let timeoutId;
      const timeout = new Promise((_, reject) => {
        timeoutId = setTimeout(
          () => reject(new Error("Embed worker restart timed out")),
          WORKER_READY_TIMEOUT_MS
        );
      });
      try {
        await Promise.race([workerReadyPromise, timeout]);
      } finally {
        clearTimeout(timeoutId);
      }
    }

    // Also covers the case where the wait ended because of shutdown().
    if (!workerAlive) {
      throw new Error("Embed worker is not running");
    }

    return new Promise((resolve, reject) => {
      const id = embedIdCounter++;
      const timer = setTimeout(() => {
        pendingEmbeds.delete(id);
        reject(new Error(`Embedding timed out after ${EMBED_TIMEOUT_MS}ms`));
      }, EMBED_TIMEOUT_MS);
      pendingEmbeds.set(id, { resolve, reject, timer });
      try {
        worker.postMessage({ id, text });
      } catch (err) {
        clearTimeout(timer);
        pendingEmbeds.delete(id);
        reject(err);
      }
    });
  }

  /**
   * Stops the pool: cancels any scheduled restart, rejects in-flight embeds,
   * terminates the worker, and releases callers waiting on a restart (they
   * then fail with "Embed worker is not running").
   */
  function shutdown() {
    shuttingDown = true;
    clearTimeout(restartTimer);
    restartTimer = null;
    rejectAllPending("Pool shutting down");
    if (worker) {
      worker.terminate();
      worker = null;
      workerAlive = false;
    }
    releaseReadyWaiters();
  }

  return { embed, initWorker, shutdown, isAlive: () => workerAlive };
}