Commit e6055c5
Fix cross-cache deadlocks via queue-drain delivery pattern (reactivemarbles#1079)
* fix: Replace lock-during-notification with queue-based drain to prevent cross-cache deadlock
The original code held _locker while calling _changes.OnNext(). Subscriber
callbacks that propagated to other caches could therefore hit ABBA deadlocks
when those caches were being written concurrently.
New design:
- Single _locker protects mutation and queue state
- Write paths: lock, mutate, enqueue changeset, release lock, then drain
- DrainOutsideLock delivers notifications with no lock held
- _isDraining flag ensures only one thread drains at a time, preserving
Rx serialization contract
- Re-entrant writes enqueue and return; the outer drain loop delivers
them sequentially
- Connect/Watch/CountChanged use Skip(pendingCount) to avoid duplicating
items already in the snapshot, with no delivery under lock
- Terminal events (OnCompleted/OnError) routed through drain queue
- Preview remains synchronous under _locker (required by ReaderWriter)
- Suspension state captured at enqueue time; re-checked at delivery
- try/catch resets _isDraining on exception
- volatile _isTerminated prevents post-dispose delivery
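
The write path described above can be sketched roughly as follows. This is a minimal illustration, not the code in this commit: the class name DrainQueueSketch, the Publish method, and the Action<T> delivery callback are all hypothetical, and suspension, preview, and terminal handling are left out.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical illustration only; not the DynamicData implementation.
public sealed class DrainQueueSketch<T>
{
    private readonly object _locker = new();
    private readonly Queue<T> _pending = new();
    private readonly Action<T> _deliver;
    private bool _isDraining;

    public DrainQueueSketch(Action<T> deliver) => _deliver = deliver;

    // Write path: enqueue under the lock, then deliver with no lock held.
    public void Publish(T item)
    {
        lock (_locker)
        {
            _pending.Enqueue(item);

            // Another thread (or an outer call on this thread) is already
            // draining; it will pick this item up in order.
            if (_isDraining)
            {
                return;
            }

            _isDraining = true;
        }

        DrainOutsideLock();
    }

    private void DrainOutsideLock()
    {
        try
        {
            while (true)
            {
                T next;
                lock (_locker)
                {
                    if (_pending.Count == 0)
                    {
                        _isDraining = false;
                        return;
                    }

                    next = _pending.Dequeue();
                }

                // No lock is held here, so the callback may write to other
                // caches (or re-enter Publish) without lock-order inversion.
                _deliver(next);
            }
        }
        catch
        {
            // Reset the flag so a later Publish can drain again.
            lock (_locker)
            {
                _isDraining = false;
            }

            throw;
        }
    }
}
```

A callback that calls Publish again while it is being invoked only enqueues; the outer loop delivers that item after the current one, which is how the sketch keeps delivery serialized.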
* Refactor to use one lock and a serialized delivery queue to uphold the Rx contract and ensure thread safety.
* Add read-only lock for DeliveryQueue and improve safety
Introduce ReadOnlyScopedAccess to DeliveryQueue<TItem> for safe, read-only access to queue state under lock. Update tests and ObservableCache<TObject, TKey> to use AcquireReadLock() for reading PendingCount, replacing direct property access and manual locking. Make PendingCount private and encapsulate lock release logic. Wrap _suspensionTracker disposal in a lock for thread safety. These changes improve thread safety and clarify access patterns for queue state.
* Refactor cross-cache deadlock test to use operators
Refactored DirectCrossWriteDoesNotDeadlock to use Connect, Filter, Transform, and PopulateInto operators for bidirectional cache updates, replacing manual subscription logic. Increased test timeout and clarified assertion message. Prevented infinite feedback with key prefix filtering.
* Simplify delivery queue; remove pending count logic
Refactored DeliveryQueue<TItem> to eliminate pending item tracking and PendingCount, removing related read-only lock APIs. ObservableCache<TObject, TKey> now ensures new subscribers do not receive in-flight notifications by connecting under the main lock, preventing duplicate deliveries without pending count logic. NotificationItem and delivery logic were simplified to check suspension state at delivery time. Updated tests: removed PendingCount tests and added a test to verify no duplicate notifications during delivery. Improved comments and code clarity.
* Support .NET 9+ locking in SwappableLock's SwapTo method
Add conditional logic for .NET 9.0+ in SwappableLock to handle both _gate and _lockGate fields. SwapTo now checks both fields for initialization and releases the appropriate lock type, ensuring compatibility with new locking mechanisms while preserving legacy behavior.
* Use |= to accumulate expiration changes correctly
Previously, haveExpirationsChanged was overwritten by each call to TrySetExpiration, potentially losing information about prior changes. Now, the |= operator is used to ensure haveExpirationsChanged remains true if any expiration update occurs, preserving the correct state across multiple updates.
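
A small sketch of the difference, assuming a stand-in TrySetExpiration that simply reports whether a given item changed (the real method's signature is not shown in this log, so everything below is hypothetical):

```csharp
using System;

public static class ExpirationFlagSketch
{
    // Hypothetical stand-in: returns true when this item's expiration changed.
    private static bool TrySetExpiration(int dueTime) => dueTime > 0;

    public static (bool Overwritten, bool Accumulated) Run(int[] dueTimes)
    {
        var overwritten = false;
        var accumulated = false;

        foreach (var dueTime in dueTimes)
        {
            // Plain assignment keeps only the result of the last call.
            overwritten = TrySetExpiration(dueTime);

            // |= stays true once any call has returned true. The right-hand
            // side is still evaluated on every iteration.
            accumulated |= TrySetExpiration(dueTime);
        }

        return (overwritten, accumulated);
    }
}
```

With inputs where an early call returns true and the last returns false, the overwritten flag ends up false while the accumulated flag stays true.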
* Refactor DeliveryQueue exception handling logic
Moved _isDelivering reset from finally to catch block in DeliveryQueue<TItem>. Now, the flag is only reset when an exception occurs, and the exception is rethrown, making the error handling more explicit and preventing unnecessary state changes during normal execution.
* Fix MergeMany stress test timing for queue-based delivery
The MultiThreadedStressTest asserts immediately after stress observables
complete, but with drain-outside-lock delivery, Edit() returns after
enqueueing while delivery may still be in-flight on another thread.
Add a short delay before checking results to allow in-flight deliveries
to complete.
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
* Update src/DynamicData.Tests/Cache/SourceCacheFixture.cs
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
* Improve test reliability and ObservableCache disposal safety
- Publish and explicitly connect merged observable in test, and await completion of all notifications for robust result verification.
- Move _suspensionTracker disposal outside lock in ObservableCache to prevent deadlocks and reentrancy issues.
- Add System.Reactive.Threading.Tasks import for ToTask() usage.
* Prevent duplicate notifications on Connect during delivery
Fix race where new subscribers could see duplicate Add notifications
if they connect while in-flight changes are being delivered. Introduce
a versioning mechanism in ObservableCache to track committed and
delivered notifications, and skip already-delivered changes for new
subscribers. Extend NotificationItem with a version field and add
read-only lock support in DeliveryQueue. Update test to reliably
reproduce and verify the fix.
* Improve suspend/resume notification handling and tests
Add comprehensive tests for nested and concurrent suspend/resume scenarios in SuspendNotificationsFixture. Emit resume signals under lock in ObservableCache to prevent race conditions and ensure consistent notification delivery. These changes enhance reliability and determinism of notification delivery under complex and concurrent usage patterns.
* Improve thread safety, tests, and notification delivery
- Strengthen test reliability and clarify test names/messages
- Rewrite DeliveryQueueFixture test for robust concurrency checks
- Enhance ObservableCache to avoid duplicate or already-applied notifications
- Refactor ResumeNotifications to prevent race conditions
- Improve comments and code clarity throughout
* Add SharedDeliveryQueue + SynchronizeSafe infrastructure
SharedDeliveryQueue: type-erased multi-T delivery queue for operators.
- List<IDrainable> of DeliverySubQueue<T> instances
- Per-queue IObserver<T> delivery via Notification<T> struct
- Drain loop: one item per iteration, fair across sources
- ScopedAccess-only API on DeliverySubQueue<T>
- ReadOnlyScopedAccess on SharedDeliveryQueue
- Error terminates queue AFTER delivery (matches Rx contract)
- Completion is per-sub-queue (does not terminate parent queue)
- CreateQueue locked to prevent race with drain loop
SynchronizeSafe(SharedDeliveryQueue) extension in DynamicData.Internal
namespace (NOT System.Reactive.Linq to avoid overload resolution issues).
Existing DeliveryQueue<T> unchanged — still used by ObservableCache.
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
* Migrate all operators from Synchronize to SynchronizeSafe
Surgical 2-line change per operator:
1. var queue = new SharedDeliveryQueue(locker);
2. .Synchronize(locker) -> .SynchronizeSafe(queue)
Operators migrated (28 files):
- Joins: FullJoin, InnerJoin, LeftJoin, RightJoin
- Sort: Sort, SortAndPage, SortAndVirtualize, SortAndBind
- Paging: Page, Virtualise
- Groups: GroupOn, GroupOnImmutable, GroupOnDynamic
- Combine: DynamicCombiner, MergeChangeSets, MergeMany
- Transform: TransformWithForcedTransform, TransformAsync, TransformMany
- Lifecycle: DisposeMany, AsyncDisposeMany, OnBeingRemoved
- Other: BatchIf, Switch, AutoRefresh, TreeBuilder, QueryWhenChanged
- ObservableCacheEx: Bind (2 overloads) + ToObservableOptional
MergeChangeSets: removed #if NET9_0_OR_GREATER block (SharedDeliveryQueue
is same type on both TFMs).
Operators NOT migrated (kept on Synchronize - local gate, no cross-cache risk):
- ExpireAfter.ForSource, ExpireAfter.ForStream (timer callbacks use lock())
- CacheParentSubscription (complex EnterUpdate/ExitUpdate batching)
- EditDiffChangeSetOptional (no lock needed)
- SpecifiedGrouper (caches have internal locking)
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
* Add mega cross-cache stress test proving deadlock-free operation
CrossCacheDeadlockStressTest: bidirectional pipeline using Sort, Page,
AutoRefresh, Transform, Filter, SubscribeMany, MergeMany, QueryWhenChanged,
SortAndBind, Virtualise across two SourceCaches. 4 writer threads per cache
plus a property updater thread. Proves no deadlock under concurrent load.
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
* Comprehensive cross-cache stress test with result verification
CrossCacheDeadlockStressTest exercises every migrated operator in a
bidirectional multi-threaded pipeline:
Operators tested: Sort, Page, AutoRefresh, Transform, Filter,
SubscribeMany, MergeMany, MergeChangeSets, QueryWhenChanged,
SortAndBind, Virtualise, DisposeMany, GroupOn, GroupWithImmutableState,
FullJoin, InnerJoin, LeftJoin, TransformMany, BatchIf, Switch,
Or (DynamicCombiner)
Pipeline: cacheA -> Sort -> Page -> AutoRefresh -> Transform -> Filter
-> PopulateInto cacheB (forward)
cacheB -> Filter -> Transform -> PopulateInto cacheA (reverse)
+ cross-cache Join, MergeChangeSets, QueryWhenChanged, etc.
Load: 4 writer threads per cache (100 items each) + property updater
thread toggling BatchIf pause and Switch sources.
Verifies: item counts, sort order, virtualisation window, join results,
union correctness, batch delivery, group presence, transform counts.
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
* Convert SpecifiedGrouper to SynchronizeSafe, document remaining Synchronize calls
SpecifiedGrouper: Synchronize(locker) -> SynchronizeSafe(queue)
Remaining Synchronize calls (6 total, all proven safe):
- EditDiffChangeSetOptional: Synchronize() with no arg (Rx's own gate)
- ExpireAfter.ForSource/ForStream: local gate shared with timer lock()
callbacks that emit directly — timer emission path would need refactoring
to enqueue through queue. Local gate, no cross-cache deadlock risk.
- TransformMany:109: per-item inner lock (new lock per Transform item)
- CacheParentSubscription: reentrant batching requires Synchronize. Local
gate. All downstream operators use SynchronizeSafe — deadlock chain broken.
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
* Remove SwappableLock and ExpireAfter changes (split to separate PRs)
SwappableLock NET9 support -> bugfix/swappable-lock-net9
ExpireAfter race fix -> bugfix/expire-after-race
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
* Enhanced kitchen-sink stress test with all operators and result verification
KitchenSink_AllOperatorsChained_NoDeadlock_CorrectResults:
- 7 pipeline chains exercising every dangerous operator
- Monster chain: AutoRefresh -> Filter -> Sort -> Page -> Transform ->
IgnoreSameReferenceUpdate -> WhereReasonsAre -> OnItemAdded/Updated/Removed ->
SubscribeMany -> NotEmpty -> SkipInitial
- Join chain: FullJoin -> Group -> DisposeMany -> MergeMany -> Transform
- Individual: InnerJoin, LeftJoin, RightJoin with ChangeKey
- Combined: MergeChangeSets, Or (DynamicCombiner), BatchIf, QueryWhenChanged
- Binding: SortAndBind, Virtualise, GroupWithImmutableState
- Dynamic: Switch, TransformMany
- Bidirectional: PopulateInto both directions with recursive filter guards
Load: 8 writer threads per cache, 500 items each, property mutations
(AutoRefresh), removals, sort/page/virtual parameter changes, BatchIf
toggles, Switch source swaps. Bogus Randomizer with deterministic seed.
Validates: exact counts, sort order, join semantics, union correctness,
virtualisation window, group counts, transform multiplicity.
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
* Add AI instruction files for DynamicData
Three instruction files for GitHub Copilot and AI assistants:
1. .github/copilot-instructions.md — General overview
- What DynamicData is and why it matters
- Why performance and Rx compliance are critical
- Repository structure
- Operator architecture pattern (extension method -> internal class -> Run())
- SharedDeliveryQueue pattern explanation
- Breaking change policy
2. .github/instructions/rx-contracts.instructions.md — Rx contract rules
- Serialized notifications (the #1 rule)
- Terminal notification semantics
- Subscription lifecycle and disposal
- DynamicData-specific rules (lock ordering, changeset immutability)
- Link to ReactiveX contract reference
3. .github/instructions/dynamicdata-operators.instructions.md — Operator guide
- Complete operator catalog with descriptions and examples
- Categories: Filtering, Transformation, Sorting, Paging, Grouping,
Joining, Combining, Aggregation, Fan-out/Fan-in, Lifecycle, Refresh,
Buffering, Binding, Utilities
- How to write a new operator
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
* Remove AI instruction files (moved to docs/ai_instructions branch)
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
* Enhance kitchen sink stress test: fix MergeManyChangeSets, add 11 operators
Fix: MergeMany → MergeManyChangeSets for group sub-cache fan-out
(MergeMany returns IObservable<T>, MergeManyChangeSets returns
IObservable<IChangeSet<T,K>> — the latter is what we need here).
Added operators (Pipeline 8-11):
- And, Except, Xor (remaining set operations)
- TransformOnObservable
- FilterOnObservable
- TransformWithInlineUpdate
- DistinctValues
- ToObservableChangeSet (bridges IObservable<T> into DD)
- MergeMany (kept separately from MergeManyChangeSets)
- Bind (ReadOnlyObservableCollection)
- OnItemRefreshed, ForEachChange
- DeferUntilLoaded
All with exact final state assertions.
Results:
- Feature branch: PASSES in ~5s
- main branch: DEADLOCKS at 30s timeout (proven)
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
* Make stress test fully deterministic with hardcoded assertions
Rewrite CrossCacheDeadlockStressTest for exact, verifiable results:
Writers now use explicit non-overlapping ID ranges instead of Faker:
- CacheA: IDs 1..4000 (8 threads × 500, family=(id%5))
- CacheB: IDs 10001..14000 (same pattern)
- No random removals during writes (was non-deterministic)
Post-write deterministic mutations:
- Toggle IncludeInResults=false for id%10==5 (400 items)
- Remove from A: id%20==0 (200 mammals removed)
- Remove from B: id%15==0 in range 10001..14000 (267 removed)
Bidirectional pipeline fixed:
- PopulateInto → ForEachChange+AddOrUpdate (respects target key selector)
- Forward: 600 surviving mammals → B with id+800_000
- Reverse: 600 fwd items → A with id+1_700_000
- Cycle-breaking: forward only accepts name.StartsWith('fwd-A'),
reverse only accepts name.StartsWith('fwd-A')
All 30+ assertions are now hardcoded exact values:
- CacheA: 4400 (3800 direct + 600 reverse)
- CacheB: 4333 (3733 direct + 600 forward)
- FullJoin: produces results (disjoint keys)
- InnerJoin: 0 (disjoint key ranges)
- LeftJoin: 4400, RightJoin: 4333
- MergeChangeSets/Or/Xor: 8733 (A+B, disjoint)
- And: 0, Except: 4400
- FilterOnObservable(Mammal): 1200 (600 direct + 600 reverse)
- TransformMany: 8800 (2× cacheA)
- Virtualise: 50 (window size)
- DistinctValues/Groups: 5 (all AnimalFamily values)
- SortAndBind: 4400, sorted ascending by Id
- Forward items in B: 600, Reverse items in A: 600
Deadlocks on main (30s timeout), passes in ~7s on this branch.
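
The headline counts above can be re-derived from the ID ranges and removal rules. The sketch below is only an arithmetic cross-check, not part of the test; it assumes that family 0 (id % 5 == 0) is the Mammal family, which is what the "200 mammals removed" note implies.

```csharp
using System;
using System.Linq;

public static class ExpectedCountsSketch
{
    public static (int CacheA, int CacheB, int Union) Compute()
    {
        var idsA = Enumerable.Range(1, 4000);      // IDs 1..4000
        var idsB = Enumerable.Range(10001, 4000);  // IDs 10001..14000

        var removedFromA = idsA.Count(id => id % 20 == 0);
        var removedFromB = idsB.Count(id => id % 15 == 0);

        // Assumption: family 0 (id % 5 == 0) is Mammal. Mammals that were
        // not removed from A are the ones forwarded to B and reversed back to A.
        var survivingMammals = idsA.Count(id => id % 5 == 0 && id % 20 != 0);

        var cacheA = 4000 - removedFromA + survivingMammals; // direct + reverse
        var cacheB = 4000 - removedFromB + survivingMammals; // direct + forward

        // The key ranges are disjoint, so the union is a plain sum.
        return (cacheA, cacheB, cacheA + cacheB);
    }
}
```

Compute() yields 4400 for CacheA, 4333 for CacheB, and 8733 for the union, matching the hardcoded assertions.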
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
* Use DeliveryQueue<T> for single-source operators instead of SharedDeliveryQueue
DisposeMany, AsyncDisposeMany, and OnBeingRemoved are single-source
operators — they only serialize one IObservable<IChangeSet<T,K>>.
Using the type-erased SharedDeliveryQueue (with List<IDrainable>) was
unnecessary overhead for these cases.
Now they use DeliveryQueue<Notification<T>> directly, which:
- Eliminates type-erasure overhead (no IDrainable interface, no List)
- Delivery callback is set at construction, not via sub-queue creation
- Same AcquireReadLock() for disposal synchronization
Added SynchronizeSafe<T>(DeliveryQueue<Notification<T>>) overload that
returns IDisposable (not IObservable<T>) — delivery happens through
the queue's callback, not Rx composition.
All 37 related tests pass (DisposeMany, AsyncDisposeMany, stress test).
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
* Unify DeliveryQueue<T>: Notification<T> internal, IObserver<T> delivery
Major infrastructure unification:
DeliveryQueue<T> now:
- Internally stores Queue<Notification<T>> (was Queue<TItem>)
- Delivers to IObserver<T> (was Func<TItem, bool> callback)
- ScopedAccess exposes Enqueue/EnqueueError/EnqueueCompleted
(aligned with SharedDeliveryQueue's DeliverySubQueue API)
- Terminal handling via Notification<T>.IsTerminal (was bool return)
ObservableCache:
- Deleted NotificationKind enum and NotificationItem record struct
- Added CacheUpdate record struct (Changes?, Count, Version)
- Added CacheUpdateObserver : IObserver<CacheUpdate> that dispatches
to _changes, _changesPreview, _countChanged subjects
- Terminal notifications go through queue's EnqueueCompleted/EnqueueError
(no longer encoded in the payload type)
Operators (DisposeMany, AsyncDisposeMany, OnBeingRemoved):
- DeliveryQueue<IChangeSet<T,K>> directly (was DeliveryQueue<Notification<...>>)
- No namespace-qualified Notification references needed
SynchronizeSafeExtensions:
- DeliveryQueue<T> overload (was DeliveryQueue<Notification<T>>)
- Sets observer via SetObserver, uses Enqueue/EnqueueError/EnqueueCompleted
DeliveryQueueFixture:
- Rewritten for IObserver<T> pattern
- Added ListObserver, ConcurrentObserver, DelegateObserver helpers
- Terminal tests use EnqueueCompleted/EnqueueError
2233 tests passed, 0 failed (1 pre-existing flaky test).
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
* Fix race in join operators: AsObservableCache(false) → AsObservableCache()
With SynchronizeSafe, the gate lock is released BEFORE downstream
delivery. LockFreeObservableCache (used by AsObservableCache(false))
has no internal locking — it relied on the caller's Synchronize gate
being held during the entire delivery chain. With SynchronizeSafe,
a Connect() subscriber starting on another thread can overlap with
delivery, causing 'Collection was modified' during enumeration of
the internal ChangeAwareCache dictionary.
Fix: Use locked AsObservableCache() (defaults to true) in all four
join operators (InnerJoin, FullJoin, LeftJoin, RightJoin). This adds
proper internal synchronization to the intermediate caches.
Verified: 0/20 failures on InnerJoinFixtureRaceCondition (was ~10%).
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
* MergeMany: SharedDeliveryQueue → DeliveryQueue<TDestination>
MergeMany has N child observables but they're all the same type
TDestination. No need for type-erased SharedDeliveryQueue. Children
enqueue directly via AcquireLock/Enqueue on the shared typed queue
instead of going through SynchronizeSafe per-child.
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
* DeliveryQueue<T> implements IObserver<T> for clean Rx composition
DeliveryQueue<T> now has public OnNext/OnError/OnCompleted methods
(implementing IObserver<T>) that acquire the lock, enqueue, and drain.
This enables natural Rx patterns:
child.Subscribe(queue) // queue as observer
child.Subscribe(queue.OnNext, …) // selective forwarding
MergeMany now reads naturally:
_observableSelector(t, key)
.Finally(() => counter.Finally())
.Subscribe(queue.OnNext, static _ => { });
SynchronizeSafe(DeliveryQueue<T>) simplified to:
queue.SetObserver(observer);
return source.Subscribe(queue);
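
To illustrate why implementing IObserver<T> makes child.Subscribe(queue) possible, here is a reduced sketch of a queue that is itself an observer. The names ObserverQueueSketch and RecordingObserver are hypothetical, and the real DeliveryQueue<T> stores Notification<T> values rather than delegates.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch; not DynamicData's DeliveryQueue<T>.
public sealed class ObserverQueueSketch<T> : IObserver<T>
{
    private readonly object _gate = new();
    private readonly Queue<Action<IObserver<T>>> _pending = new();
    private readonly IObserver<T> _downstream;
    private bool _isDraining;
    private bool _isTerminated;

    public ObserverQueueSketch(IObserver<T> downstream) => _downstream = downstream;

    public void OnNext(T value) => Enqueue(o => o.OnNext(value), isTerminal: false);

    public void OnError(Exception error) => Enqueue(o => o.OnError(error), isTerminal: true);

    public void OnCompleted() => Enqueue(o => o.OnCompleted(), isTerminal: true);

    private void Enqueue(Action<IObserver<T>> notify, bool isTerminal)
    {
        lock (_gate)
        {
            if (_isTerminated) return;   // nothing is accepted after a terminal event
            _isTerminated = isTerminal;
            _pending.Enqueue(notify);
            if (_isDraining) return;     // the active drainer will deliver it
            _isDraining = true;
        }

        try
        {
            while (true)
            {
                Action<IObserver<T>> next;
                lock (_gate)
                {
                    if (_pending.Count == 0)
                    {
                        _isDraining = false;
                        return;
                    }

                    next = _pending.Dequeue();
                }

                next(_downstream);       // delivered with the gate released
            }
        }
        catch
        {
            lock (_gate) { _isDraining = false; }
            throw;
        }
    }
}

// Minimal downstream observer used to show the delivery order.
public sealed class RecordingObserver<T> : IObserver<T>
{
    public List<string> Log { get; } = new();

    public void OnNext(T value) => Log.Add($"next:{value}");

    public void OnError(Exception error) => Log.Add("error");

    public void OnCompleted() => Log.Add("completed");
}
```

Because the sketch is an IObserver<T>, any IObservable<T> can subscribe it directly, and values pushed after OnCompleted or OnError are dropped.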
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
* Refactor delivery queues and disposal for thread safety
- Add ForceTerminate() to DeliveryQueue/SharedDeliveryQueue for safe, immediate queue termination
- Track delivery thread to avoid deadlocks during termination
- Update SynchronizeSafe to create queues internally and expose queue via out parameter
- Replace manual disposal in DisposeMany with KeyedDisposable and AddIfDisposable extension
- Simplify OnBeingRemoved and ObservableCacheEx to use new queue pattern
- Remove manual lock/disposal logic in favor of ForceTerminate
- Improve comments, documentation, and add KeyedDisposableExtensions
- Overall, improve safety, efficiency, and usability in multi-threaded scenarios
* Unified DeliveryQueue, IObserver, SynchronizeSafe overloads, EnsureDeliveryComplete
Infrastructure:
- DeliveryQueue<T>: IObserver<T>, Notification<T> internal, EnsureDeliveryComplete
with _drainThreadId for re-entrant safety
- SharedDeliveryQueue: EnsureDeliveryComplete with same pattern
- Both: _isDelivering volatile (spin-wait visibility)
- Both: _drainThreadId initialized to -1 (defensive)
- SetObserver: double-call guard
SynchronizeSafe overloads:
- SynchronizeSafe(locker) — drop-in for Synchronize(locker), implicit DeliveryQueue
- SynchronizeSafe(locker, out queue) — exposes queue for EnsureDeliveryComplete
- SynchronizeSafe(SharedDeliveryQueue) — multi-source
ObservableCache:
- CacheUpdate struct + CacheUpdateObserver : IObserver<CacheUpdate>
- Deleted NotificationItem/NotificationKind
Operators:
- DisposeMany: KeyedDisposable + EnsureDeliveryComplete
- AsyncDisposeMany: same pattern
- OnBeingRemoved: surgical SynchronizeSafe(locker, out queue)
- MergeMany: DeliveryQueue<T> with queue.OnNext
- Join operators: AsObservableCache(false) → AsObservableCache()
- ObservableCacheEx: 2 single-source Adapt operators simplified
KeyedDisposable:
- Same-reference guard in Add
- AddIfDisposable extension method
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
* Fix multi-agent review findings
Fixes for issues introduced by our changes:
1. [High] DeliveryQueue: set _isTerminated BEFORE Accept for terminal
notifications, not after. Prevents race where concurrent code
(e.g., InvokePreview) sees IsTerminated==false while terminal
delivery is in-flight. (Found by: GPT-5.2 rx-expert-cache)
2. [Medium] KeyedDisposable pre-NET6 Remove: swap to remove-then-dispose
matching NET6+ branch. Prevents double-dispose on re-entrant Dispose
callbacks. (Found by: Claude Opus 4.6 bughunt-infra)
3. [Medium] KeyedDisposable.Dispose: per-item try/catch with
AggregateException. Prevents leaking remaining disposables if one
throws. (Found by: Claude Opus 4.6 bughunt-infra)
4. [Medium] Remove duplicate 'using DynamicData.Internal' in
MergeChangeSets.cs and GroupOnDynamic.cs.
(Found by: Claude Opus 4.5 concurrency-expert)
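
Item 3 can be pictured with the sketch below: every item is disposed even if an earlier one throws, and the failures are rethrown together. DisposeAllSketch and ActionDisposable are hypothetical helpers for illustration; the actual KeyedDisposable code is not reproduced here.

```csharp
using System;
using System.Collections.Generic;

public static class DisposeAllSketch
{
    // Disposes every item; collects failures and rethrows them at the end.
    public static void DisposeAll(IEnumerable<IDisposable> items)
    {
        var errors = new List<Exception>();

        foreach (var item in items)
        {
            try
            {
                item.Dispose();
            }
            catch (Exception ex)
            {
                // Keep going so the remaining disposables are not leaked.
                errors.Add(ex);
            }
        }

        if (errors.Count > 0)
        {
            throw new AggregateException(errors);
        }
    }
}

// Tiny disposable that runs a delegate, used only to exercise the sketch.
public sealed class ActionDisposable : IDisposable
{
    private readonly Action _onDispose;

    public ActionDisposable(Action onDispose) => _onDispose = onDispose;

    public void Dispose() => _onDispose();
}
```

If the second of three items throws, the first and third are still disposed and the caller receives one AggregateException carrying the single failure.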
Pre-existing issues noted but NOT fixed (existed before our changes):
- Watch() error handling (GPT-5.2)
- DynamicCombiner/TreeBuilder unsynchronized handlers (Opus 4.5)
- Suspended Connect/Watch double OnCompleted (GPT-5.2)
- Sub-queue per-instance terminal tracking (GPT-5.4)
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
* Add missing infrastructure tests (Critical review finding)
New test fixtures:
- KeyedDisposableFixture (12 tests): Add, Remove, same-reference guard,
AddIfDisposable for disposable/non-disposable items, Dispose aggregates
exceptions, idempotent Dispose, Add-after-Dispose immediate disposal
- SharedDeliveryQueueFixture (6 tests): single source delivery, multi-source
serialization, error terminates all sub-queues, completion does NOT
terminate parent, EnsureDeliveryComplete, concurrent multi-source
- DeliveryQueueFixture additions (7 tests): EnsureDeliveryComplete terminates,
clears pending items, re-entrant from drain thread (no deadlock),
spin-waits for in-flight delivery, terminal items delivered before
termination, error terminates and clears pending
Total: 39 new infrastructure tests, all passing.
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
* Refactor delivery queue: lock-free, reentrant, safer emits
Refactored all internal cache operators and CacheParentSubscription to use a lock-free SharedDeliveryQueue for delivery serialization, removing explicit lock usage. SharedDeliveryQueue now supports an onDrainComplete callback and same-thread reentrant delivery, preventing deadlocks and preserving correct parent/child emission order. Updated CacheParentSubscription to batch and emit changes outside the lock, and removed legacy batching logic. All operators now instantiate SharedDeliveryQueue without a lock, simplifying code and improving robustness. Added comprehensive tests for parent/child emission, batching, completion, disposal, error propagation, and deadlock scenarios. Updated copyright headers and project file.
* Add _observer field to CacheParentSubscription
Introduced a private readonly _observer field of type IObserver<TObserver> to the CacheParentSubscription class for handling observer logic. No other changes were made.
* fix: eliminate cross-cache deadlocks via queue-drain delivery pattern
Replace Synchronize(lock) with SynchronizeSafe(SharedDeliveryQueue) across
all cache operators. SharedDeliveryQueue releases the lock before downstream
delivery, preventing ABBA deadlocks between caches.
Key changes:
- SharedDeliveryQueue: parameterless ctor, OnDrainComplete callback,
same-thread reentrant delivery, DrainPending with EnterLock/ExitLock
- CacheParentSubscription: rewritten to use SharedDeliveryQueue
- DeliveryQueue<T>: typed single-source queue for ObservableCache
- 27 operator files: Synchronize(locker) -> SynchronizeSafe(queue)
- Global using DynamicData.Internal in csproj
Tests:
- DeliveryQueueFixture (21), SharedDeliveryQueueFixture (6)
- KeyedDisposableFixture (12), CacheParentSubscriptionFixture (10)
- CrossCacheDeadlockStressTest: 29 operators, all randomized
- SourceCacheFixture: DirectCrossWriteDoesNotDeadlock
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
* fix: LIFO drain ordering in SharedDeliveryQueue, ExpireAfter race guard
SharedDeliveryQueue: DrainPending now iterates sub-queues newest-first (LIFO).
Child sub-queues are always created after the parent sub-queue in
CacheParentSubscription. LIFO ensures child items are fully delivered
before parent operations that may dispose child subscriptions —
preventing silent loss of pending child notifications (including
Removes) by the stopped AutoDetachObserver.
ExpireAfter.ForSource: Guard against deferred notification delivery
causing stale _expirationDueTimesByKey state. Re-check that the item
still exists AND still has a Lifetime before expiring it. Without this,
an item updated to have no Lifetime could be incorrectly expired if the
Update notification hadn't been delivered yet.
Also: collapsed duplicate #if NET9_0_OR_GREATER in MergeChangeSets,
removed redundant using DynamicData.Internal from touched files,
added self-locking DeliveryQueue(observer) constructor, simplified
MergeMany.
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
* fix: review findings - sub-queue leak, drain thread reset, IObserver, Notification safety
Multi-agent code review findings addressed:
SharedDeliveryQueue:
- Sub-queues marked removed on disposal, compacted lazily after drain
- RemoveQueue + MarkRemoved + CompactRemovedQueues prevents O(history) drain scans
- DeliverySubQueue implements IObserver<T> (cleaner SynchronizeSafe)
DeliveryQueue:
- _drainThreadId reset to -1 on all exit paths (matches SharedDeliveryQueue)
Notification<T>:
- Added where T : notnull constraint (prevents null-as-terminal misclassification)
- OnError guards against null exception argument
- Completed is static readonly field (avoids per-access allocation)
KeyedDisposable:
- Add is exception-safe: new item tracked before old disposed
- Removed KeyedDisposableExtensions (Add handles IDisposable check internally)
SynchronizeSafeExtensions:
- SynchronizeSafe(SharedDeliveryQueue) calls RemoveQueue on disposal
- Uses source.SubscribeSafe(subQueue) via IObserver<T>
ObservableCacheEx:
- ToObservableOptional uses Defer/Do/Merge/Defer composition (no SDQ)
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
* Refactor Notification<T> API and update usage throughout
- Rename Notification<T> static methods: Next → CreateNext, OnError → CreateError, Completed → CreateCompleted (now a method).
- Make Value and Error properties with getters; add IsError property.
- Update all usages to new Create* methods and Enqueue* methods in DeliveryQueue<T> and DeliverySubQueue<T>.
- Use IsError in DeliverySubQueue<T>.StageNext for clarity.
- Simplify disposal in SynchronizeSafeExtensions with CompositeDisposable.
- Update copyright header in ObservableCache.cs.
- Improve code clarity and consistency, and reduce the risk of misuse.
* Refactor delivery queue for thread safety and clarity
- Rename Enqueue to EnqueueNext for clearer intent
- Replace _isDelivering with _drainThreadId for precise reentrancy tracking
- Use AcquireReadLock() for all internal locking
- Refactor drain logic to prevent concurrent delivery
- Make IDrainable methods explicit to hide from public API
- Fix KeyedDisposable to avoid double-disposal
- Update tests to use EnqueueNext and match new logic
- Improve documentation on sub-queue draining order and semantics
* Clarify LIFO rationale, add notnull to DeliverySubQueue<T>
Revised DrainPending XML doc to clarify LIFO iteration rationale and its effect on sub-queue disposal. Added where T : notnull constraint to DeliverySubQueue<T> to enforce non-nullable types.
* perf: add delivery queue throughput and contention benchmarks
* perf: batch drain in DeliveryQueue, add MMCS contention benchmark
DeliveryQueue now batches all pending items in one lock acquisition
during the drain cycle instead of one-at-a-time. Reduces lock
reacquisitions from N to 1 per drain cycle under contention.
On exception, undelivered items are re-enqueued in order (skipping
the failed item).
SharedDeliveryQueue remains per-item: batch drain is incompatible
with reentrant delivery needed by CPS child-during-parent ordering.
Added MmcsContentionBenchmarks for CPS path under contention.
* refactor: revert batch drain, fix MMCS benchmark to use SourceCache children
- Revert batch drain in DeliveryQueue (no measurable benefit at realistic thread counts)
- Remove DeliveryQueueBenchmarks class (redundant with ContentionBenchmarks at ThreadCount=1)
- MMCS benchmark: use SourceCache children (SourceList was untouched, not testing our changes)
- MMCS benchmark: add SubscriberWork param (None, Sort, Transform) for fair comparison
* Refactor SharedDeliveryQueue to use Bitset for O(1) lookup
Introduce Bitset struct for efficient active queue tracking in SharedDeliveryQueue, replacing O(N) scans with O(1) bitset operations. Update sub-queue management to use stable indices and compact dead slots. Add comprehensive Bitset unit tests. Improves delivery performance, scalability, and memory usage.
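
As a rough picture of the idea, the sketch below tracks active slots in 64-bit words. It is an assumption-laden illustration: BitsetSketch is a hypothetical class (the commit describes a struct), and its API may not match the real Bitset. Set, Clear, and IsSet touch a single word; FindHighest scans one word per 64 slots, so it is a single step when there are at most 64 sub-queues.

```csharp
using System;
using System.Numerics;

// Hypothetical sketch; the Bitset in this commit may differ in layout and API.
public sealed class BitsetSketch
{
    private readonly ulong[] _words;

    public BitsetSketch(int capacity) => _words = new ulong[(capacity + 63) / 64];

    public void Set(int index) => _words[index >> 6] |= 1UL << (index & 63);

    public void Clear(int index) => _words[index >> 6] &= ~(1UL << (index & 63));

    public bool IsSet(int index) => (_words[index >> 6] & (1UL << (index & 63))) != 0;

    // Returns the highest set index, or -1 when no bit is set.
    public int FindHighest()
    {
        for (var word = _words.Length - 1; word >= 0; word--)
        {
            if (_words[word] != 0)
            {
                // 63 - LeadingZeroCount gives the position of the top set bit.
                return (word << 6) + 63 - BitOperations.LeadingZeroCount(_words[word]);
            }
        }

        return -1;
    }
}
```

Looking up the highest active index first fits the newest-first sub-queue drain order described elsewhere in this log.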
* fix: review findings from multi-agent analysis
SharedDeliveryQueue:
- Fix TOCTOU: items lost when onDrainComplete is null
- Fix _drainThreadId leak after error-terminated drain
- Fix Dispose data race: acquire lock, idempotent, clear items
- Cache Environment.CurrentManagedThreadId in ExitLockAndDrain
- Devirtualize IDrainable -> DrainableBase abstract class
- Queue initial capacity hint
CacheParentSubscription:
- Deferred completion: _isCompleted flag, deliver in OnDrainComplete
- TerminalError guard prevents OnCompleted after OnError
- Document _subscriptionCounter and Finally vs onCompleted asymmetry
Operators:
- GroupOn: move Finally(OnCompleted) after SynchronizeSafe
- GroupOnDynamic: add IgnoreElements before completion subscription
- OnBeingRemoved: remove Do(observer.OnError) bypass
- MergeMany: simplify (remove SubscriptionCounter/Subject/Concat)
Infrastructure:
- Notification<T>: shrink from 24 to 16 bytes (eliminate Optional)
- Bitset: single-word fast paths for FindHighest/ClearAll
- SynchronizeSafe: Subscribe -> SubscribeSafe consistency
- SynchronizeSafe: document single-subscription on out overload
- CPS test fixture: use TestSourceCache.Complete() for completion tests
* fix: Notification<T> use T? instead of object? to avoid boxing, drop notnull constraint from queues
* refactor: dispose queues on cleanup, remove dead code, document disposal ordering
- DeliveryQueue and SharedDeliveryQueue implement IDisposable
- EnsureDeliveryComplete is now private (called only by Dispose)
- All operators include their queue in CompositeDisposable (last position)
- Parameterless SynchronizeSafe disposes queue first for teardown operators
- Removed dead out-parameter overloads and deferred-observer constructors
- DeliveryQueue._observer is now readonly (no more SetObserver)
- Notification<T> uses T? instead of object? to avoid value-type boxing
- Dropped notnull constraint from DeliveryQueue, SharedDeliveryQueue, Notification
- Replaced Disposable.Create closures with CompositeDisposable where possible
- Documented disposal ordering rationale in SynchronizeSafeExtensions
* test: reduce DirectCrossWrite iterations and increase timeout to prevent flake
50 iterations still provides good deadlock coverage. 60s timeout
handles full-suite load where stress tests compete for threads.
* refactor: thread-safe KeyedDisposable, fix Notification boxing, cleanup
- KeyedDisposable: internal locking on all operations, Dispose calls
happen outside the lock to prevent deadlocks
- DisposeMany: remove SynchronizeSafe (KeyedDisposable is now self-serializing)
- Notification<T>: use T? instead of object? (avoids value-type boxing),
drop notnull constraint
- SharedDeliveryQueue: drop notnull from CreateQueue and DeliverySubQueue
- ObservableCacheEx Bind: inline lock creation, remove unused variable
* fix: Notification<T> use Optional<T> for value-type correctness, fix KeyedDisposable snapshot bug
Notification<T> with T? field broke terminal notification delivery for
value types (Unit) on .NET 9: default(Nullable<T>) is incorrectly
evaluated as non-null in generic struct field access. Reverted to
Optional<T> which uses an explicit bool HasValue field, avoiding
the runtime bug entirely. No boxing, same 24-byte size.
Added NotificationFixture with 8 tests covering OnNext/OnError/OnCompleted
for both reference types and value types.
Fixed KeyedDisposable.Dispose(): snapshot was initialized as empty dict
instead of copying _disposables before clearing, so nothing was disposed.
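The snapshot fix can be illustrated with a small stand-in. This is a sketch under assumptions, not DynamicData's `KeyedDisposable`: the type `KeyedDisposableSketch<TKey>` and its members are hypothetical. It shows the two points from the notes above: the entries are copied before the dictionary is cleared, and `Dispose` calls on the entries happen after the lock is released.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a keyed disposable container.
public sealed class KeyedDisposableSketch<TKey> : IDisposable
    where TKey : notnull
{
    private readonly object _gate = new object();
    private readonly Dictionary<TKey, IDisposable> _items = new Dictionary<TKey, IDisposable>();
    private bool _disposed;

    public void Add(TKey key, IDisposable item)
    {
        IDisposable? toDispose = null;
        lock (_gate)
        {
            if (_disposed)
            {
                toDispose = item; // container already torn down
            }
            else
            {
                if (_items.TryGetValue(key, out var previous) && !ReferenceEquals(previous, item))
                {
                    toDispose = previous; // replaced entry
                }

                _items[key] = item;
            }
        }

        toDispose?.Dispose(); // outside the lock
    }

    public void Dispose()
    {
        IDisposable[] snapshot;
        lock (_gate)
        {
            if (_disposed)
            {
                return; // idempotent
            }

            _disposed = true;

            // Copy BEFORE clearing; an empty snapshot here would dispose nothing.
            snapshot = new IDisposable[_items.Count];
            _items.Values.CopyTo(snapshot, 0);
            _items.Clear();
        }

        foreach (var item in snapshot)
        {
            item.Dispose(); // outside the lock
        }
    }
}
```

Disposing outside the lock matters because an entry's `Dispose` may call back into other code that takes its own locks.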
* fix: wrap CompactIfNeeded in try/finally to prevent lock leak on exception
If CompactIfNeeded throws (e.g. OOM from Array.Resize in Bitset.Compact),
the exception propagated to DrainAll's catch block which re-entered the
reentrant Monitor, exited once, and rethrew with the lock permanently held
at depth 1. Now both CompactIfNeeded call sites use try/finally to guarantee
ExitLock runs even on exception.
Found by multi-agent code review (Sonnet 4.6, concurrency focus).
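The shape of that fix, reduced to its essentials, is sketched below. The names `DrainLockSketch` and `CompactUnderLock` are hypothetical, and the `compact` delegate merely stands in for `CompactIfNeeded`; the real call sites release the lock through `ExitLock` rather than calling `Monitor.Exit` directly.

```csharp
using System;
using System.Threading;

// Hypothetical sketch of releasing an explicitly entered Monitor on every path.
public sealed class DrainLockSketch
{
    private readonly object _gate = new object();

    // True only while the calling thread holds the lock.
    public bool IsHeldByCurrentThread => Monitor.IsEntered(_gate);

    // 'compact' stands in for work that may throw while the lock is held.
    public void CompactUnderLock(Action compact)
    {
        Monitor.Enter(_gate);
        try
        {
            compact();
        }
        finally
        {
            // Runs on both the normal and the exceptional path,
            // so the lock depth always returns to zero.
            Monitor.Exit(_gate);
        }
    }
}
```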
* fix: MergeMany queue-first disposal prevents spurious OnCompleted, use StrongBox
Finally callbacks fire on disposal, not just natural completion. With
queue-last ordering, child Finally handlers could deliver OnCompleted
to the observer during teardown. Fixed by using queue-first disposal
(same as DisposeMany/OnBeingRemoved) and guarding CheckCompleted
against terminated queue.
Replaced int[] counter with StrongBox<int> for clearer intent.
* test: remove batch pause from stress test, use batch writes
Remove BatchIf pause during concurrent writes so notifications flow
freely through bidirectional pipelines, maximizing contention.
Use Edit(batch) instead of per-item AddOrUpdate for larger changesets.
* test: add DeadlockTortureTest proving ABBA fix across all dangerous operators
12 tests: Sort, AutoRefresh, GroupOn, Page, Virtualise, Transform(force),
BatchIf, DisposeMany, OnItemRemoved, stacked chain, 8 parallel pairs,
and three-way circular (A->B->C->A).
Each test creates bidirectional cross-cache pipelines through dangerous
operators and hammers them with concurrent writes from two threads.
On main (Synchronize(lock)): deadlocks within seconds (3 deadlocks + crash
in first run). On the PR branch (SynchronizeSafe): 12/12 pass.
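For readers unfamiliar with the pattern these tests exercise, here is a minimal sketch of queue-drain delivery. It is not the commit's `DeliveryQueue` or `SynchronizeSafe`; the type `DrainingNotifier<T>` and its members are hypothetical, and it omits terminal events, suspension, and disposal. The point it shows is that the lock guards only the queue and the draining flag, while the subscriber callback runs with no lock held.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch: enqueue under the lock, deliver outside it.
public sealed class DrainingNotifier<T>
{
    private readonly object _gate = new object();
    private readonly Queue<T> _pending = new Queue<T>();
    private readonly Action<T> _deliver;
    private bool _isDraining;

    public DrainingNotifier(Action<T> deliver)
    {
        _deliver = deliver;
    }

    public void Publish(T item)
    {
        lock (_gate)
        {
            _pending.Enqueue(item);
            if (_isDraining)
            {
                return; // the current drainer (possibly an outer frame) will deliver it
            }

            _isDraining = true;
        }

        try
        {
            while (true)
            {
                T next;
                lock (_gate)
                {
                    if (_pending.Count == 0)
                    {
                        _isDraining = false;
                        return;
                    }

                    next = _pending.Dequeue();
                }

                _deliver(next); // no lock held while the callback runs
            }
        }
        catch
        {
            lock (_gate)
            {
                _isDraining = false; // let a later Publish drain again
            }

            throw;
        }
    }
}
```

Because only one thread drains at a time, callbacks stay serialized; a callback that publishes again simply enqueues and returns, and the outer loop delivers the new item afterwards.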
---------
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
1 parent 335ecbc commit e6055c5
51 files changed
Lines changed: 5386 additions & 552 deletions
File tree
- src
- DynamicData.Benchmarks
- Cache
- DynamicData.Tests
- Cache
- Internal
- DynamicData
- Binding
- Cache
- Internal
- Internal
Lines changed: 226 additions & 0 deletions