Skip to content

Commit f951302

Browse files
committed
Stop sending undefine ws message + pass through raw endpoint name
1 parent 2b2f838 commit f951302

5 files changed

Lines changed: 123 additions & 3 deletions

File tree

src/transports/websocket.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -533,10 +533,14 @@ export class WebSocketTransport<
533533
await this.sendMessages(
534534
context,
535535
subscribeMessage
536-
? subscriptions.new.map((sub) => subscribeMessage(sub, context))
536+
? subscriptions.new
537+
.map((sub) => subscribeMessage(sub, context))
538+
.filter((m) => m !== undefined)
537539
: subscriptions.new,
538540
unsubscribeMessage
539-
? subscriptions.stale.map((sub) => unsubscribeMessage(sub, context))
541+
? subscriptions.stale
542+
.map((sub) => unsubscribeMessage(sub, context))
543+
.filter((m) => m !== undefined)
540544
: subscriptions.stale,
541545
)
542546
recordWsMessageSentMetrics(context, subscriptions.new, subscriptions.stale)

src/util/types.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@ export type AdapterRequestContext<T> = {
3232
/** Name of the endpoint this payload should be directed to */
3333
endpointName: string
3434

35+
/** Name of the endpoint from input before aliasing */
36+
rawEndpointName: string
37+
3538
/** Name of the transport this payload should be directed to */
3639
transportName: string
3740

src/validation/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ export const validatorMiddleware: AdapterMiddlewareBuilder =
7070
cacheKey: '',
7171
data: validatedData,
7272
endpointName: endpoint.name,
73+
rawEndpointName: endpointParam,
7374
} as AdapterRequestContext<TypeFromDefinition<InputParametersDefinition>>
7475

7576
// We do it afterwards so the custom routers can have a request with a requestContext fulfilled (sans transportName, ofc)
@@ -150,6 +151,7 @@ export const errorCatchingMiddleware = (err: Error, req: FastifyRequest, res: Fa
150151
cacheKey: errorLabel,
151152
data: undefined,
152153
endpointName: errorLabel,
154+
rawEndpointName: errorLabel,
153155
transportName: errorLabel,
154156
meta: { error: err },
155157
}

test/transports/websocket.test.ts

