Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 76 additions & 14 deletions src/Foundatio.RabbitMQ/Messaging/RabbitMQMessageBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public class RabbitMQMessageBus : MessageBusBase<RabbitMQMessageBusOptions>
private static readonly Version _delayedExchangePluginIncompatibleVersion = new(4, 3);

private readonly AsyncLock _lock = new();
private readonly AsyncManualResetEvent _publisherReady = new(true);
private readonly ConnectionFactory _factory;
private readonly List<AmqpTcpEndpoint> _endpoints;
private IConnection? _publisherConnection;
Expand Down Expand Up @@ -402,21 +403,38 @@ private static long GetRetryCountFromHeader(BasicDeliverEventArgs envelope)

/// <summary>
/// Publishes a raw message body to <paramref name="exchange"/> over the publisher channel.
/// </summary>
/// <remarks>
/// The whole attempt runs inside the resilience policy, so the recovery gate, the channel state and the
/// broker-blocked flag are all re-evaluated on every attempt.
/// </remarks>
/// <param name="exchange">Exchange to publish to.</param>
/// <param name="routingKey">Routing key for the message.</param>
/// <param name="body">Serialized message payload.</param>
/// <param name="properties">AMQP properties sent with the message.</param>
/// <param name="cancellationToken">Cancels the recovery wait, the lock acquisition and the publish itself.</param>
/// <exception cref="MessageBusException">
/// The recovery gate is closed and waiting is disabled, recovery did not complete within
/// <c>PublishRecoveryTimeout</c>, the publisher channel is missing or not open, or the broker has blocked
/// the publisher connection.
/// </exception>
private async Task PublishMessageAsync(string exchange, string routingKey, byte[] body, BasicProperties properties, CancellationToken cancellationToken)
{
    await _resiliencePolicy.ExecuteAsync(async _ =>
    {
        // The gate is closed while the publisher connection is down; wait for it outside the lock
        // so that waiting publishers do not hold the lock for the duration of the recovery.
        if (!_publisherReady.IsSet)
        {
            // A zero (or negative) timeout disables waiting: fail right away.
            if (_options.PublishRecoveryTimeout <= TimeSpan.Zero)
                throw new MessageBusException("Cannot publish: publisher channel is closed or unavailable.");

            _logger.LogDebug("Publisher waiting for connection recovery...");
            try
            {
                await _publisherReady.WaitAsync(cancellationToken)
                    .WaitAsync(_options.PublishRecoveryTimeout, cancellationToken).AnyContext();
            }
            catch (TimeoutException)
            {
                // Task.WaitAsync reports an elapsed timeout as TimeoutException; surface it as a bus error.
                throw new MessageBusException(
                    $"Publish failed: connection recovery did not complete within {_options.PublishRecoveryTimeout.TotalMilliseconds:F0}ms timeout.");
            }
        }

        using (await _lock.LockAsync(cancellationToken).AnyContext())
        {
            // Check blocked state inside resilience policy - re-evaluated on each retry
            // MessageBusException is excluded from retries in the base class policy
            if (_publisherChannel is not { IsOpen: true } channel)
                throw new MessageBusException("Cannot publish: publisher channel is closed or unavailable.");

            if (_isPublisherBlocked)
                throw new MessageBusException(
                    $"Cannot publish: publisher connection is blocked by broker ({_publisherBlockedReason ?? "resource alarm"})");

            await channel.BasicPublishAsync(exchange, routingKey, mandatory: false, properties, body, cancellationToken: cancellationToken);
        }
    }, cancellationToken).AnyContext();
}

