diff --git a/Pulse.slnx b/Pulse.slnx index 5a49cbc0..f230233f 100644 --- a/Pulse.slnx +++ b/Pulse.slnx @@ -28,6 +28,7 @@ + @@ -38,7 +39,6 @@ - diff --git a/src/NetEvolve.Pulse.Kafka/KafkaExtensions.cs b/src/NetEvolve.Pulse.Kafka/KafkaExtensions.cs index 20c2ec2d..07a8a202 100644 --- a/src/NetEvolve.Pulse.Kafka/KafkaExtensions.cs +++ b/src/NetEvolve.Pulse.Kafka/KafkaExtensions.cs @@ -16,6 +16,7 @@ public static class KafkaExtensions /// Registers the Kafka outbox transport so that outbox messages are produced to Kafka topics. /// /// The mediator configurator. + /// Optional action to configure . /// The same instance for chaining. /// /// Prerequisites: @@ -41,12 +42,21 @@ public static class KafkaExtensions /// Replaces any previously registered . /// /// Thrown if is null. - public static IMediatorBuilder UseKafkaTransport(this IMediatorBuilder configurator) + public static IMediatorBuilder UseKafkaTransport( + this IMediatorBuilder configurator, + Action? configureOptions = null + ) { ArgumentNullException.ThrowIfNull(configurator); var services = configurator.Services; + _ = services.AddOptions(); + if (configureOptions is not null) + { + _ = services.Configure(configureOptions); + } + var existing = services.FirstOrDefault(d => d.ServiceType == typeof(IMessageTransport)); if (existing is not null) { diff --git a/src/NetEvolve.Pulse.Kafka/Outbox/KafkaMessageTransport.cs b/src/NetEvolve.Pulse.Kafka/Outbox/KafkaMessageTransport.cs index 144df0be..9d662ad1 100644 --- a/src/NetEvolve.Pulse.Kafka/Outbox/KafkaMessageTransport.cs +++ b/src/NetEvolve.Pulse.Kafka/Outbox/KafkaMessageTransport.cs @@ -3,8 +3,11 @@ namespace NetEvolve.Pulse.Outbox; using System.Collections.Concurrent; using System.Diagnostics.CodeAnalysis; using System.Globalization; +using System.Linq; using System.Text; using Confluent.Kafka; +using Confluent.Kafka.Admin; +using Microsoft.Extensions.Options; using NetEvolve.Pulse.Extensibility.Outbox; /// @@ -21,6 +24,8 @@ public sealed class KafkaMessageTransport : IMessageTransport private readonly IProducer _producer; private readonly IAdminClient _adminClient; private readonly ITopicNameResolver _topicNameResolver; + private readonly KafkaTransportOptions _options; + private readonly ConcurrentDictionary _ensuredTopics = new(StringComparer.Ordinal); /// /// Initializes a new instance of . @@ -28,19 +33,23 @@ public sealed class KafkaMessageTransport : IMessageTransport /// The Kafka producer, registered in DI by the caller. /// The Kafka admin client, registered in DI by the caller. /// The resolver that maps each outbox message to a Kafka topic name. + /// The transport options. public KafkaMessageTransport( IProducer producer, IAdminClient adminClient, - ITopicNameResolver topicNameResolver + ITopicNameResolver topicNameResolver, + IOptions options ) { ArgumentNullException.ThrowIfNull(producer); ArgumentNullException.ThrowIfNull(adminClient); ArgumentNullException.ThrowIfNull(topicNameResolver); + ArgumentNullException.ThrowIfNull(options); _producer = producer; _adminClient = adminClient; _topicNameResolver = topicNameResolver; + _options = options.Value; } /// @@ -49,6 +58,9 @@ public async Task SendAsync(OutboxMessage message, CancellationToken cancellatio ArgumentNullException.ThrowIfNull(message); var topic = _topicNameResolver.Resolve(message); + + await EnsureTopicAsync(topic, cancellationToken).ConfigureAwait(false); + var kafkaMessage = CreateKafkaMessage(message); _ = await _producer.ProduceAsync(topic, kafkaMessage, cancellationToken).ConfigureAwait(false); @@ -58,9 +70,9 @@ public async Task SendAsync(OutboxMessage message, CancellationToken cancellatio [SuppressMessage( "Performance", "CA1849:Call async methods when in an async method", - Justification = "Intentional fire-and-forget batch pattern. Produce() enqueues messages with a delivery-report callback; awaiting ProduceAsync() per message would serialize delivery and defeat the purpose of batching." + Justification = "Intentional fire-and-forget batch pattern. Produce() enqueues messages with a delivery-report callback; awaiting ProduceAsync() per message would serialize delivery and defeat the purpose of batching. The method is async only to await EnsureTopicAsync() for topic auto-creation before the fire-and-forget send loop." )] - public Task SendBatchAsync(IEnumerable messages, CancellationToken cancellationToken = default) + public async Task SendBatchAsync(IEnumerable messages, CancellationToken cancellationToken = default) { ArgumentNullException.ThrowIfNull(messages); @@ -69,6 +81,9 @@ public Task SendBatchAsync(IEnumerable messages, CancellationToke foreach (var message in messages) { var topic = _topicNameResolver.Resolve(message); + + await EnsureTopicAsync(topic, cancellationToken).ConfigureAwait(false); + var kafkaMessage = CreateKafkaMessage(message); try @@ -95,10 +110,8 @@ public Task SendBatchAsync(IEnumerable messages, CancellationToke if (!errors.IsEmpty) { - return Task.FromException(new AggregateException(errors)); + throw new AggregateException(errors); } - - return Task.CompletedTask; } /// @@ -115,6 +128,43 @@ public Task IsHealthyAsync(CancellationToken cancellationToken = default) } } + private async Task EnsureTopicAsync(string topic, CancellationToken cancellationToken) + { + if (!_options.AutoCreateTopics || _ensuredTopics.ContainsKey(topic)) + { + return; + } + + cancellationToken.ThrowIfCancellationRequested(); + + var configs = new Dictionary(); + + if (_options.MessageRetention.HasValue) + { + configs["retention.ms"] = ((long)_options.MessageRetention.Value.TotalMilliseconds).ToString( + CultureInfo.InvariantCulture + ); + } + + var spec = new TopicSpecification + { + Name = topic, + NumPartitions = _options.DefaultPartitionCount, + ReplicationFactor = _options.DefaultReplicationFactor, + Configs = configs.Count > 0 ? configs : null, + }; + + try + { + await _adminClient.CreateTopicsAsync([spec]).ConfigureAwait(false); + _ = _ensuredTopics.TryAdd(topic, true); + } + catch (CreateTopicsException ex) when (ex.Results.All(static r => r.Error.Code == ErrorCode.TopicAlreadyExists)) + { + _ = _ensuredTopics.TryAdd(topic, true); + } + } + private static Message CreateKafkaMessage(OutboxMessage message) { var eventTypeName = message.EventType.ToOutboxEventTypeName(); diff --git a/src/NetEvolve.Pulse.Kafka/Outbox/KafkaTransportOptions.cs b/src/NetEvolve.Pulse.Kafka/Outbox/KafkaTransportOptions.cs new file mode 100644 index 00000000..539fe747 --- /dev/null +++ b/src/NetEvolve.Pulse.Kafka/Outbox/KafkaTransportOptions.cs @@ -0,0 +1,31 @@ +namespace NetEvolve.Pulse.Outbox; + +/// +/// Configuration options for . +/// +public sealed class KafkaTransportOptions +{ + /// + /// Gets or sets the default number of partitions for auto-created topics. + /// + /// Defaults to 1. + public int DefaultPartitionCount { get; set; } = 1; + + /// + /// Gets or sets the default replication factor for auto-created topics. + /// + /// Defaults to 1. + public short DefaultReplicationFactor { get; set; } = 1; + + /// + /// Gets or sets a value indicating whether topics should be automatically created before sending messages. + /// + /// Defaults to . + public bool AutoCreateTopics { get; set; } = true; + + /// + /// Gets or sets the message retention duration applied to auto-created topics. + /// + /// When , the broker default retention policy is used. + public TimeSpan? MessageRetention { get; set; } +} diff --git a/tests/NetEvolve.Pulse.Tests.Unit/Kafka/KafkaExtensionsTests.cs b/tests/NetEvolve.Pulse.Tests.Unit/Kafka/KafkaExtensionsTests.cs index c4143ed6..49c7b6cf 100644 --- a/tests/NetEvolve.Pulse.Tests.Unit/Kafka/KafkaExtensionsTests.cs +++ b/tests/NetEvolve.Pulse.Tests.Unit/Kafka/KafkaExtensionsTests.cs @@ -1,6 +1,8 @@ namespace NetEvolve.Pulse.Tests.Unit.Kafka; +using System; using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Options; using NetEvolve.Extensions.TUnit; using NetEvolve.Pulse; using NetEvolve.Pulse.Extensibility; @@ -59,6 +61,50 @@ public async Task UseKafkaTransport_Does_not_register_adapters() .IsFalse(); } + [Test] + public async Task UseKafkaTransport_With_configureOptions_registers_options() + { + IServiceCollection services = new ServiceCollection(); + _ = services.AddPulse(config => + config.UseKafkaTransport(opt => + { + opt.DefaultPartitionCount = 4; + opt.DefaultReplicationFactor = 2; + opt.AutoCreateTopics = false; + opt.MessageRetention = TimeSpan.FromHours(48); + }) + ); + + using var provider = services.BuildServiceProvider(); + var options = provider.GetRequiredService>().Value; + + using (Assert.Multiple()) + { + _ = await Assert.That(options.DefaultPartitionCount).IsEqualTo(4); + _ = await Assert.That(options.DefaultReplicationFactor).IsEqualTo((short)2); + _ = await Assert.That(options.AutoCreateTopics).IsFalse(); + _ = await Assert.That(options.MessageRetention).IsEqualTo(TimeSpan.FromHours(48)); + } + } + + [Test] + public async Task UseKafkaTransport_Without_configureOptions_uses_default_options() + { + IServiceCollection services = new ServiceCollection(); + _ = services.AddPulse(config => config.UseKafkaTransport()); + + using var provider = services.BuildServiceProvider(); + var options = provider.GetRequiredService>().Value; + + using (Assert.Multiple()) + { + _ = await Assert.That(options.DefaultPartitionCount).IsEqualTo(1); + _ = await Assert.That(options.DefaultReplicationFactor).IsEqualTo((short)1); + _ = await Assert.That(options.AutoCreateTopics).IsTrue(); + _ = await Assert.That(options.MessageRetention).IsNull(); + } + } + private sealed class DummyTransport : IMessageTransport { public Task IsHealthyAsync(CancellationToken cancellationToken = default) => Task.FromResult(true); diff --git a/tests/NetEvolve.Pulse.Tests.Unit/Kafka/KafkaMessageTransportTests.cs b/tests/NetEvolve.Pulse.Tests.Unit/Kafka/KafkaMessageTransportTests.cs index 1ef014d8..8a2e3fe6 100644 --- a/tests/NetEvolve.Pulse.Tests.Unit/Kafka/KafkaMessageTransportTests.cs +++ b/tests/NetEvolve.Pulse.Tests.Unit/Kafka/KafkaMessageTransportTests.cs @@ -8,6 +8,7 @@ using System.Threading.Tasks; using Confluent.Kafka; using Confluent.Kafka.Admin; +using Microsoft.Extensions.Options; using NetEvolve.Extensions.TUnit; using NetEvolve.Pulse.Extensibility; using NetEvolve.Pulse.Extensibility.Outbox; @@ -154,11 +155,175 @@ CancellationToken cancellationToken _ = await Assert.That(healthy).IsFalse(); } + [Test] + public async Task SendAsync_AutoCreateTopics_true_Creates_topic_before_producing( + CancellationToken cancellationToken + ) + { + using var producer = new FakeProducer(); + using var admin = new FakeAdminClient { BrokerCount = 1 }; + var transport = CreateTransport( + producer, + admin, + options: new KafkaTransportOptions { AutoCreateTopics = true } + ); + var outboxMessage = CreateOutboxMessage(); + + await transport.SendAsync(outboxMessage, cancellationToken).ConfigureAwait(false); + + _ = await Assert.That(admin.CreateTopicsCallCount).IsEqualTo(1); + _ = await Assert.That(admin.CreatedTopics).Count().IsEqualTo(1); + } + + [Test] + public async Task SendAsync_AutoCreateTopics_false_Skips_topic_creation(CancellationToken cancellationToken) + { + using var producer = new FakeProducer(); + using var admin = new FakeAdminClient { BrokerCount = 1 }; + var transport = CreateTransport( + producer, + admin, + options: new KafkaTransportOptions { AutoCreateTopics = false } + ); + var outboxMessage = CreateOutboxMessage(); + + await transport.SendAsync(outboxMessage, cancellationToken).ConfigureAwait(false); + + _ = await Assert.That(admin.CreateTopicsCallCount).IsEqualTo(0); + } + + [Test] + public async Task SendAsync_AutoCreateTopics_only_Creates_topic_once_per_topic(CancellationToken cancellationToken) + { + using var producer = new FakeProducer(); + using var admin = new FakeAdminClient { BrokerCount = 1 }; + var transport = CreateTransport( + producer, + admin, + options: new KafkaTransportOptions { AutoCreateTopics = true } + ); + var outboxMessage = CreateOutboxMessage(); + + await transport.SendAsync(outboxMessage, cancellationToken).ConfigureAwait(false); + await transport.SendAsync(outboxMessage, cancellationToken).ConfigureAwait(false); + + _ = await Assert.That(admin.CreateTopicsCallCount).IsEqualTo(1); + } + + [Test] + public async Task SendAsync_Creates_topic_with_custom_partition_count(CancellationToken cancellationToken) + { + using var producer = new FakeProducer(); + using var admin = new FakeAdminClient { BrokerCount = 1 }; + var transport = CreateTransport( + producer, + admin, + options: new KafkaTransportOptions { AutoCreateTopics = true, DefaultPartitionCount = 3 } + ); + var outboxMessage = CreateOutboxMessage(); + + await transport.SendAsync(outboxMessage, cancellationToken).ConfigureAwait(false); + + _ = await Assert.That(admin.CreatedTopics.Single().NumPartitions).IsEqualTo(3); + } + + [Test] + public async Task SendAsync_Creates_topic_with_custom_replication_factor(CancellationToken cancellationToken) + { + using var producer = new FakeProducer(); + using var admin = new FakeAdminClient { BrokerCount = 1 }; + var transport = CreateTransport( + producer, + admin, + options: new KafkaTransportOptions { AutoCreateTopics = true, DefaultReplicationFactor = 2 } + ); + var outboxMessage = CreateOutboxMessage(); + + await transport.SendAsync(outboxMessage, cancellationToken).ConfigureAwait(false); + + _ = await Assert.That(admin.CreatedTopics.Single().ReplicationFactor).IsEqualTo((short)2); + } + + [Test] + public async Task SendAsync_Creates_topic_with_custom_retention(CancellationToken cancellationToken) + { + using var producer = new FakeProducer(); + using var admin = new FakeAdminClient { BrokerCount = 1 }; + var retention = TimeSpan.FromHours(24); + var transport = CreateTransport( + producer, + admin, + options: new KafkaTransportOptions { AutoCreateTopics = true, MessageRetention = retention } + ); + var outboxMessage = CreateOutboxMessage(); + + await transport.SendAsync(outboxMessage, cancellationToken).ConfigureAwait(false); + + var createdTopic = admin.CreatedTopics.Single(); + _ = await Assert.That(createdTopic.Configs).IsNotNull(); + _ = await Assert + .That(createdTopic.Configs!["retention.ms"]) + .IsEqualTo(((long)retention.TotalMilliseconds).ToString(System.Globalization.CultureInfo.InvariantCulture)); + } + + [Test] + public async Task SendAsync_Creates_topic_without_retention_config_when_not_set(CancellationToken cancellationToken) + { + using var producer = new FakeProducer(); + using var admin = new FakeAdminClient { BrokerCount = 1 }; + var transport = CreateTransport( + producer, + admin, + options: new KafkaTransportOptions { AutoCreateTopics = true, MessageRetention = null } + ); + var outboxMessage = CreateOutboxMessage(); + + await transport.SendAsync(outboxMessage, cancellationToken).ConfigureAwait(false); + + _ = await Assert.That(admin.CreatedTopics.Single().Configs).IsNull(); + } + + [Test] + public async Task SendBatchAsync_AutoCreateTopics_false_Skips_topic_creation(CancellationToken cancellationToken) + { + using var producer = new FakeProducer(); + using var admin = new FakeAdminClient { BrokerCount = 1 }; + var transport = CreateTransport( + producer, + admin, + options: new KafkaTransportOptions { AutoCreateTopics = false } + ); + var messages = new[] { CreateOutboxMessage(), CreateOutboxMessage() }; + + await transport.SendBatchAsync(messages, cancellationToken).ConfigureAwait(false); + + _ = await Assert.That(admin.CreateTopicsCallCount).IsEqualTo(0); + } + + [Test] + public async Task Constructor_When_options_is_null_throws_ArgumentNullException() + { + using var producer = new FakeProducer(); + using var admin = new FakeAdminClient { BrokerCount = 1 }; + var resolver = new FixedTopicNameResolver("test-topic"); + + _ = await Assert.ThrowsAsync(() => + Task.FromResult(new KafkaMessageTransport(producer, admin, resolver, null!)) + ); + } + private static KafkaMessageTransport CreateTransport( IProducer producer, IAdminClient admin, - string topicName = "test-topic" - ) => new(producer, admin, new FixedTopicNameResolver(topicName)); + string topicName = "test-topic", + KafkaTransportOptions? options = null + ) => + new( + producer, + admin, + new FixedTopicNameResolver(topicName), + Options.Create(options ?? new KafkaTransportOptions()) + ); private static OutboxMessage CreateOutboxMessage() => new() @@ -300,6 +465,8 @@ private sealed class FakeAdminClient : IAdminClient public int BrokerCount { get; init; } public bool ThrowOnGetMetadata { get; init; } public int GetMetadataCallCount { get; private set; } + public List CreatedTopics { get; } = []; + public int CreateTopicsCallCount { get; private set; } public string Name => "fake-admin"; public Handle Handle => default!; @@ -327,8 +494,12 @@ public Metadata GetMetadata(TimeSpan timeout) public GroupInfo ListGroup(string group, TimeSpan timeout) => throw new NotSupportedException(); - public Task CreateTopicsAsync(IEnumerable topics, CreateTopicsOptions? options = null) => - throw new NotSupportedException(); + public Task CreateTopicsAsync(IEnumerable topics, CreateTopicsOptions? options = null) + { + CreateTopicsCallCount++; + CreatedTopics.AddRange(topics); + return Task.CompletedTask; + } public Task DeleteTopicsAsync(IEnumerable topics, DeleteTopicsOptions? options = null) => throw new NotSupportedException();