Skip to content

Commit 3ecb978

Browse files
authored
Revert "fix: redis pub/sub and streaming response issue" (#6084)
Revert "fix: redis pub/sub and streaming response issue (#6008)" This reverts commit 34aa82e.
1 parent 34aa82e commit 3ecb978

File tree

7 files changed

+511
-361
lines changed

7 files changed

+511
-361
lines changed

packages/server/src/controllers/internal-predictions/index.ts

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ const createInternalPrediction = async (req: Request, res: Response, next: NextF
3333
const createAndStreamInternalPrediction = async (req: Request, res: Response, next: NextFunction) => {
3434
const chatId = req.body.chatId
3535
const sseStreamer = getRunningExpressApp().sseStreamer
36-
const isQueueMode = process.env.MODE === MODE.QUEUE
3736

3837
try {
3938
sseStreamer.addClient(chatId, res)
@@ -43,8 +42,8 @@ const createAndStreamInternalPrediction = async (req: Request, res: Response, ne
4342
res.setHeader('X-Accel-Buffering', 'no') //nginx config: https://serverfault.com/a/801629
4443
res.flushHeaders()
4544

46-
if (isQueueMode) {
47-
await getRunningExpressApp().redisSubscriber.subscribe(chatId)
45+
if (process.env.MODE === MODE.QUEUE) {
46+
getRunningExpressApp().redisSubscriber.subscribe(chatId)
4847
}
4948

5049
const apiResponse = await utilBuildChatflow(req, true)
@@ -55,9 +54,6 @@ const createAndStreamInternalPrediction = async (req: Request, res: Response, ne
5554
}
5655
next(error)
5756
} finally {
58-
if (isQueueMode && chatId) {
59-
await getRunningExpressApp().redisSubscriber.unsubscribe(chatId)
60-
}
6157
sseStreamer.removeClient(chatId)
6258
}
6359
}

packages/server/src/controllers/predictions/index.ts

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@ const createPrediction = async (req: Request, res: Response, next: NextFunction)
6464
chatId = req.body.chatId ?? req.body.overrideConfig?.sessionId ?? uuidv4()
6565
req.body.chatId = chatId
6666
}
67-
const isQueueMode = process.env.MODE === MODE.QUEUE
6867
try {
6968
sseStreamer.addExternalClient(chatId, res)
7069
res.setHeader('Content-Type', 'text/event-stream')
@@ -73,8 +72,8 @@ const createPrediction = async (req: Request, res: Response, next: NextFunction)
7372
res.setHeader('X-Accel-Buffering', 'no') //nginx config: https://serverfault.com/a/801629
7473
res.flushHeaders()
7574

76-
if (isQueueMode) {
77-
await getRunningExpressApp().redisSubscriber.subscribe(chatId)
75+
if (process.env.MODE === MODE.QUEUE) {
76+
getRunningExpressApp().redisSubscriber.subscribe(chatId)
7877
}
7978

8079
const apiResponse = await predictionsServices.buildChatflow(req)
@@ -85,9 +84,6 @@ const createPrediction = async (req: Request, res: Response, next: NextFunction)
8584
}
8685
next(error)
8786
} finally {
88-
if (isQueueMode && chatId) {
89-
await getRunningExpressApp().redisSubscriber.unsubscribe(chatId)
90-
}
9187
sseStreamer.removeClient(chatId)
9288
}
9389
} else {

packages/server/src/index.ts

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,6 @@ export class App {
128128

129129
// Initialize SSE Streamer
130130
this.sseStreamer = new SSEStreamer()
131-
this.sseStreamer.startHeartbeat()
132131
logger.info('🌊 [server]: SSE Streamer initialized successfully')
133132

134133
// Init Queues
@@ -149,7 +148,6 @@ export class App {
149148

150149
this.redisSubscriber = new RedisEventSubscriber(this.sseStreamer)
151150
await this.redisSubscriber.connect()
152-
this.redisSubscriber.startPeriodicCleanup()
153151
logger.info('🔗 [server]: Redis event subscriber connected successfully')
154152
}
155153

@@ -358,7 +356,6 @@ export class App {
358356

359357
async stopApp() {
360358
try {
361-
this.sseStreamer.stopHeartbeat()
362359
const removePromises: any[] = []
363360
removePromises.push(this.telemetry.flush())
364361
if (this.queueManager) {

packages/server/src/queue/PredictionQueue.ts

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,6 @@ export class PredictionQueue extends BaseQueue {
6363
}
6464

6565
async processJob(data: IExecuteFlowParams | IGenerateAgentflowv2Params) {
66-
if (this.redisPublisher) {
67-
await this.redisPublisher.connect()
68-
}
6966
if (this.appDataSource) data.appDataSource = this.appDataSource
7067
if (this.telemetry) data.telemetry = this.telemetry
7168
if (this.cachePool) data.cachePool = this.cachePool

0 commit comments

Comments
 (0)