Skip to content

Commit f82d353

Browse files
committed
Add websocket pong handler
1 parent d7587db commit f82d353

4 files changed

Lines changed: 138 additions & 0 deletions

File tree

docs/components/transport-types/websocket-transport.md

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,23 @@ The heartbeat interval is controlled by the `WS_HEARTBEAT_INTERVAL_MS` adapter s
158158

159159
**Note:** The heartbeat only starts if the `heartbeat` handler is provided. If you don't need heartbeat functionality, simply omit the `heartbeat` handler.
160160

161+
### Pong handler (optional)
162+
163+
When using WebSocket protocol-level pings (via `connection.ping()`), the server will respond with a `pong` frame. The `pong` handler lets you react to those responses — for example, to track latency, log liveness confirmations, or reset a timeout.
164+
165+
```typescript
166+
handlers: {
167+
heartbeat: (connection) => {
168+
connection.ping()
169+
},
170+
pong: (connection, data) => {
171+
logger.debug('Received pong from server', { data: data.toString() })
172+
},
173+
}
174+
```
175+
176+
The `pong` handler receives the WebSocket connection and a `Buffer` containing any data the server included in the pong frame. It is optional — if omitted, pong frames are silently ignored.
177+
161178
### Retrieving and storing the response or errors
162179

163180
As shown in the first example, the **handlers** object accepts a function called **message** which will be executed when Data Provider sends a message through the WS connection. It takes this message as its first argument and the adapter context as the second, and should build and return a list of response objects (_ProviderResult_) that will be stored in the response cache for the endpoint.

src/transports/websocket.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,15 @@ export interface WebSocketTransportConfig<T extends WebsocketTransportGenerics>
8282
*/
8383
heartbeat?: (wsConnection: WebSocket, context: EndpointContext<T>) => Promise<void> | void
8484

