feat: add publish resilience gate for transient RabbitMQ connection drops#77
Conversation
During transient RabbitMQ connection drops, PublishAsync now suspends rather than failing immediately. An AsyncManualResetEvent gate blocks publishes while the connection is recovering and resumes them automatically once recovery succeeds. Key behaviors: - Gate closes on non-application connection shutdown - Gate opens on successful recovery - Gate stays closed on recovery error (auto-recovery keeps retrying) - Gate is signaled during disposal for fast shutdown - Configurable PublishRecoveryTimeout (default 10s) - TimeoutException wrapped in MessageBusException for consistent API Closes #76
There was a problem hiding this comment.
Pull request overview
This PR addresses transient RabbitMQ connection drops causing PublishAsync to fail before auto-recovery completes by introducing a publish “readiness gate” that pauses publishing during recovery (with a configurable timeout), then resumes once recovery succeeds.
Changes:
- Added an
AsyncManualResetEventgate inRabbitMQMessageBusto suspend publishes during connection recovery and time out with aMessageBusException. - Introduced
PublishRecoveryTimeoutoption (+ builder method) to configure/disable the recovery wait behavior. - Added resilience-focused tests and
InternalsVisibleToto enable test-only simulation hooks.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| tests/Foundatio.RabbitMQ.Tests/Messaging/RabbitMqPublishResilienceTests.cs | Adds tests for publish behavior during simulated recovery/timeout/cancellation/disposal scenarios. |
| src/Foundatio.RabbitMQ/Properties/AssemblyInfo.cs | Grants test assembly access to internals used for recovery simulations. |
| src/Foundatio.RabbitMQ/Messaging/RabbitMQMessageBusOptions.cs | Adds PublishRecoveryTimeout option and fluent builder configuration. |
| src/Foundatio.RabbitMQ/Messaging/RabbitMQMessageBus.cs | Implements publish recovery gate, wires it into connection shutdown/recovery events, and adds simulation helpers. |
Comments suppressed due to low confidence (2)
src/Foundatio.RabbitMQ/Messaging/RabbitMQMessageBus.cs:563
- This warning always says publishes will "wait up to {Timeout}" when the connection drops, but when
PublishRecoveryTimeoutisTimeSpan.Zerothe gate is disabled and publishes will not wait. Consider adjusting the log message (or branching) to reflect the configured behavior (wait vs fail-fast) to avoid misleading operational output.
if (e.Initiator != ShutdownInitiator.Application)
{
_publisherReady.Reset();
_logger.LogWarning("Publisher connection lost (Reply Code: {ReplyCode}, Reason: {ReplyText}). Publishes will wait up to {Timeout:g} for recovery.",
e.ReplyCode, e.ReplyText, _options.PublishRecoveryTimeout);
}
tests/Foundatio.RabbitMQ.Tests/Messaging/RabbitMqPublishResilienceTests.cs:152
- The name
PublishAsync_RecoveryError_StaysBlockedsuggests a recovery-error event is simulated, but the test never triggersConnectionRecoveryError(it only leaves the gate closed until the timeout). Consider either renaming this test to reflect what it actually verifies (timeout while recovery never completes) or adding a simulation that invokes the recovery-error path and asserts the gate remains closed untilRecoverySucceeded.
[Fact]
public async Task PublishAsync_RecoveryError_StaysBlocked()
{
await using var messageBus = new RabbitMQMessageBus(o => o
.ConnectionString(ConnectionString)
.LoggerFactory(Log)
.PublishRecoveryTimeout(TimeSpan.FromMilliseconds(500)));
await messageBus.PublishAsync(new SimpleMessageA { Data = "warmup" }, cancellationToken: TestCancellationToken);
messageBus.SimulatePublisherConnectionLost();
var publishTask = messageBus.PublishAsync(new SimpleMessageA { Data = "should stay blocked" },
cancellationToken: new CancellationTokenSource(TimeSpan.FromSeconds(5)).Token);
await Task.Delay(100, TestCancellationToken);
Assert.False(publishTask.IsCompleted);
await Task.Delay(100, TestCancellationToken);
Assert.False(publishTask.IsCompleted);
var ex = await Assert.ThrowsAsync<MessageBusException>(() => publishTask);
Assert.Contains("connection recovery did not complete within", ex.Message);
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
- Fix CI: SimulatePublisherConnectionShutdownAsync now nulls _publisherChannel to properly represent a real disconnect (test was passing because channel stayed alive when gate was disabled) - Fix CTS disposal: all CancellationTokenSource instances in tests now use 'using' declarations - Fix timeout message: use TotalMilliseconds instead of TotalSeconds to avoid misleading "0s" for sub-second timeouts - Add SimulatePublisherConnectionRecoveryErrorAsync for proper recovery error testing - Rename test to PublishAsync_RecoveryErrorDoesNotOpenGate_WaitsUntilTimeout and add actual recovery error simulation call - Add input validation: PublishRecoveryTimeout rejects negative values
…empt Applies industry best practice (NServiceBus/EasyNetQ pattern): re-read _publisherChannel on each retry attempt rather than capturing once. Changes: - Lock acquired/released per retry (not held across all retries) - IsOpen check rejects dead channels before attempting publish - AlreadyClosedException from BasicPublishAsync triggers retry with fresh channel state - Other publishers and recovery handlers can proceed between retries
The IsOpen check changed the message from "publisher channel was closed" to "publisher channel is closed or unavailable" -- update assertion to match.
…icate checks - Gate check now runs on EACH retry attempt, preventing retries from burning through the budget instantly when connection flaps - Remove _publisherReady.Set() from CleanupAsync; the base class DisposedCancellationToken already cancels in-flight publishes before CleanupAsync runs, so manual gate-open is unnecessary and caused publishers to unblock into a closed channel - Remove redundant _isPublisherBlocked check from PublishImplAsync; the authoritative check inside the lock in PublishMessageAsync is the only one that matters (outer check was stale/racy) - Update disposal test to assert OperationCanceledException (correct behavior when disposal cancellation unblocks the gate wait)
…S margin - Update XML doc comments to explicitly state the timeout is per-attempt (resilience policy may retry, so total wall-clock can exceed the value) - Increase CTS timeout in recovery-error test from 5s to 10s to prevent CI flakiness (3 retry attempts x 500ms + backoff = ~4.5s)
… test - Branch log message for timeout=0 vs timeout>0 in OnPublisherConnectionOnConnectionShutdownAsync - Add .AnyContext() to SimulatePublisherConnectionShutdownAsync - Wrap disposal test in try/finally for leak safety
- Restore _publisherReady.Set() in CleanupAsync to unblock publishers waiting on the recovery gate during disposal (removed in 00fca54). Without this, publishers with long PublishRecoveryTimeout values rely solely on DisposedCancellationToken propagation which is fragile. - Remove stale <remarks> on PublishImplAsync advising channel-per-thread patterns that don't apply (single channel + AsyncLock architecture). - Document blocked-state fail-fast design decision inline.
- Sanitize connection string in constructor exceptions to avoid leaking credentials (strip userinfo, show only scheme/host/port/path) - Guard DeliveryDelay overflow: fail with clear ArgumentOutOfRangeException instead of unhelpful OverflowException when delay > Int32.MaxValue ms - Handle IPv6 bracket notation in ParseHostEndpoint ([::1]:port) - Extract duplicated publisher channel creation into CreatePublisherChannelAsync - Remove stale empty XML doc tags on PublishImplAsync and CreateConnectionAsync - Add PERF comments at ToArray() call sites and publish lock documenting allocation/serialization trade-offs (tracked in FoundatioFx/Foundatio#512)
…back - Remove async/await from CreatePublisherChannelAsync and return the Task directly since there is no code after the await and no using scope to clean up - Remove the SanitizeUri(string) overload; the only call site where URI parse failed cannot safely echo the value, so the message no longer includes it - URI sanitization is still applied at the scheme-check exception via the Uri overload
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 4 out of 4 changed files in this pull request and generated 2 comments.
Comments suppressed due to low confidence (1)
src/Foundatio.RabbitMQ/Messaging/RabbitMQMessageBus.cs:875
- In ParseHostEndpoint, when the string contains a colon but the port portion is not a valid int (e.g., "host:abc"), the fallback currently creates an endpoint with HostName set to the entire trimmed input (including ":abc"). This yields an invalid host and breaks failover parsing. Consider falling back to the parsed hostname with defaultPort (similar to the successful parse path), rather than using the original trimmed string.
: new AmqpTcpEndpoint(trimmed, defaultPort);
}
private void RegisterPublisherConnectionEventHandlers()
{
if (_publisherConnection is null)
throw new MessageBusException("Publisher connection must be initialized before registering event handlers.");
…llback - Add volatile to _publisherChannel/_subscriberChannel for safe double-checked locking on ARM64 (outer null check needs memory barrier) - Move _publisherReady.Set() after connection close in CleanupAsync to prevent unblocked publishers from racing into a disposing channel - Fix ParseHostEndpoint: invalid port fallback now uses parsed hostname instead of the full trimmed string (e.g. "host:abc" -> "host", not "host:abc")
Summary
Fixes publish resilience during transient RabbitMQ connection drops. Previously,
PublishAsyncwould fail immediately when the broker connection was momentarily lost (e.g., network blip, broker restart). Now publishes suspend transparently during recovery and resume automatically.Problem
When the RabbitMQ connection drops, the .NET client's auto-recovery kicks in (default: 5s
NetworkRecoveryInterval). During this window, anyPublishAsynccall would throwAlreadyClosedExceptionor similar failures. The resilience policy's retry budget (3 attempts, 1s+2s backoff = ~3s total) was often exhausted before recovery completed (~5s), causing unnecessary message loss.Solution: Recovery Gate Pattern
An
AsyncManualResetEvent("gate") suspends publishes during connection recovery and releases them when the connection is restored. The gate is checked inside the resilience policy's retry loop, meaning:DisposedCancellationToken) cleanly unblocks waiting publishersIsOpencheck on the channel provides defense-in-depth for edge casesArchitecture
Key Design Decisions
CleanupAsynccloses connections first (nulling channel), then opens gate; waiters resume into null-channel check and fail cleanly. Base classDisposedCancellationTokenalso fires, cancelling linked tokens used byWaitAsync_isPublisherBlockedcheck (inside lock)PublishRecoveryTimeoutoption (default: 10s)IsOpencheck per attemptConfiguration
Set to
TimeSpan.Zerofor pre-fix behavior (immediate failure on connection drop).Changes
RabbitMQMessageBus.cs: AddedAsyncManualResetEventgate, restructuredPublishMessageAsyncwith gate inside retry loop, removed redundant outer_isPublisherBlockedcheck, leveraged disposal cancellationRabbitMQMessageBusOptions.cs: AddedPublishRecoveryTimeoutoption with builder and validationProperties/AssemblyInfo.cs: AddedInternalsVisibleTofor test project (simulation methods)RabbitMqPublishResilienceTests.cs: 7 tests covering: timeout-disabled, recovery wait, timeout expiry, cancellation, healthy publish, disposal, recovery-errorTest Plan
dotnet test)MessageBusExceptionon expiryOperationCanceledException)