Skip to content

Commit e41a572

Browse files
author
Timothy Dodd
committed
Improve batching, logging, and duplicate handling
Enhanced `BatchingService` and `LogController` to improve scalability, robustness, and clarity:

- Added throttling for drop warnings to reduce log spam.
- Optimized queue size enforcement by dropping 10% of the queue at capacity.
- Handled HTTP 206 responses as successful in `SendBatchWithRetryAsync`.
- Differentiated between client (4xx) and server (5xx) errors; client errors no longer trigger retries.
- Increased default batch size, queue size, and batch rate limits in `BatchingOptions` and `appsettings.json`.
- Improved duplicate log handling in `Create`:
  - Counted duplicates separately and excluded them from validation errors.
  - Adjusted batch status and HTTP responses to treat duplicates as expected behavior.
- Enhanced logging and metrics:
  - Logged duplicate counts at debug level.
  - Summarized validation errors and limited detailed logs to the first five.
  - Updated metrics to exclude duplicates from validation error counts.

These changes optimize performance, reduce unnecessary retries, and provide clearer insights into system behavior.
1 parent 684333c commit e41a572

3 files changed

Lines changed: 74 additions & 26 deletions

File tree

src/LogMkAgent/Services/BatchService.cs

Lines changed: 36 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,10 @@ public class BatchingService : IDisposable
5050
private readonly TimeSpan _maxWaitTime = TimeSpan.FromSeconds(2); // Force send after 2 seconds max
5151
private DateTime _firstLogAddedTime = DateTime.MinValue;
5252

53+
// Throttle drop warnings to avoid log spam
54+
private DateTime _lastDropWarning = DateTime.MinValue;
55+
private long _dropsSinceLastWarning;
56+
5357
private volatile bool _disposed;
5458

5559
public BatchingService(
@@ -107,16 +111,26 @@ public void AddData(LogLine data)
107111
return;
108112
}
109113

110-
// Enforce queue size limit - drop oldest items if at capacity
114+
// Enforce queue size limit - drop 10% of queue to make room and avoid per-item churn
111115
if (_options.MaxQueueSize > 0 && _batchData.Count >= _options.MaxQueueSize)
112116
{
117+
var toDrop = Math.Max(1, _options.MaxQueueSize / 10);
113118
var dropped = 0;
114-
while (_batchData.Count >= _options.MaxQueueSize && _batchData.TryDequeue(out _))
119+
while (dropped < toDrop && _batchData.TryDequeue(out _))
115120
{
116121
dropped++;
117122
}
118123
Interlocked.Add(ref _totalDropped, dropped);
119-
_logger.LogWarning("Queue at capacity ({MaxSize}), dropped {Dropped} oldest items", _options.MaxQueueSize, dropped);
124+
Interlocked.Add(ref _dropsSinceLastWarning, dropped);
125+
126+
// Throttle warning to at most once per 10 seconds
127+
var now = DateTime.UtcNow;
128+
if ((now - _lastDropWarning).TotalSeconds >= 10)
129+
{
130+
_logger.LogWarning("Queue at capacity ({MaxSize}), dropped {Dropped} oldest items (total since last warning: {TotalDrops})",
131+
_options.MaxQueueSize, dropped, Interlocked.Exchange(ref _dropsSinceLastWarning, 0));
132+
_lastDropWarning = now;
133+
}
120134
}
121135

