feat(blocksize-capital): optional WebSocket subscription sharding#5007
Open
azgms wants to merge 1 commit into
Open
feat(blocksize-capital): optional WebSocket subscription sharding#5007azgms wants to merge 1 commit into
azgms wants to merge 1 commit into
Conversation
Add a ShardedWebsocketReverseMappingTransport that wraps N inner WebsocketReverseMappingTransport instances and routes each subscription to a shard via a stable hash of (base, quote). Useful when the data provider enforces a per-connection subscription cap and a deployment needs to subscribe to more pairs than a single WS can hold. - New WS_NUM_SHARDS env var, default 1 (single connection — identical behavior to before this change), added to the adapter's AdapterConfig so the README generator picks it up. - All three WS-based endpoints (price, crypto-lwba, vwap) wrap their existing transport in the sharded wrapper. - Each shard gets its own per-shard transportName during initialize() so SubscriptionSetFactory builds an isolated subscription set per shard (otherwise with CACHE_TYPE=redis every shard would share one zset and subscribe to every pair, defeating the sharding). After initialize, `transport.name` is restored to the canonical name so response cache writes land under the key the foreground HTTP request reads. - All shards share the framework-provided response cache via the initialize() dependencies, so callers don't need to know which shard owns which pair.
🦋 Changeset detectedLatest commit: 0ede33d The changes in this PR will be included in the next version bump. This PR includes changesets to release 7 packages
Not sure what this means? Click here to learn what changesets are. Click here if you're a maintainer who wants to add another changeset to this PR |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Add an optional, opt-in WebSocket subscription-sharding wrapper to the
blocksize-capitaladapter. When the newWS_NUM_SHARDSenv var is set > 1, each WS-based endpoint (price,crypto-lwba,vwap) splits its subscriptions across N innerWebsocketReverseMappingTransportinstances — each opening its own WebSocket — keyed by a stable hash of(base, quote). Default is1, so existing deployments see no behavior change.Why
Some data providers enforce a per-connection limit on how many subscriptions a single WebSocket can hold. In high-volume deployments where the number of distinct pairs exceeds that cap, the upstream server closes the connection (typically TCP RST → close code
1006) on a roughly fixed interval. Each reconnect costs a re-subscribe burst plus a brief window where requests for pairs whose tick hasn't been re-published yet exhaust the framework's cache-polling budget and return504 timeoutError.In our deployment this resulted in a steady-state timeout rate of ~9% with disconnect/reconnect cycles every ~30-60s. Bisecting the threshold from a clean WS client (Python
websockets, same API key, from inside a pod on the same node) reproduced an abrupt 1006 close at exactly the ~85-subscription mark — single-pair connections held indefinitely; ≥90 subscriptions died at ~60s.Approach
A thin
ShardedWebsocketReverseMappingTransportwrapper implementsTransport<T>and owns N innerWebsocketReverseMappingTransportinstances:registerRequestis delegated to the shard chosen byhash(base+quote) % N. Only that shard's WS subscribes to the pair.backgroundExecutefans out to every shard in parallel.initialize, the wrapper passes a per-shardtransportName(${name}-shard-${i}) so the framework'sSubscriptionSetFactory.buildSet(endpointName, name)builds a distinct subscription set per shard. Without this, withCACHE_TYPE=redisall shards would share the same redis zset (its key embedstransportName) and each shard would subscribe to every pair via its own WS — defeating the sharding.initialize, the wrapper restorestransport.nameto the canonical name. The framework's WS message handler writes responses viathis.responseCache.write(this.name, results), so this restoration ensures cache keys are stable across shards and the foreground HTTP request finds them under the expected key.ResponseCacheinstance (passed through unchanged viaTransportDependencies).The three existing endpoint transports (
price.ts,crypto-lwba.ts,vwap.ts) each wrap their existing innerWebsocketReverseMappingTransportconfig in a single-line addition; no other framework changes are required.Backwards compatibility
WS_NUM_SHARDSdefaults to1. With one shard the wrapper is effectively a pass-through (single inner WS connection, same subscription set, same cache write path). Verified locally: with no env var set, the adapter logsInitializing 1 WS shards for default_single_transport, opens a single WS, and serves requests as before.Test plan
Local repro (150 distinct pairs, traffic at ~3 req/sec per pair):
WS_NUM_SHARDS=4timeoutErrorrate (after warmup)Verified opt-in default (no env var):
Initializing 1 WS shards for default_single_transportNotes
external-adapter-frameworklater, and the type signature is already generic onWebsocketTransportGenerics.yarn generate:readme blocksize-capitalafter addingWS_NUM_SHARDSto the adapter'sAdapterConfig.