Skip to content

Commit bcf07e4

Browse files
marcominervadluc
andauthored
Azure queues config (#425)
## Motivation and Context (Why the change? What's the scenario?) Currently, Azure Queues settings like message processing retries are hard-coded. ## High level description (Approach, Design) This PR moves the settings to the `AzureQueuesConfig` class. --------- Co-authored-by: Devis Lucato <devis@microsoft.com>
1 parent ce815a0 commit bcf07e4

4 files changed

Lines changed: 88 additions & 24 deletions

File tree

extensions/AzureQueues/AzureQueuesConfig.cs

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
// Copyright (c) Microsoft. All rights reserved.
22

3+
using System;
34
using System.Text.Json.Serialization;
5+
using System.Text.RegularExpressions;
46
using Azure;
57
using Azure.Core;
68
using Azure.Storage;
@@ -17,6 +19,8 @@ public class AzureQueuesConfig
1719
private AzureSasCredential? _azureSasCredential;
1820
private TokenCredential? _tokenCredential;
1921

22+
private static readonly Regex s_validPoisonQueueSuffixRegex = new(@"^[a-z0-9-]{1}(?!.*--)[a-z0-9-]{0,28}[a-z0-9]$");
23+
2024
[JsonConverter(typeof(JsonStringEnumConverter))]
2125
public enum AuthTypes
2226
{
@@ -35,6 +39,38 @@ public enum AuthTypes
3539
public string AccountKey { get; set; } = "";
3640
public string EndpointSuffix { get; set; } = "core.windows.net";
3741

42+
/// <summary>
43+
/// How often to check if there are new messages.
44+
/// </summary>
45+
public int PollDelayMsecs { get; set; } = 100;
46+
47+
/// <summary>
48+
/// How many messages to fetch at a time.
49+
/// </summary>
50+
public int FetchBatchSize { get; set; } = 3;
51+
52+
/// <summary>
53+
/// How long to lock messages once fetched. Azure Queue default is 30 secs.
54+
/// </summary>
55+
public int FetchLockSeconds { get; set; } = 300;
56+
57+
/// <summary>
58+
/// How many times to dequeue a messages and process before moving it to a poison queue.
59+
/// </summary>
60+
public int MaxRetriesBeforePoisonQueue { get; set; } = 20;
61+
62+
/// <summary>
63+
/// Suffix used for the poison queues.
64+
/// </summary>
65+
private string? _poisonQueueSuffix = "-poison";
66+
67+
public string PoisonQueueSuffix
68+
{
69+
get => this._poisonQueueSuffix!;
70+
// Queue names must be lowercase.
71+
set => this._poisonQueueSuffix = value?.ToLowerInvariant() ?? string.Empty;
72+
}
73+
3874
public void SetCredential(StorageSharedKeyCredential credential)
3975
{
4076
this.Auth = AuthTypes.ManualStorageSharedKeyCredential;
@@ -70,4 +106,27 @@ public TokenCredential GetTokenCredential()
70106
return this._tokenCredential
71107
?? throw new ConfigurationException("TokenCredential not defined");
72108
}
109+
110+
/// <summary>
111+
/// Verify that the current state is valid.
112+
/// </summary>
113+
public void Validate()
114+
{
115+
ArgumentOutOfRangeException.ThrowIfNegativeOrZero(this.PollDelayMsecs);
116+
ArgumentOutOfRangeException.ThrowIfNegativeOrZero(this.FetchBatchSize);
117+
ArgumentOutOfRangeException.ThrowIfLessThan(this.FetchLockSeconds, 30);
118+
ArgumentOutOfRangeException.ThrowIfNegative(this.MaxRetriesBeforePoisonQueue);
119+
ArgumentException.ThrowIfNullOrWhiteSpace(this.PoisonQueueSuffix);
120+
121+
// Queue names must follow the rules described at
122+
// https://learn.microsoft.com/rest/api/storageservices/naming-queues-and-metadata#queue-names.
123+
// In this case, we need to validate only the suffix part, so rules are slightly different
124+
// (for example, as it is a suffix, it can safely start with a dash (-) character).
125+
// Queue names can be up to 63 characters long, so for the suffix we define a maximum length
126+
// of 30, so there is room for the other name part.
127+
if (!s_validPoisonQueueSuffixRegex.IsMatch(this.PoisonQueueSuffix))
128+
{
129+
throw new ArgumentException($"Invalid {nameof(this.PoisonQueueSuffix)} format.", nameof(this.PoisonQueueSuffix));
130+
}
131+
}
73132
}

extensions/AzureQueues/AzureQueuesPipeline.cs

Lines changed: 14 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -35,24 +35,12 @@ private sealed class MessageEventArgs : EventArgs
3535
/// </summary>
3636
private event AsyncMessageHandler<MessageEventArgs>? Received;
3737

38-
// How often to check if there are new messages
39-
private const int PollDelayMsecs = 100;
40-
41-
// How many messages to fetch at a time
42-
private const int FetchBatchSize = 3;
43-
44-
// How long to lock messages once fetched. Azure Queue default is 30 secs.
45-
private const int FetchLockSeconds = 300;
46-
47-
// How many times to dequeue a messages and process before moving it to a poison queue
48-
private const int MaxRetryBeforePoisonQueue = 20;
49-
50-
// Suffix used for the poison queues
51-
private const string PoisonQueueSuffix = "-poison";
52-
5338
// Queue client builder, requiring the queue name in input
5439
private readonly Func<string, QueueClient> _clientBuilder;
5540

41+
// Queue confirguration
42+
private readonly AzureQueuesConfig _config;
43+
5644
// Queue client, once connected
5745
private QueueClient? _queue;
5846

@@ -77,6 +65,9 @@ public AzureQueuesPipeline(
7765
AzureQueuesConfig config,
7866
ILogger<AzureQueuesPipeline>? log = null)
7967
{
68+
this._config = config;
69+
this._config.Validate();
70+
8071
this._log = log ?? DefaultLogger<AzureQueuesPipeline>.Instance;
8172

8273
switch (config.Auth)
@@ -161,14 +152,14 @@ public async Task<IQueue> ConnectToQueueAsync(string queueName, QueueOptions opt
161152
Response? result = await this._queue.CreateIfNotExistsAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
162153
this._log.LogTrace("Queue ready: status code {0}", result?.Status);
163154

164-
this._poisonQueue = this._clientBuilder(this._queueName + PoisonQueueSuffix);
155+
this._poisonQueue = this._clientBuilder(this._queueName + this._config.PoisonQueueSuffix);
165156
result = await this._poisonQueue.CreateIfNotExistsAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
166157
this._log.LogTrace("Poison queue ready: status code {0}", result?.Status);
167158

168159
if (options.DequeueEnabled)
169160
{
170-
this._log.LogTrace("Enabling dequeue on queue {0}, every {1} msecs", this._queueName, PollDelayMsecs);
171-
this._dispatchTimer = new Timer(PollDelayMsecs); // milliseconds
161+
this._log.LogTrace("Enabling dequeue on queue {0}, every {1} msecs", this._queueName, this._config.PollDelayMsecs);
162+
this._dispatchTimer = new Timer(this._config.PollDelayMsecs); // milliseconds
172163
this._dispatchTimer.Elapsed += this.DispatchMessages;
173164
this._dispatchTimer.Start();
174165
}
@@ -201,7 +192,7 @@ public void OnDequeue(Func<string, Task<bool>> processMessageAction)
201192

202193
try
203194
{
204-
if (message.DequeueCount <= MaxRetryBeforePoisonQueue)
195+
if (message.DequeueCount <= this._config.MaxRetriesBeforePoisonQueue)
205196
{
206197
bool success = await processMessageAction.Invoke(message.MessageText).ConfigureAwait(false);
207198
if (success)
@@ -271,7 +262,7 @@ private void DispatchMessages(object? sender, ElapsedEventArgs ev)
271262
try
272263
{
273264
// Fetch and Hide N messages
274-
Response<QueueMessage[]> receiveMessages = this._queue.ReceiveMessages(FetchBatchSize, visibilityTimeout: TimeSpan.FromSeconds(FetchLockSeconds));
265+
Response<QueueMessage[]> receiveMessages = this._queue.ReceiveMessages(this._config.FetchBatchSize, visibilityTimeout: TimeSpan.FromSeconds(this._config.FetchLockSeconds));
275266
if (receiveMessages.HasValue && receiveMessages.Value.Length > 0)
276267
{
277268
messages = receiveMessages.Value;
@@ -336,10 +327,10 @@ private async Task MoveMessageToPoisonQueueAsync(QueueMessage message, Cancellat
336327

337328
var poisonMsg = new
338329
{
339-
MessageText = message.MessageText,
330+
message.MessageText,
340331
Id = message.MessageId,
341-
InsertedOn = message.InsertedOn,
342-
DequeueCount = message.DequeueCount,
332+
message.InsertedOn,
333+
message.DequeueCount,
343334
};
344335

345336
var neverExpire = TimeSpan.FromSeconds(-1);

extensions/AzureQueues/DependencyInjection.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ public static partial class KernelMemoryBuilderExtensions
1313
{
1414
public static IKernelMemoryBuilder WithAzureQueuesOrchestration(this IKernelMemoryBuilder builder, AzureQueuesConfig config)
1515
{
16+
config.Validate();
17+
1618
builder.Services.AddAzureQueuesOrchestration(config);
1719
return builder;
1820
}
@@ -22,6 +24,8 @@ public static partial class DependencyInjection
2224
{
2325
public static IServiceCollection AddAzureQueuesOrchestration(this IServiceCollection services, AzureQueuesConfig config)
2426
{
27+
config.Validate();
28+
2529
IQueue QueueFactory(IServiceProvider serviceProvider)
2630
{
2731
return serviceProvider.GetService<AzureQueuesPipeline>()

service/Service/appsettings.json

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,17 @@
276276
// Note: you can use an env var 'KernelMemory__Services__AzureQueue__ConnectionString' to set this
277277
"ConnectionString": "",
278278
// Setting used only for country clouds
279-
"EndpointSuffix": "core.windows.net"
279+
"EndpointSuffix": "core.windows.net",
280+
// How often to check if there are new messages
281+
"PollDelayMsecs": 100,
282+
// How many messages to fetch at a time
283+
"FetchBatchSize": 3,
284+
// How long to lock messages once fetched. Azure Queue default is 30 secs
285+
"FetchLockSeconds": 300,
286+
// How many times to dequeue a messages and process before moving it to a poison queue
287+
"MaxRetriesBeforePoisonQueue": 20,
288+
// Suffix used for the poison queues.
289+
"PoisonQueueSuffix": "-poison"
280290
},
281291
"Elasticsearch": {
282292
// SHA-256 fingerprint. When running the docker image this is printed after starting the server

0 commit comments

Comments
 (0)