Skip to content

Commit a26ce59

Browse files
committed
SubscriptionHolder: fan-out fallback + multi-owner order TxId for Transactions
Fast path unchanged when OrigTxId resolves to a live (Online && !Suspend) sub. Otherwise look up _subscriptionsByOrderId[OrigTxId] for any subs that already saw an Execution carrying this order TxId, and fall through to fan-out across every live OrderStatus sub on the channel. Replaces the last-writer-wins _subscriptionsById[execMsg.TransactionId] slot (the "Many clients can subscribe on the same order id" TODO): when two OrderStatus subs both watched the same order, only the most recent writer received the follow-up Execution. Now every owner does. Tests: 7 new cases, 36 existing pass.
1 parent 66d9b6a commit a26ce59

2 files changed

Lines changed: 223 additions & 9 deletions

File tree

Messages/SubscriptionHolder.cs

Lines changed: 46 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ public class SubscriptionHolder<TSubscription, TSession>(ILogReceiver logs) : Di
5252
private readonly Dictionary<DataType, HashSet<TSubscription>> _subscriptionsByAllSec = [];
5353
private readonly Dictionary<(DataType dt, SecurityId secId), HashSet<TSubscription>> _subscriptionsBySec = [];
5454
private readonly Dictionary<long, TSubscription> _subscriptionsById = [];
55+
private readonly Dictionary<long, HashSet<TSubscription>> _subscriptionsByOrderId = [];
5556
private readonly Dictionary<MessageTypes, HashSet<TSubscription>> _subscriptionsByType = [];
5657
private readonly Dictionary<TSession, HashSet<TSubscription>> _subscriptionsBySession = [];
5758
private readonly Dictionary<long, long> _unsubscribeRequests = [];
@@ -243,6 +244,7 @@ void tryRemove<TKey>(Dictionary<TKey, HashSet<TSubscription>> dict)
243244
tryRemove(_subscriptionsByAllSec);
244245
tryRemove(_subscriptionsBySec);
245246
tryRemove(_subscriptionsByType);
247+
tryRemove(_subscriptionsByOrderId);
246248

247249
if (_subscriptionsBySession.TryGetAndRemove(session, out var sessionSubs))
248250
subscriptions.AddRange(sessionSubs);
@@ -298,6 +300,7 @@ void tryRemove<TKey>(Dictionary<TKey, HashSet<TSubscription>> dict)
298300
tryRemove(_subscriptionsBySec);
299301
tryRemove(_subscriptionsByType);
300302
tryRemove(_subscriptionsBySession);
303+
tryRemove(_subscriptionsByOrderId);
301304

302305
_subscriptionsById.Remove(info.Id);
303306
}
@@ -338,6 +341,7 @@ public void Clear()
338341
{
339342
_subscriptionsByType.Clear();
340343
_subscriptionsById.Clear();
344+
_subscriptionsByOrderId.Clear();
341345
_subscriptionsBySec.Clear();
342346
_subscriptionsByAllSec.Clear();
343347
_subscriptionsBySession.Clear();
@@ -546,25 +550,58 @@ public IEnumerable<TSubscription> GetSubscriptions(Message message)
546550
return GetSubscriptions(execMsg.DataType, execMsg.SecurityId, originTransId);
547551
else if (execMsg.DataType == DataType.Transactions)
548552
{
549-
if (!TryGetById(originTransId, out var subscription))
550-
return [];
551-
552-
if (execMsg.TransactionId != 0)
553+
if (originTransId != 0
554+
&& TryGetById(originTransId, out var subscription)
555+
&& subscription.State == SubscriptionStates.Online
556+
&& !subscription.Suspend)
553557
{
554-
// TODO Many clients can subscribe on the same order id
555-
_rw.EnterWriteLock();
558+
if (execMsg.TransactionId != 0)
559+
{
560+
_rw.EnterWriteLock();
561+
562+
try
563+
{
564+
_subscriptionsByOrderId.SafeAdd(execMsg.TransactionId).Add(subscription);
565+
}
566+
finally
567+
{
568+
_rw.ExitWriteLock();
569+
}
570+
}
571+
572+
return ToSet(subscription, checkSuspend: false);
573+
}
556574

575+
if (originTransId != 0)
576+
{
577+
HashSet<TSubscription> orderOwners;
578+
_rw.EnterReadLock();
557579
try
558580
{
559-
_subscriptionsById[execMsg.TransactionId] = subscription;
581+
orderOwners = _subscriptionsByOrderId.TryGetValue(originTransId, out var set)
582+
? [.. set]
583+
: null;
560584
}
561585
finally
562586
{
563-
_rw.ExitWriteLock();
587+
_rw.ExitReadLock();
588+
}
589+
590+
if (orderOwners is { Count: > 0 })
591+
{
592+
var owners = orderOwners
593+
.Where(s => s.State == SubscriptionStates.Online && !s.Suspend)
594+
.ToArray();
595+
if (owners.Length > 0)
596+
return owners;
564597
}
565598
}
566599

567-
return ToSet(subscription);
600+
return GetSubscriptions(MessageTypes.Execution, 0)
601+
.Where(s => s.DataType == DataType.Transactions
602+
&& s.State == SubscriptionStates.Online
603+
&& !s.Suspend)
604+
.ToArray();
568605
}
569606
else
570607
throw new ArgumentOutOfRangeException(execMsg.DataType.To<string>());

Tests/SubscriptionHolderTests.cs

Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -573,4 +573,181 @@ public void GetSubscriptions_BySession_IncludesIdZero()
573573
}
574574

575575
#endregion
576+
577+
#region GetSubscriptions(Message) — Transactions Execution targeting
578+
579+
private static TestSubscription CreateOrderStatusSub(long id, string session, SubscriptionStates state)
580+
=> new()
581+
{
582+
Id = id,
583+
Session = session,
584+
DataType = DataType.Transactions,
585+
State = state,
586+
Responses = [MessageTypes.Execution],
587+
};
588+
589+
[TestMethod]
590+
public void GetSubscriptions_TransactionsExecution_OrigTxIdNonZero_OnlineSub_ReturnsOne()
591+
{
592+
using var holder = CreateHolder();
593+
holder.Add(CreateOrderStatusSub(100, "sessionA", SubscriptionStates.Online));
594+
holder.Add(CreateOrderStatusSub(101, "sessionB", SubscriptionStates.Online));
595+
596+
var exec = new ExecutionMessage
597+
{
598+
DataTypeEx = DataType.Transactions,
599+
OriginalTransactionId = 100,
600+
OrderState = OrderStates.Active,
601+
};
602+
603+
var matched = holder.GetSubscriptions(exec).ToArray();
604+
605+
matched.Length.AssertEqual(1);
606+
matched[0].Id.AssertEqual(100);
607+
}
608+
609+
[TestMethod]
610+
public void GetSubscriptions_TransactionsExecution_OrigTxIdNonZero_StoppedSub_FansOutToOnlineSiblings()
611+
{
612+
using var holder = CreateHolder();
613+
holder.Add(CreateOrderStatusSub(200, "sessionA", SubscriptionStates.Stopped));
614+
holder.Add(CreateOrderStatusSub(201, "sessionB", SubscriptionStates.Online));
615+
616+
var exec = new ExecutionMessage
617+
{
618+
DataTypeEx = DataType.Transactions,
619+
OriginalTransactionId = 200,
620+
OrderState = OrderStates.Done,
621+
};
622+
623+
var matched = holder.GetSubscriptions(exec).ToArray();
624+
625+
matched.Length.AssertEqual(1);
626+
matched[0].Id.AssertEqual(201);
627+
}
628+
629+
[TestMethod]
630+
public void GetSubscriptions_TransactionsExecution_OrigTxIdNonZero_Unknown_FansOutToAllOnline()
631+
{
632+
using var holder = CreateHolder();
633+
holder.Add(CreateOrderStatusSub(300, "sessionA", SubscriptionStates.Online));
634+
holder.Add(CreateOrderStatusSub(301, "sessionB", SubscriptionStates.Online));
635+
636+
var exec = new ExecutionMessage
637+
{
638+
DataTypeEx = DataType.Transactions,
639+
OriginalTransactionId = 99999,
640+
};
641+
642+
var matched = holder.GetSubscriptions(exec).ToArray();
643+
644+
matched.Length.AssertEqual(2);
645+
matched.Select(s => s.Id).OrderBy(i => i).SequenceEqual([300L, 301L]).AssertTrue();
646+
}
647+
648+
[TestMethod]
649+
public void GetSubscriptions_TransactionsExecution_OrigTxIdZero_FansOutToAllOnline()
650+
{
651+
using var holder = CreateHolder();
652+
holder.Add(CreateOrderStatusSub(400, "sessionA", SubscriptionStates.Online));
653+
holder.Add(CreateOrderStatusSub(401, "sessionB", SubscriptionStates.Online));
654+
holder.Add(CreateOrderStatusSub(402, "sessionC", SubscriptionStates.Stopped));
655+
656+
var exec = new ExecutionMessage
657+
{
658+
DataTypeEx = DataType.Transactions,
659+
OriginalTransactionId = 0,
660+
};
661+
662+
var matched = holder.GetSubscriptions(exec).ToArray();
663+
664+
matched.Length.AssertEqual(2);
665+
matched.Select(s => s.Id).OrderBy(i => i).SequenceEqual([400L, 401L]).AssertTrue();
666+
}
667+
668+
[TestMethod]
669+
public void GetSubscriptions_TransactionsExecution_OrigTxIdZero_SkipsSuspended()
670+
{
671+
using var holder = CreateHolder();
672+
var suspended = CreateOrderStatusSub(500, "sessionA", SubscriptionStates.Online);
673+
suspended.Suspend = true;
674+
holder.Add(suspended);
675+
holder.Add(CreateOrderStatusSub(501, "sessionB", SubscriptionStates.Online));
676+
677+
var exec = new ExecutionMessage
678+
{
679+
DataTypeEx = DataType.Transactions,
680+
OriginalTransactionId = 0,
681+
};
682+
683+
var matched = holder.GetSubscriptions(exec).ToArray();
684+
685+
matched.Length.AssertEqual(1);
686+
matched[0].Id.AssertEqual(501);
687+
}
688+
689+
[TestMethod]
690+
public void GetSubscriptions_TransactionsExecution_FollowUpByExecutionTxId_ReachesAllOwners()
691+
{
692+
using var holder = CreateHolder();
693+
var subA = CreateOrderStatusSub(700, "sessionA", SubscriptionStates.Online);
694+
var subB = CreateOrderStatusSub(701, "sessionB", SubscriptionStates.Online);
695+
holder.Add(subA);
696+
holder.Add(subB);
697+
698+
const long orderTxId = 12345L;
699+
700+
holder.GetSubscriptions(new ExecutionMessage
701+
{
702+
DataTypeEx = DataType.Transactions,
703+
OriginalTransactionId = 700,
704+
TransactionId = orderTxId,
705+
}).Count().AssertEqual(1);
706+
707+
holder.GetSubscriptions(new ExecutionMessage
708+
{
709+
DataTypeEx = DataType.Transactions,
710+
OriginalTransactionId = 701,
711+
TransactionId = orderTxId,
712+
}).Count().AssertEqual(1);
713+
714+
var matched = holder.GetSubscriptions(new ExecutionMessage
715+
{
716+
DataTypeEx = DataType.Transactions,
717+
OriginalTransactionId = orderTxId,
718+
TradeId = 99,
719+
}).ToArray();
720+
721+
matched.Length.AssertEqual(2);
722+
matched.Select(s => s.Id).OrderBy(i => i).SequenceEqual([700L, 701L]).AssertTrue();
723+
}
724+
725+
[TestMethod]
726+
public void GetSubscriptions_TransactionsExecution_OrigTxIdZero_FiltersByDataType()
727+
{
728+
using var holder = CreateHolder();
729+
holder.Add(CreateOrderStatusSub(600, "sessionA", SubscriptionStates.Online));
730+
var ticks = new TestSubscription
731+
{
732+
Id = 601,
733+
Session = "sessionB",
734+
DataType = DataType.Ticks,
735+
State = SubscriptionStates.Online,
736+
Responses = [MessageTypes.Execution],
737+
};
738+
holder.Add(ticks);
739+
740+
var exec = new ExecutionMessage
741+
{
742+
DataTypeEx = DataType.Transactions,
743+
OriginalTransactionId = 0,
744+
};
745+
746+
var matched = holder.GetSubscriptions(exec).ToArray();
747+
748+
matched.Length.AssertEqual(1);
749+
matched[0].Id.AssertEqual(600);
750+
}
751+
752+
#endregion
576753
}

0 commit comments

Comments
 (0)