diff --git a/Pulse.slnx b/Pulse.slnx
index 5a49cbc0..f230233f 100644
--- a/Pulse.slnx
+++ b/Pulse.slnx
@@ -28,6 +28,7 @@
+
@@ -38,7 +39,6 @@
-
diff --git a/src/NetEvolve.Pulse.Kafka/KafkaExtensions.cs b/src/NetEvolve.Pulse.Kafka/KafkaExtensions.cs
index 20c2ec2d..07a8a202 100644
--- a/src/NetEvolve.Pulse.Kafka/KafkaExtensions.cs
+++ b/src/NetEvolve.Pulse.Kafka/KafkaExtensions.cs
@@ -16,6 +16,7 @@ public static class KafkaExtensions
/// Registers the Kafka outbox transport so that outbox messages are produced to Kafka topics.
///
/// The mediator configurator.
+ /// Optional action to configure .
/// The same instance for chaining.
///
/// Prerequisites:
@@ -41,12 +42,21 @@ public static class KafkaExtensions
/// Replaces any previously registered .
///
/// Thrown if is null.
- public static IMediatorBuilder UseKafkaTransport(this IMediatorBuilder configurator)
+ public static IMediatorBuilder UseKafkaTransport(
+ this IMediatorBuilder configurator,
+ Action? configureOptions = null
+ )
{
ArgumentNullException.ThrowIfNull(configurator);
var services = configurator.Services;
+ _ = services.AddOptions();
+ if (configureOptions is not null)
+ {
+ _ = services.Configure(configureOptions);
+ }
+
var existing = services.FirstOrDefault(d => d.ServiceType == typeof(IMessageTransport));
if (existing is not null)
{
diff --git a/src/NetEvolve.Pulse.Kafka/Outbox/KafkaMessageTransport.cs b/src/NetEvolve.Pulse.Kafka/Outbox/KafkaMessageTransport.cs
index 144df0be..9d662ad1 100644
--- a/src/NetEvolve.Pulse.Kafka/Outbox/KafkaMessageTransport.cs
+++ b/src/NetEvolve.Pulse.Kafka/Outbox/KafkaMessageTransport.cs
@@ -3,8 +3,11 @@ namespace NetEvolve.Pulse.Outbox;
using System.Collections.Concurrent;
using System.Diagnostics.CodeAnalysis;
using System.Globalization;
+using System.Linq;
using System.Text;
using Confluent.Kafka;
+using Confluent.Kafka.Admin;
+using Microsoft.Extensions.Options;
using NetEvolve.Pulse.Extensibility.Outbox;
///
@@ -21,6 +24,8 @@ public sealed class KafkaMessageTransport : IMessageTransport
private readonly IProducer _producer;
private readonly IAdminClient _adminClient;
private readonly ITopicNameResolver _topicNameResolver;
+ private readonly KafkaTransportOptions _options;
+ private readonly ConcurrentDictionary _ensuredTopics = new(StringComparer.Ordinal);
///
/// Initializes a new instance of .
@@ -28,19 +33,23 @@ public sealed class KafkaMessageTransport : IMessageTransport
/// The Kafka producer, registered in DI by the caller.
/// The Kafka admin client, registered in DI by the caller.
/// The resolver that maps each outbox message to a Kafka topic name.
+ /// The transport options.
public KafkaMessageTransport(
IProducer producer,
IAdminClient adminClient,
- ITopicNameResolver topicNameResolver
+ ITopicNameResolver topicNameResolver,
+ IOptions options
)
{
ArgumentNullException.ThrowIfNull(producer);
ArgumentNullException.ThrowIfNull(adminClient);
ArgumentNullException.ThrowIfNull(topicNameResolver);
+ ArgumentNullException.ThrowIfNull(options);
_producer = producer;
_adminClient = adminClient;
_topicNameResolver = topicNameResolver;
+ _options = options.Value;
}
///
@@ -49,6 +58,9 @@ public async Task SendAsync(OutboxMessage message, CancellationToken cancellatio
ArgumentNullException.ThrowIfNull(message);
var topic = _topicNameResolver.Resolve(message);
+
+ await EnsureTopicAsync(topic, cancellationToken).ConfigureAwait(false);
+
var kafkaMessage = CreateKafkaMessage(message);
_ = await _producer.ProduceAsync(topic, kafkaMessage, cancellationToken).ConfigureAwait(false);
@@ -58,9 +70,9 @@ public async Task SendAsync(OutboxMessage message, CancellationToken cancellatio
[SuppressMessage(
"Performance",
"CA1849:Call async methods when in an async method",
- Justification = "Intentional fire-and-forget batch pattern. Produce() enqueues messages with a delivery-report callback; awaiting ProduceAsync() per message would serialize delivery and defeat the purpose of batching."
+ Justification = "Intentional fire-and-forget batch pattern. Produce() enqueues messages with a delivery-report callback; awaiting ProduceAsync() per message would serialize delivery and defeat the purpose of batching. The method is async only to await EnsureTopicAsync() for topic auto-creation before the fire-and-forget send loop."
)]
- public Task SendBatchAsync(IEnumerable messages, CancellationToken cancellationToken = default)
+ public async Task SendBatchAsync(IEnumerable messages, CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(messages);
@@ -69,6 +81,9 @@ public Task SendBatchAsync(IEnumerable messages, CancellationToke
foreach (var message in messages)
{
var topic = _topicNameResolver.Resolve(message);
+
+ await EnsureTopicAsync(topic, cancellationToken).ConfigureAwait(false);
+
var kafkaMessage = CreateKafkaMessage(message);
try
@@ -95,10 +110,8 @@ public Task SendBatchAsync(IEnumerable messages, CancellationToke
if (!errors.IsEmpty)
{
- return Task.FromException(new AggregateException(errors));
+ throw new AggregateException(errors);
}
-
- return Task.CompletedTask;
}
///
@@ -115,6 +128,43 @@ public Task IsHealthyAsync(CancellationToken cancellationToken = default)
}
}
+ private async Task EnsureTopicAsync(string topic, CancellationToken cancellationToken)
+ {
+ if (!_options.AutoCreateTopics || _ensuredTopics.ContainsKey(topic))
+ {
+ return;
+ }
+
+ cancellationToken.ThrowIfCancellationRequested();
+
+ var configs = new Dictionary();
+
+ if (_options.MessageRetention.HasValue)
+ {
+ configs["retention.ms"] = ((long)_options.MessageRetention.Value.TotalMilliseconds).ToString(
+ CultureInfo.InvariantCulture
+ );
+ }
+
+ var spec = new TopicSpecification
+ {
+ Name = topic,
+ NumPartitions = _options.DefaultPartitionCount,
+ ReplicationFactor = _options.DefaultReplicationFactor,
+ Configs = configs.Count > 0 ? configs : null,
+ };
+
+ try
+ {
+ await _adminClient.CreateTopicsAsync([spec]).ConfigureAwait(false);
+ _ = _ensuredTopics.TryAdd(topic, true);
+ }
+ catch (CreateTopicsException ex) when (ex.Results.All(static r => r.Error.Code == ErrorCode.TopicAlreadyExists))
+ {
+ _ = _ensuredTopics.TryAdd(topic, true);
+ }
+ }
+
private static Message CreateKafkaMessage(OutboxMessage message)
{
var eventTypeName = message.EventType.ToOutboxEventTypeName();
diff --git a/src/NetEvolve.Pulse.Kafka/Outbox/KafkaTransportOptions.cs b/src/NetEvolve.Pulse.Kafka/Outbox/KafkaTransportOptions.cs
new file mode 100644
index 00000000..539fe747
--- /dev/null
+++ b/src/NetEvolve.Pulse.Kafka/Outbox/KafkaTransportOptions.cs
@@ -0,0 +1,31 @@
+namespace NetEvolve.Pulse.Outbox;
+
+///
+/// Configuration options for .
+///
+public sealed class KafkaTransportOptions
+{
+ ///
+ /// Gets or sets the default number of partitions for auto-created topics.
+ ///
+ /// Defaults to 1.
+ public int DefaultPartitionCount { get; set; } = 1;
+
+ ///
+ /// Gets or sets the default replication factor for auto-created topics.
+ ///
+ /// Defaults to 1.
+ public short DefaultReplicationFactor { get; set; } = 1;
+
+ ///
+ /// Gets or sets a value indicating whether topics should be automatically created before sending messages.
+ ///
+ /// Defaults to .
+ public bool AutoCreateTopics { get; set; } = true;
+
+ ///
+ /// Gets or sets the message retention duration applied to auto-created topics.
+ ///
+ /// When , the broker default retention policy is used.
+ public TimeSpan? MessageRetention { get; set; }
+}
diff --git a/tests/NetEvolve.Pulse.Tests.Unit/Kafka/KafkaExtensionsTests.cs b/tests/NetEvolve.Pulse.Tests.Unit/Kafka/KafkaExtensionsTests.cs
index c4143ed6..49c7b6cf 100644
--- a/tests/NetEvolve.Pulse.Tests.Unit/Kafka/KafkaExtensionsTests.cs
+++ b/tests/NetEvolve.Pulse.Tests.Unit/Kafka/KafkaExtensionsTests.cs
@@ -1,6 +1,8 @@
namespace NetEvolve.Pulse.Tests.Unit.Kafka;
+using System;
using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Options;
using NetEvolve.Extensions.TUnit;
using NetEvolve.Pulse;
using NetEvolve.Pulse.Extensibility;
@@ -59,6 +61,50 @@ public async Task UseKafkaTransport_Does_not_register_adapters()
.IsFalse();
}
+ [Test]
+ public async Task UseKafkaTransport_With_configureOptions_registers_options()
+ {
+ IServiceCollection services = new ServiceCollection();
+ _ = services.AddPulse(config =>
+ config.UseKafkaTransport(opt =>
+ {
+ opt.DefaultPartitionCount = 4;
+ opt.DefaultReplicationFactor = 2;
+ opt.AutoCreateTopics = false;
+ opt.MessageRetention = TimeSpan.FromHours(48);
+ })
+ );
+
+ using var provider = services.BuildServiceProvider();
+ var options = provider.GetRequiredService>().Value;
+
+ using (Assert.Multiple())
+ {
+ _ = await Assert.That(options.DefaultPartitionCount).IsEqualTo(4);
+ _ = await Assert.That(options.DefaultReplicationFactor).IsEqualTo((short)2);
+ _ = await Assert.That(options.AutoCreateTopics).IsFalse();
+ _ = await Assert.That(options.MessageRetention).IsEqualTo(TimeSpan.FromHours(48));
+ }
+ }
+
+ [Test]
+ public async Task UseKafkaTransport_Without_configureOptions_uses_default_options()
+ {
+ IServiceCollection services = new ServiceCollection();
+ _ = services.AddPulse(config => config.UseKafkaTransport());
+
+ using var provider = services.BuildServiceProvider();
+ var options = provider.GetRequiredService>().Value;
+
+ using (Assert.Multiple())
+ {
+ _ = await Assert.That(options.DefaultPartitionCount).IsEqualTo(1);
+ _ = await Assert.That(options.DefaultReplicationFactor).IsEqualTo((short)1);
+ _ = await Assert.That(options.AutoCreateTopics).IsTrue();
+ _ = await Assert.That(options.MessageRetention).IsNull();
+ }
+ }
+
private sealed class DummyTransport : IMessageTransport
{
public Task IsHealthyAsync(CancellationToken cancellationToken = default) => Task.FromResult(true);
diff --git a/tests/NetEvolve.Pulse.Tests.Unit/Kafka/KafkaMessageTransportTests.cs b/tests/NetEvolve.Pulse.Tests.Unit/Kafka/KafkaMessageTransportTests.cs
index 1ef014d8..8a2e3fe6 100644
--- a/tests/NetEvolve.Pulse.Tests.Unit/Kafka/KafkaMessageTransportTests.cs
+++ b/tests/NetEvolve.Pulse.Tests.Unit/Kafka/KafkaMessageTransportTests.cs
@@ -8,6 +8,7 @@
using System.Threading.Tasks;
using Confluent.Kafka;
using Confluent.Kafka.Admin;
+using Microsoft.Extensions.Options;
using NetEvolve.Extensions.TUnit;
using NetEvolve.Pulse.Extensibility;
using NetEvolve.Pulse.Extensibility.Outbox;
@@ -154,11 +155,175 @@ CancellationToken cancellationToken
_ = await Assert.That(healthy).IsFalse();
}
+ [Test]
+ public async Task SendAsync_AutoCreateTopics_true_Creates_topic_before_producing(
+ CancellationToken cancellationToken
+ )
+ {
+ using var producer = new FakeProducer();
+ using var admin = new FakeAdminClient { BrokerCount = 1 };
+ var transport = CreateTransport(
+ producer,
+ admin,
+ options: new KafkaTransportOptions { AutoCreateTopics = true }
+ );
+ var outboxMessage = CreateOutboxMessage();
+
+ await transport.SendAsync(outboxMessage, cancellationToken).ConfigureAwait(false);
+
+ _ = await Assert.That(admin.CreateTopicsCallCount).IsEqualTo(1);
+ _ = await Assert.That(admin.CreatedTopics).Count().IsEqualTo(1);
+ }
+
+ [Test]
+ public async Task SendAsync_AutoCreateTopics_false_Skips_topic_creation(CancellationToken cancellationToken)
+ {
+ using var producer = new FakeProducer();
+ using var admin = new FakeAdminClient { BrokerCount = 1 };
+ var transport = CreateTransport(
+ producer,
+ admin,
+ options: new KafkaTransportOptions { AutoCreateTopics = false }
+ );
+ var outboxMessage = CreateOutboxMessage();
+
+ await transport.SendAsync(outboxMessage, cancellationToken).ConfigureAwait(false);
+
+ _ = await Assert.That(admin.CreateTopicsCallCount).IsEqualTo(0);
+ }
+
+ [Test]
+ public async Task SendAsync_AutoCreateTopics_only_Creates_topic_once_per_topic(CancellationToken cancellationToken)
+ {
+ using var producer = new FakeProducer();
+ using var admin = new FakeAdminClient { BrokerCount = 1 };
+ var transport = CreateTransport(
+ producer,
+ admin,
+ options: new KafkaTransportOptions { AutoCreateTopics = true }
+ );
+ var outboxMessage = CreateOutboxMessage();
+
+ await transport.SendAsync(outboxMessage, cancellationToken).ConfigureAwait(false);
+ await transport.SendAsync(outboxMessage, cancellationToken).ConfigureAwait(false);
+
+ _ = await Assert.That(admin.CreateTopicsCallCount).IsEqualTo(1);
+ }
+
+ [Test]
+ public async Task SendAsync_Creates_topic_with_custom_partition_count(CancellationToken cancellationToken)
+ {
+ using var producer = new FakeProducer();
+ using var admin = new FakeAdminClient { BrokerCount = 1 };
+ var transport = CreateTransport(
+ producer,
+ admin,
+ options: new KafkaTransportOptions { AutoCreateTopics = true, DefaultPartitionCount = 3 }
+ );
+ var outboxMessage = CreateOutboxMessage();
+
+ await transport.SendAsync(outboxMessage, cancellationToken).ConfigureAwait(false);
+
+ _ = await Assert.That(admin.CreatedTopics.Single().NumPartitions).IsEqualTo(3);
+ }
+
+ [Test]
+ public async Task SendAsync_Creates_topic_with_custom_replication_factor(CancellationToken cancellationToken)
+ {
+ using var producer = new FakeProducer();
+ using var admin = new FakeAdminClient { BrokerCount = 1 };
+ var transport = CreateTransport(
+ producer,
+ admin,
+ options: new KafkaTransportOptions { AutoCreateTopics = true, DefaultReplicationFactor = 2 }
+ );
+ var outboxMessage = CreateOutboxMessage();
+
+ await transport.SendAsync(outboxMessage, cancellationToken).ConfigureAwait(false);
+
+ _ = await Assert.That(admin.CreatedTopics.Single().ReplicationFactor).IsEqualTo((short)2);
+ }
+
+ [Test]
+ public async Task SendAsync_Creates_topic_with_custom_retention(CancellationToken cancellationToken)
+ {
+ using var producer = new FakeProducer();
+ using var admin = new FakeAdminClient { BrokerCount = 1 };
+ var retention = TimeSpan.FromHours(24);
+ var transport = CreateTransport(
+ producer,
+ admin,
+ options: new KafkaTransportOptions { AutoCreateTopics = true, MessageRetention = retention }
+ );
+ var outboxMessage = CreateOutboxMessage();
+
+ await transport.SendAsync(outboxMessage, cancellationToken).ConfigureAwait(false);
+
+ var createdTopic = admin.CreatedTopics.Single();
+ _ = await Assert.That(createdTopic.Configs).IsNotNull();
+ _ = await Assert
+ .That(createdTopic.Configs!["retention.ms"])
+ .IsEqualTo(((long)retention.TotalMilliseconds).ToString(System.Globalization.CultureInfo.InvariantCulture));
+ }
+
+ [Test]
+ public async Task SendAsync_Creates_topic_without_retention_config_when_not_set(CancellationToken cancellationToken)
+ {
+ using var producer = new FakeProducer();
+ using var admin = new FakeAdminClient { BrokerCount = 1 };
+ var transport = CreateTransport(
+ producer,
+ admin,
+ options: new KafkaTransportOptions { AutoCreateTopics = true, MessageRetention = null }
+ );
+ var outboxMessage = CreateOutboxMessage();
+
+ await transport.SendAsync(outboxMessage, cancellationToken).ConfigureAwait(false);
+
+ _ = await Assert.That(admin.CreatedTopics.Single().Configs).IsNull();
+ }
+
+ [Test]
+ public async Task SendBatchAsync_AutoCreateTopics_false_Skips_topic_creation(CancellationToken cancellationToken)
+ {
+ using var producer = new FakeProducer();
+ using var admin = new FakeAdminClient { BrokerCount = 1 };
+ var transport = CreateTransport(
+ producer,
+ admin,
+ options: new KafkaTransportOptions { AutoCreateTopics = false }
+ );
+ var messages = new[] { CreateOutboxMessage(), CreateOutboxMessage() };
+
+ await transport.SendBatchAsync(messages, cancellationToken).ConfigureAwait(false);
+
+ _ = await Assert.That(admin.CreateTopicsCallCount).IsEqualTo(0);
+ }
+
+ [Test]
+ public async Task Constructor_When_options_is_null_throws_ArgumentNullException()
+ {
+ using var producer = new FakeProducer();
+ using var admin = new FakeAdminClient { BrokerCount = 1 };
+ var resolver = new FixedTopicNameResolver("test-topic");
+
+ _ = await Assert.ThrowsAsync(() =>
+ Task.FromResult(new KafkaMessageTransport(producer, admin, resolver, null!))
+ );
+ }
+
private static KafkaMessageTransport CreateTransport(
IProducer producer,
IAdminClient admin,
- string topicName = "test-topic"
- ) => new(producer, admin, new FixedTopicNameResolver(topicName));
+ string topicName = "test-topic",
+ KafkaTransportOptions? options = null
+ ) =>
+ new(
+ producer,
+ admin,
+ new FixedTopicNameResolver(topicName),
+ Options.Create(options ?? new KafkaTransportOptions())
+ );
private static OutboxMessage CreateOutboxMessage() =>
new()
@@ -300,6 +465,8 @@ private sealed class FakeAdminClient : IAdminClient
public int BrokerCount { get; init; }
public bool ThrowOnGetMetadata { get; init; }
public int GetMetadataCallCount { get; private set; }
+ public List CreatedTopics { get; } = [];
+ public int CreateTopicsCallCount { get; private set; }
public string Name => "fake-admin";
public Handle Handle => default!;
@@ -327,8 +494,12 @@ public Metadata GetMetadata(TimeSpan timeout)
public GroupInfo ListGroup(string group, TimeSpan timeout) => throw new NotSupportedException();
- public Task CreateTopicsAsync(IEnumerable topics, CreateTopicsOptions? options = null) =>
- throw new NotSupportedException();
+ public Task CreateTopicsAsync(IEnumerable topics, CreateTopicsOptions? options = null)
+ {
+ CreateTopicsCallCount++;
+ CreatedTopics.AddRange(topics);
+ return Task.CompletedTask;
+ }
public Task DeleteTopicsAsync(IEnumerable topics, DeleteTopicsOptions? options = null) =>
throw new NotSupportedException();