Skip to content

Commit 7536f42

Browse files
committed
Add WS Bad Closure Failover Mechanism
1 parent 91d12b9 commit 7536f42

2 files changed

Lines changed: 243 additions & 12 deletions

File tree

src/transports/websocket.ts

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -289,20 +289,27 @@ export class WebSocketTransport<
289289

290290
// Called when the WS connection closes for any reason
291291
close: (event: WebSocket.CloseEvent) => {
292-
// If the connection closed with 1000, it's a usual closure
293-
const level = event.code === 1000 ? 'debug' : 'info'
294-
logger[level](
295-
`Closed websocket connection. Code: ${event.code} ; reason: ${event.reason?.toString()}`,
296-
)
292+
const filteredUrl = this.currentUrl.split('?')[0]
293+
const isAbnormal = event.code !== 1000
297294

298-
// Record active ws connections by decrementing count on close
299-
// Using URL in label since connection_key is removed from v3
300-
metrics.get('wsConnectionActive').dec()
295+
if (isAbnormal) {
296+
this.streamHandlerInvocationsWithNoConnection += 1
297+
logger.warn(
298+
`WebSocket closed abnormally (code: ${event.code}, reason: ${event.reason?.toString() || 'none'}). ` +
299+
`Failover counter incremented to ${this.streamHandlerInvocationsWithNoConnection}. ` +
300+
`URL: ${filteredUrl}`,
301+
)
302+
metrics
303+
.get('wsConnectionFailoverCount')
304+
.labels({ transport_name: this.name, url: filteredUrl })
305+
.set(this.streamHandlerInvocationsWithNoConnection)
306+
} else {
307+
logger.debug(
308+
`WebSocket closed normally (code: ${event.code}). URL: ${filteredUrl}`,
309+
)
310+
}
301311

302-
// Also, register that the connection was closed and the reason why.
303-
// We need to filter out query params from the URL to avoid having
304-
// the cardinality of the metric go out of control.
305-
const filteredUrl = this.currentUrl.split('?')[0]
312+
metrics.get('wsConnectionActive').dec()
306313
metrics.get('wsConnectionClosures').inc({
307314
code: event.code,
308315
url: filteredUrl,

test/transports/websocket.test.ts

Lines changed: 224 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1312,3 +1312,227 @@ test.serial('does not heartbeat when handler throws an error', async (t) => {
13121312
mockWsServer.close()
13131313
await t.context.clock.runToLastAsync()
13141314
})
1315+
1316+
/**
 * Checks that abnormal WebSocket closures drive the failover counter that is
 * handed to the transport's `url` callback.
 *
 * The mock server closes every connection with code 4000 as soon as it
 * receives a message. The `url` callback records the
 * `streamHandlerInvocationsWithNoConnection` value it is given on each call,
 * and the test asserts that the recorded values are strictly increasing.
 *
 * Cleanup lives in `finally` blocks so that a thrown error or a failing
 * assertion cannot leave METRICS_ENABLED switched on, the adapter running, or
 * the mock server bound to ENDPOINT_URL for the serial tests that follow.
 */
test.serial(
  'increments failover counter on abnormal closure and passes it to url function',
  async (t) => {
    const base = 'ETH'
    const quote = 'DOGE'
    process.env['METRICS_ENABLED'] = 'true'
    eaMetrics.clear()
    // NOTE(review): metrics are enabled here, but no metric value is asserted
    // below — consider also checking the failover gauge, or drop the toggle.

    // Counter value seen by the url callback on each invocation, in call order
    const counterValues: number[] = []

    mockWebSocketProvider(WebSocketClassProvider)
    const mockWsServer = new Server(ENDPOINT_URL, { mock: false })

    try {
      // Any inbound message (i.e. the subscribe request) triggers a non-1000 close
      mockWsServer.on('connection', (socket) => {
        socket.on('message', () => {
          socket.close({ code: 4000, reason: 'Simulated abnormal closure', wasClean: false })
        })
      })

      const transport = new WebSocketTransport<WebSocketTypes>({
        url: (_context, _desiredSubs, params) => {
          counterValues.push(params.streamHandlerInvocationsWithNoConnection)
          return ENDPOINT_URL
        },
        handlers: {
          message(message) {
            if (!message.pair) {
              return []
            }
            const [curBase, curQuote] = message.pair.split('/')
            return [
              {
                params: { base: curBase, quote: curQuote },
                response: {
                  data: {
                    result: message.value,
                  },
                  result: message.value,
                },
              },
            ]
          },
        },
        builders: {
          subscribeMessage: (params) => ({
            request: 'subscribe',
            pair: `${params.base}/${params.quote}`,
          }),
          unsubscribeMessage: (params) => ({
            request: 'unsubscribe',
            pair: `${params.base}/${params.quote}`,
          }),
        },
      })

      const webSocketEndpoint = new AdapterEndpoint({
        name: 'TEST',
        transport: transport,
        inputParameters,
      })

      const config = new AdapterConfig(
        {},
        {
          envDefaultOverrides: {
            BACKGROUND_EXECUTE_MS_WS,
            WS_SUBSCRIPTION_UNRESPONSIVE_TTL: 180_000,
          },
        },
      )

      const adapter = new Adapter({
        name: 'TEST',
        defaultEndpoint: 'test',
        config,
        endpoints: [webSocketEndpoint],
      })

      const testAdapter = await TestAdapter.startWithMockedCache(adapter, t.context)

      try {
        await testAdapter.request({ base, quote })

        // Each cycle: connect -> subscribe -> server closes with 4000 -> counter increments
        await runAllUntilTime(t.context.clock, BACKGROUND_EXECUTE_MS_WS * 5 + 100)

        // The counter should be strictly increasing due to abnormal close incrementing it
        t.true(
          counterValues.length >= 3,
          `Expected at least 3 url calls, got ${counterValues.length}`,
        )

        for (let i = 1; i < counterValues.length; i++) {
          t.true(
            counterValues[i] > counterValues[i - 1],
            `Counter should increase: index ${i} (${counterValues[i]}) should be > index ${i - 1} (${counterValues[i - 1]})`,
          )
        }
      } finally {
        // The adapter only exists once startWithMockedCache has resolved
        await testAdapter.api.close()
      }
    } finally {
      process.env['METRICS_ENABLED'] = 'false'
      mockWsServer.close()
      await t.context.clock.runToLastAsync()
    }
  },
)
1416+
1417+
/**
 * Checks that a `url` callback which picks its endpoint from the failover
 * counter alternates between a primary and a secondary server when every
 * connection is closed abnormally.
 *
 * Both mock servers close the socket with a non-1000 code (4000 / 4001) on the
 * first message they receive. The `url` callback records each URL it returns,
 * and the test asserts that both servers were used and that the attempt right
 * after the first secondary attempt goes back to the primary.
 *
 * Cleanup lives in `finally` blocks so that a thrown error or a failing
 * assertion cannot leave the adapter running or either mock server bound to
 * its URL for the serial tests that follow.
 */
test.serial(
  'cycles between primary and secondary URLs on abnormal closure',
  async (t) => {
    const base = 'ETH'
    const quote = 'DOGE'

    const PRIMARY_URL = 'wss://primary.test.com/ws'
    const SECONDARY_URL = 'wss://secondary.test.com/ws'
    // URLs returned by the url callback, in call order
    const urlsConnected: string[] = []

    mockWebSocketProvider(WebSocketClassProvider)

    const mockPrimary = new Server(PRIMARY_URL, { mock: false })
    const mockSecondary = new Server(SECONDARY_URL, { mock: false })

    try {
      mockPrimary.on('connection', (socket) => {
        socket.on('message', () => {
          socket.close({ code: 4000, reason: 'Primary abnormal close', wasClean: false })
        })
      })

      mockSecondary.on('connection', (socket) => {
        socket.on('message', () => {
          socket.close({ code: 4001, reason: 'Secondary abnormal close', wasClean: false })
        })
      })

      // Mimics Tiingo's wsSelectUrl with a 1:1 primary:secondary ratio
      const transport = new WebSocketTransport<WebSocketTypes>({
        url: (_context, _desiredSubs, params) => {
          const counter = params.streamHandlerInvocationsWithNoConnection
          const zeroIndexed = counter - 1
          // For counter = 0 this is -1 % 2 === -1 (the remainder keeps the sign
          // of the dividend), which is still < 1 and therefore selects primary.
          const cyclePos = zeroIndexed % 2
          const url = cyclePos < 1 ? PRIMARY_URL : SECONDARY_URL
          urlsConnected.push(url)
          return url
        },
        handlers: {
          message(message) {
            if (!message.pair) {
              return []
            }
            const [curBase, curQuote] = message.pair.split('/')
            return [
              {
                params: { base: curBase, quote: curQuote },
                response: {
                  data: { result: message.value },
                  result: message.value,
                },
              },
            ]
          },
        },
        builders: {
          subscribeMessage: (params) => ({
            request: 'subscribe',
            pair: `${params.base}/${params.quote}`,
          }),
          unsubscribeMessage: (params) => ({
            request: 'unsubscribe',
            pair: `${params.base}/${params.quote}`,
          }),
        },
      })

      const webSocketEndpoint = new AdapterEndpoint({
        name: 'TEST',
        transport: transport,
        inputParameters,
      })

      const config = new AdapterConfig(
        {},
        {
          envDefaultOverrides: {
            BACKGROUND_EXECUTE_MS_WS,
            WS_SUBSCRIPTION_UNRESPONSIVE_TTL: 180_000,
          },
        },
      )

      const adapter = new Adapter({
        name: 'TEST',
        defaultEndpoint: 'test',
        config,
        endpoints: [webSocketEndpoint],
      })

      const testAdapter = await TestAdapter.startWithMockedCache(adapter, t.context)

      try {
        await testAdapter.request({ base, quote })

        // Run through enough cycles to see URL cycling:
        // counter=0 -> primary, counter=1 -> primary, counter=2 -> secondary,
        // counter=3 -> primary, counter=4 -> secondary, ...
        await runAllUntilTime(t.context.clock, BACKGROUND_EXECUTE_MS_WS * 6 + 100)

        const primaryCount = urlsConnected.filter((u) => u === PRIMARY_URL).length
        const secondaryCount = urlsConnected.filter((u) => u === SECONDARY_URL).length

        t.true(primaryCount >= 2, `Expected at least 2 primary connections, got ${primaryCount}`)
        t.true(
          secondaryCount >= 1,
          `Expected at least 1 secondary connection, got ${secondaryCount}`,
        )

        // After hitting secondary, it should cycle back to primary
        const firstSecondaryIndex = urlsConnected.indexOf(SECONDARY_URL)
        t.true(firstSecondaryIndex >= 0, 'Should have connected to secondary')
        t.true(
          firstSecondaryIndex < urlsConnected.length - 1,
          'Secondary should not be the last connection (should have returned to primary)',
        )
        t.is(
          urlsConnected[firstSecondaryIndex + 1],
          PRIMARY_URL,
          'After secondary, should reconnect to primary',
        )
      } finally {
        // The adapter only exists once startWithMockedCache has resolved
        await testAdapter.api.close()
      }
    } finally {
      mockPrimary.close()
      mockSecondary.close()
      await t.context.clock.runToLastAsync()
    }
  },
)

0 commit comments

Comments
 (0)