Skip to content

Commit 50b72ac

Browse files
authored
Revert "enhance ws reconnect logic (#547)" (#600)
This reverts commit 6d5d449.
1 parent c2adf68 commit 50b72ac

1 file changed

Lines changed: 18 additions & 58 deletions

File tree

src/transports/websocket.ts

Lines changed: 18 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,7 @@ import { connectionErrorLabels, recordWsMessageMetrics } from './metrics'
1010

1111
// Aliasing type for use at adapter level
1212
export { WebSocket, RawData as WebSocketRawData }
13-
// Explicit initial grace (1s) after open for web socket; after that, rely on "time since last message"
14-
const INITIAL_WS_GRACE_MS = 1000
13+
1514
const logger = makeLogger('WebSocketTransport')
1615

1716
type WebSocketClass = new (
@@ -160,11 +159,7 @@ export class WebSocketTransport<
160159
}
161160

162161
connectionClosed(): boolean {
163-
return (
164-
!this.wsConnection ||
165-
this.wsConnection.readyState === WebSocket.CLOSED ||
166-
this.wsConnection.readyState === WebSocket.CLOSING
167-
)
162+
return !this.wsConnection || this.wsConnection.readyState === WebSocket.CLOSED
168163
}
169164

170165
serializeMessage(payload: unknown): string {
@@ -192,7 +187,6 @@ export class WebSocketTransport<
192187

193188
// Called when any message is received by the open connection
194189
message: async (event: WebSocket.MessageEvent) => {
195-
this.lastMessageReceivedAt = Date.now()
196190
const parsed = this.deserializeMessage(event.data)
197191
censorLogs(() => logger.trace(`Got ws message: ${event.data}`))
198192
const providerDataReceived = Date.now()
@@ -208,6 +202,8 @@ export class WebSocketTransport<
208202
})
209203
logger.trace(`Writing ${results?.length ?? 0} responses to cache`)
210204
if (Array.isArray(results) && results.length > 0) {
205+
// Updating the last message received time here, to only care about messages we use
206+
this.lastMessageReceivedAt = Date.now()
211207
await this.responseCache.write(this.name, results)
212208
}
213209

@@ -342,14 +338,20 @@ export class WebSocketTransport<
342338
const now = Date.now()
343339
const timeSinceLastMessage = Math.max(0, now - this.lastMessageReceivedAt)
344340
const timeSinceConnectionOpened = Math.max(0, now - this.connectionOpenedAt)
341+
const timeSinceLastActivity = Math.min(timeSinceLastMessage, timeSinceConnectionOpened)
345342
const connectionUnresponsive =
346-
timeSinceConnectionOpened >= INITIAL_WS_GRACE_MS &&
347-
timeSinceLastMessage > context.adapterSettings.WS_SUBSCRIPTION_UNRESPONSIVE_TTL
343+
timeSinceLastActivity > 0 &&
344+
timeSinceLastActivity > context.adapterSettings.WS_SUBSCRIPTION_UNRESPONSIVE_TTL
348345

349346
let connectionClosed = this.connectionClosed()
350-
logger.trace(
351-
`WS conn staleness info: now: ${now} | timeSinceLastMessage: ${timeSinceLastMessage} | timeSinceConnectionOpened: ${timeSinceConnectionOpened} | subscriptionUnresponsiveTtl: ${context.adapterSettings.WS_SUBSCRIPTION_UNRESPONSIVE_TTL} | connectionUnresponsive: ${connectionUnresponsive} |`,
352-
)
347+
logger.trace(`WS conn staleness info:
348+
now: ${now} |
349+
timeSinceLastMessage: ${timeSinceLastMessage} |
350+
timeSinceConnectionOpened: ${timeSinceConnectionOpened} |
351+
timeSinceLastActivity: ${timeSinceLastActivity} |
352+
subscriptionUnresponsiveTtl: ${context.adapterSettings.WS_SUBSCRIPTION_UNRESPONSIVE_TTL} |
353+
connectionUnresponsive: ${connectionUnresponsive} |
354+
`)
353355

354356
// Check if we should close the current connection
355357
if (!connectionClosed && (urlChanged || connectionUnresponsive)) {
@@ -370,18 +372,13 @@ export class WebSocketTransport<
370372
// Check if connection was opened very recently; if so, wait a bit before continuing.
371373
// This is so if we just opened the connection and are waiting to receive some messages,
372374
// we don't close is immediately after and miss the chance to receive them
373-
if (timeSinceConnectionOpened < INITIAL_WS_GRACE_MS) {
375+
if (timeSinceConnectionOpened < 1000) {
374376
logger.info(
375377
`Connection was opened only ${timeSinceConnectionOpened}ms ago, waiting for that to get to 1s before continuing...`,
376378
)
377-
await sleep(INITIAL_WS_GRACE_MS - timeSinceConnectionOpened)
378-
}
379-
if (this.wsConnection && this.wsConnection.readyState !== WebSocket.CLOSED) {
380-
const old = this.wsConnection
381-
// Prevent any further sends/reads via this reference
382-
this.wsConnection = undefined
383-
await waitForGracefulClose(old, 1500)
379+
await sleep(1000 - timeSinceConnectionOpened)
384380
}
381+
this.wsConnection?.close(1000)
385382
connectionClosed = true
386383

387384
if (subscriptions.desired.length) {
@@ -507,40 +504,3 @@ export class WebsocketReverseMappingTransport<
507504
return this.requestMapping.get(value)
508505
}
509506
}
510-
511-
/**
512-
* Waits for a WebSocket connection to close gracefully.
513-
*
514-
* @param ws - The WebSocket instance (from the `ws` package).
515-
* @param gracefulMs - Max time in milliseconds to wait for a graceful close before terminating. Defaults to 1500 ms.
516-
* @returns A promise that resolves when the socket has closed or been terminated.
517-
*/
518-
async function waitForGracefulClose(ws: WebSocket, gracefulMs = 1500): Promise<void> {
519-
if (!ws || ws.readyState === WebSocket.CLOSED) {
520-
return
521-
}
522-
523-
const closed = new Promise<void>((resolve) => {
524-
ws.once('close', resolve)
525-
})
526-
527-
try {
528-
ws.close(1000)
529-
} catch (err) {
530-
logger.debug(`ws.close threw: ${(err as Error).message}`)
531-
}
532-
533-
try {
534-
await Promise.race([closed, sleep(gracefulMs)])
535-
} catch (err) {
536-
logger.debug(`waiting for close failed: ${(err as Error).message}`)
537-
}
538-
const state: number = ws.readyState
539-
if (state !== WebSocket.CLOSED) {
540-
try {
541-
ws.terminate()
542-
} catch (err) {
543-
logger.debug(`ws.terminate threw: ${(err as Error).message}`)
544-
}
545-
}
546-
}

0 commit comments

Comments
 (0)