Skip to content

feat(blocksize-capital): optional WebSocket subscription sharding#5007

Open
azgms wants to merge 1 commit into
smartcontractkit:mainfrom
linkpoolio:feat/blocksize-capital-ws-sharding-upstream
Open

feat(blocksize-capital): optional WebSocket subscription sharding#5007
azgms wants to merge 1 commit into
smartcontractkit:mainfrom
linkpoolio:feat/blocksize-capital-ws-sharding-upstream

Conversation

@azgms

@azgms azgms commented May 28, 2026

Copy link
Copy Markdown

Summary

Add an optional, opt-in WebSocket subscription-sharding wrapper to the blocksize-capital adapter. When the new WS_NUM_SHARDS env var is set > 1, each WS-based endpoint (price, crypto-lwba, vwap) splits its subscriptions across N inner WebsocketReverseMappingTransport instances — each opening its own WebSocket — keyed by a stable hash of (base, quote). Default is 1, so existing deployments see no behavior change.

Why

Some data providers enforce a per-connection limit on how many subscriptions a single WebSocket can hold. In high-volume deployments where the number of distinct pairs exceeds that cap, the upstream server closes the connection (typically TCP RST → close code 1006) on a roughly fixed interval. Each reconnect costs a re-subscribe burst plus a brief window where requests for pairs whose tick hasn't been re-published yet exhaust the framework's cache-polling budget and return 504 timeoutError.

In our deployment this resulted in a steady-state timeout rate of ~9% with disconnect/reconnect cycles every ~30-60s. Bisecting the threshold from a clean WS client (Python websockets, same API key, from inside a pod on the same node) reproduced an abrupt 1006 close at exactly the ~85-subscription mark — single-pair connections held indefinitely; ≥90 subscriptions died at ~60s.

Approach

A thin ShardedWebsocketReverseMappingTransport wrapper implements Transport<T> and owns N inner WebsocketReverseMappingTransport instances:

  • registerRequest is delegated to the shard chosen by hash(base+quote) % N. Only that shard's WS subscribes to the pair.
  • backgroundExecute fans out to every shard in parallel.
  • During initialize, the wrapper passes a per-shard transportName (${name}-shard-${i}) so the framework's SubscriptionSetFactory.buildSet(endpointName, name) builds a distinct subscription set per shard. Without this, with CACHE_TYPE=redis all shards would share the same redis zset (its key embeds transportName) and each shard would subscribe to every pair via its own WS — defeating the sharding.
  • Immediately after initialize, the wrapper restores transport.name to the canonical name. The framework's WS message handler writes responses via this.responseCache.write(this.name, results), so this restoration ensures cache keys are stable across shards and the foreground HTTP request finds them under the expected key.
  • All shards share the same ResponseCache instance (passed through unchanged via TransportDependencies).

The three existing endpoint transports (price.ts, crypto-lwba.ts, vwap.ts) each wrap their existing inner WebsocketReverseMappingTransport config in a single-line addition; no other framework changes are required.

Backwards compatibility

WS_NUM_SHARDS defaults to 1. With one shard the wrapper is effectively a pass-through (single inner WS connection, same subscription set, same cache write path). Verified locally: with no env var set, the adapter logs Initializing 1 WS shards for default_single_transport, opens a single WS, and serves requests as before.

Test plan

Local repro (150 distinct pairs, traffic at ~3 req/sec per pair):

Stock WS_NUM_SHARDS=4
WS disconnects over 200s several per minute 0
Steady-state timeoutError rate (after warmup) ~9% ~0% (cold-start only)

Verified opt-in default (no env var):

Result
Shard initialization log Initializing 1 WS shards for default_single_transport
Single inner WS connection opens
Requests served + cache populated

Notes

  • The wrapper is intentionally kept in this adapter's package rather than added to the framework. If other adapters need the same pattern, the pattern is small enough to be lifted into external-adapter-framework later, and the type signature is already generic on WebsocketTransportGenerics.
  • The README change is the output of yarn generate:readme blocksize-capital after adding WS_NUM_SHARDS to the adapter's AdapterConfig.

Add a ShardedWebsocketReverseMappingTransport that wraps N inner
WebsocketReverseMappingTransport instances and routes each subscription
to a shard via a stable hash of (base, quote). Useful when the data
provider enforces a per-connection subscription cap and a deployment
needs to subscribe to more pairs than a single WS can hold.

- New WS_NUM_SHARDS env var, default 1 (single connection — identical
  behavior to before this change), added to the adapter's AdapterConfig
  so the README generator picks it up.
- All three WS-based endpoints (price, crypto-lwba, vwap) wrap their
  existing transport in the sharded wrapper.
- Each shard gets its own per-shard transportName during initialize()
  so SubscriptionSetFactory builds an isolated subscription set per
  shard (otherwise with CACHE_TYPE=redis every shard would share one
  zset and subscribe to every pair, defeating the sharding). After
  initialize, `transport.name` is restored to the canonical name so
  response cache writes land under the key the foreground HTTP request
  reads.
- All shards share the framework-provided response cache via the
  initialize() dependencies, so callers don't need to know which shard
  owns which pair.
@changeset-bot

changeset-bot Bot commented May 28, 2026

Copy link
Copy Markdown

🦋 Changeset detected

Latest commit: 0ede33d

The changes in this PR will be included in the next version bump.

This PR includes changesets to release 7 packages
Name Type
@chainlink/blocksize-capital-adapter Minor
@chainlink/token-allocation-adapter Patch
@chainlink/bsol-price-adapter Patch
@chainlink/crypto-volatility-index-adapter Patch
@chainlink/savax-price-adapter Patch
@chainlink/set-token-index-adapter Patch
@chainlink/xsushi-price-adapter Patch

Not sure what this means? Click here to learn what changesets are.

Click here if you're a maintainer who wants to add another changeset to this PR

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant