diff --git a/src/CodeCasa.AutomationPipelines.Lights/ReactiveNode/ReactiveNode.cs b/src/CodeCasa.AutomationPipelines.Lights/ReactiveNode/ReactiveNode.cs index 226a053..f9a92bb 100644 --- a/src/CodeCasa.AutomationPipelines.Lights/ReactiveNode/ReactiveNode.cs +++ b/src/CodeCasa.AutomationPipelines.Lights/ReactiveNode/ReactiveNode.cs @@ -13,11 +13,12 @@ namespace CodeCasa.AutomationPipelines.Lights.ReactiveNode; /// public class ReactiveNode : PipelineNode { - private readonly Lock _lock = new(); private readonly string? _name; private readonly ILogger? _logger; private readonly IEqualityComparer? _equalityComparer; private readonly Subject _nodeChangedSubject = new(); + private readonly Subject _stateQueue = new(); + private readonly IDisposable _queueSubscription; private IDisposable? _nodeObservableSubscription; private IDisposable? _activeNodeSubscription; @@ -45,23 +46,28 @@ public ReactiveNode(string? name, IObservable?> n _equalityComparer = equalityComparer; PassThrough = true; + _queueSubscription = _stateQueue + .Synchronize() + .Subscribe(action => action()); + _nodeObservableSubscription = nodeObservable .Subscribe(n => { - lock (_lock) + _stateQueue.OnNext(() => { if (n == null) { DeactivateActiveNode(); PassThrough = true; _logger?.LogTrace($"{LogPrefix}No active node. Passing through data."); - _nodeChangedSubject.OnNext(Unit.Default); - return; + } + else + { + ActivateNode(n); } - ActivateNode(n); _nodeChangedSubject.OnNext(Unit.Default); - } + }); }); } @@ -79,17 +85,13 @@ public ReactiveNode(string? name, IObservable?> n /// protected override void InputReceived(LightTransition? input) { - if (ActiveNode == null) - { - return; - } - lock (_lock) + _stateQueue.OnNext(() => { if (ActiveNode != null) { ActiveNode.Input = input; } - } + }); } private void DeactivateActiveNode() @@ -117,29 +119,37 @@ private void ActivateNode(IPipelineNode node) } _activeNodeSubscription = ActiveNode.OnNewOutput.Subscribe(output => { - if (_equalityComparer != null && _equalityComparer.Equals(Output, output)) - { - return; - } - lock (_lock) - { - if (_equalityComparer == null || !_equalityComparer.Equals(Output, output)) - { - Output = output; - } - } + _stateQueue.OnNext(() => UpdateOutput(output)); }); PassThrough = false; } + private void UpdateOutput(LightTransition? newOutput) + { + if (_equalityComparer == null || !_equalityComparer.Equals(Output, newOutput)) + { + Output = newOutput; + } + } + /// public override string ToString() => _name ?? base.ToString(); /// - public override ValueTask DisposeAsync() + public override async ValueTask DisposeAsync() { _nodeObservableSubscription?.Dispose(); _nodeObservableSubscription = null; - return base.DisposeAsync(); + + if (ActiveNode != null) + { + await ActiveNode.DisposeOrDisposeAsync(); + } + + _queueSubscription.Dispose(); + _stateQueue.Dispose(); + _nodeChangedSubject.Dispose(); + + await base.DisposeAsync(); } } \ No newline at end of file diff --git a/src/CodeCasa.AutomationPipelines.Lights/ReactiveNode/ReactiveNodeFactory.cs b/src/CodeCasa.AutomationPipelines.Lights/ReactiveNode/ReactiveNodeFactory.cs index 73fb8a4..ed56f0a 100644 --- a/src/CodeCasa.AutomationPipelines.Lights/ReactiveNode/ReactiveNodeFactory.cs +++ b/src/CodeCasa.AutomationPipelines.Lights/ReactiveNode/ReactiveNodeFactory.cs @@ -273,19 +273,25 @@ public void Register(T reference) /// public void Unregister(T reference) { - bool becameEmpty = false; + var becameEmpty = false; lock (_lock) { if (_isDisposed) + { return; + } if (_items.Remove(reference) && _items.Count == 0) + { becameEmpty = true; + } } if (becameEmpty) + { _lastUnregistered.OnNext(Unit.Default); + } } /// @@ -294,7 +300,9 @@ public void Dispose() lock (_lock) { if (_isDisposed) + { return; + } _isDisposed = true; _items.Clear(); diff --git a/src/CodeCasa.AutomationPipelines/Pipeline.cs b/src/CodeCasa.AutomationPipelines/Pipeline.cs index 2811e9d..ec9a03d 100644 --- a/src/CodeCasa.AutomationPipelines/Pipeline.cs +++ b/src/CodeCasa.AutomationPipelines/Pipeline.cs @@ -1,4 +1,5 @@ -using System.Reactive.Linq; +using System.Reactive.Concurrency; +using System.Reactive.Linq; using System.Reactive.Subjects; namespace CodeCasa.AutomationPipelines; @@ -8,7 +9,6 @@ namespace CodeCasa.AutomationPipelines; /// public class Pipeline : PipelineNode, IPipeline { - private readonly Lock _lock = new(); private readonly List> _nodes = new(); private readonly Subject> _telemetrySubject = new(); private readonly List _nestedPipelineSubscriptions = new(); @@ -78,7 +78,7 @@ public Pipeline(TState defaultState, params IPipelineNode[] nodes) public IReadOnlyCollection> Nodes => _nodes.AsReadOnly(); /// - public IObservable> Telemetry => _telemetrySubject.AsObservable(); + public IObservable> Telemetry => _telemetrySubject.ObserveOn(TaskPoolScheduler.Default); /// public IPipeline SetDefault(TState state) @@ -107,17 +107,14 @@ public IPipeline RegisterNode(IPipelineNode node) var destinationIndex = _nodes.Count; previousNode.OnNewOutput.Subscribe(output => { - lock (_lock) - { - _telemetrySubject.OnNext(new PipelineTelemetry( - sourceIndex, - previousNode.ToString(), - destinationIndex, - node.ToString(), - previousNode.Output - )); - node.Input = output; - } + _telemetrySubject.OnNext(new PipelineTelemetry( + sourceIndex, + previousNode.ToString(), + destinationIndex, + node.ToString(), + previousNode.Output + )); + node.Input = output; }); _telemetrySubject.OnNext(new PipelineTelemetry( @@ -141,10 +138,7 @@ public IPipeline RegisterNode(IPipelineNode node) var nestedPipelineName = nestedPipeline.ToString() ?? string.Empty; _nestedPipelineSubscriptions.Add(nestedPipeline.Telemetry.Subscribe(t => { - lock (_lock) - { - _telemetrySubject.OnNext(t.WithParentPipeline(nestedIndex, nestedPipelineName)); - } + _telemetrySubject.OnNext(t.WithParentPipeline(nestedIndex, nestedPipelineName)); })); } @@ -156,17 +150,14 @@ public IPipeline RegisterNode(IPipelineNode node) var nodeIndex = _nodes.Count - 1; _subscription = node.OnNewOutput.Subscribe(o => { - lock (_lock) - { - _telemetrySubject.OnNext(new PipelineTelemetry( - nodeIndex, - node.ToString(), - null, null, - o - )); + _telemetrySubject.OnNext(new PipelineTelemetry( + nodeIndex, + node.ToString(), + null, null, + o + )); - SetOutputAndCallActionWhenApplicable(o); - } + SetOutputAndCallActionWhenApplicable(o); }); var newOutput = node.Output;