Skip to content

feat: add publish resilience gate for transient RabbitMQ connection drops#77

Merged
niemyjski merged 13 commits into
mainfrom
fix/publish-resilience-gate
May 20, 2026
Merged

feat: add publish resilience gate for transient RabbitMQ connection drops#77
niemyjski merged 13 commits into
mainfrom
fix/publish-resilience-gate

Conversation

@niemyjski
Copy link
Copy Markdown
Member

@niemyjski niemyjski commented May 19, 2026

Summary

Fixes publish resilience during transient RabbitMQ connection drops. Previously, PublishAsync would fail immediately when the broker connection was momentarily lost (e.g., network blip, broker restart). Now publishes suspend transparently during recovery and resume automatically.

Problem

When the RabbitMQ connection drops, the .NET client's auto-recovery kicks in (default: 5s NetworkRecoveryInterval). During this window, any PublishAsync call would throw AlreadyClosedException or similar failures. The resilience policy's retry budget (3 attempts, 1s+2s backoff = ~3s total) was often exhausted before recovery completed (~5s), causing unnecessary message loss.

Solution: Recovery Gate Pattern

An AsyncManualResetEvent ("gate") suspends publishes during connection recovery and releases them when the connection is restored. The gate is checked inside the resilience policy's retry loop, meaning:

  • Each retry attempt re-evaluates gate status (connection flaps don't burn the retry budget)
  • Disposal cancellation (from base class DisposedCancellationToken) cleanly unblocks waiting publishers
  • The IsOpen check on the channel provides defense-in-depth for edge cases

Architecture

PublishMessageAsync
  -> _resiliencePolicy.ExecuteAsync (retry loop)
       -> Check gate (wait if connection recovering)
       -> Acquire lock
       -> Verify channel.IsOpen
       -> BasicPublishAsync

Key Design Decisions

Decision Rationale
Gate inside retry loop Prevents instant retry-budget exhaustion during connection flaps
Gate opened after connection close on disposal CleanupAsync closes connections first (nulling channel), then opens gate; waiters resume into null-channel check and fail cleanly. Base class DisposedCancellationToken also fires, cancelling linked tokens used by WaitAsync
Single _isPublisherBlocked check (inside lock) Outer check was stale/racy; inner lock-protected check is authoritative
PublishRecoveryTimeout option (default: 10s) Configurable per-publisher; zero disables waiting (fail-fast for legacy behavior)
IsOpen check per attempt Defense against race between recovery signal and channel ready state

Configuration

var bus = new RabbitMQMessageBus(o => o
    .ConnectionString("amqp://localhost")
    .PublishRecoveryTimeout(TimeSpan.FromSeconds(15))); // wait up to 15s for recovery

Set to TimeSpan.Zero for pre-fix behavior (immediate failure on connection drop).

Changes

  • RabbitMQMessageBus.cs: Added AsyncManualResetEvent gate, restructured PublishMessageAsync with gate inside retry loop, removed redundant outer _isPublisherBlocked check, leveraged disposal cancellation
  • RabbitMQMessageBusOptions.cs: Added PublishRecoveryTimeout option with builder and validation
  • Properties/AssemblyInfo.cs: Added InternalsVisibleTo for test project (simulation methods)
  • RabbitMqPublishResilienceTests.cs: 7 tests covering: timeout-disabled, recovery wait, timeout expiry, cancellation, healthy publish, disposal, recovery-error

Test Plan

  • All tests pass (dotnet test)
  • Build with 0 warnings, 0 errors
  • Timeout=zero fails immediately (after retry exhaustion)
  • Gate blocks during recovery, unblocks on success
  • Configurable timeout throws MessageBusException on expiry
  • Caller cancellation respected during gate wait
  • Disposal cancels waiting publishers cleanly (OperationCanceledException)
  • Recovery errors keep gate closed (no premature unblock)

During transient RabbitMQ connection drops, PublishAsync now suspends
rather than failing immediately. An AsyncManualResetEvent gate blocks
publishes while the connection is recovering and resumes them
automatically once recovery succeeds.

Key behaviors:
- Gate closes on non-application connection shutdown
- Gate opens on successful recovery
- Gate stays closed on recovery error (auto-recovery keeps retrying)
- Gate is signaled during disposal for fast shutdown
- Configurable PublishRecoveryTimeout (default 10s)
- TimeoutException wrapped in MessageBusException for consistent API

Closes #76
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR addresses transient RabbitMQ connection drops causing PublishAsync to fail before auto-recovery completes by introducing a publish “readiness gate” that pauses publishing during recovery (with a configurable timeout), then resumes once recovery succeeds.

Changes:

  • Added an AsyncManualResetEvent gate in RabbitMQMessageBus to suspend publishes during connection recovery and time out with a MessageBusException.
  • Introduced PublishRecoveryTimeout option (+ builder method) to configure/disable the recovery wait behavior.
  • Added resilience-focused tests and InternalsVisibleTo to enable test-only simulation hooks.

Reviewed changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 3 comments.

File Description
tests/Foundatio.RabbitMQ.Tests/Messaging/RabbitMqPublishResilienceTests.cs Adds tests for publish behavior during simulated recovery/timeout/cancellation/disposal scenarios.
src/Foundatio.RabbitMQ/Properties/AssemblyInfo.cs Grants test assembly access to internals used for recovery simulations.
src/Foundatio.RabbitMQ/Messaging/RabbitMQMessageBusOptions.cs Adds PublishRecoveryTimeout option and fluent builder configuration.
src/Foundatio.RabbitMQ/Messaging/RabbitMQMessageBus.cs Implements publish recovery gate, wires it into connection shutdown/recovery events, and adds simulation helpers.
Comments suppressed due to low confidence (2)

src/Foundatio.RabbitMQ/Messaging/RabbitMQMessageBus.cs:563

  • This warning always says publishes will "wait up to {Timeout}" when the connection drops, but when PublishRecoveryTimeout is TimeSpan.Zero the gate is disabled and publishes will not wait. Consider adjusting the log message (or branching) to reflect the configured behavior (wait vs fail-fast) to avoid misleading operational output.
        if (e.Initiator != ShutdownInitiator.Application)
        {
            _publisherReady.Reset();
            _logger.LogWarning("Publisher connection lost (Reply Code: {ReplyCode}, Reason: {ReplyText}). Publishes will wait up to {Timeout:g} for recovery.",
                e.ReplyCode, e.ReplyText, _options.PublishRecoveryTimeout);
        }

tests/Foundatio.RabbitMQ.Tests/Messaging/RabbitMqPublishResilienceTests.cs:152

  • The name PublishAsync_RecoveryError_StaysBlocked suggests a recovery-error event is simulated, but the test never triggers ConnectionRecoveryError (it only leaves the gate closed until the timeout). Consider either renaming this test to reflect what it actually verifies (timeout while recovery never completes) or adding a simulation that invokes the recovery-error path and asserts the gate remains closed until RecoverySucceeded.
    [Fact]
    public async Task PublishAsync_RecoveryError_StaysBlocked()
    {
        await using var messageBus = new RabbitMQMessageBus(o => o
            .ConnectionString(ConnectionString)
            .LoggerFactory(Log)
            .PublishRecoveryTimeout(TimeSpan.FromMilliseconds(500)));

        await messageBus.PublishAsync(new SimpleMessageA { Data = "warmup" }, cancellationToken: TestCancellationToken);

        messageBus.SimulatePublisherConnectionLost();

        var publishTask = messageBus.PublishAsync(new SimpleMessageA { Data = "should stay blocked" },
            cancellationToken: new CancellationTokenSource(TimeSpan.FromSeconds(5)).Token);

        await Task.Delay(100, TestCancellationToken);
        Assert.False(publishTask.IsCompleted);

        await Task.Delay(100, TestCancellationToken);
        Assert.False(publishTask.IsCompleted);

        var ex = await Assert.ThrowsAsync<MessageBusException>(() => publishTask);
        Assert.Contains("connection recovery did not complete within", ex.Message);

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread src/Foundatio.RabbitMQ/Messaging/RabbitMQMessageBus.cs Outdated
Comment thread src/Foundatio.RabbitMQ/Messaging/RabbitMQMessageBus.cs Outdated
Comment thread tests/Foundatio.RabbitMQ.Tests/Messaging/RabbitMqPublishResilienceTests.cs Outdated
niemyjski added 4 commits May 19, 2026 15:24
- Fix CI: SimulatePublisherConnectionShutdownAsync now nulls _publisherChannel
  to properly represent a real disconnect (test was passing because channel
  stayed alive when gate was disabled)
- Fix CTS disposal: all CancellationTokenSource instances in tests now use
  'using' declarations
- Fix timeout message: use TotalMilliseconds instead of TotalSeconds to
  avoid misleading "0s" for sub-second timeouts
- Add SimulatePublisherConnectionRecoveryErrorAsync for proper recovery
  error testing
- Rename test to PublishAsync_RecoveryErrorDoesNotOpenGate_WaitsUntilTimeout
  and add actual recovery error simulation call
- Add input validation: PublishRecoveryTimeout rejects negative values
…empt

Applies industry best practice (NServiceBus/EasyNetQ pattern): re-read
_publisherChannel on each retry attempt rather than capturing once.

Changes:
- Lock acquired/released per retry (not held across all retries)
- IsOpen check rejects dead channels before attempting publish
- AlreadyClosedException from BasicPublishAsync triggers retry with
  fresh channel state
- Other publishers and recovery handlers can proceed between retries
The IsOpen check changed the message from "publisher channel was closed"
to "publisher channel is closed or unavailable" -- update assertion to match.
…icate checks

- Gate check now runs on EACH retry attempt, preventing retries from
  burning through the budget instantly when connection flaps
- Remove _publisherReady.Set() from CleanupAsync; the base class
  DisposedCancellationToken already cancels in-flight publishes before
  CleanupAsync runs, so manual gate-open is unnecessary and caused
  publishers to unblock into a closed channel
- Remove redundant _isPublisherBlocked check from PublishImplAsync;
  the authoritative check inside the lock in PublishMessageAsync is
  the only one that matters (outer check was stale/racy)
- Update disposal test to assert OperationCanceledException (correct
  behavior when disposal cancellation unblocks the gate wait)
@niemyjski niemyjski changed the title fix: add publish resilience gate for connection recovery feat: add publish resilience gate for transient RabbitMQ connection drops May 19, 2026
…S margin

- Update XML doc comments to explicitly state the timeout is per-attempt
  (resilience policy may retry, so total wall-clock can exceed the value)
- Increase CTS timeout in recovery-error test from 5s to 10s to prevent
  CI flakiness (3 retry attempts x 500ms + backoff = ~4.5s)
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 4 out of 4 changed files in this pull request and generated 2 comments.

Comment thread src/Foundatio.RabbitMQ/Messaging/RabbitMQMessageBus.cs Outdated
Comment thread tests/Foundatio.RabbitMQ.Tests/Messaging/RabbitMqPublishResilienceTests.cs Outdated
Comment thread src/Foundatio.RabbitMQ/Messaging/RabbitMQMessageBus.cs
Comment thread src/Foundatio.RabbitMQ/Messaging/RabbitMQMessageBus.cs
Comment thread src/Foundatio.RabbitMQ/Messaging/RabbitMQMessageBusOptions.cs
… test

- Branch log message for timeout=0 vs timeout>0 in OnPublisherConnectionOnConnectionShutdownAsync

- Add .AnyContext() to SimulatePublisherConnectionShutdownAsync

- Wrap disposal test in try/finally for leak safety
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 4 out of 4 changed files in this pull request and generated no new comments.

- Restore _publisherReady.Set() in CleanupAsync to unblock publishers waiting on the recovery gate during disposal (removed in 00fca54). Without this, publishers with long PublishRecoveryTimeout values rely solely on DisposedCancellationToken propagation which is fragile.

- Remove stale <remarks> on PublishImplAsync advising channel-per-thread patterns that don't apply (single channel + AsyncLock architecture).

- Document blocked-state fail-fast design decision inline.
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 4 out of 4 changed files in this pull request and generated no new comments.

- Sanitize connection string in constructor exceptions to avoid leaking
  credentials (strip userinfo, show only scheme/host/port/path)
- Guard DeliveryDelay overflow: fail with clear ArgumentOutOfRangeException
  instead of unhelpful OverflowException when delay > Int32.MaxValue ms
- Handle IPv6 bracket notation in ParseHostEndpoint ([::1]:port)
- Extract duplicated publisher channel creation into CreatePublisherChannelAsync
- Remove stale empty XML doc tags on PublishImplAsync and CreateConnectionAsync
- Add PERF comments at ToArray() call sites and publish lock documenting
  allocation/serialization trade-offs (tracked in FoundatioFx/Foundatio#512)
Comment thread src/Foundatio.RabbitMQ/Messaging/RabbitMQMessageBus.cs Outdated
Comment thread src/Foundatio.RabbitMQ/Messaging/RabbitMQMessageBus.cs Outdated
…back

- Remove async/await from CreatePublisherChannelAsync and return the Task
  directly since there is no code after the await and no using scope to clean up
- Remove the SanitizeUri(string) overload; the only call site where URI parse
  failed cannot safely echo the value, so the message no longer includes it
- URI sanitization is still applied at the scheme-check exception via the Uri overload
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 4 out of 4 changed files in this pull request and generated 2 comments.

Comments suppressed due to low confidence (1)

src/Foundatio.RabbitMQ/Messaging/RabbitMQMessageBus.cs:875

  • In ParseHostEndpoint, when the string contains a colon but the port portion is not a valid int (e.g., "host:abc"), the fallback currently creates an endpoint with HostName set to the entire trimmed input (including ":abc"). This yields an invalid host and breaks failover parsing. Consider falling back to the parsed hostname with defaultPort (similar to the successful parse path), rather than using the original trimmed string.
            : new AmqpTcpEndpoint(trimmed, defaultPort);
    }

    private void RegisterPublisherConnectionEventHandlers()
    {
        if (_publisherConnection is null)
            throw new MessageBusException("Publisher connection must be initialized before registering event handlers.");

Comment thread src/Foundatio.RabbitMQ/Messaging/RabbitMQMessageBus.cs Outdated
Comment thread src/Foundatio.RabbitMQ/Messaging/RabbitMQMessageBus.cs
…llback

- Add volatile to _publisherChannel/_subscriberChannel for safe
  double-checked locking on ARM64 (outer null check needs memory barrier)
- Move _publisherReady.Set() after connection close in CleanupAsync to
  prevent unblocked publishers from racing into a disposing channel
- Fix ParseHostEndpoint: invalid port fallback now uses parsed hostname
  instead of the full trimmed string (e.g. "host:abc" -> "host", not
  "host:abc")
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 4 out of 4 changed files in this pull request and generated 3 comments.

Comment thread src/Foundatio.RabbitMQ/Messaging/RabbitMQMessageBus.cs
Comment thread src/Foundatio.RabbitMQ/Messaging/RabbitMQMessageBus.cs
Comment thread src/Foundatio.RabbitMQ/Messaging/RabbitMQMessageBus.cs
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 4 out of 4 changed files in this pull request and generated no new comments.

@niemyjski niemyjski merged commit 3799dc0 into main May 20, 2026
8 checks passed
@niemyjski niemyjski deleted the fix/publish-resilience-gate branch May 20, 2026 14:07
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants