Skip to content

Commit 6318592

Browse files
committed
Calculate OB metrics from snapshots only when diffs are not available
1 parent 0ea94c0 commit 6318592

10 files changed

Lines changed: 160 additions & 80 deletions

File tree

src/Crypto.Websocket.Extensions.Core/OrderBooks/CryptoOrderBook.cs

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -186,16 +186,15 @@ protected override void UpdateLevels(IEnumerable<OrderBookLevel> levels)
186186
}
187187

188188
var previousPrice = existing.Price;
189-
var previousAmount = existing.Amount;
190189

191-
ComputeUpdate(existing, level);
190+
CalculateMetricsForUpdatedLevel(existing, level);
192191

193-
InsertToCollection(collection, ordering, existing, previousPrice, previousAmount);
192+
InsertToCollection(collection, ordering, existing, previousPrice);
194193
}
195194
}
196195

197196
private void InsertToCollection(IDictionary<double, OrderedDictionary>? collection, OrderBookLevelsOrderPerPrice? ordering,
198-
OrderBookLevel level, double? previousPrice = null, double? previousAmount = null)
197+
OrderBookLevel level, double? previousPrice = null)
199198
{
200199
if (collection == null)
201200
return;
@@ -207,11 +206,11 @@ private void InsertToCollection(IDictionary<double, OrderedDictionary>? collecti
207206
}
208207

209208
GetAllCollection(level.Side)[level.Id] = level;
210-
InsertLevelIntoPriceGroup(level, collection, ordering, previousPrice, previousAmount);
209+
InsertLevelIntoPriceGroup(level, collection, ordering, previousPrice);
211210
}
212211

