WhenPropertyChanged: don't drop events fired during subscribe#1111
WhenPropertyChanged: don't drop events fired during subscribe#1111dwcullop wants to merge 17 commits into
Conversation
ObservablePropertyFactory used initial.Concat(events) for both the shallow and deep-chain forms. Concat subscribes to the second source (the PropertyChanged event handler) only AFTER the first (the initial value) completes. Any PropertyChanged notification that fired during that gap was silently dropped. The deep-chain form had an additional gap: Take(1).Repeat tore down all chain notifiers and then re-subscribed via GetNotifiers, losing any events that fired during the re-walk. Fix: 1. Shallow form: rewrite with Observable.Create. Attach the PropertyChanged event handler FIRST so no events are missed during the subscribe window. Use Interlocked.CompareExchange on initialClaimed to ensure exactly one first emission (either the initial or the first handler-fired event, whichever wins the race). A one-shot Interlocked-CAS dedup guard catches the rare setter-update-then-notify duplicate that the CAS cannot otherwise distinguish. 2. Deep-chain form: per-level SerialDisposable. ResubscribeFrom(level) atomically swaps each level's subscription slot to the new value's notifier (subscribe new before disposing old via SerialDisposable.Disposable=). At all times, every live chain level has an active notifier; no re-walk gap. Initial-emit uses the same CAS+dedup pattern as the shallow form. Both fixes are lock-free: only Interlocked.CompareExchange and Volatile read/write. The one-shot dedup guard uses EqualityComparer<TProperty>.Default exactly once per subscription, at the boundary between the initial and the first handler emission, not as a continuous DistinctUntilChanged. Regression tests in WhenPropertyChangedRaceFixture force the race deterministically by parking the observer's OnNext for the initial value while a separate thread mutates the property. Verified RED on main (3 of 4 tests fail), GREEN with fix (4 of 4 pass). Stability check 20/20. Tests: Binding suite 145/145 pass. Full suite 2339/2339 pass (excluding one pre-existing flake unrelated to this branch: SuspendNotificationsFixture.ConcurrentSuspendDuringResumeDoesNotCorrupt which fails on main too).
|
This fix is a lot, but I hit this in production. It would also explain a number of bugs that I never could figure out but went away on their own. |
There was a problem hiding this comment.
Pull request overview
This PR addresses a TOCTOU subscribe-time race in WhenPropertyChanged / WhenValueChanged by ensuring PropertyChanged handlers are attached before the initial read, and by changing deep-chain (Parent.Child.Property) orchestration to avoid gaps during re-walks. It also adds targeted regression tests to deterministically reproduce the race.
Changes:
- Reworks
ObservablePropertyFactoryto attach handlers first and use CAS/dedup logic to pick a single “first” emission and avoid initial/first-event duplicates. - Replaces deep-chain
Take(1).Repeat()re-walk logic with per-levelSerialDisposableslots to avoid re-walk unsubscribe gaps. - Adds
WhenPropertyChangedRaceFixtureregression tests covering shallow + deep-chain subscribe-time races.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 12 comments.
| File | Description |
|---|---|
| src/DynamicData/Binding/ObservablePropertyFactory.cs | Refactors shallow + deep property observation to avoid dropping PropertyChanged events during subscription and reduce deep-chain rewalk gaps. |
| src/DynamicData.Tests/Binding/WhenPropertyChangedRaceFixture.cs | Adds deterministic regression tests to reproduce/validate the subscribe-time race scenarios. |
The per-level SerialDisposable approach still allowed events to be dropped when two threads concurrently mutated the same intermediate property. Both fired the same notifier; both ResubscribeFrom calls raced; whichever SerialDisposable.Disposable= swap landed last won, even if that thread's pre-walk had read a stale value. The slot would end up subscribed to the LOSER of the property setter race, and subsequent events on the actual current value were lost. Add a single-drainer pattern: notifier handlers signal _minDirtyLevel (Interlocked CAS loop on the minimum dirty level) and the winner of an Interlocked CAS on _drainerActive runs the actual re-walk. Others return immediately. The drainer loops until no signals remain, then re-checks once more to catch signals that arrived during the release. Initial subscription claims the drainer for the duration of ResubscribeFrom(0) + initial Emit so concurrent fires queue and process after. All work serialized through a single thread; no concurrent re-walks possible; the FINAL slot state always reflects the LATEST chain state because the drainer's last iteration always reads the current value. Lock-free. Only Interlocked, Volatile, and SerialDisposable's atomic swap. DeepChain_ConcurrentParentSwap_LeafEventOnWinnerNotDropped: statistical test (500 iterations) that triggered the race in ~10 percent of runs on the prior implementation; now 0 of 500. Stability check 10/10. DeepChain_FiveLevels_MidChainSwap_DeeperLevelsRetargetCorrectly: structural test for the depth-5 mid-chain re-attach case. Tests: Binding suite 147/147 pass.
The drainer pattern was overengineered. The Rx-idiomatic shape for `observe a property chain where each level can be reassigned` is a recursive composition: each level is an ObserveLevel emitting current-then-changes, and the chain is built with .Select(child => deeper).Switch(). When a parent fires, Switch atomically subscribes to the new deeper chain and disposes the old; no SerialDisposable bookkeeping, no min-dirty-level signaling, no CAS-claimed drainer. ObserveLevel attaches the PropertyChanged handler BEFORE reading the initial value (same shallow-form fix), so events fired during the per-level subscribe window are not missed. The outer subscriber still applies the CAS-based first-emission-wins and one-shot dedup at the boundary to handle the initial-emit race. Trade-off: concurrent mutations of the SAME observed property from multiple threads (which is an Rx contract violation by the caller) can leave Switch's lock-acquisition order out of sync with the user's setter-completion order. Well-behaved INPC usage serializes mutations on observed properties; the simplified design relies on that contract. Removed the DeepChain_ConcurrentParentSwap_LeafEventOnWinnerNotDropped test and the block-observer-during-initial-emit deep-chain test (the latter deadlocked against Switch's internal lock by design). Net diff: -242 lines. ObservablePropertyFactory shrank from ~330 lines to ~175. Binding suite 145/145 pass.
Replace the recursive Switch composition with a single SharedDeliveryQueue that funnels two sub-queues: a high-index signal queue carrying level-change notifications, and a low-index emission queue for the user observer. The drainer processes signals first (LIFO), running ResubscribeFrom and Emit serialized against itself, then delivers user emissions last so they observe the latest chain layout. An InitialSetupSignal sentinel funnels the initial chain attachment through the same drainer, closing the subscribe gap without taking a separate lock. Switch is removed entirely: its internal gate held during downstream OnNext deadlocked any observer that blocked synchronously, and adding DeliveryQueue downstream of Switch could not break the cycle. Re-adds the two concurrent regression tests that previously deadlocked or relied on the drainer: - DeepChain_ConcurrentLeafMutationDuringInitialEmit_NotDropped - DeepChain_ConcurrentParentSwap_LeafEventOnWinnerNotDropped (500 iterations)
Production: - Dedup window is now armed only when notifyInitial is true. When the caller didn't ask for an initial value, two consecutive same-valued PropertyChanged events are both legitimate and must both be delivered; the previous code silently dropped the second one. - Wrap the value accessor / chain walk in try/catch and route exceptions to userSub/queue.OnError. The earlier Rx pipeline got this from Select; the new direct invocation needs it explicitly so a throwing property getter doesn't escape the drainer / PropertyChanged invocation thread. Tests: - Add NotifyInitialFalse_DoesNotDedupSameValuedEvents (shallow + deep) covering the dedup gating fix. - Add timeouts to ManualResetEventSlim.Wait so a failed assertion can't park the observer thread indefinitely. Release observerCanContinue in finally. - Replace Thread.Sleep with bounded SpinWait.SpinUntil(condition, timeout) via a WaitForCondition helper. - Capture and dispose the IDisposable returned by Subscribe inside Task.Run so the PropertyChanged handler is detached at test end. - Remove unused subscribeCompleted local.
Two follow-ups after CI flaked on heavily-loaded shared runners: - Flatten the deep-chain disposable from nested CompositeDisposable to a single composite via collection-expression spread (avoids the redundant inner CompositeDisposable allocation around levelSlots). - Bump the default WaitForCondition timeout from 5s to 30s and route all ManualResetEventSlim / subscribeTask.Wait calls through it. Locally these waits return in <1ms; the larger budget only matters when CI is under heavy load. - Reduce DeepChain_ConcurrentParentSwap_LeafEventOnWinnerNotDropped from 500 to 50 iterations. With SharedDeliveryQueue the outcome is deterministic, so a single iteration proves correctness; 50 is defence in depth. Also drop the unnecessary intermediate WaitForCondition since Task.WaitAll already implies the drainer has fully drained both queued signals. Local: 7/7 race tests pass in ~60ms; 10/10 stability runs clean.
…cription Three improvements: - Extract the dedup state machine into a private Emitter : IObserver<T> class. Both factories now wrap their downstream queue in an Emitter; the initialClaimed / dedupArmed / seedValue trio and the PropertyValuesEqual helper live in one place instead of being copy-pasted across two constructors. - Encapsulate the deep-chain runtime in a private DeepChainSubscription : IDisposable class. Fields are default-initialized before the constructor body runs and assigned in well-defined order, which eliminates the DeliverySubQueue<int>? signalSub = null bootstrap (the field is always assigned before any code path that could read it). The InitialSetupSignal sentinel + drainer flow is unchanged. - Reduce the shallow factory to the single-property hot path with a small EmitCurrent helper for the accessor try/catch, removing the second copy of the dedup state machine. Behaviour is unchanged. 148/148 Binding tests pass; race fixture 10/10 stable.
Extract the shallow-form runtime into a SinglePropertySubscription : IDisposable class with the same shape as DeepChainSubscription: constructor takes (observer, source, [chain-or-name], notifyInitial) and assigns all fields in well-defined order; Dispose tears down handler + queue. The two factories now each become a one-liner Observable.Create that constructs the appropriate subscription. EmitCurrent and OnPropertyChanged become instance methods on SinglePropertySubscription, removing the last shared static helper and keeping all per-subscription state contained. Behaviour unchanged. 148/148 Binding tests pass; race fixture 10/10 stable.
Match DeepChainSubscription.ProcessSignal's pattern: wrap both the value read AND the emission in try/catch. The downstream observer's OnNext is invoked synchronously by the DeliveryQueue drain, so if it throws, the exception was escaping back out through OnPropertyChanged and into the property setter that fired the event. Route the throw to OnError instead.
The initial-setup case and the level-fire case only differ in two scalar derivations: (a) where to start the rewalk (0 vs level+1) and (b) whether to emit (always vs only when _notifyInitial). Compute both up front and let the rest of the method be linear. No behaviour change; 148/148 Binding tests pass.
Two new race fixture tests: 1. DeepChain_FiveLevels_AllLevelsMutatedConcurrently_FinalEmissionMatchesActual Five worker threads each mutate at one level of a depth-5 chain (root subtree swap, mid-level swaps, leaf-int mutations). Many mutations land on detached subtrees and are correctly ignored; mutations on the live chain are processed by the SharedDeliveryQueue drainer in order. After Task.WhenAll the drainer continues until empty; the final emission must equal ReadCurrent() because the last queued signal's ReadCurrent runs against the now-frozen chain state. 50 iterations, 200 mutations per thread, 0 mismatches on every run. 2. AutoRefreshThenFilter_ConcurrentPropertyMutationsOnAddedItems_AllFinalStatesObserved End-to-end: SourceCache + AutoRefresh(IsActive) + Filter(IsActive). Cache pre-populated, then four worker threads concurrently set Activated on every item to a per-item randomized final value. Multiple threads writing the same final value generate many concurrent PropertyChanged invocations per item, exercising SinglePropertySubscription's DeliveryQueue under contention. After the storm the filter contents must match the per-item finalActive map. Deliberately not testing 'mutate while adding' against AutoRefresh: ObservableCache.CreateConnectObservable has the same initial.Concat(_changes) TOCTOU subscribe-window bug as the WhenPropertyChanged shape this PR fixes, and a during-add test would detect that separate cache-side bug as noise unrelated to this PR. Local: 150/150 Binding tests pass; new tests 10/10 stable.
Last-emission-equals-current proves the drainer reached the end of the queue without corruption, but doesn't catch garbage values or Rx contract violations along the way. Add three additional invariants per iteration: 1. ValidateSynchronization() on the subscription chain. Any concurrent OnNext to the user observer (which would indicate a SharedDeliveryQueue serialization bug) throws UnsynchronizedNotificationException during the test instead of silently producing wrong data. 2. Build the set of values any thread could legitimately have written (initial leaf, the leaf-int range, and each subtree-swap range), then assert every emission is in that set. Catches torn reads or stale-detached-subtree mis-reads. 3. First emission must equal the initial value when notifyInitial=true. Catches initial-emit-dropped bugs that the final-state check could mask if the final state happens to equal the initial. What this test still does NOT verify: that every mutation which landed on the live chain produced an emission. That requires causal-history reconstruction which isn't tractable from outside the operator. Local 10/10 stable, ~325ms per run.
Replace the manual HashSet + Subscribe(changes => switch on Reason / Add / Remove) plumbing with .AsAggregator(). The aggregator provides Data (IObservableCache) for current contents and Error for terminal exception state, both thread-safe to read. Net effect: ~25 lines of manual change tracking collapse to one line plus assertions against results.Data.Keys.
Drops the one-shot equality dedup in the Emitter and removes the Emitter class entirely. SinglePropertySubscription and DeepChainSubscription now forward every emission through their DeliveryQueue / DeliverySubQueue directly. Same-valued PropertyChanged events that follow the initial emission are delivered as legitimate events; nothing in the property pipeline drops events for equality reasons. Other fixes in the same pass: - TryOnError helpers wrap both EmitCurrent and ProcessSignal so a downstream observer that throws from OnError cannot propagate the secondary exception back into the PropertyChanged setter (shallow) or the SharedDeliveryQueue drainer (deep). - DeepChainSubscription pre-allocates one notifier callback per level in the constructor; ResubscribeFrom indexes into _levelCallbacks instead of allocating a fresh closure per re-walk. - Renamed the existing notifyInitial=false dedup test to PropertyChangedEventsAreNeverDropped_RegardlessOfNotifyInitial and extended it to also cover notifyInitial=true on shallow and deep chains. - Class summary, in-test commentary, and production rationales rewritten to present-tense contracts; removed migration narrative, PR references, and past-bug descriptors per repo comment instructions. 150/150 Binding tests pass; race fixture 10/10 stable.
…d tests, let observer throws propagate Production: - EmitCurrent (SinglePropertySubscription) and ProcessSignal (DeepChainSubscription) no longer wrap the downstream OnNext in try/catch. Per the Rx contract, if the user observer throws, the exception propagates back to whoever invoked the PropertyChanged setter (shallow) or back through the SharedDeliveryQueue drainer (deep), matching what a plain Subject<T> would do. The try/catch around the chain walk and accessor stays - those are user code whose throws route to OnError. - TryOnError helpers removed; their swallow-secondary-throw behaviour was non-standard. Tests: - Split WhenPropertyChangedRaceFixture into two fixtures. RaceFixture now contains only the truly multi-threaded tests (5 tests: shallow concurrent mutation during initial emit, deep concurrent leaf mutation during initial emit, deep concurrent parent swap, deep 5-level torture, AutoRefresh integration). The single-threaded contract tests move to a new WhenPropertyChangedBehaviorFixture (7 tests: handler-attach ordering, four no-dedup scenarios split into individual [Fact]s, deep post-swap leaf capture, deep mid-chain swap re-targeting). - The two concurrent initial-emit tests adopt Jake's symmetric Task.WhenAll(subscribe, mutate) shape: observer's OnNext signals + waits, mutator waits then mutates and releases. Removes the manual try/finally + subscribeTask.Result + WaitForCondition plumbing. 153/153 Binding tests pass; the property-changed fixtures run 10/10 stable.
…gInitialEmit_NotDropped Replaces the existing test body with Jake's verbatim code from the PR review: named-argument style with column-aligned colons, Item class with Id and Value, observedValues / propertyValue naming, BeEquivalentTo with WithStrictOrdering and the original because string. Removes TestModel from the race fixture (no longer used).
The cache-side observation is unrelated to this PR. The integration test pre-populates the cache to keep what's being verified focused on the WhenPropertyChanged path under multi-threaded property contention; that's what the comment should say.
|
Fun fact: When I hit in production, I originally tried to just switch to using the RxUI helper ( |
Fixes #1110.
WhenPropertyChangedandWhenValueChangedcould silently drop aPropertyChangedevent that fired during the call toSubscribe. The shapeinitial.Concat(propertyChanged)only attaches the event handler after the initial value has been delivered, leaving a gap. For deep chains, every level had the same gap, and a mid-chain swap could race against the re-walk that re-attaches deeper levels.What changed
The factory's two construction paths each get a dedicated private nested class inside
ObservablePropertyFactory<TObject, TProperty>:SinglePropertySubscription : IDisposable(depth-1, the high-frequency UI-binding path):PropertyChangedhandler as the first action of its constructor, before any optional initial emit. No operator-stack delay, noinitial.Concatgap.DeliveryQueue<T>so concurrentPropertyChangedinvocations on the same item are serialised end-to-end.EmitCurrentwraps both the accessor call AND the downstreamOnNextin try/catch and routes failures throughTryOnError. If the downstream observer'sOnErroritself throws, the inner catch swallows it so the secondary exception cannot escape back into thePropertyChangedsetter that triggered the emission.DeepChainSubscription : IDisposable(depth-2+):SerialDisposableslots hold the level notifier subscriptions.ResubscribeFrom(startLevel)walks the chain and re-attaches deeper levels in place.SharedDeliveryQueuefunnels two sub-queues into a single drainer: a higher-index signal queue carrying chain-level signals (drained first, LIFO), and a lower-index user queue feeding the downstream observer. Single-drainer serialisation gives the Rx contract on the user observer and deadlock-immunity if that observer blocks (concurrent producers enqueue and return).InitialSetupSignal = -1sentinel runs the initial chain attachment from inside the drainer, so a notifier-fire arriving while the chain is being walked enqueues onto the same drainer instead of racing against it.ResubscribeFromdoes not allocate a fresh closure per level per fire.ProcessSignalis wrapped in try/catch and routes invoker/factory/accessor failures throughTryOnErrorwith the same swallow-secondary-throw discipline.No equality dedup anywhere. Every
PropertyChangedevent delivers, including same-valued ones and including race-window ones. Same per-event emissions as main, minus the dropped-event bug: the brief window where main'sinitial.Concatwas unsubscribed from the event source no longer drops the events that fire in it.Why not
Switch/DistinctUntilChangedAn earlier iteration used recursive
Observable.Create + Select(...).Switch()composition. Structurally cleaner but it deadlocks: Rx'sSwitchholds an internal gate during downstreamOnNextpropagation, so an observer that blocks synchronously (UI bindings often do) wedges every concurrent upstream emission. PuttingDeliveryQueuedownstream ofSwitchdoes not help because Switch's gate is upstream.A continuous
DistinctUntilChangedwould change observable behavior in a user-visible way: duplicate values would be silently dropped forever, not just at the subscribe seam.Tests
New regression fixture
WhenPropertyChangedRaceFixturecovers:notifyOnInitialValue: falsestill subscribes the handler before returning.PropertyChangedevent reaches the observer regardless ofnotifyOnInitialValue, including same-valued setters after the initial (shallow + deep).ValidateSynchronization(no concurrentOnNext), every emission is in the legal value set, first emission is the initial value, and last emission equalsReadCurrent()after the storm settles.SourceCache + AutoRefresh(IsActive) + Filter(IsActive)with four mutator threads concurrently writing the same per-item final value to every pre-populated item. UsesAsAggregator; filter contents must match the per-itemfinalActivemap.All 150
Bindingtests pass; the new torture tests run 10/10 stable.