Skip to content

Commit e90c803

Browse files
committed
fix: restore recovery gate reset on disposal, remove stale XML docs
- Restore _publisherReady.Set() in CleanupAsync to unblock publishers waiting on the recovery gate during disposal (removed in 00fca54). Without this, publishers with long PublishRecoveryTimeout values rely solely on DisposedCancellationToken propagation which is fragile. - Remove stale <remarks> on PublishImplAsync advising channel-per-thread patterns that don't apply (single channel + AsyncLock architecture). - Document blocked-state fail-fast design decision inline.
1 parent 231f771 commit e90c803

1 file changed

Lines changed: 3 additions & 4 deletions

File tree

src/Foundatio.RabbitMQ/Messaging/RabbitMQMessageBus.cs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ protected override async Task RemoveTopicSubscriptionAsync()
9494
protected override async Task CleanupAsync()
9595
{
9696
_factory.AutomaticRecoveryEnabled = false;
97+
_publisherReady.Set();
9798

9899
await ClosePublisherConnectionAsync().AnyContext();
99100
await CloseSubscriberConnectionAsync().AnyContext();
@@ -428,6 +429,8 @@ await _publisherReady.WaitAsync(cancellationToken)
428429
if (_publisherChannel is not { IsOpen: true } channel)
429430
throw new MessageBusException("Cannot publish: publisher channel is closed or unavailable.");
430431

432+
// Fail fast on broker resource alarms -- retrying would add pressure to a constrained broker.
433+
// Unlike connection drops (which use the recovery gate to wait), blocked state has no recovery signal timing.
431434
if (_isPublisherBlocked)
432435
throw new MessageBusException(
433436
$"Cannot publish: publisher connection is blocked by broker ({_publisherBlockedReason ?? "resource alarm"})");
@@ -603,10 +606,6 @@ private Task OnPublisherConnectionOnRecoverySucceededAsync(object sender, AsyncE
603606
/// <param name="message"></param>
604607
/// <param name="options">Message options</param>
605608
/// <param name="cancellationToken"></param>
606-
/// <remarks>RabbitMQ has an upper limit of 2GB for messages.BasicPublish blocking AMQP operations.
607-
/// The rule of thumb is: avoid sharing channels across threads.
608-
/// Publishers in your application that publish from separate threads should use their own channels.
609-
/// The same is a good idea for consumers.</remarks>
610609
protected override async Task PublishImplAsync(string messageType, object message, MessageOptions options, CancellationToken cancellationToken)
611610
{
612611
byte[] data = SerializeMessageBody(messageType, message);

0 commit comments

Comments
 (0)