Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Pulse.slnx
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
<Project Path="src/NetEvolve.Pulse.AspNetCore/NetEvolve.Pulse.AspNetCore.csproj" />
<Project Path="src/NetEvolve.Pulse.EntityFramework/NetEvolve.Pulse.EntityFramework.csproj" />
<Project Path="src/NetEvolve.Pulse.Dapr/NetEvolve.Pulse.Dapr.csproj" />
<Project Path="src/NetEvolve.Pulse.MongoDB/NetEvolve.Pulse.MongoDB.csproj" />
<Project Path="src/NetEvolve.Pulse.MySql/NetEvolve.Pulse.MySql.csproj" />
<Project Path="src/NetEvolve.Pulse.PostgreSql/NetEvolve.Pulse.PostgreSql.csproj" />
<Project Path="src/NetEvolve.Pulse.SqlServer/NetEvolve.Pulse.SqlServer.csproj" />
Expand All @@ -38,7 +39,6 @@
<Project Path="src/NetEvolve.Pulse.SourceGeneration/NetEvolve.Pulse.SourceGeneration.csproj" />
<Project Path="src/NetEvolve.Pulse.AzureServiceBus/NetEvolve.Pulse.AzureServiceBus.csproj" />
<Project Path="src/NetEvolve.Pulse.SQLite/NetEvolve.Pulse.SQLite.csproj" />
<Project Path="src/NetEvolve.Pulse.MongoDB/NetEvolve.Pulse.MongoDB.csproj" />
<Project Path="src/NetEvolve.Pulse.Kafka/NetEvolve.Pulse.Kafka.csproj" />
<Project Path="src/NetEvolve.Pulse.Redis/NetEvolve.Pulse.Redis.csproj" />
</Folder>
Expand Down
12 changes: 11 additions & 1 deletion src/NetEvolve.Pulse.Kafka/KafkaExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ public static class KafkaExtensions
/// Registers the Kafka outbox transport so that outbox messages are produced to Kafka topics.
/// </summary>
/// <param name="configurator">The mediator configurator.</param>
/// <param name="configureOptions">Optional action to configure <see cref="KafkaTransportOptions"/>.</param>
/// <returns>The same <paramref name="configurator" /> instance for chaining.</returns>
/// <remarks>
/// <para><strong>Prerequisites:</strong></para>
Expand All @@ -41,12 +42,21 @@ public static class KafkaExtensions
/// Replaces any previously registered <see cref="IMessageTransport" />.
/// </remarks>
/// <exception cref="ArgumentNullException">Thrown if <paramref name="configurator" /> is null.</exception>
public static IMediatorBuilder UseKafkaTransport(this IMediatorBuilder configurator)
public static IMediatorBuilder UseKafkaTransport(
this IMediatorBuilder configurator,
Action<KafkaTransportOptions>? configureOptions = null
)
{
ArgumentNullException.ThrowIfNull(configurator);

var services = configurator.Services;

_ = services.AddOptions<KafkaTransportOptions>();
if (configureOptions is not null)
{
_ = services.Configure(configureOptions);
}

var existing = services.FirstOrDefault(d => d.ServiceType == typeof(IMessageTransport));
if (existing is not null)
{
Expand Down
62 changes: 56 additions & 6 deletions src/NetEvolve.Pulse.Kafka/Outbox/KafkaMessageTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@ namespace NetEvolve.Pulse.Outbox;
using System.Collections.Concurrent;
using System.Diagnostics.CodeAnalysis;
using System.Globalization;
using System.Linq;
using System.Text;
using Confluent.Kafka;
using Confluent.Kafka.Admin;
using Microsoft.Extensions.Options;
using NetEvolve.Pulse.Extensibility.Outbox;

/// <summary>
Expand All @@ -21,26 +24,32 @@ public sealed class KafkaMessageTransport : IMessageTransport
private readonly IProducer<string, string> _producer;
private readonly IAdminClient _adminClient;
private readonly ITopicNameResolver _topicNameResolver;
private readonly KafkaTransportOptions _options;
private readonly ConcurrentDictionary<string, bool> _ensuredTopics = new(StringComparer.Ordinal);

/// <summary>
/// Initializes a new instance of <see cref="KafkaMessageTransport" />.
/// </summary>
/// <param name="producer">The Kafka producer, registered in DI by the caller.</param>
/// <param name="adminClient">The Kafka admin client, registered in DI by the caller.</param>
/// <param name="topicNameResolver">The resolver that maps each outbox message to a Kafka topic name.</param>
/// <param name="options">The transport options.</param>
public KafkaMessageTransport(
IProducer<string, string> producer,
IAdminClient adminClient,
ITopicNameResolver topicNameResolver
ITopicNameResolver topicNameResolver,
IOptions<KafkaTransportOptions> options
)
{
ArgumentNullException.ThrowIfNull(producer);
ArgumentNullException.ThrowIfNull(adminClient);
ArgumentNullException.ThrowIfNull(topicNameResolver);
ArgumentNullException.ThrowIfNull(options);

_producer = producer;
_adminClient = adminClient;
_topicNameResolver = topicNameResolver;
_options = options.Value;
}

/// <inheritdoc />
Expand All @@ -49,6 +58,9 @@ public async Task SendAsync(OutboxMessage message, CancellationToken cancellatio
ArgumentNullException.ThrowIfNull(message);

var topic = _topicNameResolver.Resolve(message);

await EnsureTopicAsync(topic, cancellationToken).ConfigureAwait(false);

var kafkaMessage = CreateKafkaMessage(message);

_ = await _producer.ProduceAsync(topic, kafkaMessage, cancellationToken).ConfigureAwait(false);
Expand All @@ -58,9 +70,9 @@ public async Task SendAsync(OutboxMessage message, CancellationToken cancellatio
[SuppressMessage(
"Performance",
"CA1849:Call async methods when in an async method",
Justification = "Intentional fire-and-forget batch pattern. Produce() enqueues messages with a delivery-report callback; awaiting ProduceAsync() per message would serialize delivery and defeat the purpose of batching."
Justification = "Intentional fire-and-forget batch pattern. Produce() enqueues messages with a delivery-report callback; awaiting ProduceAsync() per message would serialize delivery and defeat the purpose of batching. The method is async only to await EnsureTopicAsync() for topic auto-creation before the fire-and-forget send loop."
)]
public Task SendBatchAsync(IEnumerable<OutboxMessage> messages, CancellationToken cancellationToken = default)
public async Task SendBatchAsync(IEnumerable<OutboxMessage> messages, CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(messages);

Expand All @@ -69,6 +81,9 @@ public Task SendBatchAsync(IEnumerable<OutboxMessage> messages, CancellationToke
foreach (var message in messages)
{
var topic = _topicNameResolver.Resolve(message);

await EnsureTopicAsync(topic, cancellationToken).ConfigureAwait(false);

var kafkaMessage = CreateKafkaMessage(message);

try
Expand All @@ -95,10 +110,8 @@ public Task SendBatchAsync(IEnumerable<OutboxMessage> messages, CancellationToke

if (!errors.IsEmpty)
{
return Task.FromException(new AggregateException(errors));
throw new AggregateException(errors);
}

return Task.CompletedTask;
}

/// <inheritdoc />
Expand All @@ -115,6 +128,43 @@ public Task<bool> IsHealthyAsync(CancellationToken cancellationToken = default)
}
}

private async Task EnsureTopicAsync(string topic, CancellationToken cancellationToken)
{
if (!_options.AutoCreateTopics || _ensuredTopics.ContainsKey(topic))
{
return;
}

cancellationToken.ThrowIfCancellationRequested();

var configs = new Dictionary<string, string>();

if (_options.MessageRetention.HasValue)
{
configs["retention.ms"] = ((long)_options.MessageRetention.Value.TotalMilliseconds).ToString(
CultureInfo.InvariantCulture
);
}

var spec = new TopicSpecification
{
Name = topic,
NumPartitions = _options.DefaultPartitionCount,
ReplicationFactor = _options.DefaultReplicationFactor,
Configs = configs.Count > 0 ? configs : null,
};

try
{
await _adminClient.CreateTopicsAsync([spec]).ConfigureAwait(false);
_ = _ensuredTopics.TryAdd(topic, true);
}
catch (CreateTopicsException ex) when (ex.Results.All(static r => r.Error.Code == ErrorCode.TopicAlreadyExists))
{
_ = _ensuredTopics.TryAdd(topic, true);
}
}

private static Message<string, string> CreateKafkaMessage(OutboxMessage message)
{
var eventTypeName = message.EventType.ToOutboxEventTypeName();
Expand Down
31 changes: 31 additions & 0 deletions src/NetEvolve.Pulse.Kafka/Outbox/KafkaTransportOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
namespace NetEvolve.Pulse.Outbox;

/// <summary>
/// Configuration options for <see cref="KafkaMessageTransport"/>.
/// </summary>
public sealed class KafkaTransportOptions
{
/// <summary>
/// Gets or sets the default number of partitions for auto-created topics.
/// </summary>
/// <remarks>Defaults to <c>1</c>.</remarks>
public int DefaultPartitionCount { get; set; } = 1;

/// <summary>
/// Gets or sets the default replication factor for auto-created topics.
/// </summary>
/// <remarks>Defaults to <c>1</c>.</remarks>
public short DefaultReplicationFactor { get; set; } = 1;

/// <summary>
/// Gets or sets a value indicating whether topics should be automatically created before sending messages.
/// </summary>
/// <remarks>Defaults to <see langword="true"/>.</remarks>
public bool AutoCreateTopics { get; set; } = true;

/// <summary>
/// Gets or sets the message retention duration applied to auto-created topics.
/// </summary>
/// <remarks>When <see langword="null"/>, the broker default retention policy is used.</remarks>
public TimeSpan? MessageRetention { get; set; }
}
46 changes: 46 additions & 0 deletions tests/NetEvolve.Pulse.Tests.Unit/Kafka/KafkaExtensionsTests.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
namespace NetEvolve.Pulse.Tests.Unit.Kafka;

using System;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using NetEvolve.Extensions.TUnit;
using NetEvolve.Pulse;
using NetEvolve.Pulse.Extensibility;
Expand Down Expand Up @@ -59,6 +61,50 @@ public async Task UseKafkaTransport_Does_not_register_adapters()
.IsFalse();
}

[Test]
public async Task UseKafkaTransport_With_configureOptions_registers_options()
{
IServiceCollection services = new ServiceCollection();
_ = services.AddPulse(config =>
config.UseKafkaTransport(opt =>
{
opt.DefaultPartitionCount = 4;
opt.DefaultReplicationFactor = 2;
opt.AutoCreateTopics = false;
opt.MessageRetention = TimeSpan.FromHours(48);
})
);

using var provider = services.BuildServiceProvider();
var options = provider.GetRequiredService<IOptions<KafkaTransportOptions>>().Value;

using (Assert.Multiple())
{
_ = await Assert.That(options.DefaultPartitionCount).IsEqualTo(4);
_ = await Assert.That(options.DefaultReplicationFactor).IsEqualTo((short)2);
_ = await Assert.That(options.AutoCreateTopics).IsFalse();
_ = await Assert.That(options.MessageRetention).IsEqualTo(TimeSpan.FromHours(48));
}
}

[Test]
public async Task UseKafkaTransport_Without_configureOptions_uses_default_options()
{
IServiceCollection services = new ServiceCollection();
_ = services.AddPulse(config => config.UseKafkaTransport());

using var provider = services.BuildServiceProvider();
var options = provider.GetRequiredService<IOptions<KafkaTransportOptions>>().Value;

using (Assert.Multiple())
{
_ = await Assert.That(options.DefaultPartitionCount).IsEqualTo(1);
_ = await Assert.That(options.DefaultReplicationFactor).IsEqualTo((short)1);
_ = await Assert.That(options.AutoCreateTopics).IsTrue();
_ = await Assert.That(options.MessageRetention).IsNull();
}
}

private sealed class DummyTransport : IMessageTransport
{
public Task<bool> IsHealthyAsync(CancellationToken cancellationToken = default) => Task.FromResult(true);
Expand Down
Loading
Loading