Skip to content

Commit 5585cbd

Browse files
committed
feat: persistent extraction server
1 parent 3996da4 commit 5585cbd

2 files changed

Lines changed: 224 additions & 70 deletions

File tree

packages/extractor/processor.ts

Lines changed: 149 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,113 @@ import { formatProgress, writeMultiLineProgress, type DocumentRecord } from "@do
66

77
const PYTHON_DIR = join(dirname(import.meta.path), "python");
88
const PYTHON_PATH = join(PYTHON_DIR, ".venv", "bin", "python");
9-
const SCRIPT_PATH = join(PYTHON_DIR, "extract.py");
9+
const SCRIPT_PATH = join(PYTHON_DIR, "extract_server.py");
10+
11+
/**
12+
* Persistent Python extraction worker.
13+
* Spawns one Python process and communicates via stdin/stdout JSON lines.
14+
*/
15+
class PersistentExtractor {
16+
private proc: ReturnType<typeof Bun.spawn> | null = null;
17+
private stdin: ReturnType<typeof Bun.spawn>["stdin"] | null = null;
18+
private initialized = false;
19+
private readBuffer = "";
20+
private stdoutReader: ReadableStreamDefaultReader<Uint8Array> | null = null;
21+
private decoder = new TextDecoder();
22+
23+
async start(): Promise<void> {
24+
this.proc = Bun.spawn([PYTHON_PATH, SCRIPT_PATH], {
25+
stdin: "pipe",
26+
stdout: "pipe",
27+
stderr: "inherit",
28+
});
29+
30+
const stdout = this.proc.stdout;
31+
this.stdin = this.proc.stdin;
32+
33+
if (!stdout || typeof stdout === "number") {
34+
throw new Error("Failed to get stdout pipe from Python process");
35+
}
36+
if (!this.stdin || typeof this.stdin === "number") {
37+
throw new Error("Failed to get stdin pipe from Python process");
38+
}
39+
40+
// Get reader for stdout
41+
this.stdoutReader = (stdout as ReadableStream<Uint8Array>).getReader();
42+
43+
// Wait for "ready" signal (imports complete)
44+
const readyLine = await this.readLine();
45+
const ready = JSON.parse(readyLine);
46+
if (!ready.ready) {
47+
throw new Error("Python extractor failed to signal ready");
48+
}
49+
50+
// Wait for "initialized" signal (converter created)
51+
const initLine = await this.readLine();
52+
const init = JSON.parse(initLine);
53+
if (!init.initialized) {
54+
throw new Error("Python extractor failed to initialize converter");
55+
}
56+
57+
this.initialized = true;
58+
}
59+
60+
private async readLine(): Promise<string> {
61+
if (!this.stdoutReader) throw new Error("Reader not initialized");
62+
63+
while (true) {
64+
// Check if we already have a complete line in the buffer
65+
const newlineIndex = this.readBuffer.indexOf("\n");
66+
if (newlineIndex !== -1) {
67+
const line = this.readBuffer.slice(0, newlineIndex);
68+
this.readBuffer = this.readBuffer.slice(newlineIndex + 1);
69+
return line;
70+
}
71+
72+
// Read more data
73+
const { value, done } = await this.stdoutReader.read();
74+
if (done) throw new Error("Python process closed unexpectedly");
75+
76+
this.readBuffer += this.decoder.decode(value, { stream: true });
77+
}
78+
}
79+
80+
async extract(filePath: string): Promise<{
81+
success: boolean;
82+
text?: string;
83+
wordCount?: number;
84+
charCount?: number;
85+
tableCount?: number;
86+
imageCount?: number;
87+
extraction?: any;
88+
error?: string;
89+
}> {
90+
if (!this.initialized || !this.stdin || typeof this.stdin === "number") {
91+
throw new Error("Extractor not initialized");
92+
}
93+
94+
// Send file path to Python using Bun's FileSink
95+
(this.stdin as { write: (data: string) => number }).write(filePath + "\n");
96+
97+
// Read JSON response
98+
const responseLine = await this.readLine();
99+
return JSON.parse(responseLine);
100+
}
101+
102+
async stop(): Promise<void> {
103+
if (this.stdin && typeof this.stdin !== "number") {
104+
(this.stdin as { end: () => void }).end();
105+
this.stdin = null;
106+
}
107+
if (this.proc) {
108+
this.proc.kill();
109+
this.proc = null;
110+
}
111+
this.initialized = false;
112+
this.readBuffer = "";
113+
this.stdoutReader = null;
114+
}
115+
}
10116

