Skip to content

Commit 34aa82e

Browse files
authored
fix: redis pub/sub and streaming response issue (#6008)
* fix: unsubscribe redis pubsub when connection closes during prediction or after successful response * update: add heartbeat to keep connection alive and start/stop heartbeat on server start/stop * refactor: redis pub/sub methods, add unsubscribe, await subscribe * fix: redundant unsubscribe * fix: use map to store clients list ---------
1 parent 06dc0e1 commit 34aa82e

File tree

7 files changed

+361
-511
lines changed

7 files changed

+361
-511
lines changed

packages/server/src/controllers/internal-predictions/index.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ const createInternalPrediction = async (req: Request, res: Response, next: NextF
3333
const createAndStreamInternalPrediction = async (req: Request, res: Response, next: NextFunction) => {
3434
const chatId = req.body.chatId
3535
const sseStreamer = getRunningExpressApp().sseStreamer
36+
const isQueueMode = process.env.MODE === MODE.QUEUE
3637

3738
try {
3839
sseStreamer.addClient(chatId, res)
@@ -42,8 +43,8 @@ const createAndStreamInternalPrediction = async (req: Request, res: Response, ne
4243
res.setHeader('X-Accel-Buffering', 'no') //nginx config: https://serverfault.com/a/801629
4344
res.flushHeaders()
4445

45-
if (process.env.MODE === MODE.QUEUE) {
46-
getRunningExpressApp().redisSubscriber.subscribe(chatId)
46+
if (isQueueMode) {
47+
await getRunningExpressApp().redisSubscriber.subscribe(chatId)
4748
}
4849

4950
const apiResponse = await utilBuildChatflow(req, true)
@@ -54,6 +55,9 @@ const createAndStreamInternalPrediction = async (req: Request, res: Response, ne
5455
}
5556
next(error)
5657
} finally {
58+
if (isQueueMode && chatId) {
59+
await getRunningExpressApp().redisSubscriber.unsubscribe(chatId)
60+
}
5761
sseStreamer.removeClient(chatId)
5862
}
5963
}

packages/server/src/controllers/predictions/index.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ const createPrediction = async (req: Request, res: Response, next: NextFunction)
6464
chatId = req.body.chatId ?? req.body.overrideConfig?.sessionId ?? uuidv4()
6565
req.body.chatId = chatId
6666
}
67+
const isQueueMode = process.env.MODE === MODE.QUEUE
6768
try {
6869
sseStreamer.addExternalClient(chatId, res)
6970
res.setHeader('Content-Type', 'text/event-stream')
@@ -72,8 +73,8 @@ const createPrediction = async (req: Request, res: Response, next: NextFunction)
7273
res.setHeader('X-Accel-Buffering', 'no') //nginx config: https://serverfault.com/a/801629
7374
res.flushHeaders()
7475

75-
if (process.env.MODE === MODE.QUEUE) {
76-
getRunningExpressApp().redisSubscriber.subscribe(chatId)
76+
if (isQueueMode) {
77+
await getRunningExpressApp().redisSubscriber.subscribe(chatId)
7778
}
7879

7980
const apiResponse = await predictionsServices.buildChatflow(req)
@@ -84,6 +85,9 @@ const createPrediction = async (req: Request, res: Response, next: NextFunction)
8485
}
8586
next(error)
8687
} finally {
88+
if (isQueueMode && chatId) {
89+
await getRunningExpressApp().redisSubscriber.unsubscribe(chatId)
90+
}
8791
sseStreamer.removeClient(chatId)
8892
}
8993
} else {

packages/server/src/index.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ export class App {
128128

129129
// Initialize SSE Streamer
130130
this.sseStreamer = new SSEStreamer()
131+
this.sseStreamer.startHeartbeat()
131132
logger.info('🌊 [server]: SSE Streamer initialized successfully')
132133

133134
// Init Queues
@@ -148,6 +149,7 @@ export class App {
148149

149150
this.redisSubscriber = new RedisEventSubscriber(this.sseStreamer)
150151
await this.redisSubscriber.connect()
152+
this.redisSubscriber.startPeriodicCleanup()
151153
logger.info('🔗 [server]: Redis event subscriber connected successfully')
152154
}
153155

@@ -356,6 +358,7 @@ export class App {
356358

357359
async stopApp() {
358360
try {
361+
this.sseStreamer.stopHeartbeat()
359362
const removePromises: any[] = []
360363
removePromises.push(this.telemetry.flush())
361364
if (this.queueManager) {

packages/server/src/queue/PredictionQueue.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,9 @@ export class PredictionQueue extends BaseQueue {
6363
}
6464

6565
async processJob(data: IExecuteFlowParams | IGenerateAgentflowv2Params) {
66+
if (this.redisPublisher) {
67+
await this.redisPublisher.connect()
68+
}
6669
if (this.appDataSource) data.appDataSource = this.appDataSource
6770
if (this.telemetry) data.telemetry = this.telemetry
6871
if (this.cachePool) data.cachePool = this.cachePool

0 commit comments

Comments
 (0)