Skip to content

Commit 717a082

Browse files
committed
update
1 parent 23d8be1 commit 717a082

2 files changed

Lines changed: 65 additions & 88 deletions

File tree

Mailman/Queue/IncomingRunner.py

Lines changed: 45 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -317,102 +317,61 @@ def _cleanup(self):
317317
mailman_log('qrunner', 'IncomingRunner._cleanup: Cleanup complete')
318318

319319
def _oneloop(self):
320-
mailman_log('qrunner', 'IncomingRunner._oneloop: Starting loop')
320+
"""Process one batch of messages from the incoming queue."""
321321
# First, list all the files in our queue directory.
322322
# Switchboard.files() is guaranteed to hand us the files in FIFO
323323
# order. Return an integer count of the number of files that were
324324
# available for this qrunner to process.
325-
files = self._switchboard.files()
326-
mailman_log('qrunner', 'IncomingRunner._oneloop: Found %d files to process', len(files))
327-
328-
for filebase in files:
329-
try:
330-
# Log that we're starting to process this file
331-
mailman_log('qrunner', 'IncomingRunner._oneloop: Starting to process queue file: %s', filebase)
332-
333-
# Ask the switchboard for the message and metadata objects
334-
# associated with this filebase.
325+
try:
326+
# Get the list of files to process
327+
files = self._switchboard.files()
328+
filecnt = len(files)
329+
330+
# Only log at debug level if we found files to process
331+
if filecnt > 0:
332+
mailman_log('debug', 'IncomingRunner._oneloop: Found %d files to process', filecnt)
333+
334+
# Process each file
335+
for filebase in files:
335336
try:
337+
# Dequeue the file
336338
msg, msgdata = self._switchboard.dequeue(filebase)
337-
if msg is None or msgdata is None:
338-
mailman_log('qrunner', 'IncomingRunner._oneloop: Failed to dequeue file %s - invalid message data', filebase)
339-
# Move to shunt queue
340-
try:
341-
src = os.path.join(self._switchboard.whichq(), filebase + '.bak')
342-
dst = os.path.join(mm_cfg.BADQUEUE_DIR, filebase + '.psv')
343-
if not os.path.exists(mm_cfg.BADQUEUE_DIR):
344-
os.makedirs(mm_cfg.BADQUEUE_DIR, 0o770)
345-
os.rename(src, dst)
346-
mailman_log('qrunner', 'IncomingRunner._oneloop: Moved invalid file to shunt queue: %s -> %s (reason: null message or metadata)', filebase, dst)
347-
except Exception as e:
348-
mailman_log('qrunner', 'IncomingRunner._oneloop: Failed to move invalid file to shunt queue: %s', str(e))
339+
if msg is None:
349340
continue
350341

351-
mailman_log('qrunner', 'IncomingRunner._oneloop: Successfully dequeued file %s', filebase)
342+
mailman_log('info', 'IncomingRunner._oneloop: Successfully dequeued file %s', filebase)
352343

353-
# Validate message data structure
354-
if not isinstance(msgdata, dict):
355-
mailman_log('qrunner', 'IncomingRunner._oneloop: Invalid message data structure for file %s: expected dict, got %s',
356-
filebase, type(msgdata))
357-
# Move to shunt queue
358-
try:
359-
src = os.path.join(self._switchboard.whichq(), filebase + '.bak')
360-
dst = os.path.join(mm_cfg.BADQUEUE_DIR, filebase + '.psv')
361-
if not os.path.exists(mm_cfg.BADQUEUE_DIR):
362-
os.makedirs(mm_cfg.BADQUEUE_DIR, 0o770)
363-
os.rename(src, dst)
364-
mailman_log('qrunner', 'IncomingRunner._oneloop: Moved invalid file to shunt queue: %s -> %s (reason: invalid message data structure)', filebase, dst)
365-
except Exception as e:
366-
mailman_log('qrunner', 'IncomingRunner._oneloop: Failed to move invalid file to shunt queue: %s', str(e))
367-
continue
344+
# Process the message
345+
try:
346+
# Get the list name from the message data
347+
listname = msgdata.get('listname', mm_cfg.MAILMAN_SITE_LIST)
348+
349+
# Process the message
350+
result = self._dispose(listname, msg, msgdata)
351+
352+
# If the message should be kept in the queue, requeue it
353+
if result:
354+
self._switchboard.enqueue(msg, msgdata)
355+
mailman_log('info', 'IncomingRunner._oneloop: Message requeued for later processing: %s', filebase)
356+
else:
357+
mailman_log('info', 'IncomingRunner._oneloop: Message processing complete, moving to shunt queue %s (msgid: %s)',
358+
filebase, msg.get('message-id', 'n/a'))
359+
360+
except Exception as e:
361+
mailman_log('error', 'IncomingRunner._oneloop: Error processing message: %s\n%s',
362+
str(e), traceback.format_exc())
363+
# Move to shunt queue on error
364+
self._shunt.enqueue(msg, msgdata)
368365

