Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion packages/server/src/controllers/internal-predictions/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,19 @@ const createAndStreamInternalPrediction = async (req: Request, res: Response, ne
res.setHeader('X-Accel-Buffering', 'no') //nginx config: https://serverfault.com/a/801629
res.flushHeaders()

if (process.env.MODE === MODE.QUEUE) {
const isQueueMode = process.env.MODE === MODE.QUEUE
if (isQueueMode) {
getRunningExpressApp().redisSubscriber.subscribe(chatId)
}

// Detect client disconnect (browser close, network timeout, ALB idle timeout)
res.on('close', () => {
sseStreamer.removeClient(chatId)
if (isQueueMode) {
getRunningExpressApp().redisSubscriber.unsubscribe(chatId)
}
})

const apiResponse = await utilBuildChatflow(req, true)
sseStreamer.streamMetadataEvent(apiResponse.chatId, apiResponse)
Comment on lines 58 to 59
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security-high high

Similar to the external predictions controller, the chatId here is user-controlled and used to route SSE events. An attacker with access to the internal API can hijack another user's chat stream within the same workspace by providing a colliding chatId. This allows the attacker to receive real-time chat events intended for another user. Ensure that chatId is generated server-side or validated for uniqueness and ownership before being used to establish an SSE connection.

} catch (error) {
Expand All @@ -55,6 +64,9 @@ const createAndStreamInternalPrediction = async (req: Request, res: Response, ne
next(error)
} finally {
sseStreamer.removeClient(chatId)
if (process.env.MODE === MODE.QUEUE) {
getRunningExpressApp().redisSubscriber.unsubscribe(chatId)
}
Comment on lines +67 to +69
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

For consistency, you can use the isQueueMode constant defined on line 45. Also, the cleanup logic is duplicated from the res.on('close') handler. It would be cleaner to extract this into a shared function to avoid duplication and potential future bugs if one place is updated and the other is not.

Suggested change
if (process.env.MODE === MODE.QUEUE) {
getRunningExpressApp().redisSubscriber.unsubscribe(chatId)
}
if (isQueueMode) {
getRunningExpressApp().redisSubscriber.unsubscribe(chatId)
}

}
}
export default {
Expand Down
14 changes: 13 additions & 1 deletion packages/server/src/controllers/predictions/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,19 @@ const createPrediction = async (req: Request, res: Response, next: NextFunction)
res.setHeader('X-Accel-Buffering', 'no') //nginx config: https://serverfault.com/a/801629
res.flushHeaders()

if (process.env.MODE === MODE.QUEUE) {
const isQueueMode = process.env.MODE === MODE.QUEUE
if (isQueueMode) {
getRunningExpressApp().redisSubscriber.subscribe(chatId)
}

// Detect client disconnect (browser close, network timeout, ALB idle timeout)
res.on('close', () => {
sseStreamer.removeClient(chatId)
if (isQueueMode) {
getRunningExpressApp().redisSubscriber.unsubscribe(chatId)
}
})

const apiResponse = await predictionsServices.buildChatflow(req)
sseStreamer.streamMetadataEvent(apiResponse.chatId, apiResponse)
Comment on lines 88 to 89
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security-high high

The chatId used to identify and route Server-Sent Events (SSE) is taken directly from the user-supplied request body without any uniqueness or ownership validation. An attacker can provide a chatId that is already in use by another user's active session. This will overwrite the victim's connection in the server's internal clients map. As a result, any subsequent events generated for the victim's chat session will be routed to the attacker's connection, allowing for session hijacking. The server should be responsible for generating unique, cryptographically secure chatIds (e.g., using uuidv4()) and returning them to the client, rather than allowing the client to provide them.

} catch (error) {
Expand All @@ -85,6 +94,9 @@ const createPrediction = async (req: Request, res: Response, next: NextFunction)
next(error)
} finally {
sseStreamer.removeClient(chatId)
if (process.env.MODE === MODE.QUEUE) {
getRunningExpressApp().redisSubscriber.unsubscribe(chatId)
}
Comment on lines +97 to +99
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

For consistency, you can use the isQueueMode constant defined on line 75. Also, the cleanup logic is duplicated from the res.on('close') handler. It would be cleaner to extract this into a shared function to avoid duplication and potential future bugs if one place is updated and the other is not.

Suggested change
if (process.env.MODE === MODE.QUEUE) {
getRunningExpressApp().redisSubscriber.unsubscribe(chatId)
}
if (isQueueMode) {
getRunningExpressApp().redisSubscriber.unsubscribe(chatId)
}

}
} else {
const apiResponse = await predictionsServices.buildChatflow(req)
Expand Down
3 changes: 3 additions & 0 deletions packages/server/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ export class App {

// Initialize SSE Streamer
this.sseStreamer = new SSEStreamer()
this.sseStreamer.startHeartbeat()
logger.info('🌊 [server]: SSE Streamer initialized successfully')

// Init Queues
Expand All @@ -148,6 +149,7 @@ export class App {

this.redisSubscriber = new RedisEventSubscriber(this.sseStreamer)
await this.redisSubscriber.connect()
this.redisSubscriber.startPeriodicCleanup()
logger.info('🔗 [server]: Redis event subscriber connected successfully')
}

Expand Down Expand Up @@ -361,6 +363,7 @@ export class App {

async stopApp() {
try {
this.sseStreamer.stopHeartbeat()
const removePromises: any[] = []
removePromises.push(this.telemetry.flush())
if (this.queueManager) {
Expand Down
Loading