protected virtual IMessage ConvertToMessage(BasicDeliverEventArgs envelope)
Expand Down Expand Up @@ -534,12 +552,19 @@ private Task OnPublisherConnectionOnConnectionBlockedAsync(object sender, Connec

/// <summary>
/// Logs a failed recovery attempt on the publisher connection.
/// </summary>
/// <remarks>
/// The handler does not touch <c>_publisherReady</c>, so publishes waiting on the recovery gate keep waiting.
/// </remarks>
/// <param name="sender">Event source; not used.</param>
/// <param name="e">Carries the exception raised by the failed recovery attempt.</param>
/// <returns>An already-completed task.</returns>
private Task OnPublisherConnectionOnConnectionRecoveryErrorAsync(object sender, ConnectionRecoveryErrorEventArgs e)
{
    _logger.LogError(e.Exception, "Publisher connection recovery attempt failed, retrying: {Message}", e.Exception.Message);
    return Task.CompletedTask;
}

/// <summary>
/// Handles shutdown of the publisher connection.
/// </summary>
/// <remarks>
/// When the shutdown was not initiated by the application, the recovery gate is closed so that subsequent
/// publishes wait for recovery instead of proceeding straight to the channel check. Application-initiated
/// shutdowns leave the gate untouched.
/// </remarks>
/// <param name="sender">Event source; not used.</param>
/// <param name="e">Shutdown details (initiator, reply code and text, optional exception).</param>
/// <returns>An already-completed task.</returns>
private Task OnPublisherConnectionOnConnectionShutdownAsync(object sender, ShutdownEventArgs e)
{
    if (e.Initiator != ShutdownInitiator.Application)
    {
        // Close the gate before logging so publishers observe the outage as early as possible.
        _publisherReady.Reset();
        _logger.LogWarning("Publisher connection lost (Reply Code: {ReplyCode}, Reason: {ReplyText}). Publishes will wait up to {Timeout:g} for recovery.",
            e.ReplyCode, e.ReplyText, _options.PublishRecoveryTimeout);
    }

    _logger.LogInformation(e.Exception, "Publisher shutdown. Reply Code: {ReplyCode} Reason: {ReplyText} Initiator: {Initiator}", e.ReplyCode, e.ReplyText, e.Initiator);
    return Task.CompletedTask;
}
Expand All @@ -560,9 +585,9 @@ private Task OnPublisherConnectionOnRecoveringConsumerAsync(object sender, Recov

/// <summary>
/// Handles a successful recovery of the publisher connection.
/// </summary>
/// <param name="sender">Event source; not used.</param>
/// <param name="e">Event data; not used.</param>
/// <returns>An already-completed task.</returns>
private Task OnPublisherConnectionOnRecoverySucceededAsync(object sender, AsyncEventArgs e)
{
    // The recovered connection starts unblocked, so discard any stale broker-blocked state first.
    _isPublisherBlocked = false;
    _publisherBlockedReason = null;

    // Then open the gate so publishes waiting on it can resume.
    _publisherReady.Set();

    _logger.LogInformation("Publisher connection recovery succeeded");
    return Task.CompletedTask;
}
Expand All @@ -580,9 +605,6 @@ private Task OnPublisherConnectionOnRecoverySucceededAsync(object sender, AsyncE
/// The same is a good idea for consumers.</remarks>
protected override async Task PublishImplAsync(string messageType, object message, MessageOptions options, CancellationToken cancellationToken)
{
if (_isPublisherBlocked)
throw new MessageBusException($"Cannot publish: publisher connection is blocked by broker ({_publisherBlockedReason ?? "resource alarm"})");

byte[] data = SerializeMessageBody(messageType, message);

// if the RabbitMQ plugin is not available, then use the base class delay mechanism
Expand Down Expand Up @@ -897,4 +919,44 @@ private void UnregisterConsumerEventHandlers()
_consumer.ReceivedAsync -= OnMessageAsync;
_consumer.ShutdownAsync -= OnConsumerShutdownAsync;
}

/// <summary>
/// Test hook: closes the publisher recovery gate, as if the publisher connection had been lost unexpectedly.
/// Publishes issued afterwards wait for the gate to reopen.
/// </summary>
internal void SimulatePublisherConnectionLost() => _publisherReady.Reset();

/// <summary>
/// Test hook: opens the publisher recovery gate, as if connection recovery had succeeded.
/// Publishes waiting on the gate resume.
/// </summary>
internal void SimulatePublisherRecoverySucceeded() => _publisherReady.Set();

/// <summary>
/// Test hook: simulates an unexpected publisher connection shutdown.
/// Invokes the connection-shutdown handler with a library-initiated shutdown (reply code 541), which closes
/// the recovery gate, and then clears the publisher channel to mimic a real disconnect.
/// </summary>
/// <returns>A task that completes once the shutdown handler has run and the channel has been cleared.</returns>
internal async Task SimulatePublisherConnectionShutdownAsync()
{
    var shutdown = new ShutdownEventArgs(ShutdownInitiator.Library, 541, "Simulated connection reset");
    await OnPublisherConnectionOnConnectionShutdownAsync(this, shutdown).AnyContext();

    _publisherChannel = null;
}

/// <summary>
/// Test hook: raises a simulated connection-recovery error through the regular recovery-error handler.
/// Lets tests confirm that a failed recovery attempt leaves the recovery gate closed.
/// </summary>
/// <returns>The task returned by the recovery-error handler.</returns>
internal Task SimulatePublisherConnectionRecoveryErrorAsync()
{
    var simulatedFailure = new Exception("Simulated recovery failure");
    var recoveryError = new ConnectionRecoveryErrorEventArgs(simulatedFailure);

    return OnPublisherConnectionOnConnectionRecoveryErrorAsync(this, recoveryError);
}
}
25 changes: 25 additions & 0 deletions src/Foundatio.RabbitMQ/Messaging/RabbitMQMessageBusOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,17 @@ public class RabbitMQMessageBusOptions : SharedMessageBusOptions
/// See: https://www.rabbitmq.com/docs/confirms#publisher-confirms
/// </summary>
public bool PublisherConfirmsEnabled { get; set; }

/// <summary>
/// Maximum time each publish attempt waits for connection recovery before failing.
/// During a transient connection drop, publishes suspend (rather than failing immediately)
/// and resume automatically when the connection recovers.
/// </summary>
/// <remarks>
/// The timeout applies per attempt: the resilience policy may retry failed attempts
/// (default: 3 attempts with exponential backoff), so total wall-clock time can exceed this value.
/// Set to <see cref="TimeSpan.Zero"/> to disable waiting and fail immediately on a connection drop.
/// Default: 10 seconds (covers one full NetworkRecoveryInterval cycle with margin).
/// </remarks>
public TimeSpan PublishRecoveryTimeout { get; set; } = TimeSpan.FromSeconds(10);
}

public class RabbitMQMessageBusOptionsBuilder : SharedMessageBusOptionsBuilder<RabbitMQMessageBusOptions, RabbitMQMessageBusOptionsBuilder>
Expand Down Expand Up @@ -202,6 +213,20 @@ public RabbitMQMessageBusOptionsBuilder PublisherConfirmsEnabled(bool enabled =
return this;
}

/// <summary>
/// Configures how long each publish attempt may wait for the publisher connection to recover before failing.
/// While the connection is down, publishes are suspended and continue automatically once it recovers.
/// Pass <see cref="TimeSpan.Zero"/> to turn waiting off and fail immediately on a connection drop.
/// </summary>
/// <param name="timeout">Maximum per-attempt recovery wait time; must not be negative. Default: 10 seconds.</param>
/// <returns>The builder instance for method chaining.</returns>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="timeout"/> is less than <see cref="TimeSpan.Zero"/>.</exception>
public RabbitMQMessageBusOptionsBuilder PublishRecoveryTimeout(TimeSpan timeout)
{
    // Reject negative values up front; zero is valid and means "do not wait".
    ArgumentOutOfRangeException.ThrowIfLessThan(timeout, TimeSpan.Zero);

    Target.PublishRecoveryTimeout = timeout;

    return this;
}

/// <summary>
/// Configures the message bus for RabbitMQ quorum queues using recommended settings.
/// Disables auto-delete and exclusive mode for subscription queues,
Expand Down
3 changes: 3 additions & 0 deletions src/Foundatio.RabbitMQ/Properties/AssemblyInfo.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
using System.Runtime.CompilerServices;

[assembly: InternalsVisibleTo("Foundatio.RabbitMQ.Tests, PublicKey=0024000004800000940000000602000000240000525341310004000001000100a9357232b9bcad78fd310297fdb41bf42816ee2ca9ccdace999889de2badb6f06df2de1d9f2c8cb17b21f5311f11d6bb328d55e0dd9fe8adc5e2dc4610028c1bdacb3355d2e239b81d0bb0ac83e615fc641f8a3ec49e4fad8e305994953d448ef7b38e8c256601e54af19c035b562e3e5e5461c2a93b8dd11936e451b05034a2")]
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Foundatio.Messaging;
using Foundatio.Tests.Messaging;
using Xunit;

namespace Foundatio.RabbitMQ.Tests.Messaging;

/// <summary>
/// Tests for the publisher recovery gate of <see cref="RabbitMQMessageBus"/>: how publishes behave while the
/// publisher connection is (simulated as) lost, recovering, recovered or being disposed.
/// </summary>
public class RabbitMqPublishResilienceTests(ITestOutputHelper output) : RabbitMqMessageBusTestBase("amqp://localhost:5672", output)
{
    /// <summary>With waiting disabled (zero timeout), a publish after a connection drop fails right away.</summary>
    [Fact]
    public async Task PublishAsync_WithRecoveryTimeoutDisabled_FailsImmediatelyOnConnectionDrop()
    {
        await using var messageBus = new RabbitMQMessageBus(o => o
            .ConnectionString(ConnectionString)
            .LoggerFactory(Log)
            .PublishRecoveryTimeout(TimeSpan.Zero));

        await messageBus.PublishAsync(new SimpleMessageA { Data = "warmup" }, cancellationToken: TestCancellationToken);

        await messageBus.SimulatePublisherConnectionShutdownAsync();

        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
        var ex = await Assert.ThrowsAsync<MessageBusException>(() =>
            messageBus.PublishAsync(new SimpleMessageA { Data = "should fail" },
                cancellationToken: cts.Token));

        Assert.Contains("publisher channel is closed", ex.Message);
    }

    /// <summary>A publish issued while the gate is closed stays pending and completes once recovery is signalled.</summary>
    [Fact]
    public async Task PublishAsync_DuringRecovery_WaitsAndSucceeds()
    {
        await using var messageBus = new RabbitMQMessageBus(o => o
            .ConnectionString(ConnectionString)
            .LoggerFactory(Log)
            .PublishRecoveryTimeout(TimeSpan.FromSeconds(15)));

        await messageBus.PublishAsync(new SimpleMessageA { Data = "warmup" }, cancellationToken: TestCancellationToken);

        messageBus.SimulatePublisherConnectionLost();

        using var publishCts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
        var publishTask = messageBus.PublishAsync(new SimpleMessageA { Data = "during recovery" },
            cancellationToken: publishCts.Token);

        await Task.Delay(100, TestCancellationToken);
        Assert.False(publishTask.IsCompleted);

        messageBus.SimulatePublisherRecoverySucceeded();

        await publishTask;
    }

    /// <summary>If recovery never happens, the publish fails with a timeout-specific <see cref="MessageBusException"/>.</summary>
    [Fact]
    public async Task PublishAsync_RecoveryTimeout_ThrowsMessageBusException()
    {
        await using var messageBus = new RabbitMQMessageBus(o => o
            .ConnectionString(ConnectionString)
            .LoggerFactory(Log)
            .PublishRecoveryTimeout(TimeSpan.FromMilliseconds(500)));

        await messageBus.PublishAsync(new SimpleMessageA { Data = "warmup" }, cancellationToken: TestCancellationToken);

        messageBus.SimulatePublisherConnectionLost();

        var ex = await Assert.ThrowsAsync<MessageBusException>(() =>
            messageBus.PublishAsync(new SimpleMessageA { Data = "should timeout" },
                cancellationToken: TestCancellationToken));

        Assert.Contains("connection recovery did not complete within", ex.Message);
    }

    /// <summary>Cancelling the caller's token while waiting for recovery surfaces as a cancellation, not a timeout.</summary>
    [Fact]
    public async Task PublishAsync_CancellationDuringRecovery_RespectsCancellation()
    {
        await using var messageBus = new RabbitMQMessageBus(o => o
            .ConnectionString(ConnectionString)
            .LoggerFactory(Log)
            .PublishRecoveryTimeout(TimeSpan.FromSeconds(30)));

        await messageBus.PublishAsync(new SimpleMessageA { Data = "warmup" }, cancellationToken: TestCancellationToken);

        messageBus.SimulatePublisherConnectionLost();

        using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(200));

        await Assert.ThrowsAnyAsync<OperationCanceledException>(() =>
            messageBus.PublishAsync(new SimpleMessageA { Data = "should cancel" }, cancellationToken: cts.Token));
    }

    /// <summary>With a healthy connection the gate is open and a publish completes without error.</summary>
    [Fact]
    public async Task PublishAsync_WhenConnectionHealthy_SucceedsImmediately()
    {
        await using var messageBus = new RabbitMQMessageBus(o => o
            .ConnectionString(ConnectionString)
            .LoggerFactory(Log)
            .PublishRecoveryTimeout(TimeSpan.FromSeconds(10)));

        var exception = await Record.ExceptionAsync(() =>
            messageBus.PublishAsync(new SimpleMessageA { Data = "should succeed immediately" },
                cancellationToken: TestCancellationToken));

        Assert.Null(exception);
    }

    /// <summary>Disposing the bus while a publish is waiting for recovery ends that publish with a cancellation.</summary>
    [Fact]
    public async Task PublishAsync_DuringDisposal_FailsFastWithOperationCanceledException()
    {
        // Not "await using": disposal is the action under test and is triggered explicitly below.
        var messageBus = new RabbitMQMessageBus(o => o
            .ConnectionString(ConnectionString)
            .LoggerFactory(Log)
            .PublishRecoveryTimeout(TimeSpan.FromSeconds(30)));

        await messageBus.PublishAsync(new SimpleMessageA { Data = "warmup" }, cancellationToken: TestCancellationToken);

        messageBus.SimulatePublisherConnectionLost();

        using var publishCts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
        var publishTask = messageBus.PublishAsync(new SimpleMessageA { Data = "should fail on disposal" },
            cancellationToken: publishCts.Token);

        await Task.Delay(50, TestCancellationToken);
        Assert.False(publishTask.IsCompleted);

        await messageBus.DisposeAsync();

        await Assert.ThrowsAnyAsync<OperationCanceledException>(() => publishTask);
    }

    /// <summary>A recovery error must not open the gate: the pending publish keeps waiting and finally times out.</summary>
    [Fact]
    public async Task PublishAsync_RecoveryErrorDoesNotOpenGate_WaitsUntilTimeout()
    {
        await using var messageBus = new RabbitMQMessageBus(o => o
            .ConnectionString(ConnectionString)
            .LoggerFactory(Log)
            .PublishRecoveryTimeout(TimeSpan.FromMilliseconds(500)));

        await messageBus.PublishAsync(new SimpleMessageA { Data = "warmup" }, cancellationToken: TestCancellationToken);

        messageBus.SimulatePublisherConnectionLost();

        using var publishCts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
        var publishTask = messageBus.PublishAsync(new SimpleMessageA { Data = "should stay blocked" },
            cancellationToken: publishCts.Token);

        await Task.Delay(50, TestCancellationToken);
        Assert.False(publishTask.IsCompleted);

        await messageBus.SimulatePublisherConnectionRecoveryErrorAsync();

        await Task.Delay(50, TestCancellationToken);
        Assert.False(publishTask.IsCompleted);

        var ex = await Assert.ThrowsAsync<MessageBusException>(() => publishTask);
        Assert.Contains("connection recovery did not complete within", ex.Message);
    }
}