2929import time
3030import traceback
3131from Mailman import Errors
32+ import threading
3233
3334
3435class VirginRunner (IncomingRunner ):
@@ -40,6 +41,29 @@ class VirginRunner(IncomingRunner):
4041 # Cleanup interval for message tracking data
4142 _cleanup_interval = 3600 # 1 hour in seconds
4243
44+ # Message tracking configuration
45+ _processed_messages = set ()
46+ _processed_lock = threading .Lock ()
47+ _last_cleanup = time .time ()
48+ _max_processed_messages = 10000
49+ _processed_times = {} # Track processing times for messages
50+
51+ def __init__ (self , slice = None , numslices = 1 ):
52+ mailman_log ('debug' , 'VirginRunner: Starting initialization' )
53+ try :
54+ Runner .__init__ (self , slice , numslices )
55+
56+ # Initialize processed messages tracking
57+ self ._processed_messages = set ()
58+ self ._processed_times = {}
59+ self ._last_cleanup = time .time ()
60+
61+ mailman_log ('debug' , 'VirginRunner: Initialization complete' )
62+ except Exception as e :
63+ mailman_log ('error' , 'VirginRunner: Initialization failed: %s\n Traceback:\n %s' ,
64+ str (e ), traceback .format_exc ())
65+ raise
66+
4367 def _check_message_processed (self , msgid , filebase ):
4468 """Check if a message has already been processed and if retry delay is met.
4569 Returns True if the message can be processed, False if it's a duplicate or retry delay not met."""
@@ -149,19 +173,16 @@ def _get_pipeline(self, mlist, msg, msgdata):
149173
150174 def _cleanup_old_messages (self ):
151175 """Clean up old message tracking data."""
152- try :
153- mailman_log ('debug' , 'VirginRunner: Starting cleanup of old message tracking data' )
154- now = time .time ()
155- old_msgids = []
156- for msgid , process_time in list (self ._processed_times .items ()):
157- if now - process_time > self ._max_tracking_age :
158- old_msgids .append (msgid )
159- for msgid in old_msgids :
160- del self ._processed_times [msgid ]
161- self ._processed_messages .discard (msgid )
162- mailman_log ('debug' , 'VirginRunner: Cleaned up %d old message entries' , len (old_msgids ))
163- except Exception as e :
164- mailman_log ('error' , 'VirginRunner: Error during cleanup: %s' , str (e ))
176+ with self ._processed_lock :
177+ if len (self ._processed_messages ) > self ._max_processed_messages :
178+ mailman_log ('debug' , 'VirginRunner._cleanup_old_messages: Clearing processed messages set (size: %d)' ,
179+ len (self ._processed_messages ))
180+ self ._processed_messages .clear ()
181+ if len (self ._processed_times ) > self ._max_processed_messages :
182+ mailman_log ('debug' , 'VirginRunner._cleanup_old_messages: Clearing processed times dict (size: %d)' ,
183+ len (self ._processed_times ))
184+ self ._processed_times .clear ()
185+ self ._last_cleanup = time .time ()
165186
166187 def _onefile (self , msg , msgdata ):
167188 """Process a single file from the queue."""
@@ -184,3 +205,12 @@ def _onefile(self, msg, msgdata):
184205 return keepqueued
185206 finally :
186207 mlist .Unlock ()
208+
209+ def _unmark_message_processed (self , msgid ):
210+ """Remove a message from the processed messages set."""
211+ with self ._processed_lock :
212+ if msgid in self ._processed_messages :
213+ self ._processed_messages .remove (msgid )
214+ if msgid in self ._processed_times :
215+ del self ._processed_times [msgid ]
216+ mailman_log ('debug' , 'VirginRunner: Unmarked message %s as processed' , msgid )
0 commit comments