Skip to content

Commit b704f9e

Browse files
author
Timothy Dodd
committed
Enhance log processing with scalability improvements
- Added a CircuitBreaker to handle API failures and prevent excessive retries.
- Implemented StateService for persistent state tracking across restarts.
- Introduced FingerprintBackfillService to backfill SHA256-based fingerprints.
- Added LogRetentionService for periodic cleanup of old logs and summaries.
- Replaced offset-based pagination with cursor-based pagination for efficiency.
- Enhanced SignalR with per-connection filtering for real-time log delivery.
- Added bounded memory queue to prevent OOM during sustained load.
- Enabled auto-creation of missing database indexes in the background.
- Introduced SHA256-based log fingerprinting for deduplication and validation.
- Added periodic cleanup of stale entries to prevent memory leaks.
- Moved hard-coded values to appsettings.json for better configurability.
- Wrapped hourly log summary updates in transactions for atomicity.
- Added `/metrics` endpoints for monitoring API and SignalR performance.
- Improved log validation to reject invalid or excessively old logs early.
- Optimized log cache with fingerprint-based deduplication.
- Enhanced logging and debugging for state persistence and cleanup processes.
- Updated `todo.md` with completed tasks and future improvements.
1 parent ea40b20 commit b704f9e

24 files changed

+1256
-296
lines changed

.claude/settings.local.json

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,9 @@
66
"Bash(timeout 30 dotnet test src/RoboDodd.OrmLite.Tests/RoboDodd.OrmLite.Tests.csproj --filter \"FullyQualifiedName=RoboDodd.OrmLite.Tests.ServiceStackMigrationIntegrationTests.ErrorHandling_WithInvalidOperations_HandlesGracefully\" --verbosity normal)",
77
"Bash(timeout:*)",
88
"Bash(npm run build:*)",
9-
"Bash(npx ng build:*)"
9+
"Bash(npx ng build:*)",
10+
"Bash(xargs ls:*)"
1011
],
1112
"deny": []
1213
}
13-
}
14+
}

src/LogMkAgent/Common/LogLineExtensions.cs

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
using System.Collections.Concurrent;
2+
using System.Security.Cryptography;
3+
using System.Text;
24
using LogMkCommon;
35

46
namespace LogMkAgent.Common;
@@ -7,16 +9,21 @@ public static class LogLineExtensions
79
// Thread-safe sequence counters per pod
810
private static readonly ConcurrentDictionary<string, long> PodSequenceCounters = new();
911

12+
// Track last activity time per pod key for cleanup
13+
private static readonly ConcurrentDictionary<string, DateTime> PodLastSeen = new();
14+
1015
/// <summary>
/// Stamps <paramref name="logLine"/> with the next sequence number for its
/// deployment/pod pair.
/// </summary>
/// <param name="logLine">The log line whose SequenceNumber is assigned.</param>
public static void AssignSequenceNumber(this LogLine logLine)
{
    // Delegate to GetNextSequenceNumber so both entry points share one code path.
    // It refreshes the pod's last-seen timestamp BEFORE bumping the counter; doing it
    // the other way round leaves a window in which CleanupStaleEntries still sees the
    // old timestamp and evicts the counter of a pod that is actively logging.
    logLine.SequenceNumber = GetNextSequenceNumber(logLine.DeploymentName, logLine.PodName);
}
1521

1622
/// <summary>
/// Returns the next sequence number for the given deployment/pod pair instead of
/// writing it onto a log line. The first call for a pair yields 1; each later call
/// yields the previous value plus one. Also refreshes the pair's last-seen timestamp.
/// </summary>
/// <param name="deploymentName">Deployment part of the counter key.</param>
/// <param name="podName">Pod part of the counter key.</param>
/// <returns>The freshly incremented sequence number.</returns>
public static long GetNextSequenceNumber(string deploymentName, string podName)
{
    var counterKey = string.Concat(deploymentName, ":", podName);

    // Record activity first so the cleanup pass sees this pod as recently active.
    PodLastSeen[counterKey] = DateTime.UtcNow;

    var next = PodSequenceCounters.AddOrUpdate(
        counterKey,
        1,
        (_, previous) => previous + 1);

    return next;
}
2229

