Skip to content

Commit 0595fcc

Browse files
committed
Add key metrics to improve observability
Adds events/commands metrics, and duration histograms for both processing (commands) and publishing (events).
1 parent 5ef9f15 commit 0595fcc

7 files changed

Lines changed: 194 additions & 44 deletions

File tree

readme.md

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,41 @@ negatively impacting run-time app startup, via the dependency on
231231
This also ensures that all proper service interfaces are registered for the various
232232
components.
233233

234+
### Telemetry and Monitoring
235+
236+
The core implementation of the `IMessageBus` is instrumented with `ActivitySource` and
237+
`Metric`, providing out of the box support for [Open Telemetry](https://learn.microsoft.com/en-us/dotnet/core/diagnostics/distributed-tracing-instrumentation-walkthroughs)-based monitoring, as well
238+
as via [dotnet trace](https://learn.microsoft.com/en-us/dotnet/core/diagnostics/dotnet-trace)
239+
and [dotnet counters](https://learn.microsoft.com/en-us/dotnet/core/diagnostics/dotnet-counters).
240+
241+
To export telemetry using [Open Telemetry](https://learn.microsoft.com/en-us/dotnet/core/diagnostics/distributed-tracing-instrumentation-walkthroughs),
242+
for example:
243+
244+
```csharp
245+
using var tracer = Sdk
246+
.CreateTracerProviderBuilder()
247+
.SetResourceBuilder(ResourceBuilder.CreateDefault().AddService("ConsoleApp"))
248+
.AddSource(source.Name)
249+
.AddSource("Merq.Core")
250+
.AddSource("Merq.AutoMapper")
251+
.AddConsoleExporter()
252+
.AddZipkinExporter()
253+
.AddAzureMonitorTraceExporter(o => o.ConnectionString = config["AppInsights"])
254+
.Build();
255+
```
256+
257+
Collecting traces via [dotnet-trace](https://learn.microsoft.com/en-us/dotnet/core/diagnostics/dotnet-trace):
258+
259+
```shell
260+
dotnet trace collect --name [PROCESS_NAME] --providers="Microsoft-Diagnostics-DiagnosticSource:::FilterAndPayloadSpecs=[AS]Merq,System.Diagnostics.Metrics:::Metrics=Merq"
261+
```
262+
263+
Monitoring metrics via [dotnet-counters](https://learn.microsoft.com/en-us/dotnet/core/diagnostics/dotnet-counters):
264+
265+
```shell
266+
dotnet counters monitor --process-id [PROCESS_ID] --counters Merq
267+
```
268+
234269
## Duck Typing Support
235270

236271
<!-- #duck -->

src/Merq.Core/MessageBus.cs

Lines changed: 93 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System;
22
using System.Collections.Concurrent;
3+
using System.Diagnostics;
34
using System.Linq;
45
using System.Reactive.Disposables;
56
using System.Reactive.Subjects;
@@ -9,6 +10,7 @@
910
using System.Threading.Tasks;
1011
using Microsoft.Extensions.DependencyInjection;
1112
using static Merq.Telemetry;
13+
using Tag = System.Collections.Generic.KeyValuePair<string, object?>;
1214

1315
namespace Merq;
1416

@@ -188,7 +190,7 @@ public bool CanHandle(IExecutable command) => canHandleMap.GetOrAdd(GetCommandTy
188190
public void Execute(ICommand command)
189191
{
190192
var type = GetCommandType(command);
191-
using var activity = StartActivity(type, Process);
193+
using var activity = StartActivity(type, Telemetry.Process);
192194

193195
try
194196
{
@@ -220,7 +222,7 @@ public void Execute(ICommand command)
220222
public TResult Execute<TResult>(ICommand<TResult> command)
221223
{
222224
var type = GetCommandType(command);
223-
using var activity = StartActivity(type, Process);
225+
using var activity = StartActivity(type, Telemetry.Process);
224226

225227
try
226228
{
@@ -251,7 +253,7 @@ public TResult Execute<TResult>(ICommand<TResult> command)
251253
public Task ExecuteAsync(IAsyncCommand command, CancellationToken cancellation = default)
252254
{
253255
var type = GetCommandType(command);
254-
using var activity = StartActivity(type, Process);
256+
using var activity = StartActivity(type, Telemetry.Process);
255257

256258
try
257259
{
@@ -284,7 +286,7 @@ public Task ExecuteAsync(IAsyncCommand command, CancellationToken cancellation =
284286
public Task<TResult> ExecuteAsync<TResult>(IAsyncCommand<TResult> command, CancellationToken cancellation = default)
285287
{
286288
var type = GetCommandType(command);
287-
using var activity = StartActivity(type, Process);
289+
using var activity = StartActivity(type, Telemetry.Process);
288290

289291
try
290292
{
@@ -314,41 +316,49 @@ public void Notify<TEvent>(TEvent e)
314316
{
315317
var type = (e ?? throw new ArgumentNullException(nameof(e))).GetType();
316318
using var activity = StartActivity(type, Publish);
319+
var watch = Stopwatch.StartNew();
317320

318-
// TODO: if we prevent Notify for externally produced events, we won't be
319-
// able to notify base event subscribers when those events are produced.
320-
//var producer = services.GetService<IObservable<TEvent>>();
321-
//if (producer != null)
322-
// throw new NotSupportedException($"Cannot explicitly notify event {type} because it is externally produced by {producer.GetType()}.");
323-
324-
// We call all subjects that are compatible with
325-
// the event type, not just concrete event type subscribers.
326-
// Also adds as compatible the dynamic conversion ones.
327-
var compatible = compatibleSubjects.GetOrAdd(type, eventType => subjects.Keys
328-
.Where(subjectEventType => subjectEventType.IsAssignableFrom(eventType))
329-
.Select(subjectEventType => subjects[subjectEventType])
330-
.Concat(dynamicSubjects
331-
.GetOrAdd(type.FullName, _ => new())
332-
.Where(pair => pair.Key != type && pair.Value != null)
333-
.Select(pair => pair.Value!))
334-
.ToArray());
335-
336-
foreach (var subject in compatible)
321+
try
337322
{
338-
try
323+
// TODO: if we prevent Notify for externally produced events, we won't be
324+
// able to notify base event subscribers when those events are produced.
325+
//var producer = services.GetService<IObservable<TEvent>>();
326+
//if (producer != null)
327+
// throw new NotSupportedException($"Cannot explicitly notify event {type} because it is externally produced by {producer.GetType()}.");
328+
329+
// We call all subjects that are compatible with
330+
// the event type, not just concrete event type subscribers.
331+
// Also adds as compatible the dynamic conversion ones.
332+
var compatible = compatibleSubjects.GetOrAdd(type, eventType => subjects.Keys
333+
.Where(subjectEventType => subjectEventType.IsAssignableFrom(eventType))
334+
.Select(subjectEventType => subjects[subjectEventType])
335+
.Concat(dynamicSubjects
336+
.GetOrAdd(type.FullName, _ => new())
337+
.Where(pair => pair.Key != type && pair.Value != null)
338+
.Select(pair => pair.Value!))
339+
.ToArray());
340+
341+
foreach (var subject in compatible)
339342
{
340-
subject.OnNext(e);
341-
}
342-
catch (Exception ex)
343-
{
344-
activity.RecordException(ex);
345-
// TODO: should we swallow the exception and remove the
346-
// failing subscribers?
347-
// Rethrow original exception to preserve stacktrace.
348-
ExceptionDispatchInfo.Capture(ex).Throw();
349-
throw;
343+
try
344+
{
345+
subject.OnNext(e);
346+
}
347+
catch (Exception ex)
348+
{
349+
activity.RecordException(ex);
350+
// TODO: should we swallow the exception and remove the
351+
// failing subscribers?
352+
// Rethrow original exception to preserve stacktrace.
353+
ExceptionDispatchInfo.Capture(ex).Throw();
354+
throw;
355+
}
350356
}
351357
}
358+
finally
359+
{
360+
Publishing.Record(watch.ElapsedMilliseconds, new Tag("Event", type.FullName));
361+
}
352362
}
353363

354364
/// <summary>
@@ -487,8 +497,18 @@ void ExecuteCore<TCommand>(TCommand command) where TCommand : ICommand
487497
var handler = services.GetService<ICommandHandler<TCommand>>();
488498
if (handler != null)
489499
{
490-
handler.Execute(command);
491-
return;
500+
var watch = Stopwatch.StartNew();
501+
try
502+
{
503+
handler.Execute(command);
504+
return;
505+
}
506+
finally
507+
{
508+
Processing.Record(watch.ElapsedMilliseconds,
509+
new Tag("Command", typeof(TCommand).FullName),
510+
new Tag("Handler", handler.GetType().FullName));
511+
}
492512
}
493513

494514
// See if we can convert from the TCommand to a compatible type with
@@ -509,7 +529,17 @@ Task ExecuteAsyncCore<TCommand>(TCommand command, CancellationToken cancellation
509529
var handler = services.GetService<IAsyncCommandHandler<TCommand>>();
510530
if (handler != null)
511531
{
512-
return handler.ExecuteAsync(command, cancellation);
532+
var watch = Stopwatch.StartNew();
533+
try
534+
{
535+
return handler.ExecuteAsync(command, cancellation);
536+
}
537+
finally
538+
{
539+
Processing.Record(watch.ElapsedMilliseconds,
540+
new Tag("Command", typeof(TCommand).FullName),
541+
new Tag("Handler", handler.GetType().FullName));
542+
}
513543
}
514544

515545
// See if we can convert from the TCommand to a compatible type with
@@ -528,7 +558,19 @@ TResult ExecuteCore<TCommand, TResult>(TCommand command) where TCommand : IComma
528558
{
529559
var handler = services.GetService<ICommandHandler<TCommand, TResult>>();
530560
if (handler != null)
531-
return handler.Execute(command);
561+
{
562+
var watch = Stopwatch.StartNew();
563+
try
564+
{
565+
return handler.Execute(command);
566+
}
567+
finally
568+
{
569+
Processing.Record(watch.ElapsedMilliseconds,
570+
new Tag("Command", typeof(TCommand).FullName),
571+
new Tag("Handler", handler.GetType().FullName));
572+
}
573+
}
532574

533575
// See if we can convert from the TCommand to a compatible type with
534576
// a registered command handler
@@ -546,7 +588,19 @@ Task<TResult> ExecuteAsyncCore<TCommand, TResult>(TCommand command, Cancellation
546588
{
547589
var handler = services.GetService<IAsyncCommandHandler<TCommand, TResult>>();
548590
if (handler != null)
549-
return handler.ExecuteAsync(command, cancellation);
591+
{
592+
var watch = Stopwatch.StartNew();
593+
try
594+
{
595+
return handler.ExecuteAsync(command, cancellation);
596+
}
597+
finally
598+
{
599+
Processing.Record(watch.ElapsedMilliseconds,
600+
new Tag("Command", typeof(TCommand).FullName),
601+
new Tag("Handler", handler.GetType().FullName));
602+
}
603+
}
550604

551605
// See if we can convert from the TCommand to a compatible type with
552606
// a registered command handler

src/Merq.Core/Subject.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
using System.Collections.Concurrent;
2-
using System.Diagnostics;
32
using static Merq.Telemetry;
43

54
namespace System.Reactive.Subjects;

src/Merq.Core/Telemetry.cs

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,36 @@
11
using System;
2+
using System.Collections.Generic;
23
using System.Diagnostics;
4+
using System.Diagnostics.Metrics;
35
using System.Runtime.CompilerServices;
46

57
namespace Merq;
68

79
static class Telemetry
810
{
9-
static readonly ActivitySource tracer = new(ThisAssembly.Project.AssemblyName, ThisAssembly.Project.Version);
11+
static readonly ActivitySource tracer = new("Merq", ThisAssembly.Project.Version);
12+
static Meter Meter { get; } = new Meter("Merq", ThisAssembly.Project.Version);
13+
14+
static readonly Counter<long> commands;
15+
static readonly Counter<long> events;
16+
17+
/// <summary>
18+
/// Duration of commands executed by the Merq message bus.
19+
/// </summary>
20+
public static Histogram<long> Processing { get; } =
21+
Meter.CreateHistogram<long>("Processing", unit: "ms", description: "Duration of commands executed by the Merq message bus.");
22+
23+
/// <summary>
24+
/// Duration of event publishing by the Merq message bus.
25+
/// </summary>
26+
public static Histogram<long> Publishing { get; } =
27+
Meter.CreateHistogram<long>("Publishing", unit: "ms", description: "Duration of event publishing by the Merq message bus.");
28+
29+
static Telemetry()
30+
{
31+
commands = Meter.CreateCounter<long>("Commands", description: "Commands executed by the Merq message bus.");
32+
events = Meter.CreateCounter<long>("Events", description: "Events published to the Merq message bus.");
33+
}
1034

1135
// See https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md#operation-names
1236
// publish: A message is sent to a destination by a message producer/client.
@@ -21,27 +45,37 @@ static class Telemetry
2145
public const string Process = nameof(Process);
2246

2347
public static Activity? StartActivity(Type type, string operation, [CallerMemberName] string? member = default, [CallerFilePath] string? file = default, [CallerLineNumber] int? line = default)
48+
{
49+
if (operation == Publish)
50+
events.Add(1, new KeyValuePair<string, object?>("Name", type.FullName));
51+
else if (operation == Process)
52+
commands.Add(1, new KeyValuePair<string, object?>("Name", type.FullName));
53+
2454
// Span name convention should be: <destination> <operation> (see https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md#span-name)
2555
// Requirement is that the destination has low cardinality. In our case, the destination is
2656
// the logical operation being performed, such as "Execute", "Notify" or "Deliver". The
2757
// operation is actually the type being acted on (such as CreateUser -a command- or UserCreated -event).
28-
=> tracer.StartActivity(ActivityKind.Producer, name: $"{operation} {type.FullName}")
58+
return tracer.StartActivity(ActivityKind.Producer, name: $"{operation}/{type.FullName}")
2959
?.SetTag("code.function", member)
3060
?.SetTag("code.filepath", file)
3161
?.SetTag("code.lineno", line)
3262
?.SetTag("messaging.system", "merq")
3363
?.SetTag("messaging.destination.name", type.FullName)
3464
?.SetTag("messaging.destination.kind", "topic")
35-
?.SetTag("messaging.operation", operation)
65+
?.SetTag("messaging.operation", operation.ToLowerInvariant())
3666
?.SetTag("messaging.protocol.name", type.Assembly.GetName().Name)
3767
?.SetTag("messaging.protocol.version", type.Assembly.GetName().Version?.ToString() ?? "unknown");
68+
}
3869

3970
public static void RecordException(this Activity? activity, Exception e)
4071
{
4172
if (activity != null)
4273
{
4374
activity.SetStatus(ActivityStatusCode.Error, e.Message);
4475

76+
activity.SetTag("otel.status_code", "ERROR");
77+
activity.SetTag("otel.status_description", e.Message);
78+
4579
// See https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/exceptions.md
4680
activity.AddEvent(new ActivityEvent("exception", tags: new()
4781
{

src/Packages.props

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
<PackageReference Update="Devlooped.Extensions.DependencyInjection.Attributed" Version="1.3.2" />
3030
<PackageReference Update="RxFree" Version="1.1.2" />
3131
<PackageReference Update="Devlooped.SponsorLink" Version="v0.11.0" />
32+
<PackageReference Update="Microsoft.Extensions.Logging.EventSource" Version="6.0.0" />
3233
</ItemGroup>
3334
<ItemGroup Label="Tests">
3435
<PackageReference Update="Microsoft.NET.Test.Sdk" Version="17.6.3" />

src/Samples/ConsoleApp/ConsoleApp.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
<PackageReference Include="Azure.Monitor.OpenTelemetry.Exporter" />
1616
<PackageReference Include="Microsoft.Extensions.Configuration.UserSecrets" />
1717
<PackageReference Include="Microsoft.Extensions.DependencyInjection" />
18+
<PackageReference Include="Microsoft.Extensions.Logging.EventSource" />
1819
<PackageReference Include="OpenTelemetry.Exporter.Console" />
1920
<PackageReference Include="OpenTelemetry.Exporter.Zipkin" />
2021
<PackageReference Include="RxFree" />

0 commit comments

Comments
 (0)