Skip to content

Commit ec55d7e

Browse files
Copilotsamtrion
andauthored
feat: add SendBatchAsync override and unit/integration tests for RabbitMQ transport
Agent-Logs-Url: https://github.com/dailydevops/pulse/sessions/3e8d71d0-5c05-4a68-91c3-639f8e6b0900 Co-authored-by: samtrion <3283596+samtrion@users.noreply.github.com>
1 parent a370233 commit ec55d7e

4 files changed

Lines changed: 394 additions & 0 deletions

File tree

src/NetEvolve.Pulse.RabbitMQ/Outbox/RabbitMqMessageTransport.cs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,34 @@ public async Task SendAsync(OutboxMessage message, CancellationToken cancellatio
6868
ArgumentNullException.ThrowIfNull(message);
6969

7070
var channel = await EnsureChannelAsync(cancellationToken).ConfigureAwait(false);
71+
await PublishAsync(channel, message, cancellationToken).ConfigureAwait(false);
72+
}
73+
74+
/// <inheritdoc />
75+
public async Task SendBatchAsync(IEnumerable<OutboxMessage> messages, CancellationToken cancellationToken = default)
76+
{
77+
ArgumentNullException.ThrowIfNull(messages);
78+
79+
var channel = await EnsureChannelAsync(cancellationToken).ConfigureAwait(false);
80+
81+
foreach (var message in messages)
82+
{
83+
await PublishAsync(channel, message, cancellationToken).ConfigureAwait(false);
84+
}
85+
}
86+
87+
/// <summary>
88+
/// Publishes a single outbox message to the RabbitMQ exchange using the provided channel.
89+
/// </summary>
90+
/// <param name="channel">The channel to use for publishing.</param>
91+
/// <param name="message">The outbox message to publish.</param>
92+
/// <param name="cancellationToken">A token to monitor for cancellation requests.</param>
93+
private async Task PublishAsync(
94+
IRabbitMqChannelAdapter channel,
95+
OutboxMessage message,
96+
CancellationToken cancellationToken
97+
)
98+
{
7199
var routingKey = ResolveRoutingKey(message);
72100
var body = Encoding.UTF8.GetBytes(message.Payload);
73101

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
namespace NetEvolve.Pulse.Tests.Integration.RabbitMQ;
2+
3+
using Microsoft.Extensions.Logging.Abstractions;
4+
using Testcontainers.RabbitMq;
5+
using TUnit.Core.Interfaces;
6+
7+
/// <summary>
8+
/// Provides a shared RabbitMQ container fixture for integration tests.
9+
/// </summary>
10+
public sealed class RabbitMqContainerFixture : IAsyncDisposable, IAsyncInitializer
11+
{
12+
private readonly RabbitMqContainer _container = new RabbitMqBuilder().WithLogger(NullLogger.Instance).Build();
13+
14+
/// <summary>
15+
/// Gets the connection string for the running RabbitMQ container.
16+
/// </summary>
17+
public string ConnectionString => _container.GetConnectionString();
18+
19+
/// <inheritdoc />
20+
public ValueTask DisposeAsync() => _container.DisposeAsync();
21+
22+
/// <inheritdoc />
23+
public async Task InitializeAsync() => await _container.StartAsync().ConfigureAwait(false);
24+
}
Lines changed: 256 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,256 @@
1+
namespace NetEvolve.Pulse.Tests.Integration.RabbitMQ;
2+
3+
using System.Text;
4+
using global::RabbitMQ.Client;
5+
using global::RabbitMQ.Client.Events;
6+
using Microsoft.Extensions.Options;
7+
using NetEvolve.Extensions.TUnit;
8+
using NetEvolve.Pulse.Extensibility.Outbox;
9+
using NetEvolve.Pulse.Internals;
10+
using NetEvolve.Pulse.Outbox;
11+
using TUnit.Assertions.Extensions;
12+
using TUnit.Core;
13+
14+
/// <summary>
15+
/// Integration tests for <see cref="RabbitMqMessageTransport"/> against a real RabbitMQ broker.
16+
/// </summary>
17+
[ClassDataSource<RabbitMqContainerFixture>(Shared = SharedType.PerTestSession)]
18+
[TestGroup("RabbitMQ")]
19+
[Timeout(120_000)]
20+
public sealed class RabbitMqMessageTransportIntegrationTests(RabbitMqContainerFixture containerFixture)
21+
: IAsyncDisposable
22+
{
23+
private const string ExchangeName = "pulse.integration.test";
24+
25+
private IConnection? _connection;
26+
private IChannel? _adminChannel;
27+
28+
private async Task<(IConnection Connection, IChannel AdminChannel)> GetConnectionAndChannelAsync(
29+
CancellationToken cancellationToken
30+
)
31+
{
32+
if (_connection is not null && _adminChannel is not null)
33+
{
34+
return (_connection, _adminChannel);
35+
}
36+
37+
var factory = new ConnectionFactory { Uri = new Uri(containerFixture.ConnectionString) };
38+
_connection = await factory.CreateConnectionAsync(cancellationToken).ConfigureAwait(false);
39+
_adminChannel = await _connection
40+
.CreateChannelAsync(cancellationToken: cancellationToken)
41+
.ConfigureAwait(false);
42+
await _adminChannel
43+
.ExchangeDeclareAsync(
44+
ExchangeName,
45+
ExchangeType.Fanout,
46+
durable: false,
47+
autoDelete: true,
48+
cancellationToken: cancellationToken
49+
)
50+
.ConfigureAwait(false);
51+
52+
return (_connection, _adminChannel);
53+
}
54+
55+
/// <inheritdoc />
56+
public async ValueTask DisposeAsync()
57+
{
58+
if (_adminChannel is not null)
59+
{
60+
await _adminChannel.CloseAsync().ConfigureAwait(false);
61+
_adminChannel.Dispose();
62+
}
63+
64+
if (_connection is not null)
65+
{
66+
await _connection.CloseAsync().ConfigureAwait(false);
67+
await _connection.DisposeAsync().ConfigureAwait(false);
68+
}
69+
}
70+
71+
[Test]
72+
public async Task SendAsync_Publishes_message_to_exchange(CancellationToken cancellationToken)
73+
{
74+
var (connection, adminChannel) = await GetConnectionAndChannelAsync(cancellationToken).ConfigureAwait(false);
75+
76+
var queueName = await adminChannel
77+
.QueueDeclareAsync(cancellationToken: cancellationToken)
78+
.ConfigureAwait(false);
79+
await adminChannel
80+
.QueueBindAsync(
81+
queueName.QueueName,
82+
ExchangeName,
83+
routingKey: string.Empty,
84+
cancellationToken: cancellationToken
85+
)
86+
.ConfigureAwait(false);
87+
88+
var adapter = new RabbitMqConnectionAdapter(connection);
89+
using var transport = CreateTransport(adapter);
90+
var outboxMessage = CreateOutboxMessage();
91+
92+
await transport.SendAsync(outboxMessage, cancellationToken).ConfigureAwait(false);
93+
94+
var received = await ConsumeOneMessageAsync(adminChannel, queueName.QueueName, cancellationToken)
95+
.ConfigureAwait(false);
96+
97+
using (Assert.Multiple())
98+
{
99+
_ = await Assert.That(received).IsNotNull();
100+
var body = Encoding.UTF8.GetString(received!.Body.ToArray());
101+
_ = await Assert.That(body).IsEqualTo(outboxMessage.Payload);
102+
_ = await Assert.That(received.BasicProperties.MessageId).IsEqualTo(outboxMessage.Id.ToString());
103+
_ = await Assert.That(received.BasicProperties.ContentType).IsEqualTo("application/json");
104+
}
105+
}
106+
107+
[Test]
108+
public async Task SendBatchAsync_Publishes_all_messages_to_exchange(CancellationToken cancellationToken)
109+
{
110+
const int messageCount = 5;
111+
var (connection, adminChannel) = await GetConnectionAndChannelAsync(cancellationToken).ConfigureAwait(false);
112+
113+
var queueName = await adminChannel
114+
.QueueDeclareAsync(cancellationToken: cancellationToken)
115+
.ConfigureAwait(false);
116+
await adminChannel
117+
.QueueBindAsync(
118+
queueName.QueueName,
119+
ExchangeName,
120+
routingKey: string.Empty,
121+
cancellationToken: cancellationToken
122+
)
123+
.ConfigureAwait(false);
124+
125+
var adapter = new RabbitMqConnectionAdapter(connection);
126+
using var transport = CreateTransport(adapter);
127+
var messages = Enumerable.Range(0, messageCount).Select(_ => CreateOutboxMessage()).ToList();
128+
129+
await transport.SendBatchAsync(messages, cancellationToken).ConfigureAwait(false);
130+
131+
var receivedMessages = await ConsumeManyMessagesAsync(
132+
adminChannel,
133+
queueName.QueueName,
134+
messageCount,
135+
cancellationToken
136+
)
137+
.ConfigureAwait(false);
138+
139+
_ = await Assert.That(receivedMessages.Count).IsEqualTo(messageCount);
140+
}
141+
142+
[Test]
143+
public async Task IsHealthyAsync_When_connection_open_returns_true(CancellationToken cancellationToken)
144+
{
145+
var (connection, _) = await GetConnectionAndChannelAsync(cancellationToken).ConfigureAwait(false);
146+
147+
var adapter = new RabbitMqConnectionAdapter(connection);
148+
using var transport = CreateTransport(adapter);
149+
150+
// Trigger channel creation by sending a message
151+
await transport.SendAsync(CreateOutboxMessage(), cancellationToken).ConfigureAwait(false);
152+
153+
var healthy = await transport.IsHealthyAsync(cancellationToken).ConfigureAwait(false);
154+
155+
_ = await Assert.That(healthy).IsTrue();
156+
}
157+
158+
[Test]
159+
public async Task IsHealthyAsync_Before_first_send_returns_false(CancellationToken cancellationToken)
160+
{
161+
var (connection, _) = await GetConnectionAndChannelAsync(cancellationToken).ConfigureAwait(false);
162+
163+
var adapter = new RabbitMqConnectionAdapter(connection);
164+
using var transport = CreateTransport(adapter);
165+
166+
// No sends yet — channel has not been created
167+
var healthy = await transport.IsHealthyAsync(cancellationToken).ConfigureAwait(false);
168+
169+
_ = await Assert.That(healthy).IsFalse();
170+
}
171+
172+
private static RabbitMqMessageTransport CreateTransport(IRabbitMqConnectionAdapter adapter) =>
173+
new(
174+
adapter,
175+
new SimpleTopicNameResolver(),
176+
Options.Create(new RabbitMqTransportOptions { ExchangeName = ExchangeName })
177+
);
178+
179+
private static OutboxMessage CreateOutboxMessage() =>
180+
new()
181+
{
182+
Id = Guid.NewGuid(),
183+
EventType = typeof(IntegrationTestEvent),
184+
Payload = """{"id":"test"}""",
185+
CorrelationId = Guid.NewGuid().ToString(),
186+
CreatedAt = DateTimeOffset.UtcNow,
187+
UpdatedAt = DateTimeOffset.UtcNow,
188+
RetryCount = 0,
189+
ProcessedAt = null,
190+
};
191+
192+
private static async Task<BasicDeliverEventArgs?> ConsumeOneMessageAsync(
193+
IChannel channel,
194+
string queueName,
195+
CancellationToken cancellationToken
196+
)
197+
{
198+
var tcs = new TaskCompletionSource<BasicDeliverEventArgs?>(TaskCreationOptions.RunContinuationsAsynchronously);
199+
using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
200+
cts.CancelAfter(TimeSpan.FromSeconds(15));
201+
cts.Token.Register(() => tcs.TrySetResult(null));
202+
203+
var consumer = new AsyncEventingBasicConsumer(channel);
204+
consumer.ReceivedAsync += (_, ea) =>
205+
{
206+
tcs.TrySetResult(ea);
207+
return Task.CompletedTask;
208+
};
209+
210+
await channel
211+
.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer, cancellationToken: cancellationToken)
212+
.ConfigureAwait(false);
213+
214+
return await tcs.Task.ConfigureAwait(false);
215+
}
216+
217+
private static async Task<List<BasicDeliverEventArgs>> ConsumeManyMessagesAsync(
218+
IChannel channel,
219+
string queueName,
220+
int expectedCount,
221+
CancellationToken cancellationToken
222+
)
223+
{
224+
var received = new List<BasicDeliverEventArgs>();
225+
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
226+
using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
227+
cts.CancelAfter(TimeSpan.FromSeconds(15));
228+
cts.Token.Register(() => tcs.TrySetResult(false));
229+
230+
var consumer = new AsyncEventingBasicConsumer(channel);
231+
consumer.ReceivedAsync += (_, ea) =>
232+
{
233+
received.Add(ea);
234+
if (received.Count >= expectedCount)
235+
{
236+
tcs.TrySetResult(true);
237+
}
238+
239+
return Task.CompletedTask;
240+
};
241+
242+
await channel
243+
.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer, cancellationToken: cancellationToken)
244+
.ConfigureAwait(false);
245+
246+
await tcs.Task.ConfigureAwait(false);
247+
return received;
248+
}
249+
250+
private sealed class SimpleTopicNameResolver : ITopicNameResolver
251+
{
252+
public string Resolve(OutboxMessage message) => message.EventType.Name;
253+
}
254+
255+
private sealed record IntegrationTestEvent;
256+
}

0 commit comments

Comments
 (0)