Revert "fix: redis pub/sub and streaming response issue" #6083

Closed
yau-wd wants to merge 1 commit into main from revert-6008-fix/streaming

yau-wd commented Mar 30, 2026

Reverts #6008

yau-wd closed this Mar 30, 2026

gemini-code-assist bot left a comment

Code Review

This pull request reverts several stability and resource management features in the SSE and Redis messaging logic, including heartbeats, periodic subscription cleanup, and explicit unsubscription. The feedback highlights critical stability risks, such as potential server crashes from unhandled exceptions in JSON parsing and socket writes, as well as unhandled promise rejections from fire-and-forget asynchronous calls. Additionally, the removal of cleanup logic is expected to cause significant resource leaks, and the transition from specialized data structures and logging utilities to generic ones is noted as a regression in code quality and maintainability.

Comment on lines +83 to 96
+    streamCustomEvent(chatId: string, eventType: string, data: any) {
         try {
-            await this.redisPublisher.publish(channel, message)
+            this.redisPublisher.publish(
+                chatId,
+                JSON.stringify({
+                    chatId,
+                    eventType,
+                    data
+                })
+            )
         } catch (error) {
-            logger.error(`[RedisEventPublisher] Error publishing to channel ${channel}:`, { error })
+            console.error('Error streaming custom event:', error)
         }
     }

critical

There are a few issues with this implementation that are re-introduced by this revert:

  1. Unhandled Promise Rejection: this.redisPublisher.publish() is asynchronous but is not awaited. If it fails, the rejection bypasses the surrounding try...catch block and becomes an unhandled promise rejection, which terminates the process by default on Node.js 15 and later.
  2. Code Duplication: The try...catch logic is duplicated in every stream... method, making the code harder to maintain. The reverted safePublish helper was a good pattern to avoid this.
  3. Inconsistent Logging: console.error is used instead of the project's logger utility.

This pattern is repeated for all stream event methods in this file and poses a critical stability risk.

References
  1. Prioritize code readability and understandability over conciseness. A series of simple, chained operations or helper functions can be preferable to duplicated logic if it improves understandability and reduces the potential for future errors.
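
The helper pattern mentioned above could look roughly like the following sketch. The reverted safePublish is not shown in this thread, so its signature here, the Publisher and Logger interfaces, and the class name are assumptions for illustration rather than the project's actual code.

```typescript
// Hypothetical minimal interfaces; the real Redis client and logger are richer.
interface Publisher {
    publish(channel: string, message: string): Promise<unknown>
}
interface Logger {
    error(message: string, meta?: unknown): void
}

class RedisEventPublisherSketch {
    private redisPublisher: Publisher
    private logger: Logger

    constructor(redisPublisher: Publisher, logger: Logger) {
        this.redisPublisher = redisPublisher
        this.logger = logger
    }

    // Awaiting inside try/catch means a rejected publish is caught here
    // instead of surfacing as an unhandled promise rejection.
    private async safePublish(channel: string, message: string): Promise<void> {
        try {
            await this.redisPublisher.publish(channel, message)
        } catch (error) {
            this.logger.error(`[RedisEventPublisher] Error publishing to channel ${channel}:`, { error })
        }
    }

    // Each stream... method only builds its payload and delegates.
    async streamCustomEvent(chatId: string, eventType: string, data: unknown): Promise<void> {
        await this.safePublish(chatId, JSON.stringify({ chatId, eventType, data }))
    }
}
```

With this shape, the error handling and the logger call live in one place, and a failing publish is logged rather than propagated.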

-            this.sseStreamer.streamErrorEvent(chatId, err instanceof Error ? err.message : 'Failed to process stream event')
-        }
         // Parse the message from Redis
         const event = JSON.parse(message)

critical

The handleEvent method parses the incoming message with JSON.parse without any error handling. If a malformed message is received from Redis, this will throw an unhandled exception and crash the entire server process. The entire method body should be wrapped in a try...catch block to ensure stability.
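
One way to contain a malformed message is to isolate the parse step, as in this sketch. The StreamEvent shape is inferred from the fields published elsewhere in this thread, and parseStreamEvent is a hypothetical helper name; the real handleEvent may need different validation.

```typescript
// Hypothetical event shape, inferred from the fields published in this thread.
interface StreamEvent {
    chatId: string
    eventType: string
    data: unknown
}

// Returns the parsed event, or null when the payload is not valid JSON
// or lacks the expected string fields.
function parseStreamEvent(message: string): StreamEvent | null {
    try {
        const parsed: unknown = JSON.parse(message)
        if (typeof parsed !== 'object' || parsed === null) return null
        const candidate = parsed as Record<string, unknown>
        const chatId = candidate['chatId']
        const eventType = candidate['eventType']
        if (typeof chatId !== 'string' || typeof eventType !== 'string') return null
        return { chatId, eventType, data: candidate['data'] }
    } catch {
        // JSON.parse throws a SyntaxError on malformed input.
        return null
    }
}
```

A caller can then skip or log a null result instead of letting the exception escape the subscriber callback.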

event: 'token',
data: data
}
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')

critical

This call to client.response.write() is not wrapped in a try...catch block. If the client has already disconnected, the write can fail, and an error that is not handled here can crash the server. This applies to all stream... methods in this class. The reverted code used a safeWrite helper to handle this; it should be restored, or similar logic implemented here.

References
  1. Prioritize code readability and understandability over conciseness. Using helper patterns like safeWrite reduces the potential for future errors.
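
A guarded write could be sketched as below. The reverted safeWrite is not shown in this thread, so this signature, the WritableResponse interface, and formatSseMessage are assumptions; writableEnded is a documented property of Node's http.ServerResponse.

```typescript
// Hypothetical subset of the response object used by the streamer.
interface WritableResponse {
    writableEnded: boolean
    write(chunk: string): boolean
}

// Builds the same frame format used in the snippet above.
function formatSseMessage(body: unknown): string {
    return 'message:\ndata:' + JSON.stringify(body) + '\n\n'
}

// Returns true when the write was attempted without throwing, false otherwise.
function safeWrite(response: WritableResponse, payload: string, onError: (error: unknown) => void): boolean {
    // Skip responses that have already been ended.
    if (response.writableEnded) return false
    try {
        response.write(payload)
        return true
    } catch (error) {
        onError(error)
        return false
    }
}
```

Errors that the stream reports asynchronously are not covered by try...catch, so an 'error' listener on the response would still be needed for those.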

-    if (isQueueMode) {
-        await getRunningExpressApp().redisSubscriber.subscribe(chatId)
+    if (process.env.MODE === MODE.QUEUE) {
+        getRunningExpressApp().redisSubscriber.subscribe(chatId)

high

The subscribe method on the redis client is asynchronous. By removing await, this operation becomes fire-and-forget. While this might be intended to not block the request, any potential errors during the subscription process will not be caught and will result in an unhandled promise rejection, which could crash the application.
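
If the non-blocking behaviour is intended, the rejection can still be handled. The sketch below assumes a subscriber whose subscribe returns a promise; Subscriber and subscribeHandled are hypothetical names, not part of the project.

```typescript
// Hypothetical minimal interface for the Redis subscriber wrapper.
interface Subscriber {
    subscribe(channel: string): Promise<unknown>
}

// The returned promise resolves even when subscribe fails, because the
// failure is routed to onError instead of being left unhandled.
async function subscribeHandled(
    subscriber: Subscriber,
    chatId: string,
    onError: (error: unknown) => void
): Promise<void> {
    try {
        await subscriber.subscribe(chatId)
    } catch (error) {
        onError(error)
    }
}
```

A caller that must not start streaming before the subscription exists would `await subscribeHandled(...)`; a caller that wants fire-and-forget can invoke it without awaiting and still avoid an unhandled rejection.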

-    if (isQueueMode && chatId) {
-        await getRunningExpressApp().redisSubscriber.unsubscribe(chatId)
-    }
     sseStreamer.removeClient(chatId)

high

This revert removes the call to redisSubscriber.unsubscribe(chatId). This means that for every streaming request in queue mode, a new Redis subscription is created but never removed. This will lead to a resource leak, as subscriptions will accumulate over time, consuming memory and resources on both the application server and the Redis instance.
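
The teardown described above can be sketched as a single function that always removes the client, even if unsubscribing fails. Unsubscriber, ClientRemover, and releaseStream are hypothetical names; only the unsubscribe and removeClient calls come from the snippet above.

```typescript
// Hypothetical minimal interfaces for the two collaborators.
interface Unsubscriber {
    unsubscribe(channel: string): Promise<unknown>
}
interface ClientRemover {
    removeClient(chatId: string): void
}

async function releaseStream(
    subscriber: Unsubscriber,
    sseStreamer: ClientRemover,
    chatId: string,
    isQueueMode: boolean,
    onError: (error: unknown) => void
): Promise<void> {
    try {
        // Only queue mode creates a Redis subscription per chat.
        if (isQueueMode && chatId) {
            await subscriber.unsubscribe(chatId)
        }
    } catch (error) {
        onError(error)
    } finally {
        // Runs whether or not unsubscribe succeeded.
        sseStreamer.removeClient(chatId)
    }
}
```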

// Mark the channel as subscribed
this.subscribedChannels.add(channel)
}

high

This revert removes the unsubscribe method and the startPeriodicCleanup logic. This means Redis subscriptions are created but never cleaned up, even after the client connection is closed. This will cause a significant resource leak, as the number of active subscriptions will grow indefinitely, consuming memory and resources on both the application and Redis servers. This logic is critical for production stability and should be restored or replaced with an alternative cleanup mechanism.
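
As one possible alternative cleanup mechanism, a sweep can drop subscriptions that no longer have a connected client. This is not the reverted startPeriodicCleanup, which is not shown in this thread; sweepStaleSubscriptions and its parameters are hypothetical, and it assumes channel names equal chat IDs, as in the subscribe(chatId) calls above.

```typescript
// Unsubscribes every tracked channel that has no connected client.
// Returns the channels that were removed in this pass.
async function sweepStaleSubscriptions(
    subscribedChannels: Set<string>,
    hasClient: (chatId: string) => boolean,
    unsubscribe: (channel: string) => Promise<unknown>
): Promise<string[]> {
    const removed: string[] = []
    // Copy first so the set can be mutated while looping.
    for (const channel of Array.from(subscribedChannels)) {
        if (hasClient(channel)) continue
        try {
            await unsubscribe(channel)
            subscribedChannels.delete(channel)
            removed.push(channel)
        } catch {
            // Leave the channel tracked so the next sweep retries it.
        }
    }
    return removed
}
```

Such a function could be run from a timer at an interval chosen by the project. A sweep like this would also need to account for channels that are subscribed just before their client is registered.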

-    hasClient(chatId: string): boolean {
-        return this.clients.has(chatId)
-    }
+    clients: { [id: string]: Client } = {}

medium

The clients property is changed from a Map to a plain object. While functionally similar for this use case, Map can be more performant for frequent additions and removals, and more clearly expresses the intent of a key-value collection. The reverted code also removed the hasClient method, which was a clean way to check for a client's existence.

References
  1. Prioritize code readability and understandability over conciseness. Using appropriate data structures like Map can more clearly express intent.
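
A Map-backed registry with a hasClient method might look like this sketch. SseClient and ClientRegistrySketch are hypothetical names, since the project's Client type is not shown in this thread. The last lines illustrate one concrete difference: a plain object inherits keys from Object.prototype, so an `in` check can report a key that was never added.

```typescript
// Hypothetical client shape for illustration only.
interface SseClient {
    chatId: string
}

class ClientRegistrySketch {
    private clients = new Map<string, SseClient>()

    addClient(client: SseClient): void {
        this.clients.set(client.chatId, client)
    }

    hasClient(chatId: string): boolean {
        return this.clients.has(chatId)
    }

    // Returns true when a client was actually removed.
    removeClient(chatId: string): boolean {
        return this.clients.delete(chatId)
    }
}

// With a plain object, inherited keys can look like entries.
const plainClients: { [id: string]: SseClient } = {}
const inheritedKeyLooksPresent = 'constructor' in plainClients // true
const mapHasInheritedKey = new ClientRegistrySketch().hasClient('constructor') // false
```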
