Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions packages/server/src/controllers/internal-predictions/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ const createInternalPrediction = async (req: Request, res: Response, next: NextF
const createAndStreamInternalPrediction = async (req: Request, res: Response, next: NextFunction) => {
const chatId = req.body.chatId
const sseStreamer = getRunningExpressApp().sseStreamer
const isQueueMode = process.env.MODE === MODE.QUEUE

try {
sseStreamer.addClient(chatId, res)
Expand All @@ -43,8 +42,8 @@ const createAndStreamInternalPrediction = async (req: Request, res: Response, ne
res.setHeader('X-Accel-Buffering', 'no') //nginx config: https://serverfault.com/a/801629
res.flushHeaders()

if (isQueueMode) {
await getRunningExpressApp().redisSubscriber.subscribe(chatId)
if (process.env.MODE === MODE.QUEUE) {
getRunningExpressApp().redisSubscriber.subscribe(chatId)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The subscribe method on the redis client is asynchronous. By removing await, this operation becomes fire-and-forget. While this might be intended to not block the request, any potential errors during the subscription process will not be caught and will result in an unhandled promise rejection, which could crash the application.

}

const apiResponse = await utilBuildChatflow(req, true)
Expand All @@ -55,9 +54,6 @@ const createAndStreamInternalPrediction = async (req: Request, res: Response, ne
}
next(error)
} finally {
if (isQueueMode && chatId) {
await getRunningExpressApp().redisSubscriber.unsubscribe(chatId)
}
sseStreamer.removeClient(chatId)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

This revert removes the call to redisSubscriber.unsubscribe(chatId). This means that for every streaming request in queue mode, a new Redis subscription is created but never removed. This will lead to a resource leak, as subscriptions will accumulate over time, consuming memory and resources on both the application server and the Redis instance.

}
}
Expand Down
8 changes: 2 additions & 6 deletions packages/server/src/controllers/predictions/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ const createPrediction = async (req: Request, res: Response, next: NextFunction)
chatId = req.body.chatId ?? req.body.overrideConfig?.sessionId ?? uuidv4()
req.body.chatId = chatId
}
const isQueueMode = process.env.MODE === MODE.QUEUE
try {
sseStreamer.addExternalClient(chatId, res)
res.setHeader('Content-Type', 'text/event-stream')
Expand All @@ -73,8 +72,8 @@ const createPrediction = async (req: Request, res: Response, next: NextFunction)
res.setHeader('X-Accel-Buffering', 'no') //nginx config: https://serverfault.com/a/801629
res.flushHeaders()

if (isQueueMode) {
await getRunningExpressApp().redisSubscriber.subscribe(chatId)
if (process.env.MODE === MODE.QUEUE) {
getRunningExpressApp().redisSubscriber.subscribe(chatId)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The subscribe method on the redis client is asynchronous. By removing await, this operation becomes fire-and-forget. While this might be intended to not block the request, any potential errors during the subscription process will not be caught and will result in an unhandled promise rejection, which could crash the application.

}

const apiResponse = await predictionsServices.buildChatflow(req)
Expand All @@ -85,9 +84,6 @@ const createPrediction = async (req: Request, res: Response, next: NextFunction)
}
next(error)
} finally {
if (isQueueMode && chatId) {
await getRunningExpressApp().redisSubscriber.unsubscribe(chatId)
}
sseStreamer.removeClient(chatId)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

This revert removes the call to redisSubscriber.unsubscribe(chatId). This means that for every streaming request in queue mode, a new Redis subscription is created but never removed. This will lead to a resource leak, as subscriptions will accumulate over time, consuming memory and resources on both the application server and the Redis instance.

}
} else {
Expand Down
3 changes: 0 additions & 3 deletions packages/server/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@ export class App {

// Initialize SSE Streamer
this.sseStreamer = new SSEStreamer()
this.sseStreamer.startHeartbeat()
logger.info('🌊 [server]: SSE Streamer initialized successfully')

// Init Queues
Expand All @@ -149,7 +148,6 @@ export class App {

this.redisSubscriber = new RedisEventSubscriber(this.sseStreamer)
await this.redisSubscriber.connect()
this.redisSubscriber.startPeriodicCleanup()
logger.info('🔗 [server]: Redis event subscriber connected successfully')
}

Expand Down Expand Up @@ -358,7 +356,6 @@ export class App {

async stopApp() {
try {
this.sseStreamer.stopHeartbeat()
const removePromises: any[] = []
removePromises.push(this.telemetry.flush())
if (this.queueManager) {
Expand Down
3 changes: 0 additions & 3 deletions packages/server/src/queue/PredictionQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,6 @@ export class PredictionQueue extends BaseQueue {
}

async processJob(data: IExecuteFlowParams | IGenerateAgentflowv2Params) {
if (this.redisPublisher) {
await this.redisPublisher.connect()
}
if (this.appDataSource) data.appDataSource = this.appDataSource
if (this.telemetry) data.telemetry = this.telemetry
if (this.cachePool) data.cachePool = this.cachePool
Expand Down
Loading
Loading