From ca9faae6250b5f41463a387c6d4cb00f6f34454b Mon Sep 17 00:00:00 2001 From: Dmitry Semenov Date: Wed, 18 Mar 2026 18:46:49 +0100 Subject: [PATCH] Declare RabbitMQ exchange before publishing messages RabbitMQ requres an exchange to exist, otherwise it returns an error and the whole channel becomes unusable. The code uses ConcurrentDictionary to remember which exchanges were declared. It is faster than calling the function before each publish. --- .../RabbitMQPubSubExtensions.cs | 1 + .../Subscriptions.RabbitMQ/RabbitMQPubSub.cs | 9 +++- .../Subscriptions.RabbitMQ/RabbitMQTopic.cs | 24 +++------- .../RabbitMQTopologyHelper.cs | 45 +++++++++++++++++++ 4 files changed, 58 insertions(+), 21 deletions(-) create mode 100644 src/HotChocolate/Core/src/Subscriptions.RabbitMQ/RabbitMQTopologyHelper.cs diff --git a/src/HotChocolate/Core/src/Subscriptions.RabbitMQ/DependencyInjection/RabbitMQPubSubExtensions.cs b/src/HotChocolate/Core/src/Subscriptions.RabbitMQ/DependencyInjection/RabbitMQPubSubExtensions.cs index e23757844ec..42f0817a171 100644 --- a/src/HotChocolate/Core/src/Subscriptions.RabbitMQ/DependencyInjection/RabbitMQPubSubExtensions.cs +++ b/src/HotChocolate/Core/src/Subscriptions.RabbitMQ/DependencyInjection/RabbitMQPubSubExtensions.cs @@ -91,6 +91,7 @@ public static IServiceCollection AddRabbitMQSubscriptionPublisher( services.TryAddSingleton(options ?? new SubscriptionOptions()); services.TryAddSingleton(); services.TryAddSingleton(); + services.TryAddSingleton(); services.TryAddSingleton(sp => sp.GetRequiredService()); return services; diff --git a/src/HotChocolate/Core/src/Subscriptions.RabbitMQ/RabbitMQPubSub.cs b/src/HotChocolate/Core/src/Subscriptions.RabbitMQ/RabbitMQPubSub.cs index 103dacf741b..27812ffac52 100644 --- a/src/HotChocolate/Core/src/Subscriptions.RabbitMQ/RabbitMQPubSub.cs +++ b/src/HotChocolate/Core/src/Subscriptions.RabbitMQ/RabbitMQPubSub.cs @@ -10,6 +10,7 @@ internal sealed class RabbitMQPubSub : DefaultPubSub private readonly IRabbitMQConnection _connection; private readonly IMessageSerializer _serializer; private readonly RabbitMQSubscriptionOptions _rabbitMqSubscriptionOptions; + private readonly RabbitMQTopologyHelper _topologyHelper; private readonly string _completed; private readonly int _topicBufferCapacity; private readonly TopicBufferFullMode _topicBufferFullMode; @@ -20,12 +21,14 @@ public RabbitMQPubSub( IMessageSerializer serializer, SubscriptionOptions options, RabbitMQSubscriptionOptions rabbitMqSubscriptionOptions, - ISubscriptionDiagnosticEvents diagnosticEvents) + ISubscriptionDiagnosticEvents diagnosticEvents, + RabbitMQTopologyHelper topologyHelper) : base(options, diagnosticEvents) { _connection = connection ?? throw new ArgumentNullException(nameof(connection)); _serializer = serializer ?? throw new ArgumentNullException(nameof(serializer)); _rabbitMqSubscriptionOptions = rabbitMqSubscriptionOptions ?? throw new ArgumentNullException(nameof(rabbitMqSubscriptionOptions)); + _topologyHelper = topologyHelper; _topicBufferCapacity = options.TopicBufferCapacity; _topicBufferFullMode = options.TopicBufferFullMode; _completed = serializer.CompleteMessage; @@ -55,12 +58,14 @@ protected override DefaultTopic OnCreateTopic( bufferCapacity ?? _topicBufferCapacity, bufferFullMode ?? _topicBufferFullMode, _rabbitMqSubscriptionOptions, - DiagnosticEvents); + DiagnosticEvents, + _topologyHelper); private async Task PublishAsync(string formattedTopic, string message, CancellationToken cancellationToken) { var body = Encoding.UTF8.GetBytes(message); var channel = await _connection.GetChannelAsync(cancellationToken).ConfigureAwait(false); + await _topologyHelper.ConfigurePublishingAsync(channel, formattedTopic, cancellationToken).ConfigureAwait(false); await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false); try diff --git a/src/HotChocolate/Core/src/Subscriptions.RabbitMQ/RabbitMQTopic.cs b/src/HotChocolate/Core/src/Subscriptions.RabbitMQ/RabbitMQTopic.cs index 37a01cfc715..5a027d7440c 100644 --- a/src/HotChocolate/Core/src/Subscriptions.RabbitMQ/RabbitMQTopic.cs +++ b/src/HotChocolate/Core/src/Subscriptions.RabbitMQ/RabbitMQTopic.cs @@ -12,6 +12,7 @@ internal sealed class RabbitMQTopic : DefaultTopic private readonly IRabbitMQConnection _connection; private readonly IMessageSerializer _serializer; private readonly RabbitMQSubscriptionOptions _rabbitMqSubscriptionOptions; + private readonly RabbitMQTopologyHelper _topologyHelper; public RabbitMQTopic( string name, @@ -20,12 +21,14 @@ public RabbitMQTopic( int capacity, TopicBufferFullMode fullMode, RabbitMQSubscriptionOptions rabbitMqSubscriptionOptions, - ISubscriptionDiagnosticEvents diagnosticEvents) + ISubscriptionDiagnosticEvents diagnosticEvents, + RabbitMQTopologyHelper topologyHelper) : base(name, capacity, fullMode, diagnosticEvents) { _connection = connection ?? throw new ArgumentNullException(nameof(connection)); _serializer = serializer ?? throw new ArgumentNullException(nameof(serializer)); _rabbitMqSubscriptionOptions = rabbitMqSubscriptionOptions ?? throw new ArgumentNullException(nameof(rabbitMqSubscriptionOptions)); + _topologyHelper = topologyHelper; } protected override async ValueTask OnConnectAsync(CancellationToken cancellationToken) @@ -39,24 +42,7 @@ protected override async ValueTask OnConnectAsync(Cancellation var queueName = string.IsNullOrEmpty(_rabbitMqSubscriptionOptions.QueuePrefix) ? string.Empty // use server-generated name : _rabbitMqSubscriptionOptions.QueuePrefix + Guid.NewGuid(); - - await channel.ExchangeDeclareAsync( - exchange: Name, - type: ExchangeType.Fanout, - durable: true, - autoDelete: false, - cancellationToken: cancellationToken); - await channel.QueueDeclareAsync( - queue: queueName, - durable: true, - exclusive: true, - autoDelete: true, - cancellationToken: cancellationToken); - await channel.QueueBindAsync( - queue: queueName, - exchange: Name, - routingKey: string.Empty, - cancellationToken: cancellationToken); + await _topologyHelper.ConfigureConsumingAsync(channel, Name, queueName, cancellationToken).ConfigureAwait(false); var consumer = new AsyncEventingBasicConsumer(channel); consumer.ReceivedAsync += async (_, args) => diff --git a/src/HotChocolate/Core/src/Subscriptions.RabbitMQ/RabbitMQTopologyHelper.cs b/src/HotChocolate/Core/src/Subscriptions.RabbitMQ/RabbitMQTopologyHelper.cs new file mode 100644 index 00000000000..c22dbfd304a --- /dev/null +++ b/src/HotChocolate/Core/src/Subscriptions.RabbitMQ/RabbitMQTopologyHelper.cs @@ -0,0 +1,45 @@ +using System.Collections.Concurrent; +using RabbitMQ.Client; + +namespace HotChocolate.Subscriptions.RabbitMQ; + +internal class RabbitMQTopologyHelper +{ + private readonly ConcurrentDictionary _declaredExchanges = new(); + + public async ValueTask ConfigurePublishingAsync(IChannel channel, string formattedTopic, CancellationToken cancellationToken) + { + // Create an exchange if it wasn't created. + // This extra check isn't required, but it's faster to do it in memory than go to RabbitMQ every time before publishing a message + if (!_declaredExchanges.ContainsKey(formattedTopic)) + { + await channel.ExchangeDeclareAsync( + exchange: formattedTopic, + type: ExchangeType.Fanout, + durable: true, + autoDelete: false, + cancellationToken: cancellationToken); + + _declaredExchanges.TryAdd(formattedTopic, true); + } + } + + public async Task ConfigureConsumingAsync(IChannel channel, string formattedTopic, string queueName, CancellationToken cancellationToken) + { + // need to declare an exchange so that we can bind a queue to it + await ConfigurePublishingAsync(channel, formattedTopic, cancellationToken); + + await channel.QueueDeclareAsync( + queue: queueName, + durable: true, + exclusive: true, + autoDelete: true, + cancellationToken: cancellationToken); + + await channel.QueueBindAsync( + queue: queueName, + exchange: formattedTopic, + routingKey: string.Empty, + cancellationToken: cancellationToken); + } +}