Commit 8b4eeea

steookk and claude committed
Fix WebSocket connection accumulation across gateway restarts
Three listener leaks caused handlers to pile up on each in-process reload:

- notification.added_to_channel was registered in StreamChatClientRuntime.start() but never removed; stop() now saves the reference and calls client.off() on it.
- ai_indicator.stop was registered in startAccount but handleAbort only removed message.new; extracted to handleAiStop so it can be cleaned up properly.
- Added module-level activeGatewayCleanup registry: startAccount force-invokes any stale cleanup for the same accountId before opening a new connection, self-healing the case where the framework reloads without calling stop() first.
- handleAbort is now idempotent via a `stopped` boolean guard.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 794d8da commit 8b4eeea

3 files changed

Lines changed: 38 additions & 10 deletions

CLAUDE.md

Lines changed: 2 additions & 0 deletions
@@ -152,3 +152,5 @@ Config supports a flat default account or named sub-accounts:
 - **`safeSendEvent` swallows errors.** Indicator events are best-effort; a failed `ai_indicator` update must not abort message delivery. Retries: 5 attempts, exponential backoff starting at 100 ms, only on 429/5xx.
 - **`seenThreads` is process-scoped.** The `Set<string>` tracking "first message in thread" lives at module level, so it persists across gateway reloads until the process restarts. This is intentional — it avoids re-sending parent context for active threads after a config reload.
 - **`onTextChunk` receives deltas despite the wire protocol using full text.** `onPartialReply` provides cumulative text; `channel.ts` extracts the delta before calling `onTextChunk`. Inside `StreamingHandler`, `onTextChunk` re-accumulates deltas into `accumulatedText` and passes that full string to `partialUpdateMessage`. The round-trip is: cumulative → delta → cumulative. The delta extraction exists because `StreamingHandler` was designed around the "streaming chunks" mental model — it owns the accumulation and the throttle counter, making that API feel natural. The redundancy is intentional for architectural clarity, not a bug.
+- **`activeGatewayCleanup` is a module-level registry for defence against connection accumulation.** `startAccount` stores a cleanup function per `accountId`. On the next `startAccount` call for the same account, any existing entry is force-invoked before creating the new connection. This self-heals the case where OpenClaw's in-process gateway reload calls `startAccount` without calling `stop()` first, which would otherwise leave orphaned WebSocket connections accumulating (each receiving every `message.new` event). `handleAbort` is idempotent via a `stopped` boolean guard, so it is safe to call from both the abort signal and `stop()`.
+- **All three SDK event listeners are removed on cleanup.** `handleAbort` explicitly calls `client.off()` for `message.new` and `ai_indicator.stop`; `chatRuntime.stop()` removes `notification.added_to_channel` (saved as `addedToChannelHandler` in `StreamChatClientRuntime`). Failing to remove any one of these would cause listener accumulation across restarts.
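
The registry-plus-guard pattern described in the `activeGatewayCleanup` note can be sketched in isolation. This is a minimal illustration under stated assumptions, not the plugin's code: `cleanupRegistry`, `startConnection`, and the `openSockets` counter are hypothetical names, and the counter merely stands in for live WebSocket connections.

```typescript
// Hypothetical stand-ins: none of these names come from the plugin itself.
type Cleanup = () => void;

const cleanupRegistry = new Map<string, Cleanup>();
let openSockets = 0; // stands in for the number of live connections

function startConnection(accountId: string): Cleanup {
  // Self-heal: if an earlier start for this account was never stopped,
  // run its cleanup before opening anything new.
  const stale = cleanupRegistry.get(accountId);
  if (stale) stale();

  openSockets += 1;

  let stopped = false;
  const cleanup: Cleanup = () => {
    if (stopped) return; // idempotent: repeated calls are no-ops
    stopped = true;
    openSockets -= 1;
    cleanupRegistry.delete(accountId);
  };

  cleanupRegistry.set(accountId, cleanup);
  return cleanup;
}

// A reload that never called stop(): the second start cleans up the first.
const firstCleanup = startConnection("default");
startConnection("default");
// A late abort from the first run arrives afterwards and does nothing.
firstCleanup();
```

Because the stale cleanup runs before the new entry is stored, the late `firstCleanup()` call hits the `stopped` guard and cannot remove the newer entry from the registry.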

src/channel.ts

Lines changed: 28 additions & 8 deletions
@@ -27,6 +27,11 @@ import {
 // Track which threads we've already seen (for first-in-thread detection)
 const seenThreads = new Set<string>();
 
+// Module-level registry of active gateway cleanup functions keyed by accountId.
+// Allows startAccount to force-stop a stale connection if the framework calls
+// startAccount again without having called stop() first (e.g. in-process reloads).
+const activeGatewayCleanup = new Map<string, () => void>();
+
 // ---------------------------------------------------------------------------
 // Reactions helper
 // ---------------------------------------------------------------------------
@@ -471,6 +476,16 @@ export const streamchatPlugin: StreamChatChannelPlugin = {
         );
       }
 
+      // Force-stop any stale runtime for this accountId that was never cleaned up
+      // (can happen when the framework does an in-process reload without calling stop()).
+      const staleCleanup = activeGatewayCleanup.get(accountId);
+      if (staleCleanup) {
+        log?.warn?.(
+          `[StreamChat] Stale connection detected for account "${accountId}" — forcing cleanup before restart`,
+        );
+        staleCleanup();
+      }
+
       const chatRuntime = new StreamChatClientRuntime(account, log);
       const runContexts = new RunContextMap();
       const streamingHandler = new StreamingHandler({
@@ -508,15 +523,10 @@ export const streamchatPlugin: StreamChatChannelPlugin = {
         });
       };
 
-      client.on("message.new", handleMessage);
-
       // Listen for force stop from client
-      client.on("ai_indicator.stop" as "user.watching.start", (event: Event) => {
-        // Find the run associated with this message
+      const handleAiStop = (event: Event) => {
         const messageId = (event as unknown as Record<string, unknown>).message_id as string | undefined;
         if (!messageId) return;
-
-        // Look through active streams to find the matching run
         const activeRun = runContexts.findByResponseMessageId(messageId);
         if (activeRun) {
           streamingHandler.onForceStop(activeRun.runId).catch((err) => {
@@ -525,11 +535,19 @@ export const streamchatPlugin: StreamChatChannelPlugin = {
             );
           });
         }
-      });
+      };
 
-      // Handle abort signal
+      client.on("message.new", handleMessage);
+      client.on("ai_indicator.stop" as "user.watching.start", handleAiStop);
+
+      // Handle abort signal / explicit stop — idempotent via `stopped` guard
+      let stopped = false;
       const handleAbort = () => {
+        if (stopped) return;
+        stopped = true;
         client.off("message.new", handleMessage);
+        client.off("ai_indicator.stop" as "user.watching.start", handleAiStop);
+        activeGatewayCleanup.delete(accountId);
         chatRuntime.stop().catch((err) => {
           log?.error?.(
             `[StreamChat] Disconnect error: ${String(err)}`,
@@ -542,6 +560,8 @@ export const streamchatPlugin: StreamChatChannelPlugin = {
         });
       };
 
+      activeGatewayCleanup.set(accountId, handleAbort);
+
       if (abortSignal) {
         abortSignal.addEventListener("abort", handleAbort, { once: true });
       }

src/stream-chat-runtime.ts

Lines changed: 8 additions & 2 deletions
@@ -9,6 +9,7 @@ export class StreamChatClientRuntime {
   private account: ResolvedAccount;
   private log?: ChannelLogSink;
   private connected = false;
+  private addedToChannelHandler?: (event: Event) => void;
 
   constructor(account: ResolvedAccount, log?: ChannelLogSink) {
     this.account = account;
@@ -48,7 +49,7 @@ export class StreamChatClientRuntime {
     );
 
     // Auto-watch new channels the bot is added to
-    this.client.on("notification.added_to_channel", (event: Event) => {
+    this.addedToChannelHandler = (event: Event) => {
       if (event.channel) {
         const ch = this.client.channel(
           event.channel.type,
@@ -66,11 +67,16 @@ export class StreamChatClientRuntime {
           );
         });
       }
-    });
+    };
+    this.client.on("notification.added_to_channel", this.addedToChannelHandler);
   }
 
   async stop(): Promise<void> {
     if (this.connected) {
+      if (this.addedToChannelHandler) {
+        this.client.off("notification.added_to_channel", this.addedToChannelHandler);
+        this.addedToChannelHandler = undefined;
+      }
       this.log?.info?.(`[StreamChat] Disconnecting...`);
       await this.client.disconnectUser();
       this.connected = false;
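
The reason the handler has to be stored in a field before registration is that `off()` matches listeners by function identity, so an inline arrow function can never be removed later. A minimal sketch of that behaviour follows, assuming the client object outlives a single start/stop cycle. `TinyEmitter`, `SketchRuntime`, and `leakyStart` are hypothetical stand-ins written for this illustration; they are not the Stream Chat SDK or the plugin's classes.

```typescript
// Hypothetical stand-in for a client exposing on()/off(); only the
// reference-equality behaviour of off() matters for this sketch.
type Handler = (event: { type: string }) => void;

class TinyEmitter {
  private handlers = new Map<string, Set<Handler>>();

  on(type: string, handler: Handler): void {
    if (!this.handlers.has(type)) this.handlers.set(type, new Set());
    this.handlers.get(type)!.add(handler);
  }

  off(type: string, handler: Handler): void {
    this.handlers.get(type)?.delete(handler);
  }

  count(type: string): number {
    return this.handlers.get(type)?.size ?? 0;
  }
}

const EVENT = "notification.added_to_channel";

// Leaky variant: the inline arrow is unreachable after registration,
// so no later off() call can refer to it.
function leakyStart(client: TinyEmitter): void {
  client.on(EVENT, () => { /* auto-watch the channel */ });
}

// Fixed variant: keep the reference so stop() can pass the same function to off().
class SketchRuntime {
  private client: TinyEmitter;
  private addedHandler?: Handler;

  constructor(client: TinyEmitter) {
    this.client = client;
  }

  start(): void {
    this.addedHandler = () => { /* auto-watch the channel */ };
    this.client.on(EVENT, this.addedHandler);
  }

  stop(): void {
    if (this.addedHandler) {
      this.client.off(EVENT, this.addedHandler);
      this.addedHandler = undefined;
    }
  }
}

// Three simulated restarts against each long-lived client.
const leakyClient = new TinyEmitter();
const fixedClient = new TinyEmitter();
for (let i = 0; i < 3; i++) {
  leakyStart(leakyClient);
  const runtime = new SketchRuntime(fixedClient);
  runtime.start();
  runtime.stop();
}
```

After the loop, `leakyClient` still holds one listener per restart, while `fixedClient` holds none.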
