Skip to content

Commit 49b6f01

Browse files
authored
Merge branch 'main' into kloet/unit-test-template
2 parents 2d5b7f7 + 225d006 commit 49b6f01

3 files changed

Lines changed: 83 additions & 8 deletions

File tree

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@chainlink/external-adapter-framework",
3-
"version": "2.14.2",
3+
"version": "2.14.3",
44
"main": "dist/index.js",
55
"license": "MIT",
66
"repository": "git://github.com/smartcontractkit/ea-framework-js.git",

src/transports/websocket.ts

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -426,7 +426,6 @@ export class WebSocketTransport<
426426
timeSinceLastActivity > 0 &&
427427
timeSinceLastActivity > context.adapterSettings.WS_SUBSCRIPTION_UNRESPONSIVE_TTL
428428

429-
let connectionClosed = this.connectionClosed()
430429
logger.trace(`WS conn staleness info:
431430
now: ${now} |
432431
timeSinceLastMessage: ${timeSinceLastMessage} |
@@ -460,7 +459,7 @@ export class WebSocketTransport<
460459
const urlChanged = this.currentUrl !== urlFromConfig
461460

462461
// Check if we should close the current connection
463-
if (!connectionClosed && (urlChanged || connectionUnresponsive)) {
462+
if (!this.connectionClosed() && (urlChanged || connectionUnresponsive)) {
464463
if (urlChanged) {
465464
logger.info('Websocket URL has changed, closing connection to reconnect...')
466465
censorLogs(() =>
@@ -484,7 +483,7 @@ export class WebSocketTransport<
484483
await sleep(1000 - timeSinceConnectionOpened)
485484
}
486485
this.wsConnection?.close(1000)
487-
connectionClosed = true
486+
this.wsConnection = undefined
488487

489488
if (subscriptions.desired.length) {
490489
// Clear subscription metrics for all active subscriptions
@@ -500,7 +499,7 @@ export class WebSocketTransport<
500499
}
501500

502501
// Check if we need to open a new connection
503-
if (connectionClosed && subscriptions.desired.length) {
502+
if (this.connectionClosed() && subscriptions.desired.length) {
504503
logger.debug('No established connection and new subscriptions available, connecting to WS')
505504
const options =
506505
this.config.options && (await this.config.options(context, subscriptions.desired))
@@ -518,14 +517,13 @@ export class WebSocketTransport<
518517
this.wsConnection = await this.establishWsConnection(context, urlFromConfig, options)
519518

520519
// Now that we successfully opened the connection, we can reset the variables
521-
connectionClosed = false
522520
this.connectionOpenedAt = Date.now()
523521
}
524522

525523
// Send messages only if the connection is open
526524
// Otherwise we could encounter the case where we just closed the connection because there's no desired ones,
527525
// but without this check we'd attempt to send out all the unsubscribe messages
528-
if (!connectionClosed) {
526+
if (!this.connectionClosed()) {
529527
logger.debug('Connection is open, sending subs/unsubs if there are any')
530528
const builders = this.config.builders
531529
if (builders) {

test/transports/websocket.test.ts

Lines changed: 78 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,12 @@ import {
1313
type WebSocketTransportConfig,
1414
} from '../../src/transports'
1515
import { SingleNumberResultResponse, sleep } from '../../src/util'
16-
import { mockWebSocketProvider, runAllUntilTime, TestAdapter } from '../../src/util/testing-utils'
16+
import {
17+
mockWebSocketProvider,
18+
runAllUntil,
19+
runAllUntilTime,
20+
TestAdapter,
21+
} from '../../src/util/testing-utils'
1722
import { InputParameters } from '../../src/validation'
1823

1924
export const test = untypedTest as TestFn<{
@@ -487,6 +492,78 @@ test.serial('reconnects if provider stops sending expected messages', async (t)
487492
await t.context.clock.runToLastAsync()
488493
})
489494

495+
test.serial('should not inflate failover metric while connection is closed', async (t) => {
496+
const base = 'ETH'
497+
const quote = 'DOGE'
498+
const WS_SUBSCRIPTION_UNRESPONSIVE_TTL = 120_000
499+
process.env['METRICS_ENABLED'] = 'true'
500+
eaMetrics.clear()
501+
502+
// Mock WS
503+
mockWebSocketProvider(WebSocketClassProvider)
504+
const mockWsServer = new Server(ENDPOINT_URL, { mock: false })
505+
let connectionCounter = 0
506+
507+
mockWsServer.on('connection', (socket) => {
508+
let counter = 0
509+
const parseMessage = () => {
510+
if (counter++ === 0) {
511+
socket.send(JSON.stringify({ error: '' }))
512+
}
513+
}
514+
connectionCounter++
515+
socket.on('message', parseMessage)
516+
})
517+
518+
const adapter = createAdapter({
519+
WS_SUBSCRIPTION_TTL: 30_000,
520+
WS_SUBSCRIPTION_UNRESPONSIVE_TTL,
521+
})
522+
523+
const testAdapter = await TestAdapter.startWithMockedCache(adapter, t.context)
524+
525+
const error = await testAdapter.request({
526+
base,
527+
quote,
528+
})
529+
t.is(error.statusCode, 504)
530+
531+
await runAllUntil(t.context.clock, () => connectionCounter >= 1)
532+
t.is(connectionCounter, 1)
533+
534+
await runAllUntilTime(t.context.clock, WS_SUBSCRIPTION_UNRESPONSIVE_TTL + 1000)
535+
536+
// The connection was closed because it was unresponsive but it wasn't
537+
// reopened because all the subscriptions expired.
538+
t.is(connectionCounter, 1)
539+
;(await testAdapter.getMetrics()).assert(t, {
540+
name: 'ws_connection_failover_count',
541+
labels: {
542+
transport_name: 'default_single_transport',
543+
url: ENDPOINT_URL,
544+
},
545+
expectedValue: 1,
546+
})
547+
548+
// Even after waiting a long time, the connection counter and failover metric
549+
// should remain the same.
550+
await runAllUntilTime(t.context.clock, 2 * WS_SUBSCRIPTION_UNRESPONSIVE_TTL)
551+
t.is(connectionCounter, 1)
552+
;(await testAdapter.getMetrics()).assert(t, {
553+
name: 'ws_connection_failover_count',
554+
labels: {
555+
transport_name: 'default_single_transport',
556+
url: ENDPOINT_URL,
557+
},
558+
expectedValue: 1,
559+
})
560+
561+
testAdapter.api.close()
562+
mockWsServer.close()
563+
await t.context.clock.runToLastAsync()
564+
process.env['METRICS_ENABLED'] = 'false'
565+
})
566+
490567
test.serial('resubscribes after reconnection if server closes connection', async (t) => {
491568
const base = 'ETH'
492569
const quote = 'DOGE'

0 commit comments

Comments
 (0)