Skip to content
8 changes: 6 additions & 2 deletions packages/server/src/controllers/internal-predictions/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ const createInternalPrediction = async (req: Request, res: Response, next: NextF
const createAndStreamInternalPrediction = async (req: Request, res: Response, next: NextFunction) => {
const chatId = req.body.chatId
const sseStreamer = getRunningExpressApp().sseStreamer
const isQueueMode = process.env.MODE === MODE.QUEUE

try {
sseStreamer.addClient(chatId, res)
Expand All @@ -42,8 +43,8 @@ const createAndStreamInternalPrediction = async (req: Request, res: Response, ne
res.setHeader('X-Accel-Buffering', 'no') //nginx config: https://serverfault.com/a/801629
res.flushHeaders()

if (process.env.MODE === MODE.QUEUE) {
getRunningExpressApp().redisSubscriber.subscribe(chatId)
if (isQueueMode) {
await getRunningExpressApp().redisSubscriber.subscribe(chatId)
}

const apiResponse = await utilBuildChatflow(req, true)
Expand All @@ -54,6 +55,9 @@ const createAndStreamInternalPrediction = async (req: Request, res: Response, ne
}
next(error)
} finally {
if (isQueueMode && chatId) {
await getRunningExpressApp().redisSubscriber.unsubscribe(chatId)
}
sseStreamer.removeClient(chatId)
}
}
Expand Down
8 changes: 6 additions & 2 deletions packages/server/src/controllers/predictions/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ const createPrediction = async (req: Request, res: Response, next: NextFunction)
chatId = req.body.chatId ?? req.body.overrideConfig?.sessionId ?? uuidv4()
req.body.chatId = chatId
}
const isQueueMode = process.env.MODE === MODE.QUEUE
try {
sseStreamer.addExternalClient(chatId, res)
res.setHeader('Content-Type', 'text/event-stream')
Expand All @@ -72,8 +73,8 @@ const createPrediction = async (req: Request, res: Response, next: NextFunction)
res.setHeader('X-Accel-Buffering', 'no') //nginx config: https://serverfault.com/a/801629
res.flushHeaders()

if (process.env.MODE === MODE.QUEUE) {
getRunningExpressApp().redisSubscriber.subscribe(chatId)
if (isQueueMode) {
await getRunningExpressApp().redisSubscriber.subscribe(chatId)
}

const apiResponse = await predictionsServices.buildChatflow(req)
Expand All @@ -84,6 +85,9 @@ const createPrediction = async (req: Request, res: Response, next: NextFunction)
}
next(error)
} finally {
if (isQueueMode && chatId) {
await getRunningExpressApp().redisSubscriber.unsubscribe(chatId)
}
sseStreamer.removeClient(chatId)
}
} else {
Expand Down
3 changes: 3 additions & 0 deletions packages/server/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ export class App {

// Initialize SSE Streamer
this.sseStreamer = new SSEStreamer()
this.sseStreamer.startHeartbeat()
logger.info('🌊 [server]: SSE Streamer initialized successfully')

// Init Queues
Expand All @@ -148,6 +149,7 @@ export class App {

this.redisSubscriber = new RedisEventSubscriber(this.sseStreamer)
await this.redisSubscriber.connect()
this.redisSubscriber.startPeriodicCleanup()
logger.info('🔗 [server]: Redis event subscriber connected successfully')
}

Expand Down Expand Up @@ -356,6 +358,7 @@ export class App {

async stopApp() {
try {
this.sseStreamer.stopHeartbeat()
const removePromises: any[] = []
removePromises.push(this.telemetry.flush())
if (this.queueManager) {
Expand Down
3 changes: 3 additions & 0 deletions packages/server/src/queue/PredictionQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ export class PredictionQueue extends BaseQueue {
}

async processJob(data: IExecuteFlowParams | IGenerateAgentflowv2Params) {
if (this.redisPublisher) {
await this.redisPublisher.connect()
}
if (this.appDataSource) data.appDataSource = this.appDataSource
if (this.telemetry) data.telemetry = this.telemetry
if (this.cachePool) data.cachePool = this.cachePool
Expand Down
Loading
Loading