Skip to content

Commit 660f2a2

Browse files
committed
Convert security access to async (SaveAsync/GetSecurityAsync) end to end
The security get-or-create path was bridged to the async storage through a blocking AsyncHelper.Run, turning SaveAsync into a sync-over-async call inside the hot message-processing path. Propagate real async up the whole chain. TraderHelper: - Add GetOrCreateAsync that awaits ISecurityStorage.SaveAsync. It does not hold the storage Lock.Scope across the await (the lock is monitor based and thread affine); SaveAsync(forced: false) is an atomic add-if-absent, and the canonical stored instance is re-resolved to stay correct under concurrent creation. - Mark the sync GetOrCreate(out bool) obsolete; it bridges to the async variant and has no internal callers left. Connector / Connector_ProcessMessage: - Add GetSecurityAsync/EnsureGetSecurityAsync and route the message-processing handlers (Security, Level1, News, Quotes, Trade, Transaction, PositionChange, Execution) through them; the already-async dispatcher awaits them. No blocking Run remains on this path. - TryGetSecurity (entity cache snapshot delegate) becomes a pure LookupById: a snapshot lookup must not create or persist a security. IConnector: - Add GetSecurityAsync(SecurityId, CancellationToken) and mark the sync GetSecurity(SecurityId) obsolete. The sync method stays as a deprecated bridge for external callers only. Migrate the in-repo callers (tests and the RemoteSource sample) to the async API.
1 parent e67deb2 commit 660f2a2

9 files changed

Lines changed: 119 additions & 83 deletions

File tree

Algo/Connector.cs

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -955,13 +955,16 @@ private void ProcessTimeInterval(Message message)
955955
}
956956

957957
/// <inheritdoc />
958+
[Obsolete("Use GetSecurityAsync instead.")]
958959
public Security GetSecurity(SecurityId securityId)
959-
=> GetSecurity(securityId, s => false);
960+
// Sync bridge over the async path kept only for the deprecated IConnector contract.
961+
=> AsyncHelper.Run(() => GetSecurityAsync(securityId, default));
960962

961963
private Security TryGetSecurity(SecurityId? securityId)
962-
=> securityId == null || securityId.Value == default ? null : GetSecurity(securityId.Value);
964+
// Pure lookup used by the entity cache snapshots: it must not create/persist a security.
965+
=> securityId == null || securityId.Value == default ? null : SecurityStorage.LookupById(securityId.Value);
963966

964-
private Security EnsureGetSecurity<TMessage>(TMessage message)
967+
private async ValueTask<Security> EnsureGetSecurityAsync<TMessage>(TMessage message, CancellationToken cancellationToken)
965968
where TMessage : ISecurityIdMessage, ISubscriptionIdMessage
966969
{
967970
var secId = message.SecurityId;
@@ -983,24 +986,29 @@ private Security EnsureGetSecurity<TMessage>(TMessage message)
983986
secId = subscrSecId.Value;
984987
}
985988

986-
return TryGetSecurity(secId) ?? throw new ArgumentOutOfRangeException(nameof(message), message, LocalizedStrings.SecurityNoFound.Put(secId));
989+
return await GetSecurityAsync(secId, cancellationToken) ?? throw new ArgumentOutOfRangeException(nameof(message), message, LocalizedStrings.SecurityNoFound.Put(secId));
987990
}
988991

992+
/// <inheritdoc />
993+
public ValueTask<Security> GetSecurityAsync(SecurityId securityId, CancellationToken cancellationToken)
994+
=> GetSecurityAsync(securityId, s => false, cancellationToken);
995+
989996
/// <summary>
990997
/// To get the instrument by the code.
991998
/// </summary>
992999
/// <param name="id">Security ID.</param>
9931000
/// <param name="changeSecurity">The handler changing the instrument. It returns <see langword="true" /> if the instrument has been changed and the <see cref="SecurityReceived"/> should be called.</param>
1001+
/// <param name="cancellationToken"><see cref="CancellationToken"/></param>
9941002
/// <returns>Security.</returns>
995-
private Security GetSecurity(SecurityId id, Func<Security, bool> changeSecurity)
1003+
private async ValueTask<Security> GetSecurityAsync(SecurityId id, Func<Security, bool> changeSecurity, CancellationToken cancellationToken)
9961004
{
9971005
if (id == default)
9981006
throw new ArgumentNullException(nameof(id));
9991007

10001008
if (changeSecurity == null)
10011009
throw new ArgumentNullException(nameof(changeSecurity));
10021010

1003-
var security = SecurityStorage.GetOrCreate(id, key =>
1011+
var (security, isNew) = await SecurityStorage.GetOrCreateAsync(id, key =>
10041012
{
10051013
var idInfo = SecurityIdGenerator.Split(key);
10061014

@@ -1013,7 +1021,7 @@ private Security GetSecurity(SecurityId id, Func<Security, bool> changeSecurity)
10131021
Code = code,
10141022
Board = board,
10151023
};
1016-
}, out var isNew);
1024+
}, cancellationToken);
10171025

