Skip to content

Commit 0b4571e

Browse files
authored
Fix potential deadlocks (#225)
* Preventing potential deadlocks in reactive node. * Fixed locking in pipeline. * Introduced _stateQueue * Finetune
1 parent 45b9a47 commit 0b4571e

3 files changed

Lines changed: 63 additions & 54 deletions

File tree

src/CodeCasa.AutomationPipelines.Lights/ReactiveNode/ReactiveNode.cs

Lines changed: 35 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,12 @@ namespace CodeCasa.AutomationPipelines.Lights.ReactiveNode;
1313
/// </summary>
1414
public class ReactiveNode : PipelineNode<LightTransition>
1515
{
16-
private readonly Lock _lock = new();
1716
private readonly string? _name;
1817
private readonly ILogger<ReactiveNode>? _logger;
1918
private readonly IEqualityComparer<LightTransition>? _equalityComparer;
2019
private readonly Subject<Unit> _nodeChangedSubject = new();
20+
private readonly Subject<Action> _stateQueue = new();
21+
private readonly IDisposable _queueSubscription;
2122
private IDisposable? _nodeObservableSubscription;
2223
private IDisposable? _activeNodeSubscription;
2324

@@ -45,23 +46,28 @@ public ReactiveNode(string? name, IObservable<IPipelineNode<LightTransition>?> n
4546
_equalityComparer = equalityComparer;
4647
PassThrough = true;
4748

49+
_queueSubscription = _stateQueue
50+
.Synchronize()
51+
.Subscribe(action => action());
52+
4853
_nodeObservableSubscription = nodeObservable
4954
.Subscribe(n =>
5055
{
51-
lock (_lock)
56+
_stateQueue.OnNext(() =>
5257
{
5358
if (n == null)
5459
{
5560
DeactivateActiveNode();
5661
PassThrough = true;
5762
_logger?.LogTrace($"{LogPrefix}No active node. Passing through data.");
58-
_nodeChangedSubject.OnNext(Unit.Default);
59-
return;
63+
}
64+
else
65+
{
66+
ActivateNode(n);
6067
}
6168

62-
ActivateNode(n);
6369
_nodeChangedSubject.OnNext(Unit.Default);
64-
}
70+
});
6571
});
6672
}
6773

@@ -79,17 +85,13 @@ public ReactiveNode(string? name, IObservable<IPipelineNode<LightTransition>?> n
7985
/// <inheritdoc />
8086
protected override void InputReceived(LightTransition? input)
8187
{
82-
if (ActiveNode == null)
83-
{
84-
return;
85-
}
86-
lock (_lock)
88+
_stateQueue.OnNext(() =>
8789
{
8890
if (ActiveNode != null)
8991
{
9092
ActiveNode.Input = input;
9193
}
92-
}
94+
});
9395
}
9496

9597
private void DeactivateActiveNode()
@@ -117,29 +119,37 @@ private void ActivateNode(IPipelineNode<LightTransition> node)
117119
}
118120
_activeNodeSubscription = ActiveNode.OnNewOutput.Subscribe(output =>
119121
{
120-
if (_equalityComparer != null && _equalityComparer.Equals(Output, output))
121-
{
122-
return;
123-
}
124-
lock (_lock)
125-
{
126-
if (_equalityComparer == null || !_equalityComparer.Equals(Output, output))
127-
{
128-
Output = output;
129-
}
130-
}
122+
_stateQueue.OnNext(() => UpdateOutput(output));
131123
});
132124
PassThrough = false;
133125
}
134126

127+
private void UpdateOutput(LightTransition? newOutput)
128+
{
129+
if (_equalityComparer == null || !_equalityComparer.Equals(Output, newOutput))
130+
{
131+
Output = newOutput;
132+
}
133+
}
134+
135135
/// <inheritdoc />
136136
public override string ToString() => _name ?? base.ToString();
137137

138138
/// <inheritdoc />
139-
public override ValueTask DisposeAsync()
139+
public override async ValueTask DisposeAsync()
140140
{
141141
_nodeObservableSubscription?.Dispose();
142142
_nodeObservableSubscription = null;
143-
return base.DisposeAsync();
143+
144+
if (ActiveNode != null)
145+
{
146+
await ActiveNode.DisposeOrDisposeAsync();
147+
}
148+
149+
_queueSubscription.Dispose();
150+
_stateQueue.Dispose();
151+
_nodeChangedSubject.Dispose();
152+
153+
await base.DisposeAsync();
144154
}
145155
}

