Skip to content

Commit a7d793a

Browse files
committed
test: add DeadlockTortureTest proving ABBA fix across all dangerous operators
12 tests: Sort, AutoRefresh, GroupOn, Page, Virtualise, Transform(force), BatchIf, DisposeMany, OnItemRemoved, stacked chain, 8 parallel pairs, and three-way circular (A->B->C->A). Each test creates bidirectional cross-cache pipelines through dangerous operators and hammers them with concurrent writes from two threads. On main (Synchronize(lock)): deadlocks within seconds (3 deadlocks + crash in first run). On the PR branch (SynchronizeSafe): 12/12 pass.
1 parent 2a131de commit a7d793a

1 file changed

Lines changed: 148 additions & 0 deletions

File tree

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved.
2+
// Roland Pheasant licenses this file to you under the MIT license.
3+
// See the LICENSE file in the project root for full license information.
4+
5+
using System;
6+
using System.Reactive.Subjects;
7+
using System.Threading;
8+
using System.Threading.Tasks;
9+
10+
using DynamicData.Binding;
11+
using DynamicData.Tests.Domain;
12+
13+
using FluentAssertions;
14+
15+
using Xunit;
16+
17+
namespace DynamicData.Tests.Cache;
18+
19+
/// <summary>
20+
/// Deadlock torture test. Every dangerous operator (one that holds a lock during
21+
/// downstream delivery) is wired into a bidirectional cross-cache pipeline.
22+
/// Two threads write simultaneously, creating the ABBA lock cycle:
23+
/// Thread A: sourceA._locker -> operator lock -> PopulateInto -> sourceB._locker
24+
/// Thread B: sourceB._locker -> operator lock -> PopulateInto -> sourceA._locker
25+
///
26+
/// On main (Synchronize(lock)): deadlocks reliably within seconds.
27+
/// On the PR branch (SynchronizeSafe queue-drain): no deadlock possible.
28+
/// </summary>
29+
public sealed class DeadlockTortureTest
30+
{
31+
private const int ItemCount = 200;
32+
private const int Iterations = 50;
33+
private const int TimeoutSeconds = 15;
34+
35+
private static async Task<bool> RunBidirectionalDeadlockTest(
36+
Func<IObservable<IChangeSet<Person, string>>, IObservable<IChangeSet<Person, string>>> pipeline,
37+
int iterations = Iterations)
38+
{
39+
for (var iter = 0; iter < iterations; iter++)
40+
{
41+
using var sourceA = new SourceCache<Person, string>(p => p.UniqueKey);
42+
using var sourceB = new SourceCache<Person, string>(p => p.UniqueKey);
43+
44+
using var aToB = pipeline(sourceA.Connect().Filter(x => x.Name.StartsWith("A"))).PopulateInto(sourceB);
45+
using var bToA = pipeline(sourceB.Connect().Filter(x => x.Name.StartsWith("B"))).PopulateInto(sourceA);
46+
47+
using var barrier = new Barrier(2);
48+
var taskA = Task.Run(() => { barrier.SignalAndWait(); for (var i = 0; i < ItemCount; i++) sourceA.AddOrUpdate(new Person("A-" + iter + "-" + i, i)); });
49+
var taskB = Task.Run(() => { barrier.SignalAndWait(); for (var i = 0; i < ItemCount; i++) sourceB.AddOrUpdate(new Person("B-" + iter + "-" + i, i)); });
50+
51+
var completed = Task.WhenAll(taskA, taskB);
52+
if (await Task.WhenAny(completed, Task.Delay(TimeSpan.FromSeconds(TimeoutSeconds))) != completed)
53+
return false;
54+
}
55+
return true;
56+
}
57+
58+
[Fact] public async Task Sort_DoesNotDeadlock() =>
59+
(await RunBidirectionalDeadlockTest(s => s.Sort(SortExpressionComparer<Person>.Ascending(p => p.Age)))).Should().BeTrue();
60+
61+
[Fact] public async Task AutoRefresh_DoesNotDeadlock() =>
62+
(await RunBidirectionalDeadlockTest(s => s.AutoRefresh(p => p.Age))).Should().BeTrue();
63+
64+
[Fact] public async Task GroupOn_DoesNotDeadlock() =>
65+
(await RunBidirectionalDeadlockTest(s => s.Group(p => p.Age % 3).MergeMany(g => g.Cache.Connect()))).Should().BeTrue();
66+
67+
[Fact] public async Task Page_DoesNotDeadlock()
68+
{
69+
using var req = new BehaviorSubject<IPageRequest>(new PageRequest(1, 50));
70+
(await RunBidirectionalDeadlockTest(s => s.Sort(SortExpressionComparer<Person>.Ascending(p => p.Age)).Page(req))).Should().BeTrue();
71+
}
72+
73+
[Fact] public async Task Virtualise_DoesNotDeadlock()
74+
{
75+
using var req = new BehaviorSubject<IVirtualRequest>(new VirtualRequest(0, 50));
76+
(await RunBidirectionalDeadlockTest(s => s.Sort(SortExpressionComparer<Person>.Ascending(p => p.Age)).Virtualise(req))).Should().BeTrue();
77+
}
78+
79+
[Fact] public async Task TransformWithForce_DoesNotDeadlock()
80+
{
81+
using var force = new Subject<Func<Person, string, bool>>();
82+
(await RunBidirectionalDeadlockTest(s => s.Transform((p, k) => new Person("T-" + p.Name, p.Age), force))).Should().BeTrue();
83+
}
84+
85+
[Fact] public async Task BatchIf_DoesNotDeadlock() =>
86+
(await RunBidirectionalDeadlockTest(s => s.BatchIf(new BehaviorSubject<bool>(false), false, (TimeSpan?)null))).Should().BeTrue();
87+
88+
[Fact] public async Task DisposeMany_DoesNotDeadlock() =>
89+
(await RunBidirectionalDeadlockTest(s => s.DisposeMany())).Should().BeTrue();
90+
91+
[Fact] public async Task OnItemRemoved_DoesNotDeadlock() =>
92+
(await RunBidirectionalDeadlockTest(s => s.OnItemRemoved(_ => { }))).Should().BeTrue();
93+
94+
[Fact] public async Task AllDangerous_Stacked_DoNotDeadlock()
95+
{
96+
using var pageReq = new BehaviorSubject<IPageRequest>(new PageRequest(1, 100));
97+
using var force = new Subject<Func<Person, string, bool>>();
98+
(await RunBidirectionalDeadlockTest(
99+
s => s.AutoRefresh(p => p.Age)
100+
.Filter(p => p.Age >= 0)
101+
.Transform((p, k) => new Person("X-" + p.Name, p.Age), force)
102+
.OnItemRemoved(_ => { })
103+
.DisposeMany()
104+
.Sort(SortExpressionComparer<Person>.Ascending(p => p.Age))
105+
.Page(pageReq),
106+
iterations: Iterations * 2)).Should().BeTrue();
107+
}
108+
109+
[Fact] public async Task MultiplePairs_Simultaneous_NoDeadlock()
110+
{
111+
using var pageReq = new BehaviorSubject<IPageRequest>(new PageRequest(1, 50));
112+
using var virtReq = new BehaviorSubject<IVirtualRequest>(new VirtualRequest(0, 50));
113+
var results = await Task.WhenAll(
114+
RunBidirectionalDeadlockTest(s => s.Sort(SortExpressionComparer<Person>.Ascending(p => p.Age)), 30),
115+
RunBidirectionalDeadlockTest(s => s.AutoRefresh(p => p.Age), 30),
116+
RunBidirectionalDeadlockTest(s => s.Group(p => p.Age % 3).MergeMany(g => g.Cache.Connect()), 30),
117+
RunBidirectionalDeadlockTest(s => s.OnItemRemoved(_ => { }), 30),
118+
RunBidirectionalDeadlockTest(s => s.DisposeMany(), 30),
119+
RunBidirectionalDeadlockTest(s => s.Sort(SortExpressionComparer<Person>.Ascending(p => p.Age)).Page(pageReq), 30),
120+
RunBidirectionalDeadlockTest(s => s.Sort(SortExpressionComparer<Person>.Ascending(p => p.Age)).Virtualise(virtReq), 30),
121+
RunBidirectionalDeadlockTest(s => s.BatchIf(new BehaviorSubject<bool>(false), false, (TimeSpan?)null), 30));
122+
results.Should().AllSatisfy(r => r.Should().BeTrue());
123+
}
124+
125+
[Fact] public async Task ThreeWayCircular_DoesNotDeadlock()
126+
{
127+
for (var iter = 0; iter < Iterations; iter++)
128+
{
129+
using var a = new SourceCache<Person, string>(p => p.UniqueKey);
130+
using var b = new SourceCache<Person, string>(p => p.UniqueKey);
131+
using var c = new SourceCache<Person, string>(p => p.UniqueKey);
132+
133+
using var ab = a.Connect().Filter(p => p.Name.StartsWith("A")).Sort(SortExpressionComparer<Person>.Ascending(p => p.Age)).PopulateInto(b);
134+
using var bc = b.Connect().Filter(p => p.Name.StartsWith("A")).AutoRefresh(p => p.Age).PopulateInto(c);
135+
using var ca = c.Connect().Filter(p => p.Name.StartsWith("A")).Transform((p, _) => new Person("C-" + p.Name, p.Age)).Filter(p => p.Name.StartsWith("C")).PopulateInto(a);
136+
137+
using var barrier = new Barrier(3);
138+
var tasks = new[]
139+
{
140+
Task.Run(() => { barrier.SignalAndWait(); for (var i = 0; i < ItemCount; i++) a.AddOrUpdate(new Person("A-" + iter + "-" + i, i)); }),
141+
Task.Run(() => { barrier.SignalAndWait(); for (var i = 0; i < ItemCount; i++) b.AddOrUpdate(new Person("B-" + iter + "-" + i, i)); }),
142+
Task.Run(() => { barrier.SignalAndWait(); for (var i = 0; i < ItemCount; i++) c.AddOrUpdate(new Person("CC-" + iter + "-" + i, i)); }),
143+
};
144+
var completed = Task.WhenAll(tasks);
145+
(await Task.WhenAny(completed, Task.Delay(TimeSpan.FromSeconds(TimeoutSeconds)))).Should().BeSameAs(completed, "iteration " + iter);
146+
}
147+
}
148+
}

0 commit comments

Comments
 (0)