Skip to content

Commit d823757

Browse files
committed
Fix BullMQ job stall: increase stalled interval and add periodic progress updates
1 parent 17c04f0 commit d823757

2 files changed

Lines changed: 19 additions & 5 deletions

File tree

services/knowledge-base/src/services/processor.ts

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,16 +42,16 @@ function findSentenceBoundary(text: string, index: number): number {
4242

4343
const logger = createLogger('knowledge-base-service:processor')
4444

45-
export async function processDocument(documentId: string) {
45+
export async function processDocument(documentId: string, job?: { updateProgress: (progress: number) => Promise<void> }) {
4646
const timeoutPromise = new Promise<never>((_, reject) => {
4747
setTimeout(() => {
4848
reject(new Error(`Document processing timeout after ${PROCESSING_CONSTANTS.TIMEOUT_MS / 1000} seconds`))
4949
}, PROCESSING_CONSTANTS.TIMEOUT_MS)
5050
})
5151

52-
try {
52+
try {
5353
await Promise.race([
54-
processDocumentInternal(documentId),
54+
processDocumentInternal(documentId, job),
5555
timeoutPromise,
5656
])
5757
} catch (error) {
@@ -79,7 +79,7 @@ export async function processDocument(documentId: string) {
7979
}
8080
}
8181

82-
async function processDocumentInternal(documentId: string) {
82+
async function processDocumentInternal(documentId: string, job?: { updateProgress: (progress: number) => Promise<void> }) {
8383
const supabase = getSupabase()
8484

8585
await supabase
@@ -268,6 +268,14 @@ async function processDocumentInternal(documentId: string) {
268268
// Move to next window
269269
windowStart = windowEnd
270270

271+
// Update job progress periodically to prevent stall detection
272+
if (job && windowNumber % 3 === 0) {
273+
const progress = Math.min(10 + Math.floor((windowStart / textLength) * 80), 90)
274+
await job.updateProgress(progress).catch(() => {
275+
// Ignore progress update errors (job might be completed)
276+
})
277+
}
278+
271279
// Force GC every few windows for large documents
272280
if (global.gc && windowNumber % 5 === 0) {
273281
global.gc()

services/knowledge-base/src/services/queue.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,8 @@ async function initializeQueue(): Promise<void> {
109109

110110
try {
111111
await job.updateProgress(10)
112-
await processDocument(documentId)
112+
// Pass job for progress updates during processing
113+
await processDocument(documentId, job)
113114
await job.updateProgress(100)
114115
} catch (error) {
115116
const errorMessage = error instanceof Error ? error.message : String(error)
@@ -123,6 +124,11 @@ async function initializeQueue(): Promise<void> {
123124
{
124125
connection: redisConnection,
125126
concurrency: 2,
127+
// Increase stalled interval to allow long-running document processing
128+
// Default is 30 seconds, but large documents can take 5+ minutes
129+
maxStalledCount: 0, // Don't mark as stalled (we handle timeouts in processor)
130+
stalledInterval: 10 * 60 * 1000, // 10 minutes (longer than max processing time)
131+
lockDuration: 10 * 60 * 1000, // 10 minutes - how long job is locked to this worker
126132
}
127133
)
128134

0 commit comments

Comments
 (0)