122136
_batchData.Enqueue(data);
@@ -423,20 +437,34 @@ private async Task<bool> SendBatchWithRetryAsync(List<LogLine> batch, int attemp
423437

424438
var response = await _httpClient.SendDataAsync("api/log", batch, linkedCts.Token).ConfigureAwait(false);
425439

426-
if (response.IsSuccessStatusCode)
440+
var statusCode = (int)response.StatusCode;
441+
442+
if (response.IsSuccessStatusCode || statusCode == 206)
427443
{
444+
// Any 2xx status (206 Partial Content is itself a 2xx) — API received and processed the batch
428445
_circuitBreaker.RecordSuccess();
429446
Interlocked.Add(ref _totalItemsProcessed, batch.Count);
430447
Interlocked.Increment(ref _totalBatchesSent);
431448
_lastSuccessfulSend = DateTime.UtcNow;
432449

433-
_logger.LogDebug("Successfully sent batch of {Count} log lines", batch.Count);
450+
_logger.LogDebug("Successfully sent batch of {Count} log lines (status {StatusCode})", batch.Count, statusCode);
434451
return true;
435452
}
453+
else if (statusCode >= 400 && statusCode < 500)
454+
{
455+
// 4xx client error — API received the request but rejected it (validation, duplicates, etc.)
456+
// Do NOT retry: the same data will get the same response
457+
_circuitBreaker.RecordSuccess(); // API is reachable, not a connectivity issue
458+
_logger.LogWarning("Batch rejected by API ({StatusCode} {ReasonPhrase}), not retrying — {Count} logs dropped",
459+
response.StatusCode, response.ReasonPhrase, batch.Count);
460+
Interlocked.Add(ref _totalFailures, batch.Count);
461+
return true; // Return true to prevent retry
462+
}
436463
else
437464
{
465+
// 5xx server error — API is having issues, worth retrying
438466
_circuitBreaker.RecordFailure();
439-
_logger.LogWarning("HTTP error sending batch: {StatusCode} {ReasonPhrase}",
467+
_logger.LogWarning("Server error sending batch: {StatusCode} {ReasonPhrase}",
440468
response.StatusCode, response.ReasonPhrase);
441469
return false;
442470
}
@@ -575,9 +603,9 @@ private class BatchItem
575603
public class BatchingOptions
576604
{
577605
public TimeSpan BatchInterval { get; set; } = TimeSpan.FromSeconds(2); // Now acts as max wait time
578-
public int MaxBatchSize { get; set; } = 100;
606+
public int MaxBatchSize { get; set; } = 500;
579607
public TimeSpan SendTimeout { get; set; } = TimeSpan.FromSeconds(30);
580-
public int MaxBatchesPerMinute { get; set; } = 60;
608+
public int MaxBatchesPerMinute { get; set; } = 120;
581609
public int MaxQueueSize { get; set; } = 10000;
582610
public int MaxRetryQueueSize { get; set; } = 5000;
583611
public int CircuitBreakerFailureThreshold { get; set; } = 5;

src/LogMkAgent/appsettings.json

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,9 @@
2929
"MaxDaysOld": 30
3030
},
3131
"BatchingOptions": {
32-
"MaxBatchesPerMinute": 60,
33-
"MaxQueueSize": 10000,
34-
"MaxRetryQueueSize": 5000
32+
"MaxBatchSize": 500,
33+
"MaxBatchesPerMinute": 120,
34+
"MaxQueueSize": 50000,
35+
"MaxRetryQueueSize": 10000
3536
}
3637
}

src/LogMkApi/Controllers/LogController.cs

Lines changed: 34 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ public async Task<ActionResult<LogResponse>> Create(
9494
var validLogs = new List<Log>();
9595
var errors = new List<LogValidationError>();
9696
var skippedCount = 0;
97+
var duplicateCount = 0;
9798

9899
_logger.LogDebug("Processing batch {BatchId} with {Count} log lines", batchId, logLines.Count);
99100
_metrics.IncrementLogsReceived(logLines.Count);
@@ -111,6 +112,14 @@ public async Task<ActionResult<LogResponse>> Create(
111112

112113
if (!validationResult.IsValid)
113114
{
115+
// If the ONLY error is duplicate detection, count it separately (not as an error)
116+
if (validationResult.Errors.Length == 1 && validationResult.Errors[0] == "Duplicate log entry detected")
117+
{
118+
duplicateCount++;
119+
skippedCount++;
120+
continue;
121+
}
122+
114123
errors.Add(new LogValidationError
115124
{
116125
Index = i,
@@ -214,7 +223,13 @@ public async Task<ActionResult<LogResponse>> Create(
214223
}
215224
}
216225

217-
// Log validation errors for monitoring
226+
// Log duplicate count at debug level (duplicates are normal, not errors)
227+
if (duplicateCount > 0)
228+
{
229+
_logger.LogDebug("Batch {BatchId}: {DuplicateCount} duplicate logs skipped", batchId, duplicateCount);
230+
}
231+
232+
// Log real validation errors for monitoring
218233
if (errors.Any())
219234
{
220235
// Group errors by type for better insights
@@ -225,7 +240,7 @@ public async Task<ActionResult<LogResponse>> Create(
225240

226241
_logger.LogWarning("Batch {BatchId}: {ErrorCount} validation errors, {SkippedCount} logs skipped. " +
227242
"Error breakdown: {ErrorSummary}",
228-
batchId, errors.Count, skippedCount,
243+
batchId, errors.Count, skippedCount - duplicateCount,
229244
string.Join(", ", errorSummary.Select(kvp => $"{kvp.Key}: {kvp.Value}")));
230245

231246
// Log individual rejected lines for debugging (limit to first 5)
@@ -235,7 +250,7 @@ public async Task<ActionResult<LogResponse>> Create(
235250
batchId, error.Index, string.Join(", ", error.Errors), error.LogLine);
236251
}
237252

238-
_metrics.IncrementErrors("validation", skippedCount);
253+
_metrics.IncrementErrors("validation", errors.Count);
239254
}
240255

241256
// Log insert errors
@@ -255,7 +270,7 @@ public async Task<ActionResult<LogResponse>> Create(
255270
ProcessedCount = insertedCount,
256271
SkippedCount = skippedCount,
257272
ReceivedAt = receivedAt,
258-
Status = GetBatchStatus(logLines.Count, insertedCount, skippedCount, insertErrors.Any())
273+
Status = GetBatchStatus(logLines.Count, insertedCount, skippedCount, insertErrors.Any(), duplicateCount)
259274
};
260275

261276
// Include validation errors if any (but limit them)
@@ -270,19 +285,22 @@ public async Task<ActionResult<LogResponse>> Create(
270285
}
271286

272287
// Return appropriate status code
273-
if (insertedCount == 0 && logLines.Count > 0)
288+
// Duplicates are NOT errors — they're expected in normal operation (agent retries, etc.)
289+
var realErrorCount = errors.Count; // Excludes duplicates (already filtered above)
290+
291+
if (insertedCount == 0 && realErrorCount > 0)
274292
{
275-
// No logs were inserted
293+
// No logs inserted AND there were real validation errors (duplicates excluded; insert errors are not counted here)
276294
return BadRequest(response);
277295
}
278-
else if (skippedCount > 0 || insertErrors.Any())
296+
else if (insertErrors.Any() || realErrorCount > 0)
279297
{
280-
// Partial success
281-
return StatusCode(206, response); // 206 Partial Content
298+
// Partial success with real errors (not just duplicates)
299+
return StatusCode(206, response);
282300
}
283301
else
284302
{
285-
// Complete success
303+
// Complete success, or all skipped were just duplicates
286304
return Ok(response);
287305
}
288306
}
@@ -405,14 +423,15 @@ private string SanitizeForLogging(LogLine logLine)
405423
}
406424
}
407425

408-
private string GetBatchStatus(int received, int processed, int skipped, bool hasInsertErrors)
426+
private string GetBatchStatus(int received, int processed, int skipped, bool hasInsertErrors, int duplicates = 0)
409427
{
410-
if (processed == 0 && received > 0)
428+
var realErrors = skipped - duplicates;
429+
if (processed == 0 && realErrors > 0)
411430
return "Failed";
412-
else if (processed == received && !hasInsertErrors)
413-
return "Success";
414-
else
431+
else if (hasInsertErrors || realErrors > 0)
415432
return "Partial";
433+
else
434+
return "Success";
416435
}
417436

418437
[HttpGet("stats")]

0 commit comments

Comments
 (0)