Skip to content

Commit eac4f77

Browse files
committed
Improve naming and add documentations
1 parent 083f55c commit eac4f77

1 file changed

Lines changed: 95 additions & 72 deletions

File tree

src/BinanceBot.Market/MarketDepthManager.cs

Lines changed: 95 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@ public class MarketDepthManager
3131
private readonly Logger _logger;
3232

3333
private readonly Queue<IBinanceEventOrderBook> _eventBuffer = new();
34-
private long _localOrderBookUpdateId = 0;
35-
private bool _isSnapshotLoaded = false;
34+
private long _lastProcessedUpdateId = 0;
35+
private bool _snapshotApplied = false;
3636

3737
private readonly TimeSpan _defaultUpdateInterval = TimeSpan.FromMilliseconds(100);
3838

@@ -56,6 +56,15 @@ public MarketDepthManager(IBinanceClient restClient, IBinanceSocketClient webSoc
5656
}
5757

5858

59+
/// <summary>
60+
/// Retrieves an order book snapshot from the Binance REST API.
61+
/// </summary>
62+
/// <param name="symbol">The market symbol to get the order book for</param>
63+
/// <param name="orderBookDepth">Maximum number of price levels to retrieve</param>
64+
/// <param name="ct">Cancellation token</param>
65+
/// <returns>Order book snapshot containing asks, bids, and last update ID</returns>
66+
/// <exception cref="ArgumentOutOfRangeException">Thrown when contract type is unknown</exception>
67+
/// <exception cref="InvalidOperationException">Thrown when the API request fails</exception>
5968
private async Task<IBinanceOrderBook> GetOrderBookSnapshotAsync(MarketSymbol symbol, short orderBookDepth, CancellationToken ct)
6069
{
6170
(bool Success, IBinanceOrderBook Data, Error Error) response;
@@ -86,6 +95,56 @@ private async Task<IBinanceOrderBook> GetOrderBookSnapshotAsync(MarketSymbol sym
8695
return response.Data;
8796
}
8897

98+
/// <summary>
99+
/// Subscribes to order book updates via WebSocket for the specified market.
100+
/// </summary>
101+
/// <param name="symbol">The market symbol to subscribe to</param>
102+
/// <param name="updateIntervalMs">Update interval in milliseconds</param>
103+
/// <param name="onUpdate">Callback for order book updates</param>
104+
/// <param name="ct">Cancellation token</param>
105+
/// <returns>Update subscription</returns>
106+
/// <exception cref="ArgumentOutOfRangeException">Thrown when contract type is unknown</exception>
107+
/// <exception cref="InvalidOperationException">Thrown when subscription fails</exception>
108+
private async Task<UpdateSubscription> SubscribeToOrderBookAsync(
109+
MarketSymbol symbol,
110+
int updateIntervalMs,
111+
Action<IBinanceEventOrderBook> onUpdate,
112+
CancellationToken ct)
113+
{
114+
CallResult<UpdateSubscription> result;
115+
switch (symbol.ContractType)
116+
{
117+
case ContractType.Spot:
118+
result = await _webSocketClient.SpotStreams.SubscribeToOrderBookUpdatesAsync(
119+
symbol.FullName,
120+
updateIntervalMs,
121+
data => onUpdate(data.Data),
122+
ct)
123+
.ConfigureAwait(false);
124+
break;
125+
case ContractType.Futures:
126+
result = await _webSocketClient.UsdFuturesStreams.SubscribeToOrderBookUpdatesAsync(
127+
symbol.FullName,
128+
updateIntervalMs,
129+
data => onUpdate(data.Data),
130+
ct)
131+
.ConfigureAwait(false);
132+
break;
133+
default:
134+
throw new ArgumentOutOfRangeException(nameof(symbol.ContractType), "Unknown contract type.");
135+
}
136+
137+
if (!result.Success || result.Data == null)
138+
throw new InvalidOperationException($"Failed to subscribe to order book updates: {result.Error?.Message}");
139+
140+
return result.Data;
141+
}
142+
143+
/// <summary>
144+
/// Calculates the update interval in milliseconds, using the default if not specified.
145+
/// </summary>
146+
/// <param name="updateInterval">Optional update interval (100ms or 1000ms recommended)</param>
147+
/// <returns>Update interval in milliseconds</returns>
89148
private int GetUpdateIntervalMs(TimeSpan? updateInterval) =>
90149
updateInterval.HasValue ? (int)updateInterval.Value.TotalMilliseconds : (int)_defaultUpdateInterval.TotalMilliseconds;
91150

@@ -114,33 +173,11 @@ public async Task BuildAsync(MarketDepth marketDepth, short orderBookDepth = 10,
114173
_logger.Debug($"1: Opening WebSocket stream for {marketDepth.Symbol}");
115174

116175
var updateIntervalMs = GetUpdateIntervalMs(updateInterval);
117-
118-
CallResult<UpdateSubscription> subscriptionResult;
119-
120-
switch (marketDepth.Symbol.ContractType)
121-
{
122-
case ContractType.Spot:
123-
subscriptionResult = await _webSocketClient.SpotStreams.SubscribeToOrderBookUpdatesAsync(
124-
marketDepth.Symbol.FullName, updateIntervalMs,
125-
data => OnSpotOrderBookUpdate(marketDepth, data),
126-
ct)
127-
.ConfigureAwait(false);
128-
break;
129-
case ContractType.Futures:
130-
subscriptionResult = await _webSocketClient.UsdFuturesStreams.SubscribeToOrderBookUpdatesAsync(
131-
marketDepth.Symbol.FullName, updateIntervalMs,
132-
data => OnFuturesOrderBookUpdate(marketDepth, data),
133-
ct)
134-
.ConfigureAwait(false);
135-
break;
136-
default:
137-
throw new ArgumentOutOfRangeException(nameof(marketDepth.Symbol.ContractType), "Unknown contract type.");
138-
}
139-
140-
if (!subscriptionResult.Success || subscriptionResult.Data == null)
141-
throw new InvalidOperationException($"Failed to subscribe to order book updates: {subscriptionResult.Error?.Message}");
142-
143-
_subscription = subscriptionResult.Data;
176+
_subscription = await SubscribeToOrderBookAsync(
177+
marketDepth.Symbol,
178+
updateIntervalMs,
179+
data => OnDepthUpdate(marketDepth, data),
180+
ct);
144181

145182
// Step 2: Wait a bit to buffer some events
146183
// Use longer buffer time to ensure we have enough events before snapshot
@@ -193,8 +230,8 @@ public async Task BuildAsync(MarketDepth marketDepth, short orderBookDepth = 10,
193230
// Step 6: Set local order book to snapshot
194231
_logger.Debug($"6: Applying snapshot with {snapshot.Asks.Count()} asks and {snapshot.Bids.Count()} bids");
195232
marketDepth.UpdateDepth(snapshot.Asks, snapshot.Bids, snapshot.LastUpdateId);
196-
_localOrderBookUpdateId = marketDepth.LastUpdateId ?? throw new InvalidOperationException("MarketDepth.LastUpdateId is null after applying snapshot");
197-
_isSnapshotLoaded = marketDepth.LastUpdateId == snapshot.LastUpdateId;
233+
_lastProcessedUpdateId = marketDepth.LastUpdateId ?? throw new InvalidOperationException("MarketDepth.LastUpdateId is null after applying snapshot");
234+
_snapshotApplied = marketDepth.LastUpdateId == snapshot.LastUpdateId;
198235

199236
// Step 7: Apply buffered updates
200237
int appliedCount = 0;
@@ -203,7 +240,7 @@ public async Task BuildAsync(MarketDepth marketDepth, short orderBookDepth = 10,
203240
var bufferedEvent = _eventBuffer.Peek();
204241
if (bufferedEvent != null)
205242
{
206-
ApplyDepthUpdate(marketDepth, bufferedEvent);
243+
ProcessDepthUpdate(marketDepth, bufferedEvent);
207244
appliedCount++;
208245
}
209246
_eventBuffer.Dequeue();
@@ -228,31 +265,11 @@ public async Task StreamUpdatesAsync(MarketDepth marketDepth, TimeSpan? updateIn
228265
_logger.Debug($"1 & 2: Streaming updates: Opening WebSocket stream for {marketDepth.Symbol}");
229266

230267
var updateIntervalMs = GetUpdateIntervalMs(updateInterval);
231-
CallResult<UpdateSubscription> subscriptionResult;
232-
233-
switch (marketDepth.Symbol.ContractType)
234-
{
235-
case ContractType.Spot:
236-
subscriptionResult = await _webSocketClient.SpotStreams.SubscribeToOrderBookUpdatesAsync(
237-
marketDepth.Symbol.FullName,
238-
updateIntervalMs,
239-
data => OnSpotOrderBookUpdate(marketDepth, data),
240-
ct)
241-
.ConfigureAwait(false);
242-
break;
243-
case ContractType.Futures:
244-
subscriptionResult = await _webSocketClient.UsdFuturesStreams.SubscribeToOrderBookUpdatesAsync(
245-
marketDepth.Symbol.FullName,
246-
updateIntervalMs,
247-
data => OnFuturesOrderBookUpdate(marketDepth, data),
248-
ct)
249-
.ConfigureAwait(false);
250-
break;
251-
default:
252-
throw new ArgumentOutOfRangeException(nameof(marketDepth.Symbol.ContractType), "Unknown contract type.");
253-
}
254-
255-
_subscription = subscriptionResult.Data;
268+
_subscription = await SubscribeToOrderBookAsync(
269+
marketDepth.Symbol,
270+
updateIntervalMs,
271+
data => OnDepthUpdate(marketDepth, data),
272+
ct);
256273
}
257274

258275
/// <summary>
@@ -267,19 +284,18 @@ public async Task StopStreamingAsync(CancellationToken ct = default)
267284
}
268285
}
269286

270-
private void OnSpotOrderBookUpdate(MarketDepth marketDepth, DataEvent<IBinanceEventOrderBook> eventData) =>
271-
OnDepthUpdate(marketDepth, eventData.Data);
272-
273-
private void OnFuturesOrderBookUpdate(MarketDepth marketDepth, DataEvent<IBinanceFuturesEventOrderBook> eventData) =>
274-
OnDepthUpdate(marketDepth, eventData.Data);
275-
287+
/// <summary>
288+
/// Processes incoming order book updates, buffering before snapshot or applying after.
289+
/// </summary>
290+
/// <param name="marketDepth">The market depth to update</param>
291+
/// <param name="eventData">The order book update event</param>
276292
private void OnDepthUpdate(MarketDepth marketDepth, IBinanceEventOrderBook eventData)
277293
{
278294
if (eventData == null) return;
279295

280296
lock (_eventBuffer)
281297
{
282-
if (!_isSnapshotLoaded)
298+
if (!_snapshotApplied)
283299
{
284300
// Step 2: Buffer events before snapshot is loaded
285301
_eventBuffer.Enqueue(eventData);
@@ -288,38 +304,45 @@ private void OnDepthUpdate(MarketDepth marketDepth, IBinanceEventOrderBook event
288304
}
289305

290306
// Apply update to local order book
291-
ApplyDepthUpdate(marketDepth, eventData);
307+
ProcessDepthUpdate(marketDepth, eventData);
292308
}
293309
}
294310

295-
private void ApplyDepthUpdate(MarketDepth marketDepth, IBinanceEventOrderBook eventData)
311+
/// <summary>
312+
/// Process and apply a depth update event to the market depth.
313+
/// Validates event sequence and updates the local order book.
314+
/// </summary>
315+
/// <param name="marketDepth">The market depth to update</param>
316+
/// <param name="eventData">The order book update event</param>
317+
/// <exception cref="InvalidOperationException">Thrown when updates are missed (Spot markets only)</exception>
318+
private void ProcessDepthUpdate(MarketDepth marketDepth, IBinanceEventOrderBook eventData)
296319
{
297320
// Step 7: Apply update procedure
298321
long lastUpdateId = eventData.LastUpdateId;
299322

300323
// 1. Decide whether the update event can be applied
301-
if (lastUpdateId <= _localOrderBookUpdateId)
324+
if (lastUpdateId <= _lastProcessedUpdateId)
302325
{
303326
// Event is older than local order book, ignore
304-
_logger.Debug($"Ignoring old event: u={lastUpdateId} <= local={_localOrderBookUpdateId}");
327+
_logger.Debug($"Ignoring old event: u={lastUpdateId} <= local={_lastProcessedUpdateId}");
305328
return;
306329
}
307330

308331
// Check for missed updates. WARN: not worked for Futures API
309-
if (eventData.FirstUpdateId > _localOrderBookUpdateId + 1 && marketDepth.Symbol.ContractType == ContractType.Spot)
332+
if (eventData.FirstUpdateId > _lastProcessedUpdateId + 1 && marketDepth.Symbol.ContractType == ContractType.Spot)
310333
{
311-
string errorMsg = $"Missed order book updates. Expected U <= {_localOrderBookUpdateId + 1}, got U={eventData.FirstUpdateId}";
334+
string errorMsg = $"Missed order book updates. Expected U <= {_lastProcessedUpdateId + 1}, got U={eventData.FirstUpdateId}";
312335
_logger.Error(errorMsg);
313336
throw new InvalidOperationException(errorMsg);
314337
}
315338

316339
// 2. Update price levels
317-
if (_localOrderBookUpdateId % 100 == 0) // Log every 100th update to avoid flooding
340+
if (_lastProcessedUpdateId % 100 == 0) // Log every 100th update to avoid flooding
318341
_logger.Debug($"Applying update: U={eventData.FirstUpdateId}, u={lastUpdateId}, Asks={eventData.Asks.Count()}, Bids={eventData.Bids.Count()}");
319342

320343
marketDepth.UpdateDepth(eventData.Asks, eventData.Bids, lastUpdateId);
321344

322345
// 3. Set order book update ID
323-
_localOrderBookUpdateId = lastUpdateId;
346+
_lastProcessedUpdateId = lastUpdateId;
324347
}
325348
}

0 commit comments

Comments
 (0)