44using System . Reactive . Linq ;
55using System . Reactive . Subjects ;
66
7+ using DynamicData . Tests . Utilities ;
8+
79using FluentAssertions ;
810using Xunit ;
911
@@ -16,9 +18,10 @@ public void ItemsAreManipulated_UnmatchedItemsAreExcludedAndIndexesAreDiscarded(
1618 {
1719 using var source = new Subject < IChangeSet < Item , int > > ( ) ;
1820
19- using var results = source
21+ using var subscription = source
2022 . FilterImmutable ( predicate : Item . Predicate )
21- . AsAggregator ( ) ;
23+ . ValidateChangeSets ( Item . KeySelector )
24+ . RecordCacheItems ( out var results ) ;
2225
2326
2427 // Add items
@@ -31,8 +34,8 @@ public void ItemsAreManipulated_UnmatchedItemsAreExcludedAndIndexesAreDiscarded(
3134 } ) ;
3235
3336 results . Error . Should ( ) . BeNull ( ) ;
34- results . Messages . Count . Should ( ) . Be ( 1 , "1 source operation was performed" ) ;
35- results . Data . Items . Should ( ) . BeEquivalentTo ( new [ ] { item1 } , "2 items were added, with 1 excluded" ) ;
37+ results . RecordedChangeSets . Count . Should ( ) . Be ( 1 , "1 source operation was performed" ) ;
38+ results . RecordedItemsByKey . Values . Should ( ) . BeEquivalentTo ( new [ ] { item1 } , "2 items were added, with 1 excluded" ) ;
3639
3740
3841 // Replace items, changing inclusion
@@ -45,8 +48,8 @@ public void ItemsAreManipulated_UnmatchedItemsAreExcludedAndIndexesAreDiscarded(
4548 } ) ;
4649
4750 results . Error . Should ( ) . BeNull ( ) ;
48- results . Messages . Skip ( 1 ) . Count ( ) . Should ( ) . Be ( 1 , "1 source operation was performed" ) ;
49- results . Data . Items . Should ( ) . BeEquivalentTo ( new [ ] { item4 } , "2 items were replaced, with 1 excluded" ) ;
51+ results . RecordedChangeSets . Skip ( 1 ) . Count ( ) . Should ( ) . Be ( 1 , "1 source operation was performed" ) ;
52+ results . RecordedItemsByKey . Values . Should ( ) . BeEquivalentTo ( new [ ] { item4 } , "2 items were replaced, with 1 excluded" ) ;
5053
5154
5255 // Replace items, not changing inclusion
@@ -59,8 +62,8 @@ public void ItemsAreManipulated_UnmatchedItemsAreExcludedAndIndexesAreDiscarded(
5962 } ) ;
6063
6164 results . Error . Should ( ) . BeNull ( ) ;
62- results . Messages . Skip ( 2 ) . Count ( ) . Should ( ) . Be ( 1 , "1 source operation was performed" ) ;
63- results . Data . Items . Should ( ) . BeEquivalentTo ( new [ ] { item6 } , "2 items were replaced, with 1 excluded" ) ;
65+ results . RecordedChangeSets . Skip ( 2 ) . Count ( ) . Should ( ) . Be ( 1 , "1 source operation was performed" ) ;
66+ results . RecordedItemsByKey . Values . Should ( ) . BeEquivalentTo ( new [ ] { item6 } , "2 items were replaced, with 1 excluded" ) ;
6467
6568
6669 // Refresh items
@@ -71,8 +74,8 @@ public void ItemsAreManipulated_UnmatchedItemsAreExcludedAndIndexesAreDiscarded(
7174 } ) ;
7275
7376 results . Error . Should ( ) . BeNull ( ) ;
74- results . Messages . Skip ( 3 ) . Count ( ) . Should ( ) . Be ( 1 , "1 source operation was performed" ) ;
75- results . Data . Items . Should ( ) . BeEquivalentTo ( new [ ] { item6 } , "2 items were refreshed, with 1 excluded" ) ;
77+ results . RecordedChangeSets . Skip ( 3 ) . Count ( ) . Should ( ) . Be ( 1 , "1 source operation was performed" ) ;
78+ results . RecordedItemsByKey . Values . Should ( ) . BeEquivalentTo ( new [ ] { item6 } , "2 items were refreshed, with 1 excluded" ) ;
7679
7780
7881 // Remove items
@@ -83,28 +86,29 @@ public void ItemsAreManipulated_UnmatchedItemsAreExcludedAndIndexesAreDiscarded(
8386 } ) ;
8487
8588 results . Error . Should ( ) . BeNull ( ) ;
86- results . Messages . Skip ( 4 ) . Count ( ) . Should ( ) . Be ( 1 , "1 source operation was performed" ) ;
87- results . Data . Items . Should ( ) . BeEmpty ( "2 items were removed, with one excluded" ) ;
89+ results . RecordedChangeSets . Skip ( 4 ) . Count ( ) . Should ( ) . Be ( 1 , "1 source operation was performed" ) ;
90+ results . RecordedItemsByKey . Should ( ) . BeEmpty ( "2 items were removed, with one excluded" ) ;
8891
8992
90- results . Messages . SelectMany ( static changes => changes ) . Should ( ) . AllSatisfy (
93+ results . RecordedChangeSets . SelectMany ( static changes => changes ) . Should ( ) . AllSatisfy (
9194 change =>
9295 {
9396 change . CurrentIndex . Should ( ) . Be ( - 1 ) ;
9497 change . PreviousIndex . Should ( ) . Be ( - 1 ) ;
9598 } ,
9699 because : "indexes should not be preserved" ) ;
97- results . IsCompleted . Should ( ) . BeFalse ( ) ;
100+ results . HasCompleted . Should ( ) . BeFalse ( ) ;
98101 }
99102
100103 [ Fact ]
101104 public void ItemsAreMoved_ChangesAreNotPropagated ( )
102105 {
103106 using var source = new Subject < IChangeSet < Item , int > > ( ) ;
104107
105- using var results = source
108+ using var subscription = source
106109 . FilterImmutable ( predicate : Item . Predicate )
107- . AsAggregator ( ) ;
110+ . ValidateChangeSets ( Item . KeySelector )
111+ . RecordCacheItems ( out var results ) ;
108112
109113 // Initial setup
110114 var item1 = new Item ( ) { Id = 1 , IsIncluded = true } ;
@@ -116,18 +120,18 @@ public void ItemsAreMoved_ChangesAreNotPropagated()
116120 new ( reason : ChangeReason . Add , key : item2 . Id , current : item2 , index : 1 ) ,
117121 new ( reason : ChangeReason . Add , key : item3 . Id , current : item3 , index : 2 )
118122 } ) ;
119- results . Messages . Clear ( ) ;
123+ var changeSetsBeforeMove = results . RecordedChangeSets . Count ;
120124
121125
122126 // Move items
123127 source . OnNext ( new ChangeSet < Item , int > ( )
124128 {
125129 new ( reason : ChangeReason . Moved , key : item1 . Id , current : item1 , previous : default , currentIndex : 2 , previousIndex : 0 ) ,
126- new ( reason : ChangeReason . Moved , key : item2 . Id , current : item1 , previous : default , currentIndex : 0 , previousIndex : 1 )
130+ new ( reason : ChangeReason . Moved , key : item2 . Id , current : item2 , previous : default , currentIndex : 0 , previousIndex : 1 )
127131 } ) ;
128132
129133 results . Error . Should ( ) . BeNull ( ) ;
130- results . Messages . Should ( ) . BeEmpty ( "move operations should not be propagated" ) ;
134+ results . RecordedChangeSets . Skip ( changeSetsBeforeMove ) . Should ( ) . BeEmpty ( "move operations should not be propagated" ) ;
131135 }
132136
133137 [ Fact ]
@@ -144,9 +148,10 @@ public void PredicateThrows_ExceptionIsCaptured()
144148
145149 var error = new Exception ( ) ;
146150
147- using var results = source
151+ using var subscription = source
148152 . FilterImmutable ( predicate : _ => throw error )
149- . AsAggregator ( ) ;
153+ . ValidateChangeSets ( Item . KeySelector )
154+ . RecordCacheItems ( out var results ) ;
150155
151156
152157 var item1 = new Item ( ) { Id = 1 , IsIncluded = true } ;
@@ -156,18 +161,19 @@ public void PredicateThrows_ExceptionIsCaptured()
156161 } ) ;
157162
158163 results . Error . Should ( ) . Be ( error ) ;
159- results . Messages . Should ( ) . BeEmpty ( "no source operations should have been processed" ) ;
160- results . IsCompleted . Should ( ) . BeFalse ( ) ;
164+ results . RecordedChangeSets . Should ( ) . BeEmpty ( "no source operations should have been processed" ) ;
165+ results . HasCompleted . Should ( ) . BeFalse ( ) ;
161166 }
162167
163168 [ Fact ]
164169 public void SourceCompletes_CompletionIsPropagated ( )
165170 {
166171 using var source = new Subject < IChangeSet < Item , int > > ( ) ;
167172
168- using var results = source
173+ using var subscription = source
169174 . FilterImmutable ( predicate : Item . Predicate )
170- . AsAggregator ( ) ;
175+ . ValidateChangeSets ( Item . KeySelector )
176+ . RecordCacheItems ( out var results ) ;
171177
172178
173179 var item1 = new Item ( ) { Id = 1 , IsIncluded = true } ;
@@ -178,19 +184,19 @@ public void SourceCompletes_CompletionIsPropagated()
178184 source . OnCompleted ( ) ;
179185
180186 results . Error . Should ( ) . BeNull ( ) ;
181- results . IsCompleted . Should ( ) . BeTrue ( ) ;
182- results . Messages . Count . Should ( ) . Be ( 1 , "1 source operation was performed" ) ;
183- results . Data . Items . Should ( ) . BeEquivalentTo ( new [ ] { item1 } , "1 item was added" ) ;
187+ results . HasCompleted . Should ( ) . BeTrue ( ) ;
188+ results . RecordedChangeSets . Count . Should ( ) . Be ( 1 , "1 source operation was performed" ) ;
189+ results . RecordedItemsByKey . Values . Should ( ) . BeEquivalentTo ( new [ ] { item1 } , "1 item was added" ) ;
190+
184191
185-
186192 // Make sure no extraneous notifications are published.
187193 var item2 = new Item ( ) { Id = 2 , IsIncluded = true } ;
188194 source . OnNext ( new ChangeSet < Item , int > ( )
189195 {
190196 new ( reason : ChangeReason . Add , key : item2 . Id , current : item2 )
191197 } ) ;
192198
193- results . Messages . Skip ( 1 ) . Should ( ) . BeEmpty ( "no source operations should have been processed" ) ;
199+ results . RecordedChangeSets . Skip ( 1 ) . Should ( ) . BeEmpty ( "no source operations should have been processed" ) ;
194200 }
195201
196202 [ Fact ]
@@ -210,17 +216,16 @@ public void SourceCompletesImmediately_CompletionIsPropagated()
210216 return Disposable . Empty ;
211217 } ) ;
212218
213- var error = new Exception ( ) ;
214-
215- using var results = source
219+ using var subscription = source
216220 . FilterImmutable ( predicate : Item . Predicate )
217- . AsAggregator ( ) ;
221+ . ValidateChangeSets ( Item . KeySelector )
222+ . RecordCacheItems ( out var results ) ;
218223
219224
220225 results . Error . Should ( ) . BeNull ( ) ;
221- results . IsCompleted . Should ( ) . BeTrue ( ) ;
222- results . Messages . Count . Should ( ) . Be ( 1 , "1 source operation was performed" ) ;
223- results . Data . Items . Should ( ) . BeEquivalentTo ( new [ ] { item1 } , "1 item was added" ) ;
226+ results . HasCompleted . Should ( ) . BeTrue ( ) ;
227+ results . RecordedChangeSets . Count . Should ( ) . Be ( 1 , "1 source operation was performed" ) ;
228+ results . RecordedItemsByKey . Values . Should ( ) . BeEquivalentTo ( new [ ] { item1 } , "1 item was added" ) ;
224229 }
225230
226231 [ Fact ]
@@ -230,9 +235,10 @@ public void SourceErrors_ErrorIsPropagated()
230235
231236 var error = new Exception ( ) ;
232237
233- using var results = source
238+ using var subscription = source
234239 . FilterImmutable ( predicate : Item . Predicate )
235- . AsAggregator ( ) ;
240+ . ValidateChangeSets ( Item . KeySelector )
241+ . RecordCacheItems ( out var results ) ;
236242
237243
238244 var item1 = new Item ( ) { Id = 1 , IsIncluded = true } ;
@@ -243,19 +249,19 @@ public void SourceErrors_ErrorIsPropagated()
243249 source . OnError ( error ) ;
244250
245251 results . Error . Should ( ) . Be ( error ) ;
246- results . IsCompleted . Should ( ) . BeFalse ( ) ;
247- results . Messages . Count . Should ( ) . Be ( 1 , "1 source operation was performed" ) ;
248- results . Data . Items . Should ( ) . BeEquivalentTo ( new [ ] { item1 } , "1 item was added" ) ;
252+ results . HasCompleted . Should ( ) . BeFalse ( ) ;
253+ results . RecordedChangeSets . Count . Should ( ) . Be ( 1 , "1 source operation was performed" ) ;
254+ results . RecordedItemsByKey . Values . Should ( ) . BeEquivalentTo ( new [ ] { item1 } , "1 item was added" ) ;
255+
249256
250-
251257 // Make sure no extraneous notifications are published.
252258 var item2 = new Item ( ) { Id = 2 , IsIncluded = true } ;
253259 source . OnNext ( new ChangeSet < Item , int > ( )
254260 {
255261 new ( reason : ChangeReason . Add , key : item2 . Id , current : item2 )
256262 } ) ;
257263
258- results . Messages . Skip ( 1 ) . Should ( ) . BeEmpty ( "no source operations should have been processed" ) ;
264+ results . RecordedChangeSets . Skip ( 1 ) . Should ( ) . BeEmpty ( "no source operations should have been processed" ) ;
259265 }
260266
261267 [ Fact ]
@@ -276,15 +282,16 @@ public void SourceErrorsImmediately_ErrorIsPropagated()
276282 return Disposable . Empty ;
277283 } ) ;
278284
279- using var results = source
285+ using var subscription = source
280286 . FilterImmutable ( predicate : Item . Predicate )
281- . AsAggregator ( ) ;
287+ . ValidateChangeSets ( Item . KeySelector )
288+ . RecordCacheItems ( out var results ) ;
282289
283290
284291 results . Error . Should ( ) . Be ( error ) ;
285- results . IsCompleted . Should ( ) . BeFalse ( ) ;
286- results . Messages . Count . Should ( ) . Be ( 1 , "1 source operation was performed" ) ;
287- results . Data . Items . Should ( ) . BeEquivalentTo ( new [ ] { item1 } , "1 item was added" ) ;
292+ results . HasCompleted . Should ( ) . BeFalse ( ) ;
293+ results . RecordedChangeSets . Count . Should ( ) . Be ( 1 , "1 source operation was performed" ) ;
294+ results . RecordedItemsByKey . Values . Should ( ) . BeEquivalentTo ( new [ ] { item1 } , "1 item was added" ) ;
288295 }
289296
290297 [ Fact ]
@@ -299,36 +306,38 @@ public void SuppressEmptyChangesetsIsFalse_EmptyChangesetsArePublished()
299306 {
300307 using var source = new Subject < IChangeSet < Item , int > > ( ) ;
301308
302- using var results = source
309+ using var subscription = source
303310 . FilterImmutable (
304311 predicate : Item . Predicate ,
305312 suppressEmptyChangeSets : false )
306- . AsAggregator ( ) ;
313+ . ValidateChangeSets ( Item . KeySelector )
314+ . RecordCacheItems ( out var results ) ;
307315
308316
309317 ManipulateExcludedItems ( source ) ;
310318
311319 results . Error . Should ( ) . BeNull ( ) ;
312- results . IsCompleted . Should ( ) . BeFalse ( ) ;
313- results . Messages . Count . Should ( ) . Be ( 5 , "5 source operations were performed" ) ;
314- results . Messages . Should ( ) . AllSatisfy ( changes => changes . Should ( ) . BeEmpty ( ) , "no included items were manipulated" ) ;
320+ results . HasCompleted . Should ( ) . BeFalse ( ) ;
321+ results . RecordedChangeSets . Count . Should ( ) . Be ( 5 , "5 source operations were performed" ) ;
322+ results . RecordedChangeSets . Should ( ) . AllSatisfy ( changes => changes . Should ( ) . BeEmpty ( ) , "no included items were manipulated" ) ;
315323 }
316324
317325 [ Fact ]
318326 public void SuppressEmptyChangesetsIsTrue_EmptyChangesetsAreNotPublished ( )
319327 {
320328 using var source = new Subject < IChangeSet < Item , int > > ( ) ;
321329
322- using var results = source
330+ using var subscription = source
323331 . FilterImmutable ( predicate : Item . Predicate )
324- . AsAggregator ( ) ;
332+ . ValidateChangeSets ( Item . KeySelector )
333+ . RecordCacheItems ( out var results ) ;
325334
326335
327336 ManipulateExcludedItems ( source ) ;
328337
329338 results . Error . Should ( ) . BeNull ( ) ;
330- results . IsCompleted . Should ( ) . BeFalse ( ) ;
331- results . Messages . Should ( ) . BeEmpty ( "no source operations should have generated changes" ) ;
339+ results . HasCompleted . Should ( ) . BeFalse ( ) ;
340+ results . RecordedChangeSets . Should ( ) . BeEmpty ( "no source operations should have generated changes" ) ;
332341 }
333342
334343 private static void ManipulateExcludedItems ( ISubject < IChangeSet < Item , int > > source )
@@ -364,6 +373,40 @@ private static void ManipulateExcludedItems(ISubject<IChangeSet<Item, int>> sour
364373 } ) ;
365374 }
366375
376+ [ Fact ]
377+ public void Update_PreviousMatchedCurrentDoesNot_EmitsRemoveCarryingPreviousAsCurrent ( )
378+ {
379+ // Per Change<T,K> contract, a Remove change carries the item that was removed in Current.
380+ // For an Update where Previous matched the predicate but Current does not, the item that
381+ // leaves the filtered view is Previous (it was downstream; Current never reached downstream).
382+ using var source = new Subject < IChangeSet < Item , int > > ( ) ;
383+
384+ using var subscription = source
385+ . FilterImmutable ( predicate : Item . Predicate )
386+ . ValidateChangeSets ( Item . KeySelector )
387+ . RecordCacheItems ( out var results ) ;
388+
389+ var included = new Item ( ) { Id = 1 , IsIncluded = true } ;
390+ var excluded = new Item ( ) { Id = 1 , IsIncluded = false } ;
391+
392+ source . OnNext ( new ChangeSet < Item , int > ( )
393+ {
394+ new ( reason : ChangeReason . Add , key : included . Id , current : included , index : 0 )
395+ } ) ;
396+
397+ source . OnNext ( new ChangeSet < Item , int > ( )
398+ {
399+ new ( reason : ChangeReason . Update , key : excluded . Id , current : excluded , previous : included , currentIndex : 0 , previousIndex : 0 )
400+ } ) ;
401+
402+ var lastChangeSet = results . RecordedChangeSets [ results . RecordedChangeSets . Count - 1 ] ;
403+ lastChangeSet . Count . Should ( ) . Be ( 1 ) ;
404+
405+ var removeChange = lastChangeSet . Single ( ) ;
406+ removeChange . Reason . Should ( ) . Be ( ChangeReason . Remove ) ;
407+ removeChange . Current . Should ( ) . BeSameAs ( included , "Remove.Current must carry the item that left downstream (the previously-matching value), not the new value that never reached downstream" ) ;
408+ }
409+
367410 private class Item
368411 {
369412 public static readonly Func < Item , int > KeySelector
0 commit comments