Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@ namespace CodeCasa.AutomationPipelines.Lights.ReactiveNode;
/// </summary>
public class ReactiveNode : PipelineNode<LightTransition>
{
private readonly Lock _lock = new();
private readonly string? _name;
private readonly ILogger<ReactiveNode>? _logger;
private readonly IEqualityComparer<LightTransition>? _equalityComparer;
private readonly Subject<Unit> _nodeChangedSubject = new();
private readonly Subject<Action> _stateQueue = new();
private readonly IDisposable _queueSubscription;
private IDisposable? _nodeObservableSubscription;
private IDisposable? _activeNodeSubscription;

Expand Down Expand Up @@ -45,23 +46,28 @@ public ReactiveNode(string? name, IObservable<IPipelineNode<LightTransition>?> n
_equalityComparer = equalityComparer;
PassThrough = true;

_queueSubscription = _stateQueue
.Synchronize()
.Subscribe(action => action());

_nodeObservableSubscription = nodeObservable
.Subscribe(n =>
{
lock (_lock)
_stateQueue.OnNext(() =>
{
if (n == null)
{
DeactivateActiveNode();
PassThrough = true;
_logger?.LogTrace($"{LogPrefix}No active node. Passing through data.");
_nodeChangedSubject.OnNext(Unit.Default);
return;
}
else
{
ActivateNode(n);
}

ActivateNode(n);
_nodeChangedSubject.OnNext(Unit.Default);
}
});
});
}

Expand All @@ -79,17 +85,13 @@ public ReactiveNode(string? name, IObservable<IPipelineNode<LightTransition>?> n
/// <inheritdoc />
protected override void InputReceived(LightTransition? input)
{
if (ActiveNode == null)
{
return;
}
lock (_lock)
_stateQueue.OnNext(() =>
{
if (ActiveNode != null)
{
ActiveNode.Input = input;
}
}
});
}

private void DeactivateActiveNode()
Expand Down Expand Up @@ -117,29 +119,37 @@ private void ActivateNode(IPipelineNode<LightTransition> node)
}
_activeNodeSubscription = ActiveNode.OnNewOutput.Subscribe(output =>
{
if (_equalityComparer != null && _equalityComparer.Equals(Output, output))
{
return;
}
lock (_lock)
{
if (_equalityComparer == null || !_equalityComparer.Equals(Output, output))
{
Output = output;
}
}
_stateQueue.OnNext(() => UpdateOutput(output));
});
PassThrough = false;
}

private void UpdateOutput(LightTransition? newOutput)
{
if (_equalityComparer == null || !_equalityComparer.Equals(Output, newOutput))
{
Output = newOutput;
}
}

/// <inheritdoc />
public override string ToString() => _name ?? base.ToString();

/// <inheritdoc />
public override ValueTask DisposeAsync()
public override async ValueTask DisposeAsync()
{
_nodeObservableSubscription?.Dispose();
_nodeObservableSubscription = null;
return base.DisposeAsync();

if (ActiveNode != null)
{
await ActiveNode.DisposeOrDisposeAsync();
}

_queueSubscription.Dispose();
_stateQueue.Dispose();
_nodeChangedSubject.Dispose();

await base.DisposeAsync();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -273,19 +273,25 @@ public void Register(T reference)
/// <inheritdoc />
public void Unregister(T reference)
{
bool becameEmpty = false;
var becameEmpty = false;

lock (_lock)
{
if (_isDisposed)
{
return;
}

if (_items.Remove(reference) && _items.Count == 0)
{
becameEmpty = true;
}
}

if (becameEmpty)
{
_lastUnregistered.OnNext(Unit.Default);
}
}

/// <inheritdoc />
Expand All @@ -294,7 +300,9 @@ public void Dispose()
lock (_lock)
{
if (_isDisposed)
{
return;
}

_isDisposed = true;
_items.Clear();
Expand Down
47 changes: 19 additions & 28 deletions src/CodeCasa.AutomationPipelines/Pipeline.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Reactive.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

namespace CodeCasa.AutomationPipelines;
Expand All @@ -8,7 +9,6 @@ namespace CodeCasa.AutomationPipelines;
/// </summary>
public class Pipeline<TState> : PipelineNode<TState>, IPipeline<TState>
{
private readonly Lock _lock = new();
private readonly List<IPipelineNode<TState>> _nodes = new();
private readonly Subject<PipelineTelemetry<TState>> _telemetrySubject = new();
private readonly List<IDisposable> _nestedPipelineSubscriptions = new();
Expand Down Expand Up @@ -78,7 +78,7 @@ public Pipeline(TState defaultState, params IPipelineNode<TState>[] nodes)
public IReadOnlyCollection<IPipelineNode<TState>> Nodes => _nodes.AsReadOnly();

/// <inheritdoc />
public IObservable<PipelineTelemetry<TState>> Telemetry => _telemetrySubject.AsObservable();
public IObservable<PipelineTelemetry<TState>> Telemetry => _telemetrySubject.ObserveOn(TaskPoolScheduler.Default);

/// <inheritdoc />
public IPipeline<TState> SetDefault(TState state)
Expand Down Expand Up @@ -107,17 +107,14 @@ public IPipeline<TState> RegisterNode(IPipelineNode<TState> node)
var destinationIndex = _nodes.Count;
previousNode.OnNewOutput.Subscribe(output =>
{
lock (_lock)
{
_telemetrySubject.OnNext(new PipelineTelemetry<TState>(
sourceIndex,
previousNode.ToString(),
destinationIndex,
node.ToString(),
previousNode.Output
));
node.Input = output;
}
_telemetrySubject.OnNext(new PipelineTelemetry<TState>(
sourceIndex,
previousNode.ToString(),
destinationIndex,
node.ToString(),
previousNode.Output
));
node.Input = output;
});

_telemetrySubject.OnNext(new PipelineTelemetry<TState>(
Expand All @@ -141,10 +138,7 @@ public IPipeline<TState> RegisterNode(IPipelineNode<TState> node)
var nestedPipelineName = nestedPipeline.ToString() ?? string.Empty;
_nestedPipelineSubscriptions.Add(nestedPipeline.Telemetry.Subscribe(t =>
{
lock (_lock)
{
_telemetrySubject.OnNext(t.WithParentPipeline(nestedIndex, nestedPipelineName));
}
_telemetrySubject.OnNext(t.WithParentPipeline(nestedIndex, nestedPipelineName));
}));
}

Expand All @@ -156,17 +150,14 @@ public IPipeline<TState> RegisterNode(IPipelineNode<TState> node)
var nodeIndex = _nodes.Count - 1;
_subscription = node.OnNewOutput.Subscribe(o =>
{
lock (_lock)
{
_telemetrySubject.OnNext(new PipelineTelemetry<TState>(
nodeIndex,
node.ToString(),
null, null,
o
));
_telemetrySubject.OnNext(new PipelineTelemetry<TState>(
nodeIndex,
node.ToString(),
null, null,
o
));

SetOutputAndCallActionWhenApplicable(o);
}
SetOutputAndCallActionWhenApplicable(o);
});

var newOutput = node.Output;
Expand Down