Skip to content

Commit 5de9389

Browse files
authored
Feature/pipeline observable (#209)
* Adding pipeline telemetry observable * Removing json ignore. * Moved pipeline logger * Fixed unit test
1 parent db3d171 commit 5de9389

15 files changed

Lines changed: 182 additions & 163 deletions

File tree

src/CodeCasa.AutomationPipelines.Lights/Extensions/LightTransitionNodeExtensions.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
using System.Reactive;
2-
using System.Reactive.Concurrency;
1+
using System.Reactive.Concurrency;
32
using System.Reactive.Linq;
43
using CodeCasa.AutomationPipelines.Lights.Nodes;
54
using CodeCasa.Lights;

src/CodeCasa.AutomationPipelines.Lights/Nodes/LightTransitionNode.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,5 +166,17 @@ private void SetOutputInternal(LightTransition? output)
166166

167167
/// <inheritdoc />
168168
public override string ToString() => GetType().Name;
169+
170+
/// <inheritdoc />
171+
public ValueTask DisposeAsync()
172+
{
173+
if (_newOutputSubject.IsDisposed)
174+
{
175+
return ValueTask.CompletedTask;
176+
}
177+
_newOutputSubject.OnCompleted();
178+
_newOutputSubject.Dispose();
179+
return ValueTask.CompletedTask;
180+
}
169181
}
170182
}

src/CodeCasa.AutomationPipelines.Lights/Nodes/ServiceScopedNode.cs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,6 @@ namespace CodeCasa.AutomationPipelines.Lights.Nodes
66
internal class ServiceScopedNode<TState>(IServiceScope serviceScope, IPipelineNode<TState> innerNode)
77
: IPipelineNode<TState>, IAsyncDisposable
88
{
9-
public async ValueTask DisposeAsync()
10-
{
11-
await serviceScope.DisposeOrDisposeAsync();
12-
await innerNode.DisposeOrDisposeAsync();
13-
}
14-
159
public TState? Input
1610
{
1711
get => innerNode.Input;
@@ -22,5 +16,11 @@ public TState? Input
2216
public IObservable<TState?> OnNewOutput => innerNode.OnNewOutput;
2317

2418
public override string? ToString() => $"{innerNode} (scoped)";
19+
20+
public async ValueTask DisposeAsync()
21+
{
22+
await serviceScope.DisposeOrDisposeAsync();
23+
await innerNode.DisposeOrDisposeAsync();
24+
}
2525
}
2626
}

src/CodeCasa.AutomationPipelines.Lights/Pipeline/LightPipelineFactory.cs

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public IAsyncDisposable SetupLightPipeline<TLight>(TLight light,
3838
/// <param name="light">The light to create a pipeline for.</param>
3939
/// <param name="pipelineBuilder">An action to configure the pipeline behavior.</param>
4040
/// <returns>A configured pipeline for controlling the specified light.</returns>
41-
internal IPipeline<LightTransition> CreateLightPipeline<TLight>(TLight light, Action<ILightTransitionPipelineConfigurator<TLight>> pipelineBuilder) where TLight : ILight
41+
public IPipeline<LightTransition> CreateLightPipeline<TLight>(TLight light, Action<ILightTransitionPipelineConfigurator<TLight>> pipelineBuilder) where TLight : ILight
4242
{
4343
return CreateLightPipelines([light], pipelineBuilder)[light.Id];
4444
}
@@ -79,22 +79,14 @@ internal Dictionary<string, IPipeline<LightTransition>> CreateLightPipelines<TLi
7979
return configurators.ToDictionary(kvp => kvp.Key, kvp =>
8080
{
8181
var conf = kvp.Value;
82-
IPipeline<LightTransition> pipeline;
82+
IPipeline<LightTransition> pipeline = new Pipeline<LightTransition>(
83+
LightTransition.Off(),
84+
conf.Nodes,
85+
conf.Light.ApplyTransition);
8386
if (conf.LoggingEnabled ?? false)
8487
{
85-
pipeline = new Pipeline<LightTransition>(
86-
$"[{conf.Light.Id}] {conf.LogName}",
87-
LightTransition.Off(),
88-
conf.Nodes,
89-
conf.Light.ApplyTransition,
90-
logger);
91-
}
92-
else
93-
{
94-
pipeline = new Pipeline<LightTransition>(
95-
LightTransition.Off(),
96-
conf.Nodes,
97-
conf.Light.ApplyTransition);
88+
var pipelineLogger = new PipelineLogger<LightTransition>(logger, $"[{conf.Light.Id}] {conf.LogName}");
89+
pipeline.Telemetry.Subscribe(t => pipelineLogger.Log(t));
9890
}
9991

10092
return (IPipeline<LightTransition>)new ServiceScopedPipeline<LightTransition>(lightContextScopes[kvp.Key], pipeline);
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
using Microsoft.Extensions.Logging;
2+
3+
namespace CodeCasa.AutomationPipelines.Lights.Pipeline
4+
{
5+
internal class PipelineLogger<TState>(ILogger<Pipeline<TState>>? logger, string? name)
6+
{
7+
public void Log(PipelineTelemetry<TState> pipelineTelemetry)
8+
{
9+
if (pipelineTelemetry.SourceNodeIndex == null && pipelineTelemetry.DestinationNodeIndex == null)
10+
{
11+
logger?.LogTrace($"{LogPrefix}Input set to [{pipelineTelemetry.StateValue?.ToString() ?? "NULL"}]. No nodes registered, passing to pipeline output immediately.");
12+
return;
13+
}
14+
15+
if (pipelineTelemetry.SourceNodeIndex == null)
16+
{
17+
logger?.LogTrace($"{LogPrefix}Input set to [{pipelineTelemetry.StateValue?.ToString() ?? "NULL"}]. Passing input to first [Node {pipelineTelemetry.DestinationNodeIndex}] ({pipelineTelemetry.DestinationNodeName}).");
18+
return;
19+
}
20+
if (pipelineTelemetry.DestinationNodeIndex == null)
21+
{
22+
logger?.LogTrace(
23+
$"{LogPrefix}[Node {pipelineTelemetry.SourceNodeIndex}] ({pipelineTelemetry.SourceNodeName}) passed value [{pipelineTelemetry.StateValue?.ToString() ?? "NULL"}] to pipeline output.");
24+
return;
25+
}
26+
logger?.LogTrace($"{LogPrefix}Passing [Node {pipelineTelemetry.SourceNodeIndex}] ({pipelineTelemetry.SourceNodeName}) value [{pipelineTelemetry.StateValue?.ToString() ?? "NULL"}] to [Node {pipelineTelemetry.DestinationNodeIndex}] ({pipelineTelemetry.DestinationNodeName}).");
27+
}
28+
29+
private string LogPrefix => name == null ? "" : $"{name}: ";
30+
}
31+
}

src/CodeCasa.AutomationPipelines.Lights/Pipeline/ServiceScopedPipeline.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,4 +54,8 @@ public IPipeline<TNode> SetOutputHandler(Action<TNode> action, bool callActionDi
5454
_instance.SetOutputHandler(action, callActionDistinct);
5555
return this;
5656
}
57+
58+
public IReadOnlyCollection<IPipelineNode<TNode>> Nodes => _instance.Nodes;
59+
60+
public IObservable<PipelineTelemetry<TNode>> Telemetry => _instance.Telemetry;
5761
}

src/CodeCasa.AutomationPipelines/IPipeline.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,4 +25,14 @@ public interface IPipeline<TState> : IPipelineNode<TState>, IAsyncDisposable
2525
/// This method can be called at any time during the creation of the pipeline and will be called immediately if the pipeline has already produced an output.
2626
/// </summary>
2727
IPipeline<TState> SetOutputHandler(Action<TState> action, bool callActionDistinct = true);
28+
29+
/// <summary>
30+
/// Gets the collection of nodes registered in the pipeline.
31+
/// </summary>
32+
IReadOnlyCollection<IPipelineNode<TState>> Nodes { get; }
33+
34+
/// <summary>
35+
/// Gets an observable stream of telemetry events that occur during pipeline execution.
36+
/// </summary>
37+
IObservable<PipelineTelemetry<TState>> Telemetry { get; }
2838
}

src/CodeCasa.AutomationPipelines/IPipelineNode.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
1-
namespace CodeCasa.AutomationPipelines;
1+

2+
namespace CodeCasa.AutomationPipelines;
23

34
/// <summary>
45
/// Represents a node in a pipeline.
56
/// </summary>
6-
public interface IPipelineNode<TState>
7+
public interface IPipelineNode<TState> : IAsyncDisposable
78
{
89
/// <summary>
910
/// Sets the input state of the node. This will trigger the processing of the input.

0 commit comments

Comments
 (0)