22using System . Data ;
33using MailKit ;
44using MailKit . Net . Imap ;
5+ using MailKit . Search ;
56using MailKit . Security ;
67using MimeKit ;
78using ServiceStack . OrmLite ;
@@ -22,9 +23,11 @@ public class EmailMonitoringService : BackgroundService
2223 private const int ReconnectDelayMs = 600000 ;
2324 private const int IdleRetryDelayMs = 5000 ;
2425 private const int IdleTimeoutMinutes = 9 ;
26+ private const int HourlyReprocessIntervalMs = 7200000 ; // 2 hour in milliseconds
2527 private readonly IEmailMover _emailMover ;
2628 private CancellationToken _serviceCancellationToken ;
2729 private readonly ConcurrentQueue < EmailMoveOperation > _moveQueue = new ( ) ;
30+ private DateTime _lastFullReprocess = DateTime . MinValue ;
2831
2932
3033 public EmailMonitoringService (
@@ -79,7 +82,8 @@ private async Task ConnectAndMonitorAsync(CancellationToken cancellationToken)
7982 await inbox . OpenAsync ( FolderAccess . ReadOnly , cancellationToken ) ;
8083
8184 _logger . LogInformation ( "✅ Connected successfully. Processing existing emails..." ) ;
82- await ProcessExistingEmailsAsync ( cancellationToken ) ;
85+ await ProcessMails ( false , cancellationToken ) ;
86+ _lastFullReprocess = DateTime . UtcNow ; // Track when we last did a full reprocess
8387
8488 // Subscribe to events
8589 inbox . CountChanged += OnCountChanged ;
@@ -108,7 +112,7 @@ private async Task ConnectToServerAsync(ImapClient client, CancellationToken can
108112 await client . AuthenticateAsync ( _config . Username , _config . Password , cancellationToken ) ;
109113 }
110114
111- private async Task ProcessExistingEmailsAsync ( CancellationToken cancellationToken )
115+ private async Task ProcessMails ( bool lastSeenOnly , CancellationToken cancellationToken )
112116 {
113117 if ( _client ? . Inbox == null )
114118 return ;
@@ -117,16 +121,32 @@ private async Task ProcessExistingEmailsAsync(CancellationToken cancellationToke
117121 _logger . LogInformation ( "📬 Processing {TotalEmails} existing emails..." , totalEmails ) ;
118122
119123
124+ IList < IMessageSummary > ? fetchedMessages = null ;
125+ var flags = MessageSummaryItems . Envelope | MessageSummaryItems . UniqueId | MessageSummaryItems . BodyStructure | MessageSummaryItems . Full ;
120126 var emailsToProcess = new List < EmailReceivedEventArgs > ( ) ;
121-
122- for ( int i = 0 ; i < totalEmails ; i ++ )
127+ if ( lastSeenOnly )
128+ {
129+ var items = await _client . Inbox . SearchAsync ( SearchQuery . NotSeen , cancellationToken ) ;
130+ fetchedMessages = await _client . Inbox . FetchAsync ( items , flags , cancellationToken ) ;
131+ }
132+ else
133+ {
134+ fetchedMessages = await _client . Inbox . FetchAsync ( 0 , totalEmails - 1 , flags , cancellationToken ) ;
135+ }
136+ if ( fetchedMessages == null || fetchedMessages . Count == 0 )
137+ {
138+ _logger . LogInformation ( "No emails to process in the inbox." ) ;
139+ return ;
140+ }
141+ for ( int i = 0 ; i < fetchedMessages . Count ; i ++ )
123142 {
124143 if ( cancellationToken . IsCancellationRequested )
125144 break ;
126145
127146 try
128147 {
129- ProcessSingleEmail ( _client , _client . Inbox , i , isExisting : true , emailsToProcess ) ;
148+ var message = fetchedMessages [ i ] ;
149+ await ProcessSingleEmail ( message , _client . Inbox , emailsToProcess ) ;
130150 }
131151 catch ( Exception ex )
132152 {
@@ -140,6 +160,7 @@ private async Task ProcessExistingEmailsAsync(CancellationToken cancellationToke
140160 // Small delay after processing existing emails
141161 await Task . Delay ( 50 , cancellationToken ) ;
142162 }
163+
143164 private async Task ProcessBatchAsync ( List < EmailReceivedEventArgs > emailsToProcess )
144165 {
145166 if ( ! emailsToProcess . Any ( ) )
@@ -159,7 +180,7 @@ private async Task ProcessBatchAsync(List<EmailReceivedEventArgs> emailsToProces
159180 var emailsMoved = 0 ;
160181 if ( triggers . Any ( ) )
161182 {
162- var ops = await _emailMover . ExecuteTriggersAsync ( triggers ) ;
183+ var ops = _emailMover . ExecuteTriggersAsync ( triggers ) ;
163184 _logger . LogInformation ( "📦 Executed {OperationCount} email move operations" , ops . Count ) ;
164185 // Count how many emails were moved
165186 foreach ( var op in ops )
@@ -182,54 +203,102 @@ private async Task ProcessBatchAsync(List<EmailReceivedEventArgs> emailsToProces
182203 batchEventArgs . EmailsProcessed , batchEventArgs . RulesMatched , batchEventArgs . EmailsMoved , batchEventArgs . ProcessingTime . TotalMilliseconds ) ;
183204 }
184205
185- private void ProcessSingleEmail ( ImapClient client , IMailFolder folder , int index , bool isExisting , List < EmailReceivedEventArgs > processList )
206+ private async Task ProcessSingleEmail ( IMessageSummary message , IMailFolder folder , List < EmailReceivedEventArgs > processList )
186207 {
187- var message = folder . GetMessage ( index ) ;
188208
189- var senderNames = string . Join ( ";" , message . From . Select ( x => x . Name ?? string . Empty ) ) ;
190- var senderAddresses = string . Join ( ";" , message . From . OfType < MailboxAddress > ( ) . Select ( x => x . Address ) ) ;
209+ string senderName = message . Envelope . From . FirstOrDefault ( ) ? . Name ?? "" ;
210+ string senderEmail = message . Envelope . From . FirstOrDefault ( ) ? . ToString ( ) ?? "" ;
211+
212+ var senderNames = string . Join ( ";" , message . Envelope . From . Select ( x => x . Name ?? string . Empty ) ) ;
213+ var senderAddresses = string . Join ( ";" , message . Envelope . From . OfType < MailboxAddress > ( ) . Select ( x => x . Address ) ) ;
214+
215+ TextPart ? bodyPart = null ;
216+ if ( message . HtmlBody != null )
217+ {
218+ bodyPart = await folder . GetBodyPartAsync ( message . UniqueId , message . HtmlBody ) as TextPart ;
219+ }
220+ else if ( message . TextBody != null )
221+ {
222+ bodyPart = await folder . GetBodyPartAsync ( message . UniqueId , message . TextBody ) as TextPart ;
223+ }
191224
192225 var emailArgs = new EmailReceivedEventArgs
193226 {
194- From = message . From . ToString ( ) ,
227+ From = message . Envelope . From . FirstOrDefault ( ) ? . ToString ( ) ?? string . Empty ,
195228 SenderName = senderNames ,
196229 SenderAddress = senderAddresses ,
197- Subject = message . Subject ?? string . Empty ,
198- Body = message . TextBody ?? message . HtmlBody ?? string . Empty ,
199- IsExisting = isExisting ,
200- MessageIndex = index ,
230+ Subject = message . Envelope . Subject ?? string . Empty ,
231+ Body = bodyPart != null ? bodyPart . Text : "" ,
232+
201233 ReceivedDate = message . Date . DateTime ,
202- Folder = folder . Name
234+ Folder = folder . Name ,
235+ IsImportant = message . Flags . HasValue && message . Flags . Value . HasFlag ( MessageFlags . Flagged ) ,
236+ IsRead = message . Flags . HasValue && message . Flags . Value . HasFlag ( MessageFlags . Seen ) ,
237+ UniqueId = message . UniqueId
203238 } ;
239+ if ( emailArgs . IsImportant )
240+ {
241+ // log import emails and skip them
242+ _logger . LogInformation ( "⭐ Important email detected: {Subject} from {Sender} at {Date}" ,
243+ emailArgs . Subject , emailArgs . SenderName , emailArgs . ReceivedDate ) ;
244+ return ; // Skip processing important emails for now
245+ }
204246 processList . Add ( emailArgs ) ;
205- ;
247+
206248 }
249+
207250 private async Task MonitorForNewEmailsAsync ( CancellationToken stoppingToken )
208251 {
209252 while ( ! stoppingToken . IsCancellationRequested && _client ? . IsConnected == true )
210253 {
211254 try
212255 {
256+ // Check if it's time for hourly full reprocessing
257+ var timeSinceLastReprocess = DateTime . UtcNow - _lastFullReprocess ;
258+ if ( timeSinceLastReprocess . TotalMilliseconds >= HourlyReprocessIntervalMs )
259+ {
260+ _logger . LogInformation ( "⏰ Hourly reprocessing time reached. Exiting IDLE to reprocess all emails..." ) ;
261+ await ProcessMails ( false , stoppingToken ) ;
262+ _lastFullReprocess = DateTime . UtcNow ;
263+ continue ; // Skip the IDLE cycle and immediately check again
264+ }
265+
213266 // Process any queued move operations
214267 if ( _moveQueue . Count > 0 )
215268 {
216269 await ProcessQueuedMoveOperationsAsync ( ) ;
217270 }
218271
272+ // Calculate remaining time until next full reprocess
273+ var remainingTime = TimeSpan . FromMilliseconds ( HourlyReprocessIntervalMs ) - timeSinceLastReprocess ;
274+ var idleTimeout = TimeSpan . FromMinutes ( IdleTimeoutMinutes ) ;
275+
276+ // Use the shorter of the two timeouts
277+ var actualTimeout = remainingTime < idleTimeout ? remainingTime : idleTimeout ;
278+
279+ // Don't IDLE if the timeout would be very short
280+ if ( actualTimeout . TotalSeconds < 30 )
281+ {
282+ await Task . Delay ( 1000 , stoppingToken ) ; // Brief delay before checking again
283+ continue ;
284+ }
285+
219286 // Use a fresh CancellationTokenSource for each IDLE cycle
220- _idleDoneSource = new CancellationTokenSource ( TimeSpan . FromMinutes ( IdleTimeoutMinutes ) ) ;
287+ _idleDoneSource = new CancellationTokenSource ( actualTimeout ) ;
221288
222289 try
223290 {
224291 // Enter IDLE mode – will block until done token is canceled or timeout
225292 if ( _client . Capabilities . HasFlag ( ImapCapabilities . Idle ) )
226293 {
294+ _logger . LogDebug ( "📱 Entering IDLE mode for {Timeout} (next full reprocess in {NextReprocess})" ,
295+ actualTimeout , remainingTime ) ;
227296 await _client . IdleAsync ( _idleDoneSource . Token , stoppingToken ) ;
228297 }
229298 else
230299 {
231300 // Fallback for servers that don't support IDLE
232- await Task . Delay ( ReconnectDelayMs , stoppingToken ) ;
301+ await Task . Delay ( Math . Min ( ( int ) actualTimeout . TotalMilliseconds , ReconnectDelayMs ) , stoppingToken ) ;
233302 await CheckForNewEmails ( ) ;
234303 }
235304 }
@@ -240,7 +309,7 @@ private async Task MonitorForNewEmailsAsync(CancellationToken stoppingToken)
240309 _logger . LogInformation ( "📧 Monitoring stopped due to service shutdown" ) ;
241310 break ;
242311 }
243- // Expected when _idleDoneSource is triggered by event handlers or move queue
312+ // Expected when _idleDoneSource is triggered by event handlers, move queue, or timeout
244313 _logger . LogDebug ( "IDLE interrupted for processing" ) ;
245314 }
246315 finally
@@ -256,7 +325,6 @@ private async Task MonitorForNewEmailsAsync(CancellationToken stoppingToken)
256325 await CheckForNewEmails ( ) ;
257326 }
258327
259-
260328 // Brief delay before next IDLE cycle
261329 await Task . Delay ( 100 , stoppingToken ) ;
262330 }
@@ -278,7 +346,7 @@ private async Task CheckForNewEmails()
278346 var currentCount = _client . Inbox . Count ;
279347 if ( currentCount > _lastProcessedCount )
280348 {
281- await ProcessNewEmailsAsync ( currentCount ) ;
349+ await ProcessMails ( true , _serviceCancellationToken ) ;
282350 }
283351 }
284352 catch ( Exception ex )
@@ -287,30 +355,6 @@ private async Task CheckForNewEmails()
287355 }
288356 }
289357
290- private async Task ProcessNewEmailsAsync ( int currentCount )
291- {
292- var newEmailCount = currentCount - _lastProcessedCount ;
293- _logger . LogInformation ( "🔔 Processing {NewEmailCount} new email(s)!" , newEmailCount ) ;
294- List < EmailReceivedEventArgs > emailsToProcess = new List < EmailReceivedEventArgs > ( ) ;
295-
296- for ( int i = _lastProcessedCount ; i < currentCount ; i ++ )
297- {
298- if ( _serviceCancellationToken . IsCancellationRequested )
299- break ;
300-
301- try
302- {
303- ProcessSingleEmail ( _client ! , _client ! . Inbox , i , isExisting : false , emailsToProcess ) ;
304- }
305- catch ( Exception ex )
306- {
307- _logger . LogWarning ( ex , "Error processing new email at index {Index}" , i ) ;
308- }
309- }
310-
311- _lastProcessedCount = currentCount ;
312- await this . ProcessBatchAsync ( emailsToProcess ) ;
313- }
314358
315359 private async Task ProcessQueuedMoveOperationsAsync ( )
316360 {
@@ -366,8 +410,8 @@ private async Task ProcessSingleMoveOperationAsync(IDbConnection dbConnection, E
366410 var destinationFolder = GetDestinationFolder ( moveOperation . DestinationFolder ) ;
367411
368412 // Perform the move
369- var indexes = moveOperation . Emails . Select ( e => e . MessageIndex ) . ToList ( ) ;
370- await sourceFolder . MoveToAsync ( indexes , destinationFolder ) ;
413+
414+ await sourceFolder . MoveToAsync ( moveOperation . EmailIds , destinationFolder ) ;
371415
372416 // Save to database
373417 SaveEmailsToDatabase ( dbConnection , moveOperation . Emails ) ;
0 commit comments