Lines changed: 78 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import {
1010
WebSocketClassProvider,
1111
WebsocketReverseMappingTransport,
1212
WebSocketTransport,
13+
type WebSocketTransportConfig,
1314
} from '../../src/transports'
1415
import { SingleNumberResultResponse, sleep } from '../../src/util'
1516
import { mockWebSocketProvider, runAllUntilTime, TestAdapter } from '../../src/util/testing-utils'
@@ -65,6 +66,7 @@ const createAdapter = (
6566
context: EndpointContext<WebSocketTypes>,
6667
) => Promise<void> | void,
6768
pongHandler?: (connection: WebSocket, data: Buffer) => void,
69+
buildersOverride?: WebSocketTransportConfig<WebSocketTypes>['builders'],
6870
): Adapter => {
6971
const websocketTransport = new WebSocketTransport<WebSocketTypes>({
7072
url: () => ENDPOINT_URL,
@@ -99,7 +101,7 @@ const createAdapter = (
99101
heartbeat: heartbeatHandler,
100102
pong: pongHandler,
101103
},
102-
builders: {
104+
builders: buildersOverride ?? {
103105
subscribeMessage: (params) => `S:${params.base}/${params.quote}`,
104106
unsubscribeMessage: (params) => ({
105107
request: 'unsubscribe',
@@ -200,6 +202,81 @@ test.serial('connects to websocket, subscribes, gets message, unsubscribes', asy
200202
await t.context.clock.runToLastAsync()
201203
})
202204

205+
test.serial('filters out undefined subscribe and unsubscribe messages from builders', async (t) => {
206+
mockWebSocketProvider(WebSocketClassProvider)
207+
const mockWsServer = new Server(ENDPOINT_URL, { mock: false })
208+
const messagesReceived: string[] = []
209+
210+
mockWsServer.on('connection', (socket) => {
211+
socket.on('message', (data) => {
212+
messagesReceived.push(typeof data === 'string' ? data : data.toString())
213+
socket.send(
214+
JSON.stringify({
215+
pair: `ETH/DOGE`,
216+
value: price,
217+
}),
218+
)
219+
})
220+
})
221+
222+
const adapter = createAdapter(
223+
{
224+
WS_SUBSCRIPTION_UNRESPONSIVE_TTL: 180_000,
225+
WS_SUBSCRIPTION_TTL: 60_000,
226+
},
227+
undefined,
228+
undefined,
229+
{
230+
subscribeMessage: (params) => {
231+
return params.base === 'SKIP' ? undefined : `S:${params.base}/${params.quote}`
232+
},
233+
unsubscribeMessage: (params) => {
234+
return params.quote === 'SKIPUNSUB' ? undefined : `U:${params.base}/${params.quote}`
235+
},
236+
},
237+
)
238+
239+
const testAdapter = await TestAdapter.startWithMockedCache(adapter, t.context)
240+
241+
// ETH first so the socket gets a subscribe + provider message before SKIP (no subscribe payload).
242+
await testAdapter.request({ base: 'ETH', quote: 'DOGE' })
243+
await runAllUntilTime(t.context.clock, BACKGROUND_EXECUTE_MS_WS + 200)
244+
await testAdapter.request({ base: 'SKIP', quote: 'DOGE' })
245+
await runAllUntilTime(t.context.clock, BACKGROUND_EXECUTE_MS_WS + 200)
246+
await testAdapter.request({ base: 'ETH', quote: 'SKIPUNSUB' })
247+
await runAllUntilTime(t.context.clock, BACKGROUND_EXECUTE_MS_WS + 200)
248+
249+
t.false(
250+
messagesReceived.includes('S:SKIP/DOGE'),
251+
'no subscribe payload for feeds where subscribeMessage returns undefined',
252+
)
253+
t.false(
254+
messagesReceived.some((m) => m === 'undefined'),
255+
'undefined must not be serialized and sent',
256+
)
257+
t.true(messagesReceived.includes('S:ETH/DOGE'))
258+
t.true(messagesReceived.includes('S:ETH/SKIPUNSUB'))
259+
260+
await runAllUntilTime(
261+
t.context.clock,
262+
adapter.config.settings.WS_SUBSCRIPTION_TTL + BACKGROUND_EXECUTE_MS_WS * 3 + 100,
263+
)
264+
265+
const unsubscribePayloads = messagesReceived.filter((m) => m.startsWith('U:'))
266+
t.false(
267+
unsubscribePayloads.some((m) => m === 'U:ETH/SKIPUNSUB'),
268+
'unsubscribe builder returned undefined for SKIPUNSUB quote — must not send that payload',
269+
)
270+
t.true(
271+
unsubscribePayloads.includes('U:ETH/DOGE'),
272+
'defined unsubscribe payloads are still sent for stale subs that map to a message',
273+
)
274+
275+
testAdapter.api.close()
276+
mockWsServer.close()
277+
await t.context.clock.runToLastAsync()
278+
})
279+
203280
test.serial('reconnects when url changed', async (t) => {
204281
// Mock WS
205282
mockWebSocketProvider(WebSocketClassProvider)

test/validation.test.ts

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -837,6 +837,40 @@ test.serial('examples in input parameters', async (t) => {
837837
])
838838
})
839839

840+
test.serial('requestContext has correct endpointName and rawEndpointName', async (t) => {
841+
let capturedContext: { endpointName: string; rawEndpointName: string } | undefined
842+
843+
const adapter = new Adapter({
844+
name: 'TEST',
845+
endpoints: [
846+
new AdapterEndpoint({
847+
name: 'test',
848+
aliases: ['test-alias'],
849+
transport: new (class extends NopTransport {
850+
override async foregroundExecute(req: AdapterRequest<EmptyInputParameters>) {
851+
capturedContext = {
852+
endpointName: req.requestContext.endpointName,
853+
rawEndpointName: req.requestContext.rawEndpointName,
854+
}
855+
}
856+
})(),
857+
}),
858+
],
859+
})
860+
861+
const testAdapter = await TestAdapter.start(adapter, t.context)
862+
863+
// When using the canonical name, both fields should match
864+
await testAdapter.request({ endpoint: 'test' })
865+
t.is(capturedContext?.endpointName, 'test')
866+
t.is(capturedContext?.rawEndpointName, 'test')
867+
868+
// When using an alias, endpointName is the canonical name and rawEndpointName is the alias
869+
await testAdapter.request({ endpoint: 'test-alias' })
870+
t.is(capturedContext?.endpointName, 'test')
871+
t.is(capturedContext?.rawEndpointName, 'test-alias')
872+
})
873+
840874
test.serial('limit size of input parameters', async (t) => {
841875
process.env['MAX_PAYLOAD_SIZE_LIMIT'] = '1048576'
842876

0 commit comments

Comments
 (0)