From cdf7505fb8364c2325558dc3b9eaacb1907f427e Mon Sep 17 00:00:00 2001 From: Jasper Date: Fri, 22 May 2026 09:54:21 +0200 Subject: [PATCH 1/4] Preventing potential deadlocks in reactive node. --- .../ReactiveNode/ReactiveNode.cs | 11 ++++++----- .../ReactiveNode/ReactiveNodeFactory.cs | 10 +++++++++- 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/src/CodeCasa.AutomationPipelines.Lights/ReactiveNode/ReactiveNode.cs b/src/CodeCasa.AutomationPipelines.Lights/ReactiveNode/ReactiveNode.cs index 226a053..6059df0 100644 --- a/src/CodeCasa.AutomationPipelines.Lights/ReactiveNode/ReactiveNode.cs +++ b/src/CodeCasa.AutomationPipelines.Lights/ReactiveNode/ReactiveNode.cs @@ -55,13 +55,14 @@ public ReactiveNode(string? name, IObservable?> n DeactivateActiveNode(); PassThrough = true; _logger?.LogTrace($"{LogPrefix}No active node. Passing through data."); - _nodeChangedSubject.OnNext(Unit.Default); - return; } - - ActivateNode(n); - _nodeChangedSubject.OnNext(Unit.Default); + else + { + ActivateNode(n); + } } + + _nodeChangedSubject.OnNext(Unit.Default); }); } diff --git a/src/CodeCasa.AutomationPipelines.Lights/ReactiveNode/ReactiveNodeFactory.cs b/src/CodeCasa.AutomationPipelines.Lights/ReactiveNode/ReactiveNodeFactory.cs index 73fb8a4..ed56f0a 100644 --- a/src/CodeCasa.AutomationPipelines.Lights/ReactiveNode/ReactiveNodeFactory.cs +++ b/src/CodeCasa.AutomationPipelines.Lights/ReactiveNode/ReactiveNodeFactory.cs @@ -273,19 +273,25 @@ public void Register(T reference) /// public void Unregister(T reference) { - bool becameEmpty = false; + var becameEmpty = false; lock (_lock) { if (_isDisposed) + { return; + } if (_items.Remove(reference) && _items.Count == 0) + { becameEmpty = true; + } } if (becameEmpty) + { _lastUnregistered.OnNext(Unit.Default); + } } /// @@ -294,7 +300,9 @@ public void Dispose() lock (_lock) { if (_isDisposed) + { return; + } _isDisposed = true; _items.Clear(); From 0ce1d2d9103e0715e7e26c51dce6f4dded7fc19c Mon Sep 17 00:00:00 2001 From: Jasper Date: Fri, 22 May 2026 10:14:00 +0200 Subject: [PATCH 2/4] Fixed locking in pipeline. --- src/CodeCasa.AutomationPipelines/Pipeline.cs | 47 ++++++++------------ 1 file changed, 19 insertions(+), 28 deletions(-) diff --git a/src/CodeCasa.AutomationPipelines/Pipeline.cs b/src/CodeCasa.AutomationPipelines/Pipeline.cs index 2811e9d..ec9a03d 100644 --- a/src/CodeCasa.AutomationPipelines/Pipeline.cs +++ b/src/CodeCasa.AutomationPipelines/Pipeline.cs @@ -1,4 +1,5 @@ -using System.Reactive.Linq; +using System.Reactive.Concurrency; +using System.Reactive.Linq; using System.Reactive.Subjects; namespace CodeCasa.AutomationPipelines; @@ -8,7 +9,6 @@ namespace CodeCasa.AutomationPipelines; /// public class Pipeline : PipelineNode, IPipeline { - private readonly Lock _lock = new(); private readonly List> _nodes = new(); private readonly Subject> _telemetrySubject = new(); private readonly List _nestedPipelineSubscriptions = new(); @@ -78,7 +78,7 @@ public Pipeline(TState defaultState, params IPipelineNode[] nodes) public IReadOnlyCollection> Nodes => _nodes.AsReadOnly(); /// - public IObservable> Telemetry => _telemetrySubject.AsObservable(); + public IObservable> Telemetry => _telemetrySubject.ObserveOn(TaskPoolScheduler.Default); /// public IPipeline SetDefault(TState state) @@ -107,17 +107,14 @@ public IPipeline RegisterNode(IPipelineNode node) var destinationIndex = _nodes.Count; previousNode.OnNewOutput.Subscribe(output => { - lock (_lock) - { - _telemetrySubject.OnNext(new PipelineTelemetry( - sourceIndex, - previousNode.ToString(), - destinationIndex, - node.ToString(), - previousNode.Output - )); - node.Input = output; - } + _telemetrySubject.OnNext(new PipelineTelemetry( + sourceIndex, + previousNode.ToString(), + destinationIndex, + node.ToString(), + previousNode.Output + )); + node.Input = output; }); _telemetrySubject.OnNext(new PipelineTelemetry( @@ -141,10 +138,7 @@ public IPipeline RegisterNode(IPipelineNode node) var nestedPipelineName = nestedPipeline.ToString() ?? string.Empty; _nestedPipelineSubscriptions.Add(nestedPipeline.Telemetry.Subscribe(t => { - lock (_lock) - { - _telemetrySubject.OnNext(t.WithParentPipeline(nestedIndex, nestedPipelineName)); - } + _telemetrySubject.OnNext(t.WithParentPipeline(nestedIndex, nestedPipelineName)); })); } @@ -156,17 +150,14 @@ public IPipeline RegisterNode(IPipelineNode node) var nodeIndex = _nodes.Count - 1; _subscription = node.OnNewOutput.Subscribe(o => { - lock (_lock) - { - _telemetrySubject.OnNext(new PipelineTelemetry( - nodeIndex, - node.ToString(), - null, null, - o - )); + _telemetrySubject.OnNext(new PipelineTelemetry( + nodeIndex, + node.ToString(), + null, null, + o + )); - SetOutputAndCallActionWhenApplicable(o); - } + SetOutputAndCallActionWhenApplicable(o); }); var newOutput = node.Output; From 511467f38b3fcee8a324edb0dc72597f3c31c11a Mon Sep 17 00:00:00 2001 From: Jasper Date: Fri, 22 May 2026 10:32:46 +0200 Subject: [PATCH 3/4] Introduced _stateQueue --- .../ReactiveNode/ReactiveNode.cs | 53 +++++++++++-------- 1 file changed, 31 insertions(+), 22 deletions(-) diff --git a/src/CodeCasa.AutomationPipelines.Lights/ReactiveNode/ReactiveNode.cs b/src/CodeCasa.AutomationPipelines.Lights/ReactiveNode/ReactiveNode.cs index 6059df0..4d205b7 100644 --- a/src/CodeCasa.AutomationPipelines.Lights/ReactiveNode/ReactiveNode.cs +++ b/src/CodeCasa.AutomationPipelines.Lights/ReactiveNode/ReactiveNode.cs @@ -13,11 +13,12 @@ namespace CodeCasa.AutomationPipelines.Lights.ReactiveNode; /// public class ReactiveNode : PipelineNode { - private readonly Lock _lock = new(); private readonly string? _name; private readonly ILogger? _logger; private readonly IEqualityComparer? _equalityComparer; private readonly Subject _nodeChangedSubject = new(); + private readonly Subject _stateQueue = new(); + private readonly IDisposable _queueSubscription; private IDisposable? _nodeObservableSubscription; private IDisposable? _activeNodeSubscription; @@ -45,10 +46,14 @@ public ReactiveNode(string? name, IObservable?> n _equalityComparer = equalityComparer; PassThrough = true; + _queueSubscription = _stateQueue + .Synchronize() + .Subscribe(action => action()); + _nodeObservableSubscription = nodeObservable .Subscribe(n => { - lock (_lock) + _stateQueue.OnNext(() => { if (n == null) { @@ -60,7 +65,7 @@ public ReactiveNode(string? name, IObservable?> n { ActivateNode(n); } - } + }); _nodeChangedSubject.OnNext(Unit.Default); }); @@ -80,17 +85,13 @@ public ReactiveNode(string? name, IObservable?> n /// protected override void InputReceived(LightTransition? input) { - if (ActiveNode == null) - { - return; - } - lock (_lock) + _stateQueue.OnNext(() => { if (ActiveNode != null) { ActiveNode.Input = input; } - } + }); } private void DeactivateActiveNode() @@ -118,29 +119,37 @@ private void ActivateNode(IPipelineNode node) } _activeNodeSubscription = ActiveNode.OnNewOutput.Subscribe(output => { - if (_equalityComparer != null && _equalityComparer.Equals(Output, output)) - { - return; - } - lock (_lock) - { - if (_equalityComparer == null || !_equalityComparer.Equals(Output, output)) - { - Output = output; - } - } + _stateQueue.OnNext(() => UpdateOutput(output)); }); PassThrough = false; } + private void UpdateOutput(LightTransition? newOutput) + { + if (_equalityComparer == null || !_equalityComparer.Equals(Output, newOutput)) + { + Output = newOutput; + } + } + /// public override string ToString() => _name ?? base.ToString(); /// - public override ValueTask DisposeAsync() + public override async ValueTask DisposeAsync() { _nodeObservableSubscription?.Dispose(); _nodeObservableSubscription = null; - return base.DisposeAsync(); + + _queueSubscription.Dispose(); + _stateQueue.Dispose(); + _nodeChangedSubject.Dispose(); + + if (ActiveNode != null) + { + await ActiveNode.DisposeOrDisposeAsync(); + } + + await base.DisposeAsync(); } } \ No newline at end of file From 298e9d805025b54d12a79828f4b308cb2f25c520 Mon Sep 17 00:00:00 2001 From: Jasper Date: Fri, 22 May 2026 10:34:42 +0200 Subject: [PATCH 4/4] Finetune --- .../ReactiveNode/ReactiveNode.cs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/CodeCasa.AutomationPipelines.Lights/ReactiveNode/ReactiveNode.cs b/src/CodeCasa.AutomationPipelines.Lights/ReactiveNode/ReactiveNode.cs index 4d205b7..f9a92bb 100644 --- a/src/CodeCasa.AutomationPipelines.Lights/ReactiveNode/ReactiveNode.cs +++ b/src/CodeCasa.AutomationPipelines.Lights/ReactiveNode/ReactiveNode.cs @@ -65,9 +65,9 @@ public ReactiveNode(string? name, IObservable?> n { ActivateNode(n); } - }); - _nodeChangedSubject.OnNext(Unit.Default); + _nodeChangedSubject.OnNext(Unit.Default); + }); }); } @@ -141,15 +141,15 @@ public override async ValueTask DisposeAsync() _nodeObservableSubscription?.Dispose(); _nodeObservableSubscription = null; - _queueSubscription.Dispose(); - _stateQueue.Dispose(); - _nodeChangedSubject.Dispose(); - if (ActiveNode != null) { await ActiveNode.DisposeOrDisposeAsync(); } + _queueSubscription.Dispose(); + _stateQueue.Dispose(); + _nodeChangedSubject.Dispose(); + await base.DisposeAsync(); } } \ No newline at end of file