Skip to content

Commit 219f631

Browse files
committed
Add retry wrapper for ASB message consumer in generated tests
Like Kafka tests, wrap the ASB consumer with RetryAzureServiceBusMessageConsumer that retries Receive up to 10 times when MT_NONE is returned. This gives CI environments more time for messages to arrive before tests fail. Also updates AzureServiceBusMessageGatewayProvider to create channels directly (injecting the wrapped consumer) instead of using AzureServiceBusChannelFactory. 💘 Generated with Crush
1 parent 077b184 commit 219f631

3 files changed

Lines changed: 134 additions & 6 deletions

File tree

tests/Paramore.Brighter.AzureServiceBus.Tests/MessagingGateway/AzureServiceBusMessageGatewayProvider.cs

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,15 +84,27 @@ public async Task<IAmAMessageProducerAsync> CreateProducerAsync(AzureServiceBusP
8484
public IAmAChannelSync CreateChannel(AzureServiceBusSubscription subscription)
8585
{
8686
var consumerFactory = new AzureServiceBusConsumerFactory(_clientProvider);
87-
var channelFactory = new AzureServiceBusChannelFactory(consumerFactory);
88-
return channelFactory.CreateSyncChannel(subscription);
87+
var consumer = consumerFactory.Create(subscription);
88+
var retryConsumer = new RetryAzureServiceBusMessageConsumer(consumer, maxRetries: 10);
89+
90+
return new Channel(
91+
subscription.ChannelName,
92+
subscription.RoutingKey,
93+
retryConsumer,
94+
subscription.BufferSize);
8995
}
9096

9197
public async Task<IAmAChannelAsync> CreateChannelAsync(AzureServiceBusSubscription subscription, CancellationToken cancellationToken = default)
9298
{
9399
var consumerFactory = new AzureServiceBusConsumerFactory(_clientProvider);
94-
var channelFactory = new AzureServiceBusChannelFactory(consumerFactory);
95-
return await channelFactory.CreateAsyncChannelAsync(subscription, cancellationToken);
100+
var consumer = consumerFactory.Create(subscription);
101+
var retryConsumer = new RetryAzureServiceBusMessageConsumer(consumer, maxRetries: 10);
102+
103+
return new ChannelAsync(
104+
subscription.ChannelName,
105+
subscription.RoutingKey,
106+
retryConsumer,
107+
subscription.BufferSize);
96108
}
97109

98110
public void CleanUp(IAmAMessageProducerSync? producer, IAmAChannelSync? channel, IEnumerable<Message> messages)
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
using System;
2+
using System.Threading;
3+
using System.Threading.Tasks;
4+
5+
namespace Paramore.Brighter.AzureServiceBus.Tests.MessagingGateway;
6+
7+
/// <summary>
8+
/// A decorator for Azure Service Bus message consumers that retries receiving a message
9+
/// when the underlying consumer returns an empty result or MT_NONE message.
10+
/// This is useful in CI environments where ASB may be slow to deliver messages.
11+
/// </summary>
12+
public class RetryAzureServiceBusMessageConsumer : IAmAMessageConsumerSync, IAmAMessageConsumerAsync
13+
{
14+
private readonly IAmAMessageConsumerSync _innerSync;
15+
private readonly IAmAMessageConsumerAsync _innerAsync;
16+
private readonly int _maxRetries;
17+
18+
/// <summary>
19+
/// Initializes a new instance of the <see cref="RetryAzureServiceBusMessageConsumer"/> class.
20+
/// </summary>
21+
/// <param name="inner">The underlying Azure Service Bus message consumer to decorate.</param>
22+
/// <param name="maxRetries">The maximum number of receive attempts (minimum 1).</param>
23+
public RetryAzureServiceBusMessageConsumer(IAmAMessageConsumerSync inner, int maxRetries = 10)
24+
{
25+
_innerSync = inner ?? throw new ArgumentNullException(nameof(inner));
26+
_innerAsync = inner as IAmAMessageConsumerAsync ?? throw new ArgumentException("Inner consumer must implement IAmAMessageConsumerAsync", nameof(inner));
27+
_maxRetries = Math.Max(1, maxRetries);
28+
}
29+
30+
public void Acknowledge(Message message)
31+
{
32+
_innerSync.Acknowledge(message);
33+
}
34+
35+
public Task AcknowledgeAsync(Message message, CancellationToken cancellationToken = default)
36+
{
37+
return _innerAsync.AcknowledgeAsync(message, cancellationToken);
38+
}
39+
40+
public void Purge()
41+
{
42+
_innerSync.Purge();
43+
}
44+
45+
public Task PurgeAsync(CancellationToken cancellationToken = default)
46+
{
47+
return _innerAsync.PurgeAsync(cancellationToken);
48+
}
49+
50+
public Message[] Receive(TimeSpan? timeOut = null)
51+
{
52+
for (var i = 0; i < _maxRetries; i++)
53+
{
54+
var messages = _innerSync.Receive(timeOut);
55+
if (messages.Length > 0 && messages[0].Header.MessageType != MessageType.MT_NONE)
56+
{
57+
return messages;
58+
}
59+
}
60+
61+
return [new Message()];
62+
}
63+
64+
public async Task<Message[]> ReceiveAsync(TimeSpan? timeOut = null, CancellationToken cancellationToken = default)
65+
{
66+
for (var i = 0; i < _maxRetries; i++)
67+
{
68+
var messages = await _innerAsync.ReceiveAsync(timeOut, cancellationToken);
69+
if (messages.Length > 0 && messages[0].Header.MessageType != MessageType.MT_NONE)
70+
{
71+
return messages;
72+
}
73+
}
74+
75+
return [new Message()];
76+
}
77+
78+
public bool Reject(Message message, MessageRejectionReason? reason = null)
79+
{
80+
return _innerSync.Reject(message, reason);
81+
}
82+
83+
public Task<bool> RejectAsync(Message message, MessageRejectionReason? reason = null, CancellationToken cancellationToken = default)
84+
{
85+
return _innerAsync.RejectAsync(message, reason, cancellationToken);
86+
}
87+
88+
public void Nack(Message message)
89+
{
90+
_innerSync.Nack(message);
91+
}
92+
93+
public Task NackAsync(Message message, CancellationToken cancellationToken = default)
94+
{
95+
return _innerAsync.NackAsync(message, cancellationToken);
96+
}
97+
98+
public bool Requeue(Message message, TimeSpan? delay = null)
99+
{
100+
return _innerSync.Requeue(message, delay);
101+
}
102+
103+
public Task<bool> RequeueAsync(Message message, TimeSpan? delay = null, CancellationToken cancellationToken = default)
104+
{
105+
return _innerAsync.RequeueAsync(message, delay, cancellationToken);
106+
}
107+
108+
public void Dispose()
109+
{
110+
_innerSync.Dispose();
111+
}
112+
113+
public async ValueTask DisposeAsync()
114+
{
115+
await _innerAsync.DisposeAsync();
116+
}
117+
}

tests/Paramore.Brighter.AzureServiceBus.Tests/test-configuration.json

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
"HasSupportToDelayedMessages": true,
1212
"HasSupportToValidateBrokerExistence": true,
1313
"HasSupportToRequeue": true,
14-
"ReceiveMessageTimeoutInMilliseconds": 5000,
15-
"DelayBetweenReceiveMessageInMilliseconds": 1000
14+
"ReceiveMessageTimeoutInMilliseconds": 6000
1615
}
1716
}

0 commit comments

Comments
 (0)