Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions packages/server/src/controllers/internal-predictions/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ const createInternalPrediction = async (req: Request, res: Response, next: NextF
const createAndStreamInternalPrediction = async (req: Request, res: Response, next: NextFunction) => {
const chatId = req.body.chatId
const sseStreamer = getRunningExpressApp().sseStreamer
const isQueueMode = process.env.MODE === MODE.QUEUE

try {
sseStreamer.addClient(chatId, res)
Expand All @@ -43,8 +42,8 @@ const createAndStreamInternalPrediction = async (req: Request, res: Response, ne
res.setHeader('X-Accel-Buffering', 'no') //nginx config: https://serverfault.com/a/801629
res.flushHeaders()

if (isQueueMode) {
await getRunningExpressApp().redisSubscriber.subscribe(chatId)
if (process.env.MODE === MODE.QUEUE) {
getRunningExpressApp().redisSubscriber.subscribe(chatId)
}

const apiResponse = await utilBuildChatflow(req, true)
Expand All @@ -55,9 +54,6 @@ const createAndStreamInternalPrediction = async (req: Request, res: Response, ne
}
next(error)
} finally {
if (isQueueMode && chatId) {
await getRunningExpressApp().redisSubscriber.unsubscribe(chatId)
}
sseStreamer.removeClient(chatId)
}
Comment on lines 56 to 58

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

The logic to unsubscribe from the Redis channel in the finally block has been removed. This will lead to stale subscriptions on the Redis server, as the application subscribes to a channel for each streaming request but never unsubscribes. This is a resource leak that can impact server performance and stability over time. Please reintroduce the unsubscribe call.

}
Expand Down
8 changes: 2 additions & 6 deletions packages/server/src/controllers/predictions/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ const createPrediction = async (req: Request, res: Response, next: NextFunction)
chatId = req.body.chatId ?? req.body.overrideConfig?.sessionId ?? uuidv4()
req.body.chatId = chatId
}
const isQueueMode = process.env.MODE === MODE.QUEUE
try {
sseStreamer.addExternalClient(chatId, res)
res.setHeader('Content-Type', 'text/event-stream')
Expand All @@ -73,8 +72,8 @@ const createPrediction = async (req: Request, res: Response, next: NextFunction)
res.setHeader('X-Accel-Buffering', 'no') //nginx config: https://serverfault.com/a/801629
res.flushHeaders()

if (isQueueMode) {
await getRunningExpressApp().redisSubscriber.subscribe(chatId)
if (process.env.MODE === MODE.QUEUE) {
getRunningExpressApp().redisSubscriber.subscribe(chatId)
}

const apiResponse = await predictionsServices.buildChatflow(req)
Expand All @@ -85,9 +84,6 @@ const createPrediction = async (req: Request, res: Response, next: NextFunction)
}
next(error)
} finally {
if (isQueueMode && chatId) {
await getRunningExpressApp().redisSubscriber.unsubscribe(chatId)
}
sseStreamer.removeClient(chatId)
}
Comment on lines 86 to 88

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

The logic to unsubscribe from the Redis channel in the finally block has been removed. This will lead to stale subscriptions on the Redis server, as the application subscribes to a channel for each streaming request but never unsubscribes. This is a resource leak that can impact server performance and stability over time. Please reintroduce the unsubscribe call.

} else {
Expand Down
3 changes: 0 additions & 3 deletions packages/server/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@ export class App {

// Initialize SSE Streamer
this.sseStreamer = new SSEStreamer()
this.sseStreamer.startHeartbeat()
logger.info('🌊 [server]: SSE Streamer initialized successfully')

// Init Queues
Expand All @@ -149,7 +148,6 @@ export class App {

this.redisSubscriber = new RedisEventSubscriber(this.sseStreamer)
await this.redisSubscriber.connect()
this.redisSubscriber.startPeriodicCleanup()
logger.info('🔗 [server]: Redis event subscriber connected successfully')
}

Expand Down Expand Up @@ -358,7 +356,6 @@ export class App {

async stopApp() {
try {
this.sseStreamer.stopHeartbeat()
const removePromises: any[] = []
removePromises.push(this.telemetry.flush())
if (this.queueManager) {
Expand Down
3 changes: 0 additions & 3 deletions packages/server/src/queue/PredictionQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,6 @@ export class PredictionQueue extends BaseQueue {
}

async processJob(data: IExecuteFlowParams | IGenerateAgentflowv2Params) {
if (this.redisPublisher) {
await this.redisPublisher.connect()
}
if (this.appDataSource) data.appDataSource = this.appDataSource
if (this.telemetry) data.telemetry = this.telemetry
if (this.cachePool) data.cachePool = this.cachePool
Expand Down
Loading
Loading