diff --git a/packages/server/src/controllers/internal-predictions/index.ts b/packages/server/src/controllers/internal-predictions/index.ts index 418421a4386..b5614faf248 100644 --- a/packages/server/src/controllers/internal-predictions/index.ts +++ b/packages/server/src/controllers/internal-predictions/index.ts @@ -33,7 +33,6 @@ const createInternalPrediction = async (req: Request, res: Response, next: NextF const createAndStreamInternalPrediction = async (req: Request, res: Response, next: NextFunction) => { const chatId = req.body.chatId const sseStreamer = getRunningExpressApp().sseStreamer - const isQueueMode = process.env.MODE === MODE.QUEUE try { sseStreamer.addClient(chatId, res) @@ -43,8 +42,8 @@ const createAndStreamInternalPrediction = async (req: Request, res: Response, ne res.setHeader('X-Accel-Buffering', 'no') //nginx config: https://serverfault.com/a/801629 res.flushHeaders() - if (isQueueMode) { - await getRunningExpressApp().redisSubscriber.subscribe(chatId) + if (process.env.MODE === MODE.QUEUE) { + getRunningExpressApp().redisSubscriber.subscribe(chatId) } const apiResponse = await utilBuildChatflow(req, true) @@ -55,9 +54,6 @@ const createAndStreamInternalPrediction = async (req: Request, res: Response, ne } next(error) } finally { - if (isQueueMode && chatId) { - await getRunningExpressApp().redisSubscriber.unsubscribe(chatId) - } sseStreamer.removeClient(chatId) } } diff --git a/packages/server/src/controllers/predictions/index.ts b/packages/server/src/controllers/predictions/index.ts index 57561d8c39a..d467f3157e4 100644 --- a/packages/server/src/controllers/predictions/index.ts +++ b/packages/server/src/controllers/predictions/index.ts @@ -64,7 +64,6 @@ const createPrediction = async (req: Request, res: Response, next: NextFunction) chatId = req.body.chatId ?? req.body.overrideConfig?.sessionId ?? uuidv4() req.body.chatId = chatId } - const isQueueMode = process.env.MODE === MODE.QUEUE try { sseStreamer.addExternalClient(chatId, res) res.setHeader('Content-Type', 'text/event-stream') @@ -73,8 +72,8 @@ const createPrediction = async (req: Request, res: Response, next: NextFunction) res.setHeader('X-Accel-Buffering', 'no') //nginx config: https://serverfault.com/a/801629 res.flushHeaders() - if (isQueueMode) { - await getRunningExpressApp().redisSubscriber.subscribe(chatId) + if (process.env.MODE === MODE.QUEUE) { + getRunningExpressApp().redisSubscriber.subscribe(chatId) } const apiResponse = await predictionsServices.buildChatflow(req) @@ -85,9 +84,6 @@ const createPrediction = async (req: Request, res: Response, next: NextFunction) } next(error) } finally { - if (isQueueMode && chatId) { - await getRunningExpressApp().redisSubscriber.unsubscribe(chatId) - } sseStreamer.removeClient(chatId) } } else { diff --git a/packages/server/src/index.ts b/packages/server/src/index.ts index b0b031d633a..02d6a43d898 100644 --- a/packages/server/src/index.ts +++ b/packages/server/src/index.ts @@ -128,7 +128,6 @@ export class App { // Initialize SSE Streamer this.sseStreamer = new SSEStreamer() - this.sseStreamer.startHeartbeat() logger.info('🌊 [server]: SSE Streamer initialized successfully') // Init Queues @@ -149,7 +148,6 @@ export class App { this.redisSubscriber = new RedisEventSubscriber(this.sseStreamer) await this.redisSubscriber.connect() - this.redisSubscriber.startPeriodicCleanup() logger.info('🔗 [server]: Redis event subscriber connected successfully') } @@ -358,7 +356,6 @@ export class App { async stopApp() { try { - this.sseStreamer.stopHeartbeat() const removePromises: any[] = [] removePromises.push(this.telemetry.flush()) if (this.queueManager) { diff --git a/packages/server/src/queue/PredictionQueue.ts b/packages/server/src/queue/PredictionQueue.ts index 57251527106..10cc125f779 100644 --- a/packages/server/src/queue/PredictionQueue.ts +++ b/packages/server/src/queue/PredictionQueue.ts @@ -63,9 +63,6 @@ export class PredictionQueue extends BaseQueue { } async processJob(data: IExecuteFlowParams | IGenerateAgentflowv2Params) { - if (this.redisPublisher) { - await this.redisPublisher.connect() - } if (this.appDataSource) data.appDataSource = this.appDataSource if (this.telemetry) data.telemetry = this.telemetry if (this.cachePool) data.cachePool = this.cachePool diff --git a/packages/server/src/queue/RedisEventPublisher.ts b/packages/server/src/queue/RedisEventPublisher.ts index e275343a492..da3f54c368c 100644 --- a/packages/server/src/queue/RedisEventPublisher.ts +++ b/packages/server/src/queue/RedisEventPublisher.ts @@ -4,7 +4,6 @@ import logger from '../utils/logger' export class RedisEventPublisher implements IServerSideEventStreamer { private redisPublisher: ReturnType - private connectPromise: Promise | null = null constructor() { if (process.env.REDIS_URL) { @@ -77,35 +76,53 @@ export class RedisEventPublisher implements IServerSideEventStreamer { return this.redisPublisher.isReady } - async connect(): Promise { - if (this.connectPromise === null) { - this.connectPromise = this.redisPublisher.connect().then(() => undefined) - } - await this.connectPromise + async connect() { + await this.redisPublisher.connect() } - private async safePublish(channel: string, message: string) { - if (!this.redisPublisher.isReady) { - logger.warn(`[RedisEventPublisher] Cannot publish to channel ${channel}: Redis client not ready`) - return - } + streamCustomEvent(chatId: string, eventType: string, data: any) { try { - await this.redisPublisher.publish(channel, message) + this.redisPublisher.publish( + chatId, + JSON.stringify({ + chatId, + eventType, + data + }) + ) } catch (error) { - logger.error(`[RedisEventPublisher] Error publishing to channel ${channel}:`, { error }) + console.error('Error streaming custom event:', error) } } - streamCustomEvent(chatId: string, eventType: string, data: any) { - this.safePublish(chatId, JSON.stringify({ chatId, eventType, data })) - } - streamStartEvent(chatId: string, data: string) { - this.safePublish(chatId, JSON.stringify({ chatId, eventType: 'start', data })) + try { + this.redisPublisher.publish( + chatId, + JSON.stringify({ + chatId, + eventType: 'start', + data + }) + ) + } catch (error) { + console.error('Error streaming start event:', error) + } } streamTokenEvent(chatId: string, data: string) { - this.safePublish(chatId, JSON.stringify({ chatId, eventType: 'token', data })) + try { + this.redisPublisher.publish( + chatId, + JSON.stringify({ + chatId, + eventType: 'token', + data + }) + ) + } catch (error) { + console.error('Error streaming token event:', error) + } } streamThinkingEvent(chatId: string, data: string, duration?: number) { @@ -125,55 +142,198 @@ export class RedisEventPublisher implements IServerSideEventStreamer { } streamSourceDocumentsEvent(chatId: string, data: any) { - this.safePublish(chatId, JSON.stringify({ chatId, eventType: 'sourceDocuments', data })) + try { + this.redisPublisher.publish( + chatId, + JSON.stringify({ + chatId, + eventType: 'sourceDocuments', + data + }) + ) + } catch (error) { + console.error('Error streaming sourceDocuments event:', error) + } } streamArtifactsEvent(chatId: string, data: any) { - this.safePublish(chatId, JSON.stringify({ chatId, eventType: 'artifacts', data })) + try { + this.redisPublisher.publish( + chatId, + JSON.stringify({ + chatId, + eventType: 'artifacts', + data + }) + ) + } catch (error) { + console.error('Error streaming artifacts event:', error) + } } streamUsedToolsEvent(chatId: string, data: any) { - this.safePublish(chatId, JSON.stringify({ chatId, eventType: 'usedTools', data })) + try { + this.redisPublisher.publish( + chatId, + JSON.stringify({ + chatId, + eventType: 'usedTools', + data + }) + ) + } catch (error) { + console.error('Error streaming usedTools event:', error) + } } streamCalledToolsEvent(chatId: string, data: any) { - this.safePublish(chatId, JSON.stringify({ chatId, eventType: 'calledTools', data })) + try { + this.redisPublisher.publish( + chatId, + JSON.stringify({ + chatId, + eventType: 'calledTools', + data + }) + ) + } catch (error) { + console.error('Error streaming calledTools event:', error) + } } streamFileAnnotationsEvent(chatId: string, data: any) { - this.safePublish(chatId, JSON.stringify({ chatId, eventType: 'fileAnnotations', data })) + try { + this.redisPublisher.publish( + chatId, + JSON.stringify({ + chatId, + eventType: 'fileAnnotations', + data + }) + ) + } catch (error) { + console.error('Error streaming fileAnnotations event:', error) + } } streamToolEvent(chatId: string, data: any): void { - this.safePublish(chatId, JSON.stringify({ chatId, eventType: 'tool', data })) + try { + this.redisPublisher.publish( + chatId, + JSON.stringify({ + chatId, + eventType: 'tool', + data + }) + ) + } catch (error) { + console.error('Error streaming tool event:', error) + } } streamAgentReasoningEvent(chatId: string, data: any): void { - this.safePublish(chatId, JSON.stringify({ chatId, eventType: 'agentReasoning', data })) + try { + this.redisPublisher.publish( + chatId, + JSON.stringify({ + chatId, + eventType: 'agentReasoning', + data + }) + ) + } catch (error) { + console.error('Error streaming agentReasoning event:', error) + } } streamAgentFlowEvent(chatId: string, data: any): void { - this.safePublish(chatId, JSON.stringify({ chatId, eventType: 'agentFlowEvent', data })) + try { + this.redisPublisher.publish( + chatId, + JSON.stringify({ + chatId, + eventType: 'agentFlowEvent', + data + }) + ) + } catch (error) { + console.error('Error streaming agentFlow event:', error) + } } streamAgentFlowExecutedDataEvent(chatId: string, data: any): void { - this.safePublish(chatId, JSON.stringify({ chatId, eventType: 'agentFlowExecutedData', data })) + try { + this.redisPublisher.publish( + chatId, + JSON.stringify({ + chatId, + eventType: 'agentFlowExecutedData', + data + }) + ) + } catch (error) { + console.error('Error streaming agentFlowExecutedData event:', error) + } } streamNextAgentEvent(chatId: string, data: any): void { - this.safePublish(chatId, JSON.stringify({ chatId, eventType: 'nextAgent', data })) + try { + this.redisPublisher.publish( + chatId, + JSON.stringify({ + chatId, + eventType: 'nextAgent', + data + }) + ) + } catch (error) { + console.error('Error streaming nextAgent event:', error) + } } streamNextAgentFlowEvent(chatId: string, data: any): void { - this.safePublish(chatId, JSON.stringify({ chatId, eventType: 'nextAgentFlow', data })) + try { + this.redisPublisher.publish( + chatId, + JSON.stringify({ + chatId, + eventType: 'nextAgentFlow', + data + }) + ) + } catch (error) { + console.error('Error streaming nextAgentFlow event:', error) + } } streamActionEvent(chatId: string, data: any): void { - this.safePublish(chatId, JSON.stringify({ chatId, eventType: 'action', data })) + try { + this.redisPublisher.publish( + chatId, + JSON.stringify({ + chatId, + eventType: 'action', + data + }) + ) + } catch (error) { + console.error('Error streaming action event:', error) + } } streamAbortEvent(chatId: string): void { - this.safePublish(chatId, JSON.stringify({ chatId, eventType: 'abort', data: '[DONE]' })) + try { + this.redisPublisher.publish( + chatId, + JSON.stringify({ + chatId, + eventType: 'abort', + data: '[DONE]' + }) + ) + } catch (error) { + console.error('Error streaming abort event:', error) + } } streamEndEvent(_: string) { @@ -181,7 +341,18 @@ export class RedisEventPublisher implements IServerSideEventStreamer { } streamErrorEvent(chatId: string, msg: string) { - this.safePublish(chatId, JSON.stringify({ chatId, eventType: 'error', data: msg })) + try { + this.redisPublisher.publish( + chatId, + JSON.stringify({ + chatId, + eventType: 'error', + data: msg + }) + ) + } catch (error) { + console.error('Error streaming error event:', error) + } } streamMetadataEvent(chatId: string, apiResponse: any) { @@ -206,28 +377,87 @@ export class RedisEventPublisher implements IServerSideEventStreamer { this.streamCustomEvent(chatId, 'metadata', metadataJson) } } catch (error) { - logger.error('[RedisEventPublisher] Error streaming metadata event:', { error }) + console.error('Error streaming metadata event:', error) } } streamUsageMetadataEvent(chatId: string, data: any): void { - this.safePublish(chatId, JSON.stringify({ chatId, eventType: 'usageMetadata', data })) + try { + this.redisPublisher.publish( + chatId, + JSON.stringify({ + chatId, + eventType: 'usageMetadata', + data + }) + ) + } catch (error) { + console.error('Error streaming usage metadata event:', error) + } } streamTTSStartEvent(chatId: string, chatMessageId: string, format: string): void { - this.safePublish(chatId, JSON.stringify({ chatId, chatMessageId, eventType: 'tts_start', data: { format } })) + try { + this.redisPublisher.publish( + chatId, + JSON.stringify({ + chatId, + chatMessageId, + eventType: 'tts_start', + data: { format } + }) + ) + } catch (error) { + console.error('Error streaming TTS start event:', error) + } } streamTTSDataEvent(chatId: string, chatMessageId: string, audioChunk: string): void { - this.safePublish(chatId, JSON.stringify({ chatId, chatMessageId, eventType: 'tts_data', data: audioChunk })) + try { + this.redisPublisher.publish( + chatId, + JSON.stringify({ + chatId, + chatMessageId, + eventType: 'tts_data', + data: audioChunk + }) + ) + } catch (error) { + console.error('Error streaming TTS data event:', error) + } } streamTTSEndEvent(chatId: string, chatMessageId: string): void { - this.safePublish(chatId, JSON.stringify({ chatId, chatMessageId, eventType: 'tts_end', data: {} })) + try { + this.redisPublisher.publish( + chatId, + JSON.stringify({ + chatId, + chatMessageId, + eventType: 'tts_end', + data: {} + }) + ) + } catch (error) { + console.error('Error streaming TTS end event:', error) + } } streamTTSAbortEvent(chatId: string, chatMessageId: string): void { - this.safePublish(chatId, JSON.stringify({ chatId, chatMessageId, eventType: 'tts_abort', data: {} })) + try { + this.redisPublisher.publish( + chatId, + JSON.stringify({ + chatId, + chatMessageId, + eventType: 'tts_abort', + data: {} + }) + ) + } catch (error) { + console.error('Error streaming TTS abort event:', error) + } } async disconnect() { diff --git a/packages/server/src/queue/RedisEventSubscriber.ts b/packages/server/src/queue/RedisEventSubscriber.ts index 0cf93502780..c26c46ccba9 100644 --- a/packages/server/src/queue/RedisEventSubscriber.ts +++ b/packages/server/src/queue/RedisEventSubscriber.ts @@ -6,7 +6,6 @@ export class RedisEventSubscriber { private redisSubscriber: ReturnType private sseStreamer: SSEStreamer private subscribedChannels: Set = new Set() - private cleanupInterval: NodeJS.Timeout | null = null constructor(sseStreamer: SSEStreamer) { if (process.env.REDIS_URL) { @@ -57,9 +56,6 @@ export class RedisEventSubscriber { this.redisSubscriber.on('ready', () => { logger.info(`[RedisEventSubscriber] Redis client ready and connected`) - void this.resubscribeAllChannels().catch((err) => { - logger.error(`[RedisEventSubscriber] Failed to resubscribe to channels after ready:`, { error: err }) - }) }) this.redisSubscriber.on('error', (err) => { @@ -84,7 +80,7 @@ export class RedisEventSubscriber { await this.redisSubscriber.connect() } - async subscribe(channel: string) { + subscribe(channel: string) { // Subscribe to the Redis channel for job events if (!this.redisSubscriber) { throw new Error('Redis subscriber not connected.') @@ -95,7 +91,7 @@ export class RedisEventSubscriber { return // Prevent duplicate subscription } - await this.redisSubscriber.subscribe(channel, (message) => { + this.redisSubscriber.subscribe(channel, (message) => { this.handleEvent(message) }) @@ -103,160 +99,86 @@ export class RedisEventSubscriber { this.subscribedChannels.add(channel) } - async unsubscribe(channel: string): Promise { - if (!this.redisSubscriber) return - if (!this.subscribedChannels.has(channel)) return - - try { - await this.redisSubscriber.unsubscribe(channel) - logger.debug( - `[RedisEventSubscriber] Unsubscribed from channel: ${channel}. Active subscriptions: ${this.subscribedChannels.size}` - ) - } catch (error) { - logger.error(`[RedisEventSubscriber] Error unsubscribing from channel ${channel}:`, { error }) - } finally { - this.subscribedChannels.delete(channel) - } - } - - getSubscriptionCount(): number { - return this.subscribedChannels.size - } - - startPeriodicCleanup(intervalMs: number = 60_000) { - this.cleanupInterval = setInterval(() => { - const staleChannels = Array.from(this.subscribedChannels).filter((channel) => !this.sseStreamer.hasClient(channel)) - if (staleChannels.length > 0) { - for (const channel of staleChannels) { - this.unsubscribe(channel) - } - logger.info( - `[RedisEventSubscriber] Periodic cleanup: removed ${staleChannels.length} stale subscriptions. Remaining: ${this.subscribedChannels.size}` - ) - } - }, intervalMs) - } - - private async resubscribeAllChannels(): Promise { - if (this.subscribedChannels.size === 0) return - - const channels = Array.from(this.subscribedChannels) - this.subscribedChannels.clear() - - const messageHandler = (message: string) => this.handleEvent(message) - for (const channel of channels) { - try { - await this.redisSubscriber.subscribe(channel, messageHandler) - this.subscribedChannels.add(channel) - } catch (err) { - logger.error(`[RedisEventSubscriber] Failed to resubscribe to channel ${channel}:`, { error: err }) - } - } - logger.info(`[RedisEventSubscriber] Resubscribed to ${this.subscribedChannels.size} channel(s) after reconnection`) - } - private handleEvent(message: string) { - let event: { eventType?: string; chatId?: string; chatMessageId?: string; data?: any } - try { - event = JSON.parse(message) - } catch (err) { - logger.error(`[RedisEventSubscriber] Failed to parse pub/sub message:`, { error: err, rawMessage: message }) - return - } - - const { eventType, chatId, chatMessageId, data } = event - if (!eventType || chatId === undefined) { - logger.warn(`[RedisEventSubscriber] Invalid event shape (missing eventType or chatId):`, { event }) - return - } - - const chatMessageIdStr = chatMessageId ?? '' - const dataObj = data ?? {} - - try { - switch (eventType) { - case 'start': - this.sseStreamer.streamStartEvent(chatId, data) - break - case 'token': - this.sseStreamer.streamTokenEvent(chatId, data) - break - case 'sourceDocuments': - this.sseStreamer.streamSourceDocumentsEvent(chatId, data) - break - case 'artifacts': - this.sseStreamer.streamArtifactsEvent(chatId, data) - break - case 'usedTools': - this.sseStreamer.streamUsedToolsEvent(chatId, data) - break - case 'calledTools': - this.sseStreamer.streamCalledToolsEvent(chatId, data) - break - case 'fileAnnotations': - this.sseStreamer.streamFileAnnotationsEvent(chatId, data) - break - case 'tool': - this.sseStreamer.streamToolEvent(chatId, data) - break - case 'agentReasoning': - this.sseStreamer.streamAgentReasoningEvent(chatId, data) - break - case 'nextAgent': - this.sseStreamer.streamNextAgentEvent(chatId, data) - break - case 'agentFlowEvent': - this.sseStreamer.streamAgentFlowEvent(chatId, data) - break - case 'agentFlowExecutedData': - this.sseStreamer.streamAgentFlowExecutedDataEvent(chatId, data) - break - case 'nextAgentFlow': - this.sseStreamer.streamNextAgentFlowEvent(chatId, data) - break - case 'action': - this.sseStreamer.streamActionEvent(chatId, data) - break - case 'abort': - this.sseStreamer.streamAbortEvent(chatId) - break - case 'error': - this.sseStreamer.streamErrorEvent(chatId, typeof data === 'string' ? data : String(data ?? '')) - break - case 'metadata': - this.sseStreamer.streamMetadataEvent(chatId, data) - break - case 'usageMetadata': - this.sseStreamer.streamUsageMetadataEvent(chatId, data) - break - case 'tts_start': - this.sseStreamer.streamTTSStartEvent(chatId, chatMessageIdStr, (dataObj as { format?: string }).format ?? '') - break - case 'tts_data': - this.sseStreamer.streamTTSDataEvent(chatId, chatMessageIdStr, data) - break - case 'tts_end': - this.sseStreamer.streamTTSEndEvent(chatId, chatMessageIdStr) - break - case 'tts_abort': - this.sseStreamer.streamTTSAbortEvent(chatId, chatMessageIdStr) - break - default: - logger.debug(`[RedisEventSubscriber] Unknown event type: ${eventType}`) - } - } catch (err) { - logger.error(`[RedisEventSubscriber] Error handling pub/sub event:`, { error: err, eventType, chatId }) - if (chatId) { - this.sseStreamer.streamErrorEvent(chatId, err instanceof Error ? err.message : 'Failed to process stream event') - } + // Parse the message from Redis + const event = JSON.parse(message) + const { eventType, chatId, chatMessageId, data, duration } = event + + // Stream the event to the client + switch (eventType) { + case 'start': + this.sseStreamer.streamStartEvent(chatId, data) + break + case 'token': + this.sseStreamer.streamTokenEvent(chatId, data) + break + case 'thinking': + this.sseStreamer.streamThinkingEvent(chatId, data, duration) + break + case 'sourceDocuments': + this.sseStreamer.streamSourceDocumentsEvent(chatId, data) + break + case 'artifacts': + this.sseStreamer.streamArtifactsEvent(chatId, data) + break + case 'usedTools': + this.sseStreamer.streamUsedToolsEvent(chatId, data) + break + case 'calledTools': + this.sseStreamer.streamCalledToolsEvent(chatId, data) + break + case 'fileAnnotations': + this.sseStreamer.streamFileAnnotationsEvent(chatId, data) + break + case 'tool': + this.sseStreamer.streamToolEvent(chatId, data) + break + case 'agentReasoning': + this.sseStreamer.streamAgentReasoningEvent(chatId, data) + break + case 'nextAgent': + this.sseStreamer.streamNextAgentEvent(chatId, data) + break + case 'agentFlowEvent': + this.sseStreamer.streamAgentFlowEvent(chatId, data) + break + case 'agentFlowExecutedData': + this.sseStreamer.streamAgentFlowExecutedDataEvent(chatId, data) + break + case 'nextAgentFlow': + this.sseStreamer.streamNextAgentFlowEvent(chatId, data) + break + case 'action': + this.sseStreamer.streamActionEvent(chatId, data) + break + case 'abort': + this.sseStreamer.streamAbortEvent(chatId) + break + case 'error': + this.sseStreamer.streamErrorEvent(chatId, data) + break + case 'metadata': + this.sseStreamer.streamMetadataEvent(chatId, data) + break + case 'usageMetadata': + this.sseStreamer.streamUsageMetadataEvent(chatId, data) + break + case 'tts_start': + this.sseStreamer.streamTTSStartEvent(chatId, chatMessageId, data.format) + break + case 'tts_data': + this.sseStreamer.streamTTSDataEvent(chatId, chatMessageId, data) + break + case 'tts_end': + this.sseStreamer.streamTTSEndEvent(chatId, chatMessageId) + break + case 'tts_abort': + this.sseStreamer.streamTTSAbortEvent(chatId, chatMessageId) + break } } async disconnect() { - if (this.cleanupInterval) { - clearInterval(this.cleanupInterval) - this.cleanupInterval = null - } if (this.redisSubscriber) { await this.redisSubscriber.quit() } diff --git a/packages/server/src/utils/SSEStreamer.ts b/packages/server/src/utils/SSEStreamer.ts index 9d12c219d3e..afd656349d0 100644 --- a/packages/server/src/utils/SSEStreamer.ts +++ b/packages/server/src/utils/SSEStreamer.ts @@ -11,87 +11,66 @@ type Client = { } export class SSEStreamer implements IServerSideEventStreamer { - private readonly clients: Map = new Map() - private heartbeatInterval: NodeJS.Timeout | null = null - - hasClient(chatId: string): boolean { - return this.clients.has(chatId) - } + clients: { [id: string]: Client } = {} addExternalClient(chatId: string, res: Response) { - this.clients.set(chatId, { clientType: 'EXTERNAL', response: res, started: false }) + this.clients[chatId] = { clientType: 'EXTERNAL', response: res, started: false } } addClient(chatId: string, res: Response) { - this.clients.set(chatId, { clientType: 'INTERNAL', response: res, started: false }) - } - - /** - * Safely write data to a client's response. If the write fails (e.g., client already disconnected), - * the client is automatically removed to prevent further writes to a dead connection. - */ - private safeWrite(chatId: string, data: string): boolean { - const client = this.clients.get(chatId) - if (!client) return false - try { - client.response.write(data) - return true - } catch { - this.clients.delete(chatId) - return false - } + this.clients[chatId] = { clientType: 'INTERNAL', response: res, started: false } } removeClient(chatId: string) { - const client = this.clients.get(chatId) + const client = this.clients[chatId] if (client) { - try { - const clientResponse = { - event: 'end', - data: '[DONE]' - } - client.response.write('message\ndata:' + JSON.stringify(clientResponse) + '\n\n') - client.response.end() - } catch { - // Client already disconnected, ignore write errors - } finally { - this.clients.delete(chatId) + const clientResponse = { + event: 'end', + data: '[DONE]' } + client.response.write('message\ndata:' + JSON.stringify(clientResponse) + '\n\n') + client.response.end() + delete this.clients[chatId] } } streamCustomEvent(chatId: string, eventType: string, data: any) { - const clientResponse = { - event: eventType, - data: data + const client = this.clients[chatId] + if (client) { + const clientResponse = { + event: eventType, + data: data + } + client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') } - this.safeWrite(chatId, 'message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') } streamStartEvent(chatId: string, data: string) { - const client = this.clients.get(chatId) + const client = this.clients[chatId] // prevent multiple start events being streamed to the client if (client && !client.started) { const clientResponse = { event: 'start', data: data } - if (this.safeWrite(chatId, 'message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')) { - client.started = true - } + client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') + client.started = true } } streamTokenEvent(chatId: string, data: string) { - const clientResponse = { - event: 'token', - data: data + const client = this.clients[chatId] + if (client) { + const clientResponse = { + event: 'token', + data: data + } + client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') } - this.safeWrite(chatId, 'message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') } streamThinkingEvent(chatId: string, data: string, duration?: number) { - const client = this.clients.get(chatId) + const client = this.clients[chatId] if (client) { const clientResponse = { event: 'thinking', @@ -103,96 +82,135 @@ export class SSEStreamer implements IServerSideEventStreamer { } streamSourceDocumentsEvent(chatId: string, data: any) { - const clientResponse = { - event: 'sourceDocuments', - data: data + const client = this.clients[chatId] + if (client) { + const clientResponse = { + event: 'sourceDocuments', + data: data + } + client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') } - this.safeWrite(chatId, 'message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') } streamArtifactsEvent(chatId: string, data: any) { - const clientResponse = { - event: 'artifacts', - data: data + const client = this.clients[chatId] + if (client) { + const clientResponse = { + event: 'artifacts', + data: data + } + client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') } - this.safeWrite(chatId, 'message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') } streamUsedToolsEvent(chatId: string, data: any): void { - const clientResponse = { - event: 'usedTools', - data: data + const client = this.clients[chatId] + if (client) { + const clientResponse = { + event: 'usedTools', + data: data + } + client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') } - this.safeWrite(chatId, 'message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') } streamCalledToolsEvent(chatId: string, data: any): void { - const clientResponse = { - event: 'calledTools', - data: data + const client = this.clients[chatId] + if (client) { + const clientResponse = { + event: 'calledTools', + data: data + } + client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') } - this.safeWrite(chatId, 'message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') } streamFileAnnotationsEvent(chatId: string, data: any): void { - const clientResponse = { - event: 'fileAnnotations', - data: data + const client = this.clients[chatId] + if (client) { + const clientResponse = { + event: 'fileAnnotations', + data: data + } + client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') } - this.safeWrite(chatId, 'message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') } streamToolEvent(chatId: string, data: any): void { - const clientResponse = { - event: 'tool', - data: data + const client = this.clients[chatId] + if (client) { + const clientResponse = { + event: 'tool', + data: data + } + client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') } - this.safeWrite(chatId, 'message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') } streamAgentReasoningEvent(chatId: string, data: any): void { - const clientResponse = { - event: 'agentReasoning', - data: data + const client = this.clients[chatId] + if (client) { + const clientResponse = { + event: 'agentReasoning', + data: data + } + client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') } - this.safeWrite(chatId, 'message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') } streamNextAgentEvent(chatId: string, data: any): void { - const clientResponse = { - event: 'nextAgent', - data: data + const client = this.clients[chatId] + if (client) { + const clientResponse = { + event: 'nextAgent', + data: data + } + client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') } - this.safeWrite(chatId, 'message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') } streamAgentFlowEvent(chatId: string, data: any): void { - const clientResponse = { - event: 'agentFlowEvent', - data: data + const client = this.clients[chatId] + if (client) { + const clientResponse = { + event: 'agentFlowEvent', + data: data + } + client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') } - this.safeWrite(chatId, 'message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') } streamAgentFlowExecutedDataEvent(chatId: string, data: any): void { - const clientResponse = { - event: 'agentFlowExecutedData', - data: data + const client = this.clients[chatId] + if (client) { + const clientResponse = { + event: 'agentFlowExecutedData', + data: data + } + client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') } - this.safeWrite(chatId, 'message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') } streamNextAgentFlowEvent(chatId: string, data: any): void { - const clientResponse = { - event: 'nextAgentFlow', - data: data + const client = this.clients[chatId] + if (client) { + const clientResponse = { + event: 'nextAgentFlow', + data: data + } + client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') } - this.safeWrite(chatId, 'message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') } streamActionEvent(chatId: string, data: any): void { - const clientResponse = { - event: 'action', - data: data + const client = this.clients[chatId] + if (client) { + const clientResponse = { + event: 'action', + data: data + } + client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') } - this.safeWrite(chatId, 'message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') } streamAbortEvent(chatId: string): void { - const clientResponse = { - event: 'abort', - data: '[DONE]' + const client = this.clients[chatId] + if (client) { + const clientResponse = { + event: 'abort', + data: '[DONE]' + } + client.response.write('message\ndata:' + JSON.stringify(clientResponse) + '\n\n') } - this.safeWrite(chatId, 'message\ndata:' + JSON.stringify(clientResponse) + '\n\n') } streamEndEvent(_: string) { @@ -202,11 +220,14 @@ export class SSEStreamer implements IServerSideEventStreamer { streamErrorEvent(chatId: string, msg: string) { if (msg.includes('401 Incorrect API key provided')) msg = '401 Unauthorized – check your API key and ensure it has access to the requested model.' - const clientResponse = { - event: 'error', - data: msg + const client = this.clients[chatId] + if (client) { + const clientResponse = { + event: 'error', + data: msg + } + client.response.write('message\ndata:' + JSON.stringify(clientResponse) + '\n\n') } - this.safeWrite(chatId, 'message\ndata:' + JSON.stringify(clientResponse) + '\n\n') } streamMetadataEvent(chatId: string, apiResponse: any) { @@ -240,68 +261,59 @@ export class SSEStreamer implements IServerSideEventStreamer { } streamUsageMetadataEvent(chatId: string, data: any): void { - const clientResponse = { - event: 'usageMetadata', - data: data + const client = this.clients[chatId] + if (client) { + const clientResponse = { + event: 'usageMetadata', + data: data + } + client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') } - this.safeWrite(chatId, 'message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') } streamTTSStartEvent(chatId: string, chatMessageId: string, format: string): void { - const clientResponse = { - event: 'tts_start', - data: { chatMessageId, format } + const client = this.clients[chatId] + if (client) { + const clientResponse = { + event: 'tts_start', + data: { chatMessageId, format } + } + client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') } - this.safeWrite(chatId, 'message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') } streamTTSDataEvent(chatId: string, chatMessageId: string, audioChunk: string): void { - const clientResponse = { - event: 'tts_data', - data: { chatMessageId, audioChunk } + const client = this.clients[chatId] + if (client) { + const clientResponse = { + event: 'tts_data', + data: { chatMessageId, audioChunk } + } + client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') } - this.safeWrite(chatId, 'message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') } streamTTSEndEvent(chatId: string, chatMessageId: string): void { - const clientResponse = { - event: 'tts_end', - data: { chatMessageId } - } - this.safeWrite(chatId, 'message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') - } - - streamTTSAbortEvent(chatId: string, chatMessageId: string): void { - const client = this.clients.get(chatId) + const client = this.clients[chatId] if (client) { - try { - const clientResponse = { - event: 'tts_abort', - data: { chatMessageId } - } - client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') - client.response.end() - } catch { - // Client already disconnected, ignore write errors - } finally { - this.clients.delete(chatId) + const clientResponse = { + event: 'tts_end', + data: { chatMessageId } } + client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') } } - startHeartbeat(intervalMs: number = 30_000) { - this.heartbeatInterval = setInterval(() => { - for (const chatId of this.clients.keys()) { - // SSE comment line — ignored by clients but keeps the connection alive through ALB/proxies - this.safeWrite(chatId, ':heartbeat\n\n') + streamTTSAbortEvent(chatId: string, chatMessageId: string): void { + const client = this.clients[chatId] + if (client) { + const clientResponse = { + event: 'tts_abort', + data: { chatMessageId } } - }, intervalMs) - } - - stopHeartbeat() { - if (this.heartbeatInterval) { - clearInterval(this.heartbeatInterval) - this.heartbeatInterval = null + client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') + client.response.end() + delete this.clients[chatId] } } }