WhenPropertyChanged: don't drop events fired during subscribe #1111

Open: dwcullop wants to merge 17 commits into reactivemarbles:main from dwcullop:bugfix/whenpropertychanged_race
@dwcullop (Member) commented Jun 13, 2026
Fixes #1110.

WhenPropertyChanged and WhenValueChanged could silently drop a PropertyChanged event that fired during the call to Subscribe. The shape initial.Concat(propertyChanged) only attaches the event handler after the initial value has been delivered, leaving a gap. For deep chains, every level had the same gap, and a mid-chain swap could race against the re-walk that re-attaches deeper levels.
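
The gap can be reproduced on a single thread with a hand-written model. The sketch below is not the library's code: `Counter`, `SubscribeOrder`, `EmitThenAttach` and `AttachThenEmit` are hypothetical names, and the "event fired during Subscribe" is simulated by an observer that mutates the source while it is still handling the initial value.

```csharp
using System;
using System.Collections.Generic;
using System.ComponentModel;

// Single-threaded stand-in for "an event fires during Subscribe": the observer
// mutates the source while it is still handling the initial value.
var late = new Counter();
var lateObserved = new List<int>();
SubscribeOrder.EmitThenAttach(late, v => { lateObserved.Add(v); if (v == 0) late.Value = 1; });

var early = new Counter();
var earlyObserved = new List<int>();
SubscribeOrder.AttachThenEmit(early, v => { earlyObserved.Add(v); if (v == 0) early.Value = 1; });

Console.WriteLine(string.Join(",", lateObserved));  // 0    (the change to 1 was dropped)
Console.WriteLine(string.Join(",", earlyObserved)); // 0,1

// Hypothetical model type, not part of DynamicData.
sealed class Counter : INotifyPropertyChanged
{
    private int _value;

    public event PropertyChangedEventHandler? PropertyChanged;

    public int Value
    {
        get => _value;
        set
        {
            _value = value;
            PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(nameof(Value)));
        }
    }
}

static class SubscribeOrder
{
    // Same ordering as initial.Concat(propertyChanged): deliver the initial
    // value first, attach the handler only afterwards.
    public static void EmitThenAttach(Counter source, Action<int> onNext)
    {
        onNext(source.Value);
        source.PropertyChanged += (_, _) => onNext(source.Value);
    }

    // Ordering described for the fix: attach the handler first, then deliver.
    public static void AttachThenEmit(Counter source, Action<int> onNext)
    {
        source.PropertyChanged += (_, _) => onNext(source.Value);
        onNext(source.Value);
    }
}
```

Neither helper returns a disposable or serialises deliveries; the sketch only isolates the ordering of "attach" and "emit initial".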

What changed

The factory's two construction paths each get a dedicated private nested class inside ObservablePropertyFactory<TObject, TProperty>:

SinglePropertySubscription : IDisposable (depth-1, the high-frequency UI-binding path):

  • Attaches the PropertyChanged handler as the first action of its constructor, before any optional initial emit. No operator-stack delay, no initial.Concat gap.
  • User observer is wrapped in a DeliveryQueue<T> so concurrent PropertyChanged invocations on the same item are serialised end-to-end.
  • EmitCurrent wraps both the accessor call AND the downstream OnNext in try/catch and routes failures through TryOnError. If the downstream observer's OnError itself throws, the inner catch swallows it so the secondary exception cannot escape back into the PropertyChanged setter that triggered the emission.

DeepChainSubscription : IDisposable (depth-2+):

  • Per-level SerialDisposable slots hold the level notifier subscriptions. ResubscribeFrom(startLevel) walks the chain and re-attaches deeper levels in place.
  • A SharedDeliveryQueue funnels two sub-queues into a single drainer: a higher-index signal queue carrying chain-level signals (drained first, LIFO), and a lower-index user queue feeding the downstream observer. Single-drainer serialisation gives the Rx contract on the user observer and deadlock-immunity if that observer blocks (concurrent producers enqueue and return).
  • An InitialSetupSignal = -1 sentinel runs the initial chain attachment from inside the drainer, so a notifier-fire arriving while the chain is being walked enqueues onto the same drainer instead of racing against it.
  • Per-level callback delegates are pre-allocated in the constructor and reused across re-walks; ResubscribeFrom does not allocate a fresh closure per level per fire.
  • ProcessSignal is wrapped in try/catch and routes invoker/factory/accessor failures through TryOnError with the same swallow-secondary-throw discipline.
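
The single-drainer idea behind both queues can be sketched with BCL types only. This is a simplified illustration, not DynamicData's DeliveryQueue<T> or SharedDeliveryQueue: `SingleDrainerQueue<T>` is a hypothetical name, it has one FIFO queue rather than the two prioritised sub-queues, and it has no disposal or error routing.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

int inFlight = 0, maxInFlight = 0, delivered = 0;
var queue = new SingleDrainerQueue<int>(_ =>
{
    // Only the current drainer runs this callback, so plain writes suffice here.
    maxInFlight = Math.Max(maxInFlight, Interlocked.Increment(ref inFlight));
    delivered++;
    Interlocked.Decrement(ref inFlight);
});

// Four producers enqueue concurrently; none of them blocks on delivery
// unless it happens to be the thread that claimed the drainer role.
var producers = Enumerable.Range(0, 4)
    .Select(_ => Task.Run(() => { for (var i = 0; i < 1000; i++) queue.Enqueue(i); }))
    .ToArray();
Task.WaitAll(producers);

Console.WriteLine($"delivered={delivered}, maxInFlight={maxInFlight}");

// Hypothetical type for illustration only.
sealed class SingleDrainerQueue<T>
{
    private readonly ConcurrentQueue<T> _items = new();
    private readonly Action<T> _deliver;
    private int _draining; // 0 = idle, 1 = some thread is draining

    public SingleDrainerQueue(Action<T> deliver) => _deliver = deliver;

    public void Enqueue(T item)
    {
        _items.Enqueue(item);

        // Re-check after every release so an item enqueued while the previous
        // drainer was letting go is picked up instead of being stranded.
        while (!_items.IsEmpty && Interlocked.CompareExchange(ref _draining, 1, 0) == 0)
        {
            try
            {
                while (_items.TryDequeue(out var next))
                {
                    _deliver(next);
                }
            }
            finally
            {
                // Full fence: the release must be visible before the re-check above.
                Interlocked.Exchange(ref _draining, 0);
            }
        }
    }
}
```

With this shape the run should report 4000 deliveries and never more than one delivery in flight, because a producer that loses the CAS simply returns and leaves its item to the active drainer.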

No equality dedup anywhere. Every PropertyChanged event is delivered, including same-valued ones and ones that fire in the race window. Per-event emissions match main, minus the dropped-event bug: events that fire in the brief window before main's initial.Concat has subscribed to the event source are no longer dropped.

Why not Switch / DistinctUntilChanged

An earlier iteration used recursive Observable.Create + Select(...).Switch() composition. Structurally cleaner but it deadlocks: Rx's Switch holds an internal gate during downstream OnNext propagation, so an observer that blocks synchronously (UI bindings often do) wedges every concurrent upstream emission. Putting DeliveryQueue downstream of Switch does not help because Switch's gate is upstream.

A continuous DistinctUntilChanged would change observable behavior in a user-visible way: duplicate values would be silently dropped forever, not just at the subscribe seam.

Tests

New regression fixture WhenPropertyChangedRaceFixture covers:

  • Shallow concurrent mutation during initial emit (observer-block forcing pattern).
  • Shallow notifyOnInitialValue: false still subscribes the handler before returning.
  • Every PropertyChanged event reaches the observer regardless of notifyOnInitialValue, including same-valued setters after the initial (shallow + deep).
  • Deep-chain post-swap leaf events on the new child are captured.
  • Deep-chain concurrent leaf mutation during initial emit.
  • Deep-chain concurrent parent-swap, 50 iterations, leaf event on the post-swap winner is always captured.
  • Deep-chain 5-level mid-chain swap re-targets deeper levels correctly.
  • Deep-chain torture: true 5-level chain with five worker threads, each mutating at a different level concurrently (root subtree swaps, mid-level swaps, leaf int mutations). 50 iterations, 200 mutations per thread per iteration. Asserts ValidateSynchronization (no concurrent OnNext), every emission is in the legal value set, first emission is the initial value, and last emission equals ReadCurrent() after the storm settles.
  • AutoRefresh + Filter integration: SourceCache + AutoRefresh(IsActive) + Filter(IsActive) with four mutator threads concurrently writing the same per-item final value to every pre-populated item. Uses AsAggregator; filter contents must match the per-item finalActive map.

All 150 Binding tests pass; the new torture tests run 10/10 stable.

ObservablePropertyFactory used initial.Concat(events) for both the shallow and deep-chain forms. Concat subscribes to the second source (the PropertyChanged event handler) only AFTER the first (the initial value) completes. Any PropertyChanged notification that fired during that gap was silently dropped.

The deep-chain form had an additional gap: Take(1).Repeat tore down all chain notifiers and then re-subscribed via GetNotifiers, losing any events that fired during the re-walk.

Fix:

1. Shallow form: rewrite with Observable.Create. Attach the PropertyChanged event handler FIRST so no events are missed during the subscribe window. Use Interlocked.CompareExchange on initialClaimed to ensure exactly one first emission (either the initial or the first handler-fired event, whichever wins the race). A one-shot Interlocked-CAS dedup guard catches the rare setter-update-then-notify duplicate that the CAS cannot otherwise distinguish.

2. Deep-chain form: per-level SerialDisposable. ResubscribeFrom(level) atomically swaps each level's subscription slot to the new value's notifier (subscribe new before disposing old via SerialDisposable.Disposable=). At all times, every live chain level has an active notifier; no re-walk gap. Initial-emit uses the same CAS+dedup pattern as the shallow form.

Both fixes are lock-free: only Interlocked.CompareExchange and Volatile read/write. The one-shot dedup guard uses EqualityComparer<TProperty>.Default exactly once per subscription, at the boundary between the initial and the first handler emission, not as a continuous DistinctUntilChanged.

Regression tests in WhenPropertyChangedRaceFixture force the race deterministically by parking the observer's OnNext for the initial value while a separate thread mutates the property. Verified RED on main (3 of 4 tests fail), GREEN with fix (4 of 4 pass). Stability check 20/20.

Tests: Binding suite 145/145 pass. Full suite 2339/2339 pass (excluding one pre-existing flake unrelated to this branch: SuspendNotificationsFixture.ConcurrentSuspendDuringResumeDoesNotCorrupt which fails on main too).

@dwcullop (Member, Author)

This fix is a lot, but I hit this in production. It would also explain a number of bugs that I never could figure out but went away on their own.

Copilot AI (Contributor) left a comment

Pull request overview

This PR addresses a TOCTOU subscribe-time race in WhenPropertyChanged / WhenValueChanged by ensuring PropertyChanged handlers are attached before the initial read, and by changing deep-chain (Parent.Child.Property) orchestration to avoid gaps during re-walks. It also adds targeted regression tests to deterministically reproduce the race.

Changes:

  • Reworks ObservablePropertyFactory to attach handlers first and use CAS/dedup logic to pick a single “first” emission and avoid initial/first-event duplicates.
  • Replaces deep-chain Take(1).Repeat() re-walk logic with per-level SerialDisposable slots to avoid re-walk unsubscribe gaps.
  • Adds WhenPropertyChangedRaceFixture regression tests covering shallow + deep-chain subscribe-time races.

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 12 comments.

| File | Description |
| --- | --- |
| src/DynamicData/Binding/ObservablePropertyFactory.cs | Refactors shallow + deep property observation to avoid dropping PropertyChanged events during subscription and reduce deep-chain rewalk gaps. |
| src/DynamicData.Tests/Binding/WhenPropertyChangedRaceFixture.cs | Adds deterministic regression tests to reproduce/validate the subscribe-time race scenarios. |

dwcullop added 3 commits June 12, 2026 22:58
The per-level SerialDisposable approach still allowed events to be dropped when two threads concurrently mutated the same intermediate property. Both fired the same notifier; both ResubscribeFrom calls raced; whichever SerialDisposable.Disposable= swap landed last won, even if that thread's pre-walk had read a stale value. The slot would end up subscribed to the LOSER of the property setter race, and subsequent events on the actual current value were lost.

Add a single-drainer pattern: notifier handlers signal _minDirtyLevel (Interlocked CAS loop on the minimum dirty level) and the winner of an Interlocked CAS on _drainerActive runs the actual re-walk. Others return immediately. The drainer loops until no signals remain, then re-checks once more to catch signals that arrived during the release. Initial subscription claims the drainer for the duration of ResubscribeFrom(0) + initial Emit so concurrent fires queue and process after.

All work serialized through a single thread; no concurrent re-walks possible; the FINAL slot state always reflects the LATEST chain state because the drainer's last iteration always reads the current value.

Lock-free. Only Interlocked, Volatile, and SerialDisposable's atomic swap.
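
A minimal sketch of the minimum-tracking CAS loop described above, assuming one shared integer stands in for _minDirtyLevel. `DirtyLevel`, `Signal`, `Take` and the `None` sentinel are hypothetical names; the drainer claim on _drainerActive is left out.

```csharp
using System;
using System.Threading;

int minDirty = DirtyLevel.None;
DirtyLevel.Signal(ref minDirty, 3);
DirtyLevel.Signal(ref minDirty, 1);
DirtyLevel.Signal(ref minDirty, 2); // ignored: level 1 is already pending

Console.WriteLine(DirtyLevel.Take(ref minDirty));                    // 1
Console.WriteLine(DirtyLevel.Take(ref minDirty) == DirtyLevel.None); // True

// Hypothetical helper for illustration only.
static class DirtyLevel
{
    public const int None = int.MaxValue;

    // Lowers the shared value to `level` unless an equal or shallower level
    // is already recorded. Retries if another thread changed it in between.
    public static void Signal(ref int minDirty, int level)
    {
        int seen;
        do
        {
            seen = Volatile.Read(ref minDirty);
            if (level >= seen)
            {
                return;
            }
        }
        while (Interlocked.CompareExchange(ref minDirty, level, seen) != seen);
    }

    // Atomically reads the pending level and resets the slot to "nothing pending".
    public static int Take(ref int minDirty) => Interlocked.Exchange(ref minDirty, None);
}
```

Keeping only the minimum is enough under the assumption that a re-walk from a given level also re-attaches every deeper level, which is how ResubscribeFrom is described in this PR.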

DeepChain_ConcurrentParentSwap_LeafEventOnWinnerNotDropped: statistical test (500 iterations) that triggered the race in ~10 percent of runs on the prior implementation; now 0 of 500. Stability check 10/10. DeepChain_FiveLevels_MidChainSwap_DeeperLevelsRetargetCorrectly: structural test for the depth-5 mid-chain re-attach case.

Tests: Binding suite 147/147 pass.
The drainer pattern was overengineered. The Rx-idiomatic shape for `observe a property chain where each level can be reassigned` is a recursive composition: each level is an ObserveLevel emitting current-then-changes, and the chain is built with .Select(child => deeper).Switch(). When a parent fires, Switch atomically subscribes to the new deeper chain and disposes the old; no SerialDisposable bookkeeping, no min-dirty-level signaling, no CAS-claimed drainer.

ObserveLevel attaches the PropertyChanged handler BEFORE reading the initial value (same shallow-form fix), so events fired during the per-level subscribe window are not missed. The outer subscriber still applies the CAS-based first-emission-wins and one-shot dedup at the boundary to handle the initial-emit race.

Trade-off: concurrent mutations of the SAME observed property from multiple threads (which is an Rx contract violation by the caller) can leave Switch's lock-acquisition order out of sync with the user's setter-completion order. Well-behaved INPC usage serializes mutations on observed properties; the simplified design relies on that contract. Removed the DeepChain_ConcurrentParentSwap_LeafEventOnWinnerNotDropped test and the block-observer-during-initial-emit deep-chain test (the latter deadlocked against Switch's internal lock by design).

Net diff: -242 lines. ObservablePropertyFactory shrank from ~330 lines to ~175. Binding suite 145/145 pass.
Replace the recursive Switch composition with a single SharedDeliveryQueue that funnels two sub-queues: a high-index signal queue carrying level-change notifications, and a low-index emission queue for the user observer. The drainer processes signals first (LIFO), running ResubscribeFrom and Emit serialized against itself, then delivers user emissions last so they observe the latest chain layout.

An InitialSetupSignal sentinel funnels the initial chain attachment through the same drainer, closing the subscribe gap without taking a separate lock.

Switch is removed entirely: its internal gate held during downstream OnNext deadlocked any observer that blocked synchronously, and adding DeliveryQueue downstream of Switch could not break the cycle.

Re-adds the two concurrent regression tests that previously deadlocked or relied on the drainer:

- DeepChain_ConcurrentLeafMutationDuringInitialEmit_NotDropped

- DeepChain_ConcurrentParentSwap_LeafEventOnWinnerNotDropped (500 iterations)

Copilot AI (Contributor) left a comment

Pull request overview

Copilot reviewed 2 out of 2 changed files in this pull request and generated 8 comments.

dwcullop added 10 commits June 13, 2026 00:19
Production:

- Dedup window is now armed only when notifyInitial is true. When the caller didn't ask for an initial value, two consecutive same-valued PropertyChanged events are both legitimate and must both be delivered; the previous code silently dropped the second one.

- Wrap the value accessor / chain walk in try/catch and route exceptions to userSub/queue.OnError. The earlier Rx pipeline got this from Select; the new direct invocation needs it explicitly so a throwing property getter doesn't escape the drainer / PropertyChanged invocation thread.

Tests:

- Add NotifyInitialFalse_DoesNotDedupSameValuedEvents (shallow + deep) covering the dedup gating fix.

- Add timeouts to ManualResetEventSlim.Wait so a failed assertion can't park the observer thread indefinitely. Release observerCanContinue in finally.

- Replace Thread.Sleep with bounded SpinWait.SpinUntil(condition, timeout) via a WaitForCondition helper.

- Capture and dispose the IDisposable returned by Subscribe inside Task.Run so the PropertyChanged handler is detached at test end.

- Remove unused subscribeCompleted local.
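
A bounded wait of the kind mentioned above might look like the following. Only the name WaitForCondition and the use of SpinWait.SpinUntil(condition, timeout) come from the commit message; the `TestWaits` class, the parameter list, the `because` text and the 5-second default are assumptions for illustration.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

var flag = 0;
_ = Task.Run(() => { Thread.Sleep(20); Volatile.Write(ref flag, 1); });

// Returns as soon as the worker has set the flag.
TestWaits.WaitForCondition(() => Volatile.Read(ref flag) == 1, because: "the worker sets the flag");
Console.WriteLine("condition met");

// A condition that never holds fails loudly instead of hanging the test run.
try
{
    TestWaits.WaitForCondition(() => false, TimeSpan.FromMilliseconds(50));
}
catch (TimeoutException ex)
{
    Console.WriteLine(ex.Message);
}

// Hypothetical helper; the signature is an assumption, not the fixture's actual code.
static class TestWaits
{
    public static void WaitForCondition(Func<bool> condition, TimeSpan? timeout = null, string because = "")
    {
        var limit = timeout ?? TimeSpan.FromSeconds(5); // assumed default
        if (!SpinWait.SpinUntil(condition, limit))
        {
            throw new TimeoutException($"Condition not met within {limit.TotalSeconds:0.###}s. {because}");
        }
    }
}
```

Compared with Thread.Sleep, the wait ends as soon as the condition holds, and the timeout only matters on the failure path.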
Three follow-ups after CI flaked on heavily-loaded shared runners:

- Flatten the deep-chain disposable from nested CompositeDisposable to a single composite via collection-expression spread (avoids the redundant inner CompositeDisposable allocation around levelSlots).

- Bump the default WaitForCondition timeout from 5s to 30s and route all ManualResetEventSlim / subscribeTask.Wait calls through it. Locally these waits return in <1ms; the larger budget only matters when CI is under heavy load.

- Reduce DeepChain_ConcurrentParentSwap_LeafEventOnWinnerNotDropped from 500 to 50 iterations. With SharedDeliveryQueue the outcome is deterministic, so a single iteration proves correctness; 50 is defence in depth. Also drop the unnecessary intermediate WaitForCondition since Task.WaitAll already implies the drainer has fully drained both queued signals.

Local: 7/7 race tests pass in ~60ms; 10/10 stability runs clean.
…cription

Three improvements:

- Extract the dedup state machine into a private Emitter : IObserver<T> class. Both factories now wrap their downstream queue in an Emitter; the initialClaimed / dedupArmed / seedValue trio and the PropertyValuesEqual helper live in one place instead of being copy-pasted across two constructors.

- Encapsulate the deep-chain runtime in a private DeepChainSubscription : IDisposable class. Fields are default-initialized before the constructor body runs and assigned in well-defined order, which eliminates the DeliverySubQueue<int>? signalSub = null bootstrap (the field is always assigned before any code path that could read it). The InitialSetupSignal sentinel + drainer flow is unchanged.

- Reduce the shallow factory to the single-property hot path with a small EmitCurrent helper for the accessor try/catch, removing the second copy of the dedup state machine.

Behaviour is unchanged. 148/148 Binding tests pass; race fixture 10/10 stable.
Extract the shallow-form runtime into a SinglePropertySubscription : IDisposable class with the same shape as DeepChainSubscription: constructor takes (observer, source, [chain-or-name], notifyInitial) and assigns all fields in well-defined order; Dispose tears down handler + queue. The two factories now each become a one-liner Observable.Create that constructs the appropriate subscription.

EmitCurrent and OnPropertyChanged become instance methods on SinglePropertySubscription, removing the last shared static helper and keeping all per-subscription state contained.

Behaviour unchanged. 148/148 Binding tests pass; race fixture 10/10 stable.
Match DeepChainSubscription.ProcessSignal's pattern: wrap both the value read AND the emission in try/catch. The downstream observer's OnNext is invoked synchronously by the DeliveryQueue drain, so if it throws, the exception was escaping back out through OnPropertyChanged and into the property setter that fired the event. Route the throw to OnError instead.
The initial-setup case and the level-fire case only differ in two scalar derivations: (a) where to start the rewalk (0 vs level+1) and (b) whether to emit (always vs only when _notifyInitial). Compute both up front and let the rest of the method be linear.

No behaviour change; 148/148 Binding tests pass.
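
One way to read the two derivations, as a hedged sketch: the initial-setup signal starts the re-walk at level 0 and emits only when notifyInitial is set, while a level fire starts at level + 1 and always emits. `SignalPlan` and `For` are hypothetical names; the -1 sentinel value is taken from the PR description.

```csharp
using System;

Console.WriteLine(SignalPlan.For(SignalPlan.InitialSetupSignal, notifyInitial: false)); // (0, False)
Console.WriteLine(SignalPlan.For(SignalPlan.InitialSetupSignal, notifyInitial: true));  // (0, True)
Console.WriteLine(SignalPlan.For(2, notifyInitial: false));                             // (3, True)

// Hypothetical helper for illustration only.
static class SignalPlan
{
    public const int InitialSetupSignal = -1;

    public static (int StartLevel, bool Emit) For(int signal, bool notifyInitial)
    {
        var isInitialSetup = signal == InitialSetupSignal;

        // (a) where the re-walk starts
        var startLevel = isInitialSetup ? 0 : signal + 1;

        // (b) whether a value is emitted after the re-walk
        var emit = !isInitialSetup || notifyInitial;

        return (startLevel, emit);
    }
}
```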
Two new race fixture tests:

1. DeepChain_FiveLevels_AllLevelsMutatedConcurrently_FinalEmissionMatchesActual

   Five worker threads each mutate at one level of a depth-5 chain (root subtree swap, mid-level swaps, leaf-int mutations). Many mutations land on detached subtrees and are correctly ignored; mutations on the live chain are processed by the SharedDeliveryQueue drainer in order. After Task.WhenAll the drainer continues until empty; the final emission must equal ReadCurrent() because the last queued signal's ReadCurrent runs against the now-frozen chain state. 50 iterations, 200 mutations per thread, 0 mismatches on every run.

2. AutoRefreshThenFilter_ConcurrentPropertyMutationsOnAddedItems_AllFinalStatesObserved

   End-to-end: SourceCache + AutoRefresh(IsActive) + Filter(IsActive). Cache pre-populated, then four worker threads concurrently set Activated on every item to a per-item randomized final value. Multiple threads writing the same final value generate many concurrent PropertyChanged invocations per item, exercising SinglePropertySubscription's DeliveryQueue under contention. After the storm the filter contents must match the per-item finalActive map.

Deliberately not testing 'mutate while adding' against AutoRefresh: ObservableCache.CreateConnectObservable has the same initial.Concat(_changes) TOCTOU subscribe-window bug as the WhenPropertyChanged shape this PR fixes, and a during-add test would detect that separate cache-side bug as noise unrelated to this PR.

Local: 150/150 Binding tests pass; new tests 10/10 stable.
Last-emission-equals-current proves the drainer reached the end of the queue without corruption, but doesn't catch garbage values or Rx contract violations along the way. Add three additional invariants per iteration:

1. ValidateSynchronization() on the subscription chain. Any concurrent OnNext to the user observer (which would indicate a SharedDeliveryQueue serialization bug) throws UnsynchronizedNotificationException during the test instead of silently producing wrong data.

2. Build the set of values any thread could legitimately have written (initial leaf, the leaf-int range, and each subtree-swap range), then assert every emission is in that set. Catches torn reads or stale-detached-subtree mis-reads.

3. First emission must equal the initial value when notifyInitial=true. Catches initial-emit-dropped bugs that the final-state check could mask if the final state happens to equal the initial.

What this test still does NOT verify: that every mutation which landed on the live chain produced an emission. That requires causal-history reconstruction which isn't tractable from outside the operator.

Local 10/10 stable, ~325ms per run.
Replace the manual HashSet + Subscribe(changes => switch on Reason / Add / Remove) plumbing with .AsAggregator(). The aggregator provides Data (IObservableCache) for current contents and Error for terminal exception state, both thread-safe to read.

Net effect: ~25 lines of manual change tracking collapse to one line plus assertions against results.Data.Keys.
Drops the one-shot equality dedup in the Emitter and removes the Emitter class entirely. SinglePropertySubscription and DeepChainSubscription now forward every emission through their DeliveryQueue / DeliverySubQueue directly. Same-valued PropertyChanged events that follow the initial emission are delivered as legitimate events; nothing in the property pipeline drops events for equality reasons.

Other fixes in the same pass:

- TryOnError helpers wrap both EmitCurrent and ProcessSignal so a downstream observer that throws from OnError cannot propagate the secondary exception back into the PropertyChanged setter (shallow) or the SharedDeliveryQueue drainer (deep).

- DeepChainSubscription pre-allocates one notifier callback per level in the constructor; ResubscribeFrom indexes into _levelCallbacks instead of allocating a fresh closure per re-walk.

- Renamed the existing notifyInitial=false dedup test to PropertyChangedEventsAreNeverDropped_RegardlessOfNotifyInitial and extended it to also cover notifyInitial=true on shallow and deep chains.

- Class summary, in-test commentary, and production rationales rewritten to present-tense contracts; removed migration narrative, PR references, and past-bug descriptors per repo comment instructions.

150/150 Binding tests pass; race fixture 10/10 stable.
dwcullop added 2 commits June 14, 2026 09:42
…d tests, let observer throws propagate

Production:

- EmitCurrent (SinglePropertySubscription) and ProcessSignal (DeepChainSubscription) no longer wrap the downstream OnNext in try/catch. Per the Rx contract, if the user observer throws, the exception propagates back to whoever invoked the PropertyChanged setter (shallow) or back through the SharedDeliveryQueue drainer (deep), matching what a plain Subject<T> would do. The try/catch around the chain walk and accessor stays - those are user code whose throws route to OnError.

- TryOnError helpers removed; their swallow-secondary-throw behaviour was non-standard.

Tests:

- Split WhenPropertyChangedRaceFixture into two fixtures. RaceFixture now contains only the truly multi-threaded tests (5 tests: shallow concurrent mutation during initial emit, deep concurrent leaf mutation during initial emit, deep concurrent parent swap, deep 5-level torture, AutoRefresh integration). The single-threaded contract tests move to a new WhenPropertyChangedBehaviorFixture (7 tests: handler-attach ordering, four no-dedup scenarios split into individual [Fact]s, deep post-swap leaf capture, deep mid-chain swap re-targeting).

- The two concurrent initial-emit tests adopt Jake's symmetric Task.WhenAll(subscribe, mutate) shape: observer's OnNext signals + waits, mutator waits then mutates and releases. Removes the manual try/finally + subscribeTask.Result + WaitForCondition plumbing.

153/153 Binding tests pass; the property-changed fixtures run 10/10 stable.
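
The symmetric shape can be sketched without the library. Everything here is a stand-in: `Item`, `Record` and the two events are hypothetical, and the subscribe side is a hand-written attach-then-emit rather than WhenPropertyChanged, so the second value is recorded on the mutator's thread instead of being queued behind the parked observer.

```csharp
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Threading;
using System.Threading.Tasks;

var timeout = TimeSpan.FromSeconds(30);
var item = new Item();
var observedValues = new List<int>();
using var initialObserved = new ManualResetEventSlim();
using var mutationDone = new ManualResetEventSlim();

var subscribe = Task.Run(() =>
{
    // Stand-in for the operator: attach first, then emit the initial value.
    item.PropertyChanged += (_, _) => Record(item.Value);
    Record(item.Value);
});

var mutate = Task.Run(() =>
{
    if (!initialObserved.Wait(timeout)) throw new TimeoutException("initial value never observed");
    item.Value = 1;     // fires PropertyChanged while the observer is parked
    mutationDone.Set(); // release the observer
});

await Task.WhenAll(subscribe, mutate);
Console.WriteLine(string.Join(",", observedValues)); // 0,1

void Record(int value)
{
    lock (observedValues) observedValues.Add(value);
    if (value != 0) return;

    // Observer side of the handshake: signal, then wait for the mutator.
    initialObserved.Set();
    if (!mutationDone.Wait(timeout)) throw new TimeoutException("mutator never finished");
}

// Hypothetical model type for illustration only.
sealed class Item : INotifyPropertyChanged
{
    private int _value;

    public event PropertyChangedEventHandler? PropertyChanged;

    public int Value
    {
        get => _value;
        set
        {
            _value = value;
            PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(nameof(Value)));
        }
    }
}
```

Both sides wait with a timeout, so a broken handshake surfaces as a faulted task from Task.WhenAll rather than a hung test.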
…gInitialEmit_NotDropped

Replaces the existing test body with Jake's verbatim code from the PR review: named-argument style with column-aligned colons, Item class with Id and Value, observedValues / propertyValue naming, BeEquivalentTo with WithStrictOrdering and the original because string. Removes TestModel from the race fixture (no longer used).
@dwcullop dwcullop marked this pull request as ready for review June 14, 2026 16:51
@dwcullop dwcullop requested a review from JakenVeina June 14, 2026 17:00
@dwcullop dwcullop enabled auto-merge (squash) June 14, 2026 17:00
The cache-side observation is unrelated to this PR. The integration test pre-populates the cache to keep what's being verified focused on the WhenPropertyChanged path under multi-threaded property contention; that's what the comment should say.
@dwcullop dwcullop disabled auto-merge June 14, 2026 17:09

@dwcullop (Member, Author)

Fun fact: When I hit this in production, I originally tried to just switch to using the RxUI helper (WhenAnyValue), only to discover that it has the same race.

Development

Successfully merging this pull request may close these issues.

WhenPropertyChanged/WhenValueChanged silently drop PropertyChanged events fired during subscribe

3 participants