diff --git a/src/DynamicData.Tests/Binding/WhenPropertyChangedBehaviorFixture.cs b/src/DynamicData.Tests/Binding/WhenPropertyChangedBehaviorFixture.cs new file mode 100644 index 000000000..48a60e35d --- /dev/null +++ b/src/DynamicData.Tests/Binding/WhenPropertyChangedBehaviorFixture.cs @@ -0,0 +1,275 @@ +// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved. +// Roland Pheasant licenses this file to you under the MIT license. +// See the LICENSE file in the project root for full license information. + +using System; +using System.Collections.Generic; +using System.ComponentModel; + +using DynamicData.Binding; + +using FluentAssertions; + +using Xunit; + +namespace DynamicData.Tests.Binding; + +/// +/// Single-threaded contract tests for : +/// handler attachment ordering, no-dedup semantics, deep-chain re-walks on swaps. +/// +public sealed class WhenPropertyChangedBehaviorFixture +{ + [Fact] + public void Shallow_NotifyInitialFalse_SubscribesHandlerBeforeReturning() + { + // notifyOnInitialValue=false: Subscribe must return only after the PropertyChanged handler + // is attached. A setter that fires immediately after Subscribe returns must reach the + // observer. + var model = new TestModel { Value = 10 }; + var emissions = new List(); + + using var sub = model.WhenPropertyChanged(m => m.Value, notifyOnInitialValue: false) + .Subscribe(pv => emissions.Add(pv.Value)); + + model.Value = 20; + + emissions.Should().Equal(new[] { 20 }); + } + + [Fact] + public void Shallow_NotifyInitialTrue_DoesNotDedupSameValuedEvents() + { + var model = new TestModel { Value = 10 }; + var emissions = new List(); + + using var sub = model.WhenPropertyChanged(m => m.Value, notifyOnInitialValue: true) + .Subscribe(pv => emissions.Add(pv.Value)); + + model.Value = 10; + model.Value = 10; + model.Value = 10; + + emissions.Should().Equal(new[] { 10, 10, 10, 10 }); + } + + [Fact] + public void Shallow_NotifyInitialFalse_DoesNotDedupSameValuedEvents() + { + var model = new TestModel { Value = 10 }; + var emissions = new List(); + + using var sub = model.WhenPropertyChanged(m => m.Value, notifyOnInitialValue: false) + .Subscribe(pv => emissions.Add(pv.Value)); + + model.Value = 42; + model.Value = 42; + + emissions.Should().Equal(new[] { 42, 42 }); + } + + [Fact] + public void DeepChain_NotifyInitialTrue_DoesNotDedupSameValuedEvents() + { + var parent = new ParentModel { Child = new ChildModel { Age = 1 } }; + var emissions = new List(); + + using var sub = parent.WhenPropertyChanged(p => p.Child!.Age, notifyOnInitialValue: true) + .Subscribe(pv => emissions.Add(pv.Value)); + + parent.Child!.Age = 1; + parent.Child!.Age = 1; + parent.Child!.Age = 1; + + emissions.Should().Equal(new[] { 1, 1, 1, 1 }); + } + + [Fact] + public void DeepChain_NotifyInitialFalse_DoesNotDedupSameValuedEvents() + { + var parent = new ParentModel { Child = new ChildModel { Age = 1 } }; + var emissions = new List(); + + using var sub = parent.WhenPropertyChanged(p => p.Child!.Age, notifyOnInitialValue: false) + .Subscribe(pv => emissions.Add(pv.Value)); + + parent.Child!.Age = 7; + parent.Child!.Age = 7; + + emissions.Should().Equal(new[] { 7, 7 }); + } + + [Fact] + public void DeepChain_PostSwap_LeafEventOnNewChild_Captured() + { + // After parent.Child is reassigned, the leaf-level subscription must be re-attached + // against the new child. A subsequent leaf mutation on the new child must be captured. + var parent = new ParentModel { Child = new ChildModel { Age = 10 } }; + var emissions = new List(); + + using var sub = parent.WhenPropertyChanged(p => p.Child!.Age, notifyOnInitialValue: true) + .Subscribe(pv => emissions.Add(pv.Value)); + + var newChild = new ChildModel { Age = 20 }; + parent.Child = newChild; + newChild.Age = 30; + + emissions.Should().Equal(new[] { 10, 20, 30 }); + } + + [Fact] + public void DeepChain_MidChainSwap_DeeperLevelsRetargetCorrectly() + { + // Mid-chain swap on a 4-level chain. When level 3 is reassigned, the leaf subscription + // must re-attach against the new subtree; events on the old subtree must be ignored + // (its notifier subscription was disposed). + var l1 = new Level1 + { + Child = new Level2 + { + Child = new Level3 + { + Child = new Level4 { Leaf = 10 }, + }, + }, + }; + + var emissions = new List(); + using var sub = l1.WhenPropertyChanged(x => x.Child!.Child!.Child!.Leaf, notifyOnInitialValue: true) + .Subscribe(pv => emissions.Add(pv.Value)); + + emissions.Should().Equal(new[] { 10 }, "initial emission"); + + var originalLeaf = l1.Child!.Child!.Child!; + + var newL4 = new Level4 { Leaf = 20 }; + l1.Child!.Child!.Child = newL4; + + emissions.Should().Equal(new[] { 10, 20 }, "mid-chain swap emits the new leaf value"); + + newL4.Leaf = 30; + emissions.Should().Equal(new[] { 10, 20, 30 }, "leaf event on new subtree is captured"); + + originalLeaf.Leaf = 999; + emissions.Should().Equal(new[] { 10, 20, 30 }, "leaf event on detached subtree is ignored"); + } + + private sealed class TestModel : INotifyPropertyChanged + { + private int _value; + + public event PropertyChangedEventHandler? PropertyChanged; + + public int Value + { + get => _value; + set + { + _value = value; + PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(nameof(Value))); + } + } + } + + private sealed class ParentModel : INotifyPropertyChanged + { + private ChildModel? _child; + + public event PropertyChangedEventHandler? PropertyChanged; + + public ChildModel? Child + { + get => _child; + set + { + _child = value; + PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(nameof(Child))); + } + } + } + + private sealed class ChildModel : INotifyPropertyChanged + { + private int _age; + + public event PropertyChangedEventHandler? PropertyChanged; + + public int Age + { + get => _age; + set + { + _age = value; + PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(nameof(Age))); + } + } + } + + private sealed class Level1 : INotifyPropertyChanged + { + private Level2? _child; + + public event PropertyChangedEventHandler? PropertyChanged; + + public Level2? Child + { + get => _child; + set + { + _child = value; + PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(nameof(Child))); + } + } + } + + private sealed class Level2 : INotifyPropertyChanged + { + private Level3? _child; + + public event PropertyChangedEventHandler? PropertyChanged; + + public Level3? Child + { + get => _child; + set + { + _child = value; + PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(nameof(Child))); + } + } + } + + private sealed class Level3 : INotifyPropertyChanged + { + private Level4? _child; + + public event PropertyChangedEventHandler? PropertyChanged; + + public Level4? Child + { + get => _child; + set + { + _child = value; + PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(nameof(Child))); + } + } + } + + private sealed class Level4 : INotifyPropertyChanged + { + private int _leaf; + + public event PropertyChangedEventHandler? PropertyChanged; + + public int Leaf + { + get => _leaf; + set + { + _leaf = value; + PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(nameof(Leaf))); + } + } + } +} diff --git a/src/DynamicData.Tests/Binding/WhenPropertyChangedRaceFixture.cs b/src/DynamicData.Tests/Binding/WhenPropertyChangedRaceFixture.cs new file mode 100644 index 000000000..5c8d073c8 --- /dev/null +++ b/src/DynamicData.Tests/Binding/WhenPropertyChangedRaceFixture.cs @@ -0,0 +1,554 @@ +// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved. +// Roland Pheasant licenses this file to you under the MIT license. +// See the LICENSE file in the project root for full license information. + +using System; +using System.Collections.Generic; +using System.ComponentModel; +using System.Linq; +using System.Reactive; +using System.Reactive.Concurrency; +using System.Reactive.Linq; +using System.Threading; +using System.Threading.Tasks; + +using DynamicData.Binding; +using DynamicData.Tests.Utilities; + +using FluentAssertions; + +using Xunit; + +namespace DynamicData.Tests.Binding; + +/// +/// Multi-threaded race tests for . +/// Each test forces concurrency between the operator's subscribe call (or chain re-walk) and one or more +/// notifiers firing on other threads. +/// +public sealed class WhenPropertyChangedRaceFixture +{ + private static readonly TimeSpan ConditionTimeout = TimeSpan.FromSeconds(30); + + [Fact] + public async Task Shallow_ConcurrentMutationDuringInitialEmit_NotDropped() + { + var item = new Item() + { + Id = 1, + Value = 10 + }; + + var whenSubscribing = new ManualResetEventSlim(); + var whenValueChanged = new ManualResetEventSlim(); + + var source = item.WhenPropertyChanged( + propertyAccessor: static item => item.Value, + notifyOnInitialValue: true); + + var observedValues = new List(); + var observer = Observer.Create>(propertyValue => + { + observedValues.Add(propertyValue.Value); + + whenSubscribing.Set(); + whenValueChanged.Wait(); + }); + + await Task.WhenAll( + Task.Run(() => + { + using var subscription = source.Subscribe(observer); + }), + Task.Run(() => + { + whenSubscribing.Wait(); + + item.Value = 20; + + whenValueChanged.Set(); + })); + + observedValues.Should().BeEquivalentTo( + expectation: new [] { 10, 20 }, + config: options => options.WithStrictOrdering(), + because: "All change events occurring after publication of the initial value should be captured and forwarded."); + } + + [Fact] + public async Task DeepChain_ConcurrentLeafMutationDuringInitialEmit_NotDropped() + { + // Deep-chain version of the above. The observer blocks inside its OnNext for the initial + // leaf value while a second thread mutates the leaf. + var parent = new ParentModel { Child = new ChildModel { Age = 10 } }; + + var whenSubscribing = new ManualResetEventSlim(); + var whenValueChanged = new ManualResetEventSlim(); + + var emissions = new List(); + var observer = Observer.Create>(pv => + { + emissions.Add(pv.Value); + whenSubscribing.Set(); + whenValueChanged.Wait(); + }); + + var source = parent.WhenPropertyChanged(static p => p.Child!.Age, notifyOnInitialValue: true); + + await Task.WhenAll( + Task.Run(() => + { + using var subscription = source.Subscribe(observer); + }), + Task.Run(() => + { + whenSubscribing.Wait(); + parent.Child!.Age = 20; + whenValueChanged.Set(); + })).WaitAsync(ConditionTimeout); + + emissions.Should().Equal(new[] { 10, 20 }); + } + + [Fact] + public async Task DeepChain_ConcurrentParentSwap_LeafEventOnWinnerNotDropped() + { + // Two threads concurrently swap parent.Child. After both swaps complete, a leaf mutation + // on the current child must be captured. SharedDeliveryQueue serialises the level-0 + // signals on the drainer, so the final level-1 subscription always targets parent.Child's + // current value. + const int iterations = 50; + var losses = 0; + + for (var iter = 0; iter < iterations; iter++) + { + var parent = new ParentModel { Child = new ChildModel { Age = 0 } }; + var emissions = new List(); + + using var sub = parent.WhenPropertyChanged(p => p.Child!.Age, notifyOnInitialValue: false) + .Subscribe(pv => { lock (emissions) emissions.Add(pv.Value); }); + + var newChild1 = new ChildModel { Age = 1 }; + var newChild2 = new ChildModel { Age = 2 }; + + using var barrier = new Barrier(2); + var taskA = Task.Run(() => { barrier.SignalAndWait(); parent.Child = newChild1; }); + var taskB = Task.Run(() => { barrier.SignalAndWait(); parent.Child = newChild2; }); + await Task.WhenAll(taskA, taskB).WaitAsync(ConditionTimeout); + + var winner = parent.Child; + if (winner is null) + { + continue; + } + + winner.Age = 99; + + WaitForCondition(() => { lock (emissions) return emissions.Contains(99); }); + + lock (emissions) + { + if (!emissions.Contains(99)) + { + losses++; + } + } + } + + losses.Should().Be(0, $"out of {iterations} iterations, {losses} dropped the leaf event on the post-swap winner"); + } + + [Fact] + public async Task DeepChain_FiveLevels_AllLevelsMutatedConcurrently_FinalEmissionMatchesActual() + { + // Torture: five worker threads each mutating at a different level of a 5-level chain. + // Mutations that land on detached subtrees are ignored (their notifier subscriptions were + // disposed by ResubscribeFrom). Mutations on the live chain reach the drainer. + // + // Three invariants per iteration: + // (a) Rx contract: ValidateSynchronization catches any concurrent OnNext on the user + // observer (a SharedDeliveryQueue serialisation failure). + // (b) Value legality: every emission must be a value that some thread legitimately + // wrote. + // (c) Final consistency: after Task.WhenAll the drainer continues until the queue is + // empty. The last processed signal triggers a ReadCurrent against the now-frozen + // chain state, so emissions.Last() == ReadCurrent(). + const int iterations = 50; + const int mutationsPerThread = 200; + var mismatches = 0; + + for (var iter = 0; iter < iterations; iter++) + { + var root = NewDeepChain(0); + var emissions = new List(); + + using var sub = root.WhenPropertyChanged(r => r.Child!.Child!.Child!.Child!.Leaf, notifyOnInitialValue: true) + .ValidateSynchronization() + .Subscribe(pv => { lock (emissions) emissions.Add(pv.Value); }); + + using var barrier = new Barrier(5); + var iterSeed = iter * 10_000; + var tasks = new[] + { + Task.Run(() => + { + barrier.SignalAndWait(); + for (var i = 0; i < mutationsPerThread; i++) + { + root.Child = NewDeep2(iterSeed + 40_000 + i); + } + }), + Task.Run(() => + { + barrier.SignalAndWait(); + for (var i = 0; i < mutationsPerThread; i++) + { + var l2 = root.Child; + if (l2 is not null) l2.Child = NewDeep3(iterSeed + 30_000 + i); + } + }), + Task.Run(() => + { + barrier.SignalAndWait(); + for (var i = 0; i < mutationsPerThread; i++) + { + var l3 = root.Child?.Child; + if (l3 is not null) l3.Child = NewDeep4(iterSeed + 20_000 + i); + } + }), + Task.Run(() => + { + barrier.SignalAndWait(); + for (var i = 0; i < mutationsPerThread; i++) + { + var l4 = root.Child?.Child?.Child; + if (l4 is not null) l4.Child = new Deep5 { Leaf = iterSeed + 10_000 + i }; + } + }), + Task.Run(() => + { + barrier.SignalAndWait(); + for (var i = 0; i < mutationsPerThread; i++) + { + var l5 = root.Child?.Child?.Child?.Child; + if (l5 is not null) l5.Leaf = i; + } + }), + }; + + await Task.WhenAll(tasks).WaitAsync(ConditionTimeout); + + var actualFinal = root.Child!.Child!.Child!.Child!.Leaf; + + WaitForCondition(() => { lock (emissions) return emissions.Count > 0 && emissions[^1] == actualFinal; }); + + var legal = new HashSet { 0 }; + for (var i = 0; i < mutationsPerThread; i++) + { + legal.Add(i); + legal.Add(iterSeed + 10_000 + i); + legal.Add(iterSeed + 20_000 + i); + legal.Add(iterSeed + 30_000 + i); + legal.Add(iterSeed + 40_000 + i); + } + + lock (emissions) + { + emissions.Should().NotBeEmpty($"iter {iter}: notifyOnInitialValue=true requires at least the initial emission"); + emissions[0].Should().Be(0, $"iter {iter}: first emission must be the initial value"); + + var illegal = emissions.Where(v => !legal.Contains(v)).ToList(); + illegal.Should().BeEmpty($"iter {iter}: every emission must be a value some thread wrote; saw {string.Join(",", illegal.Take(5))}"); + + if (emissions.Count == 0 || emissions[^1] != actualFinal) + { + mismatches++; + } + } + } + + mismatches.Should().Be(0, $"out of {iterations} iterations, {mismatches} ended with the last emission not matching the actual final chain leaf"); + } + + [Fact(Skip = "AutoRefresh has a separate concurrency bug; tracked separately")] + public async Task AutoRefreshThenFilter_ConcurrentAddsAndPropertyActivation_AllItemsObserved() + { + // One adder thread sequentially adds items to the cache while a single flipper thread + // concurrently sets each item's Activated to true. Final filter contents must include + // every item (every item ends Activated=true). + // + // KeyedActivable's setter only raises PropertyChanged on actual value change, so a + // dropped false->true transition is unrecoverable. + // + // The race lives in AutoRefresh's internal Publish multicast: Sub 1 (Filter path) + // receives the Add and reads the property before Sub 2 (MergeMany) subscribes the + // per-item refresh handler. A concurrent flip landing in that gap is dropped. This + // is not a WhenPropertyChanged issue: AutoRefresh calls WhenPropertyChanged with + // notifyInitial=false, so the per-item subscribe attaches the handler immediately + // and has no internal race window. + const int iterations = 100; + const int itemCount = 200; + + for (var iter = 0; iter < iterations; iter++) + { + using var cache = new SourceCache(x => x.Id); + var items = Enumerable.Range(0, itemCount).Select(i => new KeyedActivable(i)).ToList(); + + using var results = cache.Connect() + .AutoRefresh(x => x.Activated) + .Filter(x => x.Activated) + .AsAggregator(); + + using var barrier = new Barrier(2); + + var adder = Task.Run(() => + { + barrier.SignalAndWait(); + foreach (var item in items) cache.AddOrUpdate(item); + }); + + var flipper = Task.Run(() => + { + barrier.SignalAndWait(); + foreach (var item in items) item.Activated = true; + }); + + await Task.WhenAll(adder, flipper).WaitAsync(ConditionTimeout); + + var expected = items.Select(x => x.Id).ToHashSet(); + WaitForCondition(() => results.Data.Keys.ToHashSet().SetEquals(expected)); + + var actual = results.Data.Keys.ToHashSet(); + actual.Should().BeEquivalentTo(expected, $"iter {iter}: every item ends Activated=true and must appear in the filter (missing: {string.Join(",", expected.Except(actual))})"); + results.Error.Should().BeNull($"iter {iter}: pipeline must not error"); + } + } + + [Fact(Skip = "AutoRefresh has a separate concurrency bug; tracked separately")] + public async Task AutoRefreshThenFilter_DualSubscribers_AllItemsObserved() + { + // Two independent cache subscribers running on the ThreadPool: + // Sub 1 (mutator): on every Add change, flips item.Activated to true + // Sub 2 (filter chain): AutoRefresh + Filter (filter = Activated) + // Items start with Activated=false (filtered out). The mutator flips every item, so + // the final filter contents must include every item. + // + // Same root cause as the single-flipper variant above: AutoRefresh's internal Publish + // multicasts the Add to the Filter path before MergeMany subscribes the per-item + // refresh handler. The mutator's flip can land in that gap and be dropped. + const int iterations = 100; + const int itemCount = 200; + + for (var iter = 0; iter < iterations; iter++) + { + using var cache = new SourceCache(x => x.Id); + var items = Enumerable.Range(0, itemCount).Select(i => new KeyedActivable(i)).ToList(); + + using var mutator = cache.Connect() + .ObserveOn(TaskPoolScheduler.Default) + .Subscribe(changes => + { + foreach (var change in changes) + { + if (change.Reason == ChangeReason.Add) + { + change.Current.Activated = true; + } + } + }); + + using var results = cache.Connect() + .ObserveOn(TaskPoolScheduler.Default) + .AutoRefresh(x => x.Activated) + .Filter(x => x.Activated) + .AsAggregator(); + + foreach (var item in items) cache.AddOrUpdate(item); + + var expected = items.Select(x => x.Id).ToHashSet(); + WaitForCondition(() => results.Data.Keys.ToHashSet().SetEquals(expected)); + + var actual = results.Data.Keys.ToHashSet(); + actual.Should().BeEquivalentTo(expected, $"iter {iter}: every item was flipped to Activated=true by the mutator and must appear in the filter (missing: {string.Join(",", expected.Except(actual))})"); + results.Error.Should().BeNull($"iter {iter}: pipeline must not error"); + } + } + + private static Deep1 NewDeepChain(int leaf) => + new Deep1 { Child = NewDeep2(leaf) }; + + private static Deep2 NewDeep2(int leaf) => + new Deep2 { Child = NewDeep3(leaf) }; + + private static Deep3 NewDeep3(int leaf) => + new Deep3 { Child = NewDeep4(leaf) }; + + private static Deep4 NewDeep4(int leaf) => + new Deep4 { Child = new Deep5 { Leaf = leaf } }; + + private static void WaitForCondition(Func condition, TimeSpan? timeout = null) => + SpinWait.SpinUntil(condition, timeout ?? ConditionTimeout); + + private sealed class Item : INotifyPropertyChanged + { + private int _value; + + public event PropertyChangedEventHandler? PropertyChanged; + + public int Id { get; init; } + + public int Value + { + get => _value; + set + { + _value = value; + PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(nameof(Value))); + } + } + } + + private sealed class ParentModel : INotifyPropertyChanged + { + private ChildModel? _child; + + public event PropertyChangedEventHandler? PropertyChanged; + + public ChildModel? Child + { + get => _child; + set + { + _child = value; + PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(nameof(Child))); + } + } + } + + private sealed class ChildModel : INotifyPropertyChanged + { + private int _age; + + public event PropertyChangedEventHandler? PropertyChanged; + + public int Age + { + get => _age; + set + { + _age = value; + PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(nameof(Age))); + } + } + } + + private sealed class Deep1 : INotifyPropertyChanged + { + private Deep2? _child; + + public event PropertyChangedEventHandler? PropertyChanged; + + public Deep2? Child + { + get => _child; + set + { + _child = value; + PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(nameof(Child))); + } + } + } + + private sealed class Deep2 : INotifyPropertyChanged + { + private Deep3? _child; + + public event PropertyChangedEventHandler? PropertyChanged; + + public Deep3? Child + { + get => _child; + set + { + _child = value; + PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(nameof(Child))); + } + } + } + + private sealed class Deep3 : INotifyPropertyChanged + { + private Deep4? _child; + + public event PropertyChangedEventHandler? PropertyChanged; + + public Deep4? Child + { + get => _child; + set + { + _child = value; + PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(nameof(Child))); + } + } + } + + private sealed class Deep4 : INotifyPropertyChanged + { + private Deep5? _child; + + public event PropertyChangedEventHandler? PropertyChanged; + + public Deep5? Child + { + get => _child; + set + { + _child = value; + PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(nameof(Child))); + } + } + } + + private sealed class Deep5 : INotifyPropertyChanged + { + private int _leaf; + + public event PropertyChangedEventHandler? PropertyChanged; + + public int Leaf + { + get => _leaf; + set + { + _leaf = value; + PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(nameof(Leaf))); + } + } + } + + private sealed class KeyedActivable : INotifyPropertyChanged + { + private bool _activated; + + public KeyedActivable(int id) + { + Id = id; + } + + public event PropertyChangedEventHandler? PropertyChanged; + + public int Id { get; } + + public bool Activated + { + get => _activated; + set + { + if (_activated == value) return; + _activated = value; + PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(nameof(Activated))); + } + } + } +} diff --git a/src/DynamicData/Binding/ObservablePropertyFactory.cs b/src/DynamicData/Binding/ObservablePropertyFactory.cs index d9f9be00c..4dd3fbd68 100644 --- a/src/DynamicData/Binding/ObservablePropertyFactory.cs +++ b/src/DynamicData/Binding/ObservablePropertyFactory.cs @@ -1,12 +1,16 @@ -// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved. +// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved. // Roland Pheasant licenses this file to you under the MIT license. // See the LICENSE file in the project root for full license information. +using System.Collections.Generic; using System.ComponentModel; using System.Linq.Expressions; using System.Reactive; +using System.Reactive.Disposables; using System.Reactive.Linq; +using DynamicData.Internal; + namespace DynamicData.Binding; internal sealed class ObservablePropertyFactory @@ -14,74 +18,279 @@ internal sealed class ObservablePropertyFactory { private readonly Func>> _factory; - public ObservablePropertyFactory(Func valueAccessor, ObservablePropertyPart[] chain) => - _factory = (t, notifyInitial) => + public ObservablePropertyFactory(Func valueAccessor, ObservablePropertyPart[] chain) + { + // chain is leaf-first (output of SplitIntoSteps). Reverse once to root-to-leaf order. + var rootToLeaf = chain.AsEnumerable().Reverse().ToArray(); + _factory = (source, notifyInitial) => Observable.Create>( + observer => new DeepChainSubscription(observer, source, rootToLeaf, valueAccessor, notifyInitial)); + } + + public ObservablePropertyFactory(Expression> expression) + { + // Shallow form: single property, no chain. Used when depth == 1. Skips SharedDeliveryQueue + // and Observable.FromEventPattern in favour of a direct PropertyChanged += handler for + // the high-frequency single-property hot path. + var memberName = expression.GetProperty().Name; + var accessor = expression.Compile(); + _factory = (source, notifyInitial) => Observable.Create>( + observer => new SinglePropertySubscription(observer, source, memberName, accessor, notifyInitial)); + } + + public IObservable> Create(TObject source, bool notifyInitial) => _factory(source, notifyInitial); + + // Single-property subscription. Attaches a direct PropertyChanged handler and forwards every + // event through a DeliveryQueue. Used for x => x.Prop (depth == 1) where SharedDeliveryQueue + // and Observable.FromEventPattern would be needless overhead on the hot path. + // + // notifyInitial only controls whether the constructor synthesises an initial emission. There + // is no equality dedup at the subscribe seam: a same-valued PropertyChanged firing in the + // subscribe window is a legitimate event and must be delivered. The "never drop events" + // contract takes precedence over avoiding a benign duplicate. + private sealed class SinglePropertySubscription : IDisposable + { + private readonly TObject _source; + private readonly string _memberName; + private readonly Func _accessor; + private readonly DeliveryQueue> _queue; + + public SinglePropertySubscription( + IObserver> observer, + TObject source, + string memberName, + Func accessor, + bool notifyInitial) { - // 1) notify when values have changed - // 2) resubscribe when changed because it may be a child object which has changed - var valueHasChanged = GetNotifiers(t, chain).Merge().Take(1).Repeat(); + _source = source; + _memberName = memberName; + _accessor = accessor; + _queue = new DeliveryQueue>(observer); + + // Attach PropertyChanged handler FIRST so events during the initial read are not missed. + _source.PropertyChanged += OnPropertyChanged; + if (notifyInitial) { - valueHasChanged = Observable.Defer(() => Observable.Return(Unit.Default)).Concat(valueHasChanged); + EmitCurrent(); } + } - return valueHasChanged.Select(_ => GetPropertyValue(t, chain, valueAccessor)); - }; + public void Dispose() + { + _source.PropertyChanged -= OnPropertyChanged; + _queue.Dispose(); + } - public ObservablePropertyFactory(Expression> expression) + private void OnPropertyChanged(object? sender, PropertyChangedEventArgs args) + { + if (args.PropertyName == _memberName) + { + EmitCurrent(); + } + } + + // Reads the current property value and forwards it through the queue. The accessor is + // user code and may throw; that exception routes to OnError. The downstream OnNext call + // is NOT wrapped: per the Rx contract, if the user observer throws, the exception + // propagates back to whoever invoked the PropertyChanged setter, matching what a plain + // Subject.OnNext would do. + private void EmitCurrent() + { + PropertyValue value; + try + { + value = new PropertyValue(_source, _accessor(_source)); + } + catch (Exception ex) + { + _queue.OnError(ex); + return; + } + + _queue.OnNext(value); + } + } + + // Deep-chain subscription. Encapsulates the SharedDeliveryQueue + sub-queues + per-level + // SerialDisposable slots so fields are assigned in well-defined order and the signal sub-queue + // never needs a forward null bootstrap. + // + // SharedDeliveryQueue funnels both chain-level signals and user emissions through a + // single-drainer pattern. Two sub-queues: + // - userSub: receives PropertyValue emissions; drains to the user observer. + // - signalSub: receives level-change signals (int); drains by running ProcessSignal + // synchronously, which performs the re-walk (ResubscribeFrom) and enqueues the + // resulting value onto userSub. + // Drain order is LIFO (highest sub-queue index first), so signal processing runs before + // user delivery within each drain cycle, batching multiple signals before delivering the + // resulting values. + // + // Three properties this gives us: + // 1) Rx contract: user observer is never called concurrently (single drainer). + // 2) Deadlock immunity: a blocking user observer parks ONLY the drainer thread; + // concurrent producers enqueue and return immediately. + // 3) Concurrent-mutation safety: ResubscribeFrom runs on the drainer thread, so + // two threads racing to reassign the same intermediate property cannot leave a + // SerialDisposable slot subscribed to the loser; the drainer's loop processes + // every queued signal in order against the current chain state. + // + // notifyInitial only controls whether ProcessSignal emits the current chain value during + // the InitialSetupSignal pass. There is no equality dedup at the subscribe seam: every + // chain event is delivered. + private sealed class DeepChainSubscription : IDisposable { - // this overload is used for shallow observations i.e. depth = 1, so no need for re-subscriptions - var member = expression.GetProperty(); - var accessor = expression.Compile(); + // Sentinel signal value enqueued during subscribe to perform the initial chain setup + // from inside the SharedDeliveryQueue drainer. + private const int InitialSetupSignal = -1; + + private readonly TObject _source; + private readonly ObservablePropertyPart[] _rootToLeaf; + private readonly Func _valueAccessor; + private readonly bool _notifyInitial; + private readonly SharedDeliveryQueue _sharedQueue; + private readonly DeliverySubQueue> _userSub; + private readonly DeliverySubQueue _signalSub; + private readonly SerialDisposable[] _levelSlots; - _factory = (t, notifyInitial) => + // Pre-allocated per-level notifier callbacks. Indexed by level. ResubscribeFrom reuses + // these instead of allocating a fresh closure per re-walk. + private readonly Action[] _levelCallbacks; + + public DeepChainSubscription( + IObserver> observer, + TObject source, + ObservablePropertyPart[] rootToLeaf, + Func valueAccessor, + bool notifyInitial) { - PropertyValue Factory() => new(t, accessor(t)); + _source = source; + _rootToLeaf = rootToLeaf; + _valueAccessor = valueAccessor; + _notifyInitial = notifyInitial; + + _sharedQueue = new SharedDeliveryQueue(); + _userSub = _sharedQueue.CreateQueue(observer); - var propertyChanged = Observable.FromEventPattern(handler => t.PropertyChanged += handler, handler => t.PropertyChanged -= handler).Where(args => args.EventArgs.PropertyName == member.Name).Select(_ => Factory()); + // ProcessSignal references _signalSub indirectly via ResubscribeFrom's notifier + // subscriptions. Observer.Create stores the method group without invoking it, and + // _signalSub is assigned by the surrounding expression, so by the time anything calls + // ProcessSignal the field is set. + _signalSub = _sharedQueue.CreateQueue(Observer.Create(ProcessSignal)); - if (!notifyInitial) + var depth = rootToLeaf.Length; + _levelSlots = new SerialDisposable[depth]; + _levelCallbacks = new Action[depth]; + for (var i = 0; i < depth; i++) { - return propertyChanged; + _levelSlots[i] = new SerialDisposable(); + var level = i; + _levelCallbacks[i] = _ => _signalSub.OnNext(level); } - var initial = Observable.Defer(() => Observable.Return(Factory())); - return initial.Concat(propertyChanged); - }; - } + // Kick off initial chain setup via the drainer. The subscribe thread becomes the + // drainer (no one else is draining yet on a fresh subscription) and runs + // ProcessSignal(InitialSetupSignal) synchronously, which attaches the chain and + // emits the initial value. + _signalSub.OnNext(InitialSetupSignal); + } - public IObservable> Create(TObject source, bool notifyInitial) => _factory(source, notifyInitial); + public void Dispose() + { + foreach (var slot in _levelSlots) + { + slot.Dispose(); + } - // create notifier for all parts of the property path - private static IEnumerable> GetNotifiers(TObject source, IEnumerable chain) - { - object? value = source; - foreach (var metadata in chain.Reverse()) + _signalSub.Dispose(); + _userSub.Dispose(); + _sharedQueue.Dispose(); + } + + private void ProcessSignal(int level) { - var obs = metadata.Factory(value).Publish().RefCount(); - value = metadata.Invoker(value); - yield return obs; + // Drainer thread. The chain walk (Invoker / notifier Factory / ReadCurrent's accessor) + // is user code and may throw; those exceptions route to OnError. The downstream + // OnNext call is NOT wrapped: per the Rx contract, if the user observer throws, the + // exception propagates back through the drainer, matching what a plain Subject + // would do. + // + // The two cases (initial setup vs level-fire) collapse to: + // startLevel = (initial) ? 0 : level + 1 + // emit = (level-fire) || _notifyInitial + var isInitial = level == InitialSetupSignal; + var shouldEmit = !isInitial || _notifyInitial; + PropertyValue value; + try + { + ResubscribeFrom(isInitial ? 0 : level + 1); + if (!shouldEmit) + { + return; + } - if (value is null) + value = ReadCurrent(); + } + catch (Exception ex) { - yield break; + _userSub.OnError(ex); + return; } + + _userSub.OnNext(value); } - } - // walk the tree and break at a null, or return the value [should reduce this to a null an expression] - private static PropertyValue GetPropertyValue(TObject source, IEnumerable chain, Func valueAccessor) - { - object? value = source; - foreach (var metadata in chain.Reverse()) + private void ResubscribeFrom(int startLevel) { - value = metadata.Invoker(value); - if (value is null) + var depth = _rootToLeaf.Length; + if (startLevel >= depth) + { + return; + } + + object? value = _source; + for (var i = 0; i < startLevel; i++) + { + value = _rootToLeaf[i].Invoker(value); + if (value is null) + { + for (var j = startLevel; j < depth; j++) + { + _levelSlots[j].Disposable = Disposable.Empty; + } + + return; + } + } + + for (var i = startLevel; i < depth; i++) { - return new PropertyValue(source); + if (value is null) + { + _levelSlots[i].Disposable = Disposable.Empty; + continue; + } + + var notifier = _rootToLeaf[i].Factory(value); + _levelSlots[i].Disposable = notifier.Subscribe(_levelCallbacks[i]); + + value = _rootToLeaf[i].Invoker(value); } } - return new PropertyValue(source, valueAccessor(source)); + // Root-to-leaf chain walk. Stops at null and returns an unobtainable PropertyValue. + private PropertyValue ReadCurrent() + { + object? value = _source; + foreach (var metadata in _rootToLeaf) + { + value = metadata.Invoker(value); + if (value is null) + { + return new PropertyValue(_source); + } + } + + return new PropertyValue(_source, _valueAccessor(_source)); + } } }