Revert "fix: redis pub/sub and streaming response issue"#6083
This reverts commit 34aa82e.
Code Review
This pull request reverts several stability and resource management features in the SSE and Redis messaging logic, including heartbeats, periodic subscription cleanup, and explicit unsubscription. The feedback highlights critical stability risks, such as potential server crashes from unhandled exceptions in JSON parsing and socket writes, as well as unhandled promise rejections from fire-and-forget asynchronous calls. Additionally, the removal of cleanup logic is expected to cause significant resource leaks, and the transition from specialized data structures and logging utilities to generic ones is noted as a regression in code quality and maintainability.
| streamCustomEvent(chatId: string, eventType: string, data: any) { | ||
| try { | ||
| await this.redisPublisher.publish(channel, message) | ||
| this.redisPublisher.publish( | ||
| chatId, | ||
| JSON.stringify({ | ||
| chatId, | ||
| eventType, | ||
| data | ||
| }) | ||
| ) | ||
| } catch (error) { | ||
| logger.error(`[RedisEventPublisher] Error publishing to channel ${channel}:`, { error }) | ||
| console.error('Error streaming custom event:', error) | ||
| } | ||
| } |
There are a few issues with this implementation that are re-introduced by this revert:
- Unhandled Promise Rejection: this.redisPublisher.publish() is an async operation but is not awaited. If it fails, the promise rejection will not be caught by the try...catch block, crashing the process.
- Code Duplication: The try...catch logic is duplicated in every stream... method, making the code harder to maintain. The reverted safePublish helper was a good pattern to avoid this.
- Inconsistent Logging: console.error is used instead of the project's logger utility.
This pattern is repeated for all stream event methods in this file and poses a critical stability risk.
References
- Prioritize code readability and understandability over conciseness. A series of simple, chained operations or helper functions can be preferable to duplicated logic if it improves understandability and reduces the potential for future errors.
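The reverted safePublish helper is not shown in this thread, so the following is only a minimal sketch of what such a helper might look like. The Publisher and Logger interfaces and the EventPublisher class are hypothetical stand-ins for the Redis client, the project's logger, and the publisher class; only the publish call, the log message, and the payload shape come from the diff above.

```typescript
// Hypothetical stand-ins for the Redis client and the project logger.
interface Publisher {
    publish(channel: string, message: string): Promise<number>
}
interface Logger {
    error(message: string, meta?: unknown): void
}

class EventPublisher {
    private redisPublisher: Publisher
    private logger: Logger

    constructor(redisPublisher: Publisher, logger: Logger) {
        this.redisPublisher = redisPublisher
        this.logger = logger
    }

    // Awaits the publish so that a rejection lands in the catch block
    // instead of becoming an unhandled promise rejection.
    private async safePublish(channel: string, payload: unknown): Promise<boolean> {
        try {
            await this.redisPublisher.publish(channel, JSON.stringify(payload))
            return true
        } catch (error) {
            this.logger.error(`[RedisEventPublisher] Error publishing to channel ${channel}:`, { error })
            return false
        }
    }

    // Each stream... method reduces to a single call to the helper.
    async streamCustomEvent(chatId: string, eventType: string, data: unknown): Promise<boolean> {
        return this.safePublish(chatId, { chatId, eventType, data })
    }
}
```

With this shape, error handling and logging live in one place, and a failed publish is reported through the return value rather than through a rejection the caller may forget to handle.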
| this.sseStreamer.streamErrorEvent(chatId, err instanceof Error ? err.message : 'Failed to process stream event') | ||
| } | ||
| // Parse the message from Redis | ||
| const event = JSON.parse(message) |
The handleEvent method parses the incoming message with JSON.parse without any error handling. If a malformed message is received from Redis, this will throw an unhandled exception and crash the entire server process. The entire method body should be wrapped in a try...catch block to ensure stability.
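One way to guard the parse is sketched below, under stated assumptions: the StreamEvent shape, the parseEvent helper, and the dispatch and onError callbacks are hypothetical and stand in for the real event routing and for sseStreamer.streamErrorEvent. The fallback message is the one used in the diff above.

```typescript
// Hypothetical shape of a message published on a chat channel.
interface StreamEvent {
    chatId: string
    eventType: string
    data: unknown
}

// Returns the parsed event, or null when the payload is not valid JSON
// or lacks the string fields the dispatcher relies on.
function parseEvent(message: string): StreamEvent | null {
    try {
        const parsed: unknown = JSON.parse(message)
        if (typeof parsed !== 'object' || parsed === null) return null
        const candidate = parsed as Partial<StreamEvent>
        if (typeof candidate.chatId !== 'string' || typeof candidate.eventType !== 'string') return null
        return { chatId: candidate.chatId, eventType: candidate.eventType, data: candidate.data }
    } catch {
        return null
    }
}

// Never throws: malformed input and dispatcher failures are both
// reported through onError.
function handleEvent(
    message: string,
    dispatch: (event: StreamEvent) => void,
    onError: (reason: string) => void
): void {
    const event = parseEvent(message)
    if (event === null) {
        onError('Failed to process stream event')
        return
    }
    try {
        dispatch(event)
    } catch (err) {
        onError(err instanceof Error ? err.message : 'Failed to process stream event')
    }
}
```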
| event: 'token', | ||
| data: data | ||
| } | ||
| client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') |
This call to client.response.write() is not wrapped in a try...catch block. If the client has disconnected, this will throw an error, leading to an unhandled exception that will crash the server. This applies to all stream... methods in this class. The reverted code used a safeWrite helper to handle this, which should be restored or similar logic implemented here.
References
- Prioritize code readability and understandability over conciseness. Using helper patterns like safeWrite reduces the potential for future errors.
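The reverted safeWrite helper is not reproduced in this thread. A minimal sketch of the idea follows, assuming a response object that exposes writableEnded, destroyed, and write() in the way Node's HTTP response does; the WritableResponse interface and the onError callback are hypothetical. The frame format is the one used in the diff above.

```typescript
// Hypothetical subset of the response object used by the streamer.
interface WritableResponse {
    writableEnded: boolean
    destroyed: boolean
    write(chunk: string): boolean
}

// Writes one SSE frame. Returns false instead of throwing when the
// connection is already closed or the write fails.
function safeWrite(
    response: WritableResponse,
    payload: unknown,
    onError: (error: unknown) => void
): boolean {
    if (response.writableEnded || response.destroyed) return false
    try {
        response.write('message:\ndata:' + JSON.stringify(payload) + '\n\n')
        return true
    } catch (error) {
        onError(error)
        return false
    }
}
```

A caller could use the false return value as a signal to remove the client from the streamer.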
| if (isQueueMode) { | ||
| await getRunningExpressApp().redisSubscriber.subscribe(chatId) | ||
| if (process.env.MODE === MODE.QUEUE) { | ||
| getRunningExpressApp().redisSubscriber.subscribe(chatId) |
The subscribe method on the redis client is asynchronous. By removing await, this operation becomes fire-and-forget. While this might be intended to not block the request, any potential errors during the subscription process will not be caught and will result in an unhandled promise rejection, which could crash the application.
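If blocking the request on the subscription is undesirable, the rejection still needs a handler. The sketch below shows one possible wrapper; the Subscriber interface, the subscribeSafely name, and the onError callback are hypothetical and not part of the codebase under review.

```typescript
// Hypothetical stand-in for the Redis subscriber wrapper.
interface Subscriber {
    subscribe(channel: string): Promise<void>
}

// Resolves to true on success and false on failure. The returned promise
// never rejects, so it is safe to await it or to ignore it.
async function subscribeSafely(
    subscriber: Subscriber,
    chatId: string,
    onError: (error: unknown) => void
): Promise<boolean> {
    try {
        await subscriber.subscribe(chatId)
        return true
    } catch (error) {
        onError(error)
        return false
    }
}
```

A caller that must not stream before the subscription exists would await the result and check it; a caller that prefers fire-and-forget can drop the returned promise without risking an unhandled rejection.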
| if (isQueueMode && chatId) { | ||
| await getRunningExpressApp().redisSubscriber.unsubscribe(chatId) | ||
| } | ||
| sseStreamer.removeClient(chatId) |
This revert removes the call to redisSubscriber.unsubscribe(chatId). This means that for every streaming request in queue mode, a new Redis subscription is created but never removed. This will lead to a resource leak, as subscriptions will accumulate over time, consuming memory and resources on both the application server and the Redis instance.
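The pairing of subscribe and unsubscribe can be made explicit with a try...finally wrapper, sketched below. This is an illustration only: withSubscription, ChannelSubscriber, and ClientRegistry are hypothetical names, and run stands in for whatever produces the streamed response.

```typescript
// Hypothetical stand-ins for the Redis subscriber and the SSE streamer.
interface ChannelSubscriber {
    subscribe(channel: string): Promise<void>
    unsubscribe(channel: string): Promise<void>
}
interface ClientRegistry {
    removeClient(chatId: string): void
}

// Runs `run` with an active subscription in queue mode and always
// releases the subscription and the client afterwards, even when
// `run` rejects.
async function withSubscription<T>(
    subscriber: ChannelSubscriber,
    registry: ClientRegistry,
    chatId: string,
    isQueueMode: boolean,
    run: () => Promise<T>
): Promise<T> {
    if (isQueueMode) await subscriber.subscribe(chatId)
    try {
        return await run()
    } finally {
        try {
            if (isQueueMode && chatId) await subscriber.unsubscribe(chatId)
        } finally {
            // Runs even if unsubscribe rejects.
            registry.removeClient(chatId)
        }
    }
}
```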
| if (isQueueMode) { | ||
| await getRunningExpressApp().redisSubscriber.subscribe(chatId) | ||
| if (process.env.MODE === MODE.QUEUE) { | ||
| getRunningExpressApp().redisSubscriber.subscribe(chatId) |
The subscribe method on the redis client is asynchronous. By removing await, this operation becomes fire-and-forget. While this might be intended to not block the request, any potential errors during the subscription process will not be caught and will result in an unhandled promise rejection, which could crash the application.
| if (isQueueMode && chatId) { | ||
| await getRunningExpressApp().redisSubscriber.unsubscribe(chatId) | ||
| } | ||
| sseStreamer.removeClient(chatId) |
This revert removes the call to redisSubscriber.unsubscribe(chatId). This means that for every streaming request in queue mode, a new Redis subscription is created but never removed. This will lead to a resource leak, as subscriptions will accumulate over time, consuming memory and resources on both the application server and the Redis instance.
| // Mark the channel as subscribed | ||
| this.subscribedChannels.add(channel) | ||
| } | ||
|
|
This revert removes the unsubscribe method and the startPeriodicCleanup logic. This means Redis subscriptions are created but never cleaned up, even after the client connection is closed. This will cause a significant resource leak, as the number of active subscriptions will grow indefinitely, consuming memory and resources on both the application and Redis servers. This logic is critical for production stability and should be restored or replaced with an alternative cleanup mechanism.
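The removed startPeriodicCleanup logic is not shown in this thread. As a rough sketch of an alternative cleanup mechanism, the sweep below unsubscribes every tracked channel that no longer has a connected client. The subscribedChannels set mirrors the one in the diff above; Unsubscriber, cleanupStaleSubscriptions, and the hasClient callback are hypothetical.

```typescript
// Hypothetical stand-in for the Redis subscriber.
interface Unsubscriber {
    unsubscribe(channel: string): Promise<void>
}

// Unsubscribes channels without a connected client and returns the
// channels that were released. A channel whose unsubscribe fails stays
// in the set so that a later sweep can retry it.
async function cleanupStaleSubscriptions(
    subscribedChannels: Set<string>,
    hasClient: (chatId: string) => boolean,
    subscriber: Unsubscriber
): Promise<string[]> {
    const removed: string[] = []
    // Copy first so the set can be modified while looping.
    for (const channel of Array.from(subscribedChannels)) {
        if (hasClient(channel)) continue
        try {
            await subscriber.unsubscribe(channel)
            subscribedChannels.delete(channel)
            removed.push(channel)
        } catch {
            // Leave the channel tracked; the next sweep retries it.
        }
    }
    return removed
}
```

Such a sweep could be run on a timer as a safety net, alongside an explicit unsubscribe when each connection closes.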
| hasClient(chatId: string): boolean { | ||
| return this.clients.has(chatId) | ||
| } | ||
| clients: { [id: string]: Client } = {} |
The clients property is changed from a Map to a plain object. While functionally similar for this use case, Map can be more performant for frequent additions and removals, and more clearly expresses the intent of a key-value collection. The reverted code also removed the hasClient method, which was a clean way to check for a client's existence.
References
- Prioritize code readability and understandability over conciseness. Using appropriate data structures like Map can more clearly express intent.
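A Map-backed registry with hasClient might look like the sketch below. The Client shape and the SseClientRegistry class are hypothetical; only hasClient and its body come from the diff above. One concrete difference worth noting: a plain object literal inherits keys such as constructor from Object.prototype, so a lookup on it can return a value for an id that was never added, whereas Map.has only reports keys that were explicitly set.

```typescript
// Hypothetical client shape; the real Client type is not shown here.
interface Client {
    chatId: string
}

class SseClientRegistry {
    private clients = new Map<string, Client>()

    addClient(chatId: string, client: Client): void {
        this.clients.set(chatId, client)
    }

    hasClient(chatId: string): boolean {
        return this.clients.has(chatId)
    }

    removeClient(chatId: string): void {
        this.clients.delete(chatId)
    }

    get size(): number {
        return this.clients.size
    }
}
```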
Reverts #6008