diff --git a/src/DynamicData.Tests/Binding/WhenPropertyChangedBehaviorFixture.cs b/src/DynamicData.Tests/Binding/WhenPropertyChangedBehaviorFixture.cs
new file mode 100644
index 000000000..48a60e35d
--- /dev/null
+++ b/src/DynamicData.Tests/Binding/WhenPropertyChangedBehaviorFixture.cs
@@ -0,0 +1,275 @@
+// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved.
+// Roland Pheasant licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for full license information.
+
+using System;
+using System.Collections.Generic;
+using System.ComponentModel;
+
+using DynamicData.Binding;
+
+using FluentAssertions;
+
+using Xunit;
+
+namespace DynamicData.Tests.Binding;
+
+///
+/// Single-threaded contract tests for :
+/// handler attachment ordering, no-dedup semantics, deep-chain re-walks on swaps.
+///
+public sealed class WhenPropertyChangedBehaviorFixture
+{
+ [Fact]
+ public void Shallow_NotifyInitialFalse_SubscribesHandlerBeforeReturning()
+ {
+ // notifyOnInitialValue=false: Subscribe must return only after the PropertyChanged handler
+ // is attached. A setter that fires immediately after Subscribe returns must reach the
+ // observer.
+ var model = new TestModel { Value = 10 };
+ var emissions = new List();
+
+ using var sub = model.WhenPropertyChanged(m => m.Value, notifyOnInitialValue: false)
+ .Subscribe(pv => emissions.Add(pv.Value));
+
+ model.Value = 20;
+
+ emissions.Should().Equal(new[] { 20 });
+ }
+
+ [Fact]
+ public void Shallow_NotifyInitialTrue_DoesNotDedupSameValuedEvents()
+ {
+ var model = new TestModel { Value = 10 };
+ var emissions = new List();
+
+ using var sub = model.WhenPropertyChanged(m => m.Value, notifyOnInitialValue: true)
+ .Subscribe(pv => emissions.Add(pv.Value));
+
+ model.Value = 10;
+ model.Value = 10;
+ model.Value = 10;
+
+ emissions.Should().Equal(new[] { 10, 10, 10, 10 });
+ }
+
+ [Fact]
+ public void Shallow_NotifyInitialFalse_DoesNotDedupSameValuedEvents()
+ {
+ var model = new TestModel { Value = 10 };
+ var emissions = new List();
+
+ using var sub = model.WhenPropertyChanged(m => m.Value, notifyOnInitialValue: false)
+ .Subscribe(pv => emissions.Add(pv.Value));
+
+ model.Value = 42;
+ model.Value = 42;
+
+ emissions.Should().Equal(new[] { 42, 42 });
+ }
+
+ [Fact]
+ public void DeepChain_NotifyInitialTrue_DoesNotDedupSameValuedEvents()
+ {
+ var parent = new ParentModel { Child = new ChildModel { Age = 1 } };
+ var emissions = new List();
+
+ using var sub = parent.WhenPropertyChanged(p => p.Child!.Age, notifyOnInitialValue: true)
+ .Subscribe(pv => emissions.Add(pv.Value));
+
+ parent.Child!.Age = 1;
+ parent.Child!.Age = 1;
+ parent.Child!.Age = 1;
+
+ emissions.Should().Equal(new[] { 1, 1, 1, 1 });
+ }
+
+ [Fact]
+ public void DeepChain_NotifyInitialFalse_DoesNotDedupSameValuedEvents()
+ {
+ var parent = new ParentModel { Child = new ChildModel { Age = 1 } };
+ var emissions = new List();
+
+ using var sub = parent.WhenPropertyChanged(p => p.Child!.Age, notifyOnInitialValue: false)
+ .Subscribe(pv => emissions.Add(pv.Value));
+
+ parent.Child!.Age = 7;
+ parent.Child!.Age = 7;
+
+ emissions.Should().Equal(new[] { 7, 7 });
+ }
+
+ [Fact]
+ public void DeepChain_PostSwap_LeafEventOnNewChild_Captured()
+ {
+ // After parent.Child is reassigned, the leaf-level subscription must be re-attached
+ // against the new child. A subsequent leaf mutation on the new child must be captured.
+ var parent = new ParentModel { Child = new ChildModel { Age = 10 } };
+ var emissions = new List();
+
+ using var sub = parent.WhenPropertyChanged(p => p.Child!.Age, notifyOnInitialValue: true)
+ .Subscribe(pv => emissions.Add(pv.Value));
+
+ var newChild = new ChildModel { Age = 20 };
+ parent.Child = newChild;
+ newChild.Age = 30;
+
+ emissions.Should().Equal(new[] { 10, 20, 30 });
+ }
+
+ [Fact]
+ public void DeepChain_MidChainSwap_DeeperLevelsRetargetCorrectly()
+ {
+ // Mid-chain swap on a 4-level chain. When level 3 is reassigned, the leaf subscription
+ // must re-attach against the new subtree; events on the old subtree must be ignored
+ // (its notifier subscription was disposed).
+ var l1 = new Level1
+ {
+ Child = new Level2
+ {
+ Child = new Level3
+ {
+ Child = new Level4 { Leaf = 10 },
+ },
+ },
+ };
+
+ var emissions = new List();
+ using var sub = l1.WhenPropertyChanged(x => x.Child!.Child!.Child!.Leaf, notifyOnInitialValue: true)
+ .Subscribe(pv => emissions.Add(pv.Value));
+
+ emissions.Should().Equal(new[] { 10 }, "initial emission");
+
+ var originalLeaf = l1.Child!.Child!.Child!;
+
+ var newL4 = new Level4 { Leaf = 20 };
+ l1.Child!.Child!.Child = newL4;
+
+ emissions.Should().Equal(new[] { 10, 20 }, "mid-chain swap emits the new leaf value");
+
+ newL4.Leaf = 30;
+ emissions.Should().Equal(new[] { 10, 20, 30 }, "leaf event on new subtree is captured");
+
+ originalLeaf.Leaf = 999;
+ emissions.Should().Equal(new[] { 10, 20, 30 }, "leaf event on detached subtree is ignored");
+ }
+
+ private sealed class TestModel : INotifyPropertyChanged
+ {
+ private int _value;
+
+ public event PropertyChangedEventHandler? PropertyChanged;
+
+ public int Value
+ {
+ get => _value;
+ set
+ {
+ _value = value;
+ PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(nameof(Value)));
+ }
+ }
+ }
+
+ private sealed class ParentModel : INotifyPropertyChanged
+ {
+ private ChildModel? _child;
+
+ public event PropertyChangedEventHandler? PropertyChanged;
+
+ public ChildModel? Child
+ {
+ get => _child;
+ set
+ {
+ _child = value;
+ PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(nameof(Child)));
+ }
+ }
+ }
+
+ private sealed class ChildModel : INotifyPropertyChanged
+ {
+ private int _age;
+
+ public event PropertyChangedEventHandler? PropertyChanged;
+
+ public int Age
+ {
+ get => _age;
+ set
+ {
+ _age = value;
+ PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(nameof(Age)));
+ }
+ }
+ }
+
+ private sealed class Level1 : INotifyPropertyChanged
+ {
+ private Level2? _child;
+
+ public event PropertyChangedEventHandler? PropertyChanged;
+
+ public Level2? Child
+ {
+ get => _child;
+ set
+ {
+ _child = value;
+ PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(nameof(Child)));
+ }
+ }
+ }
+
+ private sealed class Level2 : INotifyPropertyChanged
+ {
+ private Level3? _child;
+
+ public event PropertyChangedEventHandler? PropertyChanged;
+
+ public Level3? Child
+ {
+ get => _child;
+ set
+ {
+ _child = value;
+ PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(nameof(Child)));
+ }
+ }
+ }
+
+ private sealed class Level3 : INotifyPropertyChanged
+ {
+ private Level4? _child;
+
+ public event PropertyChangedEventHandler? PropertyChanged;
+
+ public Level4? Child
+ {
+ get => _child;
+ set
+ {
+ _child = value;
+ PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(nameof(Child)));
+ }
+ }
+ }
+
+ private sealed class Level4 : INotifyPropertyChanged
+ {
+ private int _leaf;
+
+ public event PropertyChangedEventHandler? PropertyChanged;
+
+ public int Leaf
+ {
+ get => _leaf;
+ set
+ {
+ _leaf = value;
+ PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(nameof(Leaf)));
+ }
+ }
+ }
+}
diff --git a/src/DynamicData.Tests/Binding/WhenPropertyChangedRaceFixture.cs b/src/DynamicData.Tests/Binding/WhenPropertyChangedRaceFixture.cs
new file mode 100644
index 000000000..5c8d073c8
--- /dev/null
+++ b/src/DynamicData.Tests/Binding/WhenPropertyChangedRaceFixture.cs
@@ -0,0 +1,554 @@
+// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved.
+// Roland Pheasant licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for full license information.
+
+using System;
+using System.Collections.Generic;
+using System.ComponentModel;
+using System.Linq;
+using System.Reactive;
+using System.Reactive.Concurrency;
+using System.Reactive.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+
+using DynamicData.Binding;
+using DynamicData.Tests.Utilities;
+
+using FluentAssertions;
+
+using Xunit;
+
+namespace DynamicData.Tests.Binding;
+
+///
+/// Multi-threaded race tests for .
+/// Each test forces concurrency between the operator's subscribe call (or chain re-walk) and one or more
+/// notifiers firing on other threads.
+///
+public sealed class WhenPropertyChangedRaceFixture
+{
+ private static readonly TimeSpan ConditionTimeout = TimeSpan.FromSeconds(30);
+
+ [Fact]
+ public async Task Shallow_ConcurrentMutationDuringInitialEmit_NotDropped()
+ {
+ var item = new Item()
+ {
+ Id = 1,
+ Value = 10
+ };
+
+ var whenSubscribing = new ManualResetEventSlim();
+ var whenValueChanged = new ManualResetEventSlim();
+
+ var source = item.WhenPropertyChanged(
+ propertyAccessor: static item => item.Value,
+ notifyOnInitialValue: true);
+
+ var observedValues = new List();
+ var observer = Observer.Create>(propertyValue =>
+ {
+ observedValues.Add(propertyValue.Value);
+
+ whenSubscribing.Set();
+ whenValueChanged.Wait();
+ });
+
+ await Task.WhenAll(
+ Task.Run(() =>
+ {
+ using var subscription = source.Subscribe(observer);
+ }),
+ Task.Run(() =>
+ {
+ whenSubscribing.Wait();
+
+ item.Value = 20;
+
+ whenValueChanged.Set();
+ }));
+
+ observedValues.Should().BeEquivalentTo(
+ expectation: new [] { 10, 20 },
+ config: options => options.WithStrictOrdering(),
+ because: "All change events occurring after publication of the initial value should be captured and forwarded.");
+ }
+
+ [Fact]
+ public async Task DeepChain_ConcurrentLeafMutationDuringInitialEmit_NotDropped()
+ {
+ // Deep-chain version of the above. The observer blocks inside its OnNext for the initial
+ // leaf value while a second thread mutates the leaf.
+ var parent = new ParentModel { Child = new ChildModel { Age = 10 } };
+
+ var whenSubscribing = new ManualResetEventSlim();
+ var whenValueChanged = new ManualResetEventSlim();
+
+ var emissions = new List();
+ var observer = Observer.Create>(pv =>
+ {
+ emissions.Add(pv.Value);
+ whenSubscribing.Set();
+ whenValueChanged.Wait();
+ });
+
+ var source = parent.WhenPropertyChanged(static p => p.Child!.Age, notifyOnInitialValue: true);
+
+ await Task.WhenAll(
+ Task.Run(() =>
+ {
+ using var subscription = source.Subscribe(observer);
+ }),
+ Task.Run(() =>
+ {
+ whenSubscribing.Wait();
+ parent.Child!.Age = 20;
+ whenValueChanged.Set();
+ })).WaitAsync(ConditionTimeout);
+
+ emissions.Should().Equal(new[] { 10, 20 });
+ }
+
+ [Fact]
+ public async Task DeepChain_ConcurrentParentSwap_LeafEventOnWinnerNotDropped()
+ {
+ // Two threads concurrently swap parent.Child. After both swaps complete, a leaf mutation
+ // on the current child must be captured. SharedDeliveryQueue serialises the level-0
+ // signals on the drainer, so the final level-1 subscription always targets parent.Child's
+ // current value.
+ const int iterations = 50;
+ var losses = 0;
+
+ for (var iter = 0; iter < iterations; iter++)
+ {
+ var parent = new ParentModel { Child = new ChildModel { Age = 0 } };
+ var emissions = new List();
+
+ using var sub = parent.WhenPropertyChanged(p => p.Child!.Age, notifyOnInitialValue: false)
+ .Subscribe(pv => { lock (emissions) emissions.Add(pv.Value); });
+
+ var newChild1 = new ChildModel { Age = 1 };
+ var newChild2 = new ChildModel { Age = 2 };
+
+ using var barrier = new Barrier(2);
+ var taskA = Task.Run(() => { barrier.SignalAndWait(); parent.Child = newChild1; });
+ var taskB = Task.Run(() => { barrier.SignalAndWait(); parent.Child = newChild2; });
+ await Task.WhenAll(taskA, taskB).WaitAsync(ConditionTimeout);
+
+ var winner = parent.Child;
+ if (winner is null)
+ {
+ continue;
+ }
+
+ winner.Age = 99;
+
+ WaitForCondition(() => { lock (emissions) return emissions.Contains(99); });
+
+ lock (emissions)
+ {
+ if (!emissions.Contains(99))
+ {
+ losses++;
+ }
+ }
+ }
+
+ losses.Should().Be(0, $"out of {iterations} iterations, {losses} dropped the leaf event on the post-swap winner");
+ }
+
+ [Fact]
+ public async Task DeepChain_FiveLevels_AllLevelsMutatedConcurrently_FinalEmissionMatchesActual()
+ {
+ // Torture: five worker threads each mutating at a different level of a 5-level chain.
+ // Mutations that land on detached subtrees are ignored (their notifier subscriptions were
+ // disposed by ResubscribeFrom). Mutations on the live chain reach the drainer.
+ //
+ // Three invariants per iteration:
+ // (a) Rx contract: ValidateSynchronization catches any concurrent OnNext on the user
+ // observer (a SharedDeliveryQueue serialisation failure).
+ // (b) Value legality: every emission must be a value that some thread legitimately
+ // wrote.
+ // (c) Final consistency: after Task.WhenAll the drainer continues until the queue is
+ // empty. The last processed signal triggers a ReadCurrent against the now-frozen
+ // chain state, so emissions.Last() == ReadCurrent().
+ const int iterations = 50;
+ const int mutationsPerThread = 200;
+ var mismatches = 0;
+
+ for (var iter = 0; iter < iterations; iter++)
+ {
+ var root = NewDeepChain(0);
+ var emissions = new List();
+
+ using var sub = root.WhenPropertyChanged(r => r.Child!.Child!.Child!.Child!.Leaf, notifyOnInitialValue: true)
+ .ValidateSynchronization()
+ .Subscribe(pv => { lock (emissions) emissions.Add(pv.Value); });
+
+ using var barrier = new Barrier(5);
+ var iterSeed = iter * 10_000;
+ var tasks = new[]
+ {
+ Task.Run(() =>
+ {
+ barrier.SignalAndWait();
+ for (var i = 0; i < mutationsPerThread; i++)
+ {
+ root.Child = NewDeep2(iterSeed + 40_000 + i);
+ }
+ }),
+ Task.Run(() =>
+ {
+ barrier.SignalAndWait();
+ for (var i = 0; i < mutationsPerThread; i++)
+ {
+ var l2 = root.Child;
+ if (l2 is not null) l2.Child = NewDeep3(iterSeed + 30_000 + i);
+ }
+ }),
+ Task.Run(() =>
+ {
+ barrier.SignalAndWait();
+ for (var i = 0; i < mutationsPerThread; i++)
+ {
+ var l3 = root.Child?.Child;
+ if (l3 is not null) l3.Child = NewDeep4(iterSeed + 20_000 + i);
+ }
+ }),
+ Task.Run(() =>
+ {
+ barrier.SignalAndWait();
+ for (var i = 0; i < mutationsPerThread; i++)
+ {
+ var l4 = root.Child?.Child?.Child;
+ if (l4 is not null) l4.Child = new Deep5 { Leaf = iterSeed + 10_000 + i };
+ }
+ }),
+ Task.Run(() =>
+ {
+ barrier.SignalAndWait();
+ for (var i = 0; i < mutationsPerThread; i++)
+ {
+ var l5 = root.Child?.Child?.Child?.Child;
+ if (l5 is not null) l5.Leaf = i;
+ }
+ }),
+ };
+
+ await Task.WhenAll(tasks).WaitAsync(ConditionTimeout);
+
+ var actualFinal = root.Child!.Child!.Child!.Child!.Leaf;
+
+ WaitForCondition(() => { lock (emissions) return emissions.Count > 0 && emissions[^1] == actualFinal; });
+
+ var legal = new HashSet { 0 };
+ for (var i = 0; i < mutationsPerThread; i++)
+ {
+ legal.Add(i);
+ legal.Add(iterSeed + 10_000 + i);
+ legal.Add(iterSeed + 20_000 + i);
+ legal.Add(iterSeed + 30_000 + i);
+ legal.Add(iterSeed + 40_000 + i);
+ }
+
+ lock (emissions)
+ {
+ emissions.Should().NotBeEmpty($"iter {iter}: notifyOnInitialValue=true requires at least the initial emission");
+ emissions[0].Should().Be(0, $"iter {iter}: first emission must be the initial value");
+
+ var illegal = emissions.Where(v => !legal.Contains(v)).ToList();
+ illegal.Should().BeEmpty($"iter {iter}: every emission must be a value some thread wrote; saw {string.Join(",", illegal.Take(5))}");
+
+ if (emissions.Count == 0 || emissions[^1] != actualFinal)
+ {
+ mismatches++;
+ }
+ }
+ }
+
+ mismatches.Should().Be(0, $"out of {iterations} iterations, {mismatches} ended with the last emission not matching the actual final chain leaf");
+ }
+
+ [Fact(Skip = "AutoRefresh has a separate concurrency bug; tracked separately")]
+ public async Task AutoRefreshThenFilter_ConcurrentAddsAndPropertyActivation_AllItemsObserved()
+ {
+ // One adder thread sequentially adds items to the cache while a single flipper thread
+ // concurrently sets each item's Activated to true. Final filter contents must include
+ // every item (every item ends Activated=true).
+ //
+ // KeyedActivable's setter only raises PropertyChanged on actual value change, so a
+ // dropped false->true transition is unrecoverable.
+ //
+ // The race lives in AutoRefresh's internal Publish multicast: Sub 1 (Filter path)
+ // receives the Add and reads the property before Sub 2 (MergeMany) subscribes the
+ // per-item refresh handler. A concurrent flip landing in that gap is dropped. This
+ // is not a WhenPropertyChanged issue: AutoRefresh calls WhenPropertyChanged with
+ // notifyInitial=false, so the per-item subscribe attaches the handler immediately
+ // and has no internal race window.
+ const int iterations = 100;
+ const int itemCount = 200;
+
+ for (var iter = 0; iter < iterations; iter++)
+ {
+ using var cache = new SourceCache(x => x.Id);
+ var items = Enumerable.Range(0, itemCount).Select(i => new KeyedActivable(i)).ToList();
+
+ using var results = cache.Connect()
+ .AutoRefresh(x => x.Activated)
+ .Filter(x => x.Activated)
+ .AsAggregator();
+
+ using var barrier = new Barrier(2);
+
+ var adder = Task.Run(() =>
+ {
+ barrier.SignalAndWait();
+ foreach (var item in items) cache.AddOrUpdate(item);
+ });
+
+ var flipper = Task.Run(() =>
+ {
+ barrier.SignalAndWait();
+ foreach (var item in items) item.Activated = true;
+ });
+
+ await Task.WhenAll(adder, flipper).WaitAsync(ConditionTimeout);
+
+ var expected = items.Select(x => x.Id).ToHashSet();
+ WaitForCondition(() => results.Data.Keys.ToHashSet().SetEquals(expected));
+
+ var actual = results.Data.Keys.ToHashSet();
+ actual.Should().BeEquivalentTo(expected, $"iter {iter}: every item ends Activated=true and must appear in the filter (missing: {string.Join(",", expected.Except(actual))})");
+ results.Error.Should().BeNull($"iter {iter}: pipeline must not error");
+ }
+ }
+
+ [Fact(Skip = "AutoRefresh has a separate concurrency bug; tracked separately")]
+ public async Task AutoRefreshThenFilter_DualSubscribers_AllItemsObserved()
+ {
+ // Two independent cache subscribers running on the ThreadPool:
+ // Sub 1 (mutator): on every Add change, flips item.Activated to true
+ // Sub 2 (filter chain): AutoRefresh + Filter (filter = Activated)
+ // Items start with Activated=false (filtered out). The mutator flips every item, so
+ // the final filter contents must include every item.
+ //
+ // Same root cause as the single-flipper variant above: AutoRefresh's internal Publish
+ // multicasts the Add to the Filter path before MergeMany subscribes the per-item
+ // refresh handler. The mutator's flip can land in that gap and be dropped.
+ const int iterations = 100;
+ const int itemCount = 200;
+
+ for (var iter = 0; iter < iterations; iter++)
+ {
+ using var cache = new SourceCache(x => x.Id);
+ var items = Enumerable.Range(0, itemCount).Select(i => new KeyedActivable(i)).ToList();
+
+ using var mutator = cache.Connect()
+ .ObserveOn(TaskPoolScheduler.Default)
+ .Subscribe(changes =>
+ {
+ foreach (var change in changes)
+ {
+ if (change.Reason == ChangeReason.Add)
+ {
+ change.Current.Activated = true;
+ }
+ }
+ });
+
+ using var results = cache.Connect()
+ .ObserveOn(TaskPoolScheduler.Default)
+ .AutoRefresh(x => x.Activated)
+ .Filter(x => x.Activated)
+ .AsAggregator();
+
+ foreach (var item in items) cache.AddOrUpdate(item);
+
+ var expected = items.Select(x => x.Id).ToHashSet();
+ WaitForCondition(() => results.Data.Keys.ToHashSet().SetEquals(expected));
+
+ var actual = results.Data.Keys.ToHashSet();
+ actual.Should().BeEquivalentTo(expected, $"iter {iter}: every item was flipped to Activated=true by the mutator and must appear in the filter (missing: {string.Join(",", expected.Except(actual))})");
+ results.Error.Should().BeNull($"iter {iter}: pipeline must not error");
+ }
+ }
+
+ private static Deep1 NewDeepChain(int leaf) =>
+ new Deep1 { Child = NewDeep2(leaf) };
+
+ private static Deep2 NewDeep2(int leaf) =>
+ new Deep2 { Child = NewDeep3(leaf) };
+
+ private static Deep3 NewDeep3(int leaf) =>
+ new Deep3 { Child = NewDeep4(leaf) };
+
+ private static Deep4 NewDeep4(int leaf) =>
+ new Deep4 { Child = new Deep5 { Leaf = leaf } };
+
+ private static void WaitForCondition(Func condition, TimeSpan? timeout = null) =>
+ SpinWait.SpinUntil(condition, timeout ?? ConditionTimeout);
+
+ private sealed class Item : INotifyPropertyChanged
+ {
+ private int _value;
+
+ public event PropertyChangedEventHandler? PropertyChanged;
+
+ public int Id { get; init; }
+
+ public int Value
+ {
+ get => _value;
+ set
+ {
+ _value = value;
+ PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(nameof(Value)));
+ }
+ }
+ }
+
+ private sealed class ParentModel : INotifyPropertyChanged
+ {
+ private ChildModel? _child;
+
+ public event PropertyChangedEventHandler? PropertyChanged;
+
+ public ChildModel? Child
+ {
+ get => _child;
+ set
+ {
+ _child = value;
+ PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(nameof(Child)));
+ }
+ }
+ }
+
+ private sealed class ChildModel : INotifyPropertyChanged
+ {
+ private int _age;
+
+ public event PropertyChangedEventHandler? PropertyChanged;
+
+ public int Age
+ {
+ get => _age;
+ set
+ {
+ _age = value;
+ PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(nameof(Age)));
+ }
+ }
+ }
+
+ private sealed class Deep1 : INotifyPropertyChanged
+ {
+ private Deep2? _child;
+
+ public event PropertyChangedEventHandler? PropertyChanged;
+
+ public Deep2? Child
+ {
+ get => _child;
+ set
+ {
+ _child = value;
+ PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(nameof(Child)));
+ }
+ }
+ }
+
+ private sealed class Deep2 : INotifyPropertyChanged
+ {
+ private Deep3? _child;
+
+ public event PropertyChangedEventHandler? PropertyChanged;
+
+ public Deep3? Child
+ {
+ get => _child;
+ set
+ {
+ _child = value;
+ PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(nameof(Child)));
+ }
+ }
+ }
+
+ private sealed class Deep3 : INotifyPropertyChanged
+ {
+ private Deep4? _child;
+
+ public event PropertyChangedEventHandler? PropertyChanged;
+
+ public Deep4? Child
+ {
+ get => _child;
+ set
+ {
+ _child = value;
+ PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(nameof(Child)));
+ }
+ }
+ }
+
+ private sealed class Deep4 : INotifyPropertyChanged
+ {
+ private Deep5? _child;
+
+ public event PropertyChangedEventHandler? PropertyChanged;
+
+ public Deep5? Child
+ {
+ get => _child;
+ set
+ {
+ _child = value;
+ PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(nameof(Child)));
+ }
+ }
+ }
+
+ private sealed class Deep5 : INotifyPropertyChanged
+ {
+ private int _leaf;
+
+ public event PropertyChangedEventHandler? PropertyChanged;
+
+ public int Leaf
+ {
+ get => _leaf;
+ set
+ {
+ _leaf = value;
+ PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(nameof(Leaf)));
+ }
+ }
+ }
+
+ private sealed class KeyedActivable : INotifyPropertyChanged
+ {
+ private bool _activated;
+
+ public KeyedActivable(int id)
+ {
+ Id = id;
+ }
+
+ public event PropertyChangedEventHandler? PropertyChanged;
+
+ public int Id { get; }
+
+ public bool Activated
+ {
+ get => _activated;
+ set
+ {
+ if (_activated == value) return;
+ _activated = value;
+ PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(nameof(Activated)));
+ }
+ }
+ }
+}
diff --git a/src/DynamicData/Binding/ObservablePropertyFactory.cs b/src/DynamicData/Binding/ObservablePropertyFactory.cs
index d9f9be00c..4dd3fbd68 100644
--- a/src/DynamicData/Binding/ObservablePropertyFactory.cs
+++ b/src/DynamicData/Binding/ObservablePropertyFactory.cs
@@ -1,12 +1,16 @@
-// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved.
+// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved.
// Roland Pheasant licenses this file to you under the MIT license.
// See the LICENSE file in the project root for full license information.
+using System.Collections.Generic;
using System.ComponentModel;
using System.Linq.Expressions;
using System.Reactive;
+using System.Reactive.Disposables;
using System.Reactive.Linq;
+using DynamicData.Internal;
+
namespace DynamicData.Binding;
internal sealed class ObservablePropertyFactory
@@ -14,74 +18,279 @@ internal sealed class ObservablePropertyFactory
{
private readonly Func>> _factory;
- public ObservablePropertyFactory(Func valueAccessor, ObservablePropertyPart[] chain) =>
- _factory = (t, notifyInitial) =>
+ public ObservablePropertyFactory(Func valueAccessor, ObservablePropertyPart[] chain)
+ {
+ // chain is leaf-first (output of SplitIntoSteps). Reverse once to root-to-leaf order.
+ var rootToLeaf = chain.AsEnumerable().Reverse().ToArray();
+ _factory = (source, notifyInitial) => Observable.Create>(
+ observer => new DeepChainSubscription(observer, source, rootToLeaf, valueAccessor, notifyInitial));
+ }
+
+ public ObservablePropertyFactory(Expression> expression)
+ {
+ // Shallow form: single property, no chain. Used when depth == 1. Skips SharedDeliveryQueue
+ // and Observable.FromEventPattern in favour of a direct PropertyChanged += handler for
+ // the high-frequency single-property hot path.
+ var memberName = expression.GetProperty().Name;
+ var accessor = expression.Compile();
+ _factory = (source, notifyInitial) => Observable.Create>(
+ observer => new SinglePropertySubscription(observer, source, memberName, accessor, notifyInitial));
+ }
+
+ public IObservable> Create(TObject source, bool notifyInitial) => _factory(source, notifyInitial);
+
+ // Single-property subscription. Attaches a direct PropertyChanged handler and forwards every
+ // event through a DeliveryQueue. Used for x => x.Prop (depth == 1) where SharedDeliveryQueue
+ // and Observable.FromEventPattern would be needless overhead on the hot path.
+ //
+ // notifyInitial only controls whether the constructor synthesises an initial emission. There
+ // is no equality dedup at the subscribe seam: a same-valued PropertyChanged firing in the
+ // subscribe window is a legitimate event and must be delivered. The "never drop events"
+ // contract takes precedence over avoiding a benign duplicate.
+ private sealed class SinglePropertySubscription : IDisposable
+ {
+ private readonly TObject _source;
+ private readonly string _memberName;
+ private readonly Func _accessor;
+ private readonly DeliveryQueue> _queue;
+
+ public SinglePropertySubscription(
+ IObserver> observer,
+ TObject source,
+ string memberName,
+ Func accessor,
+ bool notifyInitial)
{
- // 1) notify when values have changed
- // 2) resubscribe when changed because it may be a child object which has changed
- var valueHasChanged = GetNotifiers(t, chain).Merge().Take(1).Repeat();
+ _source = source;
+ _memberName = memberName;
+ _accessor = accessor;
+ _queue = new DeliveryQueue>(observer);
+
+ // Attach PropertyChanged handler FIRST so events during the initial read are not missed.
+ _source.PropertyChanged += OnPropertyChanged;
+
if (notifyInitial)
{
- valueHasChanged = Observable.Defer(() => Observable.Return(Unit.Default)).Concat(valueHasChanged);
+ EmitCurrent();
}
+ }
- return valueHasChanged.Select(_ => GetPropertyValue(t, chain, valueAccessor));
- };
+ public void Dispose()
+ {
+ _source.PropertyChanged -= OnPropertyChanged;
+ _queue.Dispose();
+ }
- public ObservablePropertyFactory(Expression> expression)
+ private void OnPropertyChanged(object? sender, PropertyChangedEventArgs args)
+ {
+ if (args.PropertyName == _memberName)
+ {
+ EmitCurrent();
+ }
+ }
+
+ // Reads the current property value and forwards it through the queue. The accessor is
+ // user code and may throw; that exception routes to OnError. The downstream OnNext call
+ // is NOT wrapped: per the Rx contract, if the user observer throws, the exception
+ // propagates back to whoever invoked the PropertyChanged setter, matching what a plain
+ // Subject.OnNext would do.
+ private void EmitCurrent()
+ {
+ PropertyValue value;
+ try
+ {
+ value = new PropertyValue(_source, _accessor(_source));
+ }
+ catch (Exception ex)
+ {
+ _queue.OnError(ex);
+ return;
+ }
+
+ _queue.OnNext(value);
+ }
+ }
+
+ // Deep-chain subscription. Encapsulates the SharedDeliveryQueue + sub-queues + per-level
+ // SerialDisposable slots so fields are assigned in well-defined order and the signal sub-queue
+ // never needs a forward null bootstrap.
+ //
+ // SharedDeliveryQueue funnels both chain-level signals and user emissions through a
+ // single-drainer pattern. Two sub-queues:
+ // - userSub: receives PropertyValue emissions; drains to the user observer.
+ // - signalSub: receives level-change signals (int); drains by running ProcessSignal
+ // synchronously, which performs the re-walk (ResubscribeFrom) and enqueues the
+ // resulting value onto userSub.
+ // Drain order is LIFO (highest sub-queue index first), so signal processing runs before
+ // user delivery within each drain cycle, batching multiple signals before delivering the
+ // resulting values.
+ //
+ // Three properties this gives us:
+ // 1) Rx contract: user observer is never called concurrently (single drainer).
+ // 2) Deadlock immunity: a blocking user observer parks ONLY the drainer thread;
+ // concurrent producers enqueue and return immediately.
+ // 3) Concurrent-mutation safety: ResubscribeFrom runs on the drainer thread, so
+ // two threads racing to reassign the same intermediate property cannot leave a
+ // SerialDisposable slot subscribed to the loser; the drainer's loop processes
+ // every queued signal in order against the current chain state.
+ //
+ // notifyInitial only controls whether ProcessSignal emits the current chain value during
+ // the InitialSetupSignal pass. There is no equality dedup at the subscribe seam: every
+ // chain event is delivered.
+ private sealed class DeepChainSubscription : IDisposable
{
- // this overload is used for shallow observations i.e. depth = 1, so no need for re-subscriptions
- var member = expression.GetProperty();
- var accessor = expression.Compile();
+ // Sentinel signal value enqueued during subscribe to perform the initial chain setup
+ // from inside the SharedDeliveryQueue drainer.
+ private const int InitialSetupSignal = -1;
+
+ private readonly TObject _source;
+ private readonly ObservablePropertyPart[] _rootToLeaf;
+ private readonly Func _valueAccessor;
+ private readonly bool _notifyInitial;
+ private readonly SharedDeliveryQueue _sharedQueue;
+ private readonly DeliverySubQueue> _userSub;
+ private readonly DeliverySubQueue _signalSub;
+ private readonly SerialDisposable[] _levelSlots;
- _factory = (t, notifyInitial) =>
+ // Pre-allocated per-level notifier callbacks. Indexed by level. ResubscribeFrom reuses
+ // these instead of allocating a fresh closure per re-walk.
+ private readonly Action[] _levelCallbacks;
+
+ public DeepChainSubscription(
+ IObserver> observer,
+ TObject source,
+ ObservablePropertyPart[] rootToLeaf,
+ Func valueAccessor,
+ bool notifyInitial)
{
- PropertyValue Factory() => new(t, accessor(t));
+ _source = source;
+ _rootToLeaf = rootToLeaf;
+ _valueAccessor = valueAccessor;
+ _notifyInitial = notifyInitial;
+
+ _sharedQueue = new SharedDeliveryQueue();
+ _userSub = _sharedQueue.CreateQueue(observer);
- var propertyChanged = Observable.FromEventPattern(handler => t.PropertyChanged += handler, handler => t.PropertyChanged -= handler).Where(args => args.EventArgs.PropertyName == member.Name).Select(_ => Factory());
+ // ProcessSignal references _signalSub indirectly via ResubscribeFrom's notifier
+ // subscriptions. Observer.Create stores the method group without invoking it, and
+ // _signalSub is assigned by the surrounding expression, so by the time anything calls
+ // ProcessSignal the field is set.
+ _signalSub = _sharedQueue.CreateQueue(Observer.Create(ProcessSignal));
- if (!notifyInitial)
+ var depth = rootToLeaf.Length;
+ _levelSlots = new SerialDisposable[depth];
+ _levelCallbacks = new Action[depth];
+ for (var i = 0; i < depth; i++)
{
- return propertyChanged;
+ _levelSlots[i] = new SerialDisposable();
+ var level = i;
+ _levelCallbacks[i] = _ => _signalSub.OnNext(level);
}
- var initial = Observable.Defer(() => Observable.Return(Factory()));
- return initial.Concat(propertyChanged);
- };
- }
+ // Kick off initial chain setup via the drainer. The subscribe thread becomes the
+ // drainer (no one else is draining yet on a fresh subscription) and runs
+ // ProcessSignal(InitialSetupSignal) synchronously, which attaches the chain and
+ // emits the initial value.
+ _signalSub.OnNext(InitialSetupSignal);
+ }
- public IObservable> Create(TObject source, bool notifyInitial) => _factory(source, notifyInitial);
+ public void Dispose()
+ {
+ foreach (var slot in _levelSlots)
+ {
+ slot.Dispose();
+ }
- // create notifier for all parts of the property path
- private static IEnumerable> GetNotifiers(TObject source, IEnumerable chain)
- {
- object? value = source;
- foreach (var metadata in chain.Reverse())
+ _signalSub.Dispose();
+ _userSub.Dispose();
+ _sharedQueue.Dispose();
+ }
+
+ private void ProcessSignal(int level)
{
- var obs = metadata.Factory(value).Publish().RefCount();
- value = metadata.Invoker(value);
- yield return obs;
+ // Drainer thread. The chain walk (Invoker / notifier Factory / ReadCurrent's accessor)
+ // is user code and may throw; those exceptions route to OnError. The downstream
+ // OnNext call is NOT wrapped: per the Rx contract, if the user observer throws, the
+ // exception propagates back through the drainer, matching what a plain Subject
+ // would do.
+ //
+ // The two cases (initial setup vs level-fire) collapse to:
+ // startLevel = (initial) ? 0 : level + 1
+ // emit = (level-fire) || _notifyInitial
+ var isInitial = level == InitialSetupSignal;
+ var shouldEmit = !isInitial || _notifyInitial;
+ PropertyValue value;
+ try
+ {
+ ResubscribeFrom(isInitial ? 0 : level + 1);
+ if (!shouldEmit)
+ {
+ return;
+ }
- if (value is null)
+ value = ReadCurrent();
+ }
+ catch (Exception ex)
{
- yield break;
+ _userSub.OnError(ex);
+ return;
}
+
+ _userSub.OnNext(value);
}
- }
- // walk the tree and break at a null, or return the value [should reduce this to a null an expression]
- private static PropertyValue GetPropertyValue(TObject source, IEnumerable chain, Func valueAccessor)
- {
- object? value = source;
- foreach (var metadata in chain.Reverse())
+ private void ResubscribeFrom(int startLevel)
{
- value = metadata.Invoker(value);
- if (value is null)
+ var depth = _rootToLeaf.Length;
+ if (startLevel >= depth)
+ {
+ return;
+ }
+
+ object? value = _source;
+ for (var i = 0; i < startLevel; i++)
+ {
+ value = _rootToLeaf[i].Invoker(value);
+ if (value is null)
+ {
+ for (var j = startLevel; j < depth; j++)
+ {
+ _levelSlots[j].Disposable = Disposable.Empty;
+ }
+
+ return;
+ }
+ }
+
+ for (var i = startLevel; i < depth; i++)
{
- return new PropertyValue(source);
+ if (value is null)
+ {
+ _levelSlots[i].Disposable = Disposable.Empty;
+ continue;
+ }
+
+ var notifier = _rootToLeaf[i].Factory(value);
+ _levelSlots[i].Disposable = notifier.Subscribe(_levelCallbacks[i]);
+
+ value = _rootToLeaf[i].Invoker(value);
}
}
- return new PropertyValue(source, valueAccessor(source));
+ // Root-to-leaf chain walk. Stops at null and returns an unobtainable PropertyValue.
+ private PropertyValue ReadCurrent()
+ {
+ object? value = _source;
+ foreach (var metadata in _rootToLeaf)
+ {
+ value = metadata.Invoker(value);
+ if (value is null)
+ {
+ return new PropertyValue(_source);
+ }
+ }
+
+ return new PropertyValue(_source, _valueAccessor(_source));
+ }
}
}