Skip to content

Commit 7afc9eb

Browse files
fix(worker-pool): clear parse timeouts so --full exits promptly (#146)
* fix(worker-pool): clear parse timeouts so --full exits promptly Promise.race with uncancellable delay() left 120s timers on the event loop after workers finished. Replace with explicit setTimeout + clearTimeout on worker response and dispose. * test(worker-pool): subprocess exit guard for timer leak Review follow-up: in-process wall time did not catch orphaned event-loop timers. Spawn a child that runs parseFilesParallel and assert it exits in <5s. Export INLINE_PARSE_MAX for fixture gate; remove unused delay(). * test(worker-pool): fail fast when subprocess hangs on timer leak Race child exit against 6s and kill on hang so regressions fail with a clear assertion instead of burning the full test timeout. * test(ci): tighten exit-delay guards Drop redundant 5s elapsed assertion; hang race is the regression signal. Wrap Node dist --full smoke in timeout 45 so a timer leak fails CI fast. * test(worker-pool): address review nits before merge Stop exporting INLINE_PARSE_MAX; mirror the threshold in tests. Add node dist --full subprocess exit regression (skips when dist/ absent).
1 parent 01d4931 commit 7afc9eb

7 files changed

Lines changed: 216 additions & 66 deletions

File tree

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@stainless-code/codemap": patch
3+
---
4+
5+
Fix `codemap --full` (and other worker-pool parses) appearing to hang ~120s after stats print — clear parse timeout timers when workers respond instead of leaving orphaned `setTimeout` handles on the event loop.

.github/workflows/ci.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,9 +131,10 @@ jobs:
131131
# Full rebuild exercises multi-statement DDL (`runSql`), `dropAll`, workers path — Node only.
132132
- name: Node full index (fixtures/minimal)
133133
run: |
134+
set -euo pipefail
134135
export CODEMAP_ROOT="$GITHUB_WORKSPACE/fixtures/minimal"
135136
rm -f "$CODEMAP_ROOT/.codemap.db"
136-
node dist/index.mjs --full
137+
timeout 45 node dist/index.mjs --full
137138
node dist/index.mjs query "SELECT COUNT(*) AS files FROM files"
138139
139140
benchmark:

docs/architecture.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -771,7 +771,7 @@ The full rebuild (`--full`) applies several optimizations that are not safe for
771771

772772
### Worker thread parallelism
773773

774-
File I/O and parsing dominate full rebuild time. The indexer spawns N worker threads (capped at CPU count, min 2, max 6; override **`CODEMAP_PARSE_WORKERS`**, max 32) via `parse-worker.ts`. Each worker receives a chunk of file paths, reads files from disk, and runs the appropriate parser (oxc-parser, lightningcss, or regex). Per-file parse budget: **`CODEMAP_PARSE_TIMEOUT_MS`** when set, else 10s + ~1ms per 50KB file size capped at 30s (`parse-timeout.ts`). Workers recycle after **`CODEMAP_WORKER_RECYCLE_EVERY`** files (default 250) to limit memory growth. Timeouts and other per-file failures append to `<state-dir>/errors.log` without aborting the run. Workers return structured `ParsedFile` results to the main thread, which handles import resolution and database inserts serially.
774+
File I/O and parsing dominate full rebuild time. The indexer spawns N worker threads (capped at CPU count, min 2, max 6; override **`CODEMAP_PARSE_WORKERS`**, max 32) via `parse-worker.ts`. Each worker receives a chunk of file paths, reads files from disk, and runs the appropriate parser (oxc-parser, lightningcss, or regex). Per-file parse budget: **`CODEMAP_PARSE_TIMEOUT_MS`** when set, else 10s + ~1ms per 50KB file size capped at 30s (`parse-timeout.ts`); multi-file worker messages cap the sum at 120s. `worker-pool.ts` clears the parse timeout when the worker responds (or on `dispose`) so the process does not wait on orphaned timers after indexing finishes. Workers recycle after **`CODEMAP_WORKER_RECYCLE_EVERY`** files (default 250) to limit memory growth. Timeouts and other per-file failures append to `<state-dir>/errors.log` without aborting the run. Workers return structured `ParsedFile` results to the main thread, which handles import resolution and database inserts serially.
775775

776776
### Deferred index creation
777777

src/application/parse-timeout.ts

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,3 @@ export class ParseTimeoutError extends Error {
4444
this.timeoutMs = timeoutMs;
4545
}
4646
}
47-
48-
export function delay(ms: number): Promise<void> {
49-
return new Promise((resolve) => setTimeout(resolve, ms));
50-
}

