diff --git a/.changeset/floppy-spiders-turn.md b/.changeset/floppy-spiders-turn.md new file mode 100644 index 00000000000..3100c9f13c3 --- /dev/null +++ b/.changeset/floppy-spiders-turn.md @@ -0,0 +1,5 @@ +--- +'@chainlink/blocksize-capital-adapter': minor +--- + +Optimize blocksize-capital WS subscribe/unsubscribe via customSubscriptionMessages batching diff --git a/packages/sources/blocksize-capital/src/transport/crypto-lwba.ts b/packages/sources/blocksize-capital/src/transport/crypto-lwba.ts index e4cb24c06b1..8456750578e 100644 --- a/packages/sources/blocksize-capital/src/transport/crypto-lwba.ts +++ b/packages/sources/blocksize-capital/src/transport/crypto-lwba.ts @@ -1,11 +1,13 @@ -import { BaseEndpointTypes } from '../endpoint/crypto-lwba' +import { EndpointContext } from '@chainlink/external-adapter-framework/adapter' +import { SubscriptionDeltas } from '@chainlink/external-adapter-framework/transports/abstract/streaming' import { WebsocketReverseMappingTransport } from '@chainlink/external-adapter-framework/transports/websocket' import { makeLogger, ProviderResult } from '@chainlink/external-adapter-framework/util' +import { BaseEndpointTypes } from '../endpoint/crypto-lwba' import { BaseMessage, - blocksizeDefaultUnsubscribeMessageBuilder, blocksizeDefaultWebsocketOpenHandler, buildBlocksizeWebsocketTickersMessage, + buildTicker, } from './utils' const logger = makeLogger('BlocksizeCapitalLwbaWebsocketEndpoint') @@ -77,12 +79,33 @@ export const transport: WebsocketReverseMappingTransport { - const pair = `${params.base}${params.quote}`.toUpperCase() - transport.setReverseMapping(pair, params) - return buildBlocksizeWebsocketTickersMessage('bidask_subscribe', pair) + customSubscriptionMessages: ( + _context: EndpointContext, + subscriptions: SubscriptionDeltas<{ quote: string; base: string }>, + ) => { + const messages = [] + if (subscriptions.new.length > 0) { + const pairsWithParams = subscriptions.new.map((param) => ({ + ticker: buildTicker(param), + param: param, + })) + pairsWithParams.forEach(({ ticker, param }) => transport.setReverseMapping(ticker, param)) + messages.push( + buildBlocksizeWebsocketTickersMessage( + 'bidask_subscribe', + subscriptions.new.map(buildTicker), + ), + ) + } + if (subscriptions.stale.length > 0) { + messages.push( + buildBlocksizeWebsocketTickersMessage( + 'bidask_unsubscribe', + subscriptions.stale.map(buildTicker), + ), + ) + } + return messages }, - unsubscribeMessage: (params) => - blocksizeDefaultUnsubscribeMessageBuilder(params.base, params.quote, 'bidask_unsubscribe'), }, }) diff --git a/packages/sources/blocksize-capital/src/transport/price.ts b/packages/sources/blocksize-capital/src/transport/price.ts index 3da2516a34e..59320f80216 100644 --- a/packages/sources/blocksize-capital/src/transport/price.ts +++ b/packages/sources/blocksize-capital/src/transport/price.ts @@ -1,11 +1,13 @@ -import { BaseEndpointTypes } from '../endpoint/price' -import { makeLogger } from '@chainlink/external-adapter-framework/util' +import { EndpointContext } from '@chainlink/external-adapter-framework/adapter' +import { SubscriptionDeltas } from '@chainlink/external-adapter-framework/transports/abstract/streaming' import { WebsocketReverseMappingTransport } from '@chainlink/external-adapter-framework/transports/websocket' +import { makeLogger } from '@chainlink/external-adapter-framework/util' +import { BaseEndpointTypes } from '../endpoint/price' import { BaseMessage, - blocksizeDefaultUnsubscribeMessageBuilder, blocksizeDefaultWebsocketOpenHandler, buildBlocksizeWebsocketTickersMessage, + buildTicker, handlePriceUpdates, VwapUpdate, } from './utils' @@ -52,12 +54,33 @@ export const transport: WebsocketReverseMappingTransport { - const pair = `${params.base}${params.quote}`.toUpperCase() - transport.setReverseMapping(pair, params) - return buildBlocksizeWebsocketTickersMessage('vwap_subscribe', pair) + customSubscriptionMessages: ( + _context: EndpointContext, + subscriptions: SubscriptionDeltas<{ quote: string; base: string }>, + ) => { + const messages = [] + if (subscriptions.new.length > 0) { + const pairsWithParams = subscriptions.new.map((param) => ({ + ticker: buildTicker(param), + param: param, + })) + pairsWithParams.forEach(({ ticker, param }) => transport.setReverseMapping(ticker, param)) + messages.push( + buildBlocksizeWebsocketTickersMessage( + 'vwap_subscribe', + subscriptions.new.map(buildTicker), + ), + ) + } + if (subscriptions.stale.length > 0) { + messages.push( + buildBlocksizeWebsocketTickersMessage( + 'vwap_unsubscribe', + subscriptions.stale.map(buildTicker), + ), + ) + } + return messages }, - unsubscribeMessage: (params) => - blocksizeDefaultUnsubscribeMessageBuilder(params.base, params.quote, 'vwap_unsubscribe'), }, }) diff --git a/packages/sources/blocksize-capital/src/transport/utils.ts b/packages/sources/blocksize-capital/src/transport/utils.ts index 3ae153c952f..b964449fa49 100644 --- a/packages/sources/blocksize-capital/src/transport/utils.ts +++ b/packages/sources/blocksize-capital/src/transport/utils.ts @@ -19,31 +19,28 @@ export type VwapUpdate = { ts: number } -export type ProviderParams = { - tickers?: string[] - api_key?: string +export const buildBlocksizeWebsocketAuthMessage = (apiKey: string) => { + return { + jsonrpc: '2.0', + method: 'authentication_logon', + params: { + api_key: apiKey, + }, + } } -const buildBlocksizeWebsocketMessage = (method: string, params: ProviderParams): unknown => { +export const buildBlocksizeWebsocketTickersMessage = (method: string, pairs: string[]) => { return { jsonrpc: '2.0', method: method, - params: params, + params: { + tickers: pairs, + }, } } -export const buildBlocksizeWebsocketAuthMessage = (apiKey: string) => - buildBlocksizeWebsocketMessage('authentication_logon', { api_key: apiKey }) -export const buildBlocksizeWebsocketTickersMessage = (method: string, pair: string) => - buildBlocksizeWebsocketMessage(method, { tickers: [pair] }) - -export const blocksizeDefaultUnsubscribeMessageBuilder = ( - base: string, - quote: string, - method: string, -): unknown => { - const pair = `${base}${quote}`.toUpperCase() - return buildBlocksizeWebsocketTickersMessage(method, pair) +export const buildTicker = (pair: { base: string; quote: string }) => { + return `${pair.base}${pair.quote}`.toUpperCase() } // use as open handler for standard WS connections diff --git a/packages/sources/blocksize-capital/src/transport/vwap.ts b/packages/sources/blocksize-capital/src/transport/vwap.ts index b8e134f6460..55bfa4754cd 100644 --- a/packages/sources/blocksize-capital/src/transport/vwap.ts +++ b/packages/sources/blocksize-capital/src/transport/vwap.ts @@ -1,12 +1,14 @@ -import { BaseEndpointTypes } from '../endpoint/vwap' +import { EndpointContext } from '@chainlink/external-adapter-framework/adapter' +import { SubscriptionDeltas } from '@chainlink/external-adapter-framework/transports/abstract/streaming' import { WebsocketReverseMappingTransport } from '@chainlink/external-adapter-framework/transports/websocket' import { makeLogger } from '@chainlink/external-adapter-framework/util' +import { BaseEndpointTypes } from '../endpoint/vwap' import { BaseMessage, VwapUpdate, - blocksizeDefaultUnsubscribeMessageBuilder, blocksizeDefaultWebsocketOpenHandler, buildBlocksizeWebsocketTickersMessage, + buildTicker, handlePriceUpdates, } from './utils' @@ -60,16 +62,33 @@ export const transport: WebsocketReverseMappingTransport { - const pair = `${params.base}${params.quote}`.toUpperCase() - transport.setReverseMapping(pair, params) - return buildBlocksizeWebsocketTickersMessage('fixedvwap_subscribe', pair) + customSubscriptionMessages: ( + _context: EndpointContext, + subscriptions: SubscriptionDeltas<{ quote: string; base: string }>, + ) => { + const messages = [] + if (subscriptions.new.length > 0) { + const pairsWithParams = subscriptions.new.map((param) => ({ + ticker: buildTicker(param), + param: param, + })) + pairsWithParams.forEach(({ ticker, param }) => transport.setReverseMapping(ticker, param)) + messages.push( + buildBlocksizeWebsocketTickersMessage( + 'fixedvwap_subscribe', + subscriptions.new.map(buildTicker), + ), + ) + } + if (subscriptions.stale.length > 0) { + messages.push( + buildBlocksizeWebsocketTickersMessage( + 'fixedvwap_unsubscribe', + subscriptions.stale.map(buildTicker), + ), + ) + } + return messages }, - unsubscribeMessage: (params) => - blocksizeDefaultUnsubscribeMessageBuilder( - params.base, - params.quote, - 'fixedvwap_unsubscribe', - ), }, }) diff --git a/packages/sources/blocksize-capital/test/unit/transport-utils.test.ts b/packages/sources/blocksize-capital/test/unit/transport-utils.test.ts new file mode 100644 index 00000000000..645bc4fe934 --- /dev/null +++ b/packages/sources/blocksize-capital/test/unit/transport-utils.test.ts @@ -0,0 +1,175 @@ +import { EndpointContext } from '@chainlink/external-adapter-framework/adapter' +import { SubscriptionDeltas } from '@chainlink/external-adapter-framework/transports/abstract/streaming' +import { + WebsocketReverseMappingTransport, + WebsocketTransportGenerics, +} from '@chainlink/external-adapter-framework/transports/websocket' +import { LoggerFactoryProvider } from '@chainlink/external-adapter-framework/util' +import { + transport as lwbaTransport, + WsTransportTypes as LwbaWsTransportTypes, +} from '../../src/transport/crypto-lwba' +import { + transport as priceTransport, + WsTransportTypes as PriceWsTransportTypes, +} from '../../src/transport/price' +import { + transport as vwapTransport, + WsTransportTypes as VwapWsTransportTypes, +} from '../../src/transport/vwap' + +LoggerFactoryProvider.set() + +type TickerParam = { base: string; quote: string } + +type CustomSubscriptionMessagesBuilder = ( + context: EndpointContext, + subscriptions: SubscriptionDeltas, +) => unknown[] + +const getCustomSubscriptionMessages = ( + transport: WebsocketReverseMappingTransport, +): CustomSubscriptionMessagesBuilder => + ( + transport as unknown as { + config: { builders: { customSubscriptionMessages: CustomSubscriptionMessagesBuilder } } + } + ).config.builders.customSubscriptionMessages + +const runCustomSubscriptionMessages = ( + transport: WebsocketReverseMappingTransport, + subscriptions: SubscriptionDeltas, +) => getCustomSubscriptionMessages(transport)({} as EndpointContext, subscriptions) + +const newParams: TickerParam[] = [ + { base: 'ETH', quote: 'EUR' }, + { base: 'LINK', quote: 'ETH' }, +] +const staleParams: TickerParam[] = [{ base: 'BTC', quote: 'USD' }] + +const testCustomSubscriptionMessages = ( + transport: WebsocketReverseMappingTransport, + subscribeMethod: string, + unsubscribeMethod: string, +) => { + it('sends one subscribe message containing all new tickers', () => { + const setReverseMapping = jest.spyOn(transport, 'setReverseMapping') + const subscriptions: SubscriptionDeltas = { + desired: newParams, + new: newParams, + stale: [], + } + + const messages = runCustomSubscriptionMessages(transport, subscriptions) + + expect(messages).toEqual([ + { + jsonrpc: '2.0', + method: subscribeMethod, + params: { tickers: ['ETHEUR', 'LINKETH'] }, + }, + ]) + expect(setReverseMapping).toHaveBeenCalledTimes(2) + expect(setReverseMapping).toHaveBeenCalledWith('ETHEUR', newParams[0]) + expect(setReverseMapping).toHaveBeenCalledWith('LINKETH', newParams[1]) + + setReverseMapping.mockRestore() + }) + + it('sends one unsubscribe message containing all stale tickers', () => { + const subscriptions: SubscriptionDeltas = { + desired: [], + new: [], + stale: staleParams, + } + + const messages = runCustomSubscriptionMessages(transport, subscriptions) + + expect(messages).toEqual([ + { + jsonrpc: '2.0', + method: unsubscribeMethod, + params: { tickers: ['BTCUSD'] }, + }, + ]) + }) + + it('batches subscribe and unsubscribe into separate messages when both change', () => { + const setReverseMapping = jest.spyOn(transport, 'setReverseMapping') + const subscriptions: SubscriptionDeltas = { + desired: newParams, + new: newParams, + stale: staleParams, + } + + const messages = runCustomSubscriptionMessages(transport, subscriptions) + + expect(messages).toEqual([ + { + jsonrpc: '2.0', + method: subscribeMethod, + params: { tickers: ['ETHEUR', 'LINKETH'] }, + }, + { + jsonrpc: '2.0', + method: unsubscribeMethod, + params: { tickers: ['BTCUSD'] }, + }, + ]) + expect(setReverseMapping).toHaveBeenCalledTimes(2) + + setReverseMapping.mockRestore() + }) + + it('returns no messages when there are no subscription changes', () => { + const subscriptions: SubscriptionDeltas = { + desired: newParams, + new: [], + stale: [], + } + + expect(runCustomSubscriptionMessages(transport, subscriptions)).toEqual([]) + }) + + it('uses customSubscriptionMessages instead of per-ticker subscribeMessage', () => { + const builders = ( + transport as unknown as { + config: { + builders: { + customSubscriptionMessages?: unknown + subscribeMessage?: unknown + unsubscribeMessage?: unknown + } + } + } + ).config.builders + + expect(builders.customSubscriptionMessages).toBeDefined() + expect(builders.subscribeMessage).toBeUndefined() + expect(builders.unsubscribeMessage).toBeUndefined() + }) +} + +describe('price transport customSubscriptionMessages', () => { + testCustomSubscriptionMessages( + priceTransport, + 'vwap_subscribe', + 'vwap_unsubscribe', + ) +}) + +describe('vwap transport customSubscriptionMessages', () => { + testCustomSubscriptionMessages( + vwapTransport, + 'fixedvwap_subscribe', + 'fixedvwap_unsubscribe', + ) +}) + +describe('crypto-lwba transport customSubscriptionMessages', () => { + testCustomSubscriptionMessages( + lwbaTransport, + 'bidask_subscribe', + 'bidask_unsubscribe', + ) +})