11using System ;
22using System . Collections . Generic ;
33using System . Linq ;
4+ using System . Threading ;
45using System . Threading . Tasks ;
56using Binance . Net . Interfaces ;
67using Binance . Net . Interfaces . Clients ;
@@ -31,22 +32,24 @@ public class MarketDepthManager
3132 private readonly Queue < IBinanceEventOrderBook > _eventBuffer = new ( ) ;
3233 private long _localOrderBookUpdateId = 0 ;
3334 private bool _isSnapshotLoaded = false ;
35+
36+ private readonly TimeSpan _defaultUpdateInterval = TimeSpan . FromMilliseconds ( 100 ) ;
3437
3538 private UpdateSubscription _subscription ;
3639
3740
3841 /// <summary>
3942 /// Create instance of <see cref="MarketDepthManager"/>
4043 /// </summary>
41- /// <param name="binanceRestClient ">Binance REST client</param>
44+ /// <param name="restClient ">Binance REST client</param>
4245 /// <param name="webSocketClient">Binance WebSocket client</param>
4346 /// <param name="logger">Logger instance</param>
44- /// <exception cref="ArgumentNullException"><paramref name="binanceRestClient "/> cannot be <see langword="null"/></exception>
47+ /// <exception cref="ArgumentNullException"><paramref name="restClient "/> cannot be <see langword="null"/></exception>
4548 /// <exception cref="ArgumentNullException"><paramref name="webSocketClient"/> cannot be <see langword="null"/></exception>
4649 /// <exception cref="ArgumentNullException"><paramref name="logger"/> cannot be <see langword="null"/></exception>
47- public MarketDepthManager ( IBinanceClient binanceRestClient , IBinanceSocketClient webSocketClient , Logger logger )
50+ public MarketDepthManager ( IBinanceClient restClient , IBinanceSocketClient webSocketClient , Logger logger )
4851 {
49- _restClient = binanceRestClient ?? throw new ArgumentNullException ( nameof ( binanceRestClient ) ) ;
52+ _restClient = restClient ?? throw new ArgumentNullException ( nameof ( restClient ) ) ;
5053 _webSocketClient = webSocketClient ?? throw new ArgumentNullException ( nameof ( webSocketClient ) ) ;
5154 _logger = logger ?? throw new ArgumentNullException ( nameof ( logger ) ) ;
5255 }
@@ -56,33 +59,45 @@ public MarketDepthManager(IBinanceClient binanceRestClient, IBinanceSocketClient
5659 /// Build <see cref="MarketDepth"/> following Binance official guidelines
5760 /// </summary>
5861 /// <param name="marketDepth">Market depth</param>
59- /// <param name="limit">Limit of returned orders count</param>
60- /// <param name="updateLimit">Update speed limit (100ms, 1000ms)</param>
61- public async Task BuildAsync ( MarketDepth marketDepth , short limit = 10 , int updateLimit = 1000 )
62+ /// <param name="orderBookDepth">Limit of returned orders count (default 10)</param>
63+ /// <param name="updateInterval">Update speed limit (100ms, 1000ms)</param>
64+ /// <param name="ct">Cancellation token</param>
65+ /// <exception cref="ArgumentNullException"><paramref name="marketDepth"/> cannot be <see langword="null"/></exception>
66+ /// <exception cref="ArgumentOutOfRangeException"><paramref name="updateInterval"/> must be greater than zero</exception>
67+ /// <exception cref="ArgumentOutOfRangeException"><paramref name="orderBookDepth"/> must be greater than zero</exception>
68+ /// <exception cref="OperationCanceledException">The operation was canceled.</exception>
69+ /// <exception cref="InvalidOperationException">Failed to subscribe to order book updates or get order book snapshot</exception>
70+ public async Task BuildAsync ( MarketDepth marketDepth , TimeSpan ? updateInterval = default , short orderBookDepth = 10 , CancellationToken ct = default )
6271 {
6372 if ( marketDepth == null )
6473 throw new ArgumentNullException ( nameof ( marketDepth ) ) ;
65- if ( limit <= 0 )
66- throw new ArgumentOutOfRangeException ( nameof ( limit ) ) ;
74+ if ( updateInterval . HasValue && updateInterval <= TimeSpan . Zero )
75+ throw new ArgumentOutOfRangeException ( nameof ( updateInterval ) ) ;
76+ if ( orderBookDepth <= 0 )
77+ throw new ArgumentOutOfRangeException ( nameof ( orderBookDepth ) ) ;
6778
6879 // Step 1: Open WebSocket stream and start buffering
69- _logger . Debug ( $ "Step 1: Opening WebSocket stream for { marketDepth . Symbol } ") ;
80+ _logger . Debug ( $ "1: Opening WebSocket stream for { marketDepth . Symbol } ") ;
81+
82+ var updateIntervalMs = updateInterval . HasValue ? ( int ) updateInterval . Value . TotalMilliseconds : ( int ) _defaultUpdateInterval . TotalMilliseconds ;
7083 var subscriptionResult = await _webSocketClient . SpotStreams . SubscribeToOrderBookUpdatesAsync (
71- marketDepth . Symbol , updateLimit ,
72- data => OnDepthUpdate ( marketDepth , data ) ) . ConfigureAwait ( false ) ;
84+ marketDepth . Symbol , updateIntervalMs ,
85+ data => OnDepthUpdate ( marketDepth , data ) ,
86+ ct )
87+ . ConfigureAwait ( false ) ;
7388
7489 if ( ! subscriptionResult . Success || subscriptionResult . Data == null )
7590 throw new InvalidOperationException ( $ "Failed to subscribe to order book updates: { subscriptionResult . Error ? . Message } ") ;
7691
7792 _subscription = subscriptionResult . Data ;
7893
7994 // Step 2: Wait a bit to buffer some events
80- _logger . Debug ( $ "Step 2: Buffering events for 200ms ") ;
81- await Task . Delay ( 200 ) . ConfigureAwait ( false ) ;
95+ _logger . Debug ( $ "2: Buffering events for { updateIntervalMs * 2 } ms ") ;
96+ await Task . Delay ( updateIntervalMs * 2 , ct ) . ConfigureAwait ( false ) ;
8297
83- _logger . Debug ( $ "Step 3: Getting order book snapshot for { marketDepth . Symbol } ") ;
98+ _logger . Debug ( $ "3: Getting order book snapshot for { marketDepth . Symbol } ") ;
8499 // Step 3: Get depth snapshot
85- WebCallResult < BinanceOrderBook > response = await _restClient . SpotApi . ExchangeData . GetOrderBookAsync ( marketDepth . Symbol , limit ) ;
100+ WebCallResult < BinanceOrderBook > response = await _restClient . SpotApi . ExchangeData . GetOrderBookAsync ( marketDepth . Symbol , orderBookDepth , ct ) . ConfigureAwait ( false ) ;
86101 if ( ! response . Success || response . Data == null )
87102 throw new InvalidOperationException ( $ "Failed to get order book snapshot: { response . Error ? . Message } ") ;
88103
@@ -100,14 +115,14 @@ public async Task BuildAsync(MarketDepth marketDepth, short limit = 10, int upda
100115
101116 if ( firstEvent != null )
102117 {
103- _logger . Debug ( $ "Step 4: Validating snapshot. FirstEvent.U={ firstEvent . FirstUpdateId } , Snapshot.LastUpdateId={ snapshot . LastUpdateId } ") ;
118+ _logger . Debug ( $ "4: Validating snapshot. FirstEvent.U={ firstEvent . FirstUpdateId } , Snapshot.LastUpdateId={ snapshot . LastUpdateId } ") ;
104119 }
105120
106121 while ( firstEvent != null && snapshot . LastUpdateId < firstEvent . FirstUpdateId )
107122 {
108123 _logger . Warn ( $ "Snapshot too old: LastUpdateId={ snapshot . LastUpdateId } < FirstEvent.U={ firstEvent . FirstUpdateId } . Retrying...") ;
109124 // Snapshot is too old, need to get a new one
110- response = await _restClient . SpotApi . ExchangeData . GetOrderBookAsync ( marketDepth . Symbol , limit ) ;
125+ response = await _restClient . SpotApi . ExchangeData . GetOrderBookAsync ( marketDepth . Symbol , orderBookDepth , ct ) . ConfigureAwait ( false ) ;
111126 if ( ! response . Success || response . Data == null )
112127 throw new InvalidOperationException ( $ "Failed to get order book snapshot: { response . Error ? . Message } ") ;
113128 snapshot = response . Data ;
@@ -131,10 +146,10 @@ public async Task BuildAsync(MarketDepth marketDepth, short limit = 10, int upda
131146 _eventBuffer . Dequeue ( ) ;
132147 discardedCount ++ ;
133148 }
134- _logger . Debug ( $ "Step 5: Discarded { discardedCount } outdated events (u <= { snapshot . LastUpdateId } )") ;
149+ _logger . Debug ( $ "5: Discarded { discardedCount } outdated events (u <= { snapshot . LastUpdateId } )") ;
135150
136151 // Step 6: Set local order book to snapshot
137- _logger . Debug ( $ "Step 6: Applying snapshot with { snapshot . Asks . Count ( ) } asks and { snapshot . Bids . Count ( ) } bids") ;
152+ _logger . Debug ( $ "6: Applying snapshot with { snapshot . Asks . Count ( ) } asks and { snapshot . Bids . Count ( ) } bids") ;
138153 marketDepth . UpdateDepth ( snapshot . Asks , snapshot . Bids , snapshot . LastUpdateId ) ;
139154 _localOrderBookUpdateId = snapshot . LastUpdateId ;
140155 _isSnapshotLoaded = true ;
@@ -151,7 +166,7 @@ public async Task BuildAsync(MarketDepth marketDepth, short limit = 10, int upda
151166 }
152167 _eventBuffer . Dequeue ( ) ;
153168 }
154- _logger . Debug ( $ "Step 7: Applied { appliedCount } buffered events") ;
169+ _logger . Debug ( $ "7: Applied { appliedCount } buffered events") ;
155170 }
156171 }
157172
@@ -161,22 +176,24 @@ public async Task BuildAsync(MarketDepth marketDepth, short limit = 10, int upda
161176 /// </summary>
162177 /// <param name="marketDepth">Market depth</param>
163178 /// <param name="updateInterval">Update interval (100ms or 1000ms)</param>
164- public void StreamUpdates ( MarketDepth marketDepth , TimeSpan ? updateInterval = default )
179+ public void StreamUpdates ( MarketDepth marketDepth , TimeSpan ? updateInterval = default , CancellationToken ct = default )
165180 {
166181 if ( marketDepth == null )
167182 throw new ArgumentNullException ( nameof ( marketDepth ) ) ;
168183
169184 // Step 1 & 2: Open WebSocket and buffer events
170185 _subscription = _webSocketClient . SpotStreams . SubscribeToOrderBookUpdatesAsync (
171186 marketDepth . Symbol ,
172- updateInterval . HasValue ? ( int ) updateInterval . Value . TotalMilliseconds : 1000 ,
173- data => OnDepthUpdate ( marketDepth , data ) ) . Result . Data ;
187+ updateInterval . HasValue ? ( int ) updateInterval . Value . TotalMilliseconds : ( int ) _defaultUpdateInterval . TotalMilliseconds ,
188+ data => OnDepthUpdate ( marketDepth , data ) ,
189+ ct )
190+ . Result . Data ;
174191 }
175192
176193 /// <summary>
177194 /// Stop streaming updates and unsubscribe
178195 /// </summary>
179- public async Task StopStreamingAsync ( )
196+ public async Task StopStreamingAsync ( CancellationToken ct = default )
180197 {
181198 if ( _subscription != null )
182199 {
0 commit comments