-
-
Notifications
You must be signed in to change notification settings - Fork 8
feat: add publish resilience gate for transient RabbitMQ connection drops #77
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 6 commits
Commits
Show all changes
13 commits
Select commit
Hold shift + click to select a range
d20b352
fix: add publish resilience gate for connection recovery
niemyjski 608e758
fix: address CI failure and PR review feedback
niemyjski deea0a5
refactor: move lock inside retry loop for channel re-read on each att…
niemyjski 9e623e0
fix: align test assertion with updated error message
niemyjski 00fca54
refactor: move recovery gate inside retry loop, fix disposal and dupl…
niemyjski 4734dfa
docs: clarify PublishRecoveryTimeout is per-attempt, increase test CT…
niemyjski 231f771
Address PR feedback: fix log message, add AnyContext, harden disposal…
niemyjski e90c803
fix: restore recovery gate reset on disposal, remove stale XML docs
niemyjski 0846c38
fix: address review findings -- security, correctness, and code quality
niemyjski f98fc91
fix: simplify CreatePublisherChannelAsync and SanitizeUri per PR feed…
niemyjski e9b9907
fix: volatile channel fields, disposal race, and ParseHostEndpoint fa…
niemyjski 767e2d0
fix: update stale x-delay comment to reflect current Int32 cast
niemyjski 40900ae
fix: handle unbracketed IPv6 in ParseHostEndpoint
niemyjski File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Some comments aren't visible on the classic Files Changed page.
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,3 @@ | ||
| using System.Runtime.CompilerServices; | ||
|
|
||
| [assembly: InternalsVisibleTo("Foundatio.RabbitMQ.Tests, PublicKey=0024000004800000940000000602000000240000525341310004000001000100a9357232b9bcad78fd310297fdb41bf42816ee2ca9ccdace999889de2badb6f06df2de1d9f2c8cb17b21f5311f11d6bb328d55e0dd9fe8adc5e2dc4610028c1bdacb3355d2e239b81d0bb0ac83e615fc641f8a3ec49e4fad8e305994953d448ef7b38e8c256601e54af19c035b562e3e5e5461c2a93b8dd11936e451b05034a2")] |
159 changes: 159 additions & 0 deletions
159
tests/Foundatio.RabbitMQ.Tests/Messaging/RabbitMqPublishResilienceTests.cs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,159 @@ | ||
| using System; | ||
| using System.Threading; | ||
| using System.Threading.Tasks; | ||
| using Foundatio.Messaging; | ||
| using Foundatio.Tests.Messaging; | ||
| using Xunit; | ||
|
|
||
| namespace Foundatio.RabbitMQ.Tests.Messaging; | ||
|
|
||
| public class RabbitMqPublishResilienceTests(ITestOutputHelper output) : RabbitMqMessageBusTestBase("amqp://localhost:5672", output) | ||
| { | ||
| [Fact] | ||
| public async Task PublishAsync_WithRecoveryTimeoutDisabled_FailsImmediatelyOnConnectionDrop() | ||
| { | ||
|
niemyjski marked this conversation as resolved.
|
||
| await using var messageBus = new RabbitMQMessageBus(o => o | ||
| .ConnectionString(ConnectionString) | ||
| .LoggerFactory(Log) | ||
| .PublishRecoveryTimeout(TimeSpan.Zero)); | ||
|
|
||
| await messageBus.PublishAsync(new SimpleMessageA { Data = "warmup" }, cancellationToken: TestCancellationToken); | ||
|
|
||
| await messageBus.SimulatePublisherConnectionShutdownAsync(); | ||
|
|
||
| using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); | ||
| var ex = await Assert.ThrowsAsync<MessageBusException>(() => | ||
| messageBus.PublishAsync(new SimpleMessageA { Data = "should fail" }, | ||
| cancellationToken: cts.Token)); | ||
|
|
||
| Assert.Contains("publisher channel is closed", ex.Message); | ||
| } | ||
|
|
||
| [Fact] | ||
| public async Task PublishAsync_DuringRecovery_WaitsAndSucceeds() | ||
| { | ||
| await using var messageBus = new RabbitMQMessageBus(o => o | ||
| .ConnectionString(ConnectionString) | ||
| .LoggerFactory(Log) | ||
| .PublishRecoveryTimeout(TimeSpan.FromSeconds(15))); | ||
|
|
||
| await messageBus.PublishAsync(new SimpleMessageA { Data = "warmup" }, cancellationToken: TestCancellationToken); | ||
|
|
||
| messageBus.SimulatePublisherConnectionLost(); | ||
|
|
||
| using var publishCts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); | ||
| var publishTask = messageBus.PublishAsync(new SimpleMessageA { Data = "during recovery" }, | ||
| cancellationToken: publishCts.Token); | ||
|
|
||
| await Task.Delay(100, TestCancellationToken); | ||
| Assert.False(publishTask.IsCompleted); | ||
|
|
||
| messageBus.SimulatePublisherRecoverySucceeded(); | ||
|
|
||
| await publishTask; | ||
| } | ||
|
|
||
| [Fact] | ||
| public async Task PublishAsync_RecoveryTimeout_ThrowsMessageBusException() | ||
| { | ||
| await using var messageBus = new RabbitMQMessageBus(o => o | ||
| .ConnectionString(ConnectionString) | ||
| .LoggerFactory(Log) | ||
| .PublishRecoveryTimeout(TimeSpan.FromMilliseconds(500))); | ||
|
|
||
| await messageBus.PublishAsync(new SimpleMessageA { Data = "warmup" }, cancellationToken: TestCancellationToken); | ||
|
|
||
| messageBus.SimulatePublisherConnectionLost(); | ||
|
|
||
| var ex = await Assert.ThrowsAsync<MessageBusException>(() => | ||
| messageBus.PublishAsync(new SimpleMessageA { Data = "should timeout" }, | ||
| cancellationToken: TestCancellationToken)); | ||
|
|
||
| Assert.Contains("connection recovery did not complete within", ex.Message); | ||
| } | ||
|
|
||
| [Fact] | ||
| public async Task PublishAsync_CancellationDuringRecovery_RespectsCancellation() | ||
| { | ||
| await using var messageBus = new RabbitMQMessageBus(o => o | ||
| .ConnectionString(ConnectionString) | ||
| .LoggerFactory(Log) | ||
| .PublishRecoveryTimeout(TimeSpan.FromSeconds(30))); | ||
|
|
||
| await messageBus.PublishAsync(new SimpleMessageA { Data = "warmup" }, cancellationToken: TestCancellationToken); | ||
|
|
||
| messageBus.SimulatePublisherConnectionLost(); | ||
|
|
||
| using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(200)); | ||
|
|
||
| await Assert.ThrowsAnyAsync<OperationCanceledException>(() => | ||
| messageBus.PublishAsync(new SimpleMessageA { Data = "should cancel" }, cancellationToken: cts.Token)); | ||
| } | ||
|
|
||
| [Fact] | ||
| public async Task PublishAsync_WhenConnectionHealthy_SucceedsImmediately() | ||
| { | ||
| await using var messageBus = new RabbitMQMessageBus(o => o | ||
| .ConnectionString(ConnectionString) | ||
| .LoggerFactory(Log) | ||
| .PublishRecoveryTimeout(TimeSpan.FromSeconds(10))); | ||
|
|
||
| var exception = await Record.ExceptionAsync(() => | ||
| messageBus.PublishAsync(new SimpleMessageA { Data = "should succeed immediately" }, | ||
| cancellationToken: TestCancellationToken)); | ||
|
|
||
| Assert.Null(exception); | ||
| } | ||
|
|
||
| [Fact] | ||
| public async Task PublishAsync_DuringDisposal_FailsFastWithOperationCanceledException() | ||
| { | ||
| var messageBus = new RabbitMQMessageBus(o => o | ||
| .ConnectionString(ConnectionString) | ||
| .LoggerFactory(Log) | ||
| .PublishRecoveryTimeout(TimeSpan.FromSeconds(30))); | ||
|
|
||
| await messageBus.PublishAsync(new SimpleMessageA { Data = "warmup" }, cancellationToken: TestCancellationToken); | ||
|
|
||
| messageBus.SimulatePublisherConnectionLost(); | ||
|
|
||
| using var publishCts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); | ||
| var publishTask = messageBus.PublishAsync(new SimpleMessageA { Data = "should fail on disposal" }, | ||
| cancellationToken: publishCts.Token); | ||
|
|
||
| await Task.Delay(50, TestCancellationToken); | ||
| Assert.False(publishTask.IsCompleted); | ||
|
|
||
| await messageBus.DisposeAsync(); | ||
|
|
||
| await Assert.ThrowsAnyAsync<OperationCanceledException>(() => publishTask); | ||
|
niemyjski marked this conversation as resolved.
Outdated
|
||
| } | ||
|
|
||
| [Fact] | ||
| public async Task PublishAsync_RecoveryErrorDoesNotOpenGate_WaitsUntilTimeout() | ||
| { | ||
| await using var messageBus = new RabbitMQMessageBus(o => o | ||
| .ConnectionString(ConnectionString) | ||
| .LoggerFactory(Log) | ||
| .PublishRecoveryTimeout(TimeSpan.FromMilliseconds(500))); | ||
|
|
||
| await messageBus.PublishAsync(new SimpleMessageA { Data = "warmup" }, cancellationToken: TestCancellationToken); | ||
|
|
||
| messageBus.SimulatePublisherConnectionLost(); | ||
|
|
||
| using var publishCts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); | ||
| var publishTask = messageBus.PublishAsync(new SimpleMessageA { Data = "should stay blocked" }, | ||
| cancellationToken: publishCts.Token); | ||
|
|
||
| await Task.Delay(50, TestCancellationToken); | ||
| Assert.False(publishTask.IsCompleted); | ||
|
|
||
| await messageBus.SimulatePublisherConnectionRecoveryErrorAsync(); | ||
|
|
||
| await Task.Delay(50, TestCancellationToken); | ||
| Assert.False(publishTask.IsCompleted); | ||
|
|
||
| var ex = await Assert.ThrowsAsync<MessageBusException>(() => publishTask); | ||
| Assert.Contains("connection recovery did not complete within", ex.Message); | ||
| } | ||
| } | ||
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.