Skip to content

Commit 6fe98db

Browse files
Copilotsamtrion
andcommitted
feat: add KafkaTransportOptions for configurable Kafka outbox transport
- Add KafkaTransportOptions with DefaultPartitionCount, DefaultReplicationFactor, AutoCreateTopics, MessageRetention - Update KafkaMessageTransport to inject IOptions<KafkaTransportOptions> and auto-create topics - Update UseKafkaTransport() to accept Action<KafkaTransportOptions>? configureOptions parameter - Add unit tests for all new options (partition count, replication factor, AutoCreateTopics=false, retention, constructor null check)" Agent-Logs-Url: https://github.com/dailydevops/pulse/sessions/6dda0741-f9e5-4596-9ad2-028cc33d0cbc Co-authored-by: samtrion <3283596+samtrion@users.noreply.github.com>
1 parent 4ffc7d8 commit 6fe98db

5 files changed

Lines changed: 318 additions & 10 deletions

File tree

src/NetEvolve.Pulse.Kafka/KafkaExtensions.cs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ public static class KafkaExtensions
1616
/// Registers the Kafka outbox transport so that outbox messages are produced to Kafka topics.
1717
/// </summary>
1818
/// <param name="configurator">The mediator configurator.</param>
19+
/// <param name="configureOptions">Optional action to configure <see cref="KafkaTransportOptions"/>.</param>
1920
/// <returns>The same <paramref name="configurator" /> instance for chaining.</returns>
2021
/// <remarks>
2122
/// <para><strong>Prerequisites:</strong></para>
@@ -41,12 +42,21 @@ public static class KafkaExtensions
4142
/// Replaces any previously registered <see cref="IMessageTransport" />.
4243
/// </remarks>
4344
/// <exception cref="ArgumentNullException">Thrown if <paramref name="configurator" /> is null.</exception>
44-
public static IMediatorBuilder UseKafkaTransport(this IMediatorBuilder configurator)
45+
public static IMediatorBuilder UseKafkaTransport(
46+
this IMediatorBuilder configurator,
47+
Action<KafkaTransportOptions>? configureOptions = null
48+
)
4549
{
4650
ArgumentNullException.ThrowIfNull(configurator);
4751

4852
var services = configurator.Services;
4953

54+
_ = services.AddOptions<KafkaTransportOptions>();
55+
if (configureOptions is not null)
56+
{
57+
_ = services.Configure(configureOptions);
58+
}
59+
5060
var existing = services.FirstOrDefault(d => d.ServiceType == typeof(IMessageTransport));
5161
if (existing is not null)
5262
{

src/NetEvolve.Pulse.Kafka/Outbox/KafkaMessageTransport.cs

Lines changed: 55 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,11 @@ namespace NetEvolve.Pulse.Outbox;
33
using System.Collections.Concurrent;
44
using System.Diagnostics.CodeAnalysis;
55
using System.Globalization;
6+
using System.Linq;
67
using System.Text;
78
using Confluent.Kafka;
9+
using Confluent.Kafka.Admin;
10+
using Microsoft.Extensions.Options;
811
using NetEvolve.Pulse.Extensibility.Outbox;
912

1013
/// <summary>
@@ -21,26 +24,32 @@ public sealed class KafkaMessageTransport : IMessageTransport
2124
private readonly IProducer<string, string> _producer;
2225
private readonly IAdminClient _adminClient;
2326
private readonly ITopicNameResolver _topicNameResolver;
27+
private readonly KafkaTransportOptions _options;
28+
private readonly ConcurrentDictionary<string, bool> _ensuredTopics = new(StringComparer.Ordinal);
2429

2530
/// <summary>
2631
/// Initializes a new instance of <see cref="KafkaMessageTransport" />.
2732
/// </summary>
2833
/// <param name="producer">The Kafka producer, registered in DI by the caller.</param>
2934
/// <param name="adminClient">The Kafka admin client, registered in DI by the caller.</param>
3035
/// <param name="topicNameResolver">The resolver that maps each outbox message to a Kafka topic name.</param>
36+
/// <param name="options">The transport options.</param>
3137
public KafkaMessageTransport(
3238
IProducer<string, string> producer,
3339
IAdminClient adminClient,
34-
ITopicNameResolver topicNameResolver
40+
ITopicNameResolver topicNameResolver,
41+
IOptions<KafkaTransportOptions> options
3542
)
3643
{
3744
ArgumentNullException.ThrowIfNull(producer);
3845
ArgumentNullException.ThrowIfNull(adminClient);
3946
ArgumentNullException.ThrowIfNull(topicNameResolver);
47+
ArgumentNullException.ThrowIfNull(options);
4048

4149
_producer = producer;
4250
_adminClient = adminClient;
4351
_topicNameResolver = topicNameResolver;
52+
_options = options.Value;
4453
}
4554

4655
/// <inheritdoc />
@@ -49,6 +58,9 @@ public async Task SendAsync(OutboxMessage message, CancellationToken cancellatio
4958
ArgumentNullException.ThrowIfNull(message);
5059

5160
var topic = _topicNameResolver.Resolve(message);
61+
62+
await EnsureTopicAsync(topic, cancellationToken).ConfigureAwait(false);
63+
5264
var kafkaMessage = CreateKafkaMessage(message);
5365

5466
_ = await _producer.ProduceAsync(topic, kafkaMessage, cancellationToken).ConfigureAwait(false);
@@ -60,7 +72,7 @@ public async Task SendAsync(OutboxMessage message, CancellationToken cancellatio
6072
"CA1849:Call async methods when in an async method",
6173
Justification = "Intentional fire-and-forget batch pattern. Produce() enqueues messages with a delivery-report callback; awaiting ProduceAsync() per message would serialize delivery and defeat the purpose of batching."
6274
)]
63-
public Task SendBatchAsync(IEnumerable<OutboxMessage> messages, CancellationToken cancellationToken = default)
75+
public async Task SendBatchAsync(IEnumerable<OutboxMessage> messages, CancellationToken cancellationToken = default)
6476
{
6577
ArgumentNullException.ThrowIfNull(messages);
6678

@@ -69,6 +81,9 @@ public Task SendBatchAsync(IEnumerable<OutboxMessage> messages, CancellationToke
6981
foreach (var message in messages)
7082
{
7183
var topic = _topicNameResolver.Resolve(message);
84+
85+
await EnsureTopicAsync(topic, cancellationToken).ConfigureAwait(false);
86+
7287
var kafkaMessage = CreateKafkaMessage(message);
7388

7489
try
@@ -95,10 +110,8 @@ public Task SendBatchAsync(IEnumerable<OutboxMessage> messages, CancellationToke
95110

96111
if (!errors.IsEmpty)
97112
{
98-
return Task.FromException(new AggregateException(errors));
113+
throw new AggregateException(errors);
99114
}
100-
101-
return Task.CompletedTask;
102115
}
103116

104117
/// <inheritdoc />
@@ -115,6 +128,43 @@ public Task<bool> IsHealthyAsync(CancellationToken cancellationToken = default)
115128
}
116129
}
117130

131+
private async Task EnsureTopicAsync(string topic, CancellationToken cancellationToken)
132+
{
133+
if (!_options.AutoCreateTopics || _ensuredTopics.ContainsKey(topic))
134+
{
135+
return;
136+
}
137+
138+
cancellationToken.ThrowIfCancellationRequested();
139+
140+
var configs = new Dictionary<string, string>();
141+
142+
if (_options.MessageRetention.HasValue)
143+
{
144+
configs["retention.ms"] = ((long)_options.MessageRetention.Value.TotalMilliseconds).ToString(
145+
CultureInfo.InvariantCulture
146+
);
147+
}
148+
149+
var spec = new TopicSpecification
150+
{
151+
Name = topic,
152+
NumPartitions = _options.DefaultPartitionCount,
153+
ReplicationFactor = _options.DefaultReplicationFactor,
154+
Configs = configs.Count > 0 ? configs : null,
155+
};
156+
157+
try
158+
{
159+
await _adminClient.CreateTopicsAsync([spec]).ConfigureAwait(false);
160+
_ = _ensuredTopics.TryAdd(topic, true);
161+
}
162+
catch (CreateTopicsException ex) when (ex.Results.All(static r => r.Error.Code == ErrorCode.TopicAlreadyExists))
163+
{
164+
_ = _ensuredTopics.TryAdd(topic, true);
165+
}
166+
}
167+
118168
private static Message<string, string> CreateKafkaMessage(OutboxMessage message)
119169
{
120170
var eventTypeName = message.EventType.ToOutboxEventTypeName();
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
namespace NetEvolve.Pulse.Outbox;
2+
3+
/// <summary>
4+
/// Configuration options for <see cref="KafkaMessageTransport"/>.
5+
/// </summary>
6+
public sealed class KafkaTransportOptions
7+
{
8+
/// <summary>
9+
/// Gets or sets the default number of partitions for auto-created topics.
10+
/// </summary>
11+
/// <remarks>Defaults to <c>1</c>.</remarks>
12+
public int DefaultPartitionCount { get; set; } = 1;
13+
14+
/// <summary>
15+
/// Gets or sets the default replication factor for auto-created topics.
16+
/// </summary>
17+
/// <remarks>Defaults to <c>1</c>.</remarks>
18+
public short DefaultReplicationFactor { get; set; } = 1;
19+
20+
/// <summary>
21+
/// Gets or sets a value indicating whether topics should be automatically created before sending messages.
22+
/// </summary>
23+
/// <remarks>Defaults to <see langword="true"/>.</remarks>
24+
public bool AutoCreateTopics { get; set; } = true;
25+
26+
/// <summary>
27+
/// Gets or sets the message retention duration applied to auto-created topics.
28+
/// </summary>
29+
/// <remarks>When <see langword="null"/>, the broker default retention policy is used.</remarks>
30+
public TimeSpan? MessageRetention { get; set; }
31+
}

tests/NetEvolve.Pulse.Tests.Unit/Kafka/KafkaExtensionsTests.cs

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
namespace NetEvolve.Pulse.Tests.Unit.Kafka;
22

3+
using System;
34
using Microsoft.Extensions.DependencyInjection;
5+
using Microsoft.Extensions.Options;
46
using NetEvolve.Extensions.TUnit;
57
using NetEvolve.Pulse;
68
using NetEvolve.Pulse.Extensibility;
@@ -59,6 +61,50 @@ public async Task UseKafkaTransport_Does_not_register_adapters()
5961
.IsFalse();
6062
}
6163

64+
[Test]
65+
public async Task UseKafkaTransport_With_configureOptions_registers_options()
66+
{
67+
IServiceCollection services = new ServiceCollection();
68+
_ = services.AddPulse(config =>
69+
config.UseKafkaTransport(opt =>
70+
{
71+
opt.DefaultPartitionCount = 4;
72+
opt.DefaultReplicationFactor = 2;
73+
opt.AutoCreateTopics = false;
74+
opt.MessageRetention = TimeSpan.FromHours(48);
75+
})
76+
);
77+
78+
using var provider = services.BuildServiceProvider();
79+
var options = provider.GetRequiredService<IOptions<KafkaTransportOptions>>().Value;
80+
81+
using (Assert.Multiple())
82+
{
83+
_ = await Assert.That(options.DefaultPartitionCount).IsEqualTo(4);
84+
_ = await Assert.That(options.DefaultReplicationFactor).IsEqualTo((short)2);
85+
_ = await Assert.That(options.AutoCreateTopics).IsFalse();
86+
_ = await Assert.That(options.MessageRetention).IsEqualTo(TimeSpan.FromHours(48));
87+
}
88+
}
89+
90+
[Test]
91+
public async Task UseKafkaTransport_Without_configureOptions_uses_default_options()
92+
{
93+
IServiceCollection services = new ServiceCollection();
94+
_ = services.AddPulse(config => config.UseKafkaTransport());
95+
96+
using var provider = services.BuildServiceProvider();
97+
var options = provider.GetRequiredService<IOptions<KafkaTransportOptions>>().Value;
98+
99+
using (Assert.Multiple())
100+
{
101+
_ = await Assert.That(options.DefaultPartitionCount).IsEqualTo(1);
102+
_ = await Assert.That(options.DefaultReplicationFactor).IsEqualTo((short)1);
103+
_ = await Assert.That(options.AutoCreateTopics).IsTrue();
104+
_ = await Assert.That(options.MessageRetention).IsNull();
105+
}
106+
}
107+
62108
private sealed class DummyTransport : IMessageTransport
63109
{
64110
public Task<bool> IsHealthyAsync(CancellationToken cancellationToken = default) => Task.FromResult(true);

0 commit comments

Comments
 (0)