10181026
if (_existingSecurities.TryAdd(security))
10191027
_added?.Invoke([security]);

Algo/Connector_ProcessMessage.cs

Lines changed: 33 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -617,7 +617,7 @@ protected virtual async ValueTask OnProcessMessage(Message message, Cancellation
617617
break;
618618

619619
case MessageTypes.QuoteChange:
620-
ProcessQuotesMessage((QuoteChangeMessage)message);
620+
await ProcessQuotesMessage((QuoteChangeMessage)message, cancellationToken);
621621
break;
622622

623623
case MessageTypes.Board:
@@ -629,31 +629,31 @@ protected virtual async ValueTask OnProcessMessage(Message message, Cancellation
629629
break;
630630

631631
case MessageTypes.Security:
632-
ProcessSecurityMessage((SecurityMessage)message);
632+
await ProcessSecurityMessage((SecurityMessage)message, cancellationToken);
633633
break;
634634

635635
case MessageTypes.DataTypeInfo:
636636
ProcessDataTypeInfoMessage((DataTypeInfoMessage)message);
637637
break;
638638

639639
case MessageTypes.Level1Change:
640-
ProcessLevel1ChangeMessage((Level1ChangeMessage)message);
640+
await ProcessLevel1ChangeMessage((Level1ChangeMessage)message, cancellationToken);
641641
break;
642642

643643
case MessageTypes.News:
644-
ProcessNewsMessage((NewsMessage)message);
644+
await ProcessNewsMessage((NewsMessage)message, cancellationToken);
645645
break;
646646

647647
case MessageTypes.Execution:
648-
ProcessExecutionMessage((ExecutionMessage)message);
648+
await ProcessExecutionMessage((ExecutionMessage)message, cancellationToken);
649649
break;
650650

651651
case MessageTypes.Portfolio:
652652
ProcessPortfolioMessage((PortfolioMessage)message);
653653
break;
654654

655655
case MessageTypes.PositionChange:
656-
ProcessPositionChangeMessage((PositionChangeMessage)message);
656+
await ProcessPositionChangeMessage((PositionChangeMessage)message, cancellationToken);
657657
break;
658658

659659
//case MessageTypes.Time:
@@ -771,7 +771,7 @@ private async ValueTask ProcessSubscriptionFinishedMessage(SubscriptionFinishedM
771771

772772
await foreach (var secMsg in message.Body.ExtractSecuritiesAsync().WithCancellation(cancellationToken))
773773
{
774-
ProcessSecurityMessage(secMsg);
774+
await ProcessSecurityMessage(secMsg, cancellationToken);
775775
secMsgs.Add(secMsg);
776776
}
777777

@@ -923,16 +923,16 @@ private void ProcessBoardMessage(BoardMessage message)
923923
RaiseReceived(board, subscriptions, BoardReceived);
924924
}
925925

926-
private void ProcessSecurityMessage(SecurityMessage message)
926+
private async ValueTask ProcessSecurityMessage(SecurityMessage message, CancellationToken cancellationToken)
927927
{
928-
var security = GetSecurity(message.SecurityId, s =>
928+
var security = await GetSecurityAsync(message.SecurityId, s =>
929929
{
930930
if (!UpdateSecurityByDefinition)
931931
return false;
932932

933933
s.ApplyChanges(message, ExchangeInfoProvider, OverrideSecurityData);
934934
return true;
935-
});
935+
}, cancellationToken);
936936

937937
var subscriptions = _subscriptionManager.ProcessLookupResponse(message, security);
938938
RaiseReceived(security, subscriptions, SecurityReceived);
@@ -946,7 +946,7 @@ private void ProcessDataTypeInfoMessage(DataTypeInfoMessage message)
946946
RaiseReceived(dt, message, DataTypeReceived);
947947
}
948948

949-
private void ProcessLevel1ChangeMessage(Level1ChangeMessage message)
949+
private async ValueTask ProcessLevel1ChangeMessage(Level1ChangeMessage message, CancellationToken cancellationToken)
950950
{
951951
Security security = null;
952952

@@ -955,7 +955,7 @@ private void ProcessLevel1ChangeMessage(Level1ChangeMessage message)
955955
if (anyCanOnline != true)
956956
return;
957957

958-
security = EnsureGetSecurity(message);
958+
security = await EnsureGetSecurityAsync(message, cancellationToken);
959959

960960
if (_entityCache.HasLevel1Info(security))
961961
return;
@@ -964,15 +964,15 @@ private void ProcessLevel1ChangeMessage(Level1ChangeMessage message)
964964
#pragma warning disable CS0618 // Type or member is obsolete
965965
if (UpdateSecurityByLevel1)
966966
{
967-
security ??= EnsureGetSecurity(message);
967+
security ??= await EnsureGetSecurityAsync(message, cancellationToken);
968968

969969
security.ApplyChanges(message);
970970
}
971971
#pragma warning restore CS0618 // Type or member is obsolete
972972

973973
if (ValuesChanged is not null)
974974
{
975-
security ??= EnsureGetSecurity(message);
975+
security ??= await EnsureGetSecurityAsync(message, cancellationToken);
976976

977977
var time = message.ServerTime;
978978
var info = _entityCache.GetSecurityValues(security, time);
@@ -1069,7 +1069,7 @@ private void ProcessPortfolioMessage(PortfolioMessage message)
10691069
RaiseReceived(portfolio, message, PortfolioReceived);
10701070
}
10711071

1072-
private void ProcessPositionChangeMessage(PositionChangeMessage message)
1072+
private async ValueTask ProcessPositionChangeMessage(PositionChangeMessage message, CancellationToken cancellationToken)
10731073
{
10741074
if (!message.StrategyId.IsEmpty())
10751075
return;
@@ -1090,7 +1090,7 @@ private void ProcessPositionChangeMessage(PositionChangeMessage message)
10901090
RaiseReceived(portfolio, message, PortfolioReceived);
10911091
}
10921092

1093-
var security = EnsureGetSecurity(message);
1093+
var security = await EnsureGetSecurityAsync(message, cancellationToken);
10941094
portfolio = LookupByPortfolioName(message.PortfolioName);
10951095

10961096
var valueInLots = message.TryGetDecimal(PositionChangeTypes.CurrentValueInLots);
@@ -1112,17 +1112,17 @@ private void ProcessPositionChangeMessage(PositionChangeMessage message)
11121112
RaiseReceived(position, message, PositionReceived);
11131113
}
11141114

1115-
private void ProcessNewsMessage(NewsMessage message)
1115+
private async ValueTask ProcessNewsMessage(NewsMessage message, CancellationToken cancellationToken)
11161116
{
1117-
var security = message.SecurityId == null ? null : GetSecurity(message.SecurityId.Value);
1117+
var security = message.SecurityId == null ? null : await GetSecurityAsync(message.SecurityId.Value, cancellationToken);
11181118

11191119
var news = _entityCache.ProcessNewsMessage(security, message);
11201120

11211121
if (RaiseReceived(news.news, message, NewsReceived) == false)
11221122
return;
11231123
}
11241124

1125-
private void ProcessQuotesMessage(QuoteChangeMessage message)
1125+
private async ValueTask ProcessQuotesMessage(QuoteChangeMessage message, CancellationToken cancellationToken)
11261126
{
11271127
if (RaiseReceived(message, message, OrderBookReceived) != true)
11281128
return;
@@ -1141,7 +1141,7 @@ private void ProcessQuotesMessage(QuoteChangeMessage message)
11411141

11421142
if (ValuesChanged is not null && !fromLevel1 && !Adapter.Level1Extend && (bestBid != null || bestAsk != null))
11431143
{
1144-
security ??= EnsureGetSecurity(message);
1144+
security ??= await EnsureGetSecurityAsync(message, cancellationToken);
11451145

11461146
var info = _entityCache.GetSecurityValues(security, time);
11471147

@@ -1183,7 +1183,7 @@ private void ProcessQuotesMessage(QuoteChangeMessage message)
11831183
#pragma warning disable CS0618 // Type or member is obsolete
11841184
if (UpdateSecurityLastQuotes)
11851185
{
1186-
security ??= EnsureGetSecurity(message);
1186+
security ??= await EnsureGetSecurityAsync(message, cancellationToken);
11871187

11881188
var updated = false;
11891189

@@ -1214,11 +1214,11 @@ private void ProcessQuotesMessage(QuoteChangeMessage message)
12141214
if (bid.BoardCode.IsEmpty())
12151215
continue;
12161216

1217-
var innerSecurity = GetSecurity(new SecurityId
1217+
var innerSecurity = await GetSecurityAsync(new SecurityId
12181218
{
12191219
SecurityCode = security.Code,
12201220
BoardCode = bid.BoardCode
1221-
});
1221+
}, cancellationToken);
12221222

12231223
var info = changedSecurities.SafeAdd(innerSecurity);
12241224

@@ -1237,11 +1237,11 @@ private void ProcessQuotesMessage(QuoteChangeMessage message)
12371237
if (ask.BoardCode.IsEmpty())
12381238
continue;
12391239

1240-
var innerSecurity = GetSecurity(new SecurityId
1240+
var innerSecurity = await GetSecurityAsync(new SecurityId
12411241
{
12421242
SecurityCode = security.Code,
12431243
BoardCode = ask.BoardCode
1244-
});
1244+
}, cancellationToken);
12451245

12461246
var info = changedSecurities.SafeAdd(innerSecurity);
12471247

@@ -1266,7 +1266,7 @@ private void ProcessOrderLogMessage(ExecutionMessage message)
12661266
return;
12671267
}
12681268

1269-
private void ProcessTradeMessage(ExecutionMessage message)
1269+
private async ValueTask ProcessTradeMessage(ExecutionMessage message, CancellationToken cancellationToken)
12701270
{
12711271
if (RaiseReceived(message, message, TickTradeReceived) != true)
12721272
return;
@@ -1275,7 +1275,7 @@ private void ProcessTradeMessage(ExecutionMessage message)
12751275

12761276
if (ValuesChanged is not null)
12771277
{
1278-
security ??= EnsureGetSecurity(message);
1278+
security ??= await EnsureGetSecurityAsync(message, cancellationToken);
12791279

12801280
var time = message.ServerTime;
12811281
var info = _entityCache.GetSecurityValues(security, time);
@@ -1335,7 +1335,7 @@ private void ProcessTradeMessage(ExecutionMessage message)
13351335
#pragma warning disable CS0618 // Type or member is obsolete
13361336
if (UpdateSecurityLastQuotes)
13371337
{
1338-
security ??= EnsureGetSecurity(message);
1338+
security ??= await EnsureGetSecurityAsync(message, cancellationToken);
13391339

13401340
security.LastTick = message;
13411341
}
@@ -1438,7 +1438,7 @@ private void ProcessOwnTradeMessage(Order order, Security security, ExecutionMes
14381438
RaiseReceived(trade, message, OwnTradeReceived);
14391439
}
14401440

1441-
private void ProcessTransactionMessage(ExecutionMessage message)
1441+
private async ValueTask ProcessTransactionMessage(ExecutionMessage message, CancellationToken cancellationToken)
14421442
{
14431443
var originId = message.OriginalTransactionId;
14441444

@@ -1492,7 +1492,7 @@ private void ProcessTransactionMessage(ExecutionMessage message)
14921492
return;
14931493
}
14941494

1495-
security = EnsureGetSecurity(message);
1495+
security = await EnsureGetSecurityAsync(message, cancellationToken);
14961496

14971497
if (transactionId == 0 && isStatusRequest)
14981498
transactionId = TransactionIdGenerator.GetNextId();
@@ -1520,12 +1520,12 @@ private void ProcessTransactionMessage(ExecutionMessage message)
15201520
throw new ArgumentOutOfRangeException(nameof(message), message.DataType, LocalizedStrings.UnknownType.Put(message));
15211521
}
15221522

1523-
private void ProcessExecutionMessage(ExecutionMessage message)
1523+
private async ValueTask ProcessExecutionMessage(ExecutionMessage message, CancellationToken cancellationToken)
15241524
{
15251525
if (message.DataType == DataType.Transactions)
1526-
ProcessTransactionMessage(message);
1526+
await ProcessTransactionMessage(message, cancellationToken);
15271527
else if (message.DataType == DataType.Ticks)
1528-
ProcessTradeMessage(message);
1528+
await ProcessTradeMessage(message, cancellationToken);
15291529
else if (message.DataType == DataType.OrderLog)
15301530
ProcessOrderLogMessage(message);
15311531
else

Algo/Storages/StorageHelper_Obsolete.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,7 @@ public static IMarketDataStorage<CandleMessage> GetCandleMessageStorage(this ISt
215215
/// <param name="security">Security.</param>
216216
/// <param name="forced">Forced update.</param>
217217
/// <remarks>Calls async method via <see cref="AsyncHelper.Run{T}(Func{ValueTask{T}})"/> for backward compatibility.</remarks>
218-
//[Obsolete("Use SaveAsync method instead.")]
218+
[Obsolete("Use SaveAsync method instead.")]
219219
public static void Save(this ISecurityStorage storage, Security security, bool forced)
220220
{
221221
if (storage is null)

Algo/Storages/StorageMetaInfoMessageAdapter.cs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ protected override async ValueTask OnInnerAdapterNewOutMessageAsync(Message mess
9090
else
9191
security.ApplyChanges(secMsg, _exchangeInfoProvider, OverrideSecurityData);
9292

93-
_securityStorage.Save(security, OverrideSecurityData);
93+
await _securityStorage.SaveAsync(security, OverrideSecurityData, cancellationToken);
9494
break;
9595
}
9696
case MessageTypes.Board:
@@ -153,7 +153,7 @@ protected override async ValueTask OnInnerAdapterNewOutMessageAsync(Message mess
153153
}
154154
else
155155
{
156-
var position = GetPosition(positionMsg.SecurityId, positionMsg.PortfolioName, positionMsg.StrategyId, positionMsg.Side);
156+
var position = await GetPositionAsync(positionMsg.SecurityId, positionMsg.PortfolioName, positionMsg.StrategyId, positionMsg.Side, cancellationToken);
157157

158158
if (position == null)
159159
break;
@@ -264,12 +264,12 @@ async ValueTask SendOutAsync<TMessage>(TMessage outMsg)
264264
await base.OnSendInMessageAsync(msg, cancellationToken);
265265
}
266266

267-
private Position GetPosition(SecurityId securityId, string portfolioName, string strategyId, Sides? side)
267+
private async ValueTask<Position> GetPositionAsync(SecurityId securityId, string portfolioName, string strategyId, Sides? side, CancellationToken cancellationToken)
268268
{
269269
var security = (!securityId.SecurityCode.IsEmpty() && !securityId.BoardCode.IsEmpty() ? _securityStorage.LookupById(securityId) : _securityStorage.Lookup(new Security
270270
{
271271
Code = securityId.SecurityCode,
272-
}).FirstOrDefault()) ?? TryCreateSecurity(securityId);
272+
}).FirstOrDefault()) ?? await TryCreateSecurityAsync(securityId, cancellationToken);
273273

274274
if (security == null)
275275
return null;
@@ -295,7 +295,7 @@ private Position GetPosition(SecurityId securityId, string portfolioName, string
295295
};
296296
}
297297

298-
private Security TryCreateSecurity(SecurityId securityId)
298+
private async ValueTask<Security> TryCreateSecurityAsync(SecurityId securityId, CancellationToken cancellationToken)
299299
{
300300
if (securityId.SecurityCode.IsEmpty() || securityId.BoardCode.IsEmpty())
301301
return null;
@@ -307,7 +307,7 @@ private Security TryCreateSecurity(SecurityId securityId)
307307
Board = _exchangeInfoProvider.GetOrCreateBoard(securityId.BoardCode),
308308
};
309309

310-
_securityStorage.Save(security, false);
310+
await _securityStorage.SaveAsync(security, false, cancellationToken);
311311

312312
return security;
313313
}

0 commit comments

Comments
 (0)