@@ -23,21 +23,41 @@ public class LogController : ControllerBase
2323 private readonly LogSummaryRepo _logSummaryRepo ;
2424 private readonly WorkQueueRepo _workQueueRepo ;
2525 private readonly LogApiMetrics _metrics ;
26- private readonly int LogMaxDaysOld = 30 ; // Maximum age of logs to accept
27- public LogController ( ILogger < LogController > logger , LogRepo logRepo , LogHubService logHubService , LogSummaryRepo logSummaryRepo , WorkQueueRepo workQueueRepo , LogApiMetrics metrics , IConfiguration configuration )
26+ private readonly LogCacheService _cacheService ;
27+
28+ // Configuration: Maximum age limits
29+ private readonly int LogMaxDaysOld = 30 ; // For new pods (backfill scenario)
30+ private readonly int LogMaxMinutesOldForExistingPods = 5 ; // For existing pods (real-time scenario)
31+ private readonly bool EnableDuplicateDetection = true ;
32+
33+ public LogController ( ILogger < LogController > logger , LogRepo logRepo , LogHubService logHubService ,
34+ LogSummaryRepo logSummaryRepo , WorkQueueRepo workQueueRepo , LogApiMetrics metrics ,
35+ LogCacheService cacheService , IConfiguration configuration )
2836 {
2937 _logger = logger ;
3038 _logRepo = logRepo ;
3139 _logHubService = logHubService ;
3240 _logSummaryRepo = logSummaryRepo ;
3341 _workQueueRepo = workQueueRepo ;
3442 _metrics = metrics ;
43+ _cacheService = cacheService ;
44+
3545 if ( configuration != null )
3646 {
3747 if ( int . TryParse ( configuration [ "LogSettings:MaxDaysOld" ] , out var maxDays ) )
3848 {
3949 LogMaxDaysOld = maxDays ;
4050 }
51+
52+ if ( int . TryParse ( configuration [ "LogSettings:MaxMinutesOldForExistingPods" ] , out var maxMinutes ) )
53+ {
54+ LogMaxMinutesOldForExistingPods = maxMinutes ;
55+ }
56+
57+ if ( bool . TryParse ( configuration [ "LogSettings:EnableDuplicateDetection" ] , out var enableDuplicates ) )
58+ {
59+ EnableDuplicateDetection = enableDuplicates ;
60+ }
4161 }
4262 }
4363 [ AllowAnonymous ]
@@ -78,13 +98,16 @@ public async Task<ActionResult<LogResponse>> Create(
7898 _logger . LogDebug ( "Processing batch {BatchId} with {Count} log lines" , batchId , logLines . Count ) ;
7999 _metrics . IncrementLogsReceived ( logLines . Count ) ;
80100
101+ // Track pods for cache invalidation (new pods that were just inserted)
102+ var newPods = new HashSet < string > ( ) ;
103+
81104 // Process each log line individually - don't let errors stop the batch
82105 for ( int i = 0 ; i < logLines . Count ; i ++ )
83106 {
84107 try
85108 {
86109 var logLine = logLines [ i ] ;
87- var validationResult = ValidateLogLine ( logLine , i ) ;
110+ var validationResult = await ValidateLogLineAsync ( logLine , i ) ;
88111
89112 if ( ! validationResult . IsValid )
90113 {
@@ -98,6 +121,13 @@ public async Task<ActionResult<LogResponse>> Create(
98121 continue ;
99122 }
100123
124+ // Track if this is a new pod
125+ var podExists = await _cacheService . PodExistsAsync ( logLine . PodName ) ;
126+ if ( ! podExists )
127+ {
128+ newPods . Add ( logLine . PodName ) ;
129+ }
130+
101131 // Convert to database entity
102132 var logEntity = new Log
103133 {
@@ -141,6 +171,25 @@ public async Task<ActionResult<LogResponse>> Create(
141171
142172 _logger . LogDebug ( "Successfully inserted {InsertedCount}/{ValidCount} logs for batch {BatchId}" ,
143173 insertedCount , validLogs . Count , batchId ) ;
174+
175+ // Update cache for successfully inserted logs
176+ if ( insertedCount > 0 )
177+ {
178+ // Invalidate pod existence cache for new pods and start backfill tracking
179+ foreach ( var newPod in newPods )
180+ {
181+ _cacheService . InvalidatePodExistence ( newPod ) ;
182+ _cacheService . StartBackfillTracking ( newPod ) ;
183+ _logger . LogInformation ( "Started backfill tracking for new pod: {PodName}" , newPod ) ;
184+ }
185+
186+ // Update recent logs cache for all pods in this batch
187+ var podGroups = validLogs . GroupBy ( log => log . Pod ) ;
188+ foreach ( var group in podGroups )
189+ {
190+ _cacheService . UpdateRecentLogsCache ( group . Key , group ) ;
191+ }
192+ }
144193 }
145194 catch ( Exception ex )
146195 {
@@ -150,26 +199,23 @@ public async Task<ActionResult<LogResponse>> Create(
150199 }
151200 }
152201
153- // Send to real-time hub immediately - don't let this fail the API call
202+ // Send to real-time hub
154203 if ( insertedCount > 0 )
155204 {
156- _ = Task . Run ( async ( ) =>
205+ try
157206 {
158- try
159- {
160- // Send all inserted logs for real-time delivery (limit to 500 for safety)
161- var logsToSend = validLogs . Take ( 500 ) . ToList ( ) ;
207+ // Send all inserted logs for real-time delivery (limit to 500 for safety)
208+ var logsToSend = validLogs . Take ( 500 ) . ToList ( ) ;
162209
163- if ( logsToSend . Any ( ) )
164- {
165- await _logHubService . SendLogs ( logsToSend ) ;
166- }
167- }
168- catch ( Exception ex )
210+ if ( logsToSend . Any ( ) )
169211 {
170- _logger . LogWarning ( ex , "Failed to send logs to hub for batch {BatchId}" , batchId ) ;
212+ await _logHubService . SendLogs ( logsToSend ) ;
171213 }
172- } , cancellationToken ) ;
214+ }
215+ catch ( Exception ex )
216+ {
217+ _logger . LogWarning ( ex , "Failed to send logs to hub for batch {BatchId}" , batchId ) ;
218+ }
173219 }
174220
175221 // Log validation errors for monitoring
@@ -270,7 +316,7 @@ private async Task<int> InsertLogsResilientlyAsync(List<Log> logs, string batchI
270316 }
271317 }
272318
273- private LogValidationResult ValidateLogLine ( LogLine logLine , int index )
319+ private async Task < LogValidationResult > ValidateLogLineAsync ( LogLine logLine , int index )
274320 {
275321 var errors = new List < string > ( ) ;
276322
@@ -294,12 +340,47 @@ private LogValidationResult ValidateLogLine(LogLine logLine, int index)
294340 // Additional validations
295341 if ( logLine != null )
296342 {
343+ // Future timestamp check
297344 if ( logLine . TimeStamp > DateTimeOffset . UtcNow . AddMinutes ( 5 ) )
298345 errors . Add ( "TimeStamp is too far in the future" ) ;
299346
300- if ( logLine . TimeStamp < DateTimeOffset . UtcNow . AddDays ( - LogMaxDaysOld ) )
301- errors . Add ( "TimeStamp is too old (>30 days)" ) ;
347+ // Conditional timestamp validation based on pod existence and backfill period
348+ var podExists = await _cacheService . PodExistsAsync ( logLine . PodName ) ;
349+ var isInBackfillPeriod = podExists && _cacheService . IsInBackfillPeriod ( logLine . PodName ) ;
302350
351+ if ( ! podExists || isInBackfillPeriod )
352+ {
353+ // New pod OR in backfill grace period: allow backfill (30 days)
354+ if ( logLine . TimeStamp < DateTimeOffset . UtcNow . AddDays ( - LogMaxDaysOld ) )
355+ {
356+ errors . Add ( $ "TimeStamp is too old (>{ LogMaxDaysOld } days)") ;
357+ }
358+ }
359+ else
360+ {
361+ // Existing pod outside backfill window: strict real-time validation (5 minutes)
362+ if ( logLine . TimeStamp < DateTimeOffset . UtcNow . AddMinutes ( - LogMaxMinutesOldForExistingPods ) )
363+ {
364+ errors . Add ( $ "TimeStamp is too old (>{ LogMaxMinutesOldForExistingPods } minutes) for existing pod") ;
365+ }
366+ }
367+
368+ // Duplicate detection (if enabled)
369+ if ( EnableDuplicateDetection && podExists )
370+ {
371+ var isDuplicate = await _cacheService . IsDuplicateLogAsync (
372+ logLine . PodName ,
373+ logLine . TimeStamp ,
374+ logLine . SequenceNumber ,
375+ logLine . Line ) ;
376+
377+ if ( isDuplicate )
378+ {
379+ errors . Add ( "Duplicate log entry detected" ) ;
380+ }
381+ }
382+
383+ // Length validations
303384 if ( logLine . Line ? . Length > 10000 )
304385 errors . Add ( "Line content too long (max 10,000 characters)" ) ;
305386
@@ -405,6 +486,7 @@ public IActionResult GetValidationSettings()
405486 var settings = new LogMkCommon . ValidationSettings
406487 {
407488 MaxDaysOld = LogMaxDaysOld ,
489+ MaxMinutesOldForExistingPods = LogMaxMinutesOldForExistingPods ,
408490 MaxFutureMinutes = 5 ,
409491 MaxLineLength = 10000 ,
410492 MaxDeploymentNameLength = 100 ,
@@ -413,7 +495,8 @@ public IActionResult GetValidationSettings()
413495 PodNamePattern = @"^[a-zA-Z0-9\-._]+$" ,
414496 AllowEmptyLogLevel = false ,
415497 MaxBatchSize = 1000 ,
416- Version = "1.0" ,
498+ EnableDuplicateDetection = EnableDuplicateDetection ,
499+ Version = "1.1" ,
417500 LastUpdated = DateTime . UtcNow
418501 } ;
419502
@@ -527,17 +610,14 @@ public async Task<ActionResult<LogResponse>> CreateSingle(
527610 _logger . LogDebug ( "Successfully inserted single log for Pod: {Pod}" , logEntry . Pod ) ;
528611
529612 // Send to real-time hub
530- _ = Task . Run ( async ( ) =>
613+ try
531614 {
532- try
533- {
534- await _logHubService . SendLogs ( new List < Log > { logEntity } ) ;
535- }
536- catch ( Exception ex )
537- {
538- _logger . LogWarning ( ex , "Failed to send single log to hub" ) ;
539- }
540- } , cancellationToken ) ;
615+ await _logHubService . SendLogs ( new List < Log > { logEntity } ) ;
616+ }
617+ catch ( Exception ex )
618+ {
619+ _logger . LogWarning ( ex , "Failed to send single log to hub" ) ;
620+ }
541621
542622 var response = new LogResponse
543623 {
0 commit comments