diff --git a/src/DynamicData.Tests/Cache/DeadlockTortureTest.cs b/src/DynamicData.Tests/Cache/DeadlockTortureTest.cs index 4f4ddf9a..4969eaed 100644 --- a/src/DynamicData.Tests/Cache/DeadlockTortureTest.cs +++ b/src/DynamicData.Tests/Cache/DeadlockTortureTest.cs @@ -30,10 +30,11 @@ public sealed class DeadlockTortureTest { private const int ItemCount = 200; private const int Iterations = 50; - private const int TimeoutSeconds = 15; + private const int TimeoutSeconds = 60; private static async Task RunBidirectionalDeadlockTest( Func>, IObservable>> pipeline, + Action? subjectPusher = null, int iterations = Iterations) { for (var iter = 0; iter < iterations; iter++) @@ -44,11 +45,13 @@ private static async Task RunBidirectionalDeadlockTest( using var aToB = pipeline(sourceA.Connect().Filter(x => x.Name.StartsWith("A"))).PopulateInto(sourceB); using var bToA = pipeline(sourceB.Connect().Filter(x => x.Name.StartsWith("B"))).PopulateInto(sourceA); - using var barrier = new Barrier(2); + var participants = subjectPusher is null ? 2 : 3; + using var barrier = new Barrier(participants); var taskA = Task.Run(() => { barrier.SignalAndWait(); for (var i = 0; i < ItemCount; i++) sourceA.AddOrUpdate(new Person("A-" + iter + "-" + i, i)); }); var taskB = Task.Run(() => { barrier.SignalAndWait(); for (var i = 0; i < ItemCount; i++) sourceB.AddOrUpdate(new Person("B-" + iter + "-" + i, i)); }); + var taskC = subjectPusher is null ? null : Task.Run(() => { barrier.SignalAndWait(); subjectPusher(); }); - var completed = Task.WhenAll(taskA, taskB); + var completed = taskC is null ? Task.WhenAll(taskA, taskB) : Task.WhenAll(taskA, taskB, taskC); if (await Task.WhenAny(completed, Task.Delay(TimeSpan.FromSeconds(TimeoutSeconds))) != completed) return false; } @@ -64,26 +67,116 @@ [Fact] public async Task AutoRefresh_DoesNotDeadlock() => [Fact] public async Task GroupOn_DoesNotDeadlock() => (await RunBidirectionalDeadlockTest(s => s.Group(p => p.Age % 3).MergeMany(g => g.Cache.Connect()))).Should().BeTrue(); + [Fact] public async Task GroupWithImmutableState_DoesNotDeadlock() => + (await RunBidirectionalDeadlockTest(s => s.GroupWithImmutableState(p => p.Age % 3).TransformMany(g => g.Items, p => p.UniqueKey))).Should().BeTrue(); + + [Fact] public async Task GroupOnWithRegrouper_DoesNotDeadlock() + { + using var regrouper = new System.Reactive.Subjects.Subject(); + (await RunBidirectionalDeadlockTest( + s => s.Group(p => p.Age % 3, regrouper).MergeMany(g => g.Cache.Connect()), + subjectPusher: () => { for (var j = 0; j < ItemCount; j++) regrouper.OnNext(System.Reactive.Unit.Default); })).Should().BeTrue(); + } + + [Fact] public async Task GroupOnDynamicSelector_DoesNotDeadlock() + { + using var selector = new BehaviorSubject>((p, _) => p.Age % 3); + using var regrouper = new System.Reactive.Subjects.Subject(); + (await RunBidirectionalDeadlockTest( + s => s.Group(selector, regrouper).MergeMany(g => g.Cache.Connect()), + subjectPusher: () => + { + for (var j = 0; j < ItemCount; j++) + { + selector.OnNext((p, _) => p.Age % (2 + (j % 4))); + regrouper.OnNext(System.Reactive.Unit.Default); + } + })).Should().BeTrue(); + } + + [Fact] public async Task TransformAsyncWithForce_DoesNotDeadlock() + { + using var force = new System.Reactive.Subjects.Subject>(); + (await RunBidirectionalDeadlockTest( + s => s.TransformAsync(p => Task.FromResult(new Person("T-" + p.Name, p.Age)), force), + subjectPusher: () => { for (var j = 0; j < ItemCount; j++) force.OnNext(static (_, _) => true); })).Should().BeTrue(); + } + [Fact] public async Task Page_DoesNotDeadlock() { using var req = new BehaviorSubject(new PageRequest(1, 50)); - (await RunBidirectionalDeadlockTest(s => s.Sort(SortExpressionComparer.Ascending(p => p.Age)).Page(req))).Should().BeTrue(); + (await RunBidirectionalDeadlockTest( + s => s.Sort(SortExpressionComparer.Ascending(p => p.Age)).Page(req), + subjectPusher: () => { for (var j = 0; j < ItemCount; j++) req.OnNext(new PageRequest(1 + (j % 4), 25 + (j % 4) * 25)); })).Should().BeTrue(); + } + + [Fact] public async Task SortAndPage_DoesNotDeadlock() + { + using var req = new BehaviorSubject(new PageRequest(1, 50)); + (await RunBidirectionalDeadlockTest( + s => s.SortAndPage(SortExpressionComparer.Ascending(p => p.Age), req), + subjectPusher: () => { for (var j = 0; j < ItemCount; j++) req.OnNext(new PageRequest(1 + (j % 4), 25 + (j % 4) * 25)); })).Should().BeTrue(); } [Fact] public async Task Virtualise_DoesNotDeadlock() { using var req = new BehaviorSubject(new VirtualRequest(0, 50)); - (await RunBidirectionalDeadlockTest(s => s.Sort(SortExpressionComparer.Ascending(p => p.Age)).Virtualise(req))).Should().BeTrue(); + (await RunBidirectionalDeadlockTest( + s => s.Sort(SortExpressionComparer.Ascending(p => p.Age)).Virtualise(req), + subjectPusher: () => { for (var j = 0; j < ItemCount; j++) req.OnNext(new VirtualRequest(j * 5, 25 + (j % 4) * 25)); })).Should().BeTrue(); + } + + [Fact] public async Task SortAndVirtualize_DoesNotDeadlock() + { + using var req = new BehaviorSubject(new VirtualRequest(0, 50)); + (await RunBidirectionalDeadlockTest( + s => s.SortAndVirtualize(SortExpressionComparer.Ascending(p => p.Age), req), + subjectPusher: () => { for (var j = 0; j < ItemCount; j++) req.OnNext(new VirtualRequest(j * 5, 25 + (j % 4) * 25)); })).Should().BeTrue(); + } + + [Fact] public async Task QueryWhenChanged_DoesNotDeadlock() + { + for (var iter = 0; iter < Iterations; iter++) + { + using var sourceA = new SourceCache(p => p.UniqueKey); + using var sourceB = new SourceCache(p => p.UniqueKey); + + // QueryWhenChanged with an itemChangedTrigger exercises the Merge branch. + // A side-channel write into the other cache closes the same ABBA cycle that + // PopulateInto would close for changeset-shaped operators. + using var aToB = sourceA.Connect() + .Filter(p => p.Name.StartsWith("A")) + .QueryWhenChanged(p => p.WhenPropertyChanged(x => x.Age)) + .Subscribe(_ => sourceB.AddOrUpdate(new Person("A-marker", 0))); + using var bToA = sourceB.Connect() + .Filter(p => p.Name.StartsWith("B")) + .QueryWhenChanged(p => p.WhenPropertyChanged(x => x.Age)) + .Subscribe(_ => sourceA.AddOrUpdate(new Person("B-marker", 0))); + + using var barrier = new Barrier(2); + var taskA = Task.Run(() => { barrier.SignalAndWait(); for (var i = 0; i < ItemCount; i++) sourceA.AddOrUpdate(new Person("A-" + iter + "-" + i, i)); }); + var taskB = Task.Run(() => { barrier.SignalAndWait(); for (var i = 0; i < ItemCount; i++) sourceB.AddOrUpdate(new Person("B-" + iter + "-" + i, i)); }); + + var completed = Task.WhenAll(taskA, taskB); + (await Task.WhenAny(completed, Task.Delay(TimeSpan.FromSeconds(TimeoutSeconds)))).Should().BeSameAs(completed, "iteration " + iter); + } } [Fact] public async Task TransformWithForce_DoesNotDeadlock() { using var force = new Subject>(); - (await RunBidirectionalDeadlockTest(s => s.Transform((p, k) => new Person("T-" + p.Name, p.Age), force))).Should().BeTrue(); + (await RunBidirectionalDeadlockTest( + s => s.Transform((p, k) => new Person("T-" + p.Name, p.Age), force), + subjectPusher: () => { for (var j = 0; j < ItemCount; j++) force.OnNext(static (p, _) => true); })).Should().BeTrue(); } - [Fact] public async Task BatchIf_DoesNotDeadlock() => - (await RunBidirectionalDeadlockTest(s => s.BatchIf(new BehaviorSubject(false), false, (TimeSpan?)null))).Should().BeTrue(); + [Fact] public async Task BatchIf_DoesNotDeadlock() + { + using var pause = new BehaviorSubject(false); + (await RunBidirectionalDeadlockTest( + s => s.BatchIf(pause, false, (TimeSpan?)null), + subjectPusher: () => { for (var j = 0; j < ItemCount; j++) pause.OnNext(j % 2 == 0); })).Should().BeTrue(); + } [Fact] public async Task DisposeMany_DoesNotDeadlock() => (await RunBidirectionalDeadlockTest(s => s.DisposeMany())).Should().BeTrue(); @@ -94,31 +187,65 @@ [Fact] public async Task OnItemRemoved_DoesNotDeadlock() => [Fact] public async Task AllDangerous_Stacked_DoNotDeadlock() { using var pageReq = new BehaviorSubject(new PageRequest(1, 100)); + using var virtReq = new BehaviorSubject(new VirtualRequest(0, 100)); using var force = new Subject>(); (await RunBidirectionalDeadlockTest( - s => s.AutoRefresh(p => p.Age) + s => s.GroupWithImmutableState(p => p.Age % 3) + .TransformMany(g => g.Items, p => p.UniqueKey) + .AutoRefresh(p => p.Age) .Filter(p => p.Age >= 0) .Transform((p, k) => new Person("X-" + p.Name, p.Age), force) .OnItemRemoved(_ => { }) .DisposeMany() .Sort(SortExpressionComparer.Ascending(p => p.Age)) + .Virtualise(virtReq) .Page(pageReq), + subjectPusher: () => + { + for (var j = 0; j < ItemCount; j++) + { + force.OnNext(static (p, _) => true); + pageReq.OnNext(new PageRequest(1 + (j % 4), 50 + (j % 4) * 50)); + virtReq.OnNext(new VirtualRequest(j * 5, 50 + (j % 4) * 50)); + } + }, iterations: Iterations * 2)).Should().BeTrue(); } [Fact] public async Task MultiplePairs_Simultaneous_NoDeadlock() { using var pageReq = new BehaviorSubject(new PageRequest(1, 50)); + using var pageReq2 = new BehaviorSubject(new PageRequest(1, 50)); using var virtReq = new BehaviorSubject(new VirtualRequest(0, 50)); + using var virtReq2 = new BehaviorSubject(new VirtualRequest(0, 50)); + using var pause = new BehaviorSubject(false); var results = await Task.WhenAll( - RunBidirectionalDeadlockTest(s => s.Sort(SortExpressionComparer.Ascending(p => p.Age)), 30), - RunBidirectionalDeadlockTest(s => s.AutoRefresh(p => p.Age), 30), - RunBidirectionalDeadlockTest(s => s.Group(p => p.Age % 3).MergeMany(g => g.Cache.Connect()), 30), - RunBidirectionalDeadlockTest(s => s.OnItemRemoved(_ => { }), 30), - RunBidirectionalDeadlockTest(s => s.DisposeMany(), 30), - RunBidirectionalDeadlockTest(s => s.Sort(SortExpressionComparer.Ascending(p => p.Age)).Page(pageReq), 30), - RunBidirectionalDeadlockTest(s => s.Sort(SortExpressionComparer.Ascending(p => p.Age)).Virtualise(virtReq), 30), - RunBidirectionalDeadlockTest(s => s.BatchIf(new BehaviorSubject(false), false, (TimeSpan?)null), 30)); + RunBidirectionalDeadlockTest(s => s.Sort(SortExpressionComparer.Ascending(p => p.Age)), iterations: 30), + RunBidirectionalDeadlockTest(s => s.AutoRefresh(p => p.Age), iterations: 30), + RunBidirectionalDeadlockTest(s => s.Group(p => p.Age % 3).MergeMany(g => g.Cache.Connect()), iterations: 30), + RunBidirectionalDeadlockTest(s => s.GroupWithImmutableState(p => p.Age % 3).TransformMany(g => g.Items, p => p.UniqueKey), iterations: 30), + RunBidirectionalDeadlockTest(s => s.OnItemRemoved(_ => { }), iterations: 30), + RunBidirectionalDeadlockTest(s => s.DisposeMany(), iterations: 30), + RunBidirectionalDeadlockTest( + s => s.Sort(SortExpressionComparer.Ascending(p => p.Age)).Page(pageReq), + subjectPusher: () => { for (var j = 0; j < ItemCount; j++) pageReq.OnNext(new PageRequest(1 + (j % 4), 25 + (j % 4) * 25)); }, + iterations: 30), + RunBidirectionalDeadlockTest( + s => s.SortAndPage(SortExpressionComparer.Ascending(p => p.Age), pageReq2), + subjectPusher: () => { for (var j = 0; j < ItemCount; j++) pageReq2.OnNext(new PageRequest(1 + (j % 4), 25 + (j % 4) * 25)); }, + iterations: 30), + RunBidirectionalDeadlockTest( + s => s.Sort(SortExpressionComparer.Ascending(p => p.Age)).Virtualise(virtReq), + subjectPusher: () => { for (var j = 0; j < ItemCount; j++) virtReq.OnNext(new VirtualRequest(j * 5, 25 + (j % 4) * 25)); }, + iterations: 30), + RunBidirectionalDeadlockTest( + s => s.SortAndVirtualize(SortExpressionComparer.Ascending(p => p.Age), virtReq2), + subjectPusher: () => { for (var j = 0; j < ItemCount; j++) virtReq2.OnNext(new VirtualRequest(j * 5, 25 + (j % 4) * 25)); }, + iterations: 30), + RunBidirectionalDeadlockTest( + s => s.BatchIf(pause, false, (TimeSpan?)null), + subjectPusher: () => { for (var j = 0; j < ItemCount; j++) pause.OnNext(j % 2 == 0); }, + iterations: 30)); results.Should().AllSatisfy(r => r.Should().BeTrue()); } @@ -145,4 +272,38 @@ [Fact] public async Task ThreeWayCircular_DoesNotDeadlock() (await Task.WhenAny(completed, Task.Delay(TimeSpan.FromSeconds(TimeoutSeconds)))).Should().BeSameAs(completed, "iteration " + iter); } } + + [Fact] public async Task TransformToTree_DoesNotDeadlock() + { + // Exercises TreeBuilder.cs:200 (_predicateChanged.SynchronizeSafe(queue).UnsynchronizedCombineLatest + // (reFilterObservable.SynchronizeSafe(queue), ...)). Cross-cache cycle is closed via a side-channel + // Subscribe that writes a marker into the other cache for every tree changeset. + for (var iter = 0; iter < Iterations; iter++) + { + using var sourceA = new SourceCache(p => p.UniqueKey); + using var sourceB = new SourceCache(p => p.UniqueKey); + + // The pivotOn function returns the parent's key (or the item's own key for roots). Half the + // items become children of "A-{iter}-0" / "B-{iter}-0", populating the inner tree structure. + using var aToB = sourceA.Connect() + .TransformToTree(p => p.Age == 0 ? p.UniqueKey : "A-" + iter + "-0") + .Subscribe(_ => sourceB.AddOrUpdate(new Person("from-a-tree-" + iter, 0))); + using var bToA = sourceB.Connect() + .TransformToTree(p => p.Age == 0 ? p.UniqueKey : "B-" + iter + "-0") + .Subscribe(_ => sourceA.AddOrUpdate(new Person("from-b-tree-" + iter, 0))); + + using var barrier = new Barrier(2); + var taskA = Task.Run(() => { barrier.SignalAndWait(); for (var i = 0; i < ItemCount; i++) sourceA.AddOrUpdate(new Person("A-" + iter + "-" + i, i)); }); + var taskB = Task.Run(() => { barrier.SignalAndWait(); for (var i = 0; i < ItemCount; i++) sourceB.AddOrUpdate(new Person("B-" + iter + "-" + i, i)); }); + + var completed = Task.WhenAll(taskA, taskB); + (await Task.WhenAny(completed, Task.Delay(TimeSpan.FromSeconds(TimeoutSeconds)))).Should().BeSameAs(completed, "iteration " + iter); + } + } + + [Fact] public async Task Switch_DoesNotDeadlock() => + // Exercises the refactored Switch.cs (SerialDisposable + UnsynchronizedMerge of destination.Connect() + // and the errors subject). Observable.Return(s).Switch() drives exactly one outer notification, which + // is enough to wire up the destination cache and exercise the gate-free merge on every inner change. + (await RunBidirectionalDeadlockTest(s => System.Reactive.Linq.Observable.Return(s).Switch())).Should().BeTrue(); } diff --git a/src/DynamicData.Tests/Internal/DeliveryQueueMergeFixture.cs b/src/DynamicData.Tests/Internal/DeliveryQueueMergeFixture.cs new file mode 100644 index 00000000..fe67508a --- /dev/null +++ b/src/DynamicData.Tests/Internal/DeliveryQueueMergeFixture.cs @@ -0,0 +1,232 @@ +// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved. +// Roland Pheasant licenses this file to you under the MIT license. +// See the LICENSE file in the project root for full license information. + +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Reactive.Linq; +using System.Reactive.Subjects; +using System.Threading; +using System.Threading.Tasks; + +using DynamicData.Internal; + +using FluentAssertions; + +using Xunit; + +namespace DynamicData.Tests.Internal; + +/// +/// Focused behavioural tests for . +/// Verifies the Rx Merge-compatible terminal semantics and the queue's serialization guarantee +/// for concurrent producers. +/// +public sealed class DeliveryQueueMergeFixture +{ + [Fact] + public void OnNext_FromAllSources_IsForwardedInArrivalOrder() + { + using var a = new Subject(); + using var b = new Subject(); + using var c = new Subject(); + + var received = new List(); + using var sub = a.DeliveryQueueMerge(b, c).Subscribe(received.Add); + + a.OnNext(1); + b.OnNext(2); + c.OnNext(3); + a.OnNext(4); + + received.Should().Equal(1, 2, 3, 4); + } + + [Fact] + public void OnCompleted_FiresOnlyAfterEverySourceCompletes() + { + using var a = new Subject(); + using var b = new Subject(); + using var c = new Subject(); + + var completed = false; + using var sub = a.DeliveryQueueMerge(b, c).Subscribe(_ => { }, () => completed = true); + + a.OnCompleted(); + completed.Should().BeFalse(); + + b.OnCompleted(); + completed.Should().BeFalse(); + + c.OnCompleted(); + completed.Should().BeTrue(); + } + + [Fact] + public void OnError_FromAnySource_TerminatesImmediately() + { + using var a = new Subject(); + using var b = new Subject(); + + Exception? captured = null; + var completed = false; + using var sub = a.DeliveryQueueMerge(b).Subscribe(_ => { }, e => captured = e, () => completed = true); + + var error = new InvalidOperationException(); + a.OnError(error); + + captured.Should().BeSameAs(error); + completed.Should().BeFalse(); + } + + [Fact] + public void OnError_AfterFirstError_IsDroppedByQueue() + { + using var a = new Subject(); + using var b = new Subject(); + + Exception? captured = null; + using var sub = a.DeliveryQueueMerge(b).Subscribe(_ => { }, e => captured = e, () => { }); + + var first = new InvalidOperationException("first"); + var second = new InvalidOperationException("second"); + a.OnError(first); + b.OnError(second); + + captured.Should().BeSameAs(first); + } + + [Fact] + public void OnCompleted_AfterError_IsDroppedByQueue() + { + using var a = new Subject(); + using var b = new Subject(); + + Exception? captured = null; + var completed = false; + using var sub = a.DeliveryQueueMerge(b).Subscribe(_ => { }, e => captured = e, () => completed = true); + + var error = new InvalidOperationException(); + a.OnError(error); + b.OnCompleted(); + + captured.Should().BeSameAs(error); + completed.Should().BeFalse(); + } + + [Fact] + public void SynchronousTerminal_AtSubscribe_IsCountedTowardCompletion() + { + var immediate = Observable.Empty(); + using var live = new Subject(); + + var completed = false; + using var sub = immediate.DeliveryQueueMerge(live).Subscribe(_ => { }, () => completed = true); + + completed.Should().BeFalse(); + live.OnCompleted(); + completed.Should().BeTrue(); + } + + [Fact] + public void SynchronousError_AtSubscribe_PropagatesImmediately() + { + var error = new InvalidOperationException(); + var immediate = Observable.Throw(error); + using var live = new Subject(); + + Exception? captured = null; + using var sub = immediate.DeliveryQueueMerge(live).Subscribe(_ => { }, e => captured = e); + + captured.Should().BeSameAs(error); + } + + [Fact] + public async Task ConcurrentOnNext_FromManyProducers_IsSerializedToObserver() + { + // The queue's contract is that the downstream observer never sees concurrent OnNext calls, + // regardless of how many producers are racing on the inputs. Subscribe to two sources via + // two concurrent tasks, push interleaved items, and verify that no two OnNext calls overlap + // and every item is delivered exactly once. + const int itemsPerProducer = 1_000; + + using var a = new Subject(); + using var b = new Subject(); + + var inFlight = 0; + var maxInFlight = 0; + var received = new ConcurrentQueue(); + + using var sub = a.DeliveryQueueMerge(b).Subscribe(v => + { + var now = Interlocked.Increment(ref inFlight); + var prev = Volatile.Read(ref maxInFlight); + while (now > prev && Interlocked.CompareExchange(ref maxInFlight, now, prev) != prev) + { + prev = Volatile.Read(ref maxInFlight); + } + received.Enqueue(v); + Interlocked.Decrement(ref inFlight); + }); + + using var barrier = new Barrier(2); + var taskA = Task.Run(() => { barrier.SignalAndWait(); for (var i = 0; i < itemsPerProducer; i++) a.OnNext(i); }); + var taskB = Task.Run(() => { barrier.SignalAndWait(); for (var i = 0; i < itemsPerProducer; i++) b.OnNext(itemsPerProducer + i); }); + + await Task.WhenAll(taskA, taskB); + + received.Count.Should().Be(itemsPerProducer * 2); + maxInFlight.Should().Be(1, "concurrent OnNext to the observer must be serialized by the queue"); + + var expected = Enumerable.Range(0, itemsPerProducer * 2).ToHashSet(); + received.Should().BeEquivalentTo(expected); + } + + [Fact] + public void Subscription_OccursInArgumentOrder() + { + var subscribed = new List(); + var first = Observable.Create(o => { subscribed.Add(0); return () => { }; }); + var second = Observable.Create(o => { subscribed.Add(1); return () => { }; }); + var third = Observable.Create(o => { subscribed.Add(2); return () => { }; }); + + using var sub = first.DeliveryQueueMerge(second, third).Subscribe(_ => { }); + + subscribed.Should().Equal(0, 1, 2); + } + + [Fact] + public void Dispose_StopsForwardingFromAnySource() + { + using var a = new Subject(); + using var b = new Subject(); + + var received = new List(); + var sub = a.DeliveryQueueMerge(b).Subscribe(received.Add); + + a.OnNext(1); + sub.Dispose(); + a.OnNext(2); + b.OnNext(3); + + received.Should().Equal(1); + } + + [Fact] + public void NoOthers_FallsBackToFirstAlone() + { + using var a = new Subject(); + var received = new List(); + var completed = false; + using var sub = a.DeliveryQueueMerge().Subscribe(received.Add, () => completed = true); + + a.OnNext(7); + a.OnNext(11); + a.OnCompleted(); + + received.Should().Equal(7, 11); + completed.Should().BeTrue(); + } +} \ No newline at end of file diff --git a/src/DynamicData.Tests/Internal/UnsynchronizedCombineLatestFixture.cs b/src/DynamicData.Tests/Internal/UnsynchronizedCombineLatestFixture.cs new file mode 100644 index 00000000..25693721 --- /dev/null +++ b/src/DynamicData.Tests/Internal/UnsynchronizedCombineLatestFixture.cs @@ -0,0 +1,217 @@ +// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved. +// Roland Pheasant licenses this file to you under the MIT license. +// See the LICENSE file in the project root for full license information. + +using System; +using System.Collections.Generic; +using System.Reactive.Subjects; + +using DynamicData.Internal; + +using FluentAssertions; + +using Xunit; + +namespace DynamicData.Tests.Internal; + +/// +/// Focused behavioural tests for . +/// Covers the contract the helper has to honour as a drop-in +/// replacement: emits only after both sources have produced at least one value, then on every subsequent OnNext from either side; first error terminates; +/// completes only after both sources complete. +/// +public sealed class UnsynchronizedCombineLatestFixture +{ + [Fact] + public void OnNext_DoesNotEmit_UntilBothSourcesHaveProduced() + { + using var a = new Subject(); + using var b = new Subject(); + + var received = new List(); + using var sub = a.UnsynchronizedCombineLatest(b, (x, y) => $"{x}:{y}").Subscribe(received.Add); + + a.OnNext(1); + received.Should().BeEmpty("only the first source has produced"); + + a.OnNext(2); + received.Should().BeEmpty("the second source still has not produced"); + + b.OnNext("first"); + received.Should().Equal("2:first"); + } + + [Fact] + public void OnNext_AfterBothHaveProduced_EmitsOnEverySubsequentValue() + { + using var a = new Subject(); + using var b = new Subject(); + + var received = new List(); + using var sub = a.UnsynchronizedCombineLatest(b, (x, y) => $"{x}:{y}").Subscribe(received.Add); + + a.OnNext(1); + b.OnNext("x"); + a.OnNext(2); + b.OnNext("y"); + b.OnNext("z"); + a.OnNext(3); + + received.Should().Equal("1:x", "2:x", "2:y", "2:z", "3:z"); + } + + [Fact] + public void OnCompleted_FiresOnlyAfterBothSourcesComplete() + { + using var a = new Subject(); + using var b = new Subject(); + + var completed = false; + using var sub = a.UnsynchronizedCombineLatest(b, (x, y) => $"{x}:{y}").Subscribe(_ => { }, () => completed = true); + + a.OnCompleted(); + completed.Should().BeFalse("the second source is still live"); + + b.OnCompleted(); + completed.Should().BeTrue(); + } + + [Fact] + public void OnError_FromAnySource_TerminatesImmediately() + { + using var a = new Subject(); + using var b = new Subject(); + + Exception? captured = null; + var completed = false; + using var sub = a.UnsynchronizedCombineLatest(b, (x, y) => $"{x}:{y}").Subscribe(_ => { }, e => captured = e, () => completed = true); + + var error = new InvalidOperationException("first"); + b.OnError(error); + + captured.Should().BeSameAs(error); + completed.Should().BeFalse("OnCompleted must not fire after OnError"); + } + + [Fact] + public void OnError_AfterFirstError_IsIgnored() + { + using var a = new Subject(); + using var b = new Subject(); + + Exception? captured = null; + using var sub = a.UnsynchronizedCombineLatest(b, (x, y) => $"{x}:{y}").Subscribe(_ => { }, e => captured = e, () => { }); + + var first = new InvalidOperationException("first"); + var second = new InvalidOperationException("second"); + a.OnError(first); + b.OnError(second); + + captured.Should().BeSameAs(first, "first error wins; subsequent errors from other sources must be dropped"); + } + + [Fact] + public void OnNext_AfterError_IsIgnored() + { + using var a = new Subject(); + using var b = new Subject(); + + var received = new List(); + Exception? captured = null; + using var sub = a.UnsynchronizedCombineLatest(b, (x, y) => $"{x}:{y}").Subscribe(received.Add, e => captured = e); + + a.OnNext(1); + b.OnNext("x"); + received.Should().Equal("1:x"); + + var error = new InvalidOperationException(); + a.OnError(error); + + a.OnNext(2); + b.OnNext("y"); + received.Should().Equal(new[] { "1:x" }, "no further OnNext must arrive after OnError has fired"); + captured.Should().BeSameAs(error); + } + + [Fact] + public void OnCompleted_AfterError_IsIgnored() + { + using var a = new Subject(); + using var b = new Subject(); + + Exception? captured = null; + var completed = false; + using var sub = a.UnsynchronizedCombineLatest(b, (x, y) => $"{x}:{y}").Subscribe(_ => { }, e => captured = e, () => completed = true); + + var error = new InvalidOperationException(); + a.OnError(error); + b.OnCompleted(); + + captured.Should().BeSameAs(error); + completed.Should().BeFalse("a late OnCompleted from a surviving source must not arrive after OnError"); + } + + [Fact] + public void ResultSelector_ReceivesMostRecentValueFromEachSource() + { + using var a = new Subject(); + using var b = new Subject(); + + var received = new List(); + using var sub = a.UnsynchronizedCombineLatest(b, (x, y) => x * 10 + y).Subscribe(received.Add); + + a.OnNext(1); + b.OnNext(2); + received.Should().Equal(12); + + a.OnNext(3); + received.Should().Equal(new[] { 12, 32 }, "the second source's most recent value (2) must still be in effect"); + + b.OnNext(4); + received.Should().Equal(12, 32, 34); + } + + [Fact] + public void SynchronousValues_AtSubscribeTime_AreCombinedCorrectly() + { + // Behaviour subjects deliver their initial value synchronously at Subscribe time. + // The helper must capture the first source's value before subscribing to the second, + // and immediately emit when the second source's initial value arrives. + using var a = new System.Reactive.Subjects.BehaviorSubject(7); + using var b = new System.Reactive.Subjects.BehaviorSubject(11); + + var received = new List(); + using var sub = a.UnsynchronizedCombineLatest(b, (x, y) => x + y).Subscribe(received.Add); + + received.Should().Equal(new[] { 18 }, "both subjects delivered synchronously at subscribe time"); + } + + [Fact] + public void SynchronousCompletion_BeforeOther_StillCompletesOnlyAfterBoth() + { + var immediate = System.Reactive.Linq.Observable.Empty(); + using var live = new Subject(); + + var completed = false; + using var sub = immediate.UnsynchronizedCombineLatest(live, (x, y) => x + y).Subscribe(_ => { }, () => completed = true); + + completed.Should().BeFalse("the live source has not completed yet"); + + live.OnCompleted(); + + completed.Should().BeTrue(); + } + + [Fact] + public void SynchronousError_BeforeOther_TerminatesImmediately() + { + var error = new InvalidOperationException(); + var immediate = System.Reactive.Linq.Observable.Throw(error); + using var live = new Subject(); + + Exception? captured = null; + using var sub = immediate.UnsynchronizedCombineLatest(live, (x, y) => x + y).Subscribe(_ => { }, e => captured = e); + + captured.Should().BeSameAs(error); + } +} diff --git a/src/DynamicData.Tests/Internal/UnsynchronizedMergeFixture.cs b/src/DynamicData.Tests/Internal/UnsynchronizedMergeFixture.cs new file mode 100644 index 00000000..85b95b8e --- /dev/null +++ b/src/DynamicData.Tests/Internal/UnsynchronizedMergeFixture.cs @@ -0,0 +1,175 @@ +// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved. +// Roland Pheasant licenses this file to you under the MIT license. +// See the LICENSE file in the project root for full license information. + +using System; +using System.Collections.Generic; +using System.Reactive.Subjects; + +using DynamicData.Internal; + +using FluentAssertions; + +using Xunit; + +namespace DynamicData.Tests.Internal; + +/// +/// Focused behavioural tests for . +/// Covers the contract the helper has to honour as a drop-in +/// replacement: subscription order, all-must-complete OnCompleted, first-error-wins OnError, and synchronous terminal +/// notifications. +/// +public sealed class UnsynchronizedMergeFixture +{ + [Fact] + public void OnNext_FromBothSources_IsForwardedInArrivalOrder() + { + using var a = new Subject(); + using var b = new Subject(); + + var received = new List(); + using var sub = a.UnsynchronizedMerge(b).Subscribe(received.Add); + + a.OnNext(1); + b.OnNext(2); + a.OnNext(3); + b.OnNext(4); + + received.Should().Equal(1, 2, 3, 4); + } + + [Fact] + public void OnCompleted_FiresOnlyAfterAllSourcesComplete() + { + using var a = new Subject(); + using var b = new Subject(); + using var c = new Subject(); + + var completed = false; + using var sub = a.UnsynchronizedMerge(b, c).Subscribe(_ => { }, () => completed = true); + + a.OnCompleted(); + completed.Should().BeFalse("a single source completion must not terminate the merged stream"); + + b.OnCompleted(); + completed.Should().BeFalse("two of three completions still leave one source live"); + + c.OnCompleted(); + completed.Should().BeTrue("after every source has completed the merged stream must emit OnCompleted"); + } + + [Fact] + public void OnError_FromAnySource_TerminatesImmediately() + { + using var a = new Subject(); + using var b = new Subject(); + using var c = new Subject(); + + Exception? captured = null; + var completed = false; + using var sub = a.UnsynchronizedMerge(b, c).Subscribe(_ => { }, e => captured = e, () => completed = true); + + var error = new InvalidOperationException("first"); + b.OnError(error); + + captured.Should().BeSameAs(error); + completed.Should().BeFalse("OnCompleted must not fire after OnError"); + } + + [Fact] + public void OnError_AfterFirstError_IsIgnored() + { + using var a = new Subject(); + using var b = new Subject(); + + Exception? captured = null; + using var sub = a.UnsynchronizedMerge(b).Subscribe(_ => { }, e => captured = e, () => { }); + + var first = new InvalidOperationException("first"); + var second = new InvalidOperationException("second"); + a.OnError(first); + b.OnError(second); + + captured.Should().BeSameAs(first, "first error wins; subsequent errors from other sources must be dropped"); + } + + [Fact] + public void OnCompleted_AfterError_IsIgnored() + { + using var a = new Subject(); + using var b = new Subject(); + + Exception? captured = null; + var completed = false; + using var sub = a.UnsynchronizedMerge(b).Subscribe(_ => { }, e => captured = e, () => completed = true); + + var error = new InvalidOperationException(); + a.OnError(error); + b.OnCompleted(); + + captured.Should().BeSameAs(error); + completed.Should().BeFalse("a late OnCompleted from a surviving source must not arrive after OnError has fired"); + } + + [Fact] + public void Subscription_OccursInArgumentOrder() + { + var subscribed = new List(); + var first = System.Reactive.Linq.Observable.Create(o => { subscribed.Add(0); return () => { }; }); + var second = System.Reactive.Linq.Observable.Create(o => { subscribed.Add(1); return () => { }; }); + var third = System.Reactive.Linq.Observable.Create(o => { subscribed.Add(2); return () => { }; }); + + using var sub = first.UnsynchronizedMerge(second, third).Subscribe(_ => { }); + + subscribed.Should().Equal(0, 1, 2); + } + + [Fact] + public void SynchronousTerminal_BeforeOtherSourcesSubscribe_IsHandled() + { + // A source that completes synchronously at subscribe time decrements the pending counter immediately. + // If the helper miscounted, the merged stream would either complete prematurely or never complete. + var immediate = System.Reactive.Linq.Observable.Empty(); + using var live = new Subject(); + + var completed = false; + using var sub = immediate.UnsynchronizedMerge(live).Subscribe(_ => { }, () => completed = true); + + completed.Should().BeFalse("the live source has not completed yet"); + + live.OnCompleted(); + + completed.Should().BeTrue(); + } + + [Fact] + public void SynchronousError_BeforeOtherSourcesSubscribe_TerminatesImmediately() + { + var error = new InvalidOperationException(); + var immediate = System.Reactive.Linq.Observable.Throw(error); + using var live = new Subject(); + + Exception? captured = null; + using var sub = immediate.UnsynchronizedMerge(live).Subscribe(_ => { }, e => captured = e); + + captured.Should().BeSameAs(error); + } + + [Fact] + public void NoOthers_FallsBackToFirstAlone() + { + // Boundary: zero entries in the params array. Behaviour must mirror Observable.Merge over a single source. + using var a = new Subject(); + var received = new List(); + var completed = false; + using var sub = a.UnsynchronizedMerge().Subscribe(received.Add, () => completed = true); + + a.OnNext(7); + a.OnNext(11); + a.OnCompleted(); + + received.Should().Equal(7, 11); + completed.Should().BeTrue(); + } +} \ No newline at end of file diff --git a/src/DynamicData/Cache/Internal/AutoRefresh.cs b/src/DynamicData/Cache/Internal/AutoRefresh.cs index ee81fe58..f1f17d67 100644 --- a/src/DynamicData/Cache/Internal/AutoRefresh.cs +++ b/src/DynamicData/Cache/Internal/AutoRefresh.cs @@ -32,9 +32,8 @@ public IObservable> Run() => Observable.Create list.Count > 0).Select(items => new ChangeSet(items)); // publish refreshes and underlying changes - var queue = new SharedDeliveryQueue(); - var publisher = shared.SynchronizeSafe(queue).Merge(refreshChanges.SynchronizeSafe(queue)).SubscribeSafe(observer); + var publisher = shared.DeliveryQueueMerge(refreshChanges).SubscribeSafe(observer); - return new CompositeDisposable(publisher, shared.Connect(), queue); + return new CompositeDisposable(publisher, shared.Connect()); }); } diff --git a/src/DynamicData/Cache/Internal/EditDiffChangeSetOptional.cs b/src/DynamicData/Cache/Internal/EditDiffChangeSetOptional.cs index f65df1b9..17afafeb 100644 --- a/src/DynamicData/Cache/Internal/EditDiffChangeSetOptional.cs +++ b/src/DynamicData/Cache/Internal/EditDiffChangeSetOptional.cs @@ -17,35 +17,35 @@ internal sealed class EditDiffChangeSetOptional(IObservable _keySelector = keySelector ?? throw new ArgumentNullException(nameof(keySelector)); public IObservable> Run() => Observable.Create>(observer => - { - var previous = Optional.None(); + { + var previous = Optional.None(); - return _source.Synchronize().Subscribe( - nextValue => - { - var current = nextValue.Convert(val => new ValueContainer(val, _keySelector(val))); + return _source.Subscribe( + nextValue => + { + var current = nextValue.Convert(val => new ValueContainer(val, _keySelector(val))); - // Determine the changes - var changes = (previous.HasValue, current.HasValue) switch - { - (true, true) => CreateUpdateChanges(previous.Value, current.Value), - (false, true) => [new Change(ChangeReason.Add, current.Value.Key, current.Value.Object)], - (true, false) => [new Change(ChangeReason.Remove, previous.Value.Key, previous.Value.Object)], - (false, false) => [], - }; + // Determine the changes + var changes = (previous.HasValue, current.HasValue) switch + { + (true, true) => CreateUpdateChanges(previous.Value, current.Value), + (false, true) => [new Change(ChangeReason.Add, current.Value.Key, current.Value.Object)], + (true, false) => [new Change(ChangeReason.Remove, previous.Value.Key, previous.Value.Object)], + (false, false) => [], + }; - // Save the value for the next round - previous = current; + // Save the value for the next round + previous = current; - // If there are changes, emit as a ChangeSet - if (changes.Length > 0) - { - observer.OnNext(new ChangeSet(changes)); - } - }, - observer.OnError, - observer.OnCompleted); - }); + // If there are changes, emit as a ChangeSet + if (changes.Length > 0) + { + observer.OnNext(new ChangeSet(changes)); + } + }, + observer.OnError, + observer.OnCompleted); + }); private Change[] CreateUpdateChanges(in ValueContainer prev, in ValueContainer curr) { diff --git a/src/DynamicData/Cache/Internal/GroupOn.cs b/src/DynamicData/Cache/Internal/GroupOn.cs index fd0936fe..546e2e8f 100644 --- a/src/DynamicData/Cache/Internal/GroupOn.cs +++ b/src/DynamicData/Cache/Internal/GroupOn.cs @@ -6,6 +6,8 @@ using System.Reactive.Disposables; using System.Reactive.Linq; +using DynamicData.Internal; + namespace DynamicData.Cache.Internal; internal sealed class GroupOn(IObservable> source, Func groupSelectorKey, IObservable? regrouper) @@ -29,7 +31,7 @@ public IObservable> Run() => Observabl var regroup = _regrouper.SynchronizeSafe(queue).Select(_ => grouper.Regroup()).Where(changes => changes.Count != 0); - var published = groups.Merge(regroup).Publish(); + var published = groups.UnsynchronizedMerge(regroup).Publish(); var subscriber = published.SubscribeSafe(observer); var disposer = published.DisposeMany().Subscribe(); diff --git a/src/DynamicData/Cache/Internal/GroupOnDynamic.cs b/src/DynamicData/Cache/Internal/GroupOnDynamic.cs index 0e11bc59..91efad22 100644 --- a/src/DynamicData/Cache/Internal/GroupOnDynamic.cs +++ b/src/DynamicData/Cache/Internal/GroupOnDynamic.cs @@ -6,6 +6,8 @@ using System.Reactive.Disposables; using System.Reactive.Linq; +using DynamicData.Internal; + namespace DynamicData.Cache.Internal; internal sealed class GroupOnDynamic(IObservable> source, IObservable> selectGroupObservable, IObservable? regrouper = null) @@ -71,8 +73,11 @@ public IObservable> Run() => Observabl }, onError: observer.OnError); - // Create an observable that completes when all 3 inputs complete so the downstream can be completed as well - var subOnComplete = Observable.Merge(sharedSource.ToUnit(), sharedGroupSelector.ToUnit(), sharedRegrouper) + // All three inputs are routed through the same SharedDeliveryQueue so their notifications + // are already serialized; the merge is only here to coalesce their completion into a single + // downstream OnCompleted. UnsynchronizedMerge avoids the ABBA-prone gate that Observable.Merge + // would hold across the downstream observer.OnCompleted/observer.OnError call. + var subOnComplete = sharedSource.ToUnit().UnsynchronizedMerge(sharedGroupSelector.ToUnit(), sharedRegrouper) .IgnoreElements() .SubscribeSafe(observer.OnError, observer.OnCompleted); diff --git a/src/DynamicData/Cache/Internal/GroupOnImmutable.cs b/src/DynamicData/Cache/Internal/GroupOnImmutable.cs index ba33db03..fc6f3b16 100644 --- a/src/DynamicData/Cache/Internal/GroupOnImmutable.cs +++ b/src/DynamicData/Cache/Internal/GroupOnImmutable.cs @@ -29,7 +29,7 @@ public IObservable> Run() => var regroup = _regrouper.SynchronizeSafe(queue).Select(_ => grouper.Regroup()).Where(changes => changes.Count != 0); - return new CompositeDisposable(groups.Merge(regroup).SubscribeSafe(observer), queue); + return new CompositeDisposable(groups.UnsynchronizedMerge(regroup).SubscribeSafe(observer), queue); }); private sealed class Grouper(Func groupSelectorKey) diff --git a/src/DynamicData/Cache/Internal/Page.cs b/src/DynamicData/Cache/Internal/Page.cs index e4159587..2d91ac29 100644 --- a/src/DynamicData/Cache/Internal/Page.cs +++ b/src/DynamicData/Cache/Internal/Page.cs @@ -19,7 +19,7 @@ public IObservable> Run() => Observable.Create updates is not null) .Select(x => x!) .SubscribeSafe(observer), queue); diff --git a/src/DynamicData/Cache/Internal/QueryWhenChanged.cs b/src/DynamicData/Cache/Internal/QueryWhenChanged.cs index 01828e85..7e976424 100644 --- a/src/DynamicData/Cache/Internal/QueryWhenChanged.cs +++ b/src/DynamicData/Cache/Internal/QueryWhenChanged.cs @@ -49,7 +49,7 @@ public IObservable> Run() return cache; }).Select(list => new AnonymousQuery(list)); - return new CompositeDisposable(sourceChanged.Merge(inlineChange).SubscribeSafe(observer), shared.Connect(), queue); + return new CompositeDisposable(sourceChanged.UnsynchronizedMerge(inlineChange).SubscribeSafe(observer), shared.Connect(), queue); }); } } diff --git a/src/DynamicData/Cache/Internal/Sort.cs b/src/DynamicData/Cache/Internal/Sort.cs index 11b39035..6afb9632 100644 --- a/src/DynamicData/Cache/Internal/Sort.cs +++ b/src/DynamicData/Cache/Internal/Sort.cs @@ -57,7 +57,7 @@ public IObservable> Run() => Observable.Create result is not null).Select(x => x!).SubscribeSafe(observer), queue); + return new CompositeDisposable(comparerChanged.UnsynchronizedMerge(dataChanged, sortAgain).Where(result => result is not null).Select(x => x!).SubscribeSafe(observer), queue); }); private sealed class Sorter(SortOptimisations optimisations, IComparer? comparer = null, int resetThreshold = -1) diff --git a/src/DynamicData/Cache/Internal/SortAndPage.cs b/src/DynamicData/Cache/Internal/SortAndPage.cs index 2d612a9b..02ef996d 100644 --- a/src/DynamicData/Cache/Internal/SortAndPage.cs +++ b/src/DynamicData/Cache/Internal/SortAndPage.cs @@ -111,10 +111,10 @@ public IObservable>> Run() => return ApplyPagedChanges(changes); }); - return new CompositeDisposable(Observable.Merge( - comparerChanged.Skip(1), - paramsChanged.Where(changes => changes.Count is not 0), - dataChange.Where(changes => changes.Count is not 0)) + return new CompositeDisposable(comparerChanged.Skip(1) + .UnsynchronizedMerge( + paramsChanged.Where(changes => changes.Count is not 0), + dataChange.Where(changes => changes.Count is not 0)) .SubscribeSafe(observer), queue); ChangeSet> ApplyPagedChanges(IChangeSet? changeSet = null) diff --git a/src/DynamicData/Cache/Internal/SortAndVirtualize.cs b/src/DynamicData/Cache/Internal/SortAndVirtualize.cs index 44c6d3dc..0e6ae550 100644 --- a/src/DynamicData/Cache/Internal/SortAndVirtualize.cs +++ b/src/DynamicData/Cache/Internal/SortAndVirtualize.cs @@ -113,8 +113,7 @@ public IObservable>> Run() => return new CompositeDisposable( comparerChanged - .Merge(paramsChanged) - .Merge(dataChange) + .UnsynchronizedMerge(paramsChanged, dataChange) .Where(changes => changes.Count is not 0) .SubscribeSafe(observer), queue); diff --git a/src/DynamicData/Cache/Internal/Switch.cs b/src/DynamicData/Cache/Internal/Switch.cs index 766f9f14..d5c2302d 100644 --- a/src/DynamicData/Cache/Internal/Switch.cs +++ b/src/DynamicData/Cache/Internal/Switch.cs @@ -6,6 +6,8 @@ using System.Reactive.Linq; using System.Reactive.Subjects; +using DynamicData.Internal; + namespace DynamicData.Cache.Internal; internal sealed class Switch(IObservable>> sources) @@ -18,29 +20,35 @@ public IObservable> Run() => Observable.Create { var queue = new SharedDeliveryQueue(); - var destination = new LockFreeObservableCache(); - var errors = new Subject>(); - - var populator = Observable.Switch( - _sources - .SynchronizeSafe(queue) - .Do(onNext: _ => destination.Clear(), - onError: error => errors.OnError(error))) + var innerSubscription = new SerialDisposable(); + + // The outer (sources) and every inner are routed through the same SharedDeliveryQueue. + // Both the per-source clear and the per-changeset destination write happen on the drain + // thread, so destination.Connect() emissions and any errors.OnError calls also originate + // from inside the drain. The downstream merge therefore sees pre-serialized inputs and + // uses UnsynchronizedMerge to avoid the ABBA-prone Observable.Merge gate. + var sourcesSubscription = _sources .SynchronizeSafe(queue) - .Do(onNext: static _ => { }, - onError: error => errors.OnError(error)) - .PopulateInto(destination); + .SubscribeSafe( + onNext: newSource => + { + destination.Clear(); + innerSubscription.Disposable = newSource + .SynchronizeSafe(queue) + .SubscribeSafe( + onNext: changes => destination.Edit(updater => updater.Clone(changes)), + onError: errors.OnError); + }, + onError: errors.OnError); return new CompositeDisposable( destination, errors, - populator, - destination - .Connect() - .Merge(errors) - .SubscribeSafe(observer), + sourcesSubscription, + innerSubscription, + destination.Connect().UnsynchronizedMerge(errors).SubscribeSafe(observer), queue); }); } diff --git a/src/DynamicData/Cache/Internal/TransformAsync.cs b/src/DynamicData/Cache/Internal/TransformAsync.cs index 73b6f62a..66918d9e 100644 --- a/src/DynamicData/Cache/Internal/TransformAsync.cs +++ b/src/DynamicData/Cache/Internal/TransformAsync.cs @@ -6,6 +6,8 @@ using System.Reactive.Linq; using System.Reactive.Threading.Tasks; +using DynamicData.Internal; + namespace DynamicData.Cache.Internal; internal class TransformAsync( @@ -32,7 +34,7 @@ public IObservable> Run() => var forced = forceTransform.SynchronizeSafe(queue) .Select(shouldTransform => DoTransform(cache, shouldTransform)).Concat(); - transformer = transformer.SynchronizeSafe(queue).Merge(forced); + transformer = transformer.SynchronizeSafe(queue).UnsynchronizedMerge(forced); return new CompositeDisposable(transformer.SubscribeSafe(observer), queue); } diff --git a/src/DynamicData/Cache/Internal/TransformMany.cs b/src/DynamicData/Cache/Internal/TransformMany.cs index f38853c0..a356d09c 100644 --- a/src/DynamicData/Cache/Internal/TransformMany.cs +++ b/src/DynamicData/Cache/Internal/TransformMany.cs @@ -8,6 +8,7 @@ using System.Reactive.Linq; using DynamicData.Binding; +using DynamicData.Internal; namespace DynamicData.Cache.Internal; @@ -122,7 +123,7 @@ private IObservable> CreateWithChangeS var subsequent = transformed.MergeMany(x => x.Changes).SynchronizeSafe(queue); - var allChanges = initial.Merge(subsequent).Select( + var allChanges = initial.UnsynchronizedMerge(subsequent).Select( changes => { result.Clone(changes); diff --git a/src/DynamicData/Cache/Internal/TransformWithForcedTransform.cs b/src/DynamicData/Cache/Internal/TransformWithForcedTransform.cs index c7c95aa6..4bce97ac 100644 --- a/src/DynamicData/Cache/Internal/TransformWithForcedTransform.cs +++ b/src/DynamicData/Cache/Internal/TransformWithForcedTransform.cs @@ -25,7 +25,7 @@ public IObservable> Run() => Observable.Create CaptureChanges(cache, selector)).Select(changes => new ChangeSet(changes)).NotEmpty(); - var sourceAndRefreshes = shared.Merge(refresher); + var sourceAndRefreshes = shared.UnsynchronizedMerge(refresher); // do raw transform var transform = new Transform(sourceAndRefreshes, transformFactory, exceptionCallback, true).Run(); diff --git a/src/DynamicData/Cache/Internal/TreeBuilder.cs b/src/DynamicData/Cache/Internal/TreeBuilder.cs index bf379ef9..d94783e8 100644 --- a/src/DynamicData/Cache/Internal/TreeBuilder.cs +++ b/src/DynamicData/Cache/Internal/TreeBuilder.cs @@ -7,6 +7,8 @@ using System.Reactive.Linq; using System.Reactive.Subjects; +using DynamicData.Internal; + namespace DynamicData.Cache.Internal; internal sealed class TreeBuilder(IObservable> source, Func pivotOn, IObservable, bool>>? predicateChanged) @@ -197,7 +199,10 @@ void UpdateChildren(Node parentNode) reFilterObservable.OnNext(Unit.Default); }).DisposeMany().Subscribe(); - var filter = _predicateChanged.SynchronizeSafe(queue).CombineLatest(reFilterObservable, (predicate, _) => predicate); + // Both inputs are routed through the same SharedDeliveryQueue so their delivery is + // serialized; UnsynchronizedCombineLatest avoids the ABBA-prone gate that + // Observable.CombineLatest would hold across downstream delivery. + var filter = _predicateChanged.SynchronizeSafe(queue).UnsynchronizedCombineLatest(reFilterObservable.SynchronizeSafe(queue), (predicate, _) => predicate); var result = allNodes.Connect().Filter(filter).SubscribeSafe(observer); return new CompositeDisposable(result, parentSetter, allData, allNodes, groupedByPivot, Disposable.Create(() => reFilterObservable.OnCompleted()), queue); diff --git a/src/DynamicData/Cache/Internal/Virtualise.cs b/src/DynamicData/Cache/Internal/Virtualise.cs index 9a5fefbc..9e858f5d 100644 --- a/src/DynamicData/Cache/Internal/Virtualise.cs +++ b/src/DynamicData/Cache/Internal/Virtualise.cs @@ -23,7 +23,7 @@ public IObservable> Run() => Observable.Create< var request = _virtualRequests.SynchronizeSafe(queue).Select(virtualiser.Virtualise).Where(x => x is not null).Select(x => x!); var dataChange = _source.SynchronizeSafe(queue).Select(virtualiser.Update).Where(x => x is not null).Select(x => x!); - return new CompositeDisposable(request.Merge(dataChange).Where(updates => updates is not null).SubscribeSafe(observer), queue); + return new CompositeDisposable(request.UnsynchronizedMerge(dataChange).Where(updates => updates is not null).SubscribeSafe(observer), queue); }); private sealed class Virtualiser(VirtualRequest? request = null) diff --git a/src/DynamicData/Internal/DeliveryQueueMergeExtensions.cs b/src/DynamicData/Internal/DeliveryQueueMergeExtensions.cs new file mode 100644 index 00000000..43b3d94b --- /dev/null +++ b/src/DynamicData/Internal/DeliveryQueueMergeExtensions.cs @@ -0,0 +1,58 @@ +// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved. +// Roland Pheasant licenses this file to you under the MIT license. +// See the LICENSE file in the project root for full license information. + +using System.Reactive; +using System.Reactive.Disposables; +using System.Reactive.Linq; + +namespace DynamicData.Internal; + +// Same-type Rx merge that owns a DeliveryQueue. Serializes notifications from +// every input through the queue, which releases its gate before delivering, so +// downstream observers that walk into another cache's writer lock cannot deadlock +// with this operator's serialization point. Used where every input has the same +// element type and no per-input projection is needed inside the drain. When element +// types differ or per-input projections are required, route each input through +// SharedDeliveryQueue with SynchronizeSafe and combine them with UnsynchronizedMerge. +internal static class DeliveryQueueMergeExtensions +{ + // Functionally equivalent to Observable.Merge: completes only after every source + // completes, the first error terminates, subscription occurs in argument order. + public static IObservable DeliveryQueueMerge(this IObservable first, params IObservable[] others) => + Observable.Create(observer => + { + var queue = new DeliveryQueue(observer); + var remainingSources = others.Length + 1; + var subscriptions = new CompositeDisposable(remainingSources + 1); + + subscriptions.Add(first.SubscribeSafe(CreateInner())); + foreach (var source in others) + { + subscriptions.Add(source.SubscribeSafe(CreateInner())); + } + + // Subscription first so any terminal notification produced during Rx's disposal + // cascade still flows through the still-active queue. Queue last as cleanup. + subscriptions.Add(queue); + return subscriptions; + + // Each source needs its own inner observer instance because Rx's ObserverBase + // sets a one-shot stopped flag on the first OnCompleted or OnError. A single + // shared observer would silently drop terminal notifications from every source + // after the first. OnNext and OnError forward straight to the queue (the queue's + // gate serializes concurrent calls). OnCompleted is counter-gated so only the + // last surviving source's completion terminates the merged stream. + IObserver CreateInner() => + Observer.Create( + queue.OnNext, + queue.OnError, + () => + { + if (Interlocked.Decrement(ref remainingSources) == 0) + { + queue.OnCompleted(); + } + }); + }); +} diff --git a/src/DynamicData/Internal/SharedDeliveryQueue.cs b/src/DynamicData/Internal/SharedDeliveryQueue.cs index 6eab2889..9ec57a8e 100644 --- a/src/DynamicData/Internal/SharedDeliveryQueue.cs +++ b/src/DynamicData/Internal/SharedDeliveryQueue.cs @@ -1,4 +1,4 @@ -// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved. +// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved. // Roland Pheasant licenses this file to you under the MIT license. // See the LICENSE file in the project root for full license information. diff --git a/src/DynamicData/Internal/SynchronizeSafeExtensions.cs b/src/DynamicData/Internal/SynchronizeSafeExtensions.cs index ace8ed4a..603f545c 100644 --- a/src/DynamicData/Internal/SynchronizeSafeExtensions.cs +++ b/src/DynamicData/Internal/SynchronizeSafeExtensions.cs @@ -2,55 +2,45 @@ // Roland Pheasant licenses this file to you under the MIT license. // See the LICENSE file in the project root for full license information. +using System.Reactive; using System.Reactive.Disposables; using System.Reactive.Linq; namespace DynamicData.Internal; -/// -/// Provides SynchronizeSafe extension methods, drop-in replacements -/// for Synchronize(lock) that release the lock before downstream delivery. -/// -/// -/// Disposal ordering matters. CompositeDisposable disposes in -/// declaration order. The queue and the source subscription have different roles: -/// -/// -/// Subscription-first (gate and SDQ overloads) -/// The queue is the IObserver that the source sends notifications to. -/// Disposing the subscription first allows any final terminal notification (OnCompleted/OnError -/// triggered by Rx's disposal cascade or a Finally operator) to flow through the -/// still-active queue. The queue is disposed last as cleanup. -/// -/// -/// Queue-first (parameterless overload) -/// Used by operators with teardown side effects (DisposeMany, OnBeingRemoved). -/// The queue is terminated first via , which ensures -/// all in-flight deliveries complete before the subscription is disposed and teardown logic -/// (e.g., disposing removed items) runs. Terminal notifications are not needed because -/// the subscriber is explicitly tearing down. -/// -/// -/// +// Drop-in replacements for Observable.Synchronize(lock) that release the lock before +// downstream delivery, plus UnsynchronizedMerge for combining streams whose inputs are +// already serialized through the same queue. +// +// Disposal ordering matters. CompositeDisposable disposes in declaration order, and the +// queue and the source subscription have different roles: +// +// Subscription-first (gate and SDQ overloads): the queue is the IObserver that the +// source sends notifications to. Disposing the subscription first allows any final +// terminal notification (OnCompleted or OnError triggered by Rx's disposal cascade +// or a Finally operator) to flow through the still-active queue. The queue is +// disposed last as cleanup. +// +// Queue-first (parameterless overload): used by operators with teardown side effects +// (DisposeMany, OnBeingRemoved). The queue is terminated first via DeliveryQueue.Dispose, +// which ensures all in-flight deliveries complete before the subscription is disposed +// and teardown logic (e.g. disposing removed items) runs. Terminal notifications are +// not needed because the subscriber is explicitly tearing down. internal static class SynchronizeSafeExtensions { - /// - /// Synchronizes the source observable through a . - /// Use when multiple sources of different types share a gate. - /// + // Routes the source through a SharedDeliveryQueue. Use when multiple sources of + // different types share a gate. public static IObservable SynchronizeSafe(this IObservable source, SharedDeliveryQueue queue) => Observable.Create(observer => { var subQueue = queue.CreateQueue(observer); - // Subscription first: terminal notifications flow through the still-active sub-queue + // Subscription first: terminal notifications flow through the still-active sub-queue. return new CompositeDisposable(source.SubscribeSafe(subQueue), subQueue); }); - /// - /// Synchronizes the source observable through an implicitly created . - /// Drop-in replacement for Synchronize(locker). - /// + // Routes the source through an implicitly created DeliveryQueue. Drop-in replacement + // for Observable.Synchronize(locker). #if NET9_0_OR_GREATER public static IObservable SynchronizeSafe(this IObservable source, Lock gate) => #else @@ -60,22 +50,166 @@ public static IObservable SynchronizeSafe(this IObservable source, obje { var queue = new DeliveryQueue(gate, observer); - // Subscription first: terminal notifications flow through the still-active queue + // Subscription first: terminal notifications flow through the still-active queue. return new CompositeDisposable(source.SubscribeSafe(queue), queue); }); - /// - /// Synchronizes the source observable through an implicitly created - /// with automatic delivery completion on dispose. The queue is terminated and drained - /// before the source subscription is disposed, ensuring all in-flight notifications - /// are delivered before teardown. - /// + // Routes the source through an implicitly created DeliveryQueue with automatic + // delivery completion on dispose. The queue is terminated and drained before the + // source subscription is disposed, ensuring all in-flight notifications are delivered + // before teardown. public static IObservable SynchronizeSafe(this IObservable source) => Observable.Create(observer => { var queue = new DeliveryQueue(observer); - // Queue first: ensures in-flight deliveries complete before teardown side effects run + // Queue first: ensures in-flight deliveries complete before teardown side effects run. return new CompositeDisposable(queue, source.SubscribeSafe(queue)); }); + + // Merges every input into a single observable without taking any synchronization gate. + // Functionally equivalent to Observable.Merge: completes only after every source completes, + // the first error terminates, subscription occurs in argument order. + // + // The caller MUST ensure that delivery from every source is already serialized. In this + // library the precondition is satisfied by routing every source through the same + // SharedDeliveryQueue via SynchronizeSafe(queue). The shared queue's drain loop guarantees + // that at most one notification is in flight to the downstream observer at a time, so the + // additional gate that Observable.Merge would install is redundant. + // + // Removing that gate matters in cross-cache pipelines: Observable.Merge holds its private + // _gate for the entire duration of downstream delivery, and when downstream delivery walks + // into another cache's writer lock, two such gates on two operators form an ABBA cycle that + // the queue-drain design is meant to prevent. + // + // Without the external serialization precondition, concurrent OnNext calls into the shared + // observer will race. Do not use as a general-purpose Observable.Merge replacement. + public static IObservable UnsynchronizedMerge(this IObservable first, params IObservable[] others) => + Observable.Create(observer => + { + var remainingSources = others.Length + 1; + var subscriptions = new CompositeDisposable(remainingSources); + var terminated = 0; + + subscriptions.Add(first.SubscribeSafe(CreateInner())); + foreach (var source in others) + { + subscriptions.Add(source.SubscribeSafe(CreateInner())); + } + + return subscriptions; + + // Each source needs its own inner observer instance because Rx's ObserverBase sets + // a one-shot stopped flag on the first OnCompleted or OnError. A single shared + // observer would silently drop terminal notifications from every source after the + // first. The OnNext/OnError/OnCompleted actions close over the shared remainingSources + // and terminated counters so cross-source coordination still works. + IObserver CreateInner() => Observer.Create(OnNextSafe, OnErrorSafe, OnCompletedSafe); + + void OnNextSafe(T value) + { + if (Volatile.Read(ref terminated) == 0) + { + observer.OnNext(value); + } + } + + void OnErrorSafe(Exception error) + { + if (Interlocked.Exchange(ref terminated, 1) == 0) + { + observer.OnError(error); + } + } + + void OnCompletedSafe() + { + if (Interlocked.Decrement(ref remainingSources) == 0 && Interlocked.Exchange(ref terminated, 1) == 0) + { + observer.OnCompleted(); + } + } + }); + + // Two-input CombineLatest variant that does NOT install a gate. Functionally equivalent + // to Observable.CombineLatest: holds the most-recent value from each source, emits a + // resultSelector output whenever either source fires (provided the other has also fired + // at least once), the first error terminates, completes when both sources complete. + // + // Same precondition as UnsynchronizedMerge: delivery from BOTH sources must already be + // serialized through the same external gate before reaching this operator. In this library + // that is satisfied by routing both inputs through the same SharedDeliveryQueue via + // SynchronizeSafe(queue). Under that precondition no two OnNext calls overlap, so the + // latest-value state needs no internal locking, and the gate that + // Observable.CombineLatest installs becomes redundant. + // + // The Rx gate matters here for the same reason as Merge: Observable.CombineLatest holds + // its private _gate for the entire downstream delivery, and any operator-level lock held + // across a cross-cache write reconstructs the ABBA cycle the queue-drain design eliminated. + // + // Without the external serialization precondition, concurrent OnNext calls would race the + // latest-value state and could produce torn reads. Do not use as a general-purpose + // Observable.CombineLatest replacement. + public static IObservable UnsynchronizedCombineLatest( + this IObservable first, + IObservable second, + Func resultSelector) + where TFirst : notnull + where TSecond : notnull => + Observable.Create(observer => + { + var firstLatest = Optional.None(); + var secondLatest = Optional.None(); + var remainingSources = 2; + var terminated = 0; + + var subscriptions = new CompositeDisposable(2); + subscriptions.Add(first.SubscribeSafe(Observer.Create(OnFirstNext, OnErrorSafe, OnCompletedSafe))); + subscriptions.Add(second.SubscribeSafe(Observer.Create(OnSecondNext, OnErrorSafe, OnCompletedSafe))); + return subscriptions; + + void OnFirstNext(TFirst value) + { + if (Volatile.Read(ref terminated) != 0) + { + return; + } + + firstLatest = value; + if (secondLatest.HasValue) + { + observer.OnNext(resultSelector(value, secondLatest.Value)); + } + } + + void OnSecondNext(TSecond value) + { + if (Volatile.Read(ref terminated) != 0) + { + return; + } + + secondLatest = value; + if (firstLatest.HasValue) + { + observer.OnNext(resultSelector(firstLatest.Value, value)); + } + } + + void OnErrorSafe(Exception error) + { + if (Interlocked.Exchange(ref terminated, 1) == 0) + { + observer.OnError(error); + } + } + + void OnCompletedSafe() + { + if (Interlocked.Decrement(ref remainingSources) == 0 && Interlocked.Exchange(ref terminated, 1) == 0) + { + observer.OnCompleted(); + } + } + }); }