Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .changeset/shard-blocksize-ws.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
'@chainlink/blocksize-capital-adapter': minor
---

Add optional WebSocket subscription sharding. Set `WS_NUM_SHARDS` > 1 to spread
subscriptions across multiple connections — useful when the data provider has
a per-connection subscription cap. Default is 1 (single connection, same as
before).
9 changes: 5 additions & 4 deletions packages/sources/blocksize-capital/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ This document was generated automatically. Please see [README Generator](../../s

## Environment Variables

| Required? | Name | Description | Type | Options | Default |
| :-------: | :-------------: | :----------------------------------: | :----: | :-----: | :---------------------------------------------: |
| ✅ | API_KEY | The Blocksize Capital API key to use | string | | |
| | WS_API_ENDPOINT | The default WebSocket API base url | string | | `wss://data.blocksize.capital/marketdata/v1/ws` |
| Required? | Name | Description | Type | Options | Default |
| :-------: | :-------------: | :--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------: | :----: | :-----: | :---------------------------------------------: |
| ✅ | API_KEY | The Blocksize Capital API key to use | string | | |
| | WS_API_ENDPOINT | The default WebSocket API base url | string | | `wss://data.blocksize.capital/marketdata/v1/ws` |
| | WS_NUM_SHARDS | Number of WebSocket connections to shard subscriptions across (each WS endpoint). Useful when the provider has a per-connection subscription cap. Default 1 = single connection (no sharding). | number | | `1` |

---

Expand Down
8 changes: 8 additions & 0 deletions packages/sources/blocksize-capital/src/config/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,12 @@ export const config = new AdapterConfig({
default: 'wss://data.blocksize.capital/marketdata/v1/ws',
sensitive: false,
},
WS_NUM_SHARDS: {
description:
'Number of WebSocket connections to shard subscriptions across (each WS endpoint). Useful when the provider has a per-connection subscription cap. Default 1 = single connection (no sharding).',
type: 'number',
required: false,
default: 1,
sensitive: false,
},
})
22 changes: 17 additions & 5 deletions packages/sources/blocksize-capital/src/transport/crypto-lwba.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { BaseEndpointTypes } from '../endpoint/crypto-lwba'
import { WebsocketReverseMappingTransport } from '@chainlink/external-adapter-framework/transports/websocket'
import { makeLogger, ProviderResult } from '@chainlink/external-adapter-framework/util'
import { BaseEndpointTypes } from '../endpoint/crypto-lwba'
import { ShardedWebsocketReverseMappingTransport } from './sharded'
import {
BaseMessage,
blocksizeDefaultUnsubscribeMessageBuilder,
Expand Down Expand Up @@ -31,8 +32,12 @@ export type WsTransportTypes = BaseEndpointTypes & {
}
}

