diff --git a/src/NetEvolve.Pulse.RabbitMQ/Outbox/RabbitMqMessageTransport.cs b/src/NetEvolve.Pulse.RabbitMQ/Outbox/RabbitMqMessageTransport.cs
index ffd00224..ef33830c 100644
--- a/src/NetEvolve.Pulse.RabbitMQ/Outbox/RabbitMqMessageTransport.cs
+++ b/src/NetEvolve.Pulse.RabbitMQ/Outbox/RabbitMqMessageTransport.cs
@@ -68,6 +68,34 @@ public async Task SendAsync(OutboxMessage message, CancellationToken cancellatio
ArgumentNullException.ThrowIfNull(message);
var channel = await EnsureChannelAsync(cancellationToken).ConfigureAwait(false);
+ await PublishAsync(channel, message, cancellationToken).ConfigureAwait(false);
+ }
+
+ ///
+ public async Task SendBatchAsync(IEnumerable messages, CancellationToken cancellationToken = default)
+ {
+ ArgumentNullException.ThrowIfNull(messages);
+
+ var channel = await EnsureChannelAsync(cancellationToken).ConfigureAwait(false);
+
+ foreach (var message in messages)
+ {
+ await PublishAsync(channel, message, cancellationToken).ConfigureAwait(false);
+ }
+ }
+
+ ///
+ /// Publishes a single outbox message to the RabbitMQ exchange using the provided channel.
+ ///
+ /// The channel to use for publishing.
+ /// The outbox message to publish.
+ /// A token to monitor for cancellation requests.
+ private async Task PublishAsync(
+ IRabbitMqChannelAdapter channel,
+ OutboxMessage message,
+ CancellationToken cancellationToken
+ )
+ {
var routingKey = ResolveRoutingKey(message);
var body = Encoding.UTF8.GetBytes(message.Payload);
diff --git a/tests/NetEvolve.Pulse.Tests.Integration/RabbitMQ/RabbitMqContainerFixture.cs b/tests/NetEvolve.Pulse.Tests.Integration/RabbitMQ/RabbitMqContainerFixture.cs
new file mode 100644
index 00000000..7ebda8ae
--- /dev/null
+++ b/tests/NetEvolve.Pulse.Tests.Integration/RabbitMQ/RabbitMqContainerFixture.cs
@@ -0,0 +1,24 @@
+namespace NetEvolve.Pulse.Tests.Integration.RabbitMQ;
+
+using Microsoft.Extensions.Logging.Abstractions;
+using Testcontainers.RabbitMq;
+using TUnit.Core.Interfaces;
+
+///
+/// Provides a shared RabbitMQ container fixture for integration tests.
+///
+public sealed class RabbitMqContainerFixture : IAsyncDisposable, IAsyncInitializer
+{
+ private readonly RabbitMqContainer _container = new RabbitMqBuilder().WithLogger(NullLogger.Instance).Build();
+
+ ///
+ /// Gets the connection string for the running RabbitMQ container.
+ ///
+ public string ConnectionString => _container.GetConnectionString();
+
+ ///
+ public ValueTask DisposeAsync() => _container.DisposeAsync();
+
+ ///
+ public async Task InitializeAsync() => await _container.StartAsync().ConfigureAwait(false);
+}
diff --git a/tests/NetEvolve.Pulse.Tests.Integration/RabbitMQ/RabbitMqMessageTransportIntegrationTests.cs b/tests/NetEvolve.Pulse.Tests.Integration/RabbitMQ/RabbitMqMessageTransportIntegrationTests.cs
new file mode 100644
index 00000000..af32d893
--- /dev/null
+++ b/tests/NetEvolve.Pulse.Tests.Integration/RabbitMQ/RabbitMqMessageTransportIntegrationTests.cs
@@ -0,0 +1,256 @@
+namespace NetEvolve.Pulse.Tests.Integration.RabbitMQ;
+
+using System.Text;
+using global::RabbitMQ.Client;
+using global::RabbitMQ.Client.Events;
+using Microsoft.Extensions.Options;
+using NetEvolve.Extensions.TUnit;
+using NetEvolve.Pulse.Extensibility.Outbox;
+using NetEvolve.Pulse.Internals;
+using NetEvolve.Pulse.Outbox;
+using TUnit.Assertions.Extensions;
+using TUnit.Core;
+
+///
+/// Integration tests for against a real RabbitMQ broker.
+///
+[ClassDataSource(Shared = SharedType.PerTestSession)]
+[TestGroup("RabbitMQ")]
+[Timeout(120_000)]
+public sealed class RabbitMqMessageTransportIntegrationTests(RabbitMqContainerFixture containerFixture)
+ : IAsyncDisposable
+{
+ private const string ExchangeName = "pulse.integration.test";
+
+ private IConnection? _connection;
+ private IChannel? _adminChannel;
+
+ private async Task<(IConnection Connection, IChannel AdminChannel)> GetConnectionAndChannelAsync(
+ CancellationToken cancellationToken
+ )
+ {
+ if (_connection is not null && _adminChannel is not null)
+ {
+ return (_connection, _adminChannel);
+ }
+
+ var factory = new ConnectionFactory { Uri = new Uri(containerFixture.ConnectionString) };
+ _connection = await factory.CreateConnectionAsync(cancellationToken).ConfigureAwait(false);
+ _adminChannel = await _connection
+ .CreateChannelAsync(cancellationToken: cancellationToken)
+ .ConfigureAwait(false);
+ await _adminChannel
+ .ExchangeDeclareAsync(
+ ExchangeName,
+ ExchangeType.Fanout,
+ durable: false,
+ autoDelete: true,
+ cancellationToken: cancellationToken
+ )
+ .ConfigureAwait(false);
+
+ return (_connection, _adminChannel);
+ }
+
+ ///
+ public async ValueTask DisposeAsync()
+ {
+ if (_adminChannel is not null)
+ {
+ await _adminChannel.CloseAsync().ConfigureAwait(false);
+ _adminChannel.Dispose();
+ }
+
+ if (_connection is not null)
+ {
+ await _connection.CloseAsync().ConfigureAwait(false);
+ await _connection.DisposeAsync().ConfigureAwait(false);
+ }
+ }
+
+ [Test]
+ public async Task SendAsync_Publishes_message_to_exchange(CancellationToken cancellationToken)
+ {
+ var (connection, adminChannel) = await GetConnectionAndChannelAsync(cancellationToken).ConfigureAwait(false);
+
+ var queueName = await adminChannel
+ .QueueDeclareAsync(cancellationToken: cancellationToken)
+ .ConfigureAwait(false);
+ await adminChannel
+ .QueueBindAsync(
+ queueName.QueueName,
+ ExchangeName,
+ routingKey: string.Empty,
+ cancellationToken: cancellationToken
+ )
+ .ConfigureAwait(false);
+
+ var adapter = new RabbitMqConnectionAdapter(connection);
+ using var transport = CreateTransport(adapter);
+ var outboxMessage = CreateOutboxMessage();
+
+ await transport.SendAsync(outboxMessage, cancellationToken).ConfigureAwait(false);
+
+ var received = await ConsumeOneMessageAsync(adminChannel, queueName.QueueName, cancellationToken)
+ .ConfigureAwait(false);
+
+ using (Assert.Multiple())
+ {
+ _ = await Assert.That(received).IsNotNull();
+ var body = Encoding.UTF8.GetString(received!.Body.ToArray());
+ _ = await Assert.That(body).IsEqualTo(outboxMessage.Payload);
+ _ = await Assert.That(received.BasicProperties.MessageId).IsEqualTo(outboxMessage.Id.ToString());
+ _ = await Assert.That(received.BasicProperties.ContentType).IsEqualTo("application/json");
+ }
+ }
+
+ [Test]
+ public async Task SendBatchAsync_Publishes_all_messages_to_exchange(CancellationToken cancellationToken)
+ {
+ const int messageCount = 5;
+ var (connection, adminChannel) = await GetConnectionAndChannelAsync(cancellationToken).ConfigureAwait(false);
+
+ var queueName = await adminChannel
+ .QueueDeclareAsync(cancellationToken: cancellationToken)
+ .ConfigureAwait(false);
+ await adminChannel
+ .QueueBindAsync(
+ queueName.QueueName,
+ ExchangeName,
+ routingKey: string.Empty,
+ cancellationToken: cancellationToken
+ )
+ .ConfigureAwait(false);
+
+ var adapter = new RabbitMqConnectionAdapter(connection);
+ using var transport = CreateTransport(adapter);
+ var messages = Enumerable.Range(0, messageCount).Select(_ => CreateOutboxMessage()).ToList();
+
+ await transport.SendBatchAsync(messages, cancellationToken).ConfigureAwait(false);
+
+ var receivedMessages = await ConsumeManyMessagesAsync(
+ adminChannel,
+ queueName.QueueName,
+ messageCount,
+ cancellationToken
+ )
+ .ConfigureAwait(false);
+
+ _ = await Assert.That(receivedMessages.Count).IsEqualTo(messageCount);
+ }
+
+ [Test]
+ public async Task IsHealthyAsync_When_connection_open_returns_true(CancellationToken cancellationToken)
+ {
+ var (connection, _) = await GetConnectionAndChannelAsync(cancellationToken).ConfigureAwait(false);
+
+ var adapter = new RabbitMqConnectionAdapter(connection);
+ using var transport = CreateTransport(adapter);
+
+ // Trigger channel creation by sending a message
+ await transport.SendAsync(CreateOutboxMessage(), cancellationToken).ConfigureAwait(false);
+
+ var healthy = await transport.IsHealthyAsync(cancellationToken).ConfigureAwait(false);
+
+ _ = await Assert.That(healthy).IsTrue();
+ }
+
+ [Test]
+ public async Task IsHealthyAsync_Before_first_send_returns_false(CancellationToken cancellationToken)
+ {
+ var (connection, _) = await GetConnectionAndChannelAsync(cancellationToken).ConfigureAwait(false);
+
+ var adapter = new RabbitMqConnectionAdapter(connection);
+ using var transport = CreateTransport(adapter);
+
+ // No sends yet — channel has not been created
+ var healthy = await transport.IsHealthyAsync(cancellationToken).ConfigureAwait(false);
+
+ _ = await Assert.That(healthy).IsFalse();
+ }
+
+ private static RabbitMqMessageTransport CreateTransport(IRabbitMqConnectionAdapter adapter) =>
+ new(
+ adapter,
+ new SimpleTopicNameResolver(),
+ Options.Create(new RabbitMqTransportOptions { ExchangeName = ExchangeName })
+ );
+
+ private static OutboxMessage CreateOutboxMessage() =>
+ new()
+ {
+ Id = Guid.NewGuid(),
+ EventType = typeof(IntegrationTestEvent),
+ Payload = """{"id":"test"}""",
+ CorrelationId = Guid.NewGuid().ToString(),
+ CreatedAt = DateTimeOffset.UtcNow,
+ UpdatedAt = DateTimeOffset.UtcNow,
+ RetryCount = 0,
+ ProcessedAt = null,
+ };
+
+ private static async Task ConsumeOneMessageAsync(
+ IChannel channel,
+ string queueName,
+ CancellationToken cancellationToken
+ )
+ {
+ var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
+ using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
+ cts.CancelAfter(TimeSpan.FromSeconds(15));
+ cts.Token.Register(() => tcs.TrySetResult(null));
+
+ var consumer = new AsyncEventingBasicConsumer(channel);
+ consumer.ReceivedAsync += (_, ea) =>
+ {
+ tcs.TrySetResult(ea);
+ return Task.CompletedTask;
+ };
+
+ await channel
+ .BasicConsumeAsync(queueName, autoAck: true, consumer: consumer, cancellationToken: cancellationToken)
+ .ConfigureAwait(false);
+
+ return await tcs.Task.ConfigureAwait(false);
+ }
+
+ private static async Task> ConsumeManyMessagesAsync(
+ IChannel channel,
+ string queueName,
+ int expectedCount,
+ CancellationToken cancellationToken
+ )
+ {
+ var received = new List();
+ var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
+ using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
+ cts.CancelAfter(TimeSpan.FromSeconds(15));
+ cts.Token.Register(() => tcs.TrySetResult(false));
+
+ var consumer = new AsyncEventingBasicConsumer(channel);
+ consumer.ReceivedAsync += (_, ea) =>
+ {
+ received.Add(ea);
+ if (received.Count >= expectedCount)
+ {
+ tcs.TrySetResult(true);
+ }
+
+ return Task.CompletedTask;
+ };
+
+ await channel
+ .BasicConsumeAsync(queueName, autoAck: true, consumer: consumer, cancellationToken: cancellationToken)
+ .ConfigureAwait(false);
+
+ await tcs.Task.ConfigureAwait(false);
+ return received;
+ }
+
+ private sealed class SimpleTopicNameResolver : ITopicNameResolver
+ {
+ public string Resolve(OutboxMessage message) => message.EventType.Name;
+ }
+
+ private sealed record IntegrationTestEvent;
+}
diff --git a/tests/NetEvolve.Pulse.Tests.Unit/RabbitMQ/RabbitMqMessageTransportTests.cs b/tests/NetEvolve.Pulse.Tests.Unit/RabbitMQ/RabbitMqMessageTransportTests.cs
index 28ffa783..ac869062 100644
--- a/tests/NetEvolve.Pulse.Tests.Unit/RabbitMQ/RabbitMqMessageTransportTests.cs
+++ b/tests/NetEvolve.Pulse.Tests.Unit/RabbitMQ/RabbitMqMessageTransportTests.cs
@@ -260,6 +260,92 @@ public async Task Dispose_Is_idempotent(CancellationToken cancellationToken)
_ = await Assert.That(channel.DisposeCallCount).IsEqualTo(1);
}
+ [Test]
+ public async Task SendBatchAsync_When_messages_null_throws(CancellationToken cancellationToken)
+ {
+ var connectionAdapter = new FakeConnectionAdapter();
+ var topicNameResolver = new FakeTopicNameResolver();
+ using var transport = CreateTransport(connectionAdapter, topicNameResolver);
+
+ var exception = await Assert.ThrowsAsync(() =>
+ transport.SendBatchAsync(null!, cancellationToken)
+ );
+
+ _ = await Assert.That(exception).IsNotNull();
+ _ = await Assert.That(exception!.ParamName).IsEqualTo("messages");
+ }
+
+ [Test]
+ public async Task SendBatchAsync_Publishes_all_messages_with_correct_properties(CancellationToken cancellationToken)
+ {
+ var connectionAdapter = new FakeConnectionAdapter();
+ var topicNameResolver = new FakeTopicNameResolver();
+ using var transport = CreateTransport(connectionAdapter, topicNameResolver, exchangeName: "test-exchange");
+ var messages = new[] { CreateOutboxMessage(), CreateOutboxMessage(), CreateOutboxMessage() };
+
+ await transport.SendBatchAsync(messages, cancellationToken).ConfigureAwait(false);
+
+ _ = await Assert.That(connectionAdapter.CreateChannelCallCount).IsEqualTo(1);
+ var channel = connectionAdapter.CreatedChannels.Single();
+ _ = await Assert.That(channel.PublishCallCount).IsEqualTo(3);
+
+ foreach (var publishCall in channel.PublishCalls)
+ {
+ _ = await Assert.That(publishCall.Exchange).IsEqualTo("test-exchange");
+ _ = await Assert.That(publishCall.Mandatory).IsFalse();
+ _ = await Assert.That(publishCall.Properties.ContentType).IsEqualTo("application/json");
+ }
+ }
+
+ [Test]
+ public async Task SendBatchAsync_Uses_single_channel_for_all_messages(CancellationToken cancellationToken)
+ {
+ var connectionAdapter = new FakeConnectionAdapter();
+ var topicNameResolver = new FakeTopicNameResolver();
+ using var transport = CreateTransport(connectionAdapter, topicNameResolver);
+ var messages = new[] { CreateOutboxMessage(), CreateOutboxMessage(), CreateOutboxMessage() };
+
+ await transport.SendBatchAsync(messages, cancellationToken).ConfigureAwait(false);
+
+ _ = await Assert.That(connectionAdapter.CreateChannelCallCount).IsEqualTo(1);
+ var channel = connectionAdapter.CreatedChannels.Single();
+ _ = await Assert.That(channel.PublishCallCount).IsEqualTo(3);
+ }
+
+ [Test]
+ public async Task SendBatchAsync_With_empty_collection_does_not_publish(CancellationToken cancellationToken)
+ {
+ var connectionAdapter = new FakeConnectionAdapter();
+ var topicNameResolver = new FakeTopicNameResolver();
+ using var transport = CreateTransport(connectionAdapter, topicNameResolver);
+
+ await transport.SendBatchAsync([], cancellationToken).ConfigureAwait(false);
+
+ _ = await Assert.That(connectionAdapter.CreateChannelCallCount).IsEqualTo(1);
+ var channel = connectionAdapter.CreatedChannels.Single();
+ _ = await Assert.That(channel.PublishCallCount).IsEqualTo(0);
+ }
+
+ [Test]
+ public async Task SendBatchAsync_Reuses_existing_channel(CancellationToken cancellationToken)
+ {
+ var connectionAdapter = new FakeConnectionAdapter();
+ var topicNameResolver = new FakeTopicNameResolver();
+ using var transport = CreateTransport(connectionAdapter, topicNameResolver);
+
+ // First call creates a channel
+ await transport.SendAsync(CreateOutboxMessage(), cancellationToken).ConfigureAwait(false);
+
+ // Second call reuses the same channel
+ await transport
+ .SendBatchAsync([CreateOutboxMessage(), CreateOutboxMessage()], cancellationToken)
+ .ConfigureAwait(false);
+
+ _ = await Assert.That(connectionAdapter.CreateChannelCallCount).IsEqualTo(1);
+ var channel = connectionAdapter.CreatedChannels.Single();
+ _ = await Assert.That(channel.PublishCallCount).IsEqualTo(3);
+ }
+
[Test]
public async Task Options_ExchangeName_can_be_configured()
{