Skip to content

Commit fbbd4ca

Browse files
authored
Add WS Bad Closure Failover Mechanism (#705)
* Add WS Bad Closure Failover Mechanism
* Add more test cases
* lint
* Make setter function for failover counter
1 parent 91d12b9 commit fbbd4ca

File tree

2 files changed

+430
-19
lines changed

2 files changed

+430
-19
lines changed

src/transports/websocket.ts

Lines changed: 31 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,18 @@ export class WebSocketTransport<
187187
return !this.wsConnection || this.wsConnection.readyState === WebSocket.CLOSED
188188
}
189189

190+
/**
191+
* Increments streamHandlerInvocationsWithNoConnection by one and sets the wsConnectionFailoverCount metric to the new value.
192+
* Both operations must always happen together, so they are encapsulated here. filteredUrl becomes the metric's `url` label; the callers shown pass the current URL with its query string removed.
193+
*/
194+
private incrementFailoverCounter(filteredUrl: string): void {
195+
this.streamHandlerInvocationsWithNoConnection += 1
196+
// The metric is written with set() using the counter itself, so it always reports the counter's current value.
metrics
197+
.get('wsConnectionFailoverCount')
198+
.labels({ transport_name: this.name, url: filteredUrl })
199+
.set(this.streamHandlerInvocationsWithNoConnection)
200+
}
201+
190202
/** Returns string payloads unchanged; any other payload is converted with JSON.stringify. */
serializeMessage(payload: unknown): string {
191203
return typeof payload === 'string' ? payload : JSON.stringify(payload)
192204
}
@@ -289,20 +301,24 @@ export class WebSocketTransport<
289301

290302
// Called when the WS connection closes for any reason
291303
close: (event: WebSocket.CloseEvent) => {
292-
// If the connection closed with 1000, it's a usual closure
293-
const level = event.code === 1000 ? 'debug' : 'info'
294-
logger[level](
295-
`Closed websocket connection. Code: ${event.code} ; reason: ${event.reason?.toString()}`,
296-
)
297-
298-
// Record active ws connections by decrementing count on close
299-
// Using URL in label since connection_key is removed from v3
300-
metrics.get('wsConnectionActive').dec()
301-
302-
// Also, register that the connection was closed and the reason why.
303304
// We need to filter out query params from the URL to avoid having
304305
// the cardinality of the metric go out of control.
305306
const filteredUrl = this.currentUrl.split('?')[0]
307+
// If the connection closed with 1000, it's a usual closure
308+
const isAbnormal = event.code !== 1000
309+
310+
if (isAbnormal) {
311+
this.incrementFailoverCounter(filteredUrl)
312+
logger.warn(
313+
`WebSocket closed abnormally (code: ${event.code}, reason: ${event.reason?.toString() || 'none'}). ` +
314+
`Failover counter incremented to ${this.streamHandlerInvocationsWithNoConnection}. ` +
315+
`URL: ${filteredUrl}`,
316+
)
317+
} else {
318+
logger.debug(`WebSocket closed normally (code: ${event.code}). URL: ${filteredUrl}`)
319+
}
320+
321+
metrics.get('wsConnectionActive').dec()
306322
metrics.get('wsConnectionClosures').inc({
307323
code: event.code,
308324
url: filteredUrl,
@@ -413,17 +429,13 @@ export class WebSocketTransport<
413429
// WS_SUBSCRIPTION_UNRESPONSIVE_TTL. There is interplay with WS_SUBSCRIPTION_TTL
414430
// to determine minimum TTL of an open connection given no explicit connection errors.
415431
if (connectionUnresponsive) {
416-
this.streamHandlerInvocationsWithNoConnection += 1
417-
logger.info(
418-
`The connection is unresponsive (last message ${timeSinceLastMessage}ms ago), incremented failover counter to ${this.streamHandlerInvocationsWithNoConnection}`,
419-
)
420432
// Filter out query params from the URL to avoid leaking sensitive data
421433
// and prevent metric cardinality explosion
422434
const filteredUrl = this.currentUrl.split('?')[0]
423-
metrics
424-
.get('wsConnectionFailoverCount')
425-
.labels({ transport_name: this.name, url: filteredUrl })
426-
.set(this.streamHandlerInvocationsWithNoConnection)
435+
this.incrementFailoverCounter(filteredUrl)
436+
logger.info(
437+
`The connection is unresponsive (last message ${timeSinceLastMessage}ms ago), incremented failover counter to ${this.streamHandlerInvocationsWithNoConnection}`,
438+
)
427439
}
428440

429441
// We want to check if the URL we calculate is different from the one currently connected.

0 commit comments

Comments
 (0)