src/worker-pool.dist.test.ts

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/**
2+
* Node + dist regression for orphaned parse timers on `--full`.
3+
* Binary-level dist smoke also lives in `.github/workflows/ci.yml`.
4+
*/
5+
6+
import { describe, expect, test } from "bun:test";
7+
import { existsSync } from "node:fs";
8+
import { join } from "node:path";
9+
10+
import { globSync } from "./glob-sync";
11+
12+
const repoRoot = join(import.meta.dir, "..");
13+
const minimalRoot = join(repoRoot, "fixtures", "minimal");
14+
const distEntry = join(repoRoot, "dist", "index.mjs");
15+
/** Mirrors `INLINE_PARSE_MAX` in `worker-pool.ts`. */
16+
const WORKER_POOL_INLINE_PARSE_MAX = 12;
17+
18+
async function expectSubprocessExits(
19+
spawn: () => ReturnType<typeof Bun.spawn>,
20+
): Promise<void> {
21+
const proc = spawn();
22+
const hangMs = 6_000;
23+
const exitOrHang = await Promise.race([
24+
proc.exited.then((code) => ({ kind: "exit" as const, code })),
25+
Bun.sleep(hangMs).then(async () => {
26+
proc.kill();
27+
await proc.exited;
28+
return { kind: "hang" as const };
29+
}),
30+
]);
31+
32+
expect(exitOrHang.kind).toBe("exit");
33+
if (exitOrHang.kind === "exit") {
34+
expect(exitOrHang.code).toBe(0);
35+
}
36+
}
37+
38+
describe("node dist --full exit delay", () => {
39+
test.skipIf(!existsSync(distEntry))(
40+
"subprocess exits promptly after node dist --full (no orphaned timers)",
41+
async () => {
42+
const files = globSync(["**/*.ts", "**/*.tsx", "**/*.css"], minimalRoot);
43+
expect(files.length).toBeGreaterThan(WORKER_POOL_INLINE_PARSE_MAX);
44+
45+
await expectSubprocessExits(() =>
46+
Bun.spawn(["node", distEntry, "--full"], {
47+
cwd: repoRoot,
48+
env: { ...process.env, CODEMAP_ROOT: minimalRoot },
49+
stdout: "ignore",
50+
stderr: "pipe",
51+
}),
52+
);
53+
},
54+
8_000,
55+
);
56+
});

src/worker-pool.test.ts

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,20 @@
11
import { describe, expect, test } from "bun:test";
2+
import { join } from "node:path";
23

4+
import { resolveCodemapConfig } from "./config";
5+
import { globSync } from "./glob-sync";
6+
import { initCodemap } from "./runtime";
37
import {
48
parseFilesParallel,
59
parseParseWorkerCountOverride,
610
parseWorkerRecycleEvery,
711
} from "./worker-pool";
812