11117
export async function processDirectory(
12118
config: ExtractConfig,
@@ -58,59 +164,6 @@ export async function processDirectory(
58164

59165
const EXTRACTION_TIMEOUT_MS = 30_000; // 30 seconds per document
60166

61-
async function extractWithPython(
62-
doc: DocumentRecord,
63-
localFilePath: string
64-
): Promise<ExtractedDocument> {
65-
const proc = Bun.spawn([PYTHON_PATH, SCRIPT_PATH, localFilePath], {
66-
stdout: "pipe",
67-
stderr: "pipe",
68-
});
69-
70-
const extractionPromise = (async () => {
71-
const stdout = await new Response(proc.stdout).text();
72-
const stderr = await new Response(proc.stderr).text();
73-
const exitCode = await proc.exited;
74-
return { stdout, stderr, exitCode };
75-
})();
76-
77-
let timeoutId: Timer;
78-
const timeoutPromise = new Promise<never>((_, reject) => {
79-
timeoutId = setTimeout(() => {
80-
proc.kill();
81-
reject(new Error(`Extraction timed out after ${EXTRACTION_TIMEOUT_MS / 1000}s`));
82-
}, EXTRACTION_TIMEOUT_MS);
83-
});
84-
85-
try {
86-
var { stdout, stderr, exitCode } = await Promise.race([
87-
extractionPromise,
88-
timeoutPromise,
89-
]);
90-
} finally {
91-
clearTimeout(timeoutId!);
92-
}
93-
94-
if (exitCode !== 0) {
95-
const errorData = stderr ? JSON.parse(stderr) : { error: "Unknown error" };
96-
throw new Error(errorData.error || "Python extraction failed");
97-
}
98-
99-
const result = JSON.parse(stdout);
100-
101-
return {
102-
id: doc.id,
103-
sourceKey: `documents/${doc.id}.docx`,
104-
text: result.text,
105-
wordCount: result.wordCount,
106-
charCount: result.charCount,
107-
tableCount: result.tableCount,
108-
imageCount: result.imageCount,
109-
extraction: result.extraction,
110-
extractedAt: new Date().toISOString(),
111-
};
112-
}
113-
114167
async function processBatch(
115168
documents: DocumentRecord[],
116169
config: ExtractConfig,
@@ -151,7 +204,20 @@ async function processBatch(
151204

152205
const progressInterval = !verbose ? setInterval(updateProgress, 100) : null;
153206

154-
const processFile = async (): Promise<void> => {
207+
// Start pool of persistent Python extractors (one per worker)
208+
const numWorkers = Math.min(workers, documents.length);
209+
const extractors: PersistentExtractor[] = [];
210+
211+
console.log(`Starting ${numWorkers} persistent Python extractor(s)...`);
212+
for (let i = 0; i < numWorkers; i++) {
213+
const extractor = new PersistentExtractor();
214+
await extractor.start();
215+
extractors.push(extractor);
216+
}
217+
console.log(`${numWorkers} extractor(s) ready, processing documents...`);
218+
219+
// Worker function - each worker uses its own extractor
220+
const processWorker = async (extractor: PersistentExtractor): Promise<void> => {
155221
while (queue.length > 0) {
156222
const doc = queue.shift();
157223
if (!doc) continue;
@@ -168,26 +234,37 @@ async function processBatch(
168234
const tempFile = join(tempDir, `${doc.id}.docx`);
169235
await Bun.write(tempFile, content);
170236

171-
// Extract using Python
172-
const extracted = await extractWithPython(doc, tempFile);
237+
// Extract using persistent Python worker with timeout
238+
const extractPromise = extractor.extract(tempFile);
239+
const timeoutPromise = new Promise<never>((_, reject) => {
240+
setTimeout(() => {
241+
reject(new Error(`Extraction timed out after ${EXTRACTION_TIMEOUT_MS / 1000}s`));
242+
}, EXTRACTION_TIMEOUT_MS);
243+
});
244+
245+
const result = await Promise.race([extractPromise, timeoutPromise]);
246+
247+
if (!result.success) {
248+
throw new Error(result.error || "Extraction failed");
249+
}
173250

174251
// Write text file to storage
175-
await storage.write(`${outputPrefix}/${doc.id}.txt`, extracted.text);
252+
await storage.write(`${outputPrefix}/${doc.id}.txt`, result.text!);
176253

177254
// Write extraction JSON to storage
178255
await storage.write(
179256
`${outputPrefix}/${doc.id}.json`,
180-
JSON.stringify(extracted.extraction)
257+
JSON.stringify(result.extraction)
181258
);
182259

183260
// Update database with extraction metadata
184261
await db.updateExtraction({
185262
id: doc.id,
186-
word_count: extracted.wordCount,
187-
char_count: extracted.charCount,
188-
table_count: extracted.tableCount,
189-
image_count: extracted.imageCount,
190-
extracted_at: extracted.extractedAt,
263+
word_count: result.wordCount!,
264+
char_count: result.charCount!,
265+
table_count: result.tableCount!,
266+
image_count: result.imageCount!,
267+
extracted_at: new Date().toISOString(),
191268
});
192269

193270
successCount++;
@@ -196,7 +273,7 @@ async function processBatch(
196273
await rm(tempFile, { force: true });
197274

198275
if (verbose) {
199-
console.log(` Extracted: ${doc.id} (${extracted.wordCount} words)`);
276+
console.log(` Extracted: ${doc.id} (${result.wordCount} words)`);
200277
}
201278
} catch (err) {
202279
const error = err instanceof Error ? err.message : String(err);
@@ -212,11 +289,13 @@ async function processBatch(
212289
}
213290
};
214291

215-
const workerPromises = Array(Math.min(workers, documents.length))
216-
.fill(null)
217-
.map(() => processFile());
218-
219-
await Promise.all(workerPromises);
292+
try {
293+
// Run all workers in parallel, each with its own extractor
294+
await Promise.all(extractors.map(extractor => processWorker(extractor)));
295+
} finally {
296+
// Always stop all extractors
297+
await Promise.all(extractors.map(e => e.stop()));
298+
}
220299

221300
// Clean up progress display
222301
if (progressInterval) {
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
#!/usr/bin/env python3
2+
"""Persistent DocX extraction server using Docling.
3+
4+
Reads file paths from stdin (one per line), outputs JSON per line to stdout.
5+
This avoids the overhead of spawning a new Python process for each document.
6+
"""
7+
import json
8+
import sys
9+
from pathlib import Path
10+
11+
from docling.document_converter import DocumentConverter
12+
from docling.datamodel.base_models import InputFormat
13+
14+
15+
def strip_image_data(extraction: dict) -> dict:
    """Return ``extraction`` with base64 image payloads removed.

    Docling's ``export_to_dict()`` embeds each picture's raw image data under
    the ``"image"`` key of every entry in ``extraction["pictures"]``; dropping
    that key drastically shrinks the JSON sent back over stdout.

    Args:
        extraction: The dict produced by ``document.export_to_dict()``.

    Returns:
        A shallow copy whose ``"pictures"`` entries no longer carry the
        ``"image"`` key, or the input unchanged if it has no ``"pictures"``.
        Unlike the previous implementation, the caller's dict is NOT mutated
        in place.
    """
    if "pictures" not in extraction:
        return extraction

    stripped = dict(extraction)
    stripped["pictures"] = [
        {key: value for key, value in picture.items() if key != "image"}
        for picture in extraction["pictures"]
    ]
    return stripped
23+
24+
25+
def extract(converter: DocumentConverter, file_path: str) -> dict:
    """Run Docling over one DOCX file and summarize the result.

    Args:
        converter: A pre-initialized Docling ``DocumentConverter``, reused
            across calls so setup cost is paid only once per process.
        file_path: Path to the .docx file to convert.

    Returns:
        A dict with the markdown text, basic counts, and the full structured
        extraction (with inline image payloads stripped).
    """
    document = converter.convert(file_path).document

    # The markdown export doubles as the plain-text representation.
    text = document.export_to_markdown()

    # Full structured dump, minus base64 image payloads (they bloat the JSON).
    extraction = strip_image_data(document.export_to_dict())

    words = text.split()
    return {
        "text": text,
        "wordCount": len(words),
        "charCount": len(text),
        "tableCount": len(extraction.get("tables", [])),
        "imageCount": len(extraction.get("pictures", [])),
        "extraction": extraction,
    }
44+
45+
46+
def _emit(payload: dict) -> None:
    """Write one JSON message line to stdout, flushed immediately."""
    print(json.dumps(payload), flush=True)


def main():
    """Serve extraction requests over stdin/stdout until stdin closes.

    Emits ``{"ready": true}`` once imports are done and
    ``{"initialized": true}`` once the converter exists, then answers each
    request line (a file path) with exactly one JSON response line.
    """
    # Signal that imports have completed.
    _emit({"ready": True})

    # One converter for the whole process lifetime; DOCX-only so Docling
    # skips loading PDF models.
    converter = DocumentConverter(allowed_formats=[InputFormat.DOCX])
    _emit({"initialized": True})

    for raw_line in sys.stdin:
        file_path = raw_line.strip()
        if not file_path:
            continue  # ignore blank lines

        try:
            if not Path(file_path).exists():
                _emit({"success": False, "error": f"File not found: {file_path}"})
                continue

            _emit({"success": True, **extract(converter, file_path)})

        except Exception as e:
            # Never crash the server on a bad document; report and keep going.
            _emit({"success": False, "error": str(e)})


if __name__ == "__main__":
    main()

0 commit comments

Comments
 (0)