Skip to content

Commit 140ab4f

Browse files
authored
Merge pull request #1787 from dosper7/azure-service-bus-emulator-support
add azure service bus AutoProvision property
2 parents 2eea1f8 + e39a164 commit 140ab4f

3 files changed

Lines changed: 82 additions & 80 deletions

File tree

README.md

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,25 @@ services.AddCap(x =>
287287
});
288288
```
289289

290+
### Azure Service Bus Emulator Support
291+
292+
The [Azure Service Bus Emulator](https://learn.microsoft.com/en-us/azure/service-bus-messaging/overview-emulator) uses separate ports for AMQP messaging (5672) and the HTTP Admin API (5300). Because CAP uses a single connection string for both the `ServiceBusClient` (AMQP) and the `ServiceBusAdministrationClient` (HTTP), it cannot target both ports simultaneously.
293+
294+
To work around this, set `AutoProvision` to `false` to skip automatic creation of topics, subscriptions, and rules via the Admin API. You must pre-create the required entities in the emulator's configuration instead.
295+
296+
```csharp
297+
services.AddCap(x =>
298+
{
299+
x.UseAzureServiceBus(opt =>
300+
{
301+
opt.ConnectionString = "Endpoint=sb://localhost;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;";
302+
opt.AutoProvision = false;
303+
});
304+
});
305+
```
306+
307+
> **Note:** When `AutoProvision` is `false`, topics, subscriptions, and subscription filter rules must already exist before the application starts. This option is also useful when entities are managed externally (e.g., via Infrastructure as Code).
308+
290309
## Dashboard
291310

292311
CAP provides a real-time dashboard to view sent and received messages and their status.
@@ -307,4 +326,4 @@ We welcome contributions! Participating in discussions, reporting issues, and su
307326

308327
### License
309328

310-
CAP is licensed under the [MIT License](https://github.com/dotnetcore/CAP/blob/master/LICENSE.txt).
329+
CAP is licensed under the [MIT License](https://github.com/dotnetcore/CAP/blob/master/LICENSE.txt).

src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerClient.cs

Lines changed: 54 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,10 @@ public async Task SubscribeAsync(IEnumerable<string> topics)
5757

5858
await ConnectAsync();
5959

60-
topics = topics.Concat(_asbOptions!.SQLFilters?.Select(o => o.Key) ?? Enumerable.Empty<string>());
60+
if (!_asbOptions.AutoProvision)
61+
return;
62+
63+
topics = topics.Concat(_asbOptions!.SQLFilters?.Select(o => o.Key) ?? []);
6164

6265
var allRules = _administrationClient!.GetRulesAsync(_asbOptions!.TopicPath, _subscriptionName).ToEnumerable();
6366
var allRuleNames = allRules.Select(o => o.Name);
@@ -88,21 +91,21 @@ public async Task SubscribeAsync(IEnumerable<string> topics)
8891
currentRuleToAdd = correlationRule;
8992
}
9093

91-
await _administrationClient.CreateRuleAsync(_asbOptions.TopicPath, _subscriptionName,
92-
new CreateRuleOptions
93-
{
94-
Name = newRule,
95-
Filter = currentRuleToAdd
96-
});
94+
await _administrationClient.CreateRuleAsync(_asbOptions.TopicPath, _subscriptionName,
95+
new CreateRuleOptions
96+
{
97+
Name = newRule,
98+
Filter = currentRuleToAdd
99+
});
97100

98-
_logger.LogInformation($"Azure Service Bus add rule: {newRule}");
101+
_logger.LogInformation("Azure Service Bus add rule: {RuleName}", newRule);
99102
}
100103

101104
foreach (var oldRule in allRuleNames.Except(topics))
102105
{
103106
await _administrationClient.DeleteRuleAsync(_asbOptions.TopicPath, _subscriptionName, oldRule);
104107

105-
_logger.LogInformation($"Azure Service Bus remove rule: {oldRule}");
108+
_logger.LogInformation("Azure Service Bus remove rule: {RuleName}", oldRule);
106109
}
107110
}
108111

@@ -141,7 +144,7 @@ public async Task RejectAsync(object? sender)
141144

142145
public async ValueTask DisposeAsync()
143146
{
144-
if (!_serviceBusProcessor!.IsProcessing)
147+
if (!_serviceBusProcessor!.IsProcessing)
145148
await _serviceBusProcessor.DisposeAsync();
146149
}
147150

@@ -196,49 +199,55 @@ public async Task ConnectAsync()
196199
{
197200
if (_serviceBusProcessor == null)
198201
{
199-
if (_asbOptions.TokenCredential != null)
200-
{
201-
_administrationClient =
202-
new ServiceBusAdministrationClient(_asbOptions.Namespace, _asbOptions.TokenCredential);
203-
_serviceBusClient = new ServiceBusClient(_asbOptions.Namespace, _asbOptions.TokenCredential);
204-
}
205-
else
206-
{
207-
_administrationClient = new ServiceBusAdministrationClient(_asbOptions.ConnectionString);
208-
_serviceBusClient = new ServiceBusClient(_asbOptions.ConnectionString);
209-
}
210202

211-
var topicConfigs =
212-
_asbOptions.CustomProducers.Select(producer =>
213-
(topicPaths: producer.TopicPath, subscribe: producer.CreateSubscription))
214-
.Append((topicPaths: _asbOptions.TopicPath, subscribe: true))
215-
.GroupBy(n => n.topicPaths, StringComparer.OrdinalIgnoreCase)
216-
.Select(n => (topicPaths: n.Key, subscribe: n.Max(o => o.subscribe)));
203+
_serviceBusClient = _asbOptions.TokenCredential is not null ?
204+
new ServiceBusClient(_asbOptions.Namespace, _asbOptions.TokenCredential) :
205+
new ServiceBusClient(_asbOptions.ConnectionString);
217206

218-
foreach (var (topicPath, subscribe) in topicConfigs)
207+
if (_asbOptions.AutoProvision)
219208
{
220-
if (!await _administrationClient.TopicExistsAsync(topicPath))
209+
if (_asbOptions.TokenCredential != null)
221210
{
222-
await _administrationClient.CreateTopicAsync(topicPath);
223-
_logger.LogInformation($"Azure Service Bus created topic: {topicPath}");
211+
_administrationClient =
212+
new ServiceBusAdministrationClient(_asbOptions.Namespace, _asbOptions.TokenCredential);
224213
}
225-
226-
if (subscribe && !await _administrationClient.SubscriptionExistsAsync(topicPath, _subscriptionName))
214+
else
227215
{
228-
var subscriptionDescription =
229-
new CreateSubscriptionOptions(topicPath, _subscriptionName)
230-
{
231-
RequiresSession = _asbOptions.EnableSessions,
232-
AutoDeleteOnIdle = _asbOptions.SubscriptionAutoDeleteOnIdle,
233-
LockDuration = _asbOptions.SubscriptionMessageLockDuration,
234-
DefaultMessageTimeToLive = _asbOptions.SubscriptionDefaultMessageTimeToLive,
235-
MaxDeliveryCount = _asbOptions.SubscriptionMaxDeliveryCount,
236-
};
216+
_administrationClient = new ServiceBusAdministrationClient(_asbOptions.ConnectionString);
217+
}
237218

238-
await _administrationClient.CreateSubscriptionAsync(subscriptionDescription);
219+
var topicConfigs =
220+
_asbOptions.CustomProducers.Select(producer =>
221+
(topicPaths: producer.TopicPath, subscribe: producer.CreateSubscription))
222+
.Append((topicPaths: _asbOptions.TopicPath, subscribe: true))
223+
.GroupBy(n => n.topicPaths, StringComparer.OrdinalIgnoreCase)
224+
.Select(n => (topicPaths: n.Key, subscribe: n.Max(o => o.subscribe)));
239225

240-
_logger.LogInformation(
241-
$"Azure Service Bus topic {topicPath} created subscription: {_subscriptionName}");
226+
foreach (var (topicPath, subscribe) in topicConfigs)
227+
{
228+
if (!await _administrationClient.TopicExistsAsync(topicPath))
229+
{
230+
await _administrationClient.CreateTopicAsync(topicPath);
231+
_logger.LogInformation("Azure Service Bus created topic: {TopicPath}", topicPath);
232+
}
233+
234+
if (subscribe && !await _administrationClient.SubscriptionExistsAsync(topicPath, _subscriptionName))
235+
{
236+
var subscriptionDescription =
237+
new CreateSubscriptionOptions(topicPath, _subscriptionName)
238+
{
239+
RequiresSession = _asbOptions.EnableSessions,
240+
AutoDeleteOnIdle = _asbOptions.SubscriptionAutoDeleteOnIdle,
241+
LockDuration = _asbOptions.SubscriptionMessageLockDuration,
242+
DefaultMessageTimeToLive = _asbOptions.SubscriptionDefaultMessageTimeToLive,
243+
MaxDeliveryCount = _asbOptions.SubscriptionMaxDeliveryCount,
244+
};
245+
246+
await _administrationClient.CreateSubscriptionAsync(subscriptionDescription);
247+
248+
_logger.LogInformation(
249+
$"Azure Service Bus topic {topicPath} created subscription: {_subscriptionName}");
250+
}
242251
}
243252
}
244253

@@ -295,39 +304,5 @@ private TransportMessage ConvertMessage(ServiceBusReceivedMessage message)
295304
return new TransportMessage(headers, message.Body);
296305
}
297306

298-
private static void CheckValidSubscriptionName(string subscriptionName)
299-
{
300-
const string pathDelimiter = @"/";
301-
const int ruleNameMaximumLength = 50;
302-
char[] invalidEntityPathCharacters = { '@', '?', '#', '*' };
303-
304-
if (string.IsNullOrWhiteSpace(subscriptionName)) throw new ArgumentNullException(subscriptionName);
305-
306-
// and "\" will be converted to "/" on the REST path anyway. Gateway/REST do not
307-
// have to worry about the begin/end slash problem, so this is purely a client side check.
308-
var tmpName = subscriptionName.Replace(@"\", pathDelimiter);
309-
if (tmpName.Length > ruleNameMaximumLength)
310-
throw new ArgumentOutOfRangeException(subscriptionName,
311-
$@"Subscribe name '{subscriptionName}' exceeds the '{ruleNameMaximumLength}' character limit.");
312-
313-
if (tmpName.StartsWith(pathDelimiter, StringComparison.OrdinalIgnoreCase) ||
314-
tmpName.EndsWith(pathDelimiter, StringComparison.OrdinalIgnoreCase))
315-
throw new ArgumentException(
316-
$@"The subscribe name cannot contain '/' as prefix or suffix. The supplied value is '{subscriptionName}'",
317-
subscriptionName);
318-
319-
if (tmpName.Contains(pathDelimiter))
320-
throw new ArgumentException($@"The subscribe name contains an invalid character '{pathDelimiter}'",
321-
subscriptionName);
322-
323-
foreach (var uriSchemeKey in invalidEntityPathCharacters)
324-
{
325-
if (subscriptionName.IndexOf(uriSchemeKey) >= 0)
326-
throw new ArgumentException(
327-
$@"'{subscriptionName}' contains character '{uriSchemeKey}' which is not allowed because it is reserved in the Uri scheme.",
328-
subscriptionName);
329-
}
330-
}
331-
332307
#endregion private methods
333308
}

src/DotNetCore.CAP.AzureServiceBus/CAP.AzureServiceBusOptions.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,14 @@ public class AzureServiceBusOptions
3232
/// </summary>
3333
public string Namespace { get; set; } = default!;
3434

35+
/// <summary>
36+
/// When set to <c>true</c> (default), CAP will automatically create topics, subscriptions,
37+
/// and rules using the <see cref="Azure.Messaging.ServiceBus.Administration.ServiceBusAdministrationClient"/>.
38+
/// Set to <c>false</c> to skip automatic provisioning, which is useful when the admin API
39+
/// is unavailable (e.g., Azure Service Bus Emulator).
40+
/// </summary>
41+
public bool AutoProvision { get; set; } = true;
42+
3543
/// <summary>
3644
/// Whether Service Bus sessions are enabled. If enabled, all messages must contain a
3745
/// <see cref="AzureServiceBusHeaders.SessionId" /> header. Defaults to false.

0 commit comments

Comments
 (0)