export const transport: WebsocketReverseMappingTransport<WsTransportTypes, string> =
new WebsocketReverseMappingTransport<WsTransportTypes, string>({
// See price.ts for sharding rationale. Default is 1 — opt-in via env var.
const NUM_SHARDS = Number(process.env.WS_NUM_SHARDS || 1)

const createInnerLwbaTransport = (): WebsocketReverseMappingTransport<WsTransportTypes, string> => {
let t: WebsocketReverseMappingTransport<WsTransportTypes, string>
t = new WebsocketReverseMappingTransport<WsTransportTypes, string>({
url: (context) => context.adapterSettings.WS_API_ENDPOINT,
handlers: {
open: (connection, context) =>
Expand All @@ -42,7 +47,7 @@ export const transport: WebsocketReverseMappingTransport<WsTransportTypes, strin
const updates = message.params.updates
const results: ProviderResult<WsTransportTypes>[] = []
for (const update of updates) {
const params = transport.getReverseMapping(update.ticker)
const params = t.getReverseMapping(update.ticker)
if (!params) {
continue
}
Expand Down Expand Up @@ -79,10 +84,17 @@ export const transport: WebsocketReverseMappingTransport<WsTransportTypes, strin
builders: {
subscribeMessage: (params) => {
const pair = `${params.base}${params.quote}`.toUpperCase()
transport.setReverseMapping(pair, params)
t.setReverseMapping(pair, params)
return buildBlocksizeWebsocketTickersMessage('bidask_subscribe', pair)
},
unsubscribeMessage: (params) =>
blocksizeDefaultUnsubscribeMessageBuilder(params.base, params.quote, 'bidask_unsubscribe'),
},
})
return t
}

export const transport = new ShardedWebsocketReverseMappingTransport<WsTransportTypes, string>(
NUM_SHARDS,
() => createInnerLwbaTransport(),
)
29 changes: 23 additions & 6 deletions packages/sources/blocksize-capital/src/transport/price.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { BaseEndpointTypes } from '../endpoint/price'
import { makeLogger } from '@chainlink/external-adapter-framework/util'
import { WebsocketReverseMappingTransport } from '@chainlink/external-adapter-framework/transports/websocket'
import { makeLogger } from '@chainlink/external-adapter-framework/util'
import { BaseEndpointTypes } from '../endpoint/price'
import { ShardedWebsocketReverseMappingTransport } from './sharded'
import {
BaseMessage,
blocksizeDefaultUnsubscribeMessageBuilder,
Expand Down Expand Up @@ -31,8 +32,17 @@ export type WsTransportTypes = BaseEndpointTypes & {
}
}

export const transport: WebsocketReverseMappingTransport<WsTransportTypes, string> =
new WebsocketReverseMappingTransport<WsTransportTypes, string>({
// Optional: shard subscriptions across multiple WS connections to work around
// per-connection subscription caps on the data provider side. Default is 1
// (single connection — same behavior as before this feature was added).
const NUM_SHARDS = Number(process.env.WS_NUM_SHARDS || 1)

const createInnerPriceTransport = (): WebsocketReverseMappingTransport<
WsTransportTypes,
string
> => {
let t: WebsocketReverseMappingTransport<WsTransportTypes, string>
t = new WebsocketReverseMappingTransport<WsTransportTypes, string>({
url: (context) => context.adapterSettings.WS_API_ENDPOINT,
handlers: {
open: (connection, context) =>
Expand All @@ -48,16 +58,23 @@ export const transport: WebsocketReverseMappingTransport<WsTransportTypes, strin
return []
}
const updates = message.params.updates
return handlePriceUpdates(updates, transport)
return handlePriceUpdates(updates, t)
},
},
builders: {
subscribeMessage: (params) => {
const pair = `${params.base}${params.quote}`.toUpperCase()
transport.setReverseMapping(pair, params)
t.setReverseMapping(pair, params)
return buildBlocksizeWebsocketTickersMessage('vwap_subscribe', pair)
},
unsubscribeMessage: (params) =>
blocksizeDefaultUnsubscribeMessageBuilder(params.base, params.quote, 'vwap_unsubscribe'),
},
})
return t
}

export const transport = new ShardedWebsocketReverseMappingTransport<WsTransportTypes, string>(
NUM_SHARDS,
() => createInnerPriceTransport(),
)
98 changes: 98 additions & 0 deletions packages/sources/blocksize-capital/src/transport/sharded.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/**
* Sharded WebSocket transport.
*
* Wraps N inner `WebsocketReverseMappingTransport` instances and routes each
* subscription to one of them based on a stable hash of (base, quote). Each
* inner transport opens its own WebSocket connection. This is useful when the
* data provider enforces a per-connection subscription cap and high-volume
* deployments need to fit more pairs than a single connection can hold.
*
* The inner transports all share the same response cache, so callers (and the
* cache key seen by the foreground HTTP request) don't need to know which
* shard owns which pair. Each shard gets its own subscription set so a shard
* only ever subscribes to pairs that hash to it.
*/
import { EndpointContext } from '@chainlink/external-adapter-framework/adapter'
import { ResponseCache } from '@chainlink/external-adapter-framework/cache/response'
import { Transport, TransportDependencies } from '@chainlink/external-adapter-framework/transports'
import {
WebsocketReverseMappingTransport,
WebsocketTransportGenerics,
} from '@chainlink/external-adapter-framework/transports/websocket'
import { AdapterRequest, makeLogger } from '@chainlink/external-adapter-framework/util'
import { TypeFromDefinition } from '@chainlink/external-adapter-framework/validation/input-params'

const logger = makeLogger('ShardedWsTransport')

type PairParams = { base: string; quote: string }

export class ShardedWebsocketReverseMappingTransport<T extends WebsocketTransportGenerics, K>
implements Transport<T>
{
name = 'sharded-ws'
responseCache!: ResponseCache<T>
private shards: WebsocketReverseMappingTransport<T, K>[]

constructor(
private numShards: number,
factory: (shardIndex: number) => WebsocketReverseMappingTransport<T, K>,
) {
if (numShards < 1) throw new Error('numShards must be >= 1')
this.shards = Array.from({ length: numShards }, (_, i) => factory(i))
}

// Stable hash of (base+quote) → shard index. Same pair always lands on the
// same shard, even across pod restarts.
private shardFor(params: PairParams): number {
const key = `${params.base}${params.quote}`.toUpperCase()
let hash = 0
for (let i = 0; i < key.length; i++) {
hash = (Math.imul(31, hash) + key.charCodeAt(i)) | 0
}
return Math.abs(hash) % this.numShards
}

async initialize(
dependencies: TransportDependencies<T>,
adapterSettings: T['Settings'],
endpointName: string,
transportName: string,
): Promise<void> {
logger.info(`Initializing ${this.numShards} WS shards for ${transportName}`)
// Each shard must have a UNIQUE name when initialize() runs so the framework
// builds a per-shard subscription set (otherwise with CACHE_TYPE=redis all
// shards share a single redis-backed set and every shard ends up subscribing
// to every pair via its own WS — defeating the sharding).
//
// After initialize, we restore `name` to the canonical transportName so the
// response cache writes from the inner transport land under the key the
// foreground HTTP request expects (`...-${transportName}-${params}`).
await Promise.all(
this.shards.map((s, i) =>
s
.initialize(dependencies, adapterSettings, endpointName, `${transportName}-shard-${i}`)
.then(() => {
;(s as unknown as { name: string }).name = transportName
}),
),
)
// All shards share the same response cache (passed via dependencies).
this.responseCache = this.shards[0]!.responseCache
}

async registerRequest(
req: AdapterRequest<TypeFromDefinition<T['Parameters']>>,
adapterSettings: T['Settings'],
): Promise<void> {
const data = req.requestContext.data as unknown as PairParams
const idx = this.shardFor(data)
const shard = this.shards[idx]!
if (shard.registerRequest) return shard.registerRequest(req, adapterSettings)
}

async backgroundExecute(context: EndpointContext<T>): Promise<void> {
await Promise.all(
this.shards.map((s) => (s.backgroundExecute ? s.backgroundExecute(context) : undefined)),
)
}
}
22 changes: 17 additions & 5 deletions packages/sources/blocksize-capital/src/transport/vwap.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { BaseEndpointTypes } from '../endpoint/vwap'
import { WebsocketReverseMappingTransport } from '@chainlink/external-adapter-framework/transports/websocket'
import { makeLogger } from '@chainlink/external-adapter-framework/util'
import { BaseEndpointTypes } from '../endpoint/vwap'
import { ShardedWebsocketReverseMappingTransport } from './sharded'
import {
BaseMessage,
VwapUpdate,
Expand Down Expand Up @@ -48,21 +49,25 @@ const preProcessFixedVwapMessage = (message: FixedVwapMessage): VwapUpdate[] =>
return updates
}

export const transport: WebsocketReverseMappingTransport<WsTransportTypes, string> =
new WebsocketReverseMappingTransport<WsTransportTypes, string>({
// See price.ts for sharding rationale. Default is 1 — opt-in via env var.
const NUM_SHARDS = Number(process.env.WS_NUM_SHARDS || 1)

const createInnerVwapTransport = (): WebsocketReverseMappingTransport<WsTransportTypes, string> => {
let t: WebsocketReverseMappingTransport<WsTransportTypes, string>
t = new WebsocketReverseMappingTransport<WsTransportTypes, string>({
url: (context) => context.adapterSettings.WS_API_ENDPOINT,
handlers: {
open: (connection, context) =>
blocksizeDefaultWebsocketOpenHandler(connection, context.adapterSettings.API_KEY),
message: (message) => {
const updates = preProcessFixedVwapMessage(message)
return handlePriceUpdates(updates, transport)
return handlePriceUpdates(updates, t)
},
},
builders: {
subscribeMessage: (params) => {
const pair = `${params.base}${params.quote}`.toUpperCase()
transport.setReverseMapping(pair, params)
t.setReverseMapping(pair, params)
return buildBlocksizeWebsocketTickersMessage('fixedvwap_subscribe', pair)
},
unsubscribeMessage: (params) =>
Expand All @@ -73,3 +78,10 @@ export const transport: WebsocketReverseMappingTransport<WsTransportTypes, strin
),
},
})
return t
}

export const transport = new ShardedWebsocketReverseMappingTransport<WsTransportTypes, string>(
NUM_SHARDS,
() => createInnerVwapTransport(),
)