Skip to content

Commit 808f7cc

Browse files
committed
Fix Redis/BullMQ: Make Redis optional with graceful in-memory fallback
1 parent 19bf769 commit 808f7cc

2 files changed

Lines changed: 56 additions & 8 deletions

File tree

services/knowledge-base/src/config/redis.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,12 @@ const logger = createLogger('knowledge-base-service:redis')
1010

1111
let redis: Redis | null = null
1212

13-
export function getRedis(): Redis {
13+
export function getRedis(): Redis | null {
1414
if (!redis) {
1515
const redisUrl = process.env.REDIS_URL
1616
if (!redisUrl) {
17-
throw new Error('REDIS_URL environment variable is required')
17+
logger.warn('REDIS_URL not configured, Redis features will be unavailable. Using in-memory queue fallback.')
18+
return null
1819
}
1920

2021
const isTLS = redisUrl.startsWith('rediss://') || process.env.REDIS_TLS === 'true'

services/knowledge-base/src/services/queue.ts

Lines changed: 53 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,24 @@ const logger = createLogger('knowledge-base-service:queue')
1313
// Queue name
1414
const QUEUE_NAME = 'document-processing'
1515

16-
// Get Redis connection
17-
const redisConnection = getRedis()
16+
/**
17+
* Get Redis connection (may be null if Redis is not configured)
18+
*/
19+
function getRedisConnection(): ReturnType<typeof getRedis> {
20+
try {
21+
return getRedis()
22+
} catch (error) {
23+
logger.warn('Redis connection failed, will use in-memory queue', {
24+
error: error instanceof Error ? error.message : String(error)
25+
})
26+
return null
27+
}
28+
}
1829

1930
/**
2031
* Wait for Redis to be ready by testing with PING
2132
*/
22-
async function waitForRedisReady(): Promise<void> {
33+
async function waitForRedisReady(redisConnection: NonNullable<ReturnType<typeof getRedis>>): Promise<void> {
2334
const maxAttempts = 20
2435
const delayMs = 500
2536

@@ -47,7 +58,22 @@ async function initializeQueue(): Promise<void> {
4758
}
4859

4960
initializationPromise = (async () => {
50-
await waitForRedisReady()
61+
const redisConnection = getRedisConnection()
62+
63+
// If Redis is not available, skip BullMQ initialization
64+
if (!redisConnection) {
65+
logger.info('Redis not available, using in-memory queue only')
66+
return
67+
}
68+
69+
try {
70+
await waitForRedisReady(redisConnection)
71+
} catch (error) {
72+
logger.warn('Failed to connect to Redis, will use in-memory queue', {
73+
error: error instanceof Error ? error.message : String(error)
74+
})
75+
return
76+
}
5177

5278
if (!documentQueue) {
5379
documentQueue = new Queue(QUEUE_NAME, {
@@ -89,7 +115,7 @@ async function initializeQueue(): Promise<void> {
89115
}
90116
},
91117
{
92-
connection: redisConnection,
118+
connection: redisConnection!,
93119
concurrency: 2,
94120
}
95121
)
@@ -161,6 +187,11 @@ export async function enqueueDocument(documentId: string): Promise<Job | { id: s
161187
)
162188
])
163189

190+
// Check if queue was successfully initialized (Redis available)
191+
if (!documentQueue) {
192+
throw new Error('Queue not initialized - Redis unavailable')
193+
}
194+
164195
const queue = getDocumentQueue()
165196

166197
const job = await Promise.race([
@@ -180,7 +211,7 @@ export async function enqueueDocument(documentId: string): Promise<Job | { id: s
180211
return job
181212
} catch (error) {
182213
logger.warn(`BullMQ queue unavailable, using in-memory queue for document ${documentId}`, {
183-
error
214+
error: error instanceof Error ? error.message : String(error)
184215
})
185216

186217
if (inMemoryQueue.some(item => item.documentId === documentId)) {
@@ -206,6 +237,20 @@ export async function enqueueDocument(documentId: string): Promise<Job | { id: s
206237
export async function getQueueStats() {
207238
try {
208239
await initializeQueue()
240+
241+
// If queue is not initialized (Redis unavailable), return in-memory stats only
242+
if (!documentQueue) {
243+
return {
244+
waiting: 0,
245+
active: 0,
246+
completed: 0,
247+
failed: 0,
248+
delayed: 0,
249+
inMemoryQueue: inMemoryQueue.length,
250+
mode: 'in-memory',
251+
}
252+
}
253+
209254
const queue = getDocumentQueue()
210255

211256
const [waiting, active, completed, failed, delayed] = await Promise.all([
@@ -223,6 +268,7 @@ export async function getQueueStats() {
223268
failed,
224269
delayed,
225270
inMemoryQueue: inMemoryQueue.length,
271+
mode: 'bullmq',
226272
}
227273
} catch (error) {
228274
return {
@@ -232,6 +278,7 @@ export async function getQueueStats() {
232278
failed: 0,
233279
delayed: 0,
234280
inMemoryQueue: inMemoryQueue.length,
281+
mode: 'in-memory',
235282
}
236283
}
237284
}

0 commit comments

Comments
 (0)