Skip to content

Commit e9b9907

Browse files
committed
fix: volatile channel fields, disposal race, and ParseHostEndpoint fallback
- Add volatile to _publisherChannel/_subscriberChannel for safe double-checked locking on ARM64 (outer null check needs memory barrier) - Move _publisherReady.Set() after connection close in CleanupAsync to prevent unblocked publishers from racing into a disposing channel - Fix ParseHostEndpoint: invalid port fallback now uses parsed hostname instead of the full trimmed string (e.g. "host:abc" -> "host", not "host:abc")
1 parent f98fc91 commit e9b9907

1 file changed

Lines changed: 5 additions & 4 deletions

File tree

src/Foundatio.RabbitMQ/Messaging/RabbitMQMessageBus.cs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@ public class RabbitMQMessageBus : MessageBusBase<RabbitMQMessageBusOptions>
2626
private readonly List<AmqpTcpEndpoint> _endpoints;
2727
private IConnection? _publisherConnection;
2828
private IConnection? _subscriberConnection;
29-
private IChannel? _publisherChannel;
30-
private IChannel? _subscriberChannel;
29+
private volatile IChannel? _publisherChannel;
30+
private volatile IChannel? _subscriberChannel;
3131
private AsyncEventingBasicConsumer? _consumer;
3232
private bool? _delayedExchangePluginEnabled;
3333
private Version? _serverVersion;
@@ -94,10 +94,11 @@ protected override async Task RemoveTopicSubscriptionAsync()
9494
protected override async Task CleanupAsync()
9595
{
9696
_factory.AutomaticRecoveryEnabled = false;
97-
_publisherReady.Set();
9897

9998
await ClosePublisherConnectionAsync().AnyContext();
10099
await CloseSubscriberConnectionAsync().AnyContext();
100+
101+
_publisherReady.Set();
101102
}
102103

103104
protected override async Task EnsureTopicSubscriptionAsync(CancellationToken cancellationToken)
@@ -865,7 +866,7 @@ private static string SanitizeUri(Uri uri)
865866
string hostname = trimmed[..colonIndex];
866867
return Int32.TryParse(trimmed[(colonIndex + 1)..], out int parsedPort)
867868
? new AmqpTcpEndpoint(hostname, parsedPort)
868-
: new AmqpTcpEndpoint(trimmed, defaultPort);
869+
: new AmqpTcpEndpoint(hostname, defaultPort);
869870
}
870871

871872
private void RegisterPublisherConnectionEventHandlers()

0 commit comments

Comments
 (0)