Skip to content

Commit 0c55dc6

Browse files
committed
feat/implemnt-mass-transient-for messaging
1 parent c4278a0 commit 0c55dc6

10 files changed

Lines changed: 282 additions & 0 deletions

backend/Directory.Packages.props

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,14 @@
110110
<PackageVersion Include="WireMock.Net" Version="1.7.0" />
111111
</ItemGroup>
112112

113+
<ItemGroup Label="Messaging (MassTransit)">
114+
<!-- MassTransit core + RabbitMQ transport. All MIT-licensed, no commercial licence required. -->
115+
<PackageVersion Include="MassTransit" Version="8.3.7" />
116+
<PackageVersion Include="MassTransit.RabbitMQ" Version="8.3.7" />
117+
<!-- In-memory transport used in tests and when Messaging:Transport=InMemory -->
118+
<PackageVersion Include="MassTransit.Testing.Helpers" Version="8.3.7" />
119+
</ItemGroup>
120+
113121
<ItemGroup Label="Rate limiting (built-in 8+, no package) and problem-details helpers">
114122
<PackageVersion Include="Hellang.Middleware.ProblemDetails" Version="6.5.1" />
115123
</ItemGroup>
@@ -146,4 +154,13 @@
146154
<PackageVersion Include="YamlDotNet" Version="16.3.0" />
147155
</ItemGroup>
148156

157+
<ItemGroup Label="Seq + OpenTelemetry">
158+
<PackageVersion Include="Serilog.Sinks.Seq" Version="8.0.0" />
159+
<PackageVersion Include="OpenTelemetry" Version="1.11.2" />
160+
<PackageVersion Include="OpenTelemetry.Exporter.OpenTelemetryProtocol" Version="1.11.2" />
161+
<PackageVersion Include="OpenTelemetry.Extensions.Hosting" Version="1.11.2" />
162+
<PackageVersion Include="OpenTelemetry.Instrumentation.AspNetCore" Version="1.11.0" />
163+
<PackageVersion Include="OpenTelemetry.Instrumentation.Http" Version="1.11.0" />
164+
</ItemGroup>
165+
149166
</Project>

backend/src/CCE.Api.External/appsettings.Development.json

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,5 +88,13 @@
8888
},
8989
"Seq": {
9090
"ServerUrl": "http://localhost:5341"
91+
},
92+
"Messaging": {
93+
"Transport": "InMemory",
94+
"UseAsyncDispatcher": true
95+
// For RabbitMQ production:
96+
// "Transport": "RabbitMQ",
97+
// "RabbitMqHost": "amqp://guest:guest@localhost",
98+
// "RabbitMqVirtualHost": "/cce-dev"
9199
}
92100
}