85+
/**
86+
* Handles when the WS receives a pong
87+
*
88+
* @param wsConnection - the WebSocket with an established connection
89+
* @param data - the data received by the WS
90+
* @returns void
91+
*/
92+
pong?: (wsConnection: WebSocket, data: Buffer) => void
93+
8594
/**
8695
* Handles when the websocket connection dispatches an error event
8796
* Optional to let the adapter handle the event in its own way if it decides to
@@ -371,6 +380,9 @@ export class WebSocketTransport<
371380
)
372381
connection.addEventListener('error', handlers.error)
373382
connection.addEventListener('close', handlers.close)
383+
if (this.config.handlers.pong) {
384+
connection.on('pong', (data) => this.config.handlers.pong?.(connection, data))
385+
}
374386

375387
return connection
376388
}

src/util/testing-utils.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -491,9 +491,13 @@ export function setEnvVariables(envVariables: NodeJS.ProcessEnv): void {
491491
export const mockWebSocketProvider = (provider: typeof WebSocketClassProvider): void => {
492492
// Extend mock WebSocket class to bypass protocol headers error
493493
class MockWebSocket extends WebSocket {
494+
// Separate listener map for ws-style .on()/.emit() (e.g. 'pong'), which mock-socket doesn't support
495+
private wsListeners: Map<string, ((...args: unknown[]) => void)[]> = new Map()
496+
494497
constructor(url: string, protocol: string | string[] | Record<string, string> | undefined) {
495498
super(url, protocol instanceof Object ? undefined : protocol)
496499
}
500+
497501
// This is part of the 'ws' node library but not the common interface, but it's used in our WS transport
498502
removeAllListeners() {
499503
for (const eventType in this.listeners) {
@@ -502,6 +506,22 @@ export const mockWebSocketProvider = (provider: typeof WebSocketClassProvider):
502506
delete this.listeners[eventType]
503507
}
504508
}
509+
this.wsListeners.clear()
510+
}
511+
512+
// Ws-style EventEmitter API used for protocol-level events like 'pong'
513+
on(event: string, listener: (...args: unknown[]) => void) {
514+
const existing = this.wsListeners.get(event) ?? []
515+
this.wsListeners.set(event, [...existing, listener])
516+
return this
517+
}
518+
519+
emit(event: string, ...args: unknown[]): boolean {
520+
const listeners = this.wsListeners.get(event) ?? []
521+
for (const l of listeners) {
522+
l(...args)
523+
}
524+
return listeners.length > 0
505525
}
506526
}
507527

test/transports/websocket.test.ts

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ const createAdapter = (
6464
connection: WebSocket,
6565
context: EndpointContext<WebSocketTypes>,
6666
) => Promise<void> | void,
67+
pongHandler?: (connection: WebSocket, data: Buffer) => void,
6768
): Adapter => {
6869
const websocketTransport = new WebSocketTransport<WebSocketTypes>({
6970
url: () => ENDPOINT_URL,
@@ -96,6 +97,7 @@ const createAdapter = (
9697
]
9798
},
9899
heartbeat: heartbeatHandler,
100+
pong: pongHandler,
99101
},
100102
builders: {
101103
subscribeMessage: (params) => `S:${params.base}/${params.quote}`,
@@ -1313,6 +1315,93 @@ test.serial('does not heartbeat when handler throws an error', async (t) => {
13131315
await t.context.clock.runToLastAsync()
13141316
})
13151317

1318+
test.serial('calls pong handler when a pong frame is received', async (t) => {
1319+
const base = 'ETH'
1320+
const quote = 'DOGE'
1321+
1322+
mockWebSocketProvider(WebSocketClassProvider)
1323+
const mockWsServer = new Server(ENDPOINT_URL, { mock: false })
1324+
let pongCallCount = 0
1325+
1326+
mockWsServer.on('connection', (socket) => {
1327+
socket.on('message', () => {
1328+
socket.send(
1329+
JSON.stringify({
1330+
pair: `${base}/${quote}`,
1331+
value: price,
1332+
}),
1333+
)
1334+
})
1335+
})
1336+
1337+
const websocketTransport = new WebSocketTransport<WebSocketTypes>({
1338+
url: () => ENDPOINT_URL,
1339+
options: () => ({ headers: { 'x-auth-token': 'token' } }),
1340+
handlers: {
1341+
message(message) {
1342+
if (!message.pair) {
1343+
return []
1344+
}
1345+
const [curBase, curQuote] = message.pair.split('/')
1346+
return [
1347+
{
1348+
params: { base: curBase, quote: curQuote },
1349+
response: {
1350+
data: { result: message.value },
1351+
result: message.value,
1352+
timestamps: { providerIndicatedTimeUnixMs: Date.now() },
1353+
},
1354+
},
1355+
]
1356+
},
1357+
pong: () => {
1358+
pongCallCount++
1359+
},
1360+
},
1361+
builders: {
1362+
subscribeMessage: (params) => `S:${params.base}/${params.quote}`,
1363+
unsubscribeMessage: (params) => ({
1364+
request: 'unsubscribe',
1365+
pair: `${params.base}/${params.quote}`,
1366+
}),
1367+
},
1368+
})
1369+
1370+
const webSocketEndpoint = new AdapterEndpoint({
1371+
name: 'TEST',
1372+
transport: websocketTransport,
1373+
inputParameters,
1374+
})
1375+
1376+
const config = new AdapterConfig({}, { envDefaultOverrides: { BACKGROUND_EXECUTE_MS_WS } })
1377+
1378+
const adapter = new Adapter({
1379+
name: 'TEST',
1380+
defaultEndpoint: 'test',
1381+
endpoints: [webSocketEndpoint],
1382+
config,
1383+
})
1384+
1385+
const testAdapter = await TestAdapter.startWithMockedCache(adapter, t.context)
1386+
1387+
await testAdapter.startBackgroundExecuteThenGetResponse(t, {
1388+
requestData: { base, quote },
1389+
expectedResponse: {
1390+
data: { result: price },
1391+
result: price,
1392+
statusCode: 200,
1393+
},
1394+
})
1395+
1396+
websocketTransport.wsConnection?.emit('pong')
1397+
1398+
t.is(pongCallCount, 1)
1399+
1400+
testAdapter.api.close()
1401+
mockWsServer.close()
1402+
await t.context.clock.runToLastAsync()
1403+
})
1404+
13161405
test.serial(
13171406
'increments failover counter on abnormal closure and passes it to url function',
13181407
async (t) => {

0 commit comments

Comments
 (0)