@@ -13,34 +13,55 @@ const logger = createLogger('knowledge-base-service:queue')
1313// Queue name
1414const QUEUE_NAME = 'document-processing'
1515
16- /**
17- * Get Redis connection (may be null if Redis is not configured)
18- */
19- function getRedisConnection ( ) : ReturnType < typeof getRedis > {
20- try {
21- return getRedis ( )
22- } catch ( error ) {
23- logger . warn ( 'Redis connection failed, will use in-memory queue' , {
24- error : error instanceof Error ? error . message : String ( error )
25- } )
26- return null
27- }
28- }
16+ // Get Redis connection
17+ const redisConnection = getRedis ( )
2918
3019/**
3120 * Wait for Redis to be ready by testing with PING
3221 */
33- async function waitForRedisReady ( redisConnection : NonNullable < ReturnType < typeof getRedis > > ) : Promise < void > {
34- const maxAttempts = 20
22+ async function waitForRedisReady ( ) : Promise < void > {
23+ const maxAttempts = 60 // Increased for Railway network
3524 const delayMs = 500
3625
26+ // First, wait for connection status to be 'ready' or 'connecting'
27+ let statusCheckAttempts = 0
28+ while ( redisConnection . status !== 'ready' && redisConnection . status !== 'connecting' && statusCheckAttempts < 10 ) {
29+ await new Promise ( resolve => setTimeout ( resolve , 100 ) )
30+ statusCheckAttempts ++
31+ }
32+
33+ // Then test with PING
3734 for ( let attempt = 1 ; attempt <= maxAttempts ; attempt ++ ) {
3835 try {
39- await redisConnection . ping ( )
40- return
36+ const result = await Promise . race ( [
37+ redisConnection . ping ( ) ,
38+ new Promise < never > ( ( _ , reject ) =>
39+ setTimeout ( ( ) => reject ( new Error ( 'PING timeout' ) ) , 5000 )
40+ )
41+ ] )
42+ if ( result === 'PONG' ) {
43+ logger . info ( 'Redis connection verified' , {
44+ status : redisConnection . status ,
45+ attempt
46+ } )
47+ return
48+ }
4149 } catch ( error ) {
50+ const errorMessage = error instanceof Error ? error . message : String ( error )
4251 if ( attempt === maxAttempts ) {
43- throw new Error ( `Redis connection timeout after ${ maxAttempts } attempts. Status: ${ redisConnection . status } ` )
52+ logger . error ( 'Redis connection timeout' , {
53+ attempts : maxAttempts ,
54+ status : redisConnection . status ,
55+ error : errorMessage
56+ } )
57+ throw new Error ( `Redis connection timeout after ${ maxAttempts } attempts. Status: ${ redisConnection . status } . Error: ${ errorMessage } ` )
58+ }
59+ if ( attempt % 10 === 0 ) {
60+ logger . warn ( 'Redis connection still waiting' , {
61+ attempt,
62+ maxAttempts,
63+ status : redisConnection . status
64+ } )
4465 }
4566 await new Promise ( resolve => setTimeout ( resolve , delayMs ) )
4667 }
@@ -58,22 +79,7 @@ async function initializeQueue(): Promise<void> {
5879 }
5980
6081 initializationPromise = ( async ( ) => {
61- const redisConnection = getRedisConnection ( )
62-
63- // If Redis is not available, skip BullMQ initialization
64- if ( ! redisConnection ) {
65- logger . info ( 'Redis not available, using in-memory queue only' )
66- return
67- }
68-
69- try {
70- await waitForRedisReady ( redisConnection )
71- } catch ( error ) {
72- logger . warn ( 'Failed to connect to Redis, will use in-memory queue' , {
73- error : error instanceof Error ? error . message : String ( error )
74- } )
75- return
76- }
82+ await waitForRedisReady ( )
7783
7884 if ( ! documentQueue ) {
7985 documentQueue = new Queue ( QUEUE_NAME , {
@@ -115,7 +121,7 @@ async function initializeQueue(): Promise<void> {
115121 }
116122 } ,
117123 {
118- connection : redisConnection ! ,
124+ connection : redisConnection ,
119125 concurrency : 2 ,
120126 }
121127 )
@@ -187,11 +193,6 @@ export async function enqueueDocument(documentId: string): Promise<Job | { id: s
187193 )
188194 ] )
189195
190- // Check if queue was successfully initialized (Redis available)
191- if ( ! documentQueue ) {
192- throw new Error ( 'Queue not initialized - Redis unavailable' )
193- }
194-
195196 const queue = getDocumentQueue ( )
196197
197198 const job = await Promise . race ( [
@@ -211,7 +212,7 @@ export async function enqueueDocument(documentId: string): Promise<Job | { id: s
211212 return job
212213 } catch ( error ) {
213214 logger . warn ( `BullMQ queue unavailable, using in-memory queue for document ${ documentId } ` , {
214- error : error instanceof Error ? error . message : String ( error )
215+ error
215216 } )
216217
217218 if ( inMemoryQueue . some ( item => item . documentId === documentId ) ) {
@@ -237,20 +238,6 @@ export async function enqueueDocument(documentId: string): Promise<Job | { id: s
237238export async function getQueueStats ( ) {
238239 try {
239240 await initializeQueue ( )
240-
241- // If queue is not initialized (Redis unavailable), return in-memory stats only
242- if ( ! documentQueue ) {
243- return {
244- waiting : 0 ,
245- active : 0 ,
246- completed : 0 ,
247- failed : 0 ,
248- delayed : 0 ,
249- inMemoryQueue : inMemoryQueue . length ,
250- mode : 'in-memory' ,
251- }
252- }
253-
254241 const queue = getDocumentQueue ( )
255242
256243 const [ waiting , active , completed , failed , delayed ] = await Promise . all ( [
@@ -268,7 +255,6 @@ export async function getQueueStats() {
268255 failed,
269256 delayed,
270257 inMemoryQueue : inMemoryQueue . length ,
271- mode : 'bullmq' ,
272258 }
273259 } catch ( error ) {
274260 return {
@@ -278,7 +264,6 @@ export async function getQueueStats() {
278264 failed : 0 ,
279265 delayed : 0 ,
280266 inMemoryQueue : inMemoryQueue . length ,
281- mode : 'in-memory' ,
282267 }
283268 }
284269}
0 commit comments