src/CodeCasa.AutomationPipelines.Lights/ReactiveNode/ReactiveNodeFactory.cs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -273,19 +273,25 @@ public void Register(T reference)
273273
/// <inheritdoc />
274274
public void Unregister(T reference)
275275
{
276-
bool becameEmpty = false;
276+
var becameEmpty = false;
277277

278278
lock (_lock)
279279
{
280280
if (_isDisposed)
281+
{
281282
return;
283+
}
282284

283285
if (_items.Remove(reference) && _items.Count == 0)
286+
{
284287
becameEmpty = true;
288+
}
285289
}
286290

287291
if (becameEmpty)
292+
{
288293
_lastUnregistered.OnNext(Unit.Default);
294+
}
289295
}
290296

291297
/// <inheritdoc />
@@ -294,7 +300,9 @@ public void Dispose()
294300
lock (_lock)
295301
{
296302
if (_isDisposed)
303+
{
297304
return;
305+
}
298306

299307
_isDisposed = true;
300308
_items.Clear();

src/CodeCasa.AutomationPipelines/Pipeline.cs

Lines changed: 19 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
using System.Reactive.Linq;
1+
using System.Reactive.Concurrency;
2+
using System.Reactive.Linq;
23
using System.Reactive.Subjects;
34

45
namespace CodeCasa.AutomationPipelines;
@@ -8,7 +9,6 @@ namespace CodeCasa.AutomationPipelines;
89
/// </summary>
910
public class Pipeline<TState> : PipelineNode<TState>, IPipeline<TState>
1011
{
11-
private readonly Lock _lock = new();
1212
private readonly List<IPipelineNode<TState>> _nodes = new();
1313
private readonly Subject<PipelineTelemetry<TState>> _telemetrySubject = new();
1414
private readonly List<IDisposable> _nestedPipelineSubscriptions = new();
@@ -78,7 +78,7 @@ public Pipeline(TState defaultState, params IPipelineNode<TState>[] nodes)
7878
public IReadOnlyCollection<IPipelineNode<TState>> Nodes => _nodes.AsReadOnly();
7979

8080
/// <inheritdoc />
81-
public IObservable<PipelineTelemetry<TState>> Telemetry => _telemetrySubject.AsObservable();
81+
public IObservable<PipelineTelemetry<TState>> Telemetry => _telemetrySubject.ObserveOn(TaskPoolScheduler.Default);
8282

8383
/// <inheritdoc />
8484
public IPipeline<TState> SetDefault(TState state)
@@ -107,17 +107,14 @@ public IPipeline<TState> RegisterNode(IPipelineNode<TState> node)
107107
var destinationIndex = _nodes.Count;
108108
previousNode.OnNewOutput.Subscribe(output =>
109109
{
110-
lock (_lock)
111-
{
112-
_telemetrySubject.OnNext(new PipelineTelemetry<TState>(
113-
sourceIndex,
114-
previousNode.ToString(),
115-
destinationIndex,
116-
node.ToString(),
117-
previousNode.Output
118-
));
119-
node.Input = output;
120-
}
110+
_telemetrySubject.OnNext(new PipelineTelemetry<TState>(
111+
sourceIndex,
112+
previousNode.ToString(),
113+
destinationIndex,
114+
node.ToString(),
115+
previousNode.Output
116+
));
117+
node.Input = output;
121118
});
122119

123120
_telemetrySubject.OnNext(new PipelineTelemetry<TState>(
@@ -141,10 +138,7 @@ public IPipeline<TState> RegisterNode(IPipelineNode<TState> node)
141138
var nestedPipelineName = nestedPipeline.ToString() ?? string.Empty;
142139
_nestedPipelineSubscriptions.Add(nestedPipeline.Telemetry.Subscribe(t =>
143140
{
144-
lock (_lock)
145-
{
146-
_telemetrySubject.OnNext(t.WithParentPipeline(nestedIndex, nestedPipelineName));
147-
}
141+
_telemetrySubject.OnNext(t.WithParentPipeline(nestedIndex, nestedPipelineName));
148142
}));
149143
}
150144

@@ -156,17 +150,14 @@ public IPipeline<TState> RegisterNode(IPipelineNode<TState> node)
156150
var nodeIndex = _nodes.Count - 1;
157151
_subscription = node.OnNewOutput.Subscribe(o =>
158152
{
159-
lock (_lock)
160-
{
161-
_telemetrySubject.OnNext(new PipelineTelemetry<TState>(
162-
nodeIndex,
163-
node.ToString(),
164-
null, null,
165-
o
166-
));
153+
_telemetrySubject.OnNext(new PipelineTelemetry<TState>(
154+
nodeIndex,
155+
node.ToString(),
156+
null, null,
157+
o
158+
));
167159

168-
SetOutputAndCallActionWhenApplicable(o);
169-
}
160+
SetOutputAndCallActionWhenApplicable(o);
170161
});
171162

172163
var newOutput = node.Output;

0 commit comments

Comments
 (0)