Skip to content

Commit 273d0cb

Browse files
Add adaptive embedding throughput shaping for 429 throttling
- Downshift embedding batch size on repeated 429s by recursively splitting batches - Reuse successful smaller batch size for subsequent requests in the same run - Fail clearly when batch size 1 still receives sustained 429 throttling - Add sequential request pacing with configurable min inter-request delay - Add configurable MaxEmbeddingBatchSize and MinInterRequestDelayMs options - Harden Retry-After parsing for retry-after, retry-after-ms, x-ms-retry-after-ms, and message hints - Update configuration comments and default appsettings values
1 parent 5b3721a commit 273d0cb

4 files changed

Lines changed: 192 additions & 26 deletions

File tree

EssentialCSharp.Chat.Shared/Extensions/ServiceCollectionExtensions.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,8 +100,8 @@ public static IServiceCollection AddAzureOpenAIServices(
100100
// Configure AI options from configuration
101101
services.Configure<AIOptions>(configuration.GetSection("AIOptions"));
102102

103-
// Configure retry options from configuration section
104-
// Environment variables like EmbeddingRetry:MaxRetries will override defaults
103+
// Configure retry options from configuration section.
104+
// Environment variables can override via AIOptions__EmbeddingRetry__*.
105105
services.AddOptions<EmbeddingRetryOptions>()
106106
.Bind(configuration.GetSection(EmbeddingRetryOptions.SectionPath))
107107
.ValidateDataAnnotations()

EssentialCSharp.Chat.Shared/Models/EmbeddingRetryOptions.cs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,20 @@ public sealed class EmbeddingRetryOptions
3434
[Range(1, 600000)]
3535
public int MaxDelayMs { get; set; } = 60000;
3636

37+
/// <summary>
38+
/// Maximum embedding request payload size sent per API call.
39+
/// The service may adaptively downshift below this value when throttled.
40+
/// </summary>
41+
[Range(1, 2048)]
42+
public int MaxEmbeddingBatchSize { get; set; } = 2048;
43+
44+
/// <summary>
45+
/// Minimum delay between embedding API requests in milliseconds.
46+
/// This adds request pacing to reduce sustained rate-limit pressure.
47+
/// </summary>
48+
[Range(0, 600000)]
49+
public int MinInterRequestDelayMs { get; set; } = 250;
50+
3751
/// <summary>
3852
/// Exponential backoff multiplier. Each retry delay is multiplied by this value.
3953
/// For example, with baseDelay=1000ms and multiplier=2.0:
@@ -74,6 +88,15 @@ public void Validate()
7488
if (BaseDelayMs > MaxDelayMs)
7589
throw new InvalidOperationException("BaseDelayMs must be less than or equal to MaxDelayMs.");
7690

91+
if (MaxEmbeddingBatchSize <= 0)
92+
throw new InvalidOperationException("MaxEmbeddingBatchSize must be positive.");
93+
94+
if (MaxEmbeddingBatchSize > 2048)
95+
throw new InvalidOperationException("MaxEmbeddingBatchSize cannot exceed Azure embedding API limit (2048).");
96+
97+
if (MinInterRequestDelayMs < 0)
98+
throw new InvalidOperationException("MinInterRequestDelayMs must be non-negative.");
99+
77100
if (BackoffMultiplier < 1.0)
78101
throw new InvalidOperationException("BackoffMultiplier must be >= 1.0.");
79102

EssentialCSharp.Chat.Shared/Services/EmbeddingService.cs

Lines changed: 165 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
using Microsoft.Extensions.VectorData;
77
using Npgsql;
88
using System.ClientModel;
9+
using System.ClientModel.Primitives;
910
using System.Globalization;
1011

1112
namespace EssentialCSharp.Chat.Common.Services;
@@ -32,9 +33,12 @@ public partial class EmbeddingService(
3233

3334
private readonly EmbeddingRetryOptions _retryOptions = ValidateRetryOptions(retryOptions?.Value ?? new EmbeddingRetryOptions());
3435
private readonly ILogger<EmbeddingService>? _logger = logger;
36+
private static readonly SemaphoreSlim _embeddingRequestLock = new(1, 1);
37+
private DateTimeOffset _lastEmbeddingRequestStartedUtc = DateTimeOffset.MinValue;
3538

3639
// Only allow simple identifiers: letters, digits, and underscores, starting with a letter or underscore.
3740
private static readonly Regex _safeIdentifierRegex = new(@"^[a-zA-Z_][a-zA-Z0-9_]*$", RegexOptions.Compiled);
41+
private static readonly Regex _retryAfterSecondsRegex = new(@"retry\s+after\s+(?<seconds>\d+)\s+seconds?", RegexOptions.Compiled | RegexOptions.IgnoreCase);
3842

3943
private static EmbeddingRetryOptions ValidateRetryOptions(EmbeddingRetryOptions options)
4044
{
@@ -96,6 +100,10 @@ private static bool IsTransientStatusCode(int statusCode) =>
96100
return ex.InnerException is null ? null : TryGetStatusCode(ex.InnerException);
97101
}
98102

103+
private static bool IsRateLimitError(Exception ex) =>
104+
TryGetStatusCode(ex) == 429
105+
|| ex.Message.Contains("RateLimitReached", StringComparison.OrdinalIgnoreCase);
106+
99107
/// <summary>
100108
/// Extracts the Retry-After delay from known exception types if present.
101109
/// Returns null if the header is not present or invalid.
@@ -105,19 +113,61 @@ private static bool IsTransientStatusCode(int statusCode) =>
105113
if (ex is ClientResultException clientResultException)
106114
{
107115
var rawResponse = clientResultException.GetRawResponse();
108-
var headerValue = rawResponse?.Headers.TryGetValue("retry-after", out var value) == true
109-
? value
110-
: null;
111-
if (TryParseRetryAfterValue(headerValue, out var retryAfter))
116+
if (TryGetHeaderValue(rawResponse, "x-ms-retry-after-ms", out var msHeaderValue)
117+
&& TryParseMilliseconds(msHeaderValue, out var msRetryAfter))
118+
{
119+
return msRetryAfter;
120+
}
121+
122+
if (TryGetHeaderValue(rawResponse, "retry-after-ms", out var retryAfterMsHeaderValue)
123+
&& TryParseMilliseconds(retryAfterMsHeaderValue, out var retryAfterMs))
124+
{
125+
return retryAfterMs;
126+
}
127+
128+
if (TryGetHeaderValue(rawResponse, "retry-after", out var headerValue)
129+
&& TryParseRetryAfterValue(headerValue, out var retryAfter))
130+
{
112131
return retryAfter;
132+
}
113133
}
114134

115-
if (ex is HttpRequestException)
116-
return null;
135+
if (TryParseRetryAfterValue(ex.Message, out var messageRetryAfter))
136+
return messageRetryAfter;
117137

118138
return ex.InnerException is null ? null : ExtractRetryAfter(ex.InnerException);
119139
}
120140

141+
private static bool TryGetHeaderValue(PipelineResponse? response, string headerName, out string? headerValue)
142+
{
143+
headerValue = null;
144+
if (response is null)
145+
return false;
146+
147+
if (response.Headers.TryGetValue(headerName, out var value) && !string.IsNullOrWhiteSpace(value))
148+
{
149+
headerValue = value;
150+
return true;
151+
}
152+
153+
return false;
154+
}
155+
156+
private static bool TryParseMilliseconds(string? value, out TimeSpan retryAfter)
157+
{
158+
retryAfter = default;
159+
if (string.IsNullOrWhiteSpace(value))
160+
return false;
161+
162+
if (int.TryParse(value, NumberStyles.Integer, CultureInfo.InvariantCulture, out var msValue) && msValue >= 0)
163+
{
164+
retryAfter = TimeSpan.FromMilliseconds(msValue);
165+
return true;
166+
}
167+
168+
return false;
169+
}
170+
121171
private static bool TryParseRetryAfterValue(string? headerValue, out TimeSpan retryAfter)
122172
{
123173
retryAfter = default;
@@ -140,6 +190,15 @@ private static bool TryParseRetryAfterValue(string? headerValue, out TimeSpan re
140190
}
141191
}
142192

193+
var secondsMatch = _retryAfterSecondsRegex.Match(headerValue);
194+
if (secondsMatch.Success
195+
&& int.TryParse(secondsMatch.Groups["seconds"].Value, NumberStyles.Integer, CultureInfo.InvariantCulture, out var extractedSeconds)
196+
&& extractedSeconds >= 0)
197+
{
198+
retryAfter = TimeSpan.FromSeconds(extractedSeconds);
199+
return true;
200+
}
201+
143202
return false;
144203
}
145204