369-
# Validate required message data fields
370-
required_fields = ['listname']
371-
missing_fields = [field for field in required_fields if field not in msgdata]
372-
if missing_fields:
373-
mailman_log('qrunner', 'IncomingRunner._oneloop: Missing required fields in message data for file %s: %s',
374-
filebase, ', '.join(missing_fields))
375-
# Move to shunt queue
376-
try:
377-
src = os.path.join(self._switchboard.whichq(), filebase + '.bak')
378-
dst = os.path.join(mm_cfg.BADQUEUE_DIR, filebase + '.psv')
379-
if not os.path.exists(mm_cfg.BADQUEUE_DIR):
380-
os.makedirs(mm_cfg.BADQUEUE_DIR, 0o770)
381-
os.rename(src, dst)
382-
mailman_log('qrunner', 'IncomingRunner._oneloop: Moved invalid file to shunt queue: %s -> %s (reason: missing required fields: %s)',
383-
filebase, dst, ', '.join(missing_fields))
384-
except Exception as e:
385-
mailman_log('qrunner', 'IncomingRunner._oneloop: Failed to move invalid file to shunt queue: %s', str(e))
386-
continue
387-
388366
except Exception as e:
389-
mailman_log('qrunner', 'IncomingRunner._oneloop: Failed to dequeue file %s: %s', filebase, str(e))
390-
continue
367+
mailman_log('error', 'IncomingRunner._oneloop: Error dequeuing file %s: %s\n%s',
368+
filebase, str(e), traceback.format_exc())
391369

392-
# Process the message
393-
more = self._dispose(msgdata['listname'], msg, msgdata)
394-
if more:
395-
# The message needs more processing, so enqueue it at the
396-
# end of the self._switchboard's queue.
397-
mailman_log('qrunner', 'IncomingRunner._oneloop: Message needs more processing, requeuing %s', filebase)
398-
self._switchboard.enqueue(msg, msgdata)
399-
else:
400-
# The message is done being processed by this qrunner, so
401-
# shunt it off to the next queue.
402-
msgid = msg.get('message-id', 'n/a')
403-
mailman_log('qrunner', 'IncomingRunner._oneloop: Message processing complete, moving to shunt queue %s (msgid: %s)', filebase, msgid)
404-
self._shunt.enqueue(msg, msgdata)
405-
except Exception as e:
406-
# Log the error and requeue the message for later processing
407-
mailman_log('qrunner', 'IncomingRunner._oneloop: Error processing queue file %s: %s\n%s',
408-
filebase, str(e), traceback.format_exc())
409-
if msg is not None and msgdata is not None:
410-
try:
411-
self._switchboard.enqueue(msg, msgdata)
412-
mailman_log('qrunner', 'IncomingRunner._oneloop: Successfully requeued file %s', filebase)
413-
except Exception as e2:
414-
mailman_log('qrunner', 'IncomingRunner._oneloop: Failed to requeue file %s: %s', filebase, str(e2))
415-
416-
mailman_log('qrunner', 'IncomingRunner._oneloop: Loop complete, processed %d files', len(files))
417-
return len(files)
370+
# Only log completion at debug level if we processed files
371+
if filecnt > 0:
372+
mailman_log('debug', 'IncomingRunner._oneloop: Loop complete, processed %d files', filecnt)
373+
374+
except Exception as e:
375+
mailman_log('error', 'IncomingRunner._oneloop: Unexpected error in main loop: %s\n%s',
376+
str(e), traceback.format_exc())
418377

Mailman/Queue/VirginRunner.py

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,8 @@ def _check_retry_delay(self, msgid, filebase):
6262
# Continue processing even if cleanup fails
6363

6464
# Check retry delay
65-
last_retry = self._retry_times.get(msgid, 0)
66-
time_since_last_retry = current_time - last_retry
65+
last_retry = self._retry_times.get(msgid)
66+
time_since_last_retry = 0 if last_retry is None else current_time - last_retry
6767

6868
# Log detailed retry information at debug level
6969
mailman_log('debug', 'VirginRunner: Retry check for message %s (file: %s):', msgid, filebase)
@@ -72,6 +72,24 @@ def _check_retry_delay(self, msgid, filebase):
7272
mailman_log('debug', ' Time since last retry: %d seconds', time_since_last_retry)
7373
mailman_log('debug', ' Minimum retry delay: %d seconds', self.MIN_RETRY_DELAY)
7474

75+
# If message has never been retried (last_retry is None), it can be processed
76+
if last_retry is None:
77+
# Update both data structures atomically
78+
try:
79+
self._processed_messages.add(msgid)
80+
self._retry_times[msgid] = current_time
81+
mailman_log('debug', 'VirginRunner: Message %s (file: %s) has never been retried, proceeding with processing',
82+
msgid, filebase)
83+
return True
84+
except Exception as e:
85+
# If we fail to update the tracking data, remove the message from processed set
86+
self._processed_messages.discard(msgid)
87+
self._retry_times.pop(msgid, None)
88+
mailman_log('error', 'VirginRunner: Failed to update tracking data for message %s: %s',
89+
msgid, str(e))
90+
return False
91+
92+
# For messages that have been retried before, check the delay
7593
if time_since_last_retry < self.MIN_RETRY_DELAY:
7694
# Log at info level when retry check fails
7795
mailman_log('info', 'VirginRunner: Message %s (file: %s) retried too soon. Time since last retry: %d seconds, minimum required: %d seconds',

0 commit comments

Comments
 (0)