Skip to content

Commit 3799dc0

Browse files
authored
feat: add publish resilience gate for transient RabbitMQ connection drops (#77)
* fix: add publish resilience gate for connection recovery During transient RabbitMQ connection drops, PublishAsync now suspends rather than failing immediately. An AsyncManualResetEvent gate blocks publishes while the connection is recovering and resumes them automatically once recovery succeeds. Key behaviors: - Gate closes on non-application connection shutdown - Gate opens on successful recovery - Gate stays closed on recovery error (auto-recovery keeps retrying) - Gate is signaled during disposal for fast shutdown - Configurable PublishRecoveryTimeout (default 10s) - TimeoutException wrapped in MessageBusException for consistent API Closes #76 * fix: address CI failure and PR review feedback - Fix CI: SimulatePublisherConnectionShutdownAsync now nulls _publisherChannel to properly represent a real disconnect (test was passing because channel stayed alive when gate was disabled) - Fix CTS disposal: all CancellationTokenSource instances in tests now use 'using' declarations - Fix timeout message: use TotalMilliseconds instead of TotalSeconds to avoid misleading "0s" for sub-second timeouts - Add SimulatePublisherConnectionRecoveryErrorAsync for proper recovery error testing - Rename test to PublishAsync_RecoveryErrorDoesNotOpenGate_WaitsUntilTimeout and add actual recovery error simulation call - Add input validation: PublishRecoveryTimeout rejects negative values * refactor: move lock inside retry loop for channel re-read on each attempt Applies industry best practice (NServiceBus/EasyNetQ pattern): re-read _publisherChannel on each retry attempt rather than capturing once. Changes: - Lock acquired/released per retry (not held across all retries) - IsOpen check rejects dead channels before attempting publish - AlreadyClosedException from BasicPublishAsync triggers retry with fresh channel state - Other publishers and recovery handlers can proceed between retries * fix: align test assertion with updated error message The IsOpen check changed the message from "publisher channel was closed" to "publisher channel is closed or unavailable" -- update assertion to match. * refactor: move recovery gate inside retry loop, fix disposal and duplicate checks - Gate check now runs on EACH retry attempt, preventing retries from burning through the budget instantly when connection flaps - Remove _publisherReady.Set() from CleanupAsync; the base class DisposedCancellationToken already cancels in-flight publishes before CleanupAsync runs, so manual gate-open is unnecessary and caused publishers to unblock into a closed channel - Remove redundant _isPublisherBlocked check from PublishImplAsync; the authoritative check inside the lock in PublishMessageAsync is the only one that matters (outer check was stale/racy) - Update disposal test to assert OperationCanceledException (correct behavior when disposal cancellation unblocks the gate wait) * docs: clarify PublishRecoveryTimeout is per-attempt, increase test CTS margin - Update XML doc comments to explicitly state the timeout is per-attempt (resilience policy may retry, so total wall-clock can exceed the value) - Increase CTS timeout in recovery-error test from 5s to 10s to prevent CI flakiness (3 retry attempts x 500ms + backoff = ~4.5s) * Address PR feedback: fix log message, add AnyContext, harden disposal test - Branch log message for timeout=0 vs timeout>0 in OnPublisherConnectionOnConnectionShutdownAsync - Add .AnyContext() to SimulatePublisherConnectionShutdownAsync - Wrap disposal test in try/finally for leak safety * fix: restore recovery gate reset on disposal, remove stale XML docs - Restore _publisherReady.Set() in CleanupAsync to unblock publishers waiting on the recovery gate during disposal (removed in 00fca54). Without this, publishers with long PublishRecoveryTimeout values rely solely on DisposedCancellationToken propagation which is fragile. - Remove stale <remarks> on PublishImplAsync advising channel-per-thread patterns that don't apply (single channel + AsyncLock architecture). - Document blocked-state fail-fast design decision inline. * fix: address review findings -- security, correctness, and code quality - Sanitize connection string in constructor exceptions to avoid leaking credentials (strip userinfo, show only scheme/host/port/path) - Guard DeliveryDelay overflow: fail with clear ArgumentOutOfRangeException instead of unhelpful OverflowException when delay > Int32.MaxValue ms - Handle IPv6 bracket notation in ParseHostEndpoint ([::1]:port) - Extract duplicated publisher channel creation into CreatePublisherChannelAsync - Remove stale empty XML doc tags on PublishImplAsync and CreateConnectionAsync - Add PERF comments at ToArray() call sites and publish lock documenting allocation/serialization trade-offs (tracked in FoundatioFx/Foundatio#512) * fix: simplify CreatePublisherChannelAsync and SanitizeUri per PR feedback - Remove async/await from CreatePublisherChannelAsync and return the Task directly since there is no code after the await and no using scope to clean up - Remove the SanitizeUri(string) overload; the only call site where URI parse failed cannot safely echo the value, so the message no longer includes it - URI sanitization is still applied at the scheme-check exception via the Uri overload * fix: volatile channel fields, disposal race, and ParseHostEndpoint fallback - Add volatile to _publisherChannel/_subscriberChannel for safe double-checked locking on ARM64 (outer null check needs memory barrier) - Move _publisherReady.Set() after connection close in CleanupAsync to prevent unblocked publishers from racing into a disposing channel - Fix ParseHostEndpoint: invalid port fallback now uses parsed hostname instead of the full trimmed string (e.g. "host:abc" -> "host", not "host:abc") * fix: update stale x-delay comment to reflect current Int32 cast * fix: handle unbracketed IPv6 in ParseHostEndpoint
1 parent 44baa63 commit 3799dc0

4 files changed

Lines changed: 345 additions & 63 deletions

File tree

src/Foundatio.RabbitMQ/Messaging/RabbitMQMessageBus.cs

Lines changed: 151 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,13 @@ public class RabbitMQMessageBus : MessageBusBase<RabbitMQMessageBusOptions>
2121
private static readonly Version _delayedExchangePluginIncompatibleVersion = new(4, 3);
2222

2323
private readonly AsyncLock _lock = new();
24+
private readonly AsyncManualResetEvent _publisherReady = new(true);
2425
private readonly ConnectionFactory _factory;
2526
private readonly List<AmqpTcpEndpoint> _endpoints;
2627
private IConnection? _publisherConnection;
2728
private IConnection? _subscriberConnection;
28-
private IChannel? _publisherChannel;
29-
private IChannel? _subscriberChannel;
29+
private volatile IChannel? _publisherChannel;
30+
private volatile IChannel? _subscriberChannel;
3031
private AsyncEventingBasicConsumer? _consumer;
3132
private bool? _delayedExchangePluginEnabled;
3233
private Version? _serverVersion;
@@ -39,11 +40,11 @@ public RabbitMQMessageBus(RabbitMQMessageBusOptions options) : base(options)
3940
ArgumentException.ThrowIfNullOrWhiteSpace(options?.ConnectionString, nameof(options.ConnectionString));
4041

4142
if (!Uri.TryCreate(options.ConnectionString, UriKind.Absolute, out var primaryUri))
42-
throw new ArgumentException($"ConnectionString is not a valid URI: {options.ConnectionString}");
43+
throw new ArgumentException("ConnectionString is not a valid URI.");
4344

4445
if (!primaryUri.Scheme.Equals("amqp", StringComparison.OrdinalIgnoreCase) &&
4546
!primaryUri.Scheme.Equals("amqps", StringComparison.OrdinalIgnoreCase))
46-
throw new ArgumentException($"ConnectionString must use amqp:// or amqps:// scheme: {options.ConnectionString}");
47+
throw new ArgumentException($"ConnectionString must use amqp:// or amqps:// scheme: {SanitizeUri(primaryUri)}");
4748

4849
_isQuorumQueue = options.Arguments is not null && options.Arguments.TryGetValue("x-queue-type", out object? queueType) && queueType is string type && String.Equals(type, "quorum", StringComparison.OrdinalIgnoreCase);
4950

@@ -96,6 +97,8 @@ protected override async Task CleanupAsync()
9697

9798
await ClosePublisherConnectionAsync().AnyContext();
9899
await CloseSubscriberConnectionAsync().AnyContext();
100+
101+
_publisherReady.Set();
99102
}
100103