@@ -32,6 +39,7 @@ public static void ResetSequenceNumber(string deploymentName, string podName)
3239
{
3340
var podKey = $"{deploymentName}:{podName}";
3441
PodSequenceCounters.TryRemove(podKey, out _);
42+
PodLastSeen.TryRemove(podKey, out _);
3543
}
3644

3745
// Get current sequence without incrementing
@@ -40,6 +48,46 @@ public static long GetCurrentSequenceNumber(string deploymentName, string podNam
4048
var podKey = $"{deploymentName}:{podName}";
4149
return PodSequenceCounters.TryGetValue(podKey, out var current) ? current : 0;
4250
}
43-
}
4451

45-
// Up
52+
/// <summary>
/// Removes sequence counter entries for pods not seen within the given timespan.
/// Call periodically to prevent unbounded dictionary growth.
/// </summary>
/// <param name="maxAge">
/// How long a pod key may go without its last-seen timestamp being refreshed
/// before its entries are evicted.
/// </param>
/// <returns>The number of pod keys evicted by this call.</returns>
public static int CleanupStaleEntries(TimeSpan maxAge)
{
    var cutoff = DateTime.UtcNow - maxAge;
    var removed = 0;

    // Enumerating a ConcurrentDictionary while removing from it is safe; the
    // enumerator tolerates concurrent modification.
    foreach (var kvp in PodLastSeen)
    {
        if (kvp.Value >= cutoff)
        {
            continue;
        }

        // Compare-and-remove: succeeds only if the stored timestamp is still the
        // stale value observed above. If another thread refreshed the pod in the
        // meantime the entry is left alone, so an active pod does not lose its
        // counter (which would restart its sequence at 1).
        if (PodLastSeen.TryRemove(kvp))
        {
            // NOTE: the two dictionaries are still updated independently, so a pod
            // that becomes active between these two removals can have its counter
            // reset; closing that window entirely would need a shared lock or a
            // single dictionary holding both values.
            PodSequenceCounters.TryRemove(kvp.Key, out _);
            removed++;
        }
    }

    return removed;
}
75+
76+
/// <summary>
/// Number of deployment/pod keys that currently have a sequence counter.
/// </summary>
public static int TrackedPodCount
{
    get { return PodSequenceCounters.Count; }
}
77+
78+
/// <summary>
/// Sets the log line's Fingerprint from its text: the first 8 bytes of the SHA256
/// digest of the UTF-8 encoded line, rendered as 16 lowercase hex characters.
/// A null or empty line gets an empty fingerprint.
/// </summary>
/// <param name="logLine">The log line to fingerprint in place.</param>
public static void AssignFingerprint(this LogLine logLine)
{
    if (!string.IsNullOrEmpty(logLine.Line))
    {
        var utf8 = Encoding.UTF8.GetBytes(logLine.Line);
        var digest = SHA256.HashData(utf8);

        // Keep only the leading 8 bytes of the digest (16 hex characters).
        var hex = Convert.ToHexString(digest, 0, 8);
        logLine.Fingerprint = hex.ToLowerInvariant();
    }
    else
    {
        logLine.Fingerprint = string.Empty;
    }
}
93+
}

src/LogMkAgent/Program.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,10 @@ public static async Task Main(string[] args)
2929
// Configure strongly typed settings
3030
services.Configure<LogWatcherOptions>(configuration.GetSection("LogWatcherOptions"));
3131
services.Configure<ApiSettings>(configuration.GetSection("LoggingApi"));
32+
services.Configure<BatchingOptions>(configuration.GetSection("BatchingOptions"));
3233

