Skip to content

Commit 689daa8

Browse files
committed
update
1 parent bd545fa commit 689daa8

2 files changed

Lines changed: 82 additions & 43 deletions

File tree

Mailman/Queue/Runner.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -323,7 +323,13 @@ def _onefile(self, msg, msgdata):
323323
try:
324324
result = self._dispose(mlist, msg, msgdata)
325325
if result:
326+
# If _dispose returns True, requeue the message
326327
self._switchboard.enqueue(msg, msgdata)
328+
syslog('debug', 'Runner._onefile: Message requeued for %s', listname)
329+
else:
330+
# If _dispose returns False, finish processing and remove the file
331+
self._switchboard.finish(msgdata.get('filebase', ''))
332+
syslog('debug', 'Runner._onefile: Message processing completed for %s', listname)
327333
return result
328334
except Exception as e:
329335
self._handle_error(e, msg=msg, mlist=mlist)

Mailman/Queue/Switchboard.py

Lines changed: 76 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -97,63 +97,35 @@ def __init__(self, whichq, slice=None, numslices=1, recover=False):
9797
self.recover_backup_files()
9898
# Clean up any stale locks during initialization
9999
self.cleanup_stale_locks()
100+
# Clean up any stale backup files
101+
self.cleanup_stale_backups()
102+
# Clean up any stale processed files
103+
self.cleanup_stale_processed()
100104

101105
def whichq(self):
102106
return self.__whichq
103107

104108
def enqueue(self, msg, msgdata=None, listname=None, _plaintext=False, **kwargs):
105109
"""Add a message to the queue.
106110
107-
Additional keyword arguments are stored in msgdata.
111+
Args:
112+
msg: The message to enqueue
113+
msgdata: Optional message metadata
114+
listname: Optional list name
115+
_plaintext: Whether to save as plaintext
116+
**kwargs: Additional metadata to add
108117
"""
118+
# Initialize msgdata if not provided
109119
if msgdata is None:
110120
msgdata = {}
111-
if listname:
112-
msgdata['listname'] = listname
113121

114-
# Store any additional keyword arguments in msgdata
122+
# Add any additional metadata
115123
msgdata.update(kwargs)
116124

117-
# Log the full msgdata before processing
118-
mailman_log('debug', 'Switchboard.enqueue: Full msgdata before processing:\n%s', str(msgdata))
125+
# Add listname if provided
126+
if listname:
127+
msgdata['listname'] = listname
119128

120-
# Convert string message to Message object if needed
121-
if isinstance(msg, str):
122-
try:
123-
msg = email.message_from_string(msg)
124-
except Exception as e:
125-
mailman_log('error', 'Switchboard.enqueue: Failed to convert string message to Message object: %s', str(e))
126-
raise
127-
128-
# First check if we have a recipient set
129-
if 'recipient' not in msgdata:
130-
# Try to get recipient from msgdata['recips'] first
131-
if msgdata.get('recips'):
132-
msgdata['recipient'] = msgdata['recips'][0]
133-
mailman_log('debug', 'Switchboard.enqueue: Set recipient from recips for message: %s',
134-
msg.get('message-id', 'n/a'))
135-
# Then try envelope-to header
136-
elif msg.get('envelope-to'):
137-
msgdata['recipient'] = msg.get('envelope-to')
138-
mailman_log('debug', 'Switchboard.enqueue: Set recipient from envelope-to header for message: %s',
139-
msg.get('message-id', 'n/a'))
140-
# Finally try To header
141-
elif msg.get('to'):
142-
# Parse the To header to get the first recipient
143-
addrs = email.utils.getaddresses([msg.get('to')])
144-
if addrs and addrs[0][1]:
145-
msgdata['recipient'] = addrs[0][1]
146-
mailman_log('debug', 'Switchboard.enqueue: Set recipient from To header for message: %s',
147-
msg.get('message-id', 'n/a'))
148-
else:
149-
mailman_log('error', 'Switchboard.enqueue: No valid recipient found in To header for message: %s',
150-
msg.get('message-id', 'n/a'))
151-
raise ValueError('No valid recipient found in To header')
152-
else:
153-
mailman_log('error', 'Switchboard.enqueue: No recipient found in msgdata for message: %s',
154-
msg.get('message-id', 'n/a'))
155-
raise ValueError('No recipient found in msgdata')
156-
157129
# Then check if we need to set recips
158130
if 'recips' not in msgdata or not msgdata['recips']:
159131
# If we have a recipient but no recips, use the recipient
@@ -214,6 +186,8 @@ def enqueue(self, msg, msgdata=None, listname=None, _plaintext=False, **kwargs):
214186
mailman_log('error', 'Switchboard.enqueue: Failed to write message to %s: %s', filebase, str(e))
215187
raise
216188

