diff --git a/packages/server/src/controllers/internal-predictions/index.ts b/packages/server/src/controllers/internal-predictions/index.ts index b5614faf248..8066022fb1d 100644 --- a/packages/server/src/controllers/internal-predictions/index.ts +++ b/packages/server/src/controllers/internal-predictions/index.ts @@ -42,10 +42,19 @@ const createAndStreamInternalPrediction = async (req: Request, res: Response, ne res.setHeader('X-Accel-Buffering', 'no') //nginx config: https://serverfault.com/a/801629 res.flushHeaders() - if (process.env.MODE === MODE.QUEUE) { + const isQueueMode = process.env.MODE === MODE.QUEUE + if (isQueueMode) { getRunningExpressApp().redisSubscriber.subscribe(chatId) } + // Detect client disconnect (browser close, network timeout, ALB idle timeout) + res.on('close', () => { + sseStreamer.removeClient(chatId) + if (isQueueMode) { + getRunningExpressApp().redisSubscriber.unsubscribe(chatId) + } + }) + const apiResponse = await utilBuildChatflow(req, true) sseStreamer.streamMetadataEvent(apiResponse.chatId, apiResponse) } catch (error) { @@ -55,6 +64,9 @@ const createAndStreamInternalPrediction = async (req: Request, res: Response, ne next(error) } finally { sseStreamer.removeClient(chatId) + if (process.env.MODE === MODE.QUEUE) { + getRunningExpressApp().redisSubscriber.unsubscribe(chatId) + } } } export default { diff --git a/packages/server/src/controllers/predictions/index.ts b/packages/server/src/controllers/predictions/index.ts index d467f3157e4..7e8d5d8a64f 100644 --- a/packages/server/src/controllers/predictions/index.ts +++ b/packages/server/src/controllers/predictions/index.ts @@ -72,10 +72,19 @@ const createPrediction = async (req: Request, res: Response, next: NextFunction) res.setHeader('X-Accel-Buffering', 'no') //nginx config: https://serverfault.com/a/801629 res.flushHeaders() - if (process.env.MODE === MODE.QUEUE) { + const isQueueMode = process.env.MODE === MODE.QUEUE + if (isQueueMode) { getRunningExpressApp().redisSubscriber.subscribe(chatId) } + // Detect client disconnect (browser close, network timeout, ALB idle timeout) + res.on('close', () => { + sseStreamer.removeClient(chatId) + if (isQueueMode) { + getRunningExpressApp().redisSubscriber.unsubscribe(chatId) + } + }) + const apiResponse = await predictionsServices.buildChatflow(req) sseStreamer.streamMetadataEvent(apiResponse.chatId, apiResponse) } catch (error) { @@ -85,6 +94,9 @@ const createPrediction = async (req: Request, res: Response, next: NextFunction) next(error) } finally { sseStreamer.removeClient(chatId) + if (process.env.MODE === MODE.QUEUE) { + getRunningExpressApp().redisSubscriber.unsubscribe(chatId) + } } } else { const apiResponse = await predictionsServices.buildChatflow(req) diff --git a/packages/server/src/index.ts b/packages/server/src/index.ts index bbb31910e9e..523a063bc49 100644 --- a/packages/server/src/index.ts +++ b/packages/server/src/index.ts @@ -128,6 +128,7 @@ export class App { // Initialize SSE Streamer this.sseStreamer = new SSEStreamer() + this.sseStreamer.startHeartbeat() logger.info('🌊 [server]: SSE Streamer initialized successfully') // Init Queues @@ -148,6 +149,7 @@ export class App { this.redisSubscriber = new RedisEventSubscriber(this.sseStreamer) await this.redisSubscriber.connect() + this.redisSubscriber.startPeriodicCleanup() logger.info('🔗 [server]: Redis event subscriber connected successfully') } @@ -361,6 +363,7 @@ export class App { async stopApp() { try { + this.sseStreamer.stopHeartbeat() const removePromises: any[] = [] removePromises.push(this.telemetry.flush()) if (this.queueManager) { diff --git a/packages/server/src/queue/RedisEventPublisher.ts b/packages/server/src/queue/RedisEventPublisher.ts index cb8aa6ecf1f..6d185f70d71 100644 --- a/packages/server/src/queue/RedisEventPublisher.ts +++ b/packages/server/src/queue/RedisEventPublisher.ts @@ -80,244 +80,80 @@ export class RedisEventPublisher implements IServerSideEventStreamer { await this.redisPublisher.connect() } - streamCustomEvent(chatId: string, eventType: string, data: any) { + private async safePublish(channel: string, message: string) { + if (!this.redisPublisher.isReady) { + logger.warn(`[RedisEventPublisher] Cannot publish to channel ${channel}: Redis client not ready`) + return + } try { - this.redisPublisher.publish( - chatId, - JSON.stringify({ - chatId, - eventType, - data - }) - ) + await this.redisPublisher.publish(channel, message) } catch (error) { - console.error('Error streaming custom event:', error) + logger.error(`[RedisEventPublisher] Error publishing to channel ${channel}:`, { error }) } } + streamCustomEvent(chatId: string, eventType: string, data: any) { + this.safePublish(chatId, JSON.stringify({ chatId, eventType, data })) + } + streamStartEvent(chatId: string, data: string) { - try { - this.redisPublisher.publish( - chatId, - JSON.stringify({ - chatId, - eventType: 'start', - data - }) - ) - } catch (error) { - console.error('Error streaming start event:', error) - } + this.safePublish(chatId, JSON.stringify({ chatId, eventType: 'start', data })) } streamTokenEvent(chatId: string, data: string) { - try { - this.redisPublisher.publish( - chatId, - JSON.stringify({ - chatId, - eventType: 'token', - data - }) - ) - } catch (error) { - console.error('Error streaming token event:', error) - } + this.safePublish(chatId, JSON.stringify({ chatId, eventType: 'token', data })) } streamSourceDocumentsEvent(chatId: string, data: any) { - try { - this.redisPublisher.publish( - chatId, - JSON.stringify({ - chatId, - eventType: 'sourceDocuments', - data - }) - ) - } catch (error) { - console.error('Error streaming sourceDocuments event:', error) - } + this.safePublish(chatId, JSON.stringify({ chatId, eventType: 'sourceDocuments', data })) } streamArtifactsEvent(chatId: string, data: any) { - try { - this.redisPublisher.publish( - chatId, - JSON.stringify({ - chatId, - eventType: 'artifacts', - data - }) - ) - } catch (error) { - console.error('Error streaming artifacts event:', error) - } + this.safePublish(chatId, JSON.stringify({ chatId, eventType: 'artifacts', data })) } streamUsedToolsEvent(chatId: string, data: any) { - try { - this.redisPublisher.publish( - chatId, - JSON.stringify({ - chatId, - eventType: 'usedTools', - data - }) - ) - } catch (error) { - console.error('Error streaming usedTools event:', error) - } + this.safePublish(chatId, JSON.stringify({ chatId, eventType: 'usedTools', data })) } streamCalledToolsEvent(chatId: string, data: any) { - try { - this.redisPublisher.publish( - chatId, - JSON.stringify({ - chatId, - eventType: 'calledTools', - data - }) - ) - } catch (error) { - console.error('Error streaming calledTools event:', error) - } + this.safePublish(chatId, JSON.stringify({ chatId, eventType: 'calledTools', data })) } streamFileAnnotationsEvent(chatId: string, data: any) { - try { - this.redisPublisher.publish( - chatId, - JSON.stringify({ - chatId, - eventType: 'fileAnnotations', - data - }) - ) - } catch (error) { - console.error('Error streaming fileAnnotations event:', error) - } + this.safePublish(chatId, JSON.stringify({ chatId, eventType: 'fileAnnotations', data })) } streamToolEvent(chatId: string, data: any): void { - try { - this.redisPublisher.publish( - chatId, - JSON.stringify({ - chatId, - eventType: 'tool', - data - }) - ) - } catch (error) { - console.error('Error streaming tool event:', error) - } + this.safePublish(chatId, JSON.stringify({ chatId, eventType: 'tool', data })) } streamAgentReasoningEvent(chatId: string, data: any): void { - try { - this.redisPublisher.publish( - chatId, - JSON.stringify({ - chatId, - eventType: 'agentReasoning', - data - }) - ) - } catch (error) { - console.error('Error streaming agentReasoning event:', error) - } + this.safePublish(chatId, JSON.stringify({ chatId, eventType: 'agentReasoning', data })) } streamAgentFlowEvent(chatId: string, data: any): void { - try { - this.redisPublisher.publish( - chatId, - JSON.stringify({ - chatId, - eventType: 'agentFlowEvent', - data - }) - ) - } catch (error) { - console.error('Error streaming agentFlow event:', error) - } + this.safePublish(chatId, JSON.stringify({ chatId, eventType: 'agentFlowEvent', data })) } streamAgentFlowExecutedDataEvent(chatId: string, data: any): void { - try { - this.redisPublisher.publish( - chatId, - JSON.stringify({ - chatId, - eventType: 'agentFlowExecutedData', - data - }) - ) - } catch (error) { - console.error('Error streaming agentFlowExecutedData event:', error) - } + this.safePublish(chatId, JSON.stringify({ chatId, eventType: 'agentFlowExecutedData', data })) } streamNextAgentEvent(chatId: string, data: any): void { - try { - this.redisPublisher.publish( - chatId, - JSON.stringify({ - chatId, - eventType: 'nextAgent', - data - }) - ) - } catch (error) { - console.error('Error streaming nextAgent event:', error) - } + this.safePublish(chatId, JSON.stringify({ chatId, eventType: 'nextAgent', data })) } streamNextAgentFlowEvent(chatId: string, data: any): void { - try { - this.redisPublisher.publish( - chatId, - JSON.stringify({ - chatId, - eventType: 'nextAgentFlow', - data - }) - ) - } catch (error) { - console.error('Error streaming nextAgentFlow event:', error) - } + this.safePublish(chatId, JSON.stringify({ chatId, eventType: 'nextAgentFlow', data })) } streamActionEvent(chatId: string, data: any): void { - try { - this.redisPublisher.publish( - chatId, - JSON.stringify({ - chatId, - eventType: 'action', - data - }) - ) - } catch (error) { - console.error('Error streaming action event:', error) - } + this.safePublish(chatId, JSON.stringify({ chatId, eventType: 'action', data })) } streamAbortEvent(chatId: string): void { - try { - this.redisPublisher.publish( - chatId, - JSON.stringify({ - chatId, - eventType: 'abort', - data: '[DONE]' - }) - ) - } catch (error) { - console.error('Error streaming abort event:', error) - } + this.safePublish(chatId, JSON.stringify({ chatId, eventType: 'abort', data: '[DONE]' })) } streamEndEvent(_: string) { @@ -325,18 +161,7 @@ export class RedisEventPublisher implements IServerSideEventStreamer { } streamErrorEvent(chatId: string, msg: string) { - try { - this.redisPublisher.publish( - chatId, - JSON.stringify({ - chatId, - eventType: 'error', - data: msg - }) - ) - } catch (error) { - console.error('Error streaming error event:', error) - } + this.safePublish(chatId, JSON.stringify({ chatId, eventType: 'error', data: msg })) } streamMetadataEvent(chatId: string, apiResponse: any) { @@ -361,87 +186,28 @@ export class RedisEventPublisher implements IServerSideEventStreamer { this.streamCustomEvent(chatId, 'metadata', metadataJson) } } catch (error) { - console.error('Error streaming metadata event:', error) + logger.error('[RedisEventPublisher] Error streaming metadata event:', { error }) } } streamUsageMetadataEvent(chatId: string, data: any): void { - try { - this.redisPublisher.publish( - chatId, - JSON.stringify({ - chatId, - eventType: 'usageMetadata', - data - }) - ) - } catch (error) { - console.error('Error streaming usage metadata event:', error) - } + this.safePublish(chatId, JSON.stringify({ chatId, eventType: 'usageMetadata', data })) } streamTTSStartEvent(chatId: string, chatMessageId: string, format: string): void { - try { - this.redisPublisher.publish( - chatId, - JSON.stringify({ - chatId, - chatMessageId, - eventType: 'tts_start', - data: { format } - }) - ) - } catch (error) { - console.error('Error streaming TTS start event:', error) - } + this.safePublish(chatId, JSON.stringify({ chatId, chatMessageId, eventType: 'tts_start', data: { format } })) } streamTTSDataEvent(chatId: string, chatMessageId: string, audioChunk: string): void { - try { - this.redisPublisher.publish( - chatId, - JSON.stringify({ - chatId, - chatMessageId, - eventType: 'tts_data', - data: audioChunk - }) - ) - } catch (error) { - console.error('Error streaming TTS data event:', error) - } + this.safePublish(chatId, JSON.stringify({ chatId, chatMessageId, eventType: 'tts_data', data: audioChunk })) } streamTTSEndEvent(chatId: string, chatMessageId: string): void { - try { - this.redisPublisher.publish( - chatId, - JSON.stringify({ - chatId, - chatMessageId, - eventType: 'tts_end', - data: {} - }) - ) - } catch (error) { - console.error('Error streaming TTS end event:', error) - } + this.safePublish(chatId, JSON.stringify({ chatId, chatMessageId, eventType: 'tts_end', data: {} })) } streamTTSAbortEvent(chatId: string, chatMessageId: string): void { - try { - this.redisPublisher.publish( - chatId, - JSON.stringify({ - chatId, - chatMessageId, - eventType: 'tts_abort', - data: {} - }) - ) - } catch (error) { - console.error('Error streaming TTS abort event:', error) - } + this.safePublish(chatId, JSON.stringify({ chatId, chatMessageId, eventType: 'tts_abort', data: {} })) } async disconnect() { diff --git a/packages/server/src/queue/RedisEventSubscriber.ts b/packages/server/src/queue/RedisEventSubscriber.ts index c70d6f73284..b21159699bb 100644 --- a/packages/server/src/queue/RedisEventSubscriber.ts +++ b/packages/server/src/queue/RedisEventSubscriber.ts @@ -6,6 +6,7 @@ export class RedisEventSubscriber { private redisSubscriber: ReturnType private sseStreamer: SSEStreamer private subscribedChannels: Set = new Set() + private cleanupInterval: NodeJS.Timeout | null = null constructor(sseStreamer: SSEStreamer) { if (process.env.REDIS_URL) { @@ -99,6 +100,45 @@ export class RedisEventSubscriber { this.subscribedChannels.add(channel) } + async unsubscribe(channel: string) { + if (!this.subscribedChannels.has(channel)) { + return + } + + try { + await this.redisSubscriber.unsubscribe(channel) + this.subscribedChannels.delete(channel) + logger.debug( + `[RedisEventSubscriber] Unsubscribed from channel: ${channel}. Active subscriptions: ${this.subscribedChannels.size}` + ) + } catch (error) { + logger.error(`[RedisEventSubscriber] Error unsubscribing from channel ${channel}:`, { error }) + // Still remove from tracking set to prevent stale entries + this.subscribedChannels.delete(channel) + } + } + + getSubscriptionCount(): number { + return this.subscribedChannels.size + } + + startPeriodicCleanup(intervalMs: number = 60_000) { + this.cleanupInterval = setInterval(() => { + let cleaned = 0 + for (const channel of this.subscribedChannels) { + if (!this.sseStreamer.clients[channel]) { + this.unsubscribe(channel) + cleaned++ + } + } + if (cleaned > 0) { + logger.info( + `[RedisEventSubscriber] Periodic cleanup: removed ${cleaned} stale subscriptions. Remaining: ${this.subscribedChannels.size}` + ) + } + }, intervalMs) + } + private handleEvent(message: string) { // Parse the message from Redis const event = JSON.parse(message) @@ -176,6 +216,10 @@ export class RedisEventSubscriber { } async disconnect() { + if (this.cleanupInterval) { + clearInterval(this.cleanupInterval) + this.cleanupInterval = null + } if (this.redisSubscriber) { await this.redisSubscriber.quit() } diff --git a/packages/server/src/utils/SSEStreamer.ts b/packages/server/src/utils/SSEStreamer.ts index 2d858a6cf9f..5001042a3b3 100644 --- a/packages/server/src/utils/SSEStreamer.ts +++ b/packages/server/src/utils/SSEStreamer.ts @@ -12,6 +12,7 @@ type Client = { export class SSEStreamer implements IServerSideEventStreamer { clients: { [id: string]: Client } = {} + private heartbeatInterval: NodeJS.Timeout | null = null addExternalClient(chatId: string, res: Response) { this.clients[chatId] = { clientType: 'EXTERNAL', response: res, started: false } @@ -21,30 +22,48 @@ export class SSEStreamer implements IServerSideEventStreamer { this.clients[chatId] = { clientType: 'INTERNAL', response: res, started: false } } - removeClient(chatId: string) { + /** + * Safely write data to a client's response. If the write fails (e.g., client already disconnected), + * the client is automatically removed to prevent further writes to a dead connection. + */ + private safeWrite(chatId: string, data: string): boolean { const client = this.clients[chatId] - if (client) { - const clientResponse = { - event: 'end', - data: '[DONE]' - } - client.response.write('message\ndata:' + JSON.stringify(clientResponse) + '\n\n') - client.response.end() + if (!client) return false + try { + client.response.write(data) + return true + } catch { delete this.clients[chatId] + return false } } - streamCustomEvent(chatId: string, eventType: string, data: any) { + removeClient(chatId: string) { const client = this.clients[chatId] if (client) { - const clientResponse = { - event: eventType, - data: data + try { + const clientResponse = { + event: 'end', + data: '[DONE]' + } + client.response.write('message\ndata:' + JSON.stringify(clientResponse) + '\n\n') + client.response.end() + } catch { + // Client already disconnected, ignore write errors + } finally { + delete this.clients[chatId] } - client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') } } + streamCustomEvent(chatId: string, eventType: string, data: any) { + const clientResponse = { + event: eventType, + data: data + } + this.safeWrite(chatId, 'message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') + } + streamStartEvent(chatId: string, data: string) { const client = this.clients[chatId] // prevent multiple start events being streamed to the client @@ -53,152 +72,111 @@ export class SSEStreamer implements IServerSideEventStreamer { event: 'start', data: data } - client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') - client.started = true + if (this.safeWrite(chatId, 'message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')) { + client.started = true + } } } streamTokenEvent(chatId: string, data: string) { - const client = this.clients[chatId] - if (client) { - const clientResponse = { - event: 'token', - data: data - } - client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') + const clientResponse = { + event: 'token', + data: data } + this.safeWrite(chatId, 'message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') } streamSourceDocumentsEvent(chatId: string, data: any) { - const client = this.clients[chatId] - if (client) { - const clientResponse = { - event: 'sourceDocuments', - data: data - } - client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') + const clientResponse = { + event: 'sourceDocuments', + data: data } + this.safeWrite(chatId, 'message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') } streamArtifactsEvent(chatId: string, data: any) { - const client = this.clients[chatId] - if (client) { - const clientResponse = { - event: 'artifacts', - data: data - } - client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') + const clientResponse = { + event: 'artifacts', + data: data } + this.safeWrite(chatId, 'message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') } streamUsedToolsEvent(chatId: string, data: any): void { - const client = this.clients[chatId] - if (client) { - const clientResponse = { - event: 'usedTools', - data: data - } - client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') + const clientResponse = { + event: 'usedTools', + data: data } + this.safeWrite(chatId, 'message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') } streamCalledToolsEvent(chatId: string, data: any): void { - const client = this.clients[chatId] - if (client) { - const clientResponse = { - event: 'calledTools', - data: data - } - client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') + const clientResponse = { + event: 'calledTools', + data: data } + this.safeWrite(chatId, 'message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') } streamFileAnnotationsEvent(chatId: string, data: any): void { - const client = this.clients[chatId] - if (client) { - const clientResponse = { - event: 'fileAnnotations', - data: data - } - client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') + const clientResponse = { + event: 'fileAnnotations', + data: data } + this.safeWrite(chatId, 'message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') } streamToolEvent(chatId: string, data: any): void { - const client = this.clients[chatId] - if (client) { - const clientResponse = { - event: 'tool', - data: data - } - client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') + const clientResponse = { + event: 'tool', + data: data } + this.safeWrite(chatId, 'message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') } streamAgentReasoningEvent(chatId: string, data: any): void { - const client = this.clients[chatId] - if (client) { - const clientResponse = { - event: 'agentReasoning', - data: data - } - client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') + const clientResponse = { + event: 'agentReasoning', + data: data } + this.safeWrite(chatId, 'message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') } streamNextAgentEvent(chatId: string, data: any): void { - const client = this.clients[chatId] - if (client) { - const clientResponse = { - event: 'nextAgent', - data: data - } - client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') + const clientResponse = { + event: 'nextAgent', + data: data } + this.safeWrite(chatId, 'message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') } streamAgentFlowEvent(chatId: string, data: any): void { - const client = this.clients[chatId] - if (client) { - const clientResponse = { - event: 'agentFlowEvent', - data: data - } - client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') + const clientResponse = { + event: 'agentFlowEvent', + data: data } + this.safeWrite(chatId, 'message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') } streamAgentFlowExecutedDataEvent(chatId: string, data: any): void { - const client = this.clients[chatId] - if (client) { - const clientResponse = { - event: 'agentFlowExecutedData', - data: data - } - client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') + const clientResponse = { + event: 'agentFlowExecutedData', + data: data } + this.safeWrite(chatId, 'message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') } streamNextAgentFlowEvent(chatId: string, data: any): void { - const client = this.clients[chatId] - if (client) { - const clientResponse = { - event: 'nextAgentFlow', - data: data - } - client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') + const clientResponse = { + event: 'nextAgentFlow', + data: data } + this.safeWrite(chatId, 'message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') } streamActionEvent(chatId: string, data: any): void { - const client = this.clients[chatId] - if (client) { - const clientResponse = { - event: 'action', - data: data - } - client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') + const clientResponse = { + event: 'action', + data: data } + this.safeWrite(chatId, 'message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') } streamAbortEvent(chatId: string): void { - const client = this.clients[chatId] - if (client) { - const clientResponse = { - event: 'abort', - data: '[DONE]' - } - client.response.write('message\ndata:' + JSON.stringify(clientResponse) + '\n\n') + const clientResponse = { + event: 'abort', + data: '[DONE]' } + this.safeWrite(chatId, 'message\ndata:' + JSON.stringify(clientResponse) + '\n\n') } streamEndEvent(_: string) { @@ -208,14 +186,11 @@ export class SSEStreamer implements IServerSideEventStreamer { streamErrorEvent(chatId: string, msg: string) { if (msg.includes('401 Incorrect API key provided')) msg = '401 Unauthorized – check your API key and ensure it has access to the requested model.' - const client = this.clients[chatId] - if (client) { - const clientResponse = { - event: 'error', - data: msg - } - client.response.write('message\ndata:' + JSON.stringify(clientResponse) + '\n\n') + const clientResponse = { + event: 'error', + data: msg } + this.safeWrite(chatId, 'message\ndata:' + JSON.stringify(clientResponse) + '\n\n') } streamMetadataEvent(chatId: string, apiResponse: any) { @@ -249,59 +224,69 @@ export class SSEStreamer implements IServerSideEventStreamer { } streamUsageMetadataEvent(chatId: string, data: any): void { - const client = this.clients[chatId] - if (client) { - const clientResponse = { - event: 'usageMetadata', - data: data - } - client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') + const clientResponse = { + event: 'usageMetadata', + data: data } + this.safeWrite(chatId, 'message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') } streamTTSStartEvent(chatId: string, chatMessageId: string, format: string): void { - const client = this.clients[chatId] - if (client) { - const clientResponse = { - event: 'tts_start', - data: { chatMessageId, format } - } - client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') + const clientResponse = { + event: 'tts_start', + data: { chatMessageId, format } } + this.safeWrite(chatId, 'message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') } streamTTSDataEvent(chatId: string, chatMessageId: string, audioChunk: string): void { - const client = this.clients[chatId] - if (client) { - const clientResponse = { - event: 'tts_data', - data: { chatMessageId, audioChunk } - } - client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') + const clientResponse = { + event: 'tts_data', + data: { chatMessageId, audioChunk } } + this.safeWrite(chatId, 'message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') } streamTTSEndEvent(chatId: string, chatMessageId: string): void { - const client = this.clients[chatId] - if (client) { - const clientResponse = { - event: 'tts_end', - data: { chatMessageId } - } - client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') + const clientResponse = { + event: 'tts_end', + data: { chatMessageId } } + this.safeWrite(chatId, 'message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') } streamTTSAbortEvent(chatId: string, chatMessageId: string): void { const client = this.clients[chatId] if (client) { - const clientResponse = { - event: 'tts_abort', - data: { chatMessageId } + try { + const clientResponse = { + event: 'tts_abort', + data: { chatMessageId } + } + client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') + client.response.end() + } catch { + // Client already disconnected, ignore write errors + } finally { + delete this.clients[chatId] } - client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') - client.response.end() - delete this.clients[chatId] + } + } + + startHeartbeat(intervalMs: number = 30_000) { + this.heartbeatInterval = setInterval(() => { + const clientIds = Object.keys(this.clients) + for (const chatId of clientIds) { + // SSE comment line — ignored by clients but keeps the connection alive through ALB/proxies + this.safeWrite(chatId, ':heartbeat\n\n') + } + }, intervalMs) + } + + stopHeartbeat() { + if (this.heartbeatInterval) { + clearInterval(this.heartbeatInterval) + this.heartbeatInterval = null } } }