diff --git a/src/HotChocolate/Core/src/Subscriptions.RabbitMQ/DependencyInjection/RabbitMQPubSubExtensions.cs b/src/HotChocolate/Core/src/Subscriptions.RabbitMQ/DependencyInjection/RabbitMQPubSubExtensions.cs index e23757844ec..42f0817a171 100644 --- a/src/HotChocolate/Core/src/Subscriptions.RabbitMQ/DependencyInjection/RabbitMQPubSubExtensions.cs +++ b/src/HotChocolate/Core/src/Subscriptions.RabbitMQ/DependencyInjection/RabbitMQPubSubExtensions.cs @@ -91,6 +91,7 @@ public static IServiceCollection AddRabbitMQSubscriptionPublisher( services.TryAddSingleton(options ?? new SubscriptionOptions()); services.TryAddSingleton(); services.TryAddSingleton(); + services.TryAddSingleton(); services.TryAddSingleton(sp => sp.GetRequiredService()); return services; diff --git a/src/HotChocolate/Core/src/Subscriptions.RabbitMQ/RabbitMQPubSub.cs b/src/HotChocolate/Core/src/Subscriptions.RabbitMQ/RabbitMQPubSub.cs index 103dacf741b..27812ffac52 100644 --- a/src/HotChocolate/Core/src/Subscriptions.RabbitMQ/RabbitMQPubSub.cs +++ b/src/HotChocolate/Core/src/Subscriptions.RabbitMQ/RabbitMQPubSub.cs @@ -10,6 +10,7 @@ internal sealed class RabbitMQPubSub : DefaultPubSub private readonly IRabbitMQConnection _connection; private readonly IMessageSerializer _serializer; private readonly RabbitMQSubscriptionOptions _rabbitMqSubscriptionOptions; + private readonly RabbitMQTopologyHelper _topologyHelper; private readonly string _completed; private readonly int _topicBufferCapacity; private readonly TopicBufferFullMode _topicBufferFullMode; @@ -20,12 +21,14 @@ public RabbitMQPubSub( IMessageSerializer serializer, SubscriptionOptions options, RabbitMQSubscriptionOptions rabbitMqSubscriptionOptions, - ISubscriptionDiagnosticEvents diagnosticEvents) + ISubscriptionDiagnosticEvents diagnosticEvents, + RabbitMQTopologyHelper topologyHelper) : base(options, diagnosticEvents) { _connection = connection ?? throw new ArgumentNullException(nameof(connection)); _serializer = serializer ?? throw new ArgumentNullException(nameof(serializer)); _rabbitMqSubscriptionOptions = rabbitMqSubscriptionOptions ?? throw new ArgumentNullException(nameof(rabbitMqSubscriptionOptions)); + _topologyHelper = topologyHelper; _topicBufferCapacity = options.TopicBufferCapacity; _topicBufferFullMode = options.TopicBufferFullMode; _completed = serializer.CompleteMessage; @@ -55,12 +58,14 @@ protected override DefaultTopic OnCreateTopic( bufferCapacity ?? _topicBufferCapacity, bufferFullMode ?? _topicBufferFullMode, _rabbitMqSubscriptionOptions, - DiagnosticEvents); + DiagnosticEvents, + _topologyHelper); private async Task PublishAsync(string formattedTopic, string message, CancellationToken cancellationToken) { var body = Encoding.UTF8.GetBytes(message); var channel = await _connection.GetChannelAsync(cancellationToken).ConfigureAwait(false); + await _topologyHelper.ConfigurePublishingAsync(channel, formattedTopic, cancellationToken).ConfigureAwait(false); await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false); try diff --git a/src/HotChocolate/Core/src/Subscriptions.RabbitMQ/RabbitMQTopic.cs b/src/HotChocolate/Core/src/Subscriptions.RabbitMQ/RabbitMQTopic.cs index 37a01cfc715..5a027d7440c 100644 --- a/src/HotChocolate/Core/src/Subscriptions.RabbitMQ/RabbitMQTopic.cs +++ b/src/HotChocolate/Core/src/Subscriptions.RabbitMQ/RabbitMQTopic.cs @@ -12,6 +12,7 @@ internal sealed class RabbitMQTopic : DefaultTopic private readonly IRabbitMQConnection _connection; private readonly IMessageSerializer _serializer; private readonly RabbitMQSubscriptionOptions _rabbitMqSubscriptionOptions; + private readonly RabbitMQTopologyHelper _topologyHelper; public RabbitMQTopic( string name, @@ -20,12 +21,14 @@ public RabbitMQTopic( int capacity, TopicBufferFullMode fullMode, RabbitMQSubscriptionOptions rabbitMqSubscriptionOptions, - ISubscriptionDiagnosticEvents diagnosticEvents) + ISubscriptionDiagnosticEvents diagnosticEvents, + RabbitMQTopologyHelper topologyHelper) : base(name, capacity, fullMode, diagnosticEvents) { _connection = connection ?? throw new ArgumentNullException(nameof(connection)); _serializer = serializer ?? throw new ArgumentNullException(nameof(serializer)); _rabbitMqSubscriptionOptions = rabbitMqSubscriptionOptions ?? throw new ArgumentNullException(nameof(rabbitMqSubscriptionOptions)); + _topologyHelper = topologyHelper; } protected override async ValueTask OnConnectAsync(CancellationToken cancellationToken) @@ -39,24 +42,7 @@ protected override async ValueTask OnConnectAsync(Cancellation var queueName = string.IsNullOrEmpty(_rabbitMqSubscriptionOptions.QueuePrefix) ? string.Empty // use server-generated name : _rabbitMqSubscriptionOptions.QueuePrefix + Guid.NewGuid(); - - await channel.ExchangeDeclareAsync( - exchange: Name, - type: ExchangeType.Fanout, - durable: true, - autoDelete: false, - cancellationToken: cancellationToken); - await channel.QueueDeclareAsync( - queue: queueName, - durable: true, - exclusive: true, - autoDelete: true, - cancellationToken: cancellationToken); - await channel.QueueBindAsync( - queue: queueName, - exchange: Name, - routingKey: string.Empty, - cancellationToken: cancellationToken); + await _topologyHelper.ConfigureConsumingAsync(channel, Name, queueName, cancellationToken).ConfigureAwait(false); var consumer = new AsyncEventingBasicConsumer(channel); consumer.ReceivedAsync += async (_, args) => diff --git a/src/HotChocolate/Core/src/Subscriptions.RabbitMQ/RabbitMQTopologyHelper.cs b/src/HotChocolate/Core/src/Subscriptions.RabbitMQ/RabbitMQTopologyHelper.cs new file mode 100644 index 00000000000..c22dbfd304a --- /dev/null +++ b/src/HotChocolate/Core/src/Subscriptions.RabbitMQ/RabbitMQTopologyHelper.cs @@ -0,0 +1,45 @@ +using System.Collections.Concurrent; +using RabbitMQ.Client; + +namespace HotChocolate.Subscriptions.RabbitMQ; + +internal class RabbitMQTopologyHelper +{ + private readonly ConcurrentDictionary _declaredExchanges = new(); + + public async ValueTask ConfigurePublishingAsync(IChannel channel, string formattedTopic, CancellationToken cancellationToken) + { + // Create an exchange if it wasn't created. + // This extra check isn't required, but it's faster to do it in memory than go to RabbitMQ every time before publishing a message + if (!_declaredExchanges.ContainsKey(formattedTopic)) + { + await channel.ExchangeDeclareAsync( + exchange: formattedTopic, + type: ExchangeType.Fanout, + durable: true, + autoDelete: false, + cancellationToken: cancellationToken); + + _declaredExchanges.TryAdd(formattedTopic, true); + } + } + + public async Task ConfigureConsumingAsync(IChannel channel, string formattedTopic, string queueName, CancellationToken cancellationToken) + { + // need to declare an exchange so that we can bind a queue to it + await ConfigurePublishingAsync(channel, formattedTopic, cancellationToken); + + await channel.QueueDeclareAsync( + queue: queueName, + durable: true, + exclusive: true, + autoDelete: true, + cancellationToken: cancellationToken); + + await channel.QueueBindAsync( + queue: queueName, + exchange: formattedTopic, + routingKey: string.Empty, + cancellationToken: cancellationToken); + } +}