diff --git a/ui/src/trace_processor/http_rpc_engine.ts b/ui/src/trace_processor/http_rpc_engine.ts index 1e0425c917a..cd827b82435 100644 --- a/ui/src/trace_processor/http_rpc_engine.ts +++ b/ui/src/trace_processor/http_rpc_engine.ts @@ -18,6 +18,9 @@ import {assertExists, reportError} from '../base/logging'; import {EngineBase} from '../trace_processor/engine'; const RPC_CONNECT_TIMEOUT_MS = 2000; +const INITIAL_RETRY_DELAY_MS = 100; +const MAX_RETRY_DELAY_MS = 30000; +const BACKOFF_MULTIPLIER = 2; export interface HttpRpcState { connected: boolean; @@ -34,6 +37,8 @@ export class HttpRpcEngine extends EngineBase { private disposed = false; private queue: Blob[] = []; private isProcessingQueue = false; + private retryDelayMs = INITIAL_RETRY_DELAY_MS; + private retryTimeoutId?: ReturnType; // Can be changed by frontend/index.ts when passing ?rpc_port=1234 . static defaultRpcPort = '9001'; @@ -47,47 +52,109 @@ export class HttpRpcEngine extends EngineBase { } rpcSendRequestBytes(data: Uint8Array): void { - if (this.websocket === undefined) { - if (this.disposed) return; - const wsUrl = `ws://${HttpRpcEngine.getHostAndPort(this.port)}/websocket`; - this.websocket = new WebSocket(wsUrl); - this.websocket.onopen = () => this.onWebsocketConnected(); - this.websocket.onmessage = (e) => this.onWebsocketMessage(e); - this.websocket.onclose = (e) => this.onWebsocketClosed(e); - this.websocket.onerror = (e) => - super.fail( - `WebSocket error rs=${(e.target as WebSocket)?.readyState} (ERR:ws)`, - ); - } + if (this.disposed) return; + const websocket = this.getOrCreateWebSocket(); if (this.connected) { - this.websocket.send(data); + websocket.send(data); } else { this.requestQueue.push(data); // onWebsocketConnected() will flush this. } } + /** + * Returns the existing WebSocket if one exists and is not closed, + * otherwise creates a new one (closing any stale socket first). + */ + private getOrCreateWebSocket(): WebSocket { + // If we have an active websocket that's not closed/closing, reuse it + if ( + this.websocket !== undefined && + this.websocket.readyState !== WebSocket.CLOSED && + this.websocket.readyState !== WebSocket.CLOSING + ) { + return this.websocket; + } + + // Close any stale websocket before creating a new one + this.closeWebSocket(); + + const wsUrl = `ws://${HttpRpcEngine.getHostAndPort(this.port)}/websocket`; + this.websocket = new WebSocket(wsUrl); + this.websocket.onopen = () => this.onWebsocketConnected(); + this.websocket.onmessage = (e) => this.onWebsocketMessage(e); + this.websocket.onclose = (e) => this.onWebsocketClosed(e); + this.websocket.onerror = (e) => this.onWebsocketError(e); + return this.websocket; + } + + /** + * Closes the current websocket if one exists, clearing event handlers + * to prevent spurious callbacks. + */ + private closeWebSocket(): void { + if (this.websocket === undefined) return; + + // Clear handlers to prevent callbacks from a closing socket + this.websocket.onopen = null; + this.websocket.onmessage = null; + this.websocket.onclose = null; + this.websocket.onerror = null; + this.websocket.close(); + this.websocket = undefined; + } + + private onWebsocketError(e: Event): void { + if (this.disposed) return; + const readyState = (e.target as WebSocket)?.readyState; + console.warn(`WebSocket error rs=${readyState}, will retry with backoff`); + // The close event will fire after this, which will trigger the retry logic + } + + private scheduleReconnect(): void { + if (this.disposed) return; + + console.debug( + `Scheduling WebSocket reconnection in ${this.retryDelayMs}ms`, + ); + + this.retryTimeoutId = setTimeout(() => { + if (this.disposed) return; + console.debug('Attempting WebSocket reconnection...'); + this.getOrCreateWebSocket(); + }, this.retryDelayMs); + + // Exponential backoff with cap + this.retryDelayMs = Math.min( + this.retryDelayMs * BACKOFF_MULTIPLIER, + MAX_RETRY_DELAY_MS, + ); + } + private onWebsocketConnected() { + // Reset retry delay on successful connection + this.retryDelayMs = INITIAL_RETRY_DELAY_MS; + for (;;) { const queuedMsg = this.requestQueue.shift(); if (queuedMsg === undefined) break; assertExists(this.websocket).send(queuedMsg); } + console.debug('WebSocket (re)connected on port', this.port); this.connected = true; } private onWebsocketClosed(e: CloseEvent) { if (this.disposed) return; - if (e.code === 1006 && this.connected) { - // On macbooks the act of closing the lid / suspending often causes socket - // disconnections. Try to gracefully re-connect. - console.log('Websocket closed, reconnecting'); - this.websocket = undefined; - this.connected = false; - this.rpcSendRequestBytes(new Uint8Array()); // Triggers a reconnection. - } else { - super.fail(`Websocket closed (${e.code}: ${e.reason}) (ERR:ws)`); - } + + // Always attempt to reconnect with backoff, regardless of close code + console.debug( + `WebSocket closed (code=${e.code}, reason=${e.reason || 'none'}, wasConnected=${this.connected}), scheduling reconnect`, + ); + + this.websocket = undefined; + this.connected = false; + this.scheduleReconnect(); } private onWebsocketMessage(e: MessageEvent) { @@ -147,8 +214,13 @@ export class HttpRpcEngine extends EngineBase { [Symbol.dispose]() { this.disposed = true; this.connected = false; - const websocket = this.websocket; - this.websocket = undefined; - websocket?.close(); + + // Clear any pending retry timeout + if (this.retryTimeoutId !== undefined) { + clearTimeout(this.retryTimeoutId); + this.retryTimeoutId = undefined; + } + + this.closeWebSocket(); } }