3334
// Register services
35+
services.AddSingleton<StateService>();
3436
services.AddSingleton<BatchingService>();
3537
services.AddSingleton<HttpLogger>();
3638
services.AddSingleton<SettingsService>();

src/LogMkAgent/Services/BatchService.cs

Lines changed: 107 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,21 @@ public class BatchingService : IDisposable
2828
TimeSpan.FromSeconds(15)
2929
};
3030

31+
// Circuit breaker
32+
private readonly CircuitBreaker _circuitBreaker;
33+
34+
// Rate limiting
35+
private readonly int _maxBatchesPerMinute;
36+
private readonly Queue<DateTime> _batchSendTimestamps = new();
37+
private readonly object _rateLimitLock = new();
38+
3139
// Metrics for monitoring
3240
private long _totalItemsProcessed;
3341
private long _totalBatchesSent;
3442
private long _totalFailures;
3543
private long _totalValidationFailures;
44+
private long _totalRateLimitDelays;
45+
private long _totalDropped;
3646
private DateTime _lastSuccessfulSend = DateTime.UtcNow;
3747

3848
// Debounce configuration
@@ -53,6 +63,11 @@ public BatchingService(
5363
_options = batchingOptions?.Value ?? throw new ArgumentNullException(nameof(batchingOptions));
5464
_settingsService = settingsService ?? throw new ArgumentNullException(nameof(settingsService));
5565

66+
_maxBatchesPerMinute = _options.MaxBatchesPerMinute;
67+
_circuitBreaker = new CircuitBreaker(logger,
68+
failureThreshold: _options.CircuitBreakerFailureThreshold,
69+
cooldownPeriod: _options.CircuitBreakerCooldown);
70+
5671
ValidateOptions();
5772

5873
// Configure debounce delay from options if available, otherwise use defaults
@@ -62,8 +77,8 @@ public BatchingService(
6277
_maxWaitTime = _options.BatchInterval;
6378
}
6479

65-
_logger.LogInformation("BatchingService initialized with debounce: {DebounceDelay}ms, max wait: {MaxWait}s, max size: {MaxSize}, timeout: {Timeout}",
66-
_debounceDelay.TotalMilliseconds, _maxWaitTime.TotalSeconds, _options.MaxBatchSize, _options.SendTimeout);
80+
_logger.LogInformation("BatchingService initialized with debounce: {DebounceDelay}ms, max wait: {MaxWait}s, max size: {MaxSize}, timeout: {Timeout}, rate limit: {RateLimit}/min, max queue: {MaxQueue}, max retry queue: {MaxRetryQueue}",
81+
_debounceDelay.TotalMilliseconds, _maxWaitTime.TotalSeconds, _options.MaxBatchSize, _options.SendTimeout, _maxBatchesPerMinute, _options.MaxQueueSize, _options.MaxRetryQueueSize);
6782
}
6883

6984
private void ValidateOptions()
@@ -92,6 +107,18 @@ public void AddData(LogLine data)
92107
return;
93108
}
94109

110+
// Enforce queue size limit - drop oldest items if at capacity
111+
if (_options.MaxQueueSize > 0 && _batchData.Count >= _options.MaxQueueSize)
112+
{
113+
var dropped = 0;
114+
while (_batchData.Count >= _options.MaxQueueSize && _batchData.TryDequeue(out _))
115+
{
116+
dropped++;
117+
}
118+
Interlocked.Add(ref _totalDropped, dropped);
119+
_logger.LogWarning("Queue at capacity ({MaxSize}), dropped {Dropped} oldest items", _options.MaxQueueSize, dropped);
120+
}
121+
95122
_batchData.Enqueue(data);
96123

97124
// Track when first log was added if this is the first in a batch
@@ -232,11 +259,48 @@ private async Task ProcessRetryQueueAsync()
232259
}
233260
}
234261

262+
/// <summary>
/// Sliding one-minute window rate limiter for outgoing batches.
/// When the call is allowed, the current time is recorded as a send, so a
/// <c>false</c> result consumes one slot in the window.
/// </summary>
/// <returns>
/// <c>true</c> if the number of sends recorded in the last minute has reached
/// <c>_maxBatchesPerMinute</c>; otherwise <c>false</c>.
/// </returns>
private bool IsRateLimited()
{
    // A non-positive limit means "rate limiting disabled", matching how
    // MaxQueueSize and MaxRetryQueueSize are only enforced when greater than zero.
    // Without this guard a configured value of 0 would make Count >= limit always
    // true and block every batch forever.
    if (_maxBatchesPerMinute <= 0)
    {
        return false;
    }

    lock (_rateLimitLock)
    {
        var now = DateTime.UtcNow;
        var windowStart = now.AddMinutes(-1);

        // Remove timestamps outside the sliding window
        while (_batchSendTimestamps.Count > 0 && _batchSendTimestamps.Peek() < windowStart)
        {
            _batchSendTimestamps.Dequeue();
        }

        if (_batchSendTimestamps.Count >= _maxBatchesPerMinute)
        {
            return true;
        }

        _batchSendTimestamps.Enqueue(now);
        return false;
    }
}
284+
235285
private async Task ProcessNewBatchAsync()
236286
{
237287
if (_batchData.IsEmpty)
238288
return;
239289

290+
// Check rate limit before extracting the batch
291+
if (IsRateLimited())
292+
{
293+
Interlocked.Increment(ref _totalRateLimitDelays);
294+
_logger.LogWarning("Rate limit reached ({MaxBatchesPerMinute}/min), delaying batch send", _maxBatchesPerMinute);
295+
// Re-trigger after a short delay instead of dropping
296+
_ = Task.Run(async () =>
297+
{
298+
await Task.Delay(TimeSpan.FromSeconds(2), _cancellationTokenSource.Token).ConfigureAwait(false);
299+
ResetDebounceTimer();
300+
}, _cancellationTokenSource.Token);
301+
return;
302+
}
303+
240304
var currentBatch = ExtractBatch();
241305
if (currentBatch.Count == 0)
242306
return;
@@ -259,9 +323,22 @@ private async Task ProcessNewBatchAsync()
259323

260324
if (!success)
261325
{
262-
// Add to retry queue
326+
// Add to retry queue with size enforcement
263327
lock (_retryLock)
264328
{
329+
if (_options.MaxRetryQueueSize > 0)
330+
{
331+
var retryItemCount = _retryQueue.Sum(r => r.Data.Count);
332+
if (retryItemCount + currentBatch.Count > _options.MaxRetryQueueSize)
333+
{
334+
var dropped = currentBatch.Count;
335+
Interlocked.Add(ref _totalDropped, dropped);
336+
_logger.LogWarning("Retry queue at capacity ({MaxSize} items), dropping batch of {Count} logs",
337+
_options.MaxRetryQueueSize, dropped);
338+
return;
339+
}
340+
}
341+
265342
var retryItem = new BatchItem
266343
{
267344
Data = currentBatch,
@@ -329,6 +406,13 @@ private async Task<bool> SendBatchWithRetryAsync(List<LogLine> batch, int attemp
329406
if (batch.Count == 0)
330407
return true;
331408

409+
// Check circuit breaker before attempting to send
410+
if (!_circuitBreaker.AllowRequest())
411+
{
412+
_logger.LogDebug("Circuit breaker is open, skipping batch send of {Count} items", batch.Count);
413+
return false;
414+
}
415+
332416
try
333417
{
334418
_logger.LogDebug("Sending batch of {Count} log lines (attempt {Attempt})", batch.Count, attemptNumber);
@@ -341,6 +425,7 @@ private async Task<bool> SendBatchWithRetryAsync(List<LogLine> batch, int attemp
341425

342426
if (response.IsSuccessStatusCode)
343427
{
428+
_circuitBreaker.RecordSuccess();
344429
Interlocked.Add(ref _totalItemsProcessed, batch.Count);
345430
Interlocked.Increment(ref _totalBatchesSent);
346431
_lastSuccessfulSend = DateTime.UtcNow;
@@ -350,13 +435,15 @@ private async Task<bool> SendBatchWithRetryAsync(List<LogLine> batch, int attemp
350435
}
351436
else
352437
{
438+
_circuitBreaker.RecordFailure();
353439
_logger.LogWarning("HTTP error sending batch: {StatusCode} {ReasonPhrase}",
354440
response.StatusCode, response.ReasonPhrase);
355441
return false;
356442
}
357443
}
358444
catch (TaskCanceledException ex)
359445
{
446+
_circuitBreaker.RecordFailure();
360447
_logger.LogWarning(ex, "Request cancelled sending batch (attempt {Attempt})", attemptNumber);
361448
return false;
362449
}
@@ -367,17 +454,19 @@ private async Task<bool> SendBatchWithRetryAsync(List<LogLine> batch, int attemp
367454
}
368455
catch (OperationCanceledException)
369456
{
457+
_circuitBreaker.RecordFailure();
370458
_logger.LogWarning("Batch send timed out after {Timeout}", _options.SendTimeout);
371459
return false;
372460
}
373461
catch (HttpRequestException ex)
374462
{
463+
_circuitBreaker.RecordFailure();
375464
_logger.LogWarning(ex, "Network error sending batch (attempt {Attempt})", attemptNumber);
376465
return false;
377466
}
378-
379467
catch (Exception ex)
380468
{
469+
_circuitBreaker.RecordFailure();
381470
_logger.LogError(ex, "Unexpected error sending batch (attempt {Attempt})", attemptNumber);
382471
return false;
383472
}
@@ -432,8 +521,12 @@ public BatchingServiceStats GetStats()
432521
TotalBatchesSent = _totalBatchesSent,
433522
TotalFailures = _totalFailures,
434523
TotalValidationFailures = _totalValidationFailures,
524+
TotalRateLimitDelays = _totalRateLimitDelays,
525+
TotalDropped = _totalDropped,
435526
PendingRetries = _retryQueue.Count,
436-
LastSuccessfulSend = _lastSuccessfulSend
527+
LastSuccessfulSend = _lastSuccessfulSend,
528+
CircuitBreakerState = _circuitBreaker.State.ToString(),
529+
CircuitBreakerFailures = _circuitBreaker.ConsecutiveFailures
437530
};
438531
}
439532
}
@@ -484,6 +577,11 @@ public class BatchingOptions
484577
public TimeSpan BatchInterval { get; set; } = TimeSpan.FromSeconds(2); // Now acts as max wait time
485578
public int MaxBatchSize { get; set; } = 100;
486579
public TimeSpan SendTimeout { get; set; } = TimeSpan.FromSeconds(30);
580+
public int MaxBatchesPerMinute { get; set; } = 60;
581+
public int MaxQueueSize { get; set; } = 10000;
582+
public int MaxRetryQueueSize { get; set; } = 5000;
583+
public int CircuitBreakerFailureThreshold { get; set; } = 5;
584+
public TimeSpan CircuitBreakerCooldown { get; set; } = TimeSpan.FromSeconds(30);
487585
}
488586

489587
public class BatchingServiceStats
@@ -493,8 +591,12 @@ public class BatchingServiceStats
493591
public long TotalBatchesSent { get; set; }
494592
public long TotalFailures { get; set; }
495593
public long TotalValidationFailures { get; set; }
594+
public long TotalRateLimitDelays { get; set; }
595+
public long TotalDropped { get; set; }
496596
public int PendingRetries { get; set; }
497597
public DateTime LastSuccessfulSend { get; set; }
598+
public string CircuitBreakerState { get; set; } = string.Empty;
599+
public int CircuitBreakerFailures { get; set; }
498600
}
499601

500602
public class ApiSettings

0 commit comments

Comments
 (0)