101104
protected override async Task EnsureTopicSubscriptionAsync(CancellationToken cancellationToken)
@@ -355,6 +358,7 @@ private async Task RepublishMessageWithIncrementedDeliveryCountAsync(BasicDelive
355358
try
356359
{
357360
await EnsureTopicCreatedAsync(envelope.CancellationToken).AnyContext();
361+
// PERF: ToArray() allocates; PublishMessageAsync takes byte[] until Foundatio base supports ReadOnlyMemory<byte>
358362
await PublishMessageAsync(envelope.Exchange, envelope.RoutingKey, envelope.Body.ToArray(), properties, envelope.CancellationToken).AnyContext();
359363
await subscriberChannel.BasicAckAsync(envelope.DeliveryTag, false).AnyContext();
360364

@@ -402,25 +406,47 @@ private static long GetRetryCountFromHeader(BasicDeliverEventArgs envelope)
402406

403407
private async Task PublishMessageAsync(string exchange, string routingKey, byte[] body, BasicProperties properties, CancellationToken cancellationToken)
404408
{
405-
using (await _lock.LockAsync(cancellationToken).AnyContext())
409+
await _resiliencePolicy.ExecuteAsync(async _ =>
406410
{
407-
if (_publisherChannel is not { } channel)
408-
throw new MessageBusException("Cannot publish: publisher channel was closed.");
411+
if (!_publisherReady.IsSet)
412+
{
413+
if (_options.PublishRecoveryTimeout <= TimeSpan.Zero)
414+
throw new MessageBusException("Cannot publish: publisher channel is closed or unavailable.");
415+
416+
_logger.LogDebug("Publisher waiting for connection recovery...");
417+
try
418+
{
419+
await _publisherReady.WaitAsync(cancellationToken)
420+
.WaitAsync(_options.PublishRecoveryTimeout, cancellationToken).AnyContext();
421+
}
422+
catch (TimeoutException)
423+
{
424+
throw new MessageBusException(
425+
$"Publish failed: connection recovery did not complete within {_options.PublishRecoveryTimeout.TotalMilliseconds:F0}ms timeout.");
426+
}
427+
}
409428

410-
await _resiliencePolicy.ExecuteAsync(async _ =>
429+
// PERF: Single lock serializes all publishes; with publisher confirms this limits throughput to 1 RTT.
430+
// Consider channel pooling or batch publishing for high-throughput scenarios.
431+
using (await _lock.LockAsync(cancellationToken).AnyContext())
411432
{
412-
// Check blocked state inside resilience policy - re-evaluated on each retry
413-
// MessageBusException is excluded from retries in the base class policy
433+
if (_publisherChannel is not { IsOpen: true } channel)
434+
throw new MessageBusException("Cannot publish: publisher channel is closed or unavailable.");
435+
436+
// Fail fast on broker resource alarms -- retrying would add pressure to a constrained broker.
437+
// Unlike connection drops (which use the recovery gate to wait), blocked state has no recovery signal timing.
414438
if (_isPublisherBlocked)
415-
throw new MessageBusException($"Cannot publish: publisher connection is blocked by broker ({_publisherBlockedReason ?? "resource alarm"})");
439+
throw new MessageBusException(
440+
$"Cannot publish: publisher connection is blocked by broker ({_publisherBlockedReason ?? "resource alarm"})");
416441

417442
await channel.BasicPublishAsync(exchange, routingKey, mandatory: false, properties, body, cancellationToken: cancellationToken);
418-
}, cancellationToken).AnyContext();
419-
}
443+
}
444+
}, cancellationToken).AnyContext();
420445
}
421446

