Skip to content

Commit efe89ad

Browse files
authored
Added possibility to publish pipeline state over mqtt using extension method PublishTelemetryToMqtt (#210)
* Allowing telemetry subscriptions on light transition pipelines. * Adding action overload. * Adding subscribing to the configurators. * Adding nested telemetry. * Moved name to IPipelineNode. * Added naming to internal nodes. * Implemented PublishTelemetryToMqtt * Added xml comment. * Added additional xml comments. * Added xml comments. * Fixed test warning.
1 parent 5de9389 commit efe89ad

54 files changed

Lines changed: 673 additions & 144 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

CodeCasa.sln

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "CodeCasa.NetDaemon.Sensors.
4545
EndProject
4646
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "CodeCasa.Lights.Timelines", "src\CodeCasa.Lights.Timelines\CodeCasa.Lights.Timelines.csproj", "{2308FD7B-1A02-4901-9914-094A51936E30}"
4747
EndProject
48+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "CodeCasa.AutomationPipelines.Lights.Mqtt", "src\CodeCasa.AutomationPipelines.Lights.Mqtt\CodeCasa.AutomationPipelines.Lights.Mqtt.csproj", "{84685AB7-AEAA-41E5-8086-254ACDC5EE6B}"
49+
EndProject
4850
Global
4951
GlobalSection(SolutionConfigurationPlatforms) = preSolution
5052
Debug|Any CPU = Debug|Any CPU
@@ -131,6 +133,10 @@ Global
131133
{2308FD7B-1A02-4901-9914-094A51936E30}.Debug|Any CPU.Build.0 = Debug|Any CPU
132134
{2308FD7B-1A02-4901-9914-094A51936E30}.Release|Any CPU.ActiveCfg = Release|Any CPU
133135
{2308FD7B-1A02-4901-9914-094A51936E30}.Release|Any CPU.Build.0 = Release|Any CPU
136+
{84685AB7-AEAA-41E5-8086-254ACDC5EE6B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
137+
{84685AB7-AEAA-41E5-8086-254ACDC5EE6B}.Debug|Any CPU.Build.0 = Debug|Any CPU
138+
{84685AB7-AEAA-41E5-8086-254ACDC5EE6B}.Release|Any CPU.ActiveCfg = Release|Any CPU
139+
{84685AB7-AEAA-41E5-8086-254ACDC5EE6B}.Release|Any CPU.Build.0 = Release|Any CPU
134140
EndGlobalSection
135141
GlobalSection(SolutionProperties) = preSolution
136142
HideSolutionNode = FALSE
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
using Microsoft.Extensions.Options;
2+
using MQTTnet;
3+
using MQTTnet.Formatter;
4+
5+
namespace CodeCasa.AutomationPipelines.Lights.Mqtt.BackgroundService
6+
{
7+
public class MqttWorker(IMqttClient client, IOptions<MqttOptions> options)
8+
: Microsoft.Extensions.Hosting.BackgroundService
9+
{
10+
private readonly MqttOptions _options = options.Value;
11+
12+
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
13+
{
14+
var connectionBuilder = new MqttClientOptionsBuilder()
15+
.WithTcpServer(_options.Host, _options.Port)
16+
.WithProtocolVersion(MqttProtocolVersion.V500);
17+
if (_options.User != null)
18+
{
19+
connectionBuilder.WithCredentials(_options.User, _options.Password!);
20+
}
21+
var connectOptions = connectionBuilder.Build();
22+
23+
while (!stoppingToken.IsCancellationRequested)
24+
{
25+
try
26+
{
27+
if (!client.IsConnected)
28+
{
29+
await client.ConnectAsync(connectOptions, stoppingToken);
30+
Console.WriteLine("MQTT Connected.");
31+
}
32+
}
33+
catch (Exception ex)
34+
{
35+
Console.WriteLine($"Connection failed: {ex.Message}. Retrying...");
36+
}
37+
38+
// Wait before checking connection status again
39+
await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken);
40+
}
41+
}
42+
}
43+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<TargetFramework>net10.0</TargetFramework>
5+
<ImplicitUsings>enable</ImplicitUsings>
6+
<Nullable>enable</Nullable>
7+
</PropertyGroup>
8+
9+
<ItemGroup>
10+
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="10.0.5" />
11+
<PackageReference Include="Microsoft.Extensions.Options" Version="10.0.5" />
12+
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="10.0.5" />
13+
<PackageReference Include="MQTTnet" Version="5.1.0.1559" />
14+
<PackageReference Include="Newtonsoft.Json" Version="13.0.4" />
15+
</ItemGroup>
16+
17+
<ItemGroup>
18+
<ProjectReference Include="..\CodeCasa.AutomationPipelines.Lights\CodeCasa.AutomationPipelines.Lights.csproj" />
19+
</ItemGroup>
20+
21+
</Project>
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
using System.Reactive;
2+
using System.Reactive.Linq;
3+
using CodeCasa.AutomationPipelines.Lights.Pipeline;
4+
using CodeCasa.Lights;
5+
using Newtonsoft.Json;
6+
using Newtonsoft.Json.Serialization;
7+
8+
namespace CodeCasa.AutomationPipelines.Lights.Mqtt.Extensions
9+
{
10+
public static class ILightTransitionPipelineConfiguratorExtensions
11+
{
12+
public static ILightTransitionPipelineConfigurator<TLight> PublishTelemetryToMqtt<TLight>(this ILightTransitionPipelineConfigurator<TLight> configurator, MqttPublisher publisher) where TLight : ILight
13+
{
14+
configurator.OnCompleted(e =>
15+
{
16+
PublishPipelineState(publisher, e.Pipeline, e.Light.Id).GetAwaiter().GetResult();
17+
});
18+
configurator.ConfigureTelemetrySubscriber(stream =>
19+
{
20+
return stream.SelectMany(async t =>
21+
{
22+
await PublishPipelineState(publisher, t.Pipeline, t.Light.Id);
23+
return Unit.Default;
24+
}).Subscribe();
25+
});
26+
return configurator;
27+
}
28+
29+
private static async Task PublishPipelineState(MqttPublisher publisher, IPipeline<LightTransition> pipeline, string lightEntityId)
30+
{
31+
var settings = new JsonSerializerSettings { Formatting = Formatting.Indented, ContractResolver = new IgnoreObservablesContractResolver(), ReferenceLoopHandling = ReferenceLoopHandling.Ignore };
32+
33+
var jsonString = JsonConvert.SerializeObject(pipeline, settings);
34+
await publisher.PublishAsync(lightEntityId, jsonString);
35+
}
36+
37+
private class IgnoreObservablesContractResolver : DefaultContractResolver
38+
{
39+
protected override JsonProperty CreateProperty(System.Reflection.MemberInfo member, MemberSerialization memberSerialization)
40+
{
41+
var property = base.CreateProperty(member, memberSerialization);
42+
43+
if (property.PropertyType is not null &&
44+
property.PropertyType.IsGenericType &&
45+
property.PropertyType.GetGenericTypeDefinition() == typeof(IObservable<>))
46+
{
47+
property.ShouldSerialize = _ => false;
48+
}
49+
50+
return property;
51+
}
52+
}
53+
}
54+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
using CodeCasa.AutomationPipelines.Lights.Mqtt.BackgroundService;
2+
using Microsoft.Extensions.Configuration;
3+
using Microsoft.Extensions.DependencyInjection;
4+
using MQTTnet;
5+
6+
namespace CodeCasa.AutomationPipelines.Lights.Mqtt.Extensions;
7+
8+
public static class ServiceCollectionExtensions
9+
{
10+
public static IServiceCollection AddCodeCasaMqtt(this IServiceCollection serviceCollection, IConfiguration configuration)
11+
{
12+
serviceCollection.Configure<MqttOptions>(configuration.GetSection("MqttOptions"));
13+
14+
serviceCollection
15+
.AddSingleton(new MqttClientFactory().CreateMqttClient())
16+
.AddSingleton<MqttPublisher>();
17+
serviceCollection.AddHostedService<MqttWorker>();
18+
19+
return serviceCollection;
20+
}
21+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+

2+
namespace CodeCasa.AutomationPipelines.Lights.Mqtt
3+
{
4+
public class MqttOptions
5+
{
6+
public string Host { get; set; } = null!;
7+
public int Port { get; set; } = 1883;
8+
public string? User { get; set; }
9+
public string? Password { get; set; }
10+
public string BaseTopic { get; set; } = string.Empty;
11+
}
12+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
using Microsoft.Extensions.Options;
2+
using MQTTnet;
3+
4+
namespace CodeCasa.AutomationPipelines.Lights.Mqtt
5+
{
6+
public class MqttPublisher(IMqttClient mqttClient, IOptions<MqttOptions> options)
7+
{
8+
private readonly MqttOptions _options = options.Value;
9+
10+
public async Task<MqttClientPublishResult?> PublishAsync(string entityId, string message)
11+
{
12+
if (!mqttClient.IsConnected)
13+
{
14+
return null;
15+
}
16+
17+
var mqttMessage = new MqttApplicationMessageBuilder()
18+
.WithTopic($"{_options.BaseTopic}/{entityId}")
19+
.WithPayload(message)
20+
.WithRetainFlag()
21+
.Build();
22+
23+
return await mqttClient.PublishAsync(mqttMessage);
24+
}
25+
}
26+
}

src/CodeCasa.AutomationPipelines.Lights/Extensions/ActionExtensions.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,12 @@ namespace CodeCasa.AutomationPipelines.Lights.Extensions
66
{
77
internal static class ActionExtensions
88
{
9-
public static Action<T> SetLoggingContext<T>(
9+
public static Action<T> ApplyHierarchySettings<T>(
1010
this Action<T> configure, string parentName, bool enableLogging)
1111
{
1212
return c =>
1313
{
14-
if (c is IInternalLoggingContext loggingContext)
14+
if (c is IPipelineHierarchyContext loggingContext)
1515
{
1616
loggingContext.SetParentName(parentName);
1717
if (enableLogging)
@@ -23,12 +23,12 @@ public static Action<T> SetLoggingContext<T>(
2323
};
2424
}
2525

26-
public static Action<T> SetLoggingContext<T, TLight>(
26+
public static Action<T> ApplyHierarchySettings<T, TLight>(
2727
this Action<T> configure, ILightTransitionReactiveNodeConfigurator<TLight> lightTransitionReactiveNodeConfigurator) where TLight : ILight
2828
{
2929
// Note: This method is used for convenience. As we use this internally only, we can assume that all implementations of ILightTransitionReactiveNodeConfigurator also implement IInternalLoggingContext.
30-
var loggingContext = (IInternalLoggingContext)lightTransitionReactiveNodeConfigurator;
31-
return configure.SetLoggingContext(loggingContext.LogName, loggingContext.LoggingEnabled ?? false);
30+
var loggingContext = (IPipelineHierarchyContext)lightTransitionReactiveNodeConfigurator;
31+
return configure.ApplyHierarchySettings(loggingContext.HierarchyPath, loggingContext.LoggingEnabled ?? false);
3232
}
3333
}
3434
}

src/CodeCasa.AutomationPipelines.Lights/Extensions/ObjectExtensions.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@ namespace CodeCasa.AutomationPipelines.Lights.Extensions
33
{
44
internal static class ObjectExtensions
55
{
6-
public static T SetLoggingContext<T>(this T obj, string parentName, string name, bool enableLogging)
6+
public static T SetHierarchyContext<T>(this T obj, string parentName, string name, bool enableLogging)
77
{
8-
if (obj is IInternalLoggingContext loggingContext)
8+
if (obj is IPipelineHierarchyContext loggingContext)
99
{
1010
loggingContext.SetParentName(parentName);
1111
loggingContext.SetName(name);

src/CodeCasa.AutomationPipelines.Lights/Extensions/PipelineNodeFactoryExtensions.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,23 +6,23 @@ namespace CodeCasa.AutomationPipelines.Lights.Extensions
66
{
77
internal static class PipelineNodeFactoryExtensions
88
{
9-
public static ServiceScopedNode<LightTransition> CreateScopedNode(
9+
public static ManagedNode<LightTransition> CreateScopedNode(
1010
this Func<IServiceProvider, IPipelineNode<LightTransition>> factory,
1111
IServiceProvider serviceProvider)
1212
{
1313
var scope = serviceProvider.CreateScope();
14-
return new ServiceScopedNode<LightTransition>(scope, factory(scope.ServiceProvider));
14+
return new ManagedNode<LightTransition>(scope, factory(scope.ServiceProvider));
1515
}
1616

17-
public static ServiceScopedNode<LightTransition>? CreateScopedNodeOrNull(
17+
public static ManagedNode<LightTransition>? CreateScopedNodeOrNull(
1818
this Func<IServiceProvider, IPipelineNode<LightTransition>?> factory,
1919
IServiceProvider serviceProvider)
2020
{
2121
var scope = serviceProvider.CreateScope();
2222
var node = factory(scope.ServiceProvider);
2323
if (node != null)
2424
{
25-
return new ServiceScopedNode<LightTransition>(scope, node);
25+
return new ManagedNode<LightTransition>(scope, node);
2626
}
2727
scope.Dispose();
2828
return null;

0 commit comments

Comments
 (0)