Skip to content

Commit b5abd7e

Browse files
committed
Algo.Testing: replay registered generators in the history market data manager
The replay loop only streamed storage-backed data; registered market data generators were tracked in the manager but never driven, so a subscription served by a generator produced nothing. Drive the generators from the replay clock: a time signal advances every generator, the replayed message feeds the generators of its own security, and each generated message is chained into the other same-security generators, so a generated tick drives that security's order-book generator. Generated messages are interleaved into the stream at their server time, preserving the ascending-time ordering; the generator-less replay path keeps its original allocation-free fast path. Turns the previously-failing TimeOrdering_FullMix tracking test green: generated ticks and order books are now produced alongside the historical data.
1 parent f1c0946 commit b5abd7e

1 file changed

Lines changed: 73 additions & 2 deletions

File tree

Algo.Testing/HistoryMarketDataManager.cs

Lines changed: 73 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ namespace StockSharp.Algo.Testing;
88
/// </summary>
99
public class HistoryMarketDataManager : Disposable, IHistoryMarketDataManager
1010
{
11+
private const int _maxGeneratedPerStep = 1000;
12+
1113
private readonly Dictionary<(SecurityId secId, DataType dataType), (MarketDataGenerator generator, long transId)> _generators = [];
1214
private readonly List<(IMarketDataStorage storage, long subscriptionId)> _actions = [];
1315
private readonly AutoResetEvent _syncRoot = new(false);
@@ -274,11 +276,24 @@ async IAsyncEnumerable<Message> Impl([EnumeratorCancellation] CancellationToken
274276

275277
await foreach (var msg in FilterMessages(source, startDateTime, stopDateTime, currentTime).WithCancellation(cancellationToken))
276278
{
277-
LoadedMessageCount++;
278-
279279
if (msg.TryGetServerTime(out var serverTime))
280+
{
280281
_currentTime = serverTime;
281282

283+
// Drive the registered generators up to the current replay time and
284+
// interleave their output ahead of the triggering message (same time).
285+
if (_generators.Count > 0)
286+
{
287+
foreach (var generated in DriveGenerators(msg, serverTime))
288+
{
289+
LoadedMessageCount++;
290+
yield return generated;
291+
}
292+
}
293+
}
294+
295+
LoadedMessageCount++;
296+
282297
yield return msg;
283298

284299
if (_isChanged)
@@ -363,6 +378,62 @@ private static async IAsyncEnumerable<Message> FilterMessages(
363378
}
364379
}
365380

381+
/// <summary>
382+
/// Drives the registered generators with the replay clock and same-security data, returning any
383+
/// generated messages (interleaved by the caller at the same server time). A time signal advances
384+
/// time-based generation for every generator; the triggering message feeds the generators of its own
385+
/// security, and each generated message is chained into the other same-security generators, so a
386+
/// generated tick drives that security's order-book generator. Generators ignore their own output
387+
/// type, so the chain terminates.
388+
/// </summary>
389+
/// <param name="trigger">The replayed message that advanced the clock.</param>
390+
/// <param name="time">The current replay server time.</param>
391+
/// <returns>The generated messages, if any.</returns>
392+
private IEnumerable<Message> DriveGenerators(Message trigger, DateTime time)
393+
{
394+
if (_generators.Count == 0)
395+
yield break;
396+
397+
var generators = _generators.Values.Select(v => v.generator).ToArray();
398+
399+
var feeds = new Queue<Message>();
400+
feeds.Enqueue(new TimeMessage { ServerTime = time, LocalTime = time });
401+
402+
if (trigger.Type != MessageTypes.Time && trigger is ISecurityIdMessage)
403+
feeds.Enqueue(trigger);
404+
405+
var produced = 0;
406+
407+
while (feeds.Count > 0)
408+
{
409+
var feed = feeds.Dequeue();
410+
var isTime = feed.Type == MessageTypes.Time;
411+
var feedSecId = (feed as ISecurityIdMessage)?.SecurityId;
412+
413+
foreach (var generator in generators)
414+
{
415+
// A security-bearing feed only drives generators of the same security; a time signal
416+
// drives all of them.
417+
if (!isTime && feedSecId != generator.SecurityId)
418+
continue;
419+
420+
var output = generator.Process(feed);
421+
422+
if (output is null)
423+
continue;
424+
425+
output.LocalTime = time;
426+
yield return output;
427+
428+
// Chain the generated message into the other same-security generators.
429+
feeds.Enqueue(output);
430+
431+
if (++produced > _maxGeneratedPerStep)
432+
yield break;
433+
}
434+
}
435+
}
436+
366437
private IEnumerable<TimeMessage> GetSimpleTimeLine(BoardMessage[] boards, DateTime date)
367438
{
368439
foreach (var msg in _timeLineGenerator.GetSimpleTimeLine(boards, date, MarketTimeChangedInterval))

0 commit comments

Comments
 (0)