backend/src/CCE.Api.Internal/appsettings.Development.json

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,10 @@
3434
"GraphTenantDomain": "cce.local",
3535
"CallbackPath": "/signin-oidc"
3636
},
37+
"Messaging": {
38+
"Transport": "InMemory",
39+
"UseAsyncDispatcher": true
40+
},
3741
"LocalAuth": {
3842
"External": {
3943
"Issuer": "cce-api-external-dev",

backend/src/CCE.Infrastructure/CCE.Infrastructure.csproj

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@
4141
<PackageReference Include="Microsoft.Graph" />
4242
<PackageReference Include="Microsoft.Identity.Web.MicrosoftGraph" />
4343
<PackageReference Include="YamlDotNet" />
44+
<PackageReference Include="MassTransit" />
45+
<PackageReference Include="MassTransit.RabbitMQ" />
4446
</ItemGroup>
4547

4648
<ItemGroup>

backend/src/CCE.Infrastructure/DependencyInjection.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
using CCE.Infrastructure.Sanitization;
2727
using CCE.Infrastructure.Country;
2828
using CCE.Infrastructure.Notifications;
29+
using CCE.Infrastructure.Notifications.Messaging;
2930
using CCE.Infrastructure.Reports;
3031
using CCE.Infrastructure.Surveys;
3132
using CCE.Application.Localization;
@@ -219,6 +220,10 @@ public static IServiceCollection AddInfrastructure(
219220
// Interactive City
220221
services.AddScoped<ICityScenarioService, CityScenarioService>();
221222

223+
// Messaging (MassTransit) — transport selected by Messaging:Transport in appsettings.
224+
// InMemory by default (no broker); set to RabbitMQ in production.
225+
services.AddCceMessaging(configuration);
226+
222227
// Search
223228
services.AddScoped<ISearchClient, MeilisearchClient>();
224229
services.AddScoped<ISearchQueryLogger, SearchQueryLogger>();
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
using CCE.Application.Notifications.Messages;
2+
using MassTransit;
3+
4+
namespace CCE.Infrastructure.Notifications.Messaging;
5+
6+
/// <summary>
7+
/// Drop-in replacement for <see cref="InProcessNotificationMessageDispatcher"/>.
8+
/// Instead of calling <c>INotificationGateway</c> inline it publishes a
9+
/// <see cref="NotificationMessage"/> onto the MassTransit bus so the work
10+
/// is handled asynchronously by <see cref="NotificationMessageConsumer"/>
11+
/// (which may run in this process, or in a separate worker process).
12+
///
13+
/// <para>
14+
/// Wire-up: replace the <c>InProcessNotificationMessageDispatcher</c> DI
15+
/// registration with this class. See <c>MessagingServiceExtensions</c>.
16+
/// </para>
17+
/// </summary>
18+
public sealed class MassTransitNotificationMessageDispatcher : INotificationMessageDispatcher
19+
{
20+
private readonly IPublishEndpoint _publishEndpoint;
21+
22+
public MassTransitNotificationMessageDispatcher(IPublishEndpoint publishEndpoint)
23+
=> _publishEndpoint = publishEndpoint;
24+
25+
public Task DispatchAsync(NotificationMessage message, CancellationToken ct)
26+
=> _publishEndpoint.Publish(message, ct);
27+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
using System.ComponentModel.DataAnnotations;
2+
3+
namespace CCE.Infrastructure.Notifications.Messaging;
4+
5+
/// <summary>
6+
/// Bound from <c>appsettings.json</c> section <c>"Messaging"</c>.
7+
/// </summary>
8+
public sealed class MessagingOptions
9+
{
10+
public const string SectionName = "Messaging";
11+
12+
/// <summary>
13+
/// Transport to use.
14+
/// <list type="bullet">
15+
/// <item><c>InMemory</c> — default; same process, no broker required (dev / test).</item>
16+
/// <item><c>RabbitMQ</c> — production; requires <see cref="RabbitMqHost"/> config.</item>
17+
/// </list>
18+
/// </summary>
19+
[Required]
20+
public string Transport { get; init; } = "InMemory";
21+
22+
/// <summary>RabbitMQ host URI, e.g. <c>amqp://guest:guest@localhost</c>.</summary>
23+
public string? RabbitMqHost { get; init; }
24+
25+
/// <summary>
26+
/// Virtual host inside RabbitMQ. Defaults to <c>"/"</c>.
27+
/// Use a dedicated vhost per environment (dev/staging/prod) to keep queues isolated.
28+
/// </summary>
29+
public string RabbitMqVirtualHost { get; init; } = "/";
30+
31+
/// <summary>
32+
/// When <c>true</c> (default), <see cref="INotificationMessageDispatcher"/> is replaced
33+
/// with <see cref="MassTransitNotificationMessageDispatcher"/>. Set <c>false</c> to keep
34+
/// the synchronous in-process dispatcher even when MassTransit is registered
35+
/// (useful for integration tests that mock the gateway).
36+
/// </summary>
37+
public bool UseAsyncDispatcher { get; init; } = true;
38+
}
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
using CCE.Application.Notifications.Messages;
2+
using MassTransit;
3+
using Microsoft.Extensions.Configuration;
4+
using Microsoft.Extensions.DependencyInjection;
5+
using Microsoft.Extensions.Options;
6+
7+
namespace CCE.Infrastructure.Notifications.Messaging;
8+
9+
/// <summary>
10+
/// Registers MassTransit with the correct transport based on
11+
/// <c>appsettings.json → Messaging:Transport</c>:
12+
///
13+
/// <list type="table">
14+
/// <item><term>InMemory</term><description>No broker. Messages flow in-process via a channel. Use for local dev and all tests.</description></item>
15+
/// <item><term>RabbitMQ</term><description>Production. Requires <c>Messaging:RabbitMqHost</c> and a running broker.</description></item>
16+
/// </list>
17+
///
18+
/// Call <c>services.AddCceMessaging(configuration)</c> from
19+
/// <see cref="CCE.Infrastructure.DependencyInjection.AddInfrastructure"/>.
20+
/// </summary>
21+
public static class MessagingServiceExtensions
22+
{
23+
public static IServiceCollection AddCceMessaging(
24+
this IServiceCollection services,
25+
IConfiguration configuration)
26+
{
27+
services.AddOptions<MessagingOptions>()
28+
.Bind(configuration.GetSection(MessagingOptions.SectionName))
29+
.ValidateDataAnnotations()
30+
.ValidateOnStart();
31+
32+
var options = configuration
33+
.GetSection(MessagingOptions.SectionName)
34+
.Get<MessagingOptions>() ?? new MessagingOptions();
35+
36+
services.AddMassTransit(x =>
37+
{
38+
// Register consumer + its definition (retry policy, concurrency).
39+
x.AddConsumer<NotificationMessageConsumer, NotificationMessageConsumerDefinition>();
40+
41+
switch (options.Transport.ToUpperInvariant())
42+
{
43+
case "RABBITMQ":
44+
x.UsingRabbitMq((ctx, cfg) =>
45+
{
46+
cfg.Host(options.RabbitMqHost ?? "amqp://guest:guest@localhost", options.RabbitMqVirtualHost, h =>
47+
{
48+
// Credentials are embedded in RabbitMqHost URI or set here.
49+
// Production: use environment variables / Azure Key Vault secrets.
50+
});
51+
52+
// Auto-configure endpoints from consumer definitions.
53+
cfg.ConfigureEndpoints(ctx);
54+
});
55+
break;
56+
57+
default: // "InMemory" or missing
58+
x.UsingInMemory((ctx, cfg) =>
59+
{
60+
cfg.ConfigureEndpoints(ctx);
61+
});
62+
break;
63+
}
64+
});
65+
66+
// Replace the synchronous in-process dispatcher with the async bus publisher
67+
// only when UseAsyncDispatcher=true (default).
68+
if (options.UseAsyncDispatcher)
69+
{
70+
// Remove the InProcessNotificationMessageDispatcher registered in DependencyInjection.cs
71+
var descriptor = services.FirstOrDefault(
72+
d => d.ServiceType == typeof(INotificationMessageDispatcher));
73+
if (descriptor is not null)
74+
services.Remove(descriptor);
75+
76+
services.AddScoped<INotificationMessageDispatcher,
77+
MassTransitNotificationMessageDispatcher>();
78+
}
79+
80+
return services;
81+
}
82+
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
using CCE.Application.Notifications;
2+
using CCE.Application.Notifications.Messages;
3+
using MassTransit;
4+
using Microsoft.Extensions.Logging;
5+
6+
namespace CCE.Infrastructure.Notifications.Messaging;
7+
8+
/// <summary>
9+
/// MassTransit consumer that receives a <see cref="NotificationMessage"/> from
10+
/// the bus and hands it to <see cref="INotificationGateway"/> for template
11+
/// resolution, rendering, delivery and logging.
12+
///
13+
/// <para>
14+
/// This is the async counterpart to <see cref="InProcessNotificationMessageDispatcher"/>.
15+
/// The gateway call (and its DB + SMS/Email provider I/O) happens here, off the
16+
/// original HTTP request thread.
17+
/// </para>
18+
///
19+
/// <para>
20+
/// Retry policy is configured on the consumer definition
21+
/// (<see cref="NotificationMessageConsumerDefinition"/>): 3 immediate retries,
22+
/// then messages move to the error queue for manual inspection.
23+
/// </para>
24+
/// </summary>
25+
public sealed class NotificationMessageConsumer : IConsumer<NotificationMessage>
26+
{
27+
private readonly INotificationGateway _gateway;
28+
private readonly ILogger<NotificationMessageConsumer> _logger;
29+
30+
public NotificationMessageConsumer(
31+
INotificationGateway gateway,
32+
ILogger<NotificationMessageConsumer> logger)
33+
{
34+
_gateway = gateway;
35+
_logger = logger;
36+
}
37+
38+
public async Task Consume(ConsumeContext<NotificationMessage> context)
39+
{
40+
var message = context.Message;
41+
42+
_logger.LogInformation(
43+
"Consuming NotificationMessage TemplateCode={TemplateCode} RecipientUserId={RecipientUserId}",
44+
message.TemplateCode,
45+
message.RecipientUserId);
46+
47+
var result = await _gateway.SendAsync(new NotificationDispatchRequest(
48+
TemplateCode: message.TemplateCode,
49+
RecipientUserId: message.RecipientUserId,
50+
Channels: message.Channels ?? [],
51+
Variables: message.MetaData,
52+
Locale: message.Locale,
53+
Email: message.Email,
54+
PhoneNumber: message.PhoneNumber,
55+
CorrelationId: message.CorrelationId),
56+
context.CancellationToken).ConfigureAwait(false);
57+
58+
if (!result.IsSuccess)
59+
{
60+
_logger.LogWarning(
61+
"NotificationMessage TemplateCode={TemplateCode} had one or more failed channel dispatches.",
62+
message.TemplateCode);
63+
}
64+
}
65+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
using MassTransit;
2+
3+
namespace CCE.Infrastructure.Notifications.Messaging;
4+
5+
/// <summary>
6+
/// Defines retry, concurrency, and queue naming for
7+
/// <see cref="NotificationMessageConsumer"/>.
8+
///
9+
/// MassTransit picks this up automatically via <c>AddConsumer&lt;,&gt;</c>.
10+
/// </summary>
11+
public sealed class NotificationMessageConsumerDefinition
12+
: ConsumerDefinition<NotificationMessageConsumer>
13+
{
14+
public NotificationMessageConsumerDefinition()
15+
{
16+
// One concurrent message per consumer instance (safe for DB write heavy work).
17+
ConcurrentMessageLimit = 10;
18+
}
19+
20+
protected override void ConfigureConsumer(
21+
IReceiveEndpointConfigurator endpointConfigurator,
22+
IConsumerConfigurator<NotificationMessageConsumer> consumerConfigurator,
23+
IRegistrationContext context)
24+
{
25+
// 3 immediate retries, 5-second interval.
26+
// After exhausting retries MassTransit moves the message to the
27+
// _error queue automatically — no message is silently dropped.
28+
endpointConfigurator.UseMessageRetry(r =>
29+
r.Intervals(
30+
TimeSpan.FromSeconds(5),
31+
TimeSpan.FromSeconds(15),
32+
TimeSpan.FromSeconds(30)));
33+
}
34+
}

0 commit comments

Comments
 (0)