From 9ec45c639138391d4af6e00f2ca0d4ac48405baf Mon Sep 17 00:00:00 2001 From: Michal Szorad Date: Sat, 6 Dec 2025 13:37:46 -0500 Subject: [PATCH] fix(perps): add fallback subscriptions for missing HyperLiquid API fields cp-7.60.4 (#23753) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## **Description** This PR fixes missing data issues in HyperLiquid WebSocket subscriptions by implementing fallback subscription mechanisms. The HyperLiquid API has changed its structure, and fields like `clearinghouseState` and `openOrders` that were previously available in `webData2` and `webData3` subscriptions are now missing or have been moved to separate subscription types. ## **Changelog** CHANGELOG entry: Fixed missing position and order data in HyperLiquid perpetuals by implementing fallback subscription mechanisms for API changes ## **Related issues** Fixes: https://github.com/MetaMask/metamask-mobile/issues/23721 Incident: https://consensys.slack.com/archives/C0A1HK3MZ6K ## **Manual testing steps** ```gherkin Feature: HyperLiquid Perpetuals Data Subscription Scenario: user views positions when API fields are missing Given user has an active HyperLiquid perpetuals position And the HyperLiquid API is missing clearinghouseState in webData3 response When user opens the perpetuals screen Then positions should be displayed correctly using fallback subscription And position data should update in real-time Scenario: user views orders when API fields are missing Given user has open orders on HyperLiquid And the HyperLiquid API is missing openOrders in webData3 response When user opens the perpetuals screen Then orders should be displayed correctly using fallback subscription And order data should update in real-time Scenario: user views account state when API fields are missing Given user is connected to HyperLiquid And the HyperLiquid API is missing clearinghouseState in webData2 response When user views their account balance Then account state should be displayed correctly using fallback subscription And account data should update in real-time ``` ## **Screenshots/Recordings** ### **Before** Simulator Screenshot - iPhone 17
Pro - 2025-12-06 at 10 07 33 ### **After** Simulator Screenshot - iPhone 17
Pro - 2025-12-06 at 10 03 55 ## **Pre-merge author checklist** - [x] I’ve followed [MetaMask Contributor Docs](https://github.com/MetaMask/contributor-docs) and [MetaMask Mobile Coding Standards](https://github.com/MetaMask/metamask-mobile/blob/main/.github/guidelines/CODING_GUIDELINES.md). - [x] I've completed the PR template to the best of my ability - [x] I’ve included tests if applicable - [x] I’ve documented my code using [JSDoc](https://jsdoc.app/) format if applicable - [x] I’ve applied the right labels on the PR (see [labeling guidelines](https://github.com/MetaMask/metamask-mobile/blob/main/.github/guidelines/LABELING_GUIDELINES.md)). Not required for external contributors. ## **Pre-merge reviewer checklist** - [ ] I've manually tested the PR (e.g. pull and build branch, run the app, test code being changed). - [ ] I confirm that this PR addresses all acceptance criteria described in the ticket it closes and includes the necessary testing evidence such as recordings and or screenshots. --- > [!NOTE] > Adds fallback `clearinghouseState` and `openOrders` subscriptions and centralized aggregation to handle missing HyperLiquid WebSocket fields, with robust error handling and cleanup. > > - **HyperLiquid perps subscription service (`app/components/UI/Perps/services/HyperLiquidSubscriptionService.ts`)**: > - **Fallback data sources**: > - Add per-DEX fallback subscriptions for `clearinghouseState` and `openOrders` when absent in `webData3`, with caches (`fallbackClearinghouseStateCache`, `fallbackOpenOrdersCache`). > - Auto-ensure fallbacks during `webData3` processing; use cached fallback data when available. > - **Aggregation & notification**: > - Introduce `aggregateAndNotifySubscribers()` to unify per-DEX caches into aggregated positions, orders, and account and notify only on change. > - **Resilience & logging**: > - Wrap `webData2`/`webData3` callbacks in try/catch with detailed `Logger.error` context. > - Guard against missing fields in `webData2`/`webData3` before access. > - **Lifecycle management**: > - Track/manage fallback subscriptions (`clearinghouseStateSubscriptions`, `openOrdersSubscriptions`). > - Cleanup fallbacks and caches in `cleanupSharedWebData3Subscription()` and `clearAll()`. > - **Misc**: > - Maintain OI caps extraction and notification; preserve per-DEX caches and HIP-3 handling. > > Written by [Cursor Bugbot](https://cursor.com/dashboard?tab=bugbot) for commit 98a7365cb2ea8fabd17b0cdb2bdbe47e303c44a3. This will update automatically on new commits. Configure [here](https://cursor.com/dashboard?tab=bugbot). --- .../HyperLiquidSubscriptionService.ts | 766 +++++++++++++----- 1 file changed, 564 insertions(+), 202 deletions(-) diff --git a/app/components/UI/Perps/services/HyperLiquidSubscriptionService.ts b/app/components/UI/Perps/services/HyperLiquidSubscriptionService.ts index cad148b8b8b..55dd669fc21 100644 --- a/app/components/UI/Perps/services/HyperLiquidSubscriptionService.ts +++ b/app/components/UI/Perps/services/HyperLiquidSubscriptionService.ts @@ -9,6 +9,8 @@ import { type L2BookResponse, type WsAssetCtxsEvent, type FrontendOpenOrdersResponse, + type WsClearinghouseStateEvent, + type WsOpenOrdersEvent, } from '@nktkas/hyperliquid'; import { DevLogger } from '../../../../core/SDKConnect/utils/DevLogger'; import Logger, { type LoggerErrorOptions } from '../../../../util/Logger'; @@ -127,6 +129,21 @@ export class HyperLiquidSubscriptionService { >(); // Per-DEX asset contexts private assetCtxsSubscriptionPromises = new Map>(); // Track in-progress subscriptions + // Fallback subscriptions for missing fields in webData3 + private readonly clearinghouseStateSubscriptions = new Map< + string, + Subscription + >(); // Key: dex name ('' for main) + private readonly openOrdersSubscriptions = new Map(); // Key: dex name ('' for main) + private readonly fallbackClearinghouseStateCache = new Map< + string, + WsClearinghouseStateEvent['clearinghouseState'] + >(); // Per-DEX fallback clearinghouse state + private readonly fallbackOpenOrdersCache = new Map< + string, + WsOpenOrdersEvent['orders'] + >(); // Per-DEX fallback orders + // Meta cache per DEX - populated by metaAndAssetCtxs, used by createAssetCtxsSubscription // This avoids redundant meta() API calls since metaAndAssetCtxs already returns meta data private readonly dexMetaCache = new Map< @@ -774,77 +791,96 @@ export class HyperLiquidSubscriptionService { // HIP-3 disabled: Use webData2 (main DEX only) subscriptionClient .webData2({ user: userAddress }, (data: WsWebData2Event) => { - // webData2 returns clearinghouseState for main DEX only - const currentDexName = ''; // Main DEX - - // Extract and process positions from clearinghouseState - const positions = data.clearinghouseState.assetPositions - .filter((assetPos) => assetPos.position.szi !== '0') - .map((assetPos) => adaptPositionFromSDK(assetPos)); + try { + // webData2 returns clearinghouseState for main DEX only + const currentDexName = ''; // Main DEX - // Extract TP/SL from orders - const { - tpslMap, - tpslCountMap, - processedOrders: orders, - } = this.extractTPSLFromOrders(data.openOrders || [], positions); + // Check for removed fields before accessing + if (!data.clearinghouseState) { + return; + } - // Merge TP/SL data into positions - const positionsWithTPSL = this.mergeTPSLIntoPositions( - positions, - tpslMap, - tpslCountMap, - ); + // Extract and process positions from clearinghouseState + const positions = data.clearinghouseState.assetPositions + .filter((assetPos) => assetPos.position.szi !== '0') + .map((assetPos) => adaptPositionFromSDK(assetPos)); - // Extract account data (webData2 provides clearinghouseState) - const accountState: AccountState = adaptAccountStateFromSDK( - data.clearinghouseState, - undefined, // webData2 doesn't include spotState - ); + // Extract TP/SL from orders + const { + tpslMap, + tpslCountMap, + processedOrders: orders, + } = this.extractTPSLFromOrders(data.openOrders || [], positions); - // Store in caches (main DEX only) - this.dexPositionsCache.set(currentDexName, positionsWithTPSL); - this.dexOrdersCache.set(currentDexName, orders); - this.dexAccountCache.set(currentDexName, accountState); - - // OI caps (main DEX only) - const oiCaps = data.perpsAtOpenInterestCap || []; - const oiCapsHash = [...oiCaps] - .sort((a: string, b: string) => a.localeCompare(b)) - .join(','); - if (oiCapsHash !== this.cachedOICapsHash) { - this.cachedOICaps = oiCaps; - this.cachedOICapsHash = oiCapsHash; - this.oiCapsCacheInitialized = true; - this.oiCapSubscribers.forEach((callback) => callback(oiCaps)); - } + // Merge TP/SL data into positions + const positionsWithTPSL = this.mergeTPSLIntoPositions( + positions, + tpslMap, + tpslCountMap, + ); - // Notify subscribers (no aggregation needed - only main DEX) - const positionsHash = this.hashPositions(positionsWithTPSL); - const ordersHash = this.hashOrders(orders); - const accountHash = this.hashAccountState(accountState); - - if (positionsHash !== this.cachedPositionsHash) { - this.cachedPositions = positionsWithTPSL; - this.cachedPositionsHash = positionsHash; - this.positionsCacheInitialized = true; - this.positionSubscribers.forEach((callback) => - callback(positionsWithTPSL), + // Extract account data (webData2 provides clearinghouseState) + const accountState: AccountState = adaptAccountStateFromSDK( + data.clearinghouseState, + undefined, // webData2 doesn't include spotState ); - } - if (ordersHash !== this.cachedOrdersHash) { - this.cachedOrders = orders; - this.cachedOrdersHash = ordersHash; - this.ordersCacheInitialized = true; - this.orderSubscribers.forEach((callback) => callback(orders)); - } + // Store in caches (main DEX only) + this.dexPositionsCache.set(currentDexName, positionsWithTPSL); + this.dexOrdersCache.set(currentDexName, orders); + this.dexAccountCache.set(currentDexName, accountState); - if (accountHash !== this.cachedAccountHash) { - this.cachedAccount = accountState; - this.cachedAccountHash = accountHash; - this.accountSubscribers.forEach((callback) => - callback(accountState), + // OI caps (main DEX only) + const oiCaps = data.perpsAtOpenInterestCap || []; + const oiCapsHash = [...oiCaps] + .sort((a: string, b: string) => a.localeCompare(b)) + .join(','); + if (oiCapsHash !== this.cachedOICapsHash) { + this.cachedOICaps = oiCaps; + this.cachedOICapsHash = oiCapsHash; + this.oiCapsCacheInitialized = true; + this.oiCapSubscribers.forEach((callback) => callback(oiCaps)); + } + + // Notify subscribers (no aggregation needed - only main DEX) + const positionsHash = this.hashPositions(positionsWithTPSL); + const ordersHash = this.hashOrders(orders); + const accountHash = this.hashAccountState(accountState); + + if (positionsHash !== this.cachedPositionsHash) { + this.cachedPositions = positionsWithTPSL; + this.cachedPositionsHash = positionsHash; + this.positionsCacheInitialized = true; + this.positionSubscribers.forEach((callback) => + callback(positionsWithTPSL), + ); + } + + if (ordersHash !== this.cachedOrdersHash) { + this.cachedOrders = orders; + this.cachedOrdersHash = ordersHash; + this.ordersCacheInitialized = true; + this.orderSubscribers.forEach((callback) => callback(orders)); + } + + if (accountHash !== this.cachedAccountHash) { + this.cachedAccount = accountState; + this.cachedAccountHash = accountHash; + this.accountSubscribers.forEach((callback) => + callback(accountState), + ); + } + } catch (error) { + Logger.error( + ensureError(error), + this.getErrorContext('webData2 callback error', { + user: userAddress, + dataKeys: data ? Object.keys(data) : 'data is null/undefined', + hasClearinghouseState: data?.clearinghouseState !== undefined, + hasOpenOrders: data?.openOrders !== undefined, + hasPerpsAtOpenInterestCap: + data?.perpsAtOpenInterestCap !== undefined, + }), ); } }) @@ -868,152 +904,160 @@ export class HyperLiquidSubscriptionService { // HIP-3 enabled: Use webData3 (main + HIP-3 DEXs) subscriptionClient .webData3({ user: userAddress }, (data: WsWebData3Event) => { - // Process data from each DEX in perpDexStates array - // webData3 returns data for ALL protocol DEXs, but we only process the ones we care about - data.perpDexStates.forEach((dexState, index) => { - // Map webData3 index to DEX name - // Index 0 = main DEX (null), Index 1+ = HIP-3 DEXs from discoveredDexNames - const dexIdentifier = - index === 0 ? null : this.discoveredDexNames[index - 1]; - - // Skip unknown DEXs (not in discoveredDexNames) to prevent main DEX cache corruption - if (index > 0 && dexIdentifier === undefined) { - return; // Unknown DEX - skip to prevent misidentifying as main DEX - } - - // Only process DEXs we care about (skip others silently) - // webData3 API returns all protocol DEXs regardless of our config - if (!this.isDexEnabled(dexIdentifier ?? null)) { - return; // Skip this DEX - not enabled in our configuration - } - - const currentDexName = dexIdentifier ?? ''; // null -> '' for Map keys - - // Extract and process positions for this DEX - const positions = dexState.clearinghouseState.assetPositions - .filter((assetPos) => assetPos.position.szi !== '0') - .map((assetPos) => adaptPositionFromSDK(assetPos)); - - // Extract TP/SL from orders and process orders using shared helper - const { - tpslMap, - tpslCountMap, - processedOrders: orders, - } = this.extractTPSLFromOrders( - dexState.openOrders || [], - positions, - ); - - // Merge TP/SL data into positions using shared helper - const positionsWithTPSL = this.mergeTPSLIntoPositions( - positions, - tpslMap, - tpslCountMap, - ); - - // Extract account data for this DEX - // Note: spotState is not included in webData3 - const accountState: AccountState = adaptAccountStateFromSDK( - dexState.clearinghouseState, - undefined, // webData3 doesn't include spotState - ); - - // Store per-DEX data in caches - this.dexPositionsCache.set(currentDexName, positionsWithTPSL); - this.dexOrdersCache.set(currentDexName, orders); - this.dexAccountCache.set(currentDexName, accountState); - }); - - // Extract OI caps from all DEXs (main + HIP-3) - const allOICaps: string[] = []; - data.perpDexStates.forEach((dexState, index) => { - // Map webData3 index to DEX name - // Index 0 = main DEX (null), Index 1+ = HIP-3 DEXs from discoveredDexNames - const dexIdentifier = - index === 0 ? null : this.discoveredDexNames[index - 1]; - - // Skip unknown DEXs (not in discoveredDexNames) to prevent main DEX cache corruption - if (index > 0 && dexIdentifier === undefined) { - return; // Unknown DEX - skip to prevent misidentifying as main DEX - } - - // Only process DEXs we care about (skip others silently) - if (!this.isDexEnabled(dexIdentifier ?? null)) { - return; // Skip this DEX - not enabled in our configuration - } - - const currentDexName = dexIdentifier ?? ''; - const oiCaps = dexState.perpsAtOpenInterestCap || []; + try { + // Process data from each DEX in perpDexStates array + // webData3 returns data for ALL protocol DEXs, but we only process the ones we care about + data.perpDexStates.forEach((dexState, index) => { + // Map webData3 index to DEX name + // Index 0 = main DEX (null), Index 1+ = HIP-3 DEXs from discoveredDexNames + const dexIdentifier = + index === 0 ? null : this.discoveredDexNames[index - 1]; + + // Skip unknown DEXs (not in discoveredDexNames) to prevent main DEX cache corruption + if (index > 0 && dexIdentifier === undefined) { + return; // Unknown DEX - skip to prevent misidentifying as main DEX + } + + // Only process DEXs we care about (skip others silently) + // webData3 API returns all protocol DEXs regardless of our config + if (!this.isDexEnabled(dexIdentifier ?? null)) { + return; // Skip this DEX - not enabled in our configuration + } + + const currentDexName = dexIdentifier ?? ''; // null -> '' for Map keys + + // HOTFIX: Handle missing fields by using fallback subscriptions + // Check if clearinghouseState is missing and ensure fallback subscription + if (!dexState.clearinghouseState) { + // Ensure fallback subscription exists + this.setupFallbackClearinghouseStateSubscription( + userAddress, + currentDexName, + ); + // Try to use cached fallback data + const fallbackState = + this.fallbackClearinghouseStateCache.get(currentDexName); + if (!fallbackState) { + // No fallback data yet, skip this update + return; + } + // Use fallback data + dexState.clearinghouseState = fallbackState; + } + + // Check if openOrders is missing and ensure fallback subscription + if (!('openOrders' in dexState) || !dexState.openOrders) { + // Ensure fallback subscription exists + this.setupFallbackOpenOrdersSubscription( + userAddress, + currentDexName, + ); + // Use fallback data if available + const fallbackOrders = + this.fallbackOpenOrdersCache.get(currentDexName); + if (fallbackOrders) { + dexState.openOrders = fallbackOrders; + } + } + + // Extract and process positions for this DEX + const positions = dexState.clearinghouseState.assetPositions + .filter((assetPos) => assetPos.position.szi !== '0') + .map((assetPos) => adaptPositionFromSDK(assetPos)); + + // Extract TP/SL from orders and process orders using shared helper + const { + tpslMap, + tpslCountMap, + processedOrders: orders, + } = this.extractTPSLFromOrders( + dexState.openOrders || [], + positions, + ); - // Add DEX prefix for HIP-3 symbols (e.g., "xyz:TSLA") - if (currentDexName) { - allOICaps.push( - ...oiCaps.map((symbol) => `${currentDexName}:${symbol}`), + // Merge TP/SL data into positions using shared helper + const positionsWithTPSL = this.mergeTPSLIntoPositions( + positions, + tpslMap, + tpslCountMap, ); - } else { - // Main DEX - no prefix needed - allOICaps.push(...oiCaps); - } - }); - // Update OI caps cache and notify if changed - const oiCapsHash = [...allOICaps] - .sort((a: string, b: string) => a.localeCompare(b)) - .join(','); - if (oiCapsHash !== this.cachedOICapsHash) { - this.cachedOICaps = allOICaps; - this.cachedOICapsHash = oiCapsHash; - this.oiCapsCacheInitialized = true; - - // Notify all subscribers - this.oiCapSubscribers.forEach((callback) => callback(allOICaps)); - } + // Extract account data for this DEX + // Note: spotState is not included in webData3 + const accountState: AccountState = adaptAccountStateFromSDK( + dexState.clearinghouseState, + undefined, // webData3 doesn't include spotState + ); - // Aggregate data from all DEX caches - const aggregatedPositions = Array.from( - this.dexPositionsCache.values(), - ).flat(); - - const aggregatedOrders = Array.from( - this.dexOrdersCache.values(), - ).flat(); - - const aggregatedAccount = this.aggregateAccountStates(); - - // Check if aggregated data changed using fast hash comparison - const positionsHash = this.hashPositions(aggregatedPositions); - const ordersHash = this.hashOrders(aggregatedOrders); - const accountHash = this.hashAccountState(aggregatedAccount); - - const positionsChanged = positionsHash !== this.cachedPositionsHash; - const ordersChanged = ordersHash !== this.cachedOrdersHash; - const accountChanged = accountHash !== this.cachedAccountHash; - - // Only notify subscribers if aggregated data changed - if (positionsChanged) { - this.cachedPositions = aggregatedPositions; - this.cachedPositionsHash = positionsHash; - this.positionsCacheInitialized = true; // Mark cache as initialized - this.positionSubscribers.forEach((callback) => { - callback(aggregatedPositions); + // Store per-DEX data in caches + this.dexPositionsCache.set(currentDexName, positionsWithTPSL); + this.dexOrdersCache.set(currentDexName, orders); + this.dexAccountCache.set(currentDexName, accountState); }); - } - if (ordersChanged) { - this.cachedOrders = aggregatedOrders; - this.cachedOrdersHash = ordersHash; - this.ordersCacheInitialized = true; // Mark cache as initialized - this.orderSubscribers.forEach((callback) => { - callback(aggregatedOrders); + // Extract OI caps from all DEXs (main + HIP-3) + const allOICaps: string[] = []; + data.perpDexStates.forEach((dexState, index) => { + // Map webData3 index to DEX name + // Index 0 = main DEX (null), Index 1+ = HIP-3 DEXs from discoveredDexNames + const dexIdentifier = + index === 0 ? null : this.discoveredDexNames[index - 1]; + + // Skip unknown DEXs (not in discoveredDexNames) to prevent main DEX cache corruption + if (index > 0 && dexIdentifier === undefined) { + return; // Unknown DEX - skip to prevent misidentifying as main DEX + } + + // Only process DEXs we care about (skip others silently) + if (!this.isDexEnabled(dexIdentifier ?? null)) { + return; // Skip this DEX - not enabled in our configuration + } + + const currentDexName = dexIdentifier ?? ''; + + const oiCaps = dexState.perpsAtOpenInterestCap || []; + + // Add DEX prefix for HIP-3 symbols (e.g., "xyz:TSLA") + if (currentDexName) { + allOICaps.push( + ...oiCaps.map((symbol) => `${currentDexName}:${symbol}`), + ); + } else { + // Main DEX - no prefix needed + allOICaps.push(...oiCaps); + } }); - } - if (accountChanged) { - this.cachedAccount = aggregatedAccount; - this.cachedAccountHash = accountHash; - this.accountSubscribers.forEach((callback) => { - callback(aggregatedAccount); - }); + // Update OI caps cache and notify if changed + const oiCapsHash = [...allOICaps] + .sort((a: string, b: string) => a.localeCompare(b)) + .join(','); + if (oiCapsHash !== this.cachedOICapsHash) { + this.cachedOICaps = allOICaps; + this.cachedOICapsHash = oiCapsHash; + this.oiCapsCacheInitialized = true; + + // Notify all subscribers + this.oiCapSubscribers.forEach((callback) => + callback(allOICaps), + ); + } + + // Aggregate and notify subscribers + this.aggregateAndNotifySubscribers(); + } catch (error) { + Logger.error( + ensureError(error), + this.getErrorContext('webData3 callback error', { + user: userAddress, + hasPerpDexStates: data?.perpDexStates !== undefined, + perpDexStatesLength: data?.perpDexStates?.length ?? 0, + dataKeys: data ? Object.keys(data) : 'data is null/undefined', + firstDexStateKeys: data?.perpDexStates?.[0] + ? Object.keys(data.perpDexStates[0]) + : [], + }), + ); } }) .then((sub) => { @@ -1036,6 +1080,256 @@ export class HyperLiquidSubscriptionService { }); // Close Promise wrapper } + /** + * Handle error from fallback subscription setup + */ + private handleFallbackSubscriptionError( + error: unknown, + method: string, + dexName: string, + ): void { + Logger.error( + ensureError(error), + this.getErrorContext(method, { + dex: dexName, + }), + ); + } + + /** + * Setup fallback clearinghouseState subscription with error handling + */ + private setupFallbackClearinghouseStateSubscription( + userAddress: string, + dexName: string, + ): void { + this.ensureFallbackClearinghouseStateSubscription( + userAddress, + dexName, + ).catch((error) => + this.handleFallbackSubscriptionError( + error, + 'ensureFallbackClearinghouseState', + dexName, + ), + ); + } + + /** + * Setup fallback openOrders subscription with error handling + */ + private setupFallbackOpenOrdersSubscription( + userAddress: string, + dexName: string, + ): void { + this.ensureFallbackOpenOrdersSubscription(userAddress, dexName).catch( + (error) => + this.handleFallbackSubscriptionError( + error, + 'ensureFallbackOpenOrders', + dexName, + ), + ); + } + + /** + * HOTFIX: Ensure fallback clearinghouseState subscription exists for a DEX + * Used when clearinghouseState is missing from webData3.perpDexStates + */ + private async ensureFallbackClearinghouseStateSubscription( + userAddress: string, + dexName: string, + ): Promise { + if (this.clearinghouseStateSubscriptions.has(dexName)) { + return; // Already subscribed + } + + const subscriptionClient = this.clientService.getSubscriptionClient(); + if (!subscriptionClient) { + throw new Error('Subscription client not available'); + } + + try { + const subscription = await subscriptionClient.clearinghouseState( + { + user: userAddress, + dex: dexName || undefined, // Empty string -> undefined for main DEX + }, + (data: WsClearinghouseStateEvent) => { + // Cache the fallback clearinghouse state + const cacheKey = data.dex || ''; + this.fallbackClearinghouseStateCache.set( + cacheKey, + data.clearinghouseState, + ); + // Update caches and notify subscribers if we have positions/account subscribers + if ( + this.positionSubscriberCount > 0 || + this.accountSubscriberCount > 0 + ) { + // Process positions from fallback clearinghouse state + const positions = data.clearinghouseState.assetPositions + .filter((assetPos) => assetPos.position.szi !== '0') + .map((assetPos) => adaptPositionFromSDK(assetPos)); + + // For fallback clearinghouseState, we don't have orders yet + // Process positions without TP/SL (will be added when orders arrive) + const positionsWithTPSL = positions; + + // Update account state + const accountState: AccountState = adaptAccountStateFromSDK( + data.clearinghouseState, + undefined, + ); + + // Update caches + this.dexPositionsCache.set(cacheKey, positionsWithTPSL); + this.dexAccountCache.set(cacheKey, accountState); + + // Trigger aggregation and notify subscribers + this.aggregateAndNotifySubscribers(); + } + }, + ); + + this.clearinghouseStateSubscriptions.set(dexName, subscription); + DevLogger.log( + `Fallback clearinghouseState subscription established for DEX: ${dexName || 'main'}`, + ); + } catch (error) { + Logger.error( + ensureError(error), + this.getErrorContext('ensureFallbackClearinghouseState', { + dex: dexName, + }), + ); + throw error; + } + } + + /** + * HOTFIX: Ensure fallback openOrders subscription exists for a DEX + * Used when openOrders is missing from webData3.perpDexStates + */ + private async ensureFallbackOpenOrdersSubscription( + userAddress: string, + dexName: string, + ): Promise { + if (this.openOrdersSubscriptions.has(dexName)) { + return; // Already subscribed + } + + const subscriptionClient = this.clientService.getSubscriptionClient(); + if (!subscriptionClient) { + throw new Error('Subscription client not available'); + } + + try { + const subscription = await subscriptionClient.openOrders( + { + user: userAddress, + dex: dexName || undefined, // Empty string -> undefined for main DEX + }, + (data: WsOpenOrdersEvent) => { + // Cache the fallback orders + const cacheKey = data.dex || ''; + this.fallbackOpenOrdersCache.set(cacheKey, data.orders); + // Update caches and notify subscribers if we have order subscribers + if (this.orderSubscriberCount > 0) { + // Get cached positions for TP/SL processing + const cachedPositions = this.dexPositionsCache.get(cacheKey) || []; + // Extract TP/SL and process orders (data.orders is FrontendOpenOrdersResponse - correct type) + const { + tpslMap, + tpslCountMap, + processedOrders: orders, + } = this.extractTPSLFromOrders(data.orders, cachedPositions); + + // Update orders cache with processed orders + this.dexOrdersCache.set(cacheKey, orders); + + // Update positions with TP/SL if we have positions + if (cachedPositions.length > 0) { + const positionsWithTPSL = this.mergeTPSLIntoPositions( + cachedPositions, + tpslMap, + tpslCountMap, + ); + this.dexPositionsCache.set(cacheKey, positionsWithTPSL); + } + + // Trigger aggregation and notify subscribers + this.aggregateAndNotifySubscribers(); + } + }, + ); + + this.openOrdersSubscriptions.set(dexName, subscription); + DevLogger.log( + `Fallback openOrders subscription established for DEX: ${dexName || 'main'}`, + ); + } catch (error) { + Logger.error( + ensureError(error), + this.getErrorContext('ensureFallbackOpenOrders', { + dex: dexName, + }), + ); + throw error; + } + } + + /** + * Aggregate data from all DEX caches and notify subscribers if data changed + * Used by both webData3 callback and fallback subscription callbacks + */ + private aggregateAndNotifySubscribers(): void { + // Aggregate data from all DEX caches + const aggregatedPositions = Array.from( + this.dexPositionsCache.values(), + ).flat(); + + const aggregatedOrders = Array.from(this.dexOrdersCache.values()).flat(); + + const aggregatedAccount = this.aggregateAccountStates(); + + // Check if aggregated data changed using fast hash comparison + const positionsHash = this.hashPositions(aggregatedPositions); + const ordersHash = this.hashOrders(aggregatedOrders); + const accountHash = this.hashAccountState(aggregatedAccount); + + const positionsChanged = positionsHash !== this.cachedPositionsHash; + const ordersChanged = ordersHash !== this.cachedOrdersHash; + const accountChanged = accountHash !== this.cachedAccountHash; + + // Only notify subscribers if aggregated data changed + if (positionsChanged) { + this.cachedPositions = aggregatedPositions; + this.cachedPositionsHash = positionsHash; + this.positionsCacheInitialized = true; // Mark cache as initialized + this.positionSubscribers.forEach((callback) => { + callback(aggregatedPositions); + }); + } + + if (ordersChanged) { + this.cachedOrders = aggregatedOrders; + this.cachedOrdersHash = ordersHash; + this.ordersCacheInitialized = true; // Mark cache as initialized + this.orderSubscribers.forEach((callback) => { + callback(aggregatedOrders); + }); + } + + if (accountChanged) { + this.cachedAccount = aggregatedAccount; + this.cachedAccountHash = accountHash; + this.accountSubscribers.forEach((callback) => { + callback(aggregatedAccount); + }); + } + } + /** * Clean up webData3 subscription when no longer needed */ @@ -1066,7 +1360,44 @@ export class HyperLiquidSubscriptionService { this.webData3SubscriptionPromise = undefined; } - // Note: No separate clearinghouseState cleanup needed (webData3 handles all DEXs) + // Cleanup fallback subscriptions (HOTFIX for missing fields) + if (this.clearinghouseStateSubscriptions.size > 0) { + this.clearinghouseStateSubscriptions.forEach( + (subscription, dexName) => { + subscription.unsubscribe().catch((error: Error) => { + Logger.error( + ensureError(error), + this.getErrorContext( + 'cleanupSharedWebData3Subscription.clearinghouseState', + { + dex: dexName, + }, + ), + ); + }); + }, + ); + this.clearinghouseStateSubscriptions.clear(); + this.fallbackClearinghouseStateCache.clear(); + } + + if (this.openOrdersSubscriptions.size > 0) { + this.openOrdersSubscriptions.forEach((subscription, dexName) => { + subscription.unsubscribe().catch((error: Error) => { + Logger.error( + ensureError(error), + this.getErrorContext( + 'cleanupSharedWebData3Subscription.openOrders', + { + dex: dexName, + }, + ), + ); + }); + }); + this.openOrdersSubscriptions.clear(); + this.fallbackOpenOrdersCache.clear(); + } // Clear subscriber counts this.positionSubscriberCount = 0; @@ -2154,6 +2485,37 @@ export class HyperLiquidSubscriptionService { this.assetCtxsSubscriptions.clear(); this.assetCtxsSubscriptionPromises.clear(); + // Cleanup fallback subscriptions (HOTFIX for missing fields) + if (this.clearinghouseStateSubscriptions.size > 0) { + this.clearinghouseStateSubscriptions.forEach((subscription, dexName) => { + subscription.unsubscribe().catch((error: Error) => { + Logger.error( + ensureError(error), + this.getErrorContext('clearAll.clearinghouseState', { + dex: dexName, + }), + ); + }); + }); + this.clearinghouseStateSubscriptions.clear(); + this.fallbackClearinghouseStateCache.clear(); + } + + if (this.openOrdersSubscriptions.size > 0) { + this.openOrdersSubscriptions.forEach((subscription, dexName) => { + subscription.unsubscribe().catch((error: Error) => { + Logger.error( + ensureError(error), + this.getErrorContext('clearAll.openOrders', { + dex: dexName, + }), + ); + }); + }); + this.openOrdersSubscriptions.clear(); + this.fallbackOpenOrdersCache.clear(); + } + DevLogger.log( 'HyperLiquid: Subscription service cleared (multi-DEX with webData3)', {