diff --git a/.changeset/shard-blocksize-ws.md b/.changeset/shard-blocksize-ws.md new file mode 100644 index 00000000000..963dde4520f --- /dev/null +++ b/.changeset/shard-blocksize-ws.md @@ -0,0 +1,8 @@ +--- +'@chainlink/blocksize-capital-adapter': minor +--- + +Add optional WebSocket subscription sharding. Set `WS_NUM_SHARDS` > 1 to spread +subscriptions across multiple connections — useful when the data provider has +a per-connection subscription cap. Default is 1 (single connection, same as +before). diff --git a/packages/sources/blocksize-capital/README.md b/packages/sources/blocksize-capital/README.md index 50c45d171f5..e7c975b77aa 100644 --- a/packages/sources/blocksize-capital/README.md +++ b/packages/sources/blocksize-capital/README.md @@ -6,10 +6,11 @@ This document was generated automatically. Please see [README Generator](../../s ## Environment Variables -| Required? | Name | Description | Type | Options | Default | -| :-------: | :-------------: | :----------------------------------: | :----: | :-----: | :---------------------------------------------: | -| ✅ | API_KEY | The Blocksize Capital API key to use | string | | | -| | WS_API_ENDPOINT | The default WebSocket API base url | string | | `wss://data.blocksize.capital/marketdata/v1/ws` | +| Required? | Name | Description | Type | Options | Default | +| :-------: | :-------------: | :--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------: | :----: | :-----: | :---------------------------------------------: | +| ✅ | API_KEY | The Blocksize Capital API key to use | string | | | +| | WS_API_ENDPOINT | The default WebSocket API base url | string | | `wss://data.blocksize.capital/marketdata/v1/ws` | +| | WS_NUM_SHARDS | Number of WebSocket connections to shard subscriptions across (each WS endpoint). Useful when the provider has a per-connection subscription cap. Default 1 = single connection (no sharding). | number | | `1` | --- diff --git a/packages/sources/blocksize-capital/src/config/index.ts b/packages/sources/blocksize-capital/src/config/index.ts index d9082b7766f..df5c4286506 100644 --- a/packages/sources/blocksize-capital/src/config/index.ts +++ b/packages/sources/blocksize-capital/src/config/index.ts @@ -16,4 +16,12 @@ export const config = new AdapterConfig({ default: 'wss://data.blocksize.capital/marketdata/v1/ws', sensitive: false, }, + WS_NUM_SHARDS: { + description: + 'Number of WebSocket connections to shard subscriptions across (each WS endpoint). Useful when the provider has a per-connection subscription cap. Default 1 = single connection (no sharding).', + type: 'number', + required: false, + default: 1, + sensitive: false, + }, }) diff --git a/packages/sources/blocksize-capital/src/transport/crypto-lwba.ts b/packages/sources/blocksize-capital/src/transport/crypto-lwba.ts index e4cb24c06b1..3c816422ae5 100644 --- a/packages/sources/blocksize-capital/src/transport/crypto-lwba.ts +++ b/packages/sources/blocksize-capital/src/transport/crypto-lwba.ts @@ -1,6 +1,7 @@ -import { BaseEndpointTypes } from '../endpoint/crypto-lwba' import { WebsocketReverseMappingTransport } from '@chainlink/external-adapter-framework/transports/websocket' import { makeLogger, ProviderResult } from '@chainlink/external-adapter-framework/util' +import { BaseEndpointTypes } from '../endpoint/crypto-lwba' +import { ShardedWebsocketReverseMappingTransport } from './sharded' import { BaseMessage, blocksizeDefaultUnsubscribeMessageBuilder, @@ -31,8 +32,12 @@ export type WsTransportTypes = BaseEndpointTypes & { } } -export const transport: WebsocketReverseMappingTransport = - new WebsocketReverseMappingTransport({ +// See price.ts for sharding rationale. Default is 1 — opt-in via env var. +const NUM_SHARDS = Number(process.env.WS_NUM_SHARDS || 1) + +const createInnerLwbaTransport = (): WebsocketReverseMappingTransport => { + let t: WebsocketReverseMappingTransport + t = new WebsocketReverseMappingTransport({ url: (context) => context.adapterSettings.WS_API_ENDPOINT, handlers: { open: (connection, context) => @@ -42,7 +47,7 @@ export const transport: WebsocketReverseMappingTransport[] = [] for (const update of updates) { - const params = transport.getReverseMapping(update.ticker) + const params = t.getReverseMapping(update.ticker) if (!params) { continue } @@ -79,10 +84,17 @@ export const transport: WebsocketReverseMappingTransport { const pair = `${params.base}${params.quote}`.toUpperCase() - transport.setReverseMapping(pair, params) + t.setReverseMapping(pair, params) return buildBlocksizeWebsocketTickersMessage('bidask_subscribe', pair) }, unsubscribeMessage: (params) => blocksizeDefaultUnsubscribeMessageBuilder(params.base, params.quote, 'bidask_unsubscribe'), }, }) + return t +} + +export const transport = new ShardedWebsocketReverseMappingTransport( + NUM_SHARDS, + () => createInnerLwbaTransport(), +) diff --git a/packages/sources/blocksize-capital/src/transport/price.ts b/packages/sources/blocksize-capital/src/transport/price.ts index 3da2516a34e..41b152a9a58 100644 --- a/packages/sources/blocksize-capital/src/transport/price.ts +++ b/packages/sources/blocksize-capital/src/transport/price.ts @@ -1,6 +1,7 @@ -import { BaseEndpointTypes } from '../endpoint/price' -import { makeLogger } from '@chainlink/external-adapter-framework/util' import { WebsocketReverseMappingTransport } from '@chainlink/external-adapter-framework/transports/websocket' +import { makeLogger } from '@chainlink/external-adapter-framework/util' +import { BaseEndpointTypes } from '../endpoint/price' +import { ShardedWebsocketReverseMappingTransport } from './sharded' import { BaseMessage, blocksizeDefaultUnsubscribeMessageBuilder, @@ -31,8 +32,17 @@ export type WsTransportTypes = BaseEndpointTypes & { } } -export const transport: WebsocketReverseMappingTransport = - new WebsocketReverseMappingTransport({ +// Optional: shard subscriptions across multiple WS connections to work around +// per-connection subscription caps on the data provider side. Default is 1 +// (single connection — same behavior as before this feature was added). +const NUM_SHARDS = Number(process.env.WS_NUM_SHARDS || 1) + +const createInnerPriceTransport = (): WebsocketReverseMappingTransport< + WsTransportTypes, + string +> => { + let t: WebsocketReverseMappingTransport + t = new WebsocketReverseMappingTransport({ url: (context) => context.adapterSettings.WS_API_ENDPOINT, handlers: { open: (connection, context) => @@ -48,16 +58,23 @@ export const transport: WebsocketReverseMappingTransport { const pair = `${params.base}${params.quote}`.toUpperCase() - transport.setReverseMapping(pair, params) + t.setReverseMapping(pair, params) return buildBlocksizeWebsocketTickersMessage('vwap_subscribe', pair) }, unsubscribeMessage: (params) => blocksizeDefaultUnsubscribeMessageBuilder(params.base, params.quote, 'vwap_unsubscribe'), }, }) + return t +} + +export const transport = new ShardedWebsocketReverseMappingTransport( + NUM_SHARDS, + () => createInnerPriceTransport(), +) diff --git a/packages/sources/blocksize-capital/src/transport/sharded.ts b/packages/sources/blocksize-capital/src/transport/sharded.ts new file mode 100644 index 00000000000..521b78fb04b --- /dev/null +++ b/packages/sources/blocksize-capital/src/transport/sharded.ts @@ -0,0 +1,98 @@ +/** + * Sharded WebSocket transport. + * + * Wraps N inner `WebsocketReverseMappingTransport` instances and routes each + * subscription to one of them based on a stable hash of (base, quote). Each + * inner transport opens its own WebSocket connection. This is useful when the + * data provider enforces a per-connection subscription cap and high-volume + * deployments need to fit more pairs than a single connection can hold. + * + * The inner transports all share the same response cache, so callers (and the + * cache key seen by the foreground HTTP request) don't need to know which + * shard owns which pair. Each shard gets its own subscription set so a shard + * only ever subscribes to pairs that hash to it. + */ +import { EndpointContext } from '@chainlink/external-adapter-framework/adapter' +import { ResponseCache } from '@chainlink/external-adapter-framework/cache/response' +import { Transport, TransportDependencies } from '@chainlink/external-adapter-framework/transports' +import { + WebsocketReverseMappingTransport, + WebsocketTransportGenerics, +} from '@chainlink/external-adapter-framework/transports/websocket' +import { AdapterRequest, makeLogger } from '@chainlink/external-adapter-framework/util' +import { TypeFromDefinition } from '@chainlink/external-adapter-framework/validation/input-params' + +const logger = makeLogger('ShardedWsTransport') + +type PairParams = { base: string; quote: string } + +export class ShardedWebsocketReverseMappingTransport + implements Transport +{ + name = 'sharded-ws' + responseCache!: ResponseCache + private shards: WebsocketReverseMappingTransport[] + + constructor( + private numShards: number, + factory: (shardIndex: number) => WebsocketReverseMappingTransport, + ) { + if (numShards < 1) throw new Error('numShards must be >= 1') + this.shards = Array.from({ length: numShards }, (_, i) => factory(i)) + } + + // Stable hash of (base+quote) → shard index. Same pair always lands on the + // same shard, even across pod restarts. + private shardFor(params: PairParams): number { + const key = `${params.base}${params.quote}`.toUpperCase() + let hash = 0 + for (let i = 0; i < key.length; i++) { + hash = (Math.imul(31, hash) + key.charCodeAt(i)) | 0 + } + return Math.abs(hash) % this.numShards + } + + async initialize( + dependencies: TransportDependencies, + adapterSettings: T['Settings'], + endpointName: string, + transportName: string, + ): Promise { + logger.info(`Initializing ${this.numShards} WS shards for ${transportName}`) + // Each shard must have a UNIQUE name when initialize() runs so the framework + // builds a per-shard subscription set (otherwise with CACHE_TYPE=redis all + // shards share a single redis-backed set and every shard ends up subscribing + // to every pair via its own WS — defeating the sharding). + // + // After initialize, we restore `name` to the canonical transportName so the + // response cache writes from the inner transport land under the key the + // foreground HTTP request expects (`...-${transportName}-${params}`). + await Promise.all( + this.shards.map((s, i) => + s + .initialize(dependencies, adapterSettings, endpointName, `${transportName}-shard-${i}`) + .then(() => { + ;(s as unknown as { name: string }).name = transportName + }), + ), + ) + // All shards share the same response cache (passed via dependencies). + this.responseCache = this.shards[0]!.responseCache + } + + async registerRequest( + req: AdapterRequest>, + adapterSettings: T['Settings'], + ): Promise { + const data = req.requestContext.data as unknown as PairParams + const idx = this.shardFor(data) + const shard = this.shards[idx]! + if (shard.registerRequest) return shard.registerRequest(req, adapterSettings) + } + + async backgroundExecute(context: EndpointContext): Promise { + await Promise.all( + this.shards.map((s) => (s.backgroundExecute ? s.backgroundExecute(context) : undefined)), + ) + } +} diff --git a/packages/sources/blocksize-capital/src/transport/vwap.ts b/packages/sources/blocksize-capital/src/transport/vwap.ts index b8e134f6460..7fa629ab477 100644 --- a/packages/sources/blocksize-capital/src/transport/vwap.ts +++ b/packages/sources/blocksize-capital/src/transport/vwap.ts @@ -1,6 +1,7 @@ -import { BaseEndpointTypes } from '../endpoint/vwap' import { WebsocketReverseMappingTransport } from '@chainlink/external-adapter-framework/transports/websocket' import { makeLogger } from '@chainlink/external-adapter-framework/util' +import { BaseEndpointTypes } from '../endpoint/vwap' +import { ShardedWebsocketReverseMappingTransport } from './sharded' import { BaseMessage, VwapUpdate, @@ -48,21 +49,25 @@ const preProcessFixedVwapMessage = (message: FixedVwapMessage): VwapUpdate[] => return updates } -export const transport: WebsocketReverseMappingTransport = - new WebsocketReverseMappingTransport({ +// See price.ts for sharding rationale. Default is 1 — opt-in via env var. +const NUM_SHARDS = Number(process.env.WS_NUM_SHARDS || 1) + +const createInnerVwapTransport = (): WebsocketReverseMappingTransport => { + let t: WebsocketReverseMappingTransport + t = new WebsocketReverseMappingTransport({ url: (context) => context.adapterSettings.WS_API_ENDPOINT, handlers: { open: (connection, context) => blocksizeDefaultWebsocketOpenHandler(connection, context.adapterSettings.API_KEY), message: (message) => { const updates = preProcessFixedVwapMessage(message) - return handlePriceUpdates(updates, transport) + return handlePriceUpdates(updates, t) }, }, builders: { subscribeMessage: (params) => { const pair = `${params.base}${params.quote}`.toUpperCase() - transport.setReverseMapping(pair, params) + t.setReverseMapping(pair, params) return buildBlocksizeWebsocketTickersMessage('fixedvwap_subscribe', pair) }, unsubscribeMessage: (params) => @@ -73,3 +78,10 @@ export const transport: WebsocketReverseMappingTransport( + NUM_SHARDS, + () => createInnerVwapTransport(), +)