@@ -165,6 +224,31 @@ private TimeSpan ClampRetryDelay(TimeSpan delay) =>
165224
? TimeSpan.FromMilliseconds(_retryOptions.MaxDelayMs)
166225
: delay;
167226

227+
private async Task<T> ExecuteEmbeddingRequestWithPacingAsync<T>(
228+
Func<CancellationToken, Task<T>> embeddingRequest,
229+
CancellationToken cancellationToken)
230+
{
231+
await _embeddingRequestLock.WaitAsync(cancellationToken);
232+
try
233+
{
234+
var minimumSpacing = TimeSpan.FromMilliseconds(_retryOptions.MinInterRequestDelayMs);
235+
if (minimumSpacing > TimeSpan.Zero && _lastEmbeddingRequestStartedUtc != DateTimeOffset.MinValue)
236+
{
237+
var elapsed = DateTimeOffset.UtcNow - _lastEmbeddingRequestStartedUtc;
238+
var remainingDelay = minimumSpacing - elapsed;
239+
if (remainingDelay > TimeSpan.Zero)
240+
await Task.Delay(remainingDelay, cancellationToken);
241+
}
242+
243+
_lastEmbeddingRequestStartedUtc = DateTimeOffset.UtcNow;
244+
return await embeddingRequest(cancellationToken);
245+
}
246+
finally
247+
{
248+
_embeddingRequestLock.Release();
249+
}
250+
}
251+
168252
/// <summary>
169253
/// Wraps an async operation with retry logic for transient failures.
170254
/// </summary>
@@ -237,7 +321,9 @@ private async Task<T> ExecuteWithRetryAsync<T>(
237321
public async Task<ReadOnlyMemory<float>> GenerateEmbeddingAsync(string text, CancellationToken cancellationToken = default)
238322
{
239323
var embedding = await ExecuteWithRetryAsync(
240-
async ct => await embeddingGenerator.GenerateAsync(text, cancellationToken: ct),
324+
async ct => await ExecuteEmbeddingRequestWithPacingAsync(
325+
async pacingCt => await embeddingGenerator.GenerateAsync(text, cancellationToken: pacingCt),
326+
ct),
241327
"GenerateEmbedding",
242328
cancellationToken);
243329
return embedding.Vector;
@@ -287,44 +373,89 @@ public async Task GenerateBookContentEmbeddingsAndUploadToVectorStore(
287373

288374
// ── Step 2 & 3: Batch-embed and immediately upsert each batch ─────────────────
289375
// Azure OpenAI supports at most EmbeddingBatchSize inputs per GenerateAsync call.
290-
// bookContents is streamed in fixed-size batches without full upfront materialization,
291-
// keeping peak memory bounded to one batch of chunk objects and their embeddings at a time.
376+
// The effective request size starts at min(EmbeddingBatchSize, MaxEmbeddingBatchSize)
377+
// and adaptively downshifts on 429 throttling responses.
378+
// bookContents is streamed in batches without full upfront materialization,
379+
// keeping peak memory bounded to one batch (or adaptive split) at a time.
292380
// The staging-swap (Step 3) is safe because it only runs after all batches have
293381
// been successfully upserted.
294-
var buffer = new List<BookContentChunk>(EmbeddingBatchSize);
382+
var configuredMaxBatchSize = Math.Clamp(_retryOptions.MaxEmbeddingBatchSize, 1, EmbeddingBatchSize);
383+
var adaptiveBatchSize = configuredMaxBatchSize;
384+
var buffer = new List<BookContentChunk>(configuredMaxBatchSize);
295385
int totalCount = 0;
296386

297-
async Task EmbedAndUpsertBatchAsync()
387+
async Task EmbedAndUpsertExactBatchAsync(IReadOnlyList<BookContentChunk> batch)
298388
{
299389
var batchEmbeddings = await ExecuteWithRetryAsync(
300-
async ct => await embeddingGenerator.GenerateAsync(
301-
buffer.Select(c => c.ChunkText), cancellationToken: ct),
302-
$"GenerateBatchEmbeddings(size={buffer.Count})",
390+
async ct => await ExecuteEmbeddingRequestWithPacingAsync(
391+
async pacingCt => await embeddingGenerator.GenerateAsync(
392+
batch.Select(c => c.ChunkText), cancellationToken: pacingCt),
393+
ct),
394+
$"GenerateBatchEmbeddings(size={batch.Count})",
303395
cancellationToken);
304396

305-
if (batchEmbeddings.Count != buffer.Count)
397+
if (batchEmbeddings.Count != batch.Count)
306398
throw new InvalidOperationException(
307-
$"Embedding count mismatch: expected {buffer.Count}, got {batchEmbeddings.Count}.");
399+
$"Embedding count mismatch: expected {batch.Count}, got {batchEmbeddings.Count}.");
400+
401+
for (int i = 0; i < batch.Count; i++)
402+
batch[i].TextEmbedding = batchEmbeddings[i].Vector;
308403

309-
for (int i = 0; i < buffer.Count; i++)
310-
buffer[i].TextEmbedding = batchEmbeddings[i].Vector;
404+
await staging.UpsertAsync(batch, cancellationToken);
405+
totalCount += batch.Count;
406+
}
407+
408+
async Task EmbedAndUpsertBatchAdaptiveAsync(IReadOnlyList<BookContentChunk> batch)
409+
{
410+
try
411+
{
412+
await EmbedAndUpsertExactBatchAsync(batch);
413+
}
414+
catch (Exception ex) when (IsRateLimitError(ex) && batch.Count > 1)
415+
{
416+
var splitSize = Math.Max(1, batch.Count / 2);
417+
if (adaptiveBatchSize > splitSize)
418+
{
419+
var previousAdaptiveBatchSize = adaptiveBatchSize;
420+
adaptiveBatchSize = splitSize;
421+
if (_logger is not null)
422+
{
423+
LogEmbeddingBatchDownshift(_logger, previousAdaptiveBatchSize, adaptiveBatchSize, _retryOptions.MaxRetries + 1);
424+
}
425+
}
311426

312-
await staging.UpsertAsync(buffer, cancellationToken);
313-
totalCount += buffer.Count;
314-
buffer.Clear();
427+
var first = batch.Take(splitSize).ToArray();
428+
var second = batch.Skip(splitSize).ToArray();
429+
await EmbedAndUpsertBatchAdaptiveAsync(first);
430+
await EmbedAndUpsertBatchAdaptiveAsync(second);
431+
}
432+
catch (Exception ex) when (IsRateLimitError(ex) && batch.Count == 1)
433+
{
434+
throw new InvalidOperationException(
435+
$"Embedding request failed with repeated 429 rate limits even at batch size 1 after {_retryOptions.MaxRetries + 1} attempts.",
436+
ex);
437+
}
315438
}
316439

317440
try
318441
{
319442
foreach (var chunk in bookContents)
320443
{
321444
buffer.Add(chunk);
322-
if (buffer.Count == EmbeddingBatchSize)
323-
await EmbedAndUpsertBatchAsync();
445+
if (buffer.Count >= adaptiveBatchSize)
446+
{
447+
var batchToProcess = buffer.ToArray();
448+
buffer.Clear();
449+
await EmbedAndUpsertBatchAdaptiveAsync(batchToProcess);
450+
}
324451
}
325452

326453
if (buffer.Count > 0)
327-
await EmbedAndUpsertBatchAsync();
454+
{
455+
var batchToProcess = buffer.ToArray();
456+
buffer.Clear();
457+
await EmbedAndUpsertBatchAdaptiveAsync(batchToProcess);
458+
}
328459

329460
Console.WriteLine($"Uploaded {totalCount} chunks to staging collection '{stagingName}'.");
330461
}
@@ -417,4 +548,14 @@ private static partial void LogEmbeddingRetryAttemptsExhausted(
417548
int attemptCount,
418549
string lastError,
419550
int? statusCode);
551+
552+
[LoggerMessage(
553+
EventId = 12004,
554+
Level = LogLevel.Warning,
555+
Message = "Embedding batch downshift triggered after throttling. PreviousBatchSize={PreviousBatchSize}, NewBatchSize={NewBatchSize}, RetryAttemptsPerRequest={RetryAttemptsPerRequest}")]
556+
private static partial void LogEmbeddingBatchDownshift(
557+
ILogger logger,
558+
int previousBatchSize,
559+
int newBatchSize,
560+
int retryAttemptsPerRequest);
420561
}

EssentialCSharp.Web/appsettings.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
"MaxRetries": 5,
2626
"BaseDelayMs": 1000,
2727
"MaxDelayMs": 60000,
28+
"MaxEmbeddingBatchSize": 2048,
29+
"MinInterRequestDelayMs": 250,
2830
"BackoffMultiplier": 2.0,
2931
"MaxJitterFraction": 0.2
3032
},

0 commit comments

Comments
 (0)