Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 64 additions & 22 deletions ui/src/trace_processor/http_rpc_engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ import {assertExists, reportError} from '../base/logging';
import {EngineBase} from '../trace_processor/engine';

const RPC_CONNECT_TIMEOUT_MS = 2000;
const INITIAL_RETRY_DELAY_MS = 100;
const MAX_RETRY_DELAY_MS = 30000;
const BACKOFF_MULTIPLIER = 2;

export interface HttpRpcState {
connected: boolean;
Expand All @@ -34,6 +37,8 @@ export class HttpRpcEngine extends EngineBase {
private disposed = false;
private queue: Blob[] = [];
private isProcessingQueue = false;
private retryDelayMs = INITIAL_RETRY_DELAY_MS;
private retryTimeoutId?: ReturnType<typeof setTimeout>;

// Can be changed by frontend/index.ts when passing ?rpc_port=1234 .
static defaultRpcPort = '9001';
Expand All @@ -47,18 +52,8 @@ export class HttpRpcEngine extends EngineBase {
}

rpcSendRequestBytes(data: Uint8Array): void {
if (this.websocket === undefined) {
if (this.disposed) return;
const wsUrl = `ws://${HttpRpcEngine.getHostAndPort(this.port)}/websocket`;
this.websocket = new WebSocket(wsUrl);
this.websocket.onopen = () => this.onWebsocketConnected();
this.websocket.onmessage = (e) => this.onWebsocketMessage(e);
this.websocket.onclose = (e) => this.onWebsocketClosed(e);
this.websocket.onerror = (e) =>
super.fail(
`WebSocket error rs=${(e.target as WebSocket)?.readyState} (ERR:ws)`,
);
}
if (this.disposed) return;
this.websocket ??= this.initWebSocket();

if (this.connected) {
this.websocket.send(data);
Expand All @@ -67,27 +62,67 @@ export class HttpRpcEngine extends EngineBase {
}
}

private initWebSocket(): WebSocket {
const wsUrl = `ws://${HttpRpcEngine.getHostAndPort(this.port)}/websocket`;
this.websocket = new WebSocket(wsUrl);
Comment thread
cdamus marked this conversation as resolved.
this.websocket.onopen = () => this.onWebsocketConnected();
this.websocket.onmessage = (e) => this.onWebsocketMessage(e);
this.websocket.onclose = (e) => this.onWebsocketClosed(e);
this.websocket.onerror = (e) => this.onWebsocketError(e);
return this.websocket;
}

private onWebsocketError(e: Event): void {
if (this.disposed) return;
const readyState = (e.target as WebSocket)?.readyState;
console.warn(`WebSocket error rs=${readyState}, will retry with backoff`);
// The close event will fire after this, which will trigger the retry logic
}

private scheduleReconnect(): void {
if (this.disposed) return;

console.debug(
`Scheduling WebSocket reconnection in ${this.retryDelayMs}ms`,
);

this.retryTimeoutId = setTimeout(() => {
if (this.disposed) return;
console.debug('Attempting WebSocket reconnection...');
this.initWebSocket();
}, this.retryDelayMs);

// Exponential backoff with cap
this.retryDelayMs = Math.min(
this.retryDelayMs * BACKOFF_MULTIPLIER,
MAX_RETRY_DELAY_MS,
Comment thread
cdamus marked this conversation as resolved.
);
}

private onWebsocketConnected() {
// Reset retry delay on successful connection
this.retryDelayMs = INITIAL_RETRY_DELAY_MS;

for (;;) {
const queuedMsg = this.requestQueue.shift();
if (queuedMsg === undefined) break;
assertExists(this.websocket).send(queuedMsg);
}
console.debug('WebSocket (re)connected on port', this.port);
this.connected = true;
}

private onWebsocketClosed(e: CloseEvent) {
if (this.disposed) return;
if (e.code === 1006 && this.connected) {
// On macbooks the act of closing the lid / suspending often causes socket
// disconnections. Try to gracefully re-connect.
console.log('Websocket closed, reconnecting');
this.websocket = undefined;
this.connected = false;
this.rpcSendRequestBytes(new Uint8Array()); // Triggers a reconnection.
} else {
super.fail(`Websocket closed (${e.code}: ${e.reason}) (ERR:ws)`);
Comment thread
cdamus marked this conversation as resolved.
}

// Always attempt to reconnect with backoff, regardless of close code
Comment thread
cdamus marked this conversation as resolved.
console.debug(
`WebSocket closed (code=${e.code}, reason=${e.reason || 'none'}, wasConnected=${this.connected}), scheduling reconnect`,
);

this.websocket = undefined;
this.connected = false;
this.scheduleReconnect();
}

private onWebsocketMessage(e: MessageEvent) {
Expand Down Expand Up @@ -147,6 +182,13 @@ export class HttpRpcEngine extends EngineBase {
[Symbol.dispose]() {
this.disposed = true;
this.connected = false;

// Clear any pending retry timeout
if (this.retryTimeoutId !== undefined) {
clearTimeout(this.retryTimeoutId);
this.retryTimeoutId = undefined;
}

const websocket = this.websocket;
this.websocket = undefined;
websocket?.close();
Expand Down