Fix cross-cache deadlocks via queue-drain delivery pattern #1079

Open
dwcullop wants to merge 46 commits into reactivemarbles:main from dwcullop:bugfix/delivery_queue_v2

dwcullop (Member) commented Apr 12, 2026

This PR replaces #1074 and provides a more robust fix.

The Problem

Many DynamicData cache operators use Synchronize(lock) to serialize concurrent access. This acquires a lock and holds it during the entire downstream observer.OnNext() delivery. When a pipeline crosses cache boundaries (one cache's output feeds into another cache), the downstream cache's lock can be acquired while the upstream operator's lock is still held. If two threads follow reverse paths through the same pair of caches, this creates a classic ABBA deadlock.

This is not a theoretical issue — any application that composes complex DynamicData pipelines across multiple SourceCache instances with concurrent writers can deadlock.

The Solution: Queue-Drain Delivery

Replace Synchronize(lock) with a queue-drain pattern that releases the lock before delivering to downstream subscribers. This eliminates ABBA deadlocks because no operator lock is held during cross-cache delivery.

Two Delivery Queue Classes

DeliveryQueue<T> — typed, single-source queue. Implements IObserver<T>.

  • Internally stores Queue<Notification<T>> (value/error/completed)
  • On enqueue: acquires the gate, queues the notification, releases the gate, THEN delivers outside the lock
  • Single-deliverer token (_isDelivering) ensures Rx serialization contract
  • Used by MergeMany (N children of the same type), DisposeMany/OnBeingRemoved (need EnsureDeliveryComplete for disposal), and ObservableCache itself (the core fix)
  • EnsureDeliveryComplete() with _drainThreadId re-entrant safety for disposal callbacks
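
The enqueue-then-drain flow described in the bullets above can be sketched roughly as follows. This is a simplified, hypothetical illustration and not the PR's DeliveryQueue<T>: the names DrainQueueSketch and DrainQueueDemo are invented here, delivery goes to a plain Action<T> instead of an IObserver<T>, and terminal notifications and EnsureDeliveryComplete are left out.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, simplified illustration of queue-drain delivery.
public sealed class DrainQueueSketch<T>
{
    private readonly object _gate = new();
    private readonly Queue<T> _pending = new();
    private readonly Action<T> _deliver;
    private bool _isDelivering; // single-deliverer token, guarded by _gate

    public DrainQueueSketch(Action<T> deliver) => _deliver = deliver;

    public void Enqueue(T item)
    {
        lock (_gate)
        {
            _pending.Enqueue(item);
            if (_isDelivering)
            {
                return; // another call (or an outer frame on this thread) is already draining
            }

            _isDelivering = true; // this caller becomes the deliverer
        }

        try
        {
            while (true)
            {
                T next;
                lock (_gate)
                {
                    if (_pending.Count == 0)
                    {
                        _isDelivering = false; // hand the token back while still under the gate
                        return;
                    }

                    next = _pending.Dequeue();
                }

                _deliver(next); // delivered with no lock held
            }
        }
        catch
        {
            // Reset the token only on failure, then rethrow.
            lock (_gate) { _isDelivering = false; }
            throw;
        }
    }
}

public static class DrainQueueDemo
{
    // Returns the delivery order when a subscriber enqueues re-entrantly.
    public static List<int> Run()
    {
        var seen = new List<int>();
        DrainQueueSketch<int>? queue = null;
        queue = new DrainQueueSketch<int>(x =>
        {
            seen.Add(x);
            if (x == 1)
            {
                queue!.Enqueue(2); // re-entrant write: queued, delivered by the outer loop
            }
        });

        queue.Enqueue(1);
        return seen; // [1, 2]
    }
}
```

Because the token is claimed and released under the gate, only one caller drains at a time, which is what keeps delivery serialized even though the subscriber callback runs outside the lock.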

SharedDeliveryQueue — type-erased, multi-source queue.

  • One queue shared across multiple sources of different types (e.g., Sort subscribes to both data source and comparer changes)
  • Each source gets a typed DeliverySubQueue<T> via CreateQueue<T>(observer)
  • Single drain loop delivers from all sub-queues outside the lock
  • Used by 24+ operators that subscribe to multiple observables

Drop-In Replacement: SynchronizeSafe

Three extension method overloads in SynchronizeSafeExtensions provide surgical drop-in replacements for Synchronize(lock):

// Multi-source (most operators): replace Synchronize(locker) 
source.SynchronizeSafe(queue)     // SharedDeliveryQueue

// Single-source: replace Synchronize(locker) with implicit queue creation
source.SynchronizeSafe(locker)    // creates DeliveryQueue internally

// Single-source with disposal access
source.SynchronizeSafe(locker, out var queue)  // exposes queue for EnsureDeliveryComplete

Per-Operator Changes

Each operator change is surgical — typically replacing Synchronize(locker) with SynchronizeSafe(queue).

ObservableCache (the core fix)

On main, _locker is held during _changes.OnNext() via source.Synchronize(_locker) and lock(_locker). This PR replaces that with a DeliveryQueue<CacheUpdate> that enqueues change notifications under _locker, then drains them outside the lock. Subscribers (Connect, Watch, CountChanged) now receive notifications without _locker held. The change also introduces a CacheUpdate record struct (changes + count + version) and a CacheUpdateObserver for the drain callback.

CacheParentSubscription → SharedDeliveryQueue with OnDrainComplete

CacheParentSubscription previously held _synchronize during EmitChanges(observer) → observer.OnNext(). Rewritten to use SharedDeliveryQueue(onDrainComplete) + SynchronizeSafe(_queue), the same infrastructure as all other operators. State mutation (ParentOnNext/ChildOnNext) happens under the queue's lock, and EmitChanges runs in the OnDrainComplete callback, outside the lock. Same-thread reentrant delivery preserves the original Synchronize(lock) ordering for child items emitted synchronously during parent delivery.

Operators fixed by this change:

  • MergeManyChangeSets — child-comparer overload (MergeManyCacheChangeSets)
  • MergeManyChangeSets — source+child-comparer overload (MergeManyCacheChangeSetsSourceCompare)
  • MergeManyChangeSets — list output overload (MergeManyListChangeSets)
  • TransformOnObservable
  • GroupOnObservable
  • TransformManyAsync

Multi-source operators → SharedDeliveryQueue

These operators subscribe to multiple observables (data + comparer, data + page requests, left + right sources, etc.) and need a shared lock across all inputs. Each replaces var locker = InternalEx.NewLock() + Synchronize(locker) with var queue = new SharedDeliveryQueue() + SynchronizeSafe(queue):

  • Sort operators: Sort, SortAndBind, SortAndPage, SortAndVirtualize, Page, Virtualise
  • Group operators: GroupOn, GroupOnDynamic (SpecifiedGrouper), GroupOnImmutable
  • Join operators: FullJoin, InnerJoin, LeftJoin, RightJoin (these keep the explicit locker because they also use lock(locker) in their result handlers)
  • Combine operators: DynamicCombiner (Or/And/Except dynamic overloads)
  • Merge operators: MergeChangeSets, Switch
  • Transform operators: TransformWithForcedTransform, TransformAsync, TransformMany (outer lock)
  • Other multi-source: AutoRefresh, BatchIf, QueryWhenChanged, TreeBuilder

Single-source operators → DeliveryQueue<T>

These operators have a single source but need EnsureDeliveryComplete for safe disposal, or use the queue as a direct IObserver<T>:

  • DisposeMany, AsyncDisposeMany: SynchronizeSafe(locker, out queue) + EnsureDeliveryComplete ensures all pending notifications deliver before disposal cleans up tracked items
  • OnBeingRemoved: Same pattern as DisposeMany (invokes removal action on unsubscribe)
  • MergeMany: Uses DeliveryQueue<TDestination> directly as the observer — N child sources feed into one typed queue

ObservableCacheEx

Two Adapt overloads: Synchronize(locker) → SynchronizeSafe(locker).

Not Changed

Static Combiner (Combiner<T,K>) — used by the fixed-list Or/And/Except/Xor overloads. Calls the updatedCallback outside the lock. No deadlock risk.

Supporting Infrastructure

| File | Description |
| --- | --- |
| Internal/DeliveryQueue.cs | Typed single-source queue implementing IObserver<T> |
| Internal/SharedDeliveryQueue.cs | Type-erased multi-source queue |
| Internal/SynchronizeSafeExtensions.cs | Drop-in replacement extension methods |
| Internal/Notification.cs | Value/Error/Completed struct with Accept(IObserver<T>) |
| Internal/KeyedDisposable.cs | Manages IDisposable by key (fixes: same-reference guard, pre-NET6 remove order, AggregateException) |
| Internal/KeyedDisposableExtensions.cs | AddIfDisposable<TKey, TItem> extension |

Tests

Infrastructure Tests

| Test Class | Count | What It Verifies |
| --- | --- | --- |
| DeliveryQueueFixture | 21 | Core drain loop, EnsureDeliveryComplete, reentrant _drainThreadId, terminal notification ordering, concurrent access |
| SharedDeliveryQueueFixture | 6 | Multi-source delivery, sub-queue completion isolation, type-erased draining |
| KeyedDisposableFixture | 12 | Same-reference guard, AddIfDisposable, exception aggregation, pre-NET6 remove-then-dispose |
| CacheParentSubscriptionFixture | 10 | Batching (EmitChanges fires once per batch), serialization (no interleaving), completion (parent + children), disposal, error propagation, cross-feed deadlock proof |

Stress Test

| Test Class | Count | What It Verifies |
| --- | --- | --- |
| CrossCacheDeadlockStressTest | 1 | Exercises all 29 migrated operators in 8 bidirectional cross-cache flows with multi-threaded concurrent writers. All values randomized from seeded Randomizer(42). Custom domain types with INPC (AutoRefresh) and IDisposable (DisposeMany). Both MergeManyChangeSets overloads (child-comparer and source+child-comparer). Exact content verification with BeEquivalentTo for joins, combiners, groups, and sort. Would deadlock if any two co-located operators were reverted. |

Existing Test Fixes

| Test Class | Change |
| --- | --- |
| InnerJoinFixture | AsObservableCache(false) → AsObservableCache(); LockFreeObservableCache races with SynchronizeSafe delivery |
| MergeManyChangeSetsCacheSourceCompareFixture | Minor fix for test stability |

dwcullop and others added 30 commits April 5, 2026 22:49
…nt cross-cache deadlock

The original code held _locker while calling _changes.OnNext(), so subscriber
callbacks that propagated to other caches created ABBA deadlocks when concurrent
writes were happening on those caches.

New design:
- Single _locker protects mutation and queue state
- Write paths: lock, mutate, enqueue changeset, release lock, then drain
- DrainOutsideLock delivers notifications with no lock held
- _isDraining flag ensures only one thread drains at a time, preserving
  Rx serialization contract
- Re-entrant writes enqueue and return; the outer drain loop delivers
  them sequentially
- Connect/Watch/CountChanged use Skip(pendingCount) to avoid duplicating
  items already in the snapshot, with no delivery under lock
- Terminal events (OnCompleted/OnError) routed through drain queue
- Preview remains synchronous under _locker (required by ReaderWriter)
- Suspension state captured at enqueue time; re-checked at delivery
- try/catch resets _isDraining on exception
- volatile _isTerminated prevents post-dispose delivery
Introduce ReadOnlyScopedAccess to DeliveryQueue<TItem> for safe, read-only access to queue state under lock. Update tests and ObservableCache<TObject, TKey> to use AcquireReadLock() for reading PendingCount, replacing direct property access and manual locking. Make PendingCount private and encapsulate lock release logic. Wrap _suspensionTracker disposal in a lock for thread safety. These changes improve thread safety and clarify access patterns for queue state.
Refactored DirectCrossWriteDoesNotDeadlock to use Connect, Filter, Transform, and PopulateInto operators for bidirectional cache updates, replacing manual subscription logic. Increased test timeout and clarified assertion message. Prevented infinite feedback with key prefix filtering.
Refactored DeliveryQueue<TItem> to eliminate pending item tracking and PendingCount, removing related read-only lock APIs. ObservableCache<TObject, TKey> now ensures new subscribers do not receive in-flight notifications by connecting under the main lock, preventing duplicate deliveries without pending count logic. NotificationItem and delivery logic were simplified to check suspension state at delivery time. Updated tests: removed PendingCount tests and added a test to verify no duplicate notifications during delivery. Improved comments and code clarity.
Add conditional logic for .NET 9.0+ in SwappableLock to handle both _gate and _lockGate fields. SwapTo now checks both fields for initialization and releases the appropriate lock type, ensuring compatibility with new locking mechanisms while preserving legacy behavior.
Previously, haveExpirationsChanged was overwritten by each call to TrySetExpiration, potentially losing information about prior changes. Now, the |= operator is used to ensure haveExpirationsChanged remains true if any expiration update occurs, preserving the correct state across multiple updates.
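
A small sketch of why the |= accumulation matters. TrySet below is a hypothetical stand-in for TrySetExpiration (its real signature is not shown in this PR); it only mimics the "returns true when something changed" behavior.

```csharp
using System;

public static class ExpirationFlagSketch
{
    // Hypothetical stand-in for TrySetExpiration: returns true only when the stored value changes.
    private static bool TrySet(ref int slot, int value)
    {
        if (slot == value)
        {
            return false;
        }

        slot = value;
        return true;
    }

    public static (bool Overwritten, bool Accumulated) Run()
    {
        int a = 0, b = 5;
        bool overwritten = false;
        overwritten = TrySet(ref a, 1); // true: a changed
        overwritten = TrySet(ref b, 5); // false: the earlier 'true' is lost

        int c = 0, d = 5;
        bool accumulated = false;
        accumulated |= TrySet(ref c, 1); // true
        accumulated |= TrySet(ref d, 5); // stays true; |= does not short-circuit, so TrySet still runs

        return (overwritten, accumulated); // (false, true)
    }
}
```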
Moved _isDelivering reset from finally to catch block in DeliveryQueue<TItem>. Now, the flag is only reset when an exception occurs, and the exception is rethrown, making the error handling more explicit and preventing unnecessary state changes during normal execution.
The MultiThreadedStressTest asserts immediately after stress observables
complete, but with drain-outside-lock delivery, Edit() returns after
enqueueing while delivery may still be in-flight on another thread.
Add a short delay before checking results to allow in-flight deliveries
to complete.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
- Publish and explicitly connect merged observable in test, and await completion of all notifications for robust result verification.
- Move _suspensionTracker disposal outside lock in ObservableCache to prevent deadlocks and reentrancy issues.
- Add System.Reactive.Threading.Tasks import for ToTask() usage.
Fix race where new subscribers could see duplicate Add notifications
if they connect while in-flight changes are being delivered. Introduce
a versioning mechanism in ObservableCache to track committed and
delivered notifications, and skip already-delivered changes for new
subscribers. Extend NotificationItem with a version field and add
read-only lock support in DeliveryQueue. Update test to reliably
reproduce and verify the fix.
Add comprehensive tests for nested and concurrent suspend/resume scenarios in SuspendNotificationsFixture. Emit resume signals under lock in ObservableCache to prevent race conditions and ensure consistent notification delivery. These changes enhance reliability and determinism of notification delivery under complex and concurrent usage patterns.
- Strengthen test reliability and clarify test names/messages
- Rewrite DeliveryQueueFixture test for robust concurrency checks
- Enhance ObservableCache to avoid duplicate/applied notifications
- Refactor ResumeNotifications to prevent race conditions
- Improve comments and code clarity throughout
SharedDeliveryQueue: type-erased multi-T delivery queue for operators.
- List<IDrainable> of DeliverySubQueue<T> instances
- Per-queue IObserver<T> delivery via Notification<T> struct
- Drain loop: one item per iteration, fair across sources
- ScopedAccess-only API on DeliverySubQueue<T>
- ReadOnlyScopedAccess on SharedDeliveryQueue
- Error terminates queue AFTER delivery (matches Rx contract)
- Completion is per-sub-queue (does not terminate parent queue)
- CreateQueue locked to prevent race with drain loop
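
As a rough illustration of the type-erased design listed above, the sketch below keeps typed sub-queues behind a non-generic interface and drains one item per iteration outside the lock. It is a hypothetical simplification, not the PR's SharedDeliveryQueue: the names SharedQueueSketch and SharedQueueDemo are invented, sub-queues deliver to Action<T> rather than IObserver<T>, error and completion handling are omitted, and the scan always starts at the first source, so it makes no attempt at fairness.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, simplified illustration of a shared multi-source drain queue.
public sealed class SharedQueueSketch
{
    // Type-erased view of a typed sub-queue: returns the next delivery, or null when empty.
    private interface IDrainable
    {
        Action? TryDequeue();
    }

    private sealed class SubQueue<T> : IDrainable
    {
        private readonly Queue<T> _items = new();
        private readonly Action<T> _onNext;

        public SubQueue(Action<T> onNext) => _onNext = onNext;

        public void Add(T item) => _items.Enqueue(item);

        public Action? TryDequeue()
        {
            if (_items.Count == 0)
            {
                return null;
            }

            var item = _items.Dequeue();
            return () => _onNext(item);
        }
    }

    private readonly object _gate = new();
    private readonly List<IDrainable> _sources = new();
    private bool _isDelivering;

    // Registers a typed source and returns the delegate used to push items into it.
    public Action<T> CreateQueue<T>(Action<T> onNext)
    {
        var sub = new SubQueue<T>(onNext);
        lock (_gate) { _sources.Add(sub); } // locked so registration cannot race the drain loop
        return item =>
        {
            lock (_gate)
            {
                sub.Add(item);
                if (_isDelivering) return;
                _isDelivering = true;
            }

            Drain();
        };
    }

    private void Drain()
    {
        try
        {
            while (true)
            {
                Action? deliver = null;
                lock (_gate)
                {
                    foreach (var source in _sources)
                    {
                        deliver = source.TryDequeue();
                        if (deliver is not null) break;
                    }

                    if (deliver is null)
                    {
                        _isDelivering = false;
                        return;
                    }
                }

                deliver(); // one item per iteration, no lock held
            }
        }
        catch
        {
            lock (_gate) { _isDelivering = false; }
            throw;
        }
    }
}

public static class SharedQueueDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var queue = new SharedQueueSketch();
        var pushText = queue.CreateQueue<string>(s => log.Add($"str:{s}"));
        var pushNumber = queue.CreateQueue<int>(i =>
        {
            log.Add($"int:{i}");
            pushText("a"); // re-entrant push into a different typed source
        });

        pushNumber(1);
        return log; // ["int:1", "str:a"]
    }
}
```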

SynchronizeSafe(SharedDeliveryQueue) extension in DynamicData.Internal
namespace (NOT System.Reactive.Linq to avoid overload resolution issues).

Existing DeliveryQueue<T> unchanged — still used by ObservableCache.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Surgical 2-line change per operator:
1. var queue = new SharedDeliveryQueue(locker);
2. .Synchronize(locker) -> .SynchronizeSafe(queue)

Operators migrated (28 files):
- Joins: FullJoin, InnerJoin, LeftJoin, RightJoin
- Sort: Sort, SortAndPage, SortAndVirtualize, SortAndBind
- Paging: Page, Virtualise
- Groups: GroupOn, GroupOnImmutable, GroupOnDynamic
- Combine: DynamicCombiner, MergeChangeSets, MergeMany
- Transform: TransformWithForcedTransform, TransformAsync, TransformMany
- Lifecycle: DisposeMany, AsyncDisposeMany, OnBeingRemoved
- Other: BatchIf, Switch, AutoRefresh, TreeBuilder, QueryWhenChanged
- ObservableCacheEx: Bind (2 overloads) + ToObservableOptional

MergeChangeSets: removed #if NET9_0_OR_GREATER block (SharedDeliveryQueue
is same type on both TFMs).

Operators NOT migrated (kept on Synchronize - local gate, no cross-cache risk):
- ExpireAfter.ForSource, ExpireAfter.ForStream (timer callbacks use lock())
- CacheParentSubscription (complex EnterUpdate/ExitUpdate batching)
- EditDiffChangeSetOptional (no lock needed)
- SpecifiedGrouper (caches have internal locking)

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
CrossCacheDeadlockStressTest: bidirectional pipeline using Sort, Page,
AutoRefresh, Transform, Filter, SubscribeMany, MergeMany, QueryWhenChanged,
SortAndBind, Virtualise across two SourceCaches. 4 writer threads per cache
plus a property updater thread. Proves no deadlock under concurrent load.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
CrossCacheDeadlockStressTest exercises every migrated operator in a
bidirectional multi-threaded pipeline:

Operators tested: Sort, Page, AutoRefresh, Transform, Filter,
SubscribeMany, MergeMany, MergeChangeSets, QueryWhenChanged,
SortAndBind, Virtualise, DisposeMany, GroupOn, GroupWithImmutableState,
FullJoin, InnerJoin, LeftJoin, TransformMany, BatchIf, Switch,
Or (DynamicCombiner)

Pipeline: cacheA -> Sort -> Page -> AutoRefresh -> Transform -> Filter
          -> PopulateInto cacheB (forward)
          cacheB -> Filter -> Transform -> PopulateInto cacheA (reverse)
          + cross-cache Join, MergeChangeSets, QueryWhenChanged, etc.

Load: 4 writer threads per cache (100 items each) + property updater
thread toggling BatchIf pause and Switch sources.

Verifies: item counts, sort order, virtualisation window, join results,
union correctness, batch delivery, group presence, transform counts.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…ronize calls

SpecifiedGrouper: Synchronize(locker) -> SynchronizeSafe(queue)

Remaining Synchronize calls (6 total, all proven safe):
- EditDiffChangeSetOptional: Synchronize() with no arg (Rx's own gate)
- ExpireAfter.ForSource/ForStream: local gate shared with timer lock()
  callbacks that emit directly — timer emission path would need refactoring
  to enqueue through queue. Local gate, no cross-cache deadlock risk.
- TransformMany:109: per-item inner lock (new lock per Transform item)
- CacheParentSubscription: reentrant batching requires Synchronize. Local
  gate. All downstream operators use SynchronizeSafe — deadlock chain broken.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
SwappableLock NET9 support -> bugfix/swappable-lock-net9
ExpireAfter race fix -> bugfix/expire-after-race

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…ication

KitchenSink_AllOperatorsChained_NoDeadlock_CorrectResults:
- 7 pipeline chains exercising every dangerous operator
- Monster chain: AutoRefresh -> Filter -> Sort -> Page -> Transform ->
  IgnoreSameReferenceUpdate -> WhereReasonsAre -> OnItemAdded/Updated/Removed ->
  SubscribeMany -> NotEmpty -> SkipInitial
- Join chain: FullJoin -> Group -> DisposeMany -> MergeMany -> Transform
- Individual: InnerJoin, LeftJoin, RightJoin with ChangeKey
- Combined: MergeChangeSets, Or (DynamicCombiner), BatchIf, QueryWhenChanged
- Binding: SortAndBind, Virtualise, GroupWithImmutableState
- Dynamic: Switch, TransformMany
- Bidirectional: PopulateInto both directions with recursive filter guards

Load: 8 writer threads per cache, 500 items each, property mutations
(AutoRefresh), removals, sort/page/virtual parameter changes, BatchIf
toggles, Switch source swaps. Bogus Randomizer with deterministic seed.

Validates: exact counts, sort order, join semantics, union correctness,
virtualisation window, group counts, transform multiplicity.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Three instruction files for GitHub Copilot and AI assistants:

1. .github/copilot-instructions.md — General overview
   - What DynamicData is and why it matters
   - Why performance and Rx compliance are critical
   - Repository structure
   - Operator architecture pattern (extension method -> internal class -> Run())
   - SharedDeliveryQueue pattern explanation
   - Breaking change policy

2. .github/instructions/rx-contracts.instructions.md — Rx contract rules
   - Serialized notifications (the #1 rule)
   - Terminal notification semantics
   - Subscription lifecycle and disposal
   - DynamicData-specific rules (lock ordering, changeset immutability)
   - Link to ReactiveX contract reference

3. .github/instructions/dynamicdata-operators.instructions.md — Operator guide
   - Complete operator catalog with descriptions and examples
   - Categories: Filtering, Transformation, Sorting, Paging, Grouping,
     Joining, Combining, Aggregation, Fan-out/Fan-in, Lifecycle, Refresh,
     Buffering, Binding, Utilities
   - How to write a new operator

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…rators

Fix: MergeMany → MergeManyChangeSets for group sub-cache fan-out
(MergeMany returns IObservable<T>, MergeManyChangeSets returns
IObservable<IChangeSet<T,K>> — the latter is what we need here).

Added operators (Pipeline 8-11):
- And, Except, Xor (remaining set operations)
- TransformOnObservable
- FilterOnObservable
- TransformWithInlineUpdate
- DistinctValues
- ToObservableChangeSet (bridges IObservable<T> into DD)
- MergeMany (kept separately from MergeManyChangeSets)
- Bind (ReadOnlyObservableCollection)
- OnItemRefreshed, ForEachChange
- DeferUntilLoaded

All with exact final state assertions.

Results:
- Feature branch: PASSES in ~5s
- main branch: DEADLOCKS at 30s timeout (proven)

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Rewrite CrossCacheDeadlockStressTest for exact, verifiable results:

Writers now use explicit non-overlapping ID ranges instead of Faker:
- CacheA: IDs 1..4000 (8 threads × 500, family=(id%5))
- CacheB: IDs 10001..14000 (same pattern)
- No random removals during writes (was non-deterministic)

Post-write deterministic mutations:
- Toggle IncludeInResults=false for id%10==5 (400 items)
- Remove from A: id%20==0 (200 mammals removed)
- Remove from B: id%15==0 in range 10001..14000 (267 removed)

Bidirectional pipeline fixed:
- PopulateInto → ForEachChange+AddOrUpdate (respects target key selector)
- Forward: 600 surviving mammals → B with id+800_000
- Reverse: 600 fwd items → A with id+1_700_000
- Cycle-breaking: forward only accepts name.StartsWith('fwd-A'),
  reverse only accepts name.StartsWith('fwd-A')

All 30+ assertions are now hardcoded exact values:
- CacheA: 4400 (3800 direct + 600 reverse)
- CacheB: 4333 (3733 direct + 600 forward)
- FullJoin: produces results (disjoint keys)
- InnerJoin: 0 (disjoint key ranges)
- LeftJoin: 4400, RightJoin: 4333
- MergeChangeSets/Or/Xor: 8733 (A+B, disjoint)
- And: 0, Except: 4400
- FilterOnObservable(Mammal): 1200 (600 direct + 600 reverse)
- TransformMany: 8800 (2× cacheA)
- Virtualise: 50 (window size)
- DistinctValues/Groups: 5 (all AnimalFamily values)
- SortAndBind: 4400, sorted ascending by Id
- Forward items in B: 600, Reverse items in A: 600

Deadlocks on main (30s timeout), passes in ~7s on this branch.
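
The hardcoded totals above follow from the ID ranges and removal rules. The sketch below recomputes a few of them; it assumes that family index 0 (id % 5 == 0) is the Mammal family, which is inferred from the "200 mammals removed" note for id % 20 == 0, and the class name StressTestCountsSketch is invented for illustration.

```csharp
using System;
using System.Linq;

public static class StressTestCountsSketch
{
    public static (int CacheA, int CacheB, int Union) Run()
    {
        var idsA = Enumerable.Range(1, 4000).ToList();     // 8 threads x 500 items
        var idsB = Enumerable.Range(10001, 4000).ToList(); // 10001..14000

        int removedA = idsA.Count(id => id % 20 == 0);     // 200
        int removedB = idsB.Count(id => id % 15 == 0);     // 267

        // Assumption: family 0 is Mammal. Every id % 20 == 0 is also id % 5 == 0,
        // so all removals from A come out of the 800 mammals, leaving 600.
        int survivingMammals = idsA.Count(id => id % 5 == 0) - removedA;

        int cacheA = idsA.Count - removedA + survivingMammals; // 3800 direct + 600 reverse = 4400
        int cacheB = idsB.Count - removedB + survivingMammals; // 3733 direct + 600 forward = 4333

        return (cacheA, cacheB, cacheA + cacheB);              // union of disjoint keys = 8733
    }
}
```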

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…iveryQueue

DisposeMany, AsyncDisposeMany, and OnBeingRemoved are single-source
operators — they only serialize one IObservable<IChangeSet<T,K>>.
Using the type-erased SharedDeliveryQueue (with List<IDrainable>) was
unnecessary overhead for these cases.

Now they use DeliveryQueue<Notification<T>> directly, which:
- Eliminates type-erasure overhead (no IDrainable interface, no List)
- Delivery callback is set at construction, not via sub-queue creation
- Same AcquireReadLock() for disposal synchronization

Added SynchronizeSafe<T>(DeliveryQueue<Notification<T>>) overload that
returns IDisposable (not IObservable<T>) — delivery happens through
the queue's callback, not Rx composition.

All 37 related tests pass (DisposeMany, AsyncDisposeMany, stress test).

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Major infrastructure unification:

DeliveryQueue<T> now:
- Internally stores Queue<Notification<T>> (was Queue<TItem>)
- Delivers to IObserver<T> (was Func<TItem, bool> callback)
- ScopedAccess exposes Enqueue/EnqueueError/EnqueueCompleted
  (aligned with SharedDeliveryQueue's DeliverySubQueue API)
- Terminal handling via Notification<T>.IsTerminal (was bool return)

ObservableCache:
- Deleted NotificationKind enum and NotificationItem record struct
- Added CacheUpdate record struct (Changes?, Count, Version)
- Added CacheUpdateObserver : IObserver<CacheUpdate> that dispatches
  to _changes, _changesPreview, _countChanged subjects
- Terminal notifications go through queue's EnqueueCompleted/EnqueueError
  (no longer encoded in the payload type)

Operators (DisposeMany, AsyncDisposeMany, OnBeingRemoved):
- DeliveryQueue<IChangeSet<T,K>> directly (was DeliveryQueue<Notification<...>>)
- No namespace-qualified Notification references needed

SynchronizeSafeExtensions:
- DeliveryQueue<T> overload (was DeliveryQueue<Notification<T>>)
- Sets observer via SetObserver, uses Enqueue/EnqueueError/EnqueueCompleted

DeliveryQueueFixture:
- Rewritten for IObserver<T> pattern
- Added ListObserver, ConcurrentObserver, DelegateObserver helpers
- Terminal tests use EnqueueCompleted/EnqueueError

2233 tests passed, 0 failed (1 pre-existing flaky test).

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
dwcullop and others added 10 commits April 10, 2026 14:58
…che()

With SynchronizeSafe, the gate lock is released BEFORE downstream
delivery. LockFreeObservableCache (used by AsObservableCache(false))
has no internal locking — it relied on the caller's Synchronize gate
being held during the entire delivery chain. With SynchronizeSafe,
a Connect() subscriber starting on another thread can overlap with
delivery, causing 'Collection was modified' during enumeration of
the internal ChangeAwareCache dictionary.

Fix: Use locked AsObservableCache() (defaults to true) in all four
join operators (InnerJoin, FullJoin, LeftJoin, RightJoin). This adds
proper internal synchronization to the intermediate caches.

Verified: 0/20 failures on InnerJoinFixtureRaceCondition (was ~10%).

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
MergeMany has N child observables but they're all the same type
TDestination. No need for type-erased SharedDeliveryQueue. Children
enqueue directly via AcquireLock/Enqueue on the shared typed queue
instead of going through SynchronizeSafe per-child.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
DeliveryQueue<T> now has public OnNext/OnError/OnCompleted methods
(implementing IObserver<T>) that acquire the lock, enqueue, and drain.
This enables natural Rx patterns:

  child.Subscribe(queue)           // queue as observer
  child.Subscribe(queue.OnNext, …) // selective forwarding

MergeMany now reads naturally:
  _observableSelector(t, key)
      .Finally(() => counter.Finally())
      .Subscribe(queue.OnNext, static _ => { });

SynchronizeSafe(DeliveryQueue<T>) simplified to:
  queue.SetObserver(observer);
  return source.Subscribe(queue);

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
- Add ForceTerminate() to DeliveryQueue/SharedDeliveryQueue for safe, immediate queue termination
- Track delivery thread to avoid deadlocks during termination
- Update SynchronizeSafe to create queues internally and expose queue via out parameter
- Replace manual disposal in DisposeMany with KeyedDisposable and AddIfDisposable extension
- Simplify OnBeingRemoved and ObservableCacheEx to use new queue pattern
- Remove manual lock/disposal logic in favor of ForceTerminate
- Improve comments, documentation, and add KeyedDisposableExtensions
- Overall, improve safety, efficiency, and usability in multi-threaded scenarios
…liveryComplete

Infrastructure:
- DeliveryQueue<T>: IObserver<T>, Notification<T> internal, EnsureDeliveryComplete
  with _drainThreadId for re-entrant safety
- SharedDeliveryQueue: EnsureDeliveryComplete with same pattern
- Both: _isDelivering volatile (spin-wait visibility)
- Both: _drainThreadId initialized to -1 (defensive)
- SetObserver: double-call guard

SynchronizeSafe overloads:
- SynchronizeSafe(locker) — drop-in for Synchronize(locker), implicit DeliveryQueue
- SynchronizeSafe(locker, out queue) — exposes queue for EnsureDeliveryComplete
- SynchronizeSafe(SharedDeliveryQueue) — multi-source

ObservableCache:
- CacheUpdate struct + CacheUpdateObserver : IObserver<CacheUpdate>
- Deleted NotificationItem/NotificationKind

Operators:
- DisposeMany: KeyedDisposable + EnsureDeliveryComplete
- AsyncDisposeMany: same pattern
- OnBeingRemoved: surgical SynchronizeSafe(locker, out queue)
- MergeMany: DeliveryQueue<T> with queue.OnNext
- Join operators: AsObservableCache(false) → AsObservableCache()
- ObservableCacheEx: 2 single-source Adapt operators simplified

KeyedDisposable:
- Same-reference guard in Add
- AddIfDisposable extension method

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Fixes for issues introduced by our changes:

1. [High] DeliveryQueue: set _isTerminated BEFORE Accept for terminal
   notifications, not after. Prevents race where concurrent code
   (e.g., InvokePreview) sees IsTerminated==false while terminal
   delivery is in-flight. (Found by: GPT-5.2 rx-expert-cache)

2. [Medium] KeyedDisposable pre-NET6 Remove: swap to remove-then-dispose
   matching NET6+ branch. Prevents double-dispose on re-entrant Dispose
   callbacks. (Found by: Claude Opus 4.6 bughunt-infra)

3. [Medium] KeyedDisposable.Dispose: per-item try/catch with
   AggregateException. Prevents leaking remaining disposables if one
   throws. (Found by: Claude Opus 4.6 bughunt-infra)

4. [Medium] Remove duplicate 'using DynamicData.Internal' in
   MergeChangeSets.cs and GroupOnDynamic.cs.
   (Found by: Claude Opus 4.5 concurrency-expert)
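
Fixes 2 and 3 can be pictured with the sketch below. It is a hypothetical, single-threaded simplification and not the PR's KeyedDisposable: the names KeyedDisposableSketch and KeyedDisposableDemo are invented, and disposing the previous item when a key is replaced is an assumption made for illustration.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, simplified illustration of keyed disposal tracking.
public sealed class KeyedDisposableSketch<TKey> : IDisposable
    where TKey : notnull
{
    private readonly Dictionary<TKey, IDisposable> _items = new();
    private bool _disposed;

    public void Add(TKey key, IDisposable item)
    {
        if (_disposed) { item.Dispose(); return; }        // add-after-dispose disposes immediately

        if (_items.TryGetValue(key, out var existing))
        {
            if (ReferenceEquals(existing, item)) return;  // same-reference guard
            _items[key] = item;
            existing.Dispose();                           // assumption: replacing a key disposes the old item
            return;
        }

        _items[key] = item;
    }

    public void Remove(TKey key)
    {
        if (_items.TryGetValue(key, out var existing))
        {
            _items.Remove(key);  // remove first...
            existing.Dispose();  // ...then dispose, so a re-entrant Remove cannot dispose it twice
        }
    }

    public void Dispose()
    {
        if (_disposed) return;
        _disposed = true;

        var items = new List<IDisposable>(_items.Values);
        _items.Clear();

        List<Exception>? errors = null;
        foreach (var item in items)
        {
            // Per-item try/catch: one failing Dispose does not leak the rest.
            try { item.Dispose(); }
            catch (Exception ex) { (errors ??= new List<Exception>()).Add(ex); }
        }

        if (errors is not null) throw new AggregateException(errors);
    }
}

public static class KeyedDisposableDemo
{
    private sealed class Probe : IDisposable
    {
        private readonly Action _onDispose;
        public Probe(Action onDispose) => _onDispose = onDispose;
        public void Dispose() => _onDispose();
    }

    public static (int Disposed, int Errors) Run()
    {
        var disposed = 0;
        var tracker = new KeyedDisposableSketch<string>();
        tracker.Add("a", new Probe(() => throw new InvalidOperationException("boom")));
        tracker.Add("b", new Probe(() => disposed++));
        tracker.Add("c", new Probe(() => disposed++));

        try { tracker.Dispose(); return (disposed, 0); }
        catch (AggregateException ex) { return (disposed, ex.InnerExceptions.Count); } // (2, 1)
    }
}
```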

Pre-existing issues noted but NOT fixed (existed before our changes):
- Watch() error handling (GPT-5.2)
- DynamicCombiner/TreeBuilder unsynchronized handlers (Opus 4.5)
- Suspended Connect/Watch double OnCompleted (GPT-5.2)
- Sub-queue per-instance terminal tracking (GPT-5.4)

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
New test fixtures:
- KeyedDisposableFixture (12 tests): Add, Remove, same-reference guard,
  AddIfDisposable for disposable/non-disposable items, Dispose aggregates
  exceptions, idempotent Dispose, Add-after-Dispose immediate disposal

- SharedDeliveryQueueFixture (6 tests): single source delivery, multi-source
  serialization, error terminates all sub-queues, completion does NOT
  terminate parent, EnsureDeliveryComplete, concurrent multi-source

- DeliveryQueueFixture additions (7 tests): EnsureDeliveryComplete terminates,
  clears pending items, re-entrant from drain thread (no deadlock),
  spin-waits for in-flight delivery, terminal items delivered before
  termination, error terminates and clears pending

Total: 39 new infrastructure tests, all passing.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Refactored all internal cache operators and CacheParentSubscription to use a lock-free SharedDeliveryQueue for delivery serialization, removing explicit lock usage. SharedDeliveryQueue now supports an onDrainComplete callback and same-thread reentrant delivery, preventing deadlocks and preserving correct parent/child emission order. Updated CacheParentSubscription to batch and emit changes outside the lock, and removed legacy batching logic. All operators now instantiate SharedDeliveryQueue without a lock, simplifying code and improving robustness. Added comprehensive tests for parent/child emission, batching, completion, disposal, error propagation, and deadlock scenarios. Updated copyright headers and project file.
Introduced a private readonly _observer field of type IObserver<TObserver> to the CacheParentSubscription class for handling observer logic. No other changes were made.
Replace Synchronize(lock) with SynchronizeSafe(SharedDeliveryQueue) across
all cache operators. SharedDeliveryQueue releases the lock before downstream
delivery, preventing ABBA deadlocks between caches.

Key changes:
- SharedDeliveryQueue: parameterless ctor, OnDrainComplete callback,
  same-thread reentrant delivery, DrainPending with EnterLock/ExitLock
- CacheParentSubscription: rewritten to use SharedDeliveryQueue
- DeliveryQueue<T>: typed single-source queue for ObservableCache
- 27 operator files: Synchronize(locker) -> SynchronizeSafe(queue)
- Global using DynamicData.Internal in csproj

Tests:
- DeliveryQueueFixture (21), SharedDeliveryQueueFixture (6)
- KeyedDisposableFixture (12), CacheParentSubscriptionFixture (10)
- CrossCacheDeadlockStressTest: 29 operators, all randomized
- SourceCacheFixture: DirectCrossWriteDoesNotDeadlock

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Copilot AI left a comment

Pull request overview

This PR addresses cross-cache ABBA deadlocks in DynamicData by replacing Synchronize(lock)-based delivery (which holds operator locks during downstream OnNext) with a queue-drain delivery pattern that releases locks before notifying subscribers.

Changes:

  • Introduces DeliveryQueue<T>, SharedDeliveryQueue, Notification<T>, and SynchronizeSafe extensions to serialize emissions while delivering outside locks.
  • Migrates a large set of cache/list operators (including ObservableCache) from Synchronize(lock) to SynchronizeSafe(...) patterns.
  • Adds extensive unit/stress test coverage to validate serialization, ordering, disposal safety, and deadlock prevention.
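
The queue-drain idea summarised above can be sketched roughly as follows. This is a minimal illustration, not the PR's `DeliveryQueue<T>`: the class name `DrainQueueSketch<T>`, its `Enqueue` method, and the `Action<T>` callback are hypothetical, and error/completed handling is left out. It only shows the core move: mutate the queue under the gate, then invoke the subscriber after the gate has been released, with a single-deliverer flag keeping delivery serialized.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical illustration only; not the DeliveryQueue<T> added by this PR.
public sealed class DrainQueueSketch<T>
{
    private readonly object _gate = new();
    private readonly Queue<T> _pending = new();
    private readonly Action<T> _deliver;
    private bool _isDelivering; // single-deliverer token

    public DrainQueueSketch(Action<T> deliver) => _deliver = deliver;

    public void Enqueue(T item)
    {
        lock (_gate)
        {
            _pending.Enqueue(item);

            // Someone else is already draining; they will deliver this item.
            if (_isDelivering)
            {
                return;
            }

            _isDelivering = true;
        }

        while (true)
        {
            T next;
            lock (_gate)
            {
                if (_pending.Count == 0)
                {
                    _isDelivering = false;
                    return;
                }

                next = _pending.Dequeue();
            }

            try
            {
                // The gate is NOT held here, so the callback may take other locks
                // (or re-enter Enqueue) without creating a lock-ordering cycle.
                _deliver(next);
            }
            catch
            {
                // Give up the token so later callers are not blocked forever.
                lock (_gate)
                {
                    _isDelivering = false;
                }

                throw;
            }
        }
    }
}
```

In this sketch a re-entrant `Enqueue` from inside the callback simply appends to the queue and returns; the outer loop delivers the item afterwards, so ordering is preserved without holding the gate during delivery.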

Reviewed changes

Copilot reviewed 46 out of 46 changed files in this pull request and generated 3 comments.

| File | Description |
| --- | --- |
| src/DynamicData/Internal/SynchronizeSafeExtensions.cs | Adds SynchronizeSafe overloads as drop-in replacements for safer delivery. |
| src/DynamicData/Internal/SharedDeliveryQueue.cs | Adds shared multi-source queue-drain delivery mechanism for cross-source serialization. |
| src/DynamicData/Internal/Notification.cs | Adds allocation-free notification representation for queue delivery. |
| src/DynamicData/Internal/KeyedDisposableExtensions.cs | Adds AddIfDisposable helper for keyed disposal tracking. |
| src/DynamicData/Internal/KeyedDisposable.cs | Improves keyed disposal semantics (same-reference guard, removal order, exception aggregation). |
| src/DynamicData/Internal/DeliveryQueue.cs | Adds typed single-source queue-drain delivery mechanism (also used as an IObserver<T>). |
| src/DynamicData/Internal/CacheParentSubscription.cs | Reworks parent/child subscription serialization via SharedDeliveryQueue + OnDrainComplete. |
| src/DynamicData/DynamicData.csproj | Adds global using for DynamicData.Internal within the DynamicData assembly. |
| src/DynamicData/Cache/ObservableCacheEx.cs | Switches select usages from Synchronize to SynchronizeSafe; updates optional merge gating. |
| src/DynamicData/Cache/ObservableCache.cs | Core fix: queues cache updates under lock and drains notifications outside lock; adds versioning to avoid duplicates. |
| src/DynamicData/Cache/Internal/Virtualise.cs | Migrates multi-source serialization to SharedDeliveryQueue + SynchronizeSafe. |
| src/DynamicData/Cache/Internal/TreeBuilder.cs | Migrates multi-observable gating to SharedDeliveryQueue + SynchronizeSafe. |
| src/DynamicData/Cache/Internal/TransformWithForcedTransform.cs | Uses SharedDeliveryQueue for source + force-transform coordination without lock-held delivery. |
| src/DynamicData/Cache/Internal/TransformMany.cs | Replaces outer lock synchronization with a shared queue across merged sources. |
| src/DynamicData/Cache/Internal/TransformAsync.cs | Replaces locker-based synchronization with SharedDeliveryQueue. |
| src/DynamicData/Cache/Internal/Switch.cs | Replaces locker-based synchronization with SharedDeliveryQueue. |
| src/DynamicData/Cache/Internal/SpecifiedGrouper.cs | Migrates group pipelines to SharedDeliveryQueue serialization. |
| src/DynamicData/Cache/Internal/SortAndVirtualize.cs | Migrates comparer/params/data coordination to SharedDeliveryQueue. |
| src/DynamicData/Cache/Internal/SortAndPage.cs | Migrates comparer/params/data coordination to SharedDeliveryQueue. |
| src/DynamicData/Cache/Internal/Sort.cs | Migrates multi-input sort coordination to SharedDeliveryQueue. |
| src/DynamicData/Cache/Internal/RightJoin.cs | Uses SharedDeliveryQueue(locker) to avoid lock-held downstream delivery; adjusts observable cache creation. |
| src/DynamicData/Cache/Internal/QueryWhenChanged.cs | Migrates merge/snapshot serialization to SharedDeliveryQueue. |
| src/DynamicData/Cache/Internal/Page.cs | Migrates request+data coordination to SharedDeliveryQueue. |
| src/DynamicData/Cache/Internal/OnBeingRemoved.cs | Uses SynchronizeSafe(locker, out queue) + EnsureDeliveryComplete to avoid disposal races. |
| src/DynamicData/Cache/Internal/MergeMany.cs | Uses DeliveryQueue<T> as the serialized observer for many child sources. |
| src/DynamicData/Cache/Internal/MergeChangeSets.cs | Migrates merge container + child source synchronization to SharedDeliveryQueue. |
| src/DynamicData/Cache/Internal/LeftJoin.cs | Uses SharedDeliveryQueue(locker) to avoid lock-held downstream delivery; adjusts observable cache creation. |
| src/DynamicData/Cache/Internal/InnerJoin.cs | Uses SharedDeliveryQueue(locker) to avoid lock-held downstream delivery; adjusts observable cache creation. |
| src/DynamicData/Cache/Internal/GroupOnImmutable.cs | Migrates multi-input group coordination to SharedDeliveryQueue. |
| src/DynamicData/Cache/Internal/GroupOnDynamic.cs | Migrates 3-input dynamic grouping coordination to SharedDeliveryQueue. |
| src/DynamicData/Cache/Internal/GroupOn.cs | Migrates group/regroup coordination to SharedDeliveryQueue. |
| src/DynamicData/Cache/Internal/FullJoin.cs | Uses SharedDeliveryQueue(locker) to avoid lock-held downstream delivery; adjusts observable cache creation. |
| src/DynamicData/Cache/Internal/DynamicCombiner.cs | Migrates multi-stream merge + state updates to SharedDeliveryQueue. |
| src/DynamicData/Cache/Internal/DisposeMany.cs | Replaces dictionary locking/finalization with SynchronizeSafe(..., out queue) + KeyedDisposable. |
| src/DynamicData/Cache/Internal/BatchIf.cs | Migrates pause/timer/source coordination to SharedDeliveryQueue. |
| src/DynamicData/Cache/Internal/AutoRefresh.cs | Migrates refresh+source merge serialization to SharedDeliveryQueue. |
| src/DynamicData/Cache/Internal/AsyncDisposeMany.cs | Uses SynchronizeSafe(..., out queue) + EnsureDeliveryComplete to avoid lock-held disposal ordering issues. |
| src/DynamicData/Binding/SortAndBind.cs | Migrates comparer+source coordination to SharedDeliveryQueue. |
| src/DynamicData.Tests/Internal/SharedDeliveryQueueFixture.cs | Adds unit tests for shared queue delivery/termination/serialization behavior. |
| src/DynamicData.Tests/Internal/KeyedDisposableFixture.cs | Adds unit tests for keyed disposal semantics and exception aggregation. |
| src/DynamicData.Tests/Internal/DeliveryQueueFixture.cs | Adds unit tests for typed delivery queue serialization, ordering, termination, and concurrency. |
| src/DynamicData.Tests/Internal/CacheParentSubscriptionFixture.cs | Adds behavioral/deadlock-proof tests for parent/child subscription pattern. |
| src/DynamicData.Tests/Cache/SuspendNotificationsFixture.cs | Adds tests for suspension/resume ordering and concurrency correctness with the new delivery behavior. |
| src/DynamicData.Tests/Cache/SourceCacheFixture.cs | Adds cross-cache deadlock/duplication regression tests (fan-in, cross-write, connect-during-delivery). |
| src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheSourceCompareFixture.cs | Stabilizes stress test completion by publishing and awaiting source completion. |
| src/DynamicData.Tests/Cache/CrossCacheDeadlockStressTest.cs | Adds broad stress test exercising migrated operators in bidirectional cross-cache flows. |

Comment on lines +71 to +77
    public void EnsureDeliveryComplete()
    {
        lock (_gate)
        {
            _isTerminated = true;
            _queue.Clear();

Copilot AI Apr 13, 2026

DeliveryQueue mixes explicit EnterLock/ExitLock (Lock.Enter/Exit vs Monitor.Enter/Exit) with lock (_gate) inside EnsureDeliveryComplete/DeliverAll. Please make locking consistent (use EnterLock/ExitLock everywhere or lock everywhere) so all critical sections definitely coordinate on the same gate implementation for NET9+ and older TFMs.
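
One way to read this suggestion is to route every critical section through a single pair of helpers, so that only one locking primitive is in play per target framework. The sketch below assumes the gate is a `System.Threading.Lock` on .NET 9+ and a plain `object` (locked via `Monitor`) on older TFMs; the type `GateSketch` and its members are hypothetical and are not taken from the PR.

```csharp
using System.Threading;

// Hypothetical sketch: GateSketch, EnterLock and ExitLock are illustrative names.
public sealed class GateSketch
{
#if NET9_0_OR_GREATER
    // .NET 9+: dedicated Lock type with its own Enter/Exit.
    private readonly Lock _gate = new();

    private void EnterLock() => _gate.Enter();

    private void ExitLock() => _gate.Exit();
#else
    // Older TFMs: fall back to Monitor on a private object.
    private readonly object _gate = new();

    private void EnterLock() => Monitor.Enter(_gate);

    private void ExitLock() => Monitor.Exit(_gate);
#endif

    private int _value;

    // Every critical section goes through the same helper pair.
    public int Increment()
    {
        EnterLock();
        try
        {
            return ++_value;
        }
        finally
        {
            ExitLock();
        }
    }
}
```

The point of the sketch is only that no code path takes the gate by a different route than the others; whether helpers or `lock` statements are used throughout is a style choice.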

dwcullop and others added 3 commits April 12, 2026 20:03
SharedDeliveryQueue: DrainPending now iterates sub-queues newest-first (LIFO).
Child sub-queues are always created after the parent sub-queue in
CacheParentSubscription, so LIFO iteration fully delivers child items
before any parent operation that may dispose child subscriptions. This
prevents pending child notifications (including Removes) from being
silently dropped by the stopped AutoDetachObserver.
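
The ordering argument can be illustrated with a deliberately simplified, single-threaded sketch. It assumes sub-queues are kept in creation order in a list; `LifoDrainSketch` and `Drain` are hypothetical names, and the real DrainPending also deals with locking, termination, and removed sub-queues, none of which is modelled here.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, single-threaded illustration of newest-first draining.
public static class LifoDrainSketch
{
    // Drains every sub-queue, visiting the most recently created one first.
    public static List<string> Drain(List<Queue<string>> subQueues)
    {
        var delivered = new List<string>();

        // Index 0 is the oldest sub-queue (the parent in this illustration).
        for (var i = subQueues.Count - 1; i >= 0; i--)
        {
            var queue = subQueues[i];
            while (queue.Count > 0)
            {
                delivered.Add(queue.Dequeue());
            }
        }

        return delivered;
    }
}
```

With a parent sub-queue created first and a child sub-queue created second, the child's pending items come out before the parent's, so a parent operation that tears down the child cannot overtake them.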

ExpireAfter.ForSource: Guard against deferred notification delivery
causing stale _expirationDueTimesByKey state. Re-check that the item
still exists AND still has a Lifetime before expiring it. Without this,
an item updated to have no Lifetime could be incorrectly expired if the
Update notification hadn't been delivered yet.

Also: collapsed duplicate #if NET9_0_OR_GREATER in MergeChangeSets,
removed redundant using DynamicData.Internal from touched files,
added self-locking DeliveryQueue(observer) constructor, simplified
MergeMany.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
… Notification safety

Multi-agent code review findings addressed:

SharedDeliveryQueue:
- Sub-queues marked removed on disposal, compacted lazily after drain
- RemoveQueue + MarkRemoved + CompactRemovedQueues prevents O(history) drain scans
- DeliverySubQueue implements IObserver<T> (cleaner SynchronizeSafe)

DeliveryQueue:
- _drainThreadId reset to -1 on all exit paths (matches SharedDeliveryQueue)

Notification<T>:
- Added where T : notnull constraint (prevents null-as-terminal misclassification)
- OnError guards against null exception argument
- Completed is static readonly field (avoids per-access allocation)

KeyedDisposable:
- Add is exception-safe: new item tracked before old disposed
- Removed KeyedDisposableExtensions (Add handles IDisposable check internally)

SynchronizeSafeExtensions:
- SynchronizeSafe(SharedDeliveryQueue) calls RemoveQueue on disposal
- Uses source.SubscribeSafe(subQueue) via IObserver<T>

ObservableCacheEx:
- ToObservableOptional uses Defer/Do/Merge/Defer composition (no SDQ)

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
- Rename Notification<T> static methods: Next → CreateNext, OnError → CreateError, Completed → CreateCompleted (now a method).
- Make Value and Error properties with getters; add IsError property.
- Update all usages to new Create* methods and Enqueue* methods in DeliveryQueue<T> and DeliverySubQueue<T>.
- Use IsError in DeliverySubQueue<T>.StageNext for clarity.
- Simplify disposal in SynchronizeSafeExtensions with CompositeDisposable.
- Update copyright header in ObservableCache.cs.
- Improves code clarity, consistency, and reduces risk of misuse.
- Rename Enqueue to EnqueueNext for clearer intent
- Replace _isDelivering with _drainThreadId for precise reentrancy tracking
- Use AcquireReadLock() for all internal locking
- Refactor drain logic to prevent concurrent delivery
- Make IDrainable methods explicit to hide from public API
- Fix KeyedDisposable to avoid double-disposal
- Update tests to use EnqueueNext and match new logic
- Improve documentation on sub-queue draining order and semantics
Revised DrainPending XML doc to clarify LIFO iteration rationale and its effect on sub-queue disposal. Added where T : notnull constraint to DeliverySubQueue<T> to enforce non-nullable types.