Skip to content

Commit 2cc9955

Browse files
committed
fix: MergeMany queue-first disposal prevents spurious OnCompleted, use StrongBox
Finally callbacks fire on disposal, not just natural completion. With queue-last ordering, child Finally handlers could deliver OnCompleted to the observer during teardown. Fixed by using queue-first disposal (same as DisposeMany/OnBeingRemoved) and guarding CheckCompleted against terminated queue. Replaced int[] counter with StrongBox<int> for clearer intent.
1 parent 4ba3ec9 commit 2cc9955

1 file changed

Lines changed: 9 additions & 6 deletions

File tree

src/DynamicData/Cache/Internal/MergeMany.cs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
using System.Reactive.Disposables;
66
using System.Reactive.Linq;
7+
using System.Runtime.CompilerServices;
78

89
namespace DynamicData.Cache.Internal;
910

@@ -32,25 +33,27 @@ public MergeMany(IObservable<IChangeSet<TObject, TKey>> source, Func<TObject, IO
3233
public IObservable<TDestination> Run() => Observable.Create<TDestination>(
3334
observer =>
3435
{
35-
var counter = new[] { 1 };
36+
var counter = new StrongBox<int>(1);
3637
var queue = new DeliveryQueue<TDestination>(observer);
3738

38-
return new CompositeDisposable(_source
39+
// Queue first: terminate before subscription disposal to prevent
40+
// Finally callbacks from delivering spurious OnCompleted during teardown.
41+
return new CompositeDisposable(queue, _source
3942
.Do(static _ => { }, static _ => { }, () => CheckCompleted(counter, queue))
4043
.Concat(Observable.Never<IChangeSet<TObject, TKey>>())
4144
.SubscribeMany((t, key) =>
4245
{
43-
Interlocked.Increment(ref counter[0]);
46+
Interlocked.Increment(ref counter.Value);
4447
return _observableSelector(t, key)
4548
.Finally(() => CheckCompleted(counter, queue))
4649
.Subscribe(queue.OnNext, static _ => { });
4750
})
48-
.Subscribe(static _ => { }, observer.OnError), queue);
51+
.Subscribe(static _ => { }, observer.OnError));
4952
});
5053

51-
private static void CheckCompleted(int[] counter, DeliveryQueue<TDestination> queue)
54+
private static void CheckCompleted(StrongBox<int> counter, DeliveryQueue<TDestination> queue)
5255
{
53-
if (Interlocked.Decrement(ref counter[0]) == 0)
56+
if (Interlocked.Decrement(ref counter.Value) == 0 && !queue.IsTerminated)
5457
{
5558
queue.OnCompleted();
5659
}

0 commit comments

Comments
 (0)