13+
const repoRoot = join(import.meta.dir, "..");
14+
const minimalRoot = join(repoRoot, "fixtures", "minimal");
15+
/** Mirrors `INLINE_PARSE_MAX` in `worker-pool.ts`. */
16+
const WORKER_POOL_INLINE_PARSE_MAX = 12;
17+
918
describe("parseParseWorkerCountOverride", () => {
1019
test("accepts valid decimal integers", () => {
1120
expect(parseParseWorkerCountOverride("2")).toBe(2);
@@ -41,4 +50,56 @@ describe("parseFilesParallel", () => {
4150
test("resolves immediately for an empty file list", async () => {
4251
await expect(parseFilesParallel([])).resolves.toEqual([]);
4352
});
53+
54+
test("returns parsed results via the worker pool path", async () => {
55+
initCodemap(resolveCodemapConfig(minimalRoot, undefined));
56+
const files = globSync(["**/*.ts", "**/*.tsx", "**/*.css"], minimalRoot);
57+
expect(files.length).toBeGreaterThan(WORKER_POOL_INLINE_PARSE_MAX);
58+
59+
const results = await parseFilesParallel(files);
60+
expect(results.length).toBe(files.length);
61+
});
62+
63+
test("subprocess exits promptly after worker-pool parse (no orphaned timers)", async () => {
64+
const files = globSync(["**/*.ts", "**/*.tsx", "**/*.css"], minimalRoot);
65+
expect(files.length).toBeGreaterThan(WORKER_POOL_INLINE_PARSE_MAX);
66+
67+
const script = `
68+
import { resolveCodemapConfig } from "./src/config.ts";
69+
import { globSync } from "./src/glob-sync.ts";
70+
import { initCodemap } from "./src/runtime.ts";
71+
import { parseFilesParallel } from "./src/worker-pool.ts";
72+
73+
const inlineParseMax = ${WORKER_POOL_INLINE_PARSE_MAX};
74+
const root = ${JSON.stringify(minimalRoot)};
75+
initCodemap(resolveCodemapConfig(root, undefined));
76+
const files = globSync(["**/*.ts", "**/*.tsx", "**/*.css"], root);
77+
if (files.length <= inlineParseMax) {
78+
throw new Error("fixture too small for worker-pool path");
79+
}
80+
await parseFilesParallel(files);
81+
`;
82+
83+
const proc = Bun.spawn([process.execPath, "-e", script], {
84+
cwd: repoRoot,
85+
stdout: "ignore",
86+
stderr: "pipe",
87+
});
88+
const hangMs = 6_000;
89+
const exitOrHang = await Promise.race([
90+
proc.exited.then((code) => ({ kind: "exit" as const, code })),
91+
Bun.sleep(hangMs).then(async () => {
92+
proc.kill();
93+
await proc.exited;
94+
return { kind: "hang" as const };
95+
}),
96+
]);
97+
const stderr = await new Response(proc.stderr).text();
98+
99+
expect(exitOrHang.kind).toBe("exit");
100+
if (exitOrHang.kind === "exit") {
101+
expect(exitOrHang.code).toBe(0);
102+
}
103+
expect(stderr).toBe("");
104+
}, 8_000);
44105
});

src/worker-pool.ts

Lines changed: 91 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import { Worker as NodeWorker } from "node:worker_threads";
77
import {
88
ParseTimeoutError,
99
computeParseTimeoutMs,
10-
delay,
1110
parseParseTimeoutMsOverride,
1211
} from "./application/parse-timeout";
1312
import { CODEMAP_BUILD_OUTPUT_DIR } from "./build-output";
@@ -89,60 +88,106 @@ interface ParseWorkerSession {
8988
dispose(): void;
9089
}
9190

91+
interface PendingParse {
92+
gen: number;
93+
resolve: (value: WorkerOutput) => void;
94+
reject: (err: Error) => void;
95+
clearTimer: () => void;
96+
}
97+
9298
function createParseWorkerSession(): ParseWorkerSession {
9399
let generation = 0;
94-
let pending:
95-
| {
96-
gen: number;
97-
resolve: (value: WorkerOutput) => void;
98-
reject: (err: Error) => void;
99-
}
100-
| undefined;
100+
let pending: PendingParse | undefined;
101+
102+
const clearPending = (): void => {
103+
if (pending === undefined) return;
104+
pending.clearTimer();
105+
pending = undefined;
106+
};
107+
108+
const settleSuccess = (data: WorkerOutput): void => {
109+
if (!pending || pending.gen !== generation) return;
110+
const { resolve, clearTimer } = pending;
111+
clearTimer();
112+
pending = undefined;
113+
resolve(data);
114+
};
115+
116+
const settleError = (err: Error): void => {
117+
if (!pending || pending.gen !== generation) return;
118+
const { reject, clearTimer } = pending;
119+
clearTimer();
120+
pending = undefined;
121+
reject(err);
122+
};
123+
124+
const parseWithTimeout = (
125+
timeoutMs: number,
126+
postMessage: () => void,
127+
recycleWorker: () => void,
128+
): Promise<WorkerOutput> => {
129+
const gen = generation;
130+
return new Promise<WorkerOutput>((resolve, reject) => {
131+
let timer: ReturnType<typeof setTimeout> | undefined = setTimeout(() => {
132+
timer = undefined;
133+
if (!pending || pending.gen !== gen) return;
134+
pending = undefined;
135+
generation++;
136+
recycleWorker();
137+
reject(new ParseTimeoutError(timeoutMs));
138+
}, timeoutMs);
139+
140+
pending = {
141+
gen,
142+
resolve,
143+
reject,
144+
clearTimer: () => {
145+
if (timer !== undefined) {
146+
clearTimeout(timer);
147+
timer = undefined;
148+
}
149+
},
150+
};
151+
postMessage();
152+
});
153+
};
154+
155+
const disposeSession = (terminateWorker: () => void): void => {
156+
clearPending();
157+
generation++;
158+
terminateWorker();
159+
};
101160

102161
if (IS_BUN) {
103162
let worker = new Worker(WORKER_URL_BUN);
104163
const attach = (): void => {
105164
worker.onmessage = (event: MessageEvent<WorkerOutput>) => {
106-
if (!pending || pending.gen !== generation) return;
107-
const { resolve } = pending;
108-
pending = undefined;
109-
resolve(event.data);
165+
settleSuccess(event.data);
110166
};
111167
worker.onerror = (event: ErrorEvent) => {
112-
if (!pending || pending.gen !== generation) return;
113-
const { reject } = pending;
114-
pending = undefined;
115-
reject(event.error ?? new Error(event.message));
168+
settleError(event.error ?? new Error(event.message));
116169
};
117170
};
118171
attach();
119172

120173
return {
121174
parse(input, timeoutMs) {
122-
const gen = generation;
123-
return Promise.race([
124-
new Promise<WorkerOutput>((resolve, reject) => {
125-
pending = { gen, resolve, reject };
175+
return parseWithTimeout(
176+
timeoutMs,
177+
() => {
126178
worker.postMessage(input);
127-
}),
128-
delay(timeoutMs).then(() => {
129-
throw new ParseTimeoutError(timeoutMs);
130-
}),
131-
]).catch((err) => {
132-
if (err instanceof ParseTimeoutError) {
133-
generation++;
134-
pending = undefined;
179+
},
180+
() => {
135181
worker.terminate();
136182
worker = new Worker(WORKER_URL_BUN);
137183
attach();
138-
}
139-
throw err;
140-
});
184+
},
185+
);
141186
},
142187
dispose() {
143-
pending = undefined;
144-
generation++;
145-
worker.terminate();
188+
disposeSession(() => {
189+
worker.terminate();
190+
});
146191
},
147192
};
148193
}
@@ -153,48 +198,34 @@ function createParseWorkerSession(): ParseWorkerSession {
153198

154199
const attachNode = (): void => {
155200
worker.on("message", (data: WorkerOutput) => {
156-
if (!pending || pending.gen !== generation) return;
157-
const { resolve } = pending;
158-
pending = undefined;
159-
resolve(data);
201+
settleSuccess(data);
160202
});
161203
worker.on("error", (err: Error) => {
162-
if (!pending || pending.gen !== generation) return;
163-
const { reject } = pending;
164-
pending = undefined;
165-
reject(err);
204+
settleError(err);
166205
});
167206
};
168207
attachNode();
169208

170209
return {
171210
parse(input, timeoutMs) {
172-
const gen = generation;
173-
return Promise.race([
174-
new Promise<WorkerOutput>((resolve, reject) => {
175-
pending = { gen, resolve, reject };
211+
return parseWithTimeout(
212+
timeoutMs,
213+
() => {
176214
worker.postMessage(input);
177-
}),
178-
delay(timeoutMs).then(() => {
179-
throw new ParseTimeoutError(timeoutMs);
180-
}),
181-
]).catch((err) => {
182-
if (err instanceof ParseTimeoutError) {
183-
generation++;
184-
pending = undefined;
215+
},
216+
() => {
185217
void worker.terminate();
186218
worker = new NodeWorker(NODE_WORKER_PATH, {
187219
type: "module",
188220
} as import("node:worker_threads").WorkerOptions);
189221
attachNode();
190-
}
191-
throw err;
192-
});
222+
},
223+
);
193224
},
194225
dispose() {
195-
pending = undefined;
196-
generation++;
197-
void worker.terminate();
226+
disposeSession(() => {
227+
void worker.terminate();
228+
});
198229
},
199230
};
200231
}

0 commit comments

Comments
 (0)