Skip to content

Commit e066e18

Browse files
committed
Fix parallel processing.
1 parent 2449c1d commit e066e18

2 files changed

Lines changed: 18 additions & 8 deletions

File tree

Connectors/Tinkoff/TinkoffMessageAdapter.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ namespace StockSharp.Tinkoff;
33
using Ecng.ComponentModel;
44
using Ecng.Net;
55

6+
using Nito.AsyncEx;
7+
68
public partial class TinkoffMessageAdapter
79
{
810
private GrpcChannel _channel;
@@ -16,6 +18,8 @@ public partial class TinkoffMessageAdapter
1618
private static TimeSpan GetCurrentDelay(TimeSpan currentDelay)
1719
=> currentDelay.Multiply(2).Min(_maxDelay);
1820

21+
private readonly AsyncLock _lock = new();
22+
1923
/// <summary>
2024
/// Initializes a new instance of the <see cref="TinkoffMessageAdapter"/>.
2125
/// </summary>

Connectors/Tinkoff/TinkoffMessageAdapter_MarketData.cs

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -536,7 +536,7 @@ protected override async ValueTask OnTFCandlesSubscriptionAsync(MarketDataMessag
536536
if (!TryGetAndRemove(mdMsg.OriginalTransactionId, out var t))
537537
return;
538538

539-
await _mdStream.RequestStream.WriteAsync(new()
539+
await WriteMdRequest(new()
540540
{
541541
SubscribeCandlesRequest = new()
542542
{
@@ -555,7 +555,7 @@ await _mdStream.RequestStream.WriteAsync(new()
555555
}
556556

557557
private Task SubscribeCandles(MarketDataMessage mdMsg, CancellationToken cancellationToken)
558-
=> _mdStream.RequestStream.WriteAsync(new()
558+
=> WriteMdRequest(new()
559559
{
560560
SubscribeCandlesRequest = new()
561561
{
@@ -673,7 +673,7 @@ protected override async ValueTask OnMarketDepthSubscriptionAsync(MarketDataMess
673673
if (!TryGetAndRemove(mdMsg.OriginalTransactionId, out var t))
674674
return;
675675

676-
await _mdStream.RequestStream.WriteAsync(new()
676+
await WriteMdRequest(new()
677677
{
678678
SubscribeOrderBookRequest = new()
679679
{
@@ -698,7 +698,7 @@ await _mdStream.RequestStream.WriteAsync(new()
698698
}
699699

700700
private Task SubscribeMarketDepth(MarketDataMessage mdMsg, CancellationToken cancellationToken)
701-
=> _mdStream.RequestStream.WriteAsync(new()
701+
=> WriteMdRequest(new()
702702
{
703703
SubscribeOrderBookRequest = new()
704704
{
@@ -741,7 +741,7 @@ protected override async ValueTask OnTicksSubscriptionAsync(MarketDataMessage md
741741
if (!TryGetAndRemove(mdMsg.OriginalTransactionId, out var t))
742742
return;
743743

744-
await _mdStream.RequestStream.WriteAsync(new()
744+
await WriteMdRequest(new()
745745
{
746746
SubscribeTradesRequest = new()
747747
{
@@ -765,7 +765,7 @@ await _mdStream.RequestStream.WriteAsync(new()
765765
}
766766

767767
private Task SubscribeTicks(MarketDataMessage mdMsg, CancellationToken cancellationToken)
768-
=> _mdStream.RequestStream.WriteAsync(new()
768+
=> WriteMdRequest(new()
769769
{
770770
SubscribeTradesRequest = new()
771771
{
@@ -807,7 +807,7 @@ protected override async ValueTask OnLevel1SubscriptionAsync(MarketDataMessage m
807807
if (!TryGetAndRemove(mdMsg.OriginalTransactionId, out var t))
808808
return;
809809

810-
await _mdStream.RequestStream.WriteAsync(new()
810+
await WriteMdRequest(new()
811811
{
812812
SubscribeLastPriceRequest = new()
813813
{
@@ -836,7 +836,7 @@ await _mdStream.RequestStream.WriteAsync(new()
836836
}
837837

838838
private Task SubscribeLevel1(MarketDataMessage mdMsg, CancellationToken cancellationToken)
839-
=> _mdStream.RequestStream.WriteAsync(new()
839+
=> WriteMdRequest(new()
840840
{
841841
SubscribeLastPriceRequest = new()
842842
{
@@ -861,4 +861,10 @@ private Task SubscribeLevel1(MarketDataMessage mdMsg, CancellationToken cancella
861861
SubscriptionAction = SubscriptionAction.Subscribe,
862862
},
863863
}, cancellationToken);
864+
865+
private async Task WriteMdRequest(MarketDataRequest message, CancellationToken cancellationToken)
866+
{
867+
using var _ = await _lock.LockAsync(cancellationToken);
868+
await _mdStream.RequestStream.WriteAsync(message, cancellationToken);
869+
}
864870
}

0 commit comments

Comments
 (0)