422447
protected virtual IMessage ConvertToMessage(BasicDeliverEventArgs envelope)
423448
{
449+
// PERF: ToArray() allocates a copy; Message ctor requires byte[] until Foundatio supports ReadOnlyMemory<byte>
424450
var message = new Message(envelope.Body.ToArray(), DeserializeMessageBody)
425451
{
426452
Type = envelope.BasicProperties.Type,
@@ -462,17 +488,7 @@ protected override async Task EnsureTopicCreatedAsync(CancellationToken cancella
462488
_isPublisherBlocked = false;
463489
_publisherBlockedReason = null;
464490

465-
if (_options.PublisherConfirmsEnabled)
466-
{
467-
var channelOptions = new CreateChannelOptions(
468-
publisherConfirmationsEnabled: true,
469-
publisherConfirmationTrackingEnabled: true);
470-
_publisherChannel = await _publisherConnection.CreateChannelAsync(channelOptions, cancellationToken).AnyContext();
471-
}
472-
else
473-
{
474-
_publisherChannel = await _publisherConnection.CreateChannelAsync(cancellationToken: cancellationToken).AnyContext();
475-
}
491+
_publisherChannel = await CreatePublisherChannelAsync(cancellationToken).AnyContext();
476492

477493
// We first attempt to create "x-delayed-type". For this the rabbitmq_delayed_message_exchange plugin should be installed.
478494
// However, if the plugin is not installed this will throw an exception. In that case
@@ -500,17 +516,7 @@ protected override async Task EnsureTopicCreatedAsync(CancellationToken cancella
500516
_isPublisherBlocked = false;
501517
_publisherBlockedReason = null;
502518

503-
if (_options.PublisherConfirmsEnabled)
504-
{
505-
var channelOptions = new CreateChannelOptions(
506-
publisherConfirmationsEnabled: true,
507-
publisherConfirmationTrackingEnabled: true);
508-
_publisherChannel = await _publisherConnection.CreateChannelAsync(channelOptions, cancellationToken).AnyContext();
509-
}
510-
else
511-
{
512-
_publisherChannel = await _publisherConnection.CreateChannelAsync(cancellationToken: cancellationToken).AnyContext();
513-
}
519+
_publisherChannel = await CreatePublisherChannelAsync(cancellationToken).AnyContext();
514520
await CreateRegularExchangeAsync(_publisherChannel).AnyContext();
515521
}
516522

@@ -534,12 +540,23 @@ private Task OnPublisherConnectionOnConnectionBlockedAsync(object sender, Connec
534540

535541
private Task OnPublisherConnectionOnConnectionRecoveryErrorAsync(object sender, ConnectionRecoveryErrorEventArgs e)
536542
{
537-
_logger.LogError(e.Exception, "Publisher connection recovery error: {Message}", e.Exception.Message);
543+
_logger.LogError(e.Exception, "Publisher connection recovery attempt failed, retrying: {Message}", e.Exception.Message);
538544
return Task.CompletedTask;
539545
}
540546

541547
private Task OnPublisherConnectionOnConnectionShutdownAsync(object sender, ShutdownEventArgs e)
542548
{
549+
if (e.Initiator != ShutdownInitiator.Application)
550+
{
551+
_publisherReady.Reset();
552+
if (_options.PublishRecoveryTimeout > TimeSpan.Zero)
553+
_logger.LogWarning("Publisher connection lost (Reply Code: {ReplyCode}, Reason: {ReplyText}). Publishes will wait up to {Timeout:g} for recovery.",
554+
e.ReplyCode, e.ReplyText, _options.PublishRecoveryTimeout);
555+
else
556+
_logger.LogWarning("Publisher connection lost (Reply Code: {ReplyCode}, Reason: {ReplyText}). Publishes will fail immediately (recovery timeout disabled).",
557+
e.ReplyCode, e.ReplyText);
558+
}
559+
543560
_logger.LogInformation(e.Exception, "Publisher shutdown. Reply Code: {ReplyCode} Reason: {ReplyText} Initiator: {Initiator}", e.ReplyCode, e.ReplyText, e.Initiator);
544561
return Task.CompletedTask;
545562
}
@@ -560,29 +577,15 @@ private Task OnPublisherConnectionOnRecoveringConsumerAsync(object sender, Recov
560577

561578
private Task OnPublisherConnectionOnRecoverySucceededAsync(object sender, AsyncEventArgs e)
562579
{
563-
// Reset blocked state on recovery - the new connection starts unblocked
564580
_isPublisherBlocked = false;
565581
_publisherBlockedReason = null;
582+
_publisherReady.Set();
566583
_logger.LogInformation("Publisher connection recovery succeeded");
567584
return Task.CompletedTask;
568585
}
569586

570-
/// <summary>
571-
/// Publish the message
572-
/// </summary>
573-
/// <param name="messageType"></param>
574-
/// <param name="message"></param>
575-
/// <param name="options">Message options</param>
576-
/// <param name="cancellationToken"></param>
577-
/// <remarks>RabbitMQ has an upper limit of 2GB for messages.BasicPublish blocking AMQP operations.
578-
/// The rule of thumb is: avoid sharing channels across threads.
579-
/// Publishers in your application that publish from separate threads should use their own channels.
580-
/// The same is a good idea for consumers.</remarks>
581587
protected override async Task PublishImplAsync(string messageType, object message, MessageOptions options, CancellationToken cancellationToken)
582588
{
583-
if (_isPublisherBlocked)
584-
throw new MessageBusException($"Cannot publish: publisher connection is blocked by broker ({_publisherBlockedReason ?? "resource alarm"})");
585-
586589
byte[] data = SerializeMessageBody(messageType, message);
587590

588591
// if the RabbitMQ plugin is not available, then use the base class delay mechanism
@@ -619,11 +622,13 @@ protected override async Task PublishImplAsync(string messageType, object messag
619622
// RabbitMQ only supports delayed messages with a third party plugin called "rabbitmq_delayed_message_exchange"
620623
if (_delayedExchangePluginEnabled is true && options.DeliveryDelay.HasValue && options.DeliveryDelay.Value > TimeSpan.Zero)
621624
{
622-
// It's necessary to typecast long to int because RabbitMQ on the consumer side is reading the
623-
// data back as signed (using BinaryReader#ReadInt64). You will see the value to be negative
624-
// and the data will be delivered immediately.
625+
// RabbitMQ's x-delay header must be a 32-bit signed int; the broker reads it as Int32
626+
// and negative values cause immediate delivery.
625627
basicProperties.Headers ??= new Dictionary<string, object?>();
626-
basicProperties.Headers["x-delay"] = Convert.ToInt32(options.DeliveryDelay.Value.TotalMilliseconds);
628+
double delayMs = options.DeliveryDelay.Value.TotalMilliseconds;
629+
if (delayMs > Int32.MaxValue)
630+
throw new ArgumentOutOfRangeException(nameof(options), $"DeliveryDelay ({options.DeliveryDelay.Value}) exceeds the maximum supported by RabbitMQ delayed exchange plugin ({Int32.MaxValue}ms).");
631+
basicProperties.Headers["x-delay"] = (int)delayMs;
627632
_logger.LogTrace("Schedule delayed message: {MessageType} ({Delay}ms)", messageType, options.DeliveryDelay.Value.TotalMilliseconds);
628633
}
629634
else
@@ -635,16 +640,27 @@ protected override async Task PublishImplAsync(string messageType, object messag
635640
_logger.LogDebug("Done publishing type {MessageType} {MessageId}", messageType, basicProperties.MessageId);
636641
}
637642

638-
/// <summary>
639-
/// Connect to a broker - RabbitMQ
640-
/// </summary>
641-
/// <returns></returns>
642643
private Task<IConnection> CreateConnectionAsync()
643644
{
644-
// Use multiple endpoints for failover support
645645
return _factory.CreateConnectionAsync(_endpoints);
646646
}
647647

648+
private Task<IChannel> CreatePublisherChannelAsync(CancellationToken cancellationToken)
649+
{
650+
if (_publisherConnection is null)
651+
throw new MessageBusException("Publisher connection must be initialized before creating a channel.");
652+
653+
if (_options.PublisherConfirmsEnabled)
654+
{
655+
var channelOptions = new CreateChannelOptions(
656+
publisherConfirmationsEnabled: true,
657+
publisherConfirmationTrackingEnabled: true);
658+
return _publisherConnection.CreateChannelAsync(channelOptions, cancellationToken);
659+
}
660+
661+
return _publisherConnection.CreateChannelAsync(cancellationToken: cancellationToken);
662+
}
663+
648664
private void DetectServerVersion(IConnection connection)
649665
{
650666
if (_serverVersion is not null)
@@ -805,6 +821,15 @@ private async Task CloseSubscriberConnectionAsync(CancellationToken cancellation
805821
}
806822
}
807823

824+
private static string SanitizeUri(Uri uri)
825+
{
826+
if (String.IsNullOrEmpty(uri.UserInfo))
827+
return uri.ToString();
828+
829+
string portSuffix = uri.IsDefaultPort ? "" : $":{uri.Port}";
830+
return $"{uri.Scheme}://***@{uri.Host}{portSuffix}{uri.AbsolutePath}";
831+
}
832+
808833
/// <summary>
809834
/// Parses a host string in format "hostname" or "hostname:port" into an AmqpTcpEndpoint.
810835
/// </summary>
@@ -814,14 +839,37 @@ private async Task CloseSubscriberConnectionAsync(CancellationToken cancellation
814839
return null;
815840

816841
string trimmed = host.Trim();
842+
843+
// Handle IPv6 bracket notation: [::1] or [::1]:5672
844+
if (trimmed.StartsWith('['))
845+
{
846+
int closeBracket = trimmed.IndexOf(']');
847+
if (closeBracket < 0)
848+
return new AmqpTcpEndpoint(trimmed, defaultPort);
849+
850+
string ipv6Host = trimmed[1..closeBracket];
851+
if (closeBracket + 1 < trimmed.Length && trimmed[closeBracket + 1] == ':')
852+
{
853+
return Int32.TryParse(trimmed[(closeBracket + 2)..], out int port)
854+
? new AmqpTcpEndpoint(ipv6Host, port)
855+
: new AmqpTcpEndpoint(ipv6Host, defaultPort);
856+
}
857+
858+
return new AmqpTcpEndpoint(ipv6Host, defaultPort);
859+
}
860+
817861
int colonIndex = trimmed.LastIndexOf(':');
818862
if (colonIndex < 0)
819863
return new AmqpTcpEndpoint(trimmed, defaultPort);
820864

865+
// Multiple colons without brackets indicates an unbracketed IPv6 address — treat as bare hostname
866+
if (trimmed.IndexOf(':') != colonIndex)
867+
return new AmqpTcpEndpoint(trimmed, defaultPort);
868+
821869
string hostname = trimmed[..colonIndex];
822-
return Int32.TryParse(trimmed[(colonIndex + 1)..], out int port)
823-
? new AmqpTcpEndpoint(hostname, port)
824-
: new AmqpTcpEndpoint(trimmed, defaultPort);
870+
return Int32.TryParse(trimmed[(colonIndex + 1)..], out int parsedPort)
871+
? new AmqpTcpEndpoint(hostname, parsedPort)
872+
: new AmqpTcpEndpoint(hostname, defaultPort);
825873
}
826874

827875
private void RegisterPublisherConnectionEventHandlers()
@@ -897,4 +945,44 @@ private void UnregisterConsumerEventHandlers()
897945
_consumer.ReceivedAsync -= OnMessageAsync;
898946
_consumer.ShutdownAsync -= OnConsumerShutdownAsync;
899947
}
948+
949+
/// <summary>
950+
/// Simulates an unexpected publisher connection loss for testing the recovery gate.
951+
/// Closes the gate so that subsequent publishes will wait for recovery.
952+
/// </summary>
953+
internal void SimulatePublisherConnectionLost()
954+
{
955+
_publisherReady.Reset();
956+
}
957+
958+
/// <summary>
959+
/// Simulates a successful publisher connection recovery for testing.
960+
/// Opens the gate so that waiting publishes resume.
961+
/// </summary>
962+
internal void SimulatePublisherRecoverySucceeded()
963+
{
964+
_publisherReady.Set();
965+
}
966+
967+
/// <summary>
968+
/// Simulates a publisher connection shutdown event for testing.
969+
/// Triggers the full shutdown handler (gate closure) and nulls the publisher channel
970+
/// to represent a real unexpected disconnect.
971+
/// </summary>
972+
internal async Task SimulatePublisherConnectionShutdownAsync()
973+
{
974+
await OnPublisherConnectionOnConnectionShutdownAsync(this,
975+
new ShutdownEventArgs(ShutdownInitiator.Library, 541, "Simulated connection reset")).AnyContext();
976+
_publisherChannel = null;
977+
}
978+
979+
/// <summary>
980+
/// Simulates a connection recovery error event for testing.
981+
/// Verifies the gate remains closed (recovery continues retrying).
982+
/// </summary>
983+
internal Task SimulatePublisherConnectionRecoveryErrorAsync()
984+
{
985+
return OnPublisherConnectionOnConnectionRecoveryErrorAsync(this,
986+
new ConnectionRecoveryErrorEventArgs(new Exception("Simulated recovery failure")));
987+
}
900988
}

0 commit comments

Comments
 (0)