189+
# Add filebase to msgdata for cleanup
190+
msgdata['filebase'] = filebase
217191
return filebase
218192
finally:
219193
# Always clean up the lock file
@@ -247,6 +221,9 @@ def dequeue(self, filebase):
247221
# Read the message and metadata
248222
try:
249223
msg, data = self._dequeue(filename)
224+
# Add filebase to msgdata for cleanup
225+
if data is not None:
226+
data['filebase'] = filebase
250227
# Log the full msgdata after dequeuing
251228
mailman_log('debug', 'Switchboard.dequeue: Full msgdata after dequeuing:\n%s', str(data))
252229
except Exception as e:
@@ -660,6 +637,62 @@ def cleanup_stale_locks(self):
660637
except OSError as e:
661638
mailman_log('error', 'Error cleaning up stale locks: %s', str(e))
662639

640+
def cleanup_stale_backups(self):
641+
"""Clean up any stale backup files in the queue directory.
642+
643+
This method removes backup files that are older than 24 hours
644+
to prevent accumulation of stale files.
645+
"""
646+
try:
647+
now = time.time()
648+
stale_age = 24 * 3600 # 24 hours in seconds
649+
650+
for f in os.listdir(self.__whichq):
651+
if f.endswith('.bak'):
652+
bakfile = os.path.join(self.__whichq, f)
653+
try:
654+
# Check file age
655+
file_age = now - os.path.getmtime(bakfile)
656+
if file_age > stale_age:
657+
mailman_log('warning',
658+
'Cleaning up stale backup file %s (age: %d seconds)',
659+
bakfile, file_age)
660+
os.unlink(bakfile)
661+
except OSError as e:
662+
mailman_log('error',
663+
'Failed to clean up stale backup file %s: %s',
664+
bakfile, str(e))
665+
except OSError as e:
666+
mailman_log('error', 'Error cleaning up stale backup files: %s', str(e))
667+
668+
def cleanup_stale_processed(self):
669+
"""Clean up any stale processed files in the queue directory.
670+
671+
This method removes processed files that are older than 7 days
672+
to prevent accumulation of stale files.
673+
"""
674+
try:
675+
now = time.time()
676+
stale_age = 7 * 24 * 3600 # 7 days in seconds
677+
678+
for f in os.listdir(self.__whichq):
679+
if f.endswith('.pck'):
680+
pckfile = os.path.join(self.__whichq, f)
681+
try:
682+
# Check file age
683+
file_age = now - os.path.getmtime(pckfile)
684+
if file_age > stale_age:
685+
mailman_log('warning',
686+
'Cleaning up stale processed file %s (age: %d seconds)',
687+
pckfile, file_age)
688+
os.unlink(pckfile)
689+
except OSError as e:
690+
mailman_log('error',
691+
'Failed to clean up stale processed file %s: %s',
692+
pckfile, str(e))
693+
except OSError as e:
694+
mailman_log('error', 'Error cleaning up stale processed files: %s', str(e))
695+
663696
def _make_filebase(self, msg, msgdata):
664697
import hashlib
665698
import time

0 commit comments

Comments
 (0)