From 1c7b7237ca03c8315f867439ffda4dde8eaab66f Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 17 Apr 2026 17:49:43 +0000 Subject: [PATCH] feat: add SendBatchAsync override and unit/integration tests for RabbitMQ transport Agent-Logs-Url: https://github.com/dailydevops/pulse/sessions/3e8d71d0-5c05-4a68-91c3-639f8e6b0900 Co-authored-by: samtrion <3283596+samtrion@users.noreply.github.com> --- .../Outbox/RabbitMqMessageTransport.cs | 28 ++ .../RabbitMQ/RabbitMqContainerFixture.cs | 24 ++ ...abbitMqMessageTransportIntegrationTests.cs | 256 ++++++++++++++++++ .../RabbitMQ/RabbitMqMessageTransportTests.cs | 86 ++++++ 4 files changed, 394 insertions(+) create mode 100644 tests/NetEvolve.Pulse.Tests.Integration/RabbitMQ/RabbitMqContainerFixture.cs create mode 100644 tests/NetEvolve.Pulse.Tests.Integration/RabbitMQ/RabbitMqMessageTransportIntegrationTests.cs diff --git a/src/NetEvolve.Pulse.RabbitMQ/Outbox/RabbitMqMessageTransport.cs b/src/NetEvolve.Pulse.RabbitMQ/Outbox/RabbitMqMessageTransport.cs index ffd00224..ef33830c 100644 --- a/src/NetEvolve.Pulse.RabbitMQ/Outbox/RabbitMqMessageTransport.cs +++ b/src/NetEvolve.Pulse.RabbitMQ/Outbox/RabbitMqMessageTransport.cs @@ -68,6 +68,34 @@ public async Task SendAsync(OutboxMessage message, CancellationToken cancellatio ArgumentNullException.ThrowIfNull(message); var channel = await EnsureChannelAsync(cancellationToken).ConfigureAwait(false); + await PublishAsync(channel, message, cancellationToken).ConfigureAwait(false); + } + + /// + public async Task SendBatchAsync(IEnumerable messages, CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(messages); + + var channel = await EnsureChannelAsync(cancellationToken).ConfigureAwait(false); + + foreach (var message in messages) + { + await PublishAsync(channel, message, cancellationToken).ConfigureAwait(false); + } + } + + /// + /// Publishes a single outbox message to the RabbitMQ exchange using the provided channel. + /// + /// The channel to use for publishing. + /// The outbox message to publish. + /// A token to monitor for cancellation requests. + private async Task PublishAsync( + IRabbitMqChannelAdapter channel, + OutboxMessage message, + CancellationToken cancellationToken + ) + { var routingKey = ResolveRoutingKey(message); var body = Encoding.UTF8.GetBytes(message.Payload); diff --git a/tests/NetEvolve.Pulse.Tests.Integration/RabbitMQ/RabbitMqContainerFixture.cs b/tests/NetEvolve.Pulse.Tests.Integration/RabbitMQ/RabbitMqContainerFixture.cs new file mode 100644 index 00000000..7ebda8ae --- /dev/null +++ b/tests/NetEvolve.Pulse.Tests.Integration/RabbitMQ/RabbitMqContainerFixture.cs @@ -0,0 +1,24 @@ +namespace NetEvolve.Pulse.Tests.Integration.RabbitMQ; + +using Microsoft.Extensions.Logging.Abstractions; +using Testcontainers.RabbitMq; +using TUnit.Core.Interfaces; + +/// +/// Provides a shared RabbitMQ container fixture for integration tests. +/// +public sealed class RabbitMqContainerFixture : IAsyncDisposable, IAsyncInitializer +{ + private readonly RabbitMqContainer _container = new RabbitMqBuilder().WithLogger(NullLogger.Instance).Build(); + + /// + /// Gets the connection string for the running RabbitMQ container. + /// + public string ConnectionString => _container.GetConnectionString(); + + /// + public ValueTask DisposeAsync() => _container.DisposeAsync(); + + /// + public async Task InitializeAsync() => await _container.StartAsync().ConfigureAwait(false); +} diff --git a/tests/NetEvolve.Pulse.Tests.Integration/RabbitMQ/RabbitMqMessageTransportIntegrationTests.cs b/tests/NetEvolve.Pulse.Tests.Integration/RabbitMQ/RabbitMqMessageTransportIntegrationTests.cs new file mode 100644 index 00000000..af32d893 --- /dev/null +++ b/tests/NetEvolve.Pulse.Tests.Integration/RabbitMQ/RabbitMqMessageTransportIntegrationTests.cs @@ -0,0 +1,256 @@ +namespace NetEvolve.Pulse.Tests.Integration.RabbitMQ; + +using System.Text; +using global::RabbitMQ.Client; +using global::RabbitMQ.Client.Events; +using Microsoft.Extensions.Options; +using NetEvolve.Extensions.TUnit; +using NetEvolve.Pulse.Extensibility.Outbox; +using NetEvolve.Pulse.Internals; +using NetEvolve.Pulse.Outbox; +using TUnit.Assertions.Extensions; +using TUnit.Core; + +/// +/// Integration tests for against a real RabbitMQ broker. +/// +[ClassDataSource(Shared = SharedType.PerTestSession)] +[TestGroup("RabbitMQ")] +[Timeout(120_000)] +public sealed class RabbitMqMessageTransportIntegrationTests(RabbitMqContainerFixture containerFixture) + : IAsyncDisposable +{ + private const string ExchangeName = "pulse.integration.test"; + + private IConnection? _connection; + private IChannel? _adminChannel; + + private async Task<(IConnection Connection, IChannel AdminChannel)> GetConnectionAndChannelAsync( + CancellationToken cancellationToken + ) + { + if (_connection is not null && _adminChannel is not null) + { + return (_connection, _adminChannel); + } + + var factory = new ConnectionFactory { Uri = new Uri(containerFixture.ConnectionString) }; + _connection = await factory.CreateConnectionAsync(cancellationToken).ConfigureAwait(false); + _adminChannel = await _connection + .CreateChannelAsync(cancellationToken: cancellationToken) + .ConfigureAwait(false); + await _adminChannel + .ExchangeDeclareAsync( + ExchangeName, + ExchangeType.Fanout, + durable: false, + autoDelete: true, + cancellationToken: cancellationToken + ) + .ConfigureAwait(false); + + return (_connection, _adminChannel); + } + + /// + public async ValueTask DisposeAsync() + { + if (_adminChannel is not null) + { + await _adminChannel.CloseAsync().ConfigureAwait(false); + _adminChannel.Dispose(); + } + + if (_connection is not null) + { + await _connection.CloseAsync().ConfigureAwait(false); + await _connection.DisposeAsync().ConfigureAwait(false); + } + } + + [Test] + public async Task SendAsync_Publishes_message_to_exchange(CancellationToken cancellationToken) + { + var (connection, adminChannel) = await GetConnectionAndChannelAsync(cancellationToken).ConfigureAwait(false); + + var queueName = await adminChannel + .QueueDeclareAsync(cancellationToken: cancellationToken) + .ConfigureAwait(false); + await adminChannel + .QueueBindAsync( + queueName.QueueName, + ExchangeName, + routingKey: string.Empty, + cancellationToken: cancellationToken + ) + .ConfigureAwait(false); + + var adapter = new RabbitMqConnectionAdapter(connection); + using var transport = CreateTransport(adapter); + var outboxMessage = CreateOutboxMessage(); + + await transport.SendAsync(outboxMessage, cancellationToken).ConfigureAwait(false); + + var received = await ConsumeOneMessageAsync(adminChannel, queueName.QueueName, cancellationToken) + .ConfigureAwait(false); + + using (Assert.Multiple()) + { + _ = await Assert.That(received).IsNotNull(); + var body = Encoding.UTF8.GetString(received!.Body.ToArray()); + _ = await Assert.That(body).IsEqualTo(outboxMessage.Payload); + _ = await Assert.That(received.BasicProperties.MessageId).IsEqualTo(outboxMessage.Id.ToString()); + _ = await Assert.That(received.BasicProperties.ContentType).IsEqualTo("application/json"); + } + } + + [Test] + public async Task SendBatchAsync_Publishes_all_messages_to_exchange(CancellationToken cancellationToken) + { + const int messageCount = 5; + var (connection, adminChannel) = await GetConnectionAndChannelAsync(cancellationToken).ConfigureAwait(false); + + var queueName = await adminChannel + .QueueDeclareAsync(cancellationToken: cancellationToken) + .ConfigureAwait(false); + await adminChannel + .QueueBindAsync( + queueName.QueueName, + ExchangeName, + routingKey: string.Empty, + cancellationToken: cancellationToken + ) + .ConfigureAwait(false); + + var adapter = new RabbitMqConnectionAdapter(connection); + using var transport = CreateTransport(adapter); + var messages = Enumerable.Range(0, messageCount).Select(_ => CreateOutboxMessage()).ToList(); + + await transport.SendBatchAsync(messages, cancellationToken).ConfigureAwait(false); + + var receivedMessages = await ConsumeManyMessagesAsync( + adminChannel, + queueName.QueueName, + messageCount, + cancellationToken + ) + .ConfigureAwait(false); + + _ = await Assert.That(receivedMessages.Count).IsEqualTo(messageCount); + } + + [Test] + public async Task IsHealthyAsync_When_connection_open_returns_true(CancellationToken cancellationToken) + { + var (connection, _) = await GetConnectionAndChannelAsync(cancellationToken).ConfigureAwait(false); + + var adapter = new RabbitMqConnectionAdapter(connection); + using var transport = CreateTransport(adapter); + + // Trigger channel creation by sending a message + await transport.SendAsync(CreateOutboxMessage(), cancellationToken).ConfigureAwait(false); + + var healthy = await transport.IsHealthyAsync(cancellationToken).ConfigureAwait(false); + + _ = await Assert.That(healthy).IsTrue(); + } + + [Test] + public async Task IsHealthyAsync_Before_first_send_returns_false(CancellationToken cancellationToken) + { + var (connection, _) = await GetConnectionAndChannelAsync(cancellationToken).ConfigureAwait(false); + + var adapter = new RabbitMqConnectionAdapter(connection); + using var transport = CreateTransport(adapter); + + // No sends yet — channel has not been created + var healthy = await transport.IsHealthyAsync(cancellationToken).ConfigureAwait(false); + + _ = await Assert.That(healthy).IsFalse(); + } + + private static RabbitMqMessageTransport CreateTransport(IRabbitMqConnectionAdapter adapter) => + new( + adapter, + new SimpleTopicNameResolver(), + Options.Create(new RabbitMqTransportOptions { ExchangeName = ExchangeName }) + ); + + private static OutboxMessage CreateOutboxMessage() => + new() + { + Id = Guid.NewGuid(), + EventType = typeof(IntegrationTestEvent), + Payload = """{"id":"test"}""", + CorrelationId = Guid.NewGuid().ToString(), + CreatedAt = DateTimeOffset.UtcNow, + UpdatedAt = DateTimeOffset.UtcNow, + RetryCount = 0, + ProcessedAt = null, + }; + + private static async Task ConsumeOneMessageAsync( + IChannel channel, + string queueName, + CancellationToken cancellationToken + ) + { + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + cts.CancelAfter(TimeSpan.FromSeconds(15)); + cts.Token.Register(() => tcs.TrySetResult(null)); + + var consumer = new AsyncEventingBasicConsumer(channel); + consumer.ReceivedAsync += (_, ea) => + { + tcs.TrySetResult(ea); + return Task.CompletedTask; + }; + + await channel + .BasicConsumeAsync(queueName, autoAck: true, consumer: consumer, cancellationToken: cancellationToken) + .ConfigureAwait(false); + + return await tcs.Task.ConfigureAwait(false); + } + + private static async Task> ConsumeManyMessagesAsync( + IChannel channel, + string queueName, + int expectedCount, + CancellationToken cancellationToken + ) + { + var received = new List(); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + cts.CancelAfter(TimeSpan.FromSeconds(15)); + cts.Token.Register(() => tcs.TrySetResult(false)); + + var consumer = new AsyncEventingBasicConsumer(channel); + consumer.ReceivedAsync += (_, ea) => + { + received.Add(ea); + if (received.Count >= expectedCount) + { + tcs.TrySetResult(true); + } + + return Task.CompletedTask; + }; + + await channel + .BasicConsumeAsync(queueName, autoAck: true, consumer: consumer, cancellationToken: cancellationToken) + .ConfigureAwait(false); + + await tcs.Task.ConfigureAwait(false); + return received; + } + + private sealed class SimpleTopicNameResolver : ITopicNameResolver + { + public string Resolve(OutboxMessage message) => message.EventType.Name; + } + + private sealed record IntegrationTestEvent; +} diff --git a/tests/NetEvolve.Pulse.Tests.Unit/RabbitMQ/RabbitMqMessageTransportTests.cs b/tests/NetEvolve.Pulse.Tests.Unit/RabbitMQ/RabbitMqMessageTransportTests.cs index 28ffa783..ac869062 100644 --- a/tests/NetEvolve.Pulse.Tests.Unit/RabbitMQ/RabbitMqMessageTransportTests.cs +++ b/tests/NetEvolve.Pulse.Tests.Unit/RabbitMQ/RabbitMqMessageTransportTests.cs @@ -260,6 +260,92 @@ public async Task Dispose_Is_idempotent(CancellationToken cancellationToken) _ = await Assert.That(channel.DisposeCallCount).IsEqualTo(1); } + [Test] + public async Task SendBatchAsync_When_messages_null_throws(CancellationToken cancellationToken) + { + var connectionAdapter = new FakeConnectionAdapter(); + var topicNameResolver = new FakeTopicNameResolver(); + using var transport = CreateTransport(connectionAdapter, topicNameResolver); + + var exception = await Assert.ThrowsAsync(() => + transport.SendBatchAsync(null!, cancellationToken) + ); + + _ = await Assert.That(exception).IsNotNull(); + _ = await Assert.That(exception!.ParamName).IsEqualTo("messages"); + } + + [Test] + public async Task SendBatchAsync_Publishes_all_messages_with_correct_properties(CancellationToken cancellationToken) + { + var connectionAdapter = new FakeConnectionAdapter(); + var topicNameResolver = new FakeTopicNameResolver(); + using var transport = CreateTransport(connectionAdapter, topicNameResolver, exchangeName: "test-exchange"); + var messages = new[] { CreateOutboxMessage(), CreateOutboxMessage(), CreateOutboxMessage() }; + + await transport.SendBatchAsync(messages, cancellationToken).ConfigureAwait(false); + + _ = await Assert.That(connectionAdapter.CreateChannelCallCount).IsEqualTo(1); + var channel = connectionAdapter.CreatedChannels.Single(); + _ = await Assert.That(channel.PublishCallCount).IsEqualTo(3); + + foreach (var publishCall in channel.PublishCalls) + { + _ = await Assert.That(publishCall.Exchange).IsEqualTo("test-exchange"); + _ = await Assert.That(publishCall.Mandatory).IsFalse(); + _ = await Assert.That(publishCall.Properties.ContentType).IsEqualTo("application/json"); + } + } + + [Test] + public async Task SendBatchAsync_Uses_single_channel_for_all_messages(CancellationToken cancellationToken) + { + var connectionAdapter = new FakeConnectionAdapter(); + var topicNameResolver = new FakeTopicNameResolver(); + using var transport = CreateTransport(connectionAdapter, topicNameResolver); + var messages = new[] { CreateOutboxMessage(), CreateOutboxMessage(), CreateOutboxMessage() }; + + await transport.SendBatchAsync(messages, cancellationToken).ConfigureAwait(false); + + _ = await Assert.That(connectionAdapter.CreateChannelCallCount).IsEqualTo(1); + var channel = connectionAdapter.CreatedChannels.Single(); + _ = await Assert.That(channel.PublishCallCount).IsEqualTo(3); + } + + [Test] + public async Task SendBatchAsync_With_empty_collection_does_not_publish(CancellationToken cancellationToken) + { + var connectionAdapter = new FakeConnectionAdapter(); + var topicNameResolver = new FakeTopicNameResolver(); + using var transport = CreateTransport(connectionAdapter, topicNameResolver); + + await transport.SendBatchAsync([], cancellationToken).ConfigureAwait(false); + + _ = await Assert.That(connectionAdapter.CreateChannelCallCount).IsEqualTo(1); + var channel = connectionAdapter.CreatedChannels.Single(); + _ = await Assert.That(channel.PublishCallCount).IsEqualTo(0); + } + + [Test] + public async Task SendBatchAsync_Reuses_existing_channel(CancellationToken cancellationToken) + { + var connectionAdapter = new FakeConnectionAdapter(); + var topicNameResolver = new FakeTopicNameResolver(); + using var transport = CreateTransport(connectionAdapter, topicNameResolver); + + // First call creates a channel + await transport.SendAsync(CreateOutboxMessage(), cancellationToken).ConfigureAwait(false); + + // Second call reuses the same channel + await transport + .SendBatchAsync([CreateOutboxMessage(), CreateOutboxMessage()], cancellationToken) + .ConfigureAwait(false); + + _ = await Assert.That(connectionAdapter.CreateChannelCallCount).IsEqualTo(1); + var channel = connectionAdapter.CreatedChannels.Single(); + _ = await Assert.That(channel.PublishCallCount).IsEqualTo(3); + } + [Test] public async Task Options_ExchangeName_can_be_configured() {