213212
private void InsertLevelIntoPriceGroup(OrderBookLevel level, IDictionary<double, OrderedDictionary> collection,
214-
OrderBookLevelsOrderPerPrice? orderingGroup, double? previousPrice = null, double? previousAmount = null)
213+
OrderBookLevelsOrderPerPrice? orderingGroup, double? previousPrice = null)
215214
{
216215
if (orderingGroup == null || level.Price == null)
217216
return;
@@ -252,12 +251,6 @@ private void InsertLevelIntoPriceGroup(OrderBookLevel level, IDictionary<double,
252251
currentGroup.Remove(level.Id);
253252
}
254253

255-
// amount changed, increase counter
256-
if (previousAmount.HasValue && !CryptoMathUtils.IsSame(previousAmount, level.Amount))
257-
{
258-
level.AmountUpdatedCount++;
259-
}
260-
261254
currentGroup[level.Id] = level;
262255
level.Ordering = ordering;
263256
}
@@ -282,7 +275,7 @@ protected override void DeleteLevels(IReadOnlyCollection<OrderBookLevel> levels)
282275
{
283276
var existing = allLevels[level.Id];
284277

285-
ComputeDelete(existing, level);
278+
CalculateMetricsForDeletedLevel(existing, level);
286279

287280
price = existing.Price ?? -1;
288281
allLevels.Remove(level.Id);

src/Crypto.Websocket.Extensions.Core/OrderBooks/CryptoOrderBookBase.cs

Lines changed: 111 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,18 @@
1+
using Crypto.Websocket.Extensions.Core.Models;
2+
using Crypto.Websocket.Extensions.Core.OrderBooks.Models;
3+
using Crypto.Websocket.Extensions.Core.OrderBooks.Sources;
4+
using Crypto.Websocket.Extensions.Core.Utils;
5+
using Crypto.Websocket.Extensions.Core.Validations;
6+
using Microsoft.Extensions.Logging;
17
using System;
28
using System.Collections.Generic;
9+
using System.Collections.ObjectModel;
310
using System.Diagnostics;
411
using System.Linq;
512
using System.Reactive.Linq;
613
using System.Reactive.Subjects;
714
using System.Threading;
815
using System.Threading.Tasks;
9-
using Crypto.Websocket.Extensions.Core.Models;
10-
using Crypto.Websocket.Extensions.Core.OrderBooks.Models;
11-
using Crypto.Websocket.Extensions.Core.OrderBooks.Sources;
12-
using Crypto.Websocket.Extensions.Core.Utils;
13-
using Crypto.Websocket.Extensions.Core.Validations;
14-
using Microsoft.Extensions.Logging;
1516

1617
namespace Crypto.Websocket.Extensions.Core.OrderBooks
1718
{
@@ -20,19 +21,19 @@ namespace Crypto.Websocket.Extensions.Core.OrderBooks
2021
/// </summary>
2122
public abstract class CryptoOrderBookBase<T> : ICryptoOrderBook
2223
{
23-
/// <summary>
24-
/// Object to use for synchronization.
25-
/// </summary>
24+
/// <summary>
25+
/// Object to use for synchronization.
26+
/// </summary>
2627
#if NET9_0_OR_GREATER
2728
protected readonly Lock Locker = new();
2829
#else
29-
protected readonly object Locker = new();
30+
protected readonly object Locker = new();
3031
#endif
3132

32-
/// <summary>
33-
/// The source.
34-
/// </summary>
35-
protected readonly IOrderBookSource Source;
33+
/// <summary>
34+
/// The source.
35+
/// </summary>
36+
protected readonly IOrderBookSource Source;
3637

3738
/// <summary>
3839
/// The internal collection of bid levels.
@@ -108,6 +109,9 @@ protected CryptoOrderBookBase(string targetPair, IOrderBookSource source)
108109
TargetPairOriginal = targetPair;
109110
TargetPair = CryptoPairsHelper.Clean(targetPair);
110111
Source = source;
112+
113+
// diffs are not supported by the source, we need to calculate metrics from snapshots
114+
CalculateMetricsFromSnapshots = !Source.DiffsSupported;
111115
}
112116

113117
/// <summary>
@@ -208,6 +212,11 @@ public bool ValidityCheckEnabled
208212
/// <inheritdoc />
209213
public bool IsIndexComputationEnabled { get; set; }
210214

215+
/// <summary>
216+
/// If exchange provides only snapshots with no diffs, enable this to calculate metrics
217+
/// </summary>
218+
public bool CalculateMetricsFromSnapshots { get; set; }
219+
211220
/// <inheritdoc />
212221
public IObservable<IOrderBookChangeInfo> BidAskUpdatedStream => BidAskUpdated.AsObservable();
213222

@@ -382,32 +391,15 @@ private void NotifyOrderBookChanges(TopNLevelsChangeInfo levelsChange)
382391

383392
private void HandleSnapshot(OrderBookLevel[] levels)
384393
{
385-
ClearLevels();
386-
387394
LogDebug($"Handling snapshot: {levels.Length} levels");
388-
foreach (var level in levels)
389-
{
390-
var price = level.Price;
391-
if (price is null or < 0)
392-
{
393-
LogAlways($"Received snapshot level with weird price, ignoring. [{level.Side}] Id: {level.Id}, price: {level.Price}, amount: {level.Amount}");
394-
continue;
395-
}
396-
397-
level.AmountDifference = level.Amount ?? 0;
398-
level.CountDifference = level.Count ?? 0;
399395

400-
switch (level.Side)
401-
{
402-
case CryptoOrderSide.Bid:
403-
HandleSnapshotBidLevel(level);
404-
AllBidLevels[level.Id] = level;
405-
break;
406-
case CryptoOrderSide.Ask:
407-
HandleSnapshotAskLevel(level);
408-
AllAskLevels[level.Id] = level;
409-
break;
410-
}
396+
if (CalculateMetricsFromSnapshots)
397+
{
398+
HandleSnapshotWithMetrics(levels);
399+
}
400+
else
401+
{
402+
HandleSnapshotWithNoMetrics(levels);
411403
}
412404

413405
RecomputeAfterChangeAndSetIndexes(levels);
@@ -471,11 +463,11 @@ private void HandleDiff(OrderBookLevelBulk bulk, IReadOnlyCollection<OrderBookLe
471463
protected abstract void UpdateLevels(IEnumerable<OrderBookLevel> levels);
472464

473465
/// <summary>
474-
/// Computes differences and copies state between existing and new level.
466+
/// Calculates differences and copies state between existing and new level.
475467
/// </summary>
476468
/// <param name="existing">The existing level.</param>
477469
/// <param name="level">The new level.</param>
478-
protected static void ComputeUpdate(OrderBookLevel existing, OrderBookLevel level)
470+
protected static void CalculateMetricsForUpdatedLevel(OrderBookLevel existing, OrderBookLevel level)
479471
{
480472
var amountDiff = (level.Amount ?? existing.Amount ?? 0) - (existing.Amount ?? 0);
481473
var countDiff = (level.Count ?? existing.Count ?? 0) - (existing.Count ?? 0);
@@ -497,6 +489,9 @@ protected static void ComputeUpdate(OrderBookLevel existing, OrderBookLevel leve
497489
level.CountDifferenceAggregated += countDiff;
498490
existing.CountDifferenceAggregated += countDiff;
499491

492+
existing.AmountUpdatedCount += amountDiff != 0 ? 1 : 0;
493+
level.AmountUpdatedCount = existing.AmountUpdatedCount;
494+
500495
level.Amount ??= existing.Amount;
501496
level.Count ??= existing.Count;
502497
level.Price ??= existing.Price;
@@ -523,7 +518,7 @@ protected static bool IsInvalidLevel(OrderBookLevel level) =>
523518
/// </summary>
524519
/// <param name="existing">The existing level.</param>
525520
/// <param name="level">The new level.</param>
526-
protected static void ComputeDelete(OrderBookLevel existing, OrderBookLevel level)
521+
protected static void CalculateMetricsForDeletedLevel(OrderBookLevel existing, OrderBookLevel level)
527522
{
528523
var amountDiff = -(existing.Amount ?? 0);
529524
var countDiff = -(existing.Count ?? 0);
@@ -545,6 +540,81 @@ protected static void ComputeDelete(OrderBookLevel existing, OrderBookLevel leve
545540
existing.CountDifferenceAggregated += countDiff;
546541
}
547542

543+
private void HandleSnapshotWithNoMetrics(OrderBookLevel[] levels)
544+
{
545+
ClearLevels();
546+
foreach (var level in levels)
547+
{
548+
var price = level.Price;
549+
if (price is null or < 0)
550+
{
551+
LogAlways($"Received snapshot level with weird price, ignoring. [{level.Side}] Id: {level.Id}, price: {level.Price}, amount: {level.Amount}");
552+
continue;
553+
}
554+
555+
level.AmountDifference = level.Amount ?? 0;
556+
level.CountDifference = level.Count ?? 0;
557+
558+
switch (level.Side)
559+
{
560+
case CryptoOrderSide.Bid:
561+
HandleSnapshotBidLevel(level);
562+
AllBidLevels[level.Id] = level;
563+
break;
564+
case CryptoOrderSide.Ask:
565+
HandleSnapshotAskLevel(level);
566+
AllAskLevels[level.Id] = level;
567+
break;
568+
}
569+
}
570+
}
571+
572+
private void HandleSnapshotWithMetrics(OrderBookLevel[] levels)
573+
{
574+
foreach (var level in levels)
575+
{
576+
var price = level.Price;
577+
if (price is null or < 0)
578+
{
579+
LogAlways($"Received snapshot level with weird price, ignoring. [{level.Side}] Id: {level.Id}, price: {level.Price}, amount: {level.Amount}");
580+
continue;
581+
}
582+
583+
var existing = FindLevelById(level.Id, level.Side);
584+
if (existing == null)
585+
{
586+
level.AmountDifference = level.Amount ?? 0;
587+
level.CountDifference = level.Count ?? 0;
588+
level.AmountUpdatedCount = 0;
589+
continue;
590+
}
591+
592+
CalculateMetricsForUpdatedLevel(existing, level);
593+
}
594+
595+
ClearLevels();
596+
foreach (var level in levels)
597+
{
598+
var price = level.Price;
599+
if (price is null or < 0)
600+
{
601+
continue;
602+
}
603+
604+
switch (level.Side)
605+
{
606+
case CryptoOrderSide.Bid:
607+
HandleSnapshotBidLevel(level);
608+
AllBidLevels[level.Id] = level;
609+
break;
610+
case CryptoOrderSide.Ask:
611+
HandleSnapshotAskLevel(level);
612+
AllAskLevels[level.Id] = level;
613+
break;
614+
}
615+
}
616+
}
617+
548618
private void RecomputeAfterChange()
549619
{
550620
var firstBid = GetFirstBid();

src/Crypto.Websocket.Extensions.Core/OrderBooks/CryptoOrderBookL2.cs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -127,13 +127,13 @@ protected override void UpdateLevels(IEnumerable<OrderBookLevel> levels)
127127
{
128128
level.AmountDifference = level.Amount ?? 0;
129129
level.CountDifference = level.Count ?? 0;
130-
level.AmountUpdatedCount = -1;
130+
level.AmountUpdatedCount = 0;
131131

132132
InsertToCollection(collection, level);
133133
continue;
134134
}
135135

136-
ComputeUpdate(existing, level);
136+
CalculateMetricsForUpdatedLevel(existing, level);
137137

138138
InsertToCollection(collection, existing);
139139
}
@@ -152,7 +152,6 @@ private void InsertToCollection(IDictionary<double, OrderBookLevel>? collection,
152152
// ReSharper disable once PossibleInvalidOperationException
153153
collection[level.Price!.Value] = level;
154154
GetAllCollection(level.Side)[level.Id] = level;
155-
level.AmountUpdatedCount++;
156155
}
157156

158157
/// <inheritdoc />
@@ -185,7 +184,7 @@ protected override void DeleteLevels(IReadOnlyCollection<OrderBookLevel> levels)
185184
allLevels.Remove(level.Id);
186185

187186
if (existing != null)
188-
ComputeDelete(existing, level);
187+
CalculateMetricsForDeletedLevel(existing, level);
189188
}
190189
}
191190

src/Crypto.Websocket.Extensions.Core/OrderBooks/ICryptoOrderBook.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,11 @@ public interface ICryptoOrderBook : IDisposable
8989
/// </summary>
9090
bool IsIndexComputationEnabled { get; set; }
9191

92+
/// <summary>
93+
/// If exchange provides only snapshots with no diffs, enable this to calculate metrics
94+
/// </summary>
95+
bool CalculateMetricsFromSnapshots { get; set; }
96+
9297
/// <summary>
9398
/// Streams data when top level bid or ask price was updated
9499
/// </summary>

src/Crypto.Websocket.Extensions.Core/OrderBooks/Sources/IOrderBookSource.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,11 @@ public interface IOrderBookSource : IDisposable
2121
/// </summary>
2222
bool LoadSnapshotEnabled { get; set; }
2323

24+
/// <summary>
25+
/// If source provides only snapshots with no diffs, this will return false
26+
/// </summary>
27+
bool DiffsSupported { get; }
28+
2429
/// <summary>
2530
/// Whenever messages should be buffered before processing.
2631
/// Use property `BufferInterval` to configure buffering interval.

src/Crypto.Websocket.Extensions.Core/OrderBooks/Sources/OrderBookSourceBase.cs

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@ public abstract class OrderBookSourceBase : IOrderBookSource
1818
#if NET9_0_OR_GREATER
1919
private readonly Lock _bufferLocker = new Lock();
2020
#else
21-
private readonly object _bufferLocker = new object();
21+
private readonly object _bufferLocker = new object();
2222
#endif
23-
private readonly CryptoAsyncLock _snapshotLocker = new CryptoAsyncLock();
23+
private readonly CryptoAsyncLock _snapshotLocker = new CryptoAsyncLock();
2424
private readonly CancellationTokenSource _cancellation = new CancellationTokenSource();
2525

2626
private readonly Queue<object> _dataBuffer = new Queue<object>();
@@ -65,6 +65,11 @@ public virtual void Dispose()
6565
/// <inheritdoc />
6666
public bool LoadSnapshotEnabled { get; set; } = false;
6767

68+
/// <summary>
69+
/// If source provides only snapshots with no diffs, this will return false
70+
/// </summary>
71+
public virtual bool DiffsSupported { get; } = true;
72+
6873
/// <inheritdoc />
6974
public bool BufferEnabled
7075
{
@@ -130,7 +135,7 @@ public virtual bool IsValid()
130135
/// </summary>
131136
protected void StreamSnapshot(OrderBookLevelBulk? data)
132137
{
133-
if (data?.Levels is {Length: > 0})
138+
if (data?.Levels is { Length: > 0 })
134139
{
135140
_orderBookSnapshotSubject.OnNext(data);
136141
}
@@ -188,12 +193,12 @@ private async Task ProcessData()
188193

189194
StreamDataSynchronized();
190195
}
191-
catch (Exception e)
196+
catch (Exception e)
192197
{
193198
_logger.LogDebug("[{exchangeName}] Failed while buffering orderbook changes." +
194-
"Error: {error}", ExchangeName, e.Message);
195-
}
196-
}
199+
"Error: {error}", ExchangeName, e.Message);
200+
}
201+
}
197202
}
198203

199204
private void StreamDataSynchronized()

0 commit comments

Comments
 (0)