Skip to content

Commit eb22ac1

Browse files
committed
feat: Enhance SSE and Redis event handling with heartbeat and cleanup mechanisms
- Added heartbeat functionality to SSEStreamer for improved client connection management. - Implemented periodic cleanup in RedisEventSubscriber to remove stale subscriptions. - Updated internal prediction and prediction controllers to handle client disconnects and manage subscriptions more effectively. - Refactored RedisEventPublisher to streamline event publishing with error handling. These changes improve the reliability and performance of server-side event streaming and Redis event handling.
1 parent 4d857f8 commit eb22ac1

6 files changed

Lines changed: 245 additions & 423 deletions

File tree

packages/server/src/controllers/internal-predictions/index.ts

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,19 @@ const createAndStreamInternalPrediction = async (req: Request, res: Response, ne
4242
res.setHeader('X-Accel-Buffering', 'no') //nginx config: https://serverfault.com/a/801629
4343
res.flushHeaders()
4444

45-
if (process.env.MODE === MODE.QUEUE) {
45+
const isQueueMode = process.env.MODE === MODE.QUEUE
46+
if (isQueueMode) {
4647
getRunningExpressApp().redisSubscriber.subscribe(chatId)
4748
}
4849

50+
// Detect client disconnect (browser close, network timeout, ALB idle timeout)
51+
res.on('close', () => {
52+
sseStreamer.removeClient(chatId)
53+
if (isQueueMode) {
54+
getRunningExpressApp().redisSubscriber.unsubscribe(chatId)
55+
}
56+
})
57+
4958
const apiResponse = await utilBuildChatflow(req, true)
5059
sseStreamer.streamMetadataEvent(apiResponse.chatId, apiResponse)
5160
} catch (error) {
@@ -55,6 +64,9 @@ const createAndStreamInternalPrediction = async (req: Request, res: Response, ne
5564
next(error)
5665
} finally {
5766
sseStreamer.removeClient(chatId)
67+
if (process.env.MODE === MODE.QUEUE) {
68+
getRunningExpressApp().redisSubscriber.unsubscribe(chatId)
69+
}
5870
}
5971
}
6072
export default {

packages/server/src/controllers/predictions/index.ts

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,10 +72,19 @@ const createPrediction = async (req: Request, res: Response, next: NextFunction)
7272
res.setHeader('X-Accel-Buffering', 'no') //nginx config: https://serverfault.com/a/801629
7373
res.flushHeaders()
7474

75-
if (process.env.MODE === MODE.QUEUE) {
75+
const isQueueMode = process.env.MODE === MODE.QUEUE
76+
if (isQueueMode) {
7677
getRunningExpressApp().redisSubscriber.subscribe(chatId)
7778
}
7879

80+
// Detect client disconnect (browser close, network timeout, ALB idle timeout)
81+
res.on('close', () => {
82+
sseStreamer.removeClient(chatId)
83+
if (isQueueMode) {
84+
getRunningExpressApp().redisSubscriber.unsubscribe(chatId)
85+
}
86+
})
87+
7988
const apiResponse = await predictionsServices.buildChatflow(req)
8089
sseStreamer.streamMetadataEvent(apiResponse.chatId, apiResponse)
8190
} catch (error) {
@@ -85,6 +94,9 @@ const createPrediction = async (req: Request, res: Response, next: NextFunction)
8594
next(error)
8695
} finally {
8796
sseStreamer.removeClient(chatId)
97+
if (process.env.MODE === MODE.QUEUE) {
98+
getRunningExpressApp().redisSubscriber.unsubscribe(chatId)
99+
}
88100
}
89101
} else {
90102
const apiResponse = await predictionsServices.buildChatflow(req)

packages/server/src/index.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ export class App {
128128

129129
// Initialize SSE Streamer
130130
this.sseStreamer = new SSEStreamer()
131+
this.sseStreamer.startHeartbeat()
131132
logger.info('🌊 [server]: SSE Streamer initialized successfully')
132133

133134
// Init Queues
@@ -148,6 +149,7 @@ export class App {
148149

149150
this.redisSubscriber = new RedisEventSubscriber(this.sseStreamer)
150151
await this.redisSubscriber.connect()
152+
this.redisSubscriber.startPeriodicCleanup()
151153
logger.info('🔗 [server]: Redis event subscriber connected successfully')
152154
}
153155

@@ -361,6 +363,7 @@ export class App {
361363

362364
async stopApp() {
363365
try {
366+
this.sseStreamer.stopHeartbeat()
364367
const removePromises: any[] = []
365368
removePromises.push(this.telemetry.flush())
366369
if (this.queueManager) {

0 commit comments

Comments
 (0)