Skip to content

Commit 60e8310

Browse files
authored
Merge branch 'main' into maint/list_ext_breakup
2 parents 9e47bc4 + 8033135 commit 60e8310

2 files changed

Lines changed: 19 additions & 7 deletions

File tree

src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheFixture.cs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
using System.Reactive.Concurrency;
66
using System.Reactive.Disposables;
77
using System.Reactive.Linq;
8+
using System.Reactive.Threading.Tasks;
89
using System.Threading.Tasks;
910
using Bogus;
1011
using DynamicData.Kernel;
@@ -90,10 +91,11 @@ IObservable<MarketPrice> AddRemovePrices(Market market, int priceCount, int para
9091
.Parallelize(priceCount, parallel, obs => obs.StressAddRemove(market.PricesCache, _ => GetRemoveTime(), scheduler))
9192
.Finally(market.PricesCache.Dispose);
9293

93-
var merged = _marketCache.Connect().MergeManyChangeSets(market => market.LatestPrices);
94-
using var priceResults = merged.AsAggregator();
95-
94+
var merged = _marketCache.Connect().MergeManyChangeSets(market => market.LatestPrices).Publish();
9695
var adding = true;
96+
var cacheCompleted = merged.LastOrDefaultAsync().ToTask();
97+
using var priceResults = merged.AsAggregator();
98+
using var connect = merged.Connect();
9799

98100
// Start asynchrononously modifying the parent list and the child lists
99101
using var addingSub = AddRemoveStress(marketCount, priceCount, Environment.ProcessorCount, TaskPoolScheduler.Default)
@@ -119,6 +121,9 @@ IObservable<MarketPrice> AddRemovePrices(Market market, int priceCount, int para
119121
}
120122
while (adding);
121123

124+
// Wait for the source cache to finish delivering all notifications.
125+
await cacheCompleted;
126+
122127
// Verify the results
123128
CheckResultContents(_marketCacheResults, priceResults);
124129
}

src/DynamicData.Tests/Cache/MergeManyChangeSetsListFixture.cs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
using System.Reactive.Concurrency;
66
using System.Reactive.Disposables;
77
using System.Reactive.Linq;
8+
using System.Reactive.Threading.Tasks;
89
using System.Threading.Tasks;
910
using Bogus;
1011
using DynamicData.Kernel;
@@ -86,9 +87,11 @@ IObservable<Animal> AddRemoveAnimals(AnimalOwner owner, int animalCount, int par
8687
.Parallelize(animalCount, parallel, obs => obs.StressAddRemove(owner.Animals, _ => GetRemoveTime(), scheduler))
8788
.Finally(owner.Animals.Dispose);
8889

89-
var mergeAnimals = _animalOwners.Connect().MergeManyChangeSets(owner => owner.Animals.Connect());
90-
90+
var mergeAnimals = _animalOwners.Connect().MergeManyChangeSets(owner => owner.Animals.Connect()).Publish();
9191
var addingAnimals = true;
92+
var cacheCompleted = mergeAnimals.LastOrDefaultAsync().ToTask();
93+
using var animalResults = mergeAnimals.AsAggregator();
94+
using var connect = mergeAnimals.Connect();
9295

9396
// Start asynchrononously modifying the parent list and the child lists
9497
using var addAnimals = AddRemoveAnimalsStress(ownerCount, animalCount, Environment.ProcessorCount, TaskPoolScheduler.Default)
@@ -114,8 +117,12 @@ IObservable<Animal> AddRemoveAnimals(AnimalOwner owner, int animalCount, int par
114117
}
115118
while (addingAnimals);
116119

117-
// Verify the results
118-
CheckResultContents();
120+
// Wait for the source cache to finish delivering all notifications.
121+
await cacheCompleted;
122+
123+
// Verify the results against the aggregator wired into the same Publish chain
124+
// that cacheCompleted observes.
125+
CheckResultContents(_animalOwners.Items, _animalOwnerResults, animalResults);
119126
}
120127

121128
[Fact]

0 commit comments

Comments
 (0)