diff --git a/.github/copilot-instructions.md b/.github/copilot-instructions.md new file mode 100644 index 00000000..2a577f40 --- /dev/null +++ b/.github/copilot-instructions.md @@ -0,0 +1,271 @@ +# DynamicData — AI Instructions + +## What is DynamicData? + +DynamicData is a reactive collections library for .NET, built on top of [Reactive Extensions (Rx)](https://github.com/dotnet/reactive). It provides `SourceCache` and `SourceList` — observable data collections that emit **changesets** when modified. These changesets flow through operator pipelines (Sort, Filter, Transform, Group, Join, etc.) that maintain live, incrementally-updated views of the data. + +DynamicData is used in production by thousands of applications. It is the reactive data layer for [ReactiveUI](https://reactiveui.net/), making it foundational infrastructure for the .NET reactive ecosystem. + +## Cache vs List — Two Collection Types + +DynamicData provides two parallel collection types. **Choose the right one — they are not interchangeable.** + +| | **Cache** (`SourceCache`) | **List** (`SourceList`) | +|---|---|---| +| **Identity** | Items identified by unique key | Items identified by index position | +| **Duplicates** | Not allowed (key must be unique) | Allowed (same item at multiple positions) | +| **Ordering** | Unordered by default (Sort adds ordering) | Inherently ordered (like `List`) | +| **Best for** | Entities with IDs, lookup by key | Ordered sequences, duplicates OK | +| **Change types** | Add, Update, Remove, Refresh, Moved | Add, AddRange, Replace, Remove, RemoveRange, Moved, Refresh, Clear | +| **Changeset** | `IChangeSet` | `IChangeSet` | + +**Rule of thumb:** If your items have a natural unique key (ID, name, etc.), use **Cache**. If order matters and/or duplicates are possible, use **List**. Cache is used far more often in practice. + +See `.github/instructions/dynamicdata-cache.instructions.md` for the complete cache operator reference. + +See `.github/instructions/dynamicdata-list.instructions.md` for the complete list operator reference. + +## Why Performance Matters + +Every item flowing through a DynamicData pipeline passes through multiple operators. Each operator processes changesets — not individual items — so a single cache edit with 1000 items creates a changeset that flows through every operator in the chain. At library scale: + +- **Per-item overhead compounds**: 1 allocation × 10 operators × 1000 items × 100 pipelines = 1M allocations per batch +- **Lock contention is the bottleneck**: operators serialize access to shared state. Minimizing lock hold time is a core design goal. +- **Prefer value types and stack allocation**: use structs, `ref struct`, `Span`, and avoid closures in hot paths where possible + +When optimizing, measure allocation rates and lock contention, not just wall-clock time. + +## Why Rx Contract Compliance is Critical + +DynamicData operators compose — the output of one is the input of the next. If any operator violates the Rx contract (e.g., concurrent `OnNext` calls, calls after `OnCompleted`), every downstream operator can corrupt its internal state. This is not a crash — it's silent data corruption that manifests as wrong results, missing items, or phantom entries. In a reactive UI, this means the user sees stale or incorrect data with no error message. + +See `.github/instructions/rx.instructions.md` for comprehensive Rx contract rules, scheduler usage, disposable patterns, and a complete standard Rx operator reference. + +## Breaking Changes + +DynamicData follows [Semantic Versioning (SemVer)](https://semver.org/). Breaking changes **are possible** in major version bumps, but they are never done lightly. This library has thousands of downstream consumers — every breaking change has a blast radius. + +**Rules:** +- Breaking changes require a major version bump. **You MUST explicitly call out any potentially breaking change to the user** before making it — even if you think it's minor. Let the maintainers decide. +- Prefer non-breaking alternatives first: new overloads, new methods, optional parameters with safe defaults. +- When a breaking change is justified, mark the old API with `[Obsolete("Use XYZ instead. This will be removed in vN+1.")]` in the current version and remove it in the next major. +- Behavioral changes (different ordering, different filtering semantics, different error propagation) are breaking even if the signature is unchanged. Call these out. +- Internal types (`internal` visibility) can change freely — they are not part of the public contract. + +**What counts as breaking:** +- Changing the signature of a public extension method (parameters, return type, generic constraints) +- Changing observable behavior (emission order, filtering semantics, error/completion propagation) +- Removing or renaming public types, methods, or properties +- Adding required parameters to existing methods +- Changing the default behavior of an existing overload + +## Maintaining These Instructions + +These instruction files are living documentation. **They must be kept in sync with the code.** + +- When a **new operator** is added, it **MUST** be added to the appropriate instruction file (`dynamicdata-cache.instructions.md` or `dynamicdata-list.instructions.md`) with its change reason handling table. +- When an **operator's behavior changes**, update its table in the instruction file. +- When a **new test utility** is added, update the test utilities reference in the main instructions and the appropriate `testing-*.instructions.md`. +- When a **new domain type** is added to `Tests/Domain/`, add it to the Domain Types section. + +## Repository Structure + +``` +src/ +├── DynamicData/ # The library +│ ├── Cache/ # Cache (keyed collection) operators +│ │ ├── Internal/ # Operator implementations (private) +│ │ ├── ObservableCache.cs # Core observable cache implementation +│ │ └── ObservableCacheEx.cs # Public API: extension methods for cache operators +│ ├── List/ # List (ordered collection) operators +│ │ ├── Internal/ # Operator implementations (private) +│ │ └── ObservableListEx.cs # Public API: extension methods for list operators +│ ├── Binding/ # UI binding operators (SortAndBind, etc.) +│ ├── Internal/ # Shared internal infrastructure +│ └── Kernel/ # Low-level types (Optional, Error, etc.) +├── DynamicData.Tests/ # Tests (xUnit + FluentAssertions) +│ ├── Cache/ # Cache operator tests +│ ├── List/ # List operator tests +│ └── Domain/ # Test domain types using Bogus fakers +``` + +## Operator Architecture Pattern + +Most operators follow the same two-part pattern: + +```csharp +// 1. Public API: extension method in ObservableCacheEx.cs (thin wrapper) +public static IObservable> Transform( + this IObservable> source, + Func transformFactory) +{ + return new Transform(source, transformFactory).Run(); +} + +// 2. Internal: sealed class in Cache/Internal/ with a Run() method +internal sealed class Transform +{ + public IObservable> Run() => + Observable.Create>(observer => + { + // Subscribe to source, process changesets, emit results + // Use ChangeAwareCache for incremental state + // Call CaptureChanges() to produce the output changeset + }); +} +``` + +**Key points:** +- The extension method is the **public API surface** — keep it thin +- The internal class holds constructor parameters and implements `Run()` +- `Run()` returns `Observable.Create` which **defers subscription** (cold observable) +- Inside `Create`, operators subscribe to sources and wire up changeset processing +- `ChangeAwareCache` tracks incremental changes and produces immutable snapshots via `CaptureChanges()` +- Operators must handle all change reasons: `Add`, `Update`, `Remove`, `Refresh` + +## Thread Safety in Operators + +When an operator has multiple input sources that share mutable state: +- All sources must be serialized through a shared lock +- Use `Synchronize(gate)` with a shared lock object to serialize multiple sources +- Keep lock hold times as short as practical + +When operators use `Synchronize(lock)` from Rx: +- The lock is held during the **entire** downstream delivery chain +- This ensures serialized delivery across multiple sources sharing a lock +- Always use a private lock object — never expose it to external consumers + +## Testing + +**All new code MUST come with unit tests that prove 100% correctness. All bug fixes MUST include a regression test that reproduces the bug before verifying the fix.** No exceptions. Untested code is broken code — you just don't know it yet. + +### Frameworks and Tools + +- **xUnit** — test framework (`[Fact]`, `[Theory]`, `[InlineData]`) +- **FluentAssertions** — via the `AwesomeAssertions` NuGet package (`.Should().Be()`, `.Should().BeEquivalentTo()`, etc.) +- **Bogus** — fake data generation via `Faker` in `DynamicData.Tests/Domain/Fakers.cs` +- **TestSourceCache** and **TestSourceList** — enhanced source collections in `Tests/Utilities/` that support `.Complete()` and `.SetError()` for testing terminal Rx events + +### Test File Naming and Organization + +Tests live in `src/DynamicData.Tests/` mirroring the library structure: +- `Cache/` — cache operator tests (one fixture class per operator, e.g., `TransformFixture.cs`) +- `List/` — list operator tests +- `Domain/` — shared domain types (`Person`, `Animal`, `AnimalOwner`, `Market`, etc.) and Bogus fakers +- `Utilities/` — test infrastructure (aggregators, validators, recording observers, stress helpers) + +Naming convention: `{OperatorName}Fixture.cs`. For operators with multiple overloads, use partial classes: `FilterFixture.Static.cs`, `FilterFixture.DynamicPredicate.IntegrationTests.cs`, etc. + +For cache-specific testing patterns (observation patterns, changeset assertions, stub pattern), see `.github/instructions/testing-cache.instructions.md`. + +For list-specific testing patterns, see `.github/instructions/testing-list.instructions.md`. + +### Testing the Rx Contract + +`.ValidateSynchronization()` detects Rx contract violations. It tracks in-flight notifications with `Interlocked.Exchange` — if two threads enter `OnNext` simultaneously, it throws `UnsynchronizedNotificationException`. It uses raw observer/observable types to bypass Rx's built-in safety guards so violations are surfaced, not masked. + +```csharp +source.Connect() + .Transform(x => new ViewModel(x)) + .ValidateSynchronization() // THROWS if concurrent delivery detected + .RecordCacheItems(out var results); +``` + +### Writing Regression Tests for Bug Fixes + +Every bug fix **must** include a test that: +1. **Reproduces the bug** — the test fails without the fix +2. **Verifies the fix** — the test passes with the fix +3. **Is named descriptively** — describes the scenario, not the bug ID + +```csharp +// GOOD: describes the scenario that was broken +[Fact] +public void RemoveThenReAddWithSameKey_ShouldNotDuplicate() + +// BAD: meaningless to future readers +[Fact] +public void FixBug1234() +``` + +### Stress Test Principles + +- Use `Barrier` for simultaneous start — maximizes contention. Include the main thread in participant count. +- Use deterministic data so failures are reproducible. Use `Bogus.Randomizer` with a fixed seed — **never `System.Random`**. +- Assert the **exact final state**, not just "count > 0". +- Use `Task.WhenAny(completed, Task.Delay(timeout))` to detect deadlocks with a meaningful timeout. +- Include mixed operations: adds, updates, removes, property mutations, dynamic parameter changes. +- Use the `StressAddRemove` extension methods in `Tests/Utilities/` for standard add/remove patterns with timed removal. + +### Test Utilities Reference + +The `Tests/Utilities/` directory provides powerful helpers — **use them** instead of reinventing: + +| Utility | Purpose | +|---------|---------| +| `ValidateSynchronization()` | Detects concurrent `OnNext` — Rx contract violation | +| `ValidateChangeSets(keySelector)` | Validates structural integrity of every changeset | +| `RecordCacheItems(out results)` | Cache recording observer with keyed + sorted tracking | +| `RecordListItems(out results)` | List recording observer | +| `TestSourceCache` | SourceCache with `.Complete()` and `.SetError()` support | +| `TestSourceList` | SourceList with `.Complete()` and `.SetError()` support | +| `StressAddRemove` extensions | Add/remove stress patterns with timed automatic removal | +| `ForceFail(count, exception)` | Forces an observable to error after N emissions | +| `Parallelize(count, parallel)` | Creates parallel subscriptions for stress testing | +| `ObservableSpy` | Diagnostic logging for pipeline debugging | +| `FakeScheduler` | Controlled scheduler for time-dependent tests | +| `Fakers.*` | Bogus fakers for `Person`, `Animal`, `AnimalOwner`, `Market` | + +### Domain Types + +Shared domain types in `Tests/Domain/`: + +- **`Person`** — `Name` (key), `Age`, `Gender`, `FavoriteColor`, `PetType`. Implements `INotifyPropertyChanged`. +- **`Animal`** — `Name` (key), `Type`, `Family` (enum: Mammal, Reptile, Fish, Amphibian, Bird) +- **`AnimalOwner`** — `Name` (key), `Animals` (ObservableCollection). Ideal for `TransformMany`/`MergeManyChangeSets` tests. +- **`Market`** / **`MarketPrice`** — financial-style streaming data tests +- **`PersonWithGender`**, **`PersonWithChildren`**, etc. — transform output types + +Generate test data with Bogus: +```csharp +var people = Fakers.Person.Generate(100); +var animals = Fakers.Animal.Generate(50); +var owners = Fakers.AnimalOwnerWithAnimals.Generate(10); // pre-populated with animals +``` + +### Test Anti-Patterns + +**❌ Testing implementation details instead of behavior:** +```csharp +// BAD: message count is an implementation detail — fragile +results.Messages.Count.Should().Be(3); +// GOOD: test the observable behavior and final state +results.Data.Count.Should().Be(expectedCount); +results.Data.Items.Should().BeEquivalentTo(expectedItems); +``` + +**❌ Using `Thread.Sleep` for timing:** +```csharp +// BAD: flaky and slow +Thread.Sleep(1000); +// GOOD: use test schedulers or deterministic waiting +var scheduler = new TestScheduler(); +scheduler.AdvanceBy(TimeSpan.FromSeconds(1).Ticks); +``` + +**❌ Ignoring disposal:** +```csharp +// BAD: leaks subscriptions, masks errors +var results = source.Connect().Filter(p => true).AsAggregator(); +// GOOD: using ensures cleanup even if assertion throws +using var results = source.Connect().Filter(p => true).AsAggregator(); +``` + +**❌ Non-deterministic data without seeds:** +```csharp +// BAD: failures aren't reproducible across runs +var random = new Random(); +// GOOD: use Bogus Randomizer with a fixed seed +var randomizer = new Randomizer(42); +var people = Fakers.Person.UseSeed(randomizer.Int()).Generate(100); +``` diff --git a/.github/instructions/dynamicdata-cache.instructions.md b/.github/instructions/dynamicdata-cache.instructions.md new file mode 100644 index 00000000..aaa2016b --- /dev/null +++ b/.github/instructions/dynamicdata-cache.instructions.md @@ -0,0 +1,906 @@ +--- +applyTo: "src/DynamicData/**/*.cs" +--- +# DynamicData Cache Operators — Comprehensive Guide + +Cache operators work with **keyed collections**: `IObservable>`. Every item has a unique key. This is the most commonly used side of DynamicData. + +## SourceCache — Where Changesets Come From + +`SourceCache` is the entry point. It is a **mutable, observable, keyed collection**. You mutate it, and it emits changesets describing what changed. + +```csharp +// Create — provide a key selector (like a primary key) +var cache = new SourceCache(p => p.Name); + +// Mutate — all changes inside Edit() produce ONE changeset +cache.Edit(updater => +{ + updater.AddOrUpdate(new Person("Alice", 30)); + updater.AddOrUpdate(new Person("Bob", 25)); + updater.Remove("Charlie"); +}); +// ^ This produces 1 changeset with 2 Adds + 1 Remove + +// Single-item convenience methods (each produces its own changeset) +cache.AddOrUpdate(new Person("Dave", 40)); // 1 changeset with 1 Add +cache.Remove("Bob"); // 1 changeset with 1 Remove + +// Observe — Connect() returns the changeset stream +cache.Connect() + .Subscribe(changes => Console.WriteLine($"Got {changes.Count} changes")); +``` + +**Key behaviors:** +- `Edit()` batches — all mutations inside a single `Edit()` lambda produce **one** changeset +- Single-item methods (`AddOrUpdate`, `Remove`) each produce their own changeset +- `Connect()` immediately emits the current cache contents as the first changeset (adds for all existing items) +- `Connect(predicate)` pre-filters at the source +- Multiple subscribers each get their own initial snapshot — `Connect()` creates a cold observable per subscriber + +### ISourceUpdater — The Edit API + +Inside `Edit()`, you receive an `ISourceUpdater`: + +```csharp +cache.Edit(updater => +{ + updater.AddOrUpdate(item); // add or replace by key + updater.AddOrUpdate(items); // batch add/replace + updater.Remove(key); // remove by key + updater.Remove(keys); // batch remove + updater.Clear(); // remove all items + updater.Refresh(key); // emit Refresh for downstream re-evaluation + updater.Refresh(); // refresh ALL items + updater.Lookup(key); // returns Optional +}); +``` + +## ObservableChangeSet.Create — Implicit Cache Factory + +`ObservableChangeSet.Create` is the cache equivalent of `Observable.Create`. It gives you a `SourceCache` inside a lambda and returns `IObservable>` — the cache is created and disposed automatically per subscriber. + +This is the **preferred way to bridge imperative code into DynamicData** without managing a `SourceCache` lifetime yourself. + +```csharp +// Synchronous — populate the cache, return a cleanup action +IObservable> people = ObservableChangeSet.Create( + cache => + { + // Populate the cache — changes flow to subscribers automatically + cache.AddOrUpdate(new Person("Alice", 30)); + cache.AddOrUpdate(new Person("Bob", 25)); + + // Return cleanup action (called on unsubscribe) + return () => { /* cleanup resources */ }; + }, + keySelector: p => p.Name); + +// Synchronous — return IDisposable for cleanup +IObservable> devices = ObservableChangeSet.Create( + cache => + { + // Subscribe to an external event source and pump into the cache + var watcher = new DeviceWatcher(); + watcher.DeviceAdded += (s, d) => cache.AddOrUpdate(d); + watcher.DeviceRemoved += (s, d) => cache.Remove(d.Id); + watcher.Start(); + + return Disposable.Create(() => watcher.Dispose()); + }, + keySelector: d => d.Id); + +// Async — useful for loading from APIs, databases, etc. +IObservable> products = ObservableChangeSet.Create( + async (cache, cancellationToken) => + { + var items = await _api.GetProductsAsync(cancellationToken); + cache.AddOrUpdate(items); + + // Set up SignalR for live updates + var connection = new HubConnectionBuilder().WithUrl("/products").Build(); + connection.On("Updated", p => cache.AddOrUpdate(p)); + connection.On("Removed", id => cache.Remove(id)); + await connection.StartAsync(cancellationToken); + + return Disposable.Create(() => connection.DisposeAsync().AsTask().Wait()); + }, + keySelector: p => p.Id); +``` + +**Key behaviors:** +- A new `SourceCache` is created **per subscriber** (cold observable) +- The cache's `Connect()` is wired to the subscriber automatically +- The lambda can populate the cache synchronously or asynchronously +- On unsubscribe, cleanup runs and the cache is disposed +- Exceptions in the lambda propagate as `OnError` + +**Overloads:** + +| Signature | Use when | +|-----------|----------| +| `Create(Func, keySelector)` | Sync, cleanup is an Action | +| `Create(Func, keySelector)` | Sync, cleanup is IDisposable | +| `Create(Func>, keySelector)` | Async setup | +| `Create(Func>, keySelector)` | Async with cancellation | +| `Create(Func>, keySelector)` | Async, cleanup is Action | +| `Create(Func>, keySelector)` | Async with cancellation, Action cleanup | +| `Create(Func, keySelector)` | Async, no explicit cleanup | +| `Create(Func, keySelector)` | Async with cancellation, no cleanup | + +## Changesets — The Core Data Model + +A changeset (`IChangeSet`) is an `IEnumerable>` — a batch of individual changes. + +### Change + +```csharp +public readonly struct Change +{ + public ChangeReason Reason { get; } // Add, Update, Remove, Refresh, Moved + public TKey Key { get; } // the item's key + public TObject Current { get; } // the current value + public Optional Previous { get; } // previous value (Update/Remove only) + public int CurrentIndex { get; } // position (-1 if unsorted) + public int PreviousIndex { get; } // previous position (-1 if unsorted) +} +``` + +### ChangeReason + +| Reason | Meaning | `Previous` populated? | +|--------|---------|----------------------| +| `Add` | New key, first time seen | No | +| `Update` | Existing key, value replaced | Yes — the old value | +| `Remove` | Item removed from cache | Yes — the removed value | +| `Refresh` | No data change — signal to re-evaluate (filter/sort/group) | No | +| `Moved` | Item changed position in sorted collection | No (same item) | + +### How Changesets Flow + +``` +SourceCache.Edit() + │ + ▼ +ChangeSet { Add("Alice"), Add("Bob"), Remove("Charlie") } + │ + ▼ .Filter(p => p.Age >= 18) +ChangeSet { Add("Alice"), Add("Bob") } ← Charlie was filtered out + │ + ▼ .Transform(p => new PersonVM(p)) +ChangeSet { Add(VM("Alice")), Add(VM("Bob")) } + │ + ▼ .Sort(comparer) +ISortedChangeSet { sorted items with index positions } + │ + ▼ .Bind(out collection) +ReadOnlyObservableCollection updated in-place +``` + +Each operator reconstructs a **new** changeset from its internal state — changesets are not passed through, they are re-emitted. + +## ChangeAwareCache — How Operators Build Changesets + +Inside operators, `ChangeAwareCache` (a `Dictionary` that records every mutation) tracks state. Call `CaptureChanges()` to harvest the changeset and reset. + +```csharp +var cache = new ChangeAwareCache(); + +foreach (var change in incoming) +{ + switch (change.Reason) + { + case ChangeReason.Add: + case ChangeReason.Update: + cache.AddOrUpdate(transform(change.Current), change.Key); + break; + case ChangeReason.Remove: + cache.Remove(change.Key); + break; + case ChangeReason.Refresh: + cache.Refresh(change.Key); + break; + } +} + +var output = cache.CaptureChanges(); +if (output.Count > 0) + observer.OnNext(output); +``` + +## Operator Reference — Change Reason Handling + +Below is every cache operator with its exact handling of each `ChangeReason`. This documents the contract — what the operator emits downstream for each input reason. + +Legend: +- **→ Add** = emits an Add downstream +- **→ Update** = emits an Update downstream +- **→ Remove** = emits a Remove downstream +- **→ Refresh** = emits a Refresh downstream +- **→ (nothing)** = swallowed, no downstream emission +- **→ conditional** = depends on state (explained in notes) + +--- + +### Filter (static predicate) + +Evaluates a `Func` predicate against each item. + +| Input | Behavior | +|-------|----------| +| **Add** | If predicate matches → Add. If not → nothing. | +| **Update** | If new value matches → AddOrUpdate (Add if first match, Update if already downstream). If not → Remove (if was downstream). | +| **Remove** | If item was downstream → Remove. If not → nothing. | +| **Refresh** | Re-evaluates predicate. If now matches and wasn't → Add. If still matches → Refresh. If no longer matches → Remove. | + +### Filter (dynamic predicate observable) + +Like static filter, but when the predicate observable fires, **all items** are re-evaluated against the new predicate. + +Per-item handling is the same as static filter. Additionally: + +| Event | Behavior | +|-------|----------| +| **Predicate fires** | Full re-evaluation of all items: items newly matching → Add, no longer matching → Remove, still matching → Refresh or Update. | + +### FilterOnObservable + +Each item gets its own `IObservable` controlling inclusion. + +| Input | Behavior | +|-------|----------| +| **Add** | Subscribes to per-item observable. When observable emits `true` → Add downstream. | +| **Update** | Disposes old subscription, subscribes to new item's observable. | +| **Remove** | Disposes subscription. If item was downstream → Remove. | +| **Refresh** | Forwarded as Refresh if item is currently downstream. | +| **Item observable fires** | `true` and not downstream → Add. `false` and downstream → Remove. | + +### FilterImmutable + +Optimized filter that assumes items never change — Refresh is ignored entirely. + +| Input | Behavior | +|-------|----------| +| **Add** | If predicate matches → Add. | +| **Update** | If new value matches → AddOrUpdate. If not → Remove. | +| **Remove** | If downstream → Remove. | +| **Refresh** | **Ignored** — items are assumed immutable. | + +### WhereReasonsAre / WhereReasonsAreNot + +Passes through only changes with specified reasons. + +| Input | Behavior | +|-------|----------| +| **Any reason** | If reason is in the allowed set → pass through. Otherwise → nothing. | + +--- + +### Transform + +Applies `Func` to produce a parallel keyed collection of transformed items. + +| Input | Behavior | +|-------|----------| +| **Add** | Calls transform factory → Add transformed item. | +| **Update** | Calls transform factory with current and previous → Update transformed item. | +| **Remove** | → Remove (no factory call). | +| **Refresh** | Default: → Refresh (forwarded, no re-transform). With `transformOnRefresh: true`: calls factory again → Update. | + +### TransformSafe + +Same as Transform, but catches exceptions in the transform factory and routes them to an error callback instead of `OnError`. + +Same as Transform, but catches exceptions in the transform factory and routes them to an error callback instead of `OnError`. The changeset is still emitted — only the failed item is skipped and reported. + +### TransformAsync + +Async version of Transform — `Func>`. + +Same change handling as Transform, but the factory returns `Task` and is awaited. + +### TransformWithInlineUpdate + +On Add: creates new transformed item. On Update: **mutates the existing transformed item** instead of replacing. + +| Input | Behavior | +|-------|----------| +| **Add** | Calls transform factory → Add. | +| **Update** | Calls inline update action on existing transformed item → Update (same reference, mutated). | +| **Remove** | → Remove. | +| **Refresh** | Default: → Refresh. With `transformOnRefresh: true`: inline update → Update. | + +### TransformImmutable + +Optimized transform that skips Refresh handling. + +| Input | Behavior | +|-------|----------| +| **Add** | Calls factory → Add. | +| **Update** | Calls factory → Update. | +| **Remove** | → Remove. | +| **Refresh** | **Ignored.** | + +### TransformOnObservable + +Each item gets a per-item `IObservable`. The latest emitted value is the transformed result. + +| Input | Behavior | +|-------|----------| +| **Add** | Subscribes to per-item observable. First emission → Add downstream. Subsequent → Update. | +| **Update** | Disposes old subscription, subscribes to new observable. | +| **Remove** | Disposes subscription → Remove. | +| **Refresh** | Forwarded if item is downstream. | + +### TransformMany + +Flattens 1:N — each source item produces multiple destination items with their own keys. + +| Input | Behavior | +|-------|----------| +| **Add** | Expands item into N children via selector → Add for each child. If child observable provided, subscribes for live updates. | +| **Update** | Diff old children vs new children → Remove old-only, Add new-only, Update shared keys. | +| **Remove** | → Remove all children of this parent. Dispose child subscription. | +| **Refresh** | Re-expands → diff children (effectively same as Update). | + +### ChangeKey + +Re-keys items using a new key selector. + +| Input | Behavior | +|-------|----------| +| **Add** | → Add with new key. | +| **Update** | If new key same as old → Update. If key changed → Remove(old key) + Add(new key). | +| **Remove** | → Remove with mapped key. | +| **Refresh** | → Refresh with mapped key. | + +### TransformToTree + +Builds hierarchical tree from flat list using a parent key selector. + +| Input | Behavior | +|-------|----------| +| **Add** | Creates tree node, attaches to parent (or root) → Add. | +| **Update** | Updates node. If parent changed → re-parents (Remove from old, Add to new). | +| **Remove** | Removes node and orphans/re-parents children → Remove. | +| **Refresh** | → Refresh on node. | + +--- + +### Sort + +Sorts items using `IComparer`. Emits `ISortedChangeSet` with index positions. + +| Input | Behavior | +|-------|----------| +| **Add** | Inserts at sorted position → Add with index. May emit Moves for displaced items. | +| **Update** | If sort position unchanged → Update. If position changed → Update + Move. | +| **Remove** | → Remove at index. May emit Moves for displaced items. | +| **Refresh** | Re-evaluates sort position. If unchanged → Refresh. If changed → Move. | +| **Comparer fires** | Full re-sort of all items. Emits Moves for items that changed position. | + +### SortAndBind + +Combines Sort + Bind into a single operator for efficiency. Maintains a sorted `IList` in-place. + +Same change handling as Sort, but directly applies insert/remove/move operations to the bound `IList` instead of emitting a changeset. + +### Page + +Takes a sorted stream and applies page number + page size windowing. + +| Input | Behavior | +|-------|----------| +| **Add/Update/Remove/Refresh** | From Sort output, applies page window. Items outside page → filtered out. | +| **Page request fires** | Recalculates page window → Add items now in page, Remove items now outside. | + +### Virtualise + +Takes a sorted stream and applies start index + size windowing (sliding window). + +Same as Page but uses absolute start index + size instead of page number + page size. + +### Top + +Takes the first N items from a sorted stream. + +| Input | Behavior | +|-------|----------| +| **Add** | If within top N → Add. Bumps Nth item out → Remove. | +| **Remove** | If was in top N → Remove. Next item enters → Add. | +| **Other** | Maintains top N invariant after each change. | + +--- + +### Group (GroupOn) + +Groups items by a key selector. Emits `IChangeSet>`. + +| Input | Behavior | +|-------|----------| +| **Add** | Determines group → adds item to that group's sub-cache. If new group → Add(group). | +| **Update** | If group unchanged → Update within group. If group changed → Remove from old group, Add to new group. Empty old group → Remove(group). | +| **Remove** | Removes from group. If group now empty → Remove(group). | +| **Refresh** | Re-evaluates group key. If same → Refresh within group. If changed → moves between groups. | + +### GroupWithImmutableState + +Same grouping as Group, but emits immutable snapshots instead of live sub-caches. + +Same grouping logic as Group, but emits immutable snapshots instead of live sub-caches. Each affected group emits a new immutable snapshot on every change. + +### GroupOnObservable + +Group key is determined by a per-item `IObservable`. Items can move between groups reactively. + +| Input | Behavior | +|-------|----------| +| **Add** | Subscribes to group key observable. On first emission → Add to group. | +| **Update** | Disposes old subscription, subscribes to new. | +| **Remove** | Disposes subscription → Remove from current group. | +| **Item observable fires** | If new group key ≠ current → Remove from old group, Add to new group. | + +--- + +### InnerJoin + +Combines two keyed streams. Only emits for keys present in **both** left and right. + +| Input (left) | Behavior | +|-------|----------| +| **Add** | If matching right exists → Add joined result. | +| **Update** | If matching right exists → Update joined result. | +| **Remove** | → Remove joined result (if was downstream). | +| **Refresh** | → Refresh joined result (if downstream). | + +| Input (right) | Behavior | +|-------|----------| +| **Add** | If matching left exists → Add joined result. | +| **Update** | → Update joined result. | +| **Remove** | → Remove joined result. | + +### LeftJoin + +All left items, optional right. Right side is `Optional`. + +| Input (left) | Behavior | +|-------|----------| +| **Add** | → Add (with right if exists, `Optional.None` otherwise). | +| **Update** | → Update. | +| **Remove** | → Remove. | +| **Refresh** | → Refresh. | + +| Input (right) | Behavior | +|-------|----------| +| **Add** | If matching left exists → Update (left now has a right). | +| **Update** | → Update. | +| **Remove** | If matching left exists → Update (right becomes None). | + +### RightJoin + +Mirror of LeftJoin — all right items, optional left. + +### FullJoin + +All items from both sides. Both sides are `Optional`. + +| Input (either side) | Behavior | +|-------|----------| +| **Add** | → Add (or Update if other side already has entry). | +| **Update** | → Update. | +| **Remove** | If other side still has entry → Update (this side becomes None). If neither → Remove. | + +### *JoinMany variants + +Same join semantics, but the non-primary side is grouped — produces `IGrouping` instead of single items. + +--- + +### Or (Union) + +Items present in **any** source. + +| Input | Behavior | +|-------|----------| +| **Add** (from any source) | If key not yet downstream → Add. If already present (from another source) → reference count incremented, no emission. | +| **Remove** (from any source) | Decrement reference count. If count reaches 0 → Remove. Otherwise → nothing. | +| **Update** | → Update if downstream. | + +### And (Intersection) + +Items present in **all** sources. + +| Input | Behavior | +|-------|----------| +| **Add** | If key is now present in all sources → Add. Otherwise → nothing. | +| **Remove** | If was present in all → Remove. | +| **Update** | → Update if still in all sources. | + +### Except (Difference) + +Items in first source but **not** in any other source. + +| Input | Behavior | +|-------|----------| +| **Add** (first source) | If not in any other source → Add. | +| **Add** (other source) | If key was downstream → Remove. | +| **Remove** (other source) | If key is in first source and now absent from all others → Add. | + +### Xor (Symmetric Difference) + +Items present in exactly **one** source. + +| Input | Behavior | +|-------|----------| +| **Add** | If key is now in exactly 1 source → Add. If now in 2+ → Remove. | +| **Remove** | If key is now in exactly 1 → Add. If now in 0 → Remove. | + +### MergeChangeSets + +Merges N changeset streams into one. Last-writer-wins by default, or use comparer/equalityComparer for conflict resolution. + +| Input (from any source) | Behavior | +|-------|----------| +| **Add** | → Add (or Update if key already from another source, resolved by comparer). | +| **Update** | → Update (resolved by comparer if configured). | +| **Remove** | → Remove (unless another source still has the key). | +| **Refresh** | → Refresh. | + +--- + +### AutoRefresh + +Monitors `INotifyPropertyChanged` on items and emits Refresh when a specified property changes. + +| Input | Behavior | +|-------|----------| +| **Add** | Subscribes to PropertyChanged on item → passes through as Add. | +| **Update** | Disposes old subscription, subscribes to new item → passes through as Update. | +| **Remove** | Disposes subscription → passes through as Remove. | +| **Refresh** | → passes through as Refresh. | +| **Property changes** | Emits new changeset with `ChangeReason.Refresh` for that key. | + +### AutoRefreshOnObservable + +Like AutoRefresh, but uses a per-item `IObservable` instead of `INotifyPropertyChanged`. + +Same as AutoRefresh but uses a per-item `IObservable` to trigger Refresh instead of `INotifyPropertyChanged`. + +### SuppressRefresh + +Strips all Refresh changes from the stream. + +| Input | Behavior | +|-------|----------| +| **Add/Update/Remove** | → passes through unchanged. | +| **Refresh** | **Dropped.** | + +--- + +### MergeMany + +Subscribes to a per-item `IObservable` for each item, merges all into a single `IObservable` (not a changeset stream). + +| Input | Behavior | +|-------|----------| +| **Add** | Subscribes to per-item observable. Emissions → merged output. | +| **Update** | Disposes old subscription, subscribes to new item's observable. | +| **Remove** | Disposes subscription. | +| **Refresh** | No effect on subscriptions. | + +### MergeManyChangeSets + +Each item produces its own `IObservable`. All are merged into a single flattened changeset stream. + +| Input | Behavior | +|-------|----------| +| **Add** | Subscribes to child changeset stream. Child changes → merged into output. | +| **Update** | Disposes old child subscription, subscribes to new. | +| **Remove** | Disposes child subscription. Emits Remove for all child items. | +| **Refresh** | No effect on child subscriptions. | + +### MergeManyItems + +Like MergeMany but wraps each value with its parent item. + +Same as MergeMany, but wraps each emission as `ItemWithValue` — pairing the parent item with its emitted value. + +### SubscribeMany + +Creates an `IDisposable` subscription per item. Disposes on removal/update. + +| Input | Behavior | +|-------|----------| +| **Add** | Calls subscription factory → stores IDisposable. Passes through Add. | +| **Update** | Disposes old subscription, creates new. Passes through Update. | +| **Remove** | Disposes subscription. Passes through Remove. | +| **Refresh** | Passes through. No subscription change. | + +--- + +### DisposeMany / AsyncDisposeMany + +Calls `Dispose()` (or `DisposeAsync()`) on items when they are removed or replaced. + +| Input | Behavior | +|-------|----------| +| **Add** | Passes through. Tracks item for future disposal. | +| **Update** | Disposes **previous** item. Passes through Update. | +| **Remove** | Disposes item. Passes through Remove. | +| **Refresh** | Passes through. No disposal. | +| **Subscription disposed** | Disposes **all** tracked items. | + +### OnItemAdded / OnItemUpdated / OnItemRemoved / OnItemRefreshed + +Side-effect callbacks for specific lifecycle events. The changeset is forwarded unchanged. + +| Operator | Fires on | +|----------|----------| +| `OnItemAdded` | Add only | +| `OnItemUpdated` | Update only (receives current + previous) | +| `OnItemRemoved` | Remove only. Also fires for **all items** on subscription disposal. | +| `OnItemRefreshed` | Refresh only | + +### ForEachChange + +Invokes an `Action>` for every individual change. All change reasons trigger the action. The changeset is forwarded unchanged. + +--- + +### QueryWhenChanged + +Materializes the cache on each changeset and emits a snapshot or projected value. + +| Input | Behavior | +|-------|----------| +| **Any change** | Updates internal cache. Emits `IQuery` snapshot (or projected value). | + +### ToCollection + +Emits `IReadOnlyCollection` on every changeset. + +| Input | Behavior | +|-------|----------| +| **Any change** | Rebuilds full collection from internal state → emits. | + +### ToSortedCollection + +Same as ToCollection but sorted. + +### DistinctValues + +Tracks distinct values of a property across all items. Emits `DistinctChangeSet`. + +| Input | Behavior | +|-------|----------| +| **Add** | Extracts value. If value first seen → Add. If already tracked → reference count++. | +| **Update** | If value changed: old value count--, new value count++. Add/Remove distinct values accordingly. | +| **Remove** | Decrements count. If count reaches 0 → Remove distinct value. | +| **Refresh** | Re-evaluates value. Same as Update logic. | + +### TrueForAll / TrueForAny + +Emits `bool` based on whether all/any items match a condition (via per-item observable). + +| Input | Behavior | +|-------|----------| +| **Add** | Subscribes to per-item observable. Recalculates aggregate → emits bool. | +| **Update** | Re-subscribes. Recalculates. | +| **Remove** | Disposes subscription. Recalculates. | + +### Watch / WatchValue + +Filters the stream to a single key. + +| Input | Behavior | +|-------|----------| +| **Add/Update/Remove/Refresh for target key** | Emits the Change (Watch) or just the value (WatchValue). | +| **Other keys** | Ignored. | + +### ToObservableOptional + +Watches a single key and emits `Optional` — `Some` when present, `None` when removed. + +--- + +### BatchIf + +Buffers changesets while a condition is true, flushes as a single combined changeset when condition becomes false. + +| Input | Behavior | +|-------|----------| +| **Any (while paused)** | Buffered — combined into internal changeset list. | +| **Any (while active)** | Passed through immediately. | +| **Condition becomes false** | Flushes all buffered changesets as a batch. | + +### Batch (time-based) + +Standard Rx `Buffer` applied to changeset streams. + +### BufferInitial + +Buffers the initial burst of changesets for a time window, then passes through. + +--- + +### Bind + +Materializes a sorted changeset stream into a `ReadOnlyObservableCollection`. + +| Input | Behavior | +|-------|----------| +| **Add** | Insert into collection at correct index. | +| **Update** | Replace item at index. | +| **Remove** | Remove from collection at index. | +| **Moved** | Move item in collection. | +| **Refresh** | Depends on binding adaptor. | + +### PopulateInto + +Writes changesets into another `SourceCache`. + +| Input | Behavior | +|-------|----------| +| **Add** | → `AddOrUpdate` on target cache. | +| **Update** | → `AddOrUpdate` on target cache. | +| **Remove** | → `Remove` on target cache. | +| **Refresh** | → `Refresh` on target cache. | + +--- + +### AsObservableCache + +Materializes the stream into a read-only `IObservableCache`. + +### DeferUntilLoaded + +Suppresses emissions until the first non-empty changeset, then passes all through. + +### SkipInitial + +Skips the first changeset (typically the initial snapshot from Connect()). + +### NotEmpty + +Filters out empty changesets. + +### StartWithEmpty / StartWithItem + +Emits an empty changeset (or a single Add) immediately on subscription. + +### ExpireAfter + +Auto-removes items after a timeout. + +| Input | Behavior | +|-------|----------| +| **Add** | Schedules removal after timeout → passes through Add. | +| **Update** | Resets timer → passes through Update. | +| **Remove** | Cancels timer → passes through Remove. | +| **Timer fires** | Emits Remove for expired item. | + +### LimitSizeTo + +FIFO eviction when cache exceeds a size limit. + +| Input | Behavior | +|-------|----------| +| **Add** | If cache exceeds limit → Remove oldest items. | +| **Other** | Passed through. | + +### Switch + +`IObservable>>` → subscribes to the latest inner observable, disposing previous. + +### RefCount + +Shares the upstream subscription with reference counting. + +### Cast / OfType + +Cast items to a different type or filter by type. + +### Flatten + +Converts `IChangeSet` into `IObservable>` — one emission per individual change. + +### RemoveKey + +Converts `IChangeSet` to `IChangeSet` — drops the key to produce a list changeset. + +### EnsureUniqueKeys + +Validates that all keys in each changeset are unique. Throws if duplicates detected. + +### IgnoreSameReferenceUpdate / IgnoreUpdateWhen / IncludeUpdateWhen + +Filters Update changes based on reference equality or a custom predicate. If filtered out, the Update is silently dropped. + +--- + +### Property Observation + +| Operator | Behavior | +|----------|----------| +| `WhenPropertyChanged(expr)` | Emits `PropertyValue` (item + value) when the specified property changes on any item. Subscribes per-item on Add, disposes on Remove. | +| `WhenValueChanged(expr)` | Like above but emits just the property value (no sender). | +| `WhenAnyPropertyChanged()` | Emits the item when **any** property changes (no specific property). | + +--- + +## Writing a New Cache Operator + +### Step 1: Extension Method (Public API) + +In `ObservableCacheEx.cs`: + +```csharp +public static IObservable> MyOperator( + this IObservable> source, + Func selector) + where TSource : notnull + where TKey : notnull + where TDest : notnull +{ + source.ThrowArgumentNullExceptionIfNull(nameof(source)); + selector.ThrowArgumentNullExceptionIfNull(nameof(selector)); + return new MyOperator(source, selector).Run(); +} +``` + +### Step 2: Internal Class + +In `Cache/Internal/MyOperator.cs`: + +```csharp +internal sealed class MyOperator( + IObservable> source, + Func selector) + where TSource : notnull + where TKey : notnull + where TDest : notnull +{ + public IObservable> Run() => + Observable.Create>(observer => + { + var cache = new ChangeAwareCache(); + + return source.SubscribeSafe(Observer.Create>( + onNext: changes => + { + foreach (var change in changes) + { + switch (change.Reason) + { + case ChangeReason.Add: + case ChangeReason.Update: + cache.AddOrUpdate(selector(change.Current), change.Key); + break; + case ChangeReason.Remove: + cache.Remove(change.Key); + break; + case ChangeReason.Refresh: + cache.Refresh(change.Key); + break; + } + } + + var output = cache.CaptureChanges(); + if (output.Count > 0) + observer.OnNext(output); + }, + onError: observer.OnError, + onCompleted: observer.OnCompleted)); + }); +} +``` + +### Checklist + +1. Handle **all four change reasons**: Add, Update, Remove, Refresh +2. Use `ChangeAwareCache` for state — call `CaptureChanges()` for output +3. Never emit empty changesets +4. Propagate `OnError` and `OnCompleted` +5. Multiple sources → serialize with `Synchronize(gate)` using a shared lock +6. Return proper `IDisposable` (`CompositeDisposable` if multiple subscriptions) +7. Write tests covering all scenarios (see Testing section in main instructions) \ No newline at end of file diff --git a/.github/instructions/dynamicdata-list.instructions.md b/.github/instructions/dynamicdata-list.instructions.md new file mode 100644 index 00000000..5570eee1 --- /dev/null +++ b/.github/instructions/dynamicdata-list.instructions.md @@ -0,0 +1,604 @@ +--- +applyTo: "src/DynamicData/**/*.cs" +--- +# DynamicData List Operators — Comprehensive Guide + +List operators work with **unkeyed, ordered collections**: `IObservable>`. Items are identified by **index position**, not by key. This is the counterpart to Cache operators. + +## SourceList — Where List Changesets Come From + +`SourceList` is the entry point. It is a **mutable, observable, ordered collection**. + +```csharp +// Create +var list = new SourceList(); + +// Mutate — all changes inside Edit() produce ONE changeset +list.Edit(inner => +{ + inner.Add("Alice"); + inner.AddRange(new[] { "Bob", "Charlie" }); + inner.Remove("Alice"); + inner.Move(0, 1); // move Bob from index 0 to index 1 + inner.RemoveAt(0); + inner.Insert(0, "Dave"); + inner.Clear(); +}); +// ^ Produces 1 changeset with all the above changes + +// Single-item convenience methods (each produces its own changeset) +list.Add("Eve"); +list.Remove("Eve"); + +// Observe +list.Connect() + .Subscribe(changes => Console.WriteLine($"Got {changes.Count} changes")); +``` + +**Key behaviors:** +- `Edit()` batches — all mutations produce **one** changeset +- Single-item methods (`Add`, `Remove`) each produce their own changeset +- `Connect()` immediately emits current contents as an `AddRange` changeset +- List operations preserve index positions — insertions and removals shift subsequent items +- Can be seeded from an `IObservable>` in the constructor + +### IExtendedList — The Edit API + +Inside `Edit()`, you receive an `IExtendedList` (extends `IList`): + +```csharp +list.Edit(inner => +{ + inner.Add(item); // append + inner.Insert(index, item); // insert at position + inner.AddRange(items); // append multiple + inner.InsertRange(items, index); // insert multiple at position + inner[index] = newItem; // replace at index (produces Replace) + inner.Remove(item); // remove first occurrence + inner.RemoveAt(index); // remove at position + inner.RemoveRange(index, count); // remove range + inner.Move(from, to); // move item between positions + inner.Clear(); // remove all +}); +``` + +## ObservableChangeSet.Create — Implicit List Factory + +`ObservableChangeSet.Create` (single type parameter, no key) is the list equivalent. It gives you a `SourceList` inside a lambda and returns `IObservable>`. + +```csharp +// Synchronous — populate the list, return cleanup +IObservable> logLines = ObservableChangeSet.Create( + list => + { + var watcher = new FileSystemWatcher("logs"); + watcher.Changed += (s, e) => + { + var newLines = File.ReadAllLines(e.FullPath); + list.Edit(inner => inner.AddRange(newLines)); + }; + watcher.EnableRaisingEvents = true; + + return Disposable.Create(() => watcher.Dispose()); + }); + +// Async with cancellation +IObservable> entries = ObservableChangeSet.Create( + async (list, cancellationToken) => + { + var stream = _client.GetLogStreamAsync(cancellationToken); + await foreach (var entry in stream.WithCancellation(cancellationToken)) + { + list.Add(entry); + } + + return Disposable.Empty; + }); +``` + +**Key behaviors:** +- A new `SourceList` is created **per subscriber** (cold observable) +- No key selector needed (lists are unkeyed) +- Same overload set as the cache version (sync, async, cancellable) +- On unsubscribe, cleanup runs and the list is disposed + +## List Changesets — The Core Data Model + +A list changeset (`IChangeSet`) is an `IEnumerable>`. Each change has a different structure than cache changes. + +### Change + +A list change is either an **item change** (single item) or a **range change** (batch): + +```csharp +public sealed class Change +{ + public ListChangeReason Reason { get; } // Add, AddRange, Replace, Remove, etc. + public ItemChange Item { get; } // for single-item changes + public RangeChange Range { get; } // for range changes (AddRange, RemoveRange, Clear) +} + +public struct ItemChange +{ + public T Current { get; } // the current item + public Optional Previous { get; } // previous item (Replace only) + public int CurrentIndex { get; } // current position (-1 if unknown) + public int PreviousIndex { get; } // previous position (Move, Replace) +} +``` + +### ListChangeReason + +| Reason | Type | Meaning | +|--------|------|---------| +| `Add` | Item | Single item inserted at a position | +| `AddRange` | Range | Multiple items inserted | +| `Replace` | Item | Item at a position replaced with a new item (`Previous` available) | +| `Remove` | Item | Single item removed from a position | +| `RemoveRange` | Range | Multiple items removed | +| `Moved` | Item | Item moved from one position to another | +| `Refresh` | Item | Signal to re-evaluate (no data change) | +| `Clear` | Range | All items removed | + +**Key difference from Cache:** List changes are **index-aware**. `Add` has a `CurrentIndex`, `Move` has both `CurrentIndex` and `PreviousIndex`, `Remove` has the index where the item was. + +### ChangeAwareList — How List Operators Build Changesets + +`ChangeAwareList` is the list equivalent of `ChangeAwareCache`. It's a `List` that records every mutation. + +```csharp +var list = new ChangeAwareList(); + +// Mutations are recorded +list.Add(item); // records Add +list.Insert(0, item); // records Add at index 0 +list.AddRange(items); // records AddRange +list[2] = newItem; // records Replace +list.Remove(item); // records Remove +list.RemoveRange(0, 5); // records RemoveRange +list.Move(1, 3); // records Moved +list.Clear(); // records Clear + +// Harvest the changeset +var changes = list.CaptureChanges(); +if (changes.Count > 0) + observer.OnNext(changes); +``` + +## Operator Reference — Change Reason Handling + +--- + +### Filter (static predicate) + +Evaluates a `Func` predicate per item. + +| Input | Behavior | +|-------|----------| +| **Add** | If matches → Add at calculated index. If not → nothing. | +| **AddRange** | Filters range, emits AddRange of matching items. | +| **Replace** | Re-evaluates. New matches + old didn't → Add. Both match → Replace. Old matched + new doesn't → Remove. | +| **Remove** | If was downstream → Remove. | +| **RemoveRange** | Removes matching items from downstream. | +| **Refresh** | Re-evaluates predicate. Adds/removes as needed. | +| **Clear** | → Clear. | +| **Moved** | If item is downstream → recalculates downstream index and emits Move. | + +### Filter (dynamic predicate observable) + +Same as static filter per-item, but when predicate observable fires, **all items** are re-evaluated. + +### FilterOnObservable + +Per-item `IObservable` controlling inclusion. Same as cache version but index-aware. + +--- + +### Transform + +Applies `Func` to produce a parallel list of transformed items. + +| Input | Behavior | +|-------|----------| +| **Add** | Calls factory → Add at same index. | +| **AddRange** | Calls factory for each → AddRange. | +| **Replace** | Calls factory on new item → Replace. | +| **Remove** | → Remove at same index (no factory call). | +| **RemoveRange** | → RemoveRange. | +| **Refresh** | Re-evaluates transform → Replace (or Refresh if same reference). | +| **Clear** | → Clear. | +| **Moved** | → Moved (same transformed item, new positions). | + +### TransformAsync + +Async version — `Func>`. + +### TransformMany + +Flattens 1:N — each source item produces multiple destination items. + +| Input | Behavior | +|-------|----------| +| **Add** | Expands into N items → AddRange. | +| **Replace** | Diff old children vs new children. Remove old, Add new. | +| **Remove** | → RemoveRange of all children. | +| **Clear** | → Clear. | + +--- + +### Sort + +Sorts items using `IComparer`. Maintains a sorted `ChangeAwareList`. + +| Input | Behavior | +|-------|----------| +| **Add** | Inserts at sorted position → Add with index. | +| **AddRange** | Inserts each at sorted position (or full reset if over threshold). | +| **Replace** | Removes old, inserts new at sorted position → Remove + Add (or Move). | +| **Remove** | → Remove at sorted index. | +| **Clear** | → Clear. | +| **Refresh** | Re-evaluates sort position. If position changed → Move. | +| **Comparer fires** | Full re-sort. Emits Moves/Adds/Removes as needed. | +| **Resort signal** | Reorders in-place using current comparer. | + +--- + +### Or / And / Except / Xor (Set Operations) + +Combine multiple list changeset streams using set logic with reference counting. + +| Operator | Inclusion rule | +|----------|---------------| +| `Or` | Item in **any** source (union) | +| `And` | Item in **all** sources (intersection) | +| `Except` | Item in first but **not** others (difference) | +| `Xor` | Item in exactly **one** source (symmetric difference) | + +For each operator, Add/Remove from any source updates the reference counts and the downstream list is recalculated. + +### MergeChangeSets + +Merges N list changeset streams into one. All changes are forwarded in order. + +All changes from any source are forwarded directly to the merged output stream in the order they arrive. + +--- + +### MergeMany + +Subscribes to per-item observables, merges into single `IObservable`. + +| Input | Behavior | +|-------|----------| +| **Add** | Subscribes to per-item observable. | +| **Replace** | Disposes old, subscribes to new. | +| **Remove** | Disposes subscription. | +| **Clear** | Disposes all subscriptions. | + +### MergeManyChangeSets (list → list) + +Each item produces `IObservable>`. All flattened into one stream. + +### MergeManyChangeSets (list → cache) + +Each item produces `IObservable>`. Flattened into one keyed stream. + +### SubscribeMany + +Creates `IDisposable` per item. Disposes on removal/replacement. + +| Input | Behavior | +|-------|----------| +| **Add** | Creates subscription → passes through. | +| **Replace** | Disposes old subscription, creates new → passes through. | +| **Remove** | Disposes → passes through. | +| **Clear** | Disposes all → passes through. | + +--- + +### AutoRefresh + +Monitors `INotifyPropertyChanged` and emits Refresh when specified property changes. + +| Input | Behavior | +|-------|----------| +| **Add** | Subscribes to PropertyChanged → passes through. | +| **Replace** | Re-subscribes → passes through. | +| **Remove** | Disposes subscription → passes through. | +| **Property fires** | Emits Refresh changeset for that item. | + +### AutoRefreshOnObservable + +Per-item `IObservable` triggers Refresh. Same pattern as AutoRefresh. + +### SuppressRefresh + +Strips Refresh changes from the stream. + +--- + +### Page + +Applies page number + page size windowing. + +| Input | Behavior | +|-------|----------| +| **Any change** | Recalculates page window. Items entering page → Add. Leaving page → Remove. | +| **Page request fires** | Full recalculation of page contents. | + +### Virtualise + +Start index + size sliding window. + +Same as Page but uses absolute start index + size instead of page number. + +### Top + +Takes first N items. + +--- + +### Bind + +Materializes list changes into an `ObservableCollectionExtended` or `ReadOnlyObservableCollection`. + +| Input | Behavior | +|-------|----------| +| **Add** | Insert into bound collection at index. | +| **AddRange** | InsertRange or Reset (based on threshold). | +| **Replace** | Replace at index. | +| **Remove** | Remove at index. | +| **RemoveRange** | RemoveRange. | +| **Moved** | Move in collection. | +| **Clear** | Clear collection. | + +### Clone + +Applies changes to any `IList`. Lower-level than Bind. + +--- + +### GroupOn + +Groups by a key selector. Emits `IChangeSet>`. + +| Input | Behavior | +|-------|----------| +| **Add** | Determines group → adds to that group. New group → Add(group). | +| **Replace** | If group changed → move between groups. | +| **Remove** | Removes from group. Empty group → Remove(group). | +| **Clear** | Clears all groups. | + +### GroupWithImmutableState + +Same logic, but emits immutable snapshots. + +--- + +### DistinctValues + +Tracks distinct values of a property with reference counting. + +| Input | Behavior | +|-------|----------| +| **Add** | If value first seen → Add. Otherwise count++. | +| **Replace** | If value changed → old count--, new count++. | +| **Remove** | Count--. If reaches 0 → Remove distinct value. | +| **Clear** | Removes all distinct values. | + +### QueryWhenChanged + +Emits `IReadOnlyCollection` on each change. + +### ToCollection + +Same as QueryWhenChanged — emits the full collection snapshot. + +### ToSortedCollection + +Emits a sorted `IReadOnlyCollection`. + +--- + +### DisposeMany + +Disposes items on removal/replacement. + +| Input | Behavior | +|-------|----------| +| **Add/AddRange** | Tracks items → passes through. | +| **Replace** | Disposes previous → passes through. | +| **Remove/RemoveRange** | Disposes removed items → passes through. | +| **Clear** | Disposes all → passes through. | +| **Subscription disposed** | Disposes all tracked items. | + +### OnItemAdded / OnItemRemoved / OnItemRefreshed + +Side-effect callbacks for specific lifecycle events. + +| Operator | Fires on | +|----------|----------| +| `OnItemAdded` | Add, AddRange | +| `OnItemRemoved` | Remove, RemoveRange, Clear. Also fires for **all items** on disposal. | +| `OnItemRefreshed` | Refresh | + +### ForEachChange / ForEachItemChange + +Side effect per change. `ForEachChange` sees range changes too; `ForEachItemChange` only item-level. + +--- + +### BufferIf + +Buffers changes while condition is true, flushes when false. + +| Input | Behavior | +|-------|----------| +| **Any (while paused)** | Accumulated into buffer. | +| **Condition → false** | Flushes all buffered changes. | +| **Any (while active)** | Passes through immediately. | + +### BufferInitial + +Buffers the initial burst for a time window. + +--- + +### ExpireAfter + +Auto-removes items after a timeout. + +| Input | Behavior | +|-------|----------| +| **Add** | Schedules removal → passes through. | +| **Replace** | Resets timer → passes through. | +| **Remove** | Cancels timer → passes through. | +| **Timer fires** | Emits Remove. | + +### LimitSizeTo + +FIFO eviction when list exceeds size limit. + +--- + +### Conversion & Utilities + +```csharp +list.Connect() + .PopulateInto(targetList) // pipe changes into another SourceList + .AsObservableList() // materialize as read-only IObservableList + .DeferUntilLoaded() // suppress until first non-empty changeset + .SkipInitial() // skip the initial snapshot + .NotEmpty() // filter out empty changesets + .StartWithEmpty() // emit empty changeset on subscribe + .RefCount() // share subscription with ref counting + .Switch() // IObservable>> → latest + .Cast() // cast items + .RemoveIndex() // strip index information + .Reverse() // reverse the collection order + .WhereReasonsAre(reasons) // only pass specific change reasons + .WhereReasonsAreNot(reasons) // exclude specific change reasons + .FlattenBufferResult() // flatten IChangeSet> to IChangeSet +``` + +### ToObservableChangeSet + +Converts a regular `IObservable` or `IObservable>` into a list changeset stream. This is the **bridge from standard Rx into DynamicData**. + +```csharp +// From a regular observable — each emission becomes an Add +myObservable.ToObservableChangeSet() + +// With size limit +myObservable.ToObservableChangeSet(limitSizeTo: 100) + +// With expiration +myObservable.ToObservableChangeSet(expireAfter: item => TimeSpan.FromMinutes(5)) +``` + +--- + +### Property Observation + +```csharp +// Observe a property on all items (requires INotifyPropertyChanged) +list.Connect() + .WhenValueChanged(p => p.Age) + .Subscribe(age => ...); + +// Observe any property change +list.Connect() + .WhenAnyPropertyChanged() + .Subscribe(item => ...); +``` + +--- + +## Converting Between List and Cache + +```csharp +// List → Cache: add a key +list.Connect() + .Transform(item => item) // optional + .AddKey(item => item.Id) // IChangeSet → IChangeSet + // or use: + .ToObservableChangeSet(item => item.Id) + +// Cache → List: remove the key +cache.Connect() + .RemoveKey() // IChangeSet → IChangeSet +``` + +--- + +## Writing a New List Operator + +Same two-part pattern as cache operators: + +1. **Extension method** in `ObservableListEx.cs` +2. **Internal class** in `List/Internal/` with `Run()` method + +Use `ChangeAwareList` instead of `ChangeAwareCache`: + +```csharp +internal sealed class MyListOperator(IObservable> source) + where T : notnull +{ + public IObservable> Run() => + Observable.Create>(observer => + { + var list = new ChangeAwareList(); + + return source.SubscribeSafe(Observer.Create>( + onNext: changes => + { + foreach (var change in changes) + { + switch (change.Reason) + { + case ListChangeReason.Add: + var item = change.Item; + if (ShouldInclude(item.Current)) + list.Add(item.Current); + break; + case ListChangeReason.AddRange: + list.AddRange(change.Range.Where(ShouldInclude)); + break; + case ListChangeReason.Replace: + // handle replacement + break; + case ListChangeReason.Remove: + list.Remove(change.Item.Current); + break; + case ListChangeReason.RemoveRange: + case ListChangeReason.Clear: + // handle range removal + break; + case ListChangeReason.Moved: + // handle move + break; + case ListChangeReason.Refresh: + // handle refresh + break; + } + } + + var output = list.CaptureChanges(); + if (output.Count > 0) + observer.OnNext(output); + }, + onError: observer.OnError, + onCompleted: observer.OnCompleted)); + }); +} +``` + +### Checklist + +1. Handle **all eight change reasons**: Add, AddRange, Replace, Remove, RemoveRange, Moved, Refresh, Clear +2. Use `ChangeAwareList` for state management +3. Pay attention to **index positions** — list changes are index-aware +4. Never emit empty changesets +5. Propagate `OnError` and `OnCompleted` +6. Multiple sources → serialize with `Synchronize(gate)` +7. Write tests (see Testing section in main instructions) \ No newline at end of file diff --git a/.github/instructions/rx.instructions.md b/.github/instructions/rx.instructions.md new file mode 100644 index 00000000..d74a5466 --- /dev/null +++ b/.github/instructions/rx.instructions.md @@ -0,0 +1,533 @@ +--- +applyTo: "**/*.cs" +--- +# Reactive Extensions (Rx) — Comprehensive Guide + +Reference: [ReactiveX Observable Contract](http://reactivex.io/documentation/contract.html) | [Rx.NET GitHub](https://github.com/dotnet/reactive) | [IntroToRx.com](http://introtorx.com/) + +## Core Concepts + +### Observables are Composable + +Rx's power comes from composition. Every operator returns a new `IObservable`, enabling fluent chaining: + +```csharp +source + .Where(x => x.IsValid) // filter + .Select(x => x.Transform()) // project + .DistinctUntilChanged() // deduplicate + .ObserveOn(RxApp.MainThreadScheduler) // marshal to UI thread + .Subscribe(x => UpdateUI(x)); // consume +``` + +Each operator in the chain is a separate subscription. Disposing the final subscription cascades disposal upstream through the entire chain. This composability is what makes Rx powerful — and what makes contract violations devastating, since a bug in any operator corrupts the entire downstream chain. + +### Hot vs Cold Observables + +**Cold**: starts producing items when subscribed to. Each subscriber gets its own sequence. Created with `Observable.Create`, `Observable.Defer`, `Observable.Return`, etc. + +```csharp +// Cold: each subscriber triggers a new HTTP call +var cold = Observable.FromAsync(() => httpClient.GetAsync(url)); +``` + +**Hot**: produces items regardless of subscribers. All subscribers share the same sequence. Examples: `Subject`, `Observable.FromEventPattern`, UI events. + +```csharp +// Hot: events fire whether or not anyone is listening +var hot = Observable.FromEventPattern(button, nameof(button.Click)); +``` + +**Converting**: `Publish()` + `Connect()` or `Publish().RefCount()` converts cold to hot (shared). + +```csharp +var shared = coldSource.Publish().RefCount(); // auto-connect on first sub, auto-disconnect on last unsub +``` + +## The Observable Contract + +### 1. Serialized Notifications (THE critical rule) + +`OnNext`, `OnError`, and `OnCompleted` calls MUST be serialized — they must never execute concurrently. This is the most commonly violated rule and causes the most insidious bugs. + +```csharp +// WRONG: two sources can call OnNext concurrently +source1.Subscribe(x => observer.OnNext(Process(x))); // thread A +source2.Subscribe(x => observer.OnNext(Process(x))); // thread B — RACE! + +// RIGHT: use Synchronize to serialize +source1.Synchronize(gate).Subscribe(observer); +source2.Synchronize(gate).Subscribe(observer); + +// RIGHT: use Merge (serializes internally) +source1.Merge(source2).Subscribe(observer); + +// RIGHT: use Subject (serializes OnNext calls via Synchronize) +var subject = new Subject(); +source1.Subscribe(subject); // Subject.OnNext is NOT thread-safe by default! +// Use Subject with Synchronize if multiple threads call OnNext +``` + +**Why it matters**: operators maintain mutable internal state (caches, dictionaries, counters). Concurrent `OnNext` calls corrupt this state silently — no exception, just wrong data. + +### 2. Terminal Notifications + +``` +Grammar: OnNext* (OnError | OnCompleted)? +``` + +- Zero or more `OnNext`, followed by at most one terminal notification +- `OnError` and `OnCompleted` are **mutually exclusive** — emit one or neither, never both +- After a terminal notification, **no further notifications** of any kind +- Operators receiving a terminal notification should release resources + +### 3. Subscription Lifecycle + +- `Subscribe` returns `IDisposable` — disposing it **unsubscribes** +- After disposal, no further notifications should be delivered +- Disposal must be **idempotent** (safe to call multiple times) and **thread-safe** +- Operators should stop producing when their subscription is disposed + +### 4. Error Handling + +- Exceptions thrown inside `OnNext` handlers propagate synchronously to the producing operator +- Use `SubscribeSafe` instead of `Subscribe` to route subscriber exceptions to `OnError`: + +```csharp +// Subscribe: exception in handler crashes the source +source.Subscribe(x => MayThrow(x)); // if MayThrow throws, exception propagates up + +// SubscribeSafe: exception in handler routes to OnError +source.SubscribeSafe(Observer.Create( + onNext: x => MayThrow(x), + onError: ex => HandleError(ex))); // MayThrow exception goes here +``` + +## Schedulers + +Schedulers control **when** and **where** work executes. They are Rx's abstraction over threading. + +### Common Schedulers + +| Scheduler | Use | Thread | +|-----------|-----|--------| +| `Scheduler.Default` | CPU-bound work | ThreadPool | +| `Scheduler.CurrentThread` | Trampoline (queue on current thread) | Current | +| `Scheduler.Immediate` | Execute synchronously, inline | Current | +| `TaskPoolScheduler.Default` | Task-based ThreadPool | ThreadPool | +| `NewThreadScheduler.Default` | Dedicated new thread per operation | New thread | +| `EventLoopScheduler` | Single dedicated thread (event loop) | Dedicated | +| `TestScheduler` | Deterministic virtual time (testing) | Test thread | + +### Using Schedulers + +```csharp +// Time-based operators accept an optional scheduler +Observable.Timer(TimeSpan.FromSeconds(1), scheduler) +Observable.Interval(TimeSpan.FromMilliseconds(100), scheduler) +source.Delay(TimeSpan.FromMilliseconds(500), scheduler) +source.Throttle(TimeSpan.FromMilliseconds(300), scheduler) +source.Buffer(TimeSpan.FromSeconds(1), scheduler) +source.Timeout(TimeSpan.FromSeconds(5), scheduler) +source.Sample(TimeSpan.FromMilliseconds(100), scheduler) + +// ObserveOn: deliver notifications on a specific scheduler +source.ObserveOn(RxApp.MainThreadScheduler) // marshal to UI thread + +// SubscribeOn: subscribe (and produce) on a specific scheduler +source.SubscribeOn(TaskPoolScheduler.Default) // subscribe on background thread +``` + +### Scheduler Injection for Testability + +**Always inject schedulers** instead of using defaults. This enables deterministic testing: + +```csharp +// WRONG: hardcoded scheduler — untestable time-dependent behavior +public IObservable GetData() => + _source.Throttle(TimeSpan.FromMilliseconds(300)); + +// RIGHT: injectable scheduler — testable +public IObservable GetData(IScheduler? scheduler = null) => + _source.Throttle(TimeSpan.FromMilliseconds(300), scheduler ?? Scheduler.Default); + +// TEST: use TestScheduler for deterministic time control +var testScheduler = new TestScheduler(); +var results = new List(); +GetData(testScheduler).Subscribe(results.Add); +testScheduler.AdvanceBy(TimeSpan.FromMilliseconds(300).Ticks); +results.Should().HaveCount(1); +``` + +## Disposable Helpers + +Rx provides several `IDisposable` implementations for managing subscription lifecycles: + +### Disposable.Create + +Creates a disposable from an action. The action runs exactly once on first disposal. + +```csharp +var cleanup = Disposable.Create(() => +{ + connection.Close(); + Log("Cleaned up"); +}); +// Later: cleanup.Dispose() runs the action once +``` + +### Disposable.Empty + +A no-op disposable. Useful as a default or placeholder. + +```csharp +public IDisposable Subscribe(IObservable source) => + isEnabled ? source.Subscribe(handler) : Disposable.Empty; +``` + +### CompositeDisposable + +Collects multiple disposables and disposes them all at once. **The workhorse of Rx resource management.** + +```csharp +var cleanup = new CompositeDisposable(); + +cleanup.Add(source1.Subscribe(handler1)); +cleanup.Add(source2.Subscribe(handler2)); +cleanup.Add(Disposable.Create(() => Log("All done"))); + +// Later: disposes ALL contained disposables +cleanup.Dispose(); +``` + +Use in `Observable.Create` to manage multiple subscriptions: + +```csharp +Observable.Create(observer => +{ + var cleanup = new CompositeDisposable(); + cleanup.Add(source1.Subscribe(observer)); + cleanup.Add(source2.Subscribe(x => observer.OnNext(Transform(x)))); + cleanup.Add(Disposable.Create(() => cache.Clear())); + return cleanup; +}); +``` + +### SerialDisposable + +Holds a single disposable that can be **replaced**. Disposing the previous value when a new one is set. Useful for "switch" patterns. + +```csharp +var serial = new SerialDisposable(); + +// Each assignment disposes the previous +serial.Disposable = source1.Subscribe(handler); // subscribes to source1 +serial.Disposable = source2.Subscribe(handler); // disposes source1 sub, subscribes to source2 +serial.Disposable = Disposable.Empty; // disposes source2 sub + +// Disposing the SerialDisposable disposes the current inner +serial.Dispose(); +``` + +### SingleAssignmentDisposable + +Like SerialDisposable but can only be assigned **once**. Throws on second assignment. Useful when a subscription is created asynchronously but disposal might happen before it's ready. + +```csharp +var holder = new SingleAssignmentDisposable(); + +// Start async subscription +Task.Run(() => +{ + var sub = source.Subscribe(handler); + holder.Disposable = sub; // safe even if Dispose was already called +}); + +// Can dispose before assignment — the subscription will be disposed when assigned +holder.Dispose(); +``` + +### RefCountDisposable + +Tracks multiple "dependent" disposables. The underlying resource is only disposed when **all** dependents (plus the primary) are disposed. + +```csharp +var primary = new RefCountDisposable(expensiveResource); + +var dep1 = primary.GetDisposable(); // increment ref count +var dep2 = primary.GetDisposable(); // increment ref count + +dep1.Dispose(); // decrement — resource still alive +primary.Dispose(); // decrement — resource still alive (dep2 still holds) +dep2.Dispose(); // decrement to 0 — resource disposed! +``` + +### BooleanDisposable / CancellationDisposable + +```csharp +// BooleanDisposable: check if disposed +var bd = new BooleanDisposable(); +bd.IsDisposed; // false +bd.Dispose(); +bd.IsDisposed; // true — useful for cancellation checks + +// CancellationDisposable: bridges IDisposable and CancellationToken +var cd = new CancellationDisposable(); +cd.Token; // CancellationToken that cancels on Dispose +cd.Dispose(); // triggers cancellation +``` + +## Standard Rx Operators Reference + +### Creation + +| Operator | Description | +|----------|-------------| +| `Observable.Return(value)` | Emit one value, then complete | +| `Observable.Empty()` | Complete immediately with no values | +| `Observable.Never()` | Never emit, never complete | +| `Observable.Throw(ex)` | Emit error immediately | +| `Observable.Create(subscribe)` | Build a custom observable from a subscribe function | +| `Observable.Defer(factory)` | Defer observable creation until subscription | +| `Observable.Range(start, count)` | Emit a range of integers | +| `Observable.Generate(init, cond, iter, result)` | Iterative generation | +| `Observable.Timer(dueTime)` | Emit one value after a delay | +| `Observable.Interval(period)` | Emit incrementing long values at regular intervals | +| `Observable.FromAsync(asyncFactory)` | Wrap an async method as an observable | +| `Observable.FromEventPattern(add, remove)` | Convert .NET events to observables | +| `Observable.Start(func)` | Run a function asynchronously, emit result | +| `Observable.Using(resourceFactory, obsFactory)` | Create a resource with the subscription, dispose with it | + +### Transformation + +| Operator | Description | +|----------|-------------| +| `Select(selector)` | Project each item (aka Map) | +| `SelectMany(selector)` | Project and flatten (aka FlatMap) | +| `Scan(accumulator)` | Running aggregate (like Aggregate but emits each step) | +| `Buffer(count)` / `Buffer(timeSpan)` | Collect items into batches | +| `Window(count)` / `Window(timeSpan)` | Split into sub-observables | +| `GroupBy(keySelector)` | Group items by key into sub-observables | +| `Cast()` | Cast items to a type | +| `OfType()` | Filter and cast to a type | +| `Materialize()` | Wrap each notification as a `Notification` value | +| `Dematerialize()` | Unwrap `Notification` values back to notifications | +| `Timestamp()` | Attach timestamp to each item | +| `TimeInterval()` | Attach time interval since previous item | + +### Filtering + +| Operator | Description | +|----------|-------------| +| `Where(predicate)` | Filter items by predicate | +| `Distinct()` | Remove duplicates (all-time) | +| `DistinctUntilChanged()` | Remove consecutive duplicates | +| `Take(count)` | Take first N items, then complete | +| `TakeLast(count)` | Take last N items (buffers until complete) | +| `TakeWhile(predicate)` | Take while predicate is true | +| `TakeUntil(other)` | Take until another observable emits | +| `Skip(count)` | Skip first N items | +| `SkipLast(count)` | Skip last N items | +| `SkipWhile(predicate)` | Skip while predicate is true | +| `SkipUntil(other)` | Skip until another observable emits | +| `First()` / `FirstOrDefault()` | First item (or default), then complete | +| `Last()` / `LastOrDefault()` | Last item (or default), then complete | +| `Single()` / `SingleOrDefault()` | Exactly one item, error if more/less | +| `ElementAt(index)` | Item at specific index | +| `IgnoreElements()` | Suppress all values, pass through error/completed | +| `Throttle(timeSpan)` | Suppress items followed by another within timespan | +| `Debounce(timeSpan)` | Alias for Throttle | +| `Sample(timeSpan)` | Emit most recent value at regular intervals | + +### Combining + +| Operator | Description | +|----------|-------------| +| `Merge(other)` | Merge multiple streams into one (interleaved) | +| `Concat(other)` | Append one stream after another completes | +| `Switch()` | Subscribe to latest inner observable, unsubscribe previous | +| `Amb(other)` | Take whichever stream emits first, ignore the other | +| `Zip(other, selector)` | Pair items 1:1 from two streams | +| `CombineLatest(other, selector)` | Combine latest values whenever either emits | +| `WithLatestFrom(other, selector)` | Combine with latest from other (only when source emits) | +| `StartWith(values)` | Prepend values before the source | +| `Append(value)` | Append a value after the source completes | +| `Publish()` | Convert cold to hot via multicast (returns `IConnectableObservable`) | +| `Publish().RefCount()` | Auto-connect on first subscriber, auto-disconnect on last | +| `Replay(bufferSize)` | Multicast with replay buffer | + +### Aggregation + +| Operator | Description | +|----------|-------------| +| `Aggregate(accumulator)` | Final aggregate (emits one value on complete) | +| `Count()` | Count of items (on complete) | +| `Sum()` / `Min()` / `Max()` / `Average()` | Numeric aggregates (on complete) | +| `ToArray()` | Collect all items into array (on complete) | +| `ToList()` | Collect all items into list (on complete) | +| `ToDictionary(keySelector)` | Collect into dictionary (on complete) | + +### Error Handling + +| Operator | Description | +|----------|-------------| +| `Catch(handler)` | Handle error by switching to another observable | +| `Catch(handler)` | Handle specific exception type | +| `Retry()` / `Retry(count)` | Resubscribe on error | +| `OnErrorResumeNext(other)` | Continue with another observable on error or complete | +| `Finally(action)` | Run action on dispose, error, or complete | +| `Do(onNext, onError, onCompleted)` | Side effects without affecting the stream | +| `DoFinally(action)` | Side effect on termination (like Finally but for observation) | + +### Scheduling & Threading + +| Operator | Description | +|----------|-------------| +| `ObserveOn(scheduler)` | Deliver notifications on specified scheduler | +| `SubscribeOn(scheduler)` | Subscribe (and produce) on specified scheduler | +| `Delay(timeSpan)` | Delay each notification by a time span | +| `Timeout(timeSpan)` | Error if no notification within timeout | +| `Synchronize()` | Serialize notifications with internal gate | +| `Synchronize(gate)` | Serialize notifications with external gate object | + +### Utility + +| Operator | Description | +|----------|-------------| +| `Do(action)` | Perform side effect for each notification | +| `Publish().RefCount()` | Share a subscription among multiple subscribers | +| `Replay(bufferSize).RefCount()` | Share with replay | +| `AsObservable()` | Hide the implementation type (e.g., Subject → IObservable) | +| `Subscribe(observer)` | Subscribe with an IObserver | +| `Subscribe(onNext, onError, onCompleted)` | Subscribe with callbacks | +| `SubscribeSafe(observer)` | Subscribe with exception routing to OnError | +| `ForEachAsync(action)` | Async iteration (returns Task) | +| `Wait()` | Block until complete (avoid on UI thread) | +| `ToTask()` | Convert to Task (last value) | + +## Writing Custom Operators + +### The Observable.Create Pattern + +```csharp +public static IObservable MyOperator( + this IObservable source, + Func selector) +{ + return Observable.Create(observer => + { + return source.SubscribeSafe(Observer.Create( + onNext: item => + { + try + { + var result = selector(item); + observer.OnNext(result); + } + catch (Exception ex) + { + observer.OnError(ex); + } + }, + onError: observer.OnError, + onCompleted: observer.OnCompleted)); + }); +} +``` + +### Multi-Source Operator Pattern + +When combining multiple sources, serialize their notifications: + +```csharp +public static IObservable MyMerge( + this IObservable source1, + IObservable source2) +{ + return Observable.Create(observer => + { + var gate = new object(); + + var sub1 = source1.Synchronize(gate).SubscribeSafe(observer); + var sub2 = source2.Synchronize(gate).SubscribeSafe(observer); + + return new CompositeDisposable(sub1, sub2); + }); +} +``` + +**Note**: `Synchronize(gate)` holds the lock during downstream `OnNext` delivery. This ensures serialization but means the lock is held for the duration of all downstream processing. Keep downstream chains lightweight when using shared gates. + +### Operator Checklist + +When writing or reviewing an Rx operator: + +- [ ] **Serialized delivery**: can `OnNext` be called concurrently? If multiple sources, are they serialized? +- [ ] **Terminal semantics**: does `OnError`/`OnCompleted` propagate correctly? No notifications after terminal? +- [ ] **Disposal**: does disposing the subscription clean up all resources? Is it idempotent? +- [ ] **Error handling**: does `SubscribeSafe` catch subscriber exceptions? Are errors propagated, not swallowed? +- [ ] **Back-pressure**: does the operator buffer unboundedly? Could it cause memory issues? +- [ ] **Scheduler**: are time-dependent operations using an injectable scheduler? +- [ ] **Cold/Hot**: is the observable cold (deferred via `Observable.Create`)? If hot, is sharing handled correctly? +- [ ] **Thread safety**: is mutable state protected? Are there race conditions between subscribe/dispose/OnNext? + +## Common Pitfalls + +### 1. Subscribing Multiple Times to a Cold Observable + +```csharp +// WRONG: two HTTP calls! +var data = Observable.FromAsync(() => httpClient.GetAsync(url)); +data.Subscribe(handler1); // call 1 +data.Subscribe(handler2); // call 2 — probably not intended + +// RIGHT: share the result +var shared = data.Publish().RefCount(); +shared.Subscribe(handler1); // shares +shared.Subscribe(handler2); // same result +``` + +### 2. Forgetting to Dispose Subscriptions + +```csharp +// WRONG: subscription leaks — handler keeps running forever +source.Subscribe(x => UpdateUI(x)); + +// RIGHT: track and dispose +_cleanup.Add(source.Subscribe(x => UpdateUI(x))); +// In Dispose: _cleanup.Dispose(); +``` + +### 3. Blocking on Rx (sync-over-async) + +```csharp +// WRONG: blocks the thread, can hang on UI thread +var result = source.FirstAsync().Wait(); + +// RIGHT: use async/await +var result = await source.FirstAsync(); +``` + +### 4. Using Subject as a Public API + +```csharp +// WRONG: exposes mutation to consumers +public Subject Values { get; } = new(); + +// RIGHT: expose as IObservable, keep Subject private +private readonly Subject _values = new(); +public IObservable Values => _values.AsObservable(); +``` + +### 5. Not Handling OnError + +```csharp +// WRONG: unhandled OnError crashes the app (routes to DefaultExceptionHandler) +source.Subscribe(x => Process(x)); + +// RIGHT: always handle errors +source.Subscribe( + x => Process(x), + ex => LogError(ex), + () => LogComplete()); +``` diff --git a/.github/instructions/testing-cache.instructions.md b/.github/instructions/testing-cache.instructions.md new file mode 100644 index 00000000..5b4b9ae5 --- /dev/null +++ b/.github/instructions/testing-cache.instructions.md @@ -0,0 +1,227 @@ +--- +applyTo: "src/DynamicData.Tests/Cache/**/*.cs" +--- +# Testing Cache Operators + +This covers testing patterns specific to **cache** (`IChangeSet`) operators. For general testing philosophy and requirements, see the main `copilot-instructions.md`. + +## Observation Patterns + +Cache tests use two distinct patterns for capturing pipeline output. **Know both — use the right one.** + +### Pattern 1: ChangeSetAggregator (legacy, still widely used) + +`AsAggregator()` materializes the stream into a `ChangeSetAggregator` that captures every changeset. Defined in the library assembly under `Cache/Tests/`. + +```csharp +using var source = new SourceCache(p => p.Key); +using var results = source.Connect() + .Filter(p => p.Age >= 18) + .AsAggregator(); + +source.AddOrUpdate(new Person("Adult", 25)); +source.AddOrUpdate(new Person("Child", 10)); + +results.Data.Count.Should().Be(1); +results.Messages.Count.Should().Be(1, "child was filtered, only 1 changeset emitted"); +results.Messages[0].Adds.Should().Be(1); +results.Data.Items[0].Name.Should().Be("Adult"); +``` + +**ChangeSetAggregator properties:** +- `Data` — `IObservableCache` materialized view of current state +- `Messages` — `IList>` every changeset received +- `Summary` — `ChangeSummary` aggregated statistics +- `Error` — captured `OnError` exception (if any) +- `IsCompleted` — whether `OnCompleted` was received + +**Specialized aggregator variants:** +- **`SortedChangeSetAggregator`** — for `.Sort()`, exposes `Messages[i].SortedItems` +- **`PagedChangeSetAggregator`** — for `.Page()` +- **`VirtualChangeSetAggregator`** — for `.Virtualise()` +- **`GroupChangeSetAggregator`** — for `.Group()` +- **`DistinctChangeSetAggregator`** — for `.DistinctValues()` + +### Pattern 2: RecordCacheItems (modern, preferred for new tests) + +`RecordCacheItems` creates a `CacheItemRecordingObserver` with keyed + sorted index tracking. Pair it with `.ValidateSynchronization()` and `.ValidateChangeSets()`. + +```csharp +using var source = new TestSourceCache(Item.SelectId); + +using var subscription = source.Connect() + .Filter(Item.FilterByIsIncluded) + .ValidateSynchronization() // detects concurrent OnNext (Rx violation!) + .ValidateChangeSets(Item.SelectId) // validates changeset structural integrity + .RecordCacheItems(out var results); + +source.AddOrUpdate(new Item(1) { IsIncluded = true }); +source.AddOrUpdate(new Item(2) { IsIncluded = false }); + +results.RecordedItemsByKey.Should().ContainKey(1); +results.RecordedItemsByKey.Should().NotContainKey(2); +results.RecordedChangeSets.Should().HaveCount(1); +results.Error.Should().BeNull(); +results.HasCompleted.Should().BeFalse(); +``` + +**CacheItemRecordingObserver properties:** +- `RecordedItemsByKey` — `IReadOnlyDictionary` current items by key +- `RecordedItemsSorted` — `IReadOnlyList` items with sorted index tracking +- `RecordedChangeSets` — `IReadOnlyList>` all changesets +- `Error` — captured exception +- `HasCompleted` — completion flag + +**When to use which:** +- **New tests**: Prefer `RecordCacheItems` + `ValidateSynchronization` + `ValidateChangeSets`. +- **Existing tests**: Don't refactor from `AsAggregator` unless asked to do so. +- **Sort/Page/Virtual tests**: The specialized aggregators have no `RecordCacheItems` equivalent yet — use them. + +## Asserting Cache Changeset Contents + +Cache changesets carry rich metadata — **use it** for precise assertions: + +```csharp +// Assert changeset structure (counts by reason) +results.Messages[0].Adds.Should().Be(5); +results.Messages[0].Updates.Should().Be(0); +results.Messages[0].Removes.Should().Be(0); +results.Messages[0].Refreshes.Should().Be(0); + +// Assert individual changes +var change = results.Messages[0].First(); +change.Reason.Should().Be(ChangeReason.Add); +change.Key.Should().Be("key1"); +change.Current.Should().Be(expectedItem); + +// For updates, Previous is populated +var update = results.Messages[1].First(); +update.Reason.Should().Be(ChangeReason.Update); +update.Previous.HasValue.Should().BeTrue(); +update.Previous.Value.Should().Be(previousItem); + +// Assert materialized cache state +results.Data.Count.Should().Be(5); +results.Data.Items.Should().BeEquivalentTo(expectedItems); +results.Data.Lookup("key1").HasValue.Should().BeTrue(); +``` + +## Testing Completion and Error Propagation + +Use `TestSourceCache` to inject terminal events: + +```csharp +[Theory] +[InlineData(CompletionStrategy.Asynchronous)] // complete after subscription +[InlineData(CompletionStrategy.Immediate)] // complete before subscription +public void SourceCompletes_CompletionPropagates(CompletionStrategy completionStrategy) +{ + using var source = new TestSourceCache(Item.SelectId); + + if (completionStrategy is CompletionStrategy.Immediate) + source.Complete(); + + using var subscription = source.Connect() + .Filter(Item.FilterByIsIncluded) + .ValidateSynchronization() + .RecordCacheItems(out var results); + + if (completionStrategy is CompletionStrategy.Asynchronous) + source.Complete(); + + results.HasCompleted.Should().BeTrue(); + results.Error.Should().BeNull(); +} + +[Fact] +public void SourceErrors_ErrorPropagates() +{ + using var source = new TestSourceCache(Item.SelectId); + var testError = new Exception("Test error"); + + using var subscription = source.Connect() + .Transform(x => new ViewModel(x)) + .RecordCacheItems(out var results); + + source.SetError(testError); + results.Error.Should().BeSameAs(testError); +} +``` + +## The Stub/Fixture Pattern + +Many cache tests use an inner helper class that sets up source, pipeline, and aggregator: + +```csharp +private sealed class TransformStub : IDisposable +{ + public SourceCache Source { get; } = new(p => p.Key); + public Func TransformFactory { get; } + = p => new PersonWithGender(p, p.Gender == "M" ? "Male" : "Female"); + public ChangeSetAggregator Results { get; } + + public TransformStub(IObservable? forceTransform = null) + { + Results = Source.Connect() + .Transform(TransformFactory, forceTransform: forceTransform) + .AsAggregator(); + } + + public void Dispose() + { + Source.Dispose(); + Results.Dispose(); + } +} +``` + +This keeps individual `[Fact]` methods short and focused on the scenario under test. + +## Cache Stress Tests + +Multi-threaded stress tests prove cache operators are thread-safe: + +```csharp +[Fact] +public async Task MultiThreadedStressTest() +{ + const int writerThreads = 8; + const int itemsPerThread = 500; + + using var source = new SourceCache(p => p.Key); + using var results = source.Connect() + .Filter(p => p.Age >= 18) + .Sort(SortExpressionComparer.Ascending(p => p.Name)) + .AsAggregator(); + + using var barrier = new Barrier(writerThreads + 1); + + var tasks = Enumerable.Range(0, writerThreads).Select(threadId => Task.Run(() => + { + barrier.SignalAndWait(); + for (var i = 0; i < itemsPerThread; i++) + source.AddOrUpdate(new Person($"Thread{threadId}_Item{i}", 20 + i)); + })).ToArray(); + + barrier.SignalAndWait(); + await Task.WhenAll(tasks); + + results.Data.Count.Should().Be(writerThreads * itemsPerThread); +} +``` + +## What Every Cache Operator Test Must Cover + +1. **Single item**: Add, Update, Remove, Refresh individually +2. **Batch**: Multiple items in a single `Edit()` call +3. **Empty changeset**: Operator doesn't emit empty changesets +4. **Error propagation**: Source `OnError` propagates +5. **Completion propagation**: Source `OnCompleted` propagates +6. **Disposal**: Disposing unsubscribes from all sources +7. **Edge cases**: Duplicate keys, boundary values + +For operators with dynamic parameters: + +8. **Parameter changes**: Predicate/comparer change re-evaluates correctly +9. **Parameter completion**: What happens when parameter observable completes +10. **Parameter error**: What happens when parameter observable errors \ No newline at end of file diff --git a/.github/instructions/testing-list.instructions.md b/.github/instructions/testing-list.instructions.md new file mode 100644 index 00000000..88eefcc2 --- /dev/null +++ b/.github/instructions/testing-list.instructions.md @@ -0,0 +1,185 @@ +--- +applyTo: "src/DynamicData.Tests/List/**/*.cs" +--- +# Testing List Operators + +This covers testing patterns specific to **list** (`IChangeSet`) operators. For general testing philosophy and requirements, see the main `copilot-instructions.md`. + +## Observation Patterns + +### ChangeSetAggregator (primary pattern) + +The list `ChangeSetAggregator` (single type parameter, no key) captures every list changeset. Defined in `List/Tests/`. + +```csharp +using var source = new SourceList(); +using var results = new ChangeSetAggregator( + source.Connect().Filter(p => p.Age >= 18)); + +source.Add(new Person("Adult", 25)); +source.Add(new Person("Child", 10)); + +results.Data.Count.Should().Be(1); +results.Messages.Count.Should().Be(1); +results.Messages[0].Adds.Should().Be(1); +results.Data.Items[0].Name.Should().Be("Adult"); +``` + +**Properties:** +- `Data` — `IObservableList` materialized view +- `Messages` — `IList>` all changesets +- `Exception` — captured error (note: `Exception`, not `Error` like cache) +- `IsCompleted` — completion flag + +### RecordListItems (modern) + +`RecordListItems` creates a `ListItemRecordingObserver` — the list parallel to `RecordCacheItems`. + +```csharp +using var source = new TestSourceList(); + +using var subscription = source.Connect() + .Filter(item => item.IsIncluded) + .ValidateSynchronization() + .ValidateChangeSets() + .RecordListItems(out var results); + +source.Add(new Item { IsIncluded = true }); +source.Add(new Item { IsIncluded = false }); + +results.RecordedItems.Should().HaveCount(1); +results.Error.Should().BeNull(); +``` + +## Asserting List Changeset Contents + +List changesets differ from cache — changes can be item-level or range-level: + +```csharp +// Assert changeset structure +results.Messages[0].Adds.Should().Be(5); +results.Messages[0].Removes.Should().Be(0); +results.Messages[0].Replaced.Should().Be(0); +results.Messages[0].Refreshes.Should().Be(0); + +// Item-level change +var change = results.Messages[0].First(); +change.Reason.Should().Be(ListChangeReason.Add); +change.Item.Current.Should().Be(expectedItem); +change.Item.CurrentIndex.Should().BeGreaterOrEqualTo(0); + +// Range-level change (AddRange, RemoveRange, Clear) +var rangeChange = results.Messages[0].First(c => c.Reason == ListChangeReason.AddRange); +rangeChange.Range.Should().HaveCount(10); +rangeChange.Range.Index.Should().Be(0); + +// Replace (list equivalent of Update) +var replace = results.Messages[1].First(); +replace.Reason.Should().Be(ListChangeReason.Replace); +replace.Item.Previous.HasValue.Should().BeTrue(); +replace.Item.Previous.Value.Should().Be(oldItem); + +// Assert materialized list state +results.Data.Count.Should().Be(5); +results.Data.Items.Should().BeEquivalentTo(expectedItems); +``` + +## Key Differences from Cache Testing + +| Aspect | Cache | List | +|--------|-------|------| +| **Aggregator type** | `ChangeSetAggregator` | `ChangeSetAggregator` | +| **Data property** | `IObservableCache` (has `Lookup(key)`) | `IObservableList` (index-based) | +| **Add assertion** | `change.Key` available | `change.Item.CurrentIndex` available | +| **Update vs Replace** | `ChangeReason.Update` with `Previous` | `ListChangeReason.Replace` with `Previous` | +| **Batch add** | Each item is a separate Add change | `AddRange` is a single range change | +| **Clear** | Individual Remove per item | Single `Clear` range change | +| **Fixture setup** | `new SourceCache(keySelector)` | `new SourceList()` | +| **Connect** | `cache.Connect()` | `list.Connect()` | +| **Mutate** | `cache.Edit(u => u.AddOrUpdate(...))` | `list.Edit(l => l.Add(...))` | + +## The List Fixture Pattern + +List test fixtures follow the same pattern as cache but with `SourceList`: + +```csharp +public class TransformFixture : IDisposable +{ + private readonly ChangeSetAggregator _results; + private readonly ISourceList _source; + private readonly Func _transformFactory = + p => new PersonWithGender(p, p.Age % 2 == 0 ? "M" : "F"); + + public TransformFixture() + { + _source = new SourceList(); + _results = new ChangeSetAggregator( + _source.Connect().Transform(_transformFactory)); + } + + [Fact] + public void Add() + { + var person = new Person("Adult1", 50); + _source.Add(person); + + _results.Messages.Count.Should().Be(1); + _results.Data.Count.Should().Be(1); + _results.Data.Items[0].Should().Be(_transformFactory(person)); + } + + public void Dispose() + { + _source.Dispose(); + _results.Dispose(); + } +} +``` + +## List Stress Tests + +List stress tests are similar to cache but use `SourceList` APIs: + +```csharp +[Fact] +public async Task MultiThreadedStressTest() +{ + const int writerThreads = 8; + const int itemsPerThread = 500; + + using var source = new SourceList(); + using var results = new ChangeSetAggregator( + source.Connect().Filter(p => p.Age >= 18)); + + using var barrier = new Barrier(writerThreads + 1); + + var tasks = Enumerable.Range(0, writerThreads).Select(threadId => Task.Run(() => + { + barrier.SignalAndWait(); + for (var i = 0; i < itemsPerThread; i++) + source.Add(new Person($"Thread{threadId}_Item{i}", 20 + i)); + })).ToArray(); + + barrier.SignalAndWait(); + await Task.WhenAll(tasks); + + // List allows duplicates — total is threads × items + results.Data.Count.Should().Be(writerThreads * itemsPerThread); +} +``` + +## What Every List Operator Test Must Cover + +1. **Single item**: Add, Remove, Replace, Refresh, Move individually +2. **Range operations**: AddRange, RemoveRange, Clear +3. **Index correctness**: Verify CurrentIndex and PreviousIndex are correct +4. **Empty changeset**: Operator doesn't emit empty changesets +5. **Error propagation**: Source `OnError` propagates +6. **Completion propagation**: Source `OnCompleted` propagates +7. **Disposal**: Disposing unsubscribes from all sources +8. **Edge cases**: Empty list, single item, boundary indices + +For operators with dynamic parameters: + +9. **Parameter changes**: Re-evaluates correctly +10. **Parameter completion/error**: Proper handling \ No newline at end of file