11using System . Collections . Concurrent ;
22using LogMkCommon ;
33using Microsoft . Extensions . Options ;
4+ using LogMkAgent . Services ;
45
56namespace LogMkAgent . Services ;
67
@@ -10,6 +11,7 @@ public class BatchingService : IDisposable
1011 private readonly LogApiClient _httpClient ;
1112 private readonly ILogger < BatchingService > _logger ;
1213 private readonly BatchingOptions _options ;
14+ private readonly SettingsService _settingsService ;
1315 private readonly Timer _timer ;
1416 private readonly SemaphoreSlim _sendSemaphore = new ( 1 , 1 ) ;
1517 private readonly CancellationTokenSource _cancellationTokenSource = new ( ) ;
@@ -29,18 +31,21 @@ public class BatchingService : IDisposable
2931 private long _totalItemsProcessed ;
3032 private long _totalBatchesSent ;
3133 private long _totalFailures ;
34+ private long _totalValidationFailures ;
3235 private DateTime _lastSuccessfulSend = DateTime . UtcNow ;
3336
3437 private volatile bool _disposed ;
3538
3639 public BatchingService (
3740 IOptions < BatchingOptions > batchingOptions ,
3841 LogApiClient client ,
39- ILogger < BatchingService > logger )
42+ ILogger < BatchingService > logger ,
43+ SettingsService settingsService )
4044 {
4145 _httpClient = client ?? throw new ArgumentNullException ( nameof ( client ) ) ;
4246 _logger = logger ?? throw new ArgumentNullException ( nameof ( logger ) ) ;
4347 _options = batchingOptions ? . Value ?? throw new ArgumentNullException ( nameof ( batchingOptions ) ) ;
48+ _settingsService = settingsService ?? throw new ArgumentNullException ( nameof ( settingsService ) ) ;
4449
4550 ValidateOptions ( ) ;
4651
@@ -172,7 +177,21 @@ private async Task ProcessNewBatchAsync()
172177 if ( currentBatch . Count == 0 )
173178 return ;
174179
175- var success = await SendBatchWithRetryAsync ( currentBatch , 1 ) . ConfigureAwait ( false ) ;
180+ // Pre-validate the batch before sending
181+ var validatedBatch = await ValidateBatchAsync ( currentBatch ) . ConfigureAwait ( false ) ;
182+ if ( validatedBatch . Count == 0 )
183+ {
184+ _logger . LogWarning ( "Entire batch of {Count} logs failed validation, skipping send" , currentBatch . Count ) ;
185+ return ;
186+ }
187+
188+ if ( validatedBatch . Count < currentBatch . Count )
189+ {
190+ _logger . LogInformation ( "Pre-validation filtered {Filtered} invalid logs from batch of {Total}. Sending {Valid} valid logs." ,
191+ currentBatch . Count - validatedBatch . Count , currentBatch . Count , validatedBatch . Count ) ;
192+ }
193+
194+ var success = await SendBatchWithRetryAsync ( validatedBatch , 1 ) . ConfigureAwait ( false ) ;
176195
177196 if ( ! success )
178197 {
@@ -203,6 +222,44 @@ private List<LogLine> ExtractBatch()
203222 return batch ;
204223 }
205224
225+ private async Task < List < LogLine > > ValidateBatchAsync ( List < LogLine > batch )
226+ {
227+ try
228+ {
229+ var validator = await _settingsService . GetValidatorAsync ( ) . ConfigureAwait ( false ) ;
230+ if ( validator == null )
231+ {
232+ _logger . LogWarning ( "Failed to get validator from settings service, sending batch without pre-validation" ) ;
233+ return batch ;
234+ }
235+
236+ var validationResult = validator . ValidateBatch ( batch ) ;
237+
238+ if ( validationResult . InvalidCount > 0 )
239+ {
240+ Interlocked . Add ( ref _totalValidationFailures , validationResult . InvalidCount ) ;
241+
242+ // Log the first few validation errors for diagnostics
243+ var errorSummary = validationResult . ValidationResults
244+ . SelectMany ( r => r . Errors )
245+ . GroupBy ( error => error )
246+ . ToDictionary ( g => g . Key , g => g . Count ( ) ) ;
247+
248+ _logger . LogWarning ( "Pre-validation failed for {InvalidCount}/{TotalCount} logs. " +
249+ "Error breakdown: {ErrorSummary}" ,
250+ validationResult . InvalidCount , validationResult . TotalCount ,
251+ string . Join ( ", " , errorSummary . Select ( kvp => $ "{ kvp . Key } : { kvp . Value } ") ) ) ;
252+ }
253+
254+ return validationResult . ValidLogs ;
255+ }
256+ catch ( Exception ex )
257+ {
258+ _logger . LogError ( ex , "Error during batch validation, sending batch without pre-validation" ) ;
259+ return batch ;
260+ }
261+ }
262+
206263 private async Task < bool > SendBatchWithRetryAsync ( List < LogLine > batch , int attemptNumber )
207264 {
208265 if ( batch . Count == 0 )
@@ -306,6 +363,7 @@ public BatchingServiceStats GetStats()
306363 TotalItemsProcessed = _totalItemsProcessed ,
307364 TotalBatchesSent = _totalBatchesSent ,
308365 TotalFailures = _totalFailures ,
366+ TotalValidationFailures = _totalValidationFailures ,
309367 PendingRetries = _retryQueue . Count ,
310368 LastSuccessfulSend = _lastSuccessfulSend
311369 } ;
@@ -363,6 +421,7 @@ public class BatchingServiceStats
363421 public long TotalItemsProcessed { get ; set ; }
364422 public long TotalBatchesSent { get ; set ; }
365423 public long TotalFailures { get ; set ; }
424+ public long TotalValidationFailures { get ; set ; }
366425 public int PendingRetries { get ; set ; }
367426 public DateTime LastSuccessfulSend { get ; set ; }
368427}
0 commit comments