Skip to content

Commit ff4bcc7

Browse files
committed
fix: processor stuck
1 parent 3c9ee84 commit ff4bcc7

1 file changed

Lines changed: 58 additions & 6 deletions

File tree

packages/extractor/processor.ts

Lines changed: 58 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,11 @@ class PersistentExtractor {
112112
this.readBuffer = "";
113113
this.stdoutReader = null;
114114
}
115+
116+
// Restart this extractor: awaits stop() to completion, then awaits start().
// There is no try/catch here, so a rejection from either call propagates
// to the caller (and start() is not attempted if stop() rejects).
async restart(): Promise<void> {
117+
await this.stop();
118+
await this.start();
119+
}
115120
}
116121

117122
export async function processDirectory(
@@ -163,6 +168,7 @@ export async function processDirectory(
163168
}
164169

165170
const EXTRACTION_TIMEOUT_MS = 30_000; // 30 seconds per document
171+
const STALL_TIMEOUT_MS = 60_000; // 1 minute without progress
166172

167173
async function processBatch(
168174
documents: DocumentRecord[],
@@ -175,13 +181,20 @@ async function processBatch(
175181
let errorCount = 0;
176182
const queue = [...documents];
177183

184+
// Single dequeue point shared by all workers. queue.shift() is synchronous, so no two workers can receive the same document.
185+
const getNextDocument = (): DocumentRecord | undefined => queue.shift();
186+
178187
// Progress tracking (only used when not verbose)
179188
const startTime = Date.now();
180189
let lastThroughputUpdate = startTime;
181190
let docsAtLastUpdate = 0;
182191
let currentDocsPerSec = 0;
183192
let prevLineCount = 0;
184193

194+
// Stall detection - track when last progress was made
195+
let lastProgressTime = Date.now();
196+
let stalled = false;
197+
185198
const updateProgress = () => {
186199
const now = Date.now();
187200
const elapsed = (now - lastThroughputUpdate) / 1000;
@@ -218,11 +231,10 @@ async function processBatch(
218231

219232
// Worker function - each worker uses its own extractor
220233
// Worker loop bound to a single extractor. It repeatedly takes a document via
// getNextDocument(), races extractor.extract() against a timer of
// EXTRACTION_TIMEOUT_MS, and records success or failure. Resolves (void) when
// the loop condition becomes false.
// Parts of the body are elided by the diff hunks below; the notes here cover
// only the lines that are visible.
const processWorker = async (extractor: PersistentExtractor): Promise<void> => {
221-
while (queue.length > 0) {
222-
const doc = queue.shift();
223-
if (!doc) continue;
224-
234+
let doc: DocumentRecord | undefined;
235+
// NOTE(review): `&&` evaluates left to right, so getNextDocument() removes a
// document from the queue BEFORE `stalled` is tested. If `stalled` is true at
// that moment, the dequeued document is neither processed nor recorded as an
// error, and this worker's loop ends. Nothing in the visible lines re-enters
// the loop once `stalled` is set back to false. Testing `!stalled` first
// would at least avoid dropping the document - confirm intended behavior.
while ((doc = getNextDocument()) && !stalled) {
225236
const sourceKey = `${inputPrefix}/${doc.id}.docx`;
237+
// Timer handle for the per-document timeout; kept so it can be cancelled
// once the race below settles.
let timeoutId: ReturnType<typeof setTimeout> | undefined;
226238

227239
try {
228240
// Download file from storage to temp
@@ -237,13 +249,16 @@ async function processBatch(
237249
// Extract using persistent Python worker with timeout
238250
const extractPromise = extractor.extract(tempFile);
239251
// Never resolves; only rejects when the timer fires.
const timeoutPromise = new Promise<never>((_, reject) => {
240-
setTimeout(() => {
252+
timeoutId = setTimeout(() => {
241253
reject(new Error(`Extraction timed out after ${EXTRACTION_TIMEOUT_MS / 1000}s`));
242254
}, EXTRACTION_TIMEOUT_MS);
243255
});
244256

245257
// Whichever settles first wins. The race does not cancel extractPromise;
// on timeout the catch block below restarts the extractor instead.
const result = await Promise.race([extractPromise, timeoutPromise]);
246258

259+
// Extraction settled before the timer fired; cancel the pending timer.
// This runs even when result.success is false (checked just below).
260+
if (timeoutId) clearTimeout(timeoutId);
261+
247262
if (!result.success) {
248263
throw new Error(result.error || "Extraction failed");
249264
}
@@ -268,6 +283,7 @@ async function processBatch(
268283
});
269284

270285
successCount++;
286+
// Feeds the stall detector: a completed document counts as progress.
lastProgressTime = Date.now();
271287

272288
// Cleanup temp file
273289
await rm(tempFile, { force: true });
@@ -276,11 +292,25 @@ async function processBatch(
276292
console.log(` Extracted: ${doc.id} (${result.wordCount} words)`);
277293
}
278294
} catch (err) {
295+
// Clear timeout on error
296+
if (timeoutId) clearTimeout(timeoutId);
297+
279298
const error = err instanceof Error ? err.message : String(err);
280299

300+
// Restart Python process after timeout to restore clean state
// NOTE(review): the timeout is recognised by substring match on the message.
// An extractor-reported failure (result.error, rethrown above) whose text
// contains "timed out" would also trigger a restart.
301+
if (error.includes("timed out")) {
302+
try {
303+
await extractor.restart();
304+
} catch {
305+
// If restart fails, continue with potentially broken extractor
306+
// It will fail on next extraction and be caught
307+
}
308+
}
309+
281310
// Update database with error
282311
await db.updateExtractionError(doc.id, error);
283312
errorCount++;
313+
// NOTE(review): a failed document also resets the stall clock, so a run in
// which every document fails is never reported as stalled.
lastProgressTime = Date.now();
284314

285315
if (verbose) {
286316
console.error(` Failed: ${doc.id}: ${error}`);
@@ -289,11 +319,33 @@ async function processBatch(
289319
}
290320
};
291321

322+
// Stall detection interval - check if no progress for too long
// Every 10 seconds: if more than STALL_TIMEOUT_MS has elapsed since
// lastProgressTime and the queue is non-empty, set `stalled`, restart every
// extractor in parallel (individual restart errors are ignored), then clear
// `stalled` and reset lastProgressTime.
// NOTE(review): the callback is async and setInterval does not wait for it.
// lastProgressTime is only reset after all restarts finish, so if they take
// longer than one tick, the next tick can pass the same check and start a
// second, overlapping round of restarts.
// NOTE(review): workers read `stalled` only in their loop condition
// (`(doc = getNextDocument()) && !stalled`). A worker that observes `true`
// there leaves its loop, and nothing visible here starts it again after the
// flag is cleared - confirm that remaining queue items still get processed.
323+
const stallCheckInterval = setInterval(async () => {
324+
const timeSinceProgress = Date.now() - lastProgressTime;
325+
if (timeSinceProgress > STALL_TIMEOUT_MS && queue.length > 0) {
326+
console.error(`\nStall detected: no progress for ${STALL_TIMEOUT_MS / 1000}s, restarting extractors...`);
327+
stalled = true;
328+
329+
// Force restart all extractors
330+
await Promise.all(extractors.map(async (e) => {
331+
try {
332+
await e.restart();
333+
} catch {
334+
// Ignore restart errors
335+
}
336+
}));
337+
338+
stalled = false;
339+
lastProgressTime = Date.now();
340+
}
341+
}, 10_000); // Check every 10 seconds
342+
292343
try {
293344
// Run all workers in parallel, each with its own extractor
294345
await Promise.all(extractors.map(extractor => processWorker(extractor)));
295346
} finally {
296-
// Always stop all extractors
347+
// Always stop all extractors and clear intervals
348+
clearInterval(stallCheckInterval);
297349
await Promise.all(extractors.map(e => e.stop()));
298350
}
299351

0 commit comments

Comments
 (0)