Skip to content

Commit 6857bd2

Browse files
committed
update
1 parent 695017f commit 6857bd2

5 files changed

Lines changed: 194 additions & 327 deletions

File tree

Mailman/Queue/IncomingRunner.py

Lines changed: 37 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -157,100 +157,55 @@ def _convert_message(self, msg):
157157
return Runner._convert_message(self, msg)
158158

159159
def _dispose(self, mlist, msg, msgdata):
160-
"""Process an incoming message."""
161-
msgid = msg.get('message-id', 'n/a')
162-
filebase = msgdata.get('_filebase', 'unknown')
163-
160+
# Try to get the list lock.
161+
try:
162+
mlist.Lock(timeout=mm_cfg.LIST_LOCK_TIMEOUT)
163+
except LockFile.TimeOutError:
164+
# Oh well, try again later
165+
return 1
166+
# Process the message through a handler pipeline. The handler
167+
# pipeline can actually come from one of three places: the message
168+
# metadata, the mlist, or the global pipeline.
169+
#
170+
# If a message was requeued due to an uncaught exception, its metadata
171+
# will contain the retry pipeline. Use this above all else.
172+
# Otherwise, if the mlist has a `pipeline' attribute, it should be
173+
# used. Final fallback is the global pipeline.
164174
try:
165-
# Enhanced logging for message details
166-
mailman_log('debug', 'IncomingRunner._dispose: Starting to process message %s (file: %s)', msgid, filebase)
167-
mailman_log('debug', 'Message details:')
168-
mailman_log('debug', ' Message ID: %s', msgid)
169-
mailman_log('debug', ' From: %s', msg.get('from', 'unknown'))
170-
mailman_log('debug', ' To: %s', msg.get('to', 'unknown'))
171-
mailman_log('debug', ' Subject: %s', msg.get('subject', '(no subject)'))
172-
mailman_log('debug', ' Message type: %s', type(msg).__name__)
173-
mailman_log('debug', ' Message data: %s', str(msgdata))
174-
mailman_log('debug', ' Pipeline: %s', msgdata.get('pipeline', 'No pipeline'))
175-
176-
# Check if this is an administrative message
177-
is_admin = msgdata.get('admin_type', False)
178-
mailman_log('debug', ' Is admin message: %s', is_admin)
179-
180-
# Check if this is a list post
181-
is_list_post = msgdata.get('list_post', False)
182-
mailman_log('debug', ' Is list post: %s', is_list_post)
183-
184-
# Log recipients information
185-
recipients = msgdata.get('recips', [])
186-
mailman_log('debug', ' Recipients: %s', recipients)
187-
if not recipients:
188-
mailman_log('warning', 'IncomingRunner: No recipients found in msgdata for message %s, pipeline handlers may set them', msgid)
189-
mailman_log('debug', ' Message data: %s', str(msgdata))
190-
mailman_log('debug', ' To header: %s', msg.get('to', 'unknown'))
191-
mailman_log('debug', ' Cc header: %s', msg.get('cc', 'unknown'))
192-
mailman_log('debug', ' Resent-To: %s', msg.get('resent-to', 'unknown'))
193-
mailman_log('debug', ' Resent-Cc: %s', msg.get('resent-cc', 'unknown'))
194-
195-
# Convert Python's Message to Mailman's Message if needed
196-
msg = self._convert_message(msg)
197-
198-
# Check if this is a bounce message
199-
if self._is_bounce(msg):
200-
mailman_log('debug', 'IncomingRunner._dispose: Message %s is a bounce, routing to bounce queue', msgid)
201-
# Route to bounce queue
202-
bounce_queue = Switchboard(mm_cfg.BOUNCEQUEUE_DIR)
203-
bounce_queue.enqueue(msg, msgdata)
204-
return False
205-
206-
# Get the pipeline
207175
pipeline = self._get_pipeline(mlist, msg, msgdata)
208-
if not pipeline:
209-
mailman_log('error', 'IncomingRunner._dispose: No pipeline found for message %s', msgid)
210-
return False
211-
212-
# Process the message through the pipeline
213-
try:
214-
more = self._dopipeline(mlist, msg, msgdata, pipeline)
215-
if more:
216-
mailman_log('debug', 'IncomingRunner._dispose: Message %s needs more processing', msgid)
217-
return True
218-
except Exception as e:
219-
mailman_log('error', 'IncomingRunner._dispose: Error processing message %s: %s\nTraceback:\n%s',
220-
msgid, str(e), traceback.format_exc())
221-
return False
222-
223-
mailman_log('debug', 'IncomingRunner._dispose: Successfully processed message %s', msgid)
224-
return True
225-
226-
except Exception as e:
227-
mailman_log('error', 'IncomingRunner._dispose: Error processing message %s: %s\nTraceback:\n%s',
228-
msgid, str(e), traceback.format_exc())
229-
return False
176+
msgdata['pipeline'] = pipeline
177+
more = self._dopipeline(mlist, msg, msgdata, pipeline)
178+
if not more:
179+
del msgdata['pipeline']
180+
mlist.Save()
181+
return more
182+
finally:
183+
mlist.Unlock()
230184

231185
def _get_pipeline(self, mlist, msg, msgdata):
232-
"""Get the pipeline for processing the message."""
233186
# We must return a copy of the list, otherwise, the first message that
234187
# flows through the pipeline will empty it out!
235188
return msgdata.get('pipeline',
236-
getattr(mlist, 'pipeline',
237-
mm_cfg.GLOBAL_PIPELINE))[:]
189+
getattr(mlist, 'pipeline',
190+
mm_cfg.GLOBAL_PIPELINE))[:]
238191

239192
def _dopipeline(self, mlist, msg, msgdata, pipeline):
240-
"""Process the message through the pipeline of handlers."""
241193
while pipeline:
242194
handler = pipeline.pop(0)
243195
modname = 'Mailman.Handlers.' + handler
244196
__import__(modname)
245197
try:
246198
pid = os.getpid()
247199
sys.modules[modname].process(mlist, msg, msgdata)
248-
# Failsafe -- a child may have leaked through
200+
# Failsafe -- a child may have leaked through.
249201
if pid != os.getpid():
250202
mailman_log('error', 'Child process leaked through: %s', modname)
251203
os._exit(1)
252204
except Errors.DiscardMessage:
253-
# Throw the message away
205+
# Throw the message away; we need do nothing else with it.
206+
# We do need to push the current handler back in the pipeline
207+
# just in case the syslog call throws an exception and the
208+
# message is shunted.
254209
pipeline.insert(0, handler)
255210
mailman_log('vette', """Message discarded, msgid: %s
256211
list: %s,
@@ -259,10 +214,14 @@ def _dopipeline(self, mlist, msg, msgdata, pipeline):
259214
mlist.real_name, handler)
260215
return 0
261216
except Errors.HoldMessage:
262-
# Let the approval process take it from here
217+
# Let the approval process take it from here. The message no
218+
# longer needs to be queued.
263219
return 0
264220
except Errors.RejectMessage as e:
265-
# Log rejection and bounce message
221+
# Log this.
222+
# We do need to push the current handler back in the pipeline
223+
# just in case the syslog call or BounceMessage throws an
224+
# exception and the message is shunted.
266225
pipeline.insert(0, handler)
267226
mailman_log('vette', """Message rejected, msgid: %s
268227
list: %s,
@@ -272,8 +231,9 @@ def _dopipeline(self, mlist, msg, msgdata, pipeline):
272231
mlist.real_name, handler, e.notice())
273232
mlist.BounceMessage(msg, msgdata, e)
274233
return 0
275-
except Exception as e:
234+
except:
276235
# Push this pipeline module back on the stack, then re-raise
236+
# the exception.
277237
pipeline.insert(0, handler)
278238
raise
279239
# We've successfully completed handling of this message

Mailman/Queue/OutgoingRunner.py

Lines changed: 76 additions & 99 deletions
Original file line numberDiff line numberDiff line change
@@ -81,22 +81,22 @@ def __init__(self, slice=None, numslices=1):
8181
self._last_cleanup = time.time()
8282

8383
# We look this function up only at startup time
84-
self._modname = 'Mailman.Handlers.' + mm_cfg.DELIVERY_MODULE
85-
mailman_log('debug', 'OutgoingRunner: Attempting to import delivery module: %s', self._modname)
84+
modname = 'Mailman.Handlers.' + mm_cfg.DELIVERY_MODULE
85+
mailman_log('debug', 'OutgoingRunner: Attempting to import delivery module: %s', modname)
8686

8787
try:
88-
mod = __import__(self._modname)
88+
mod = __import__(modname)
8989
mailman_log('debug', 'OutgoingRunner: Successfully imported delivery module')
9090
except ImportError as e:
91-
mailman_log('error', 'OutgoingRunner: Failed to import delivery module %s: %s', self._modname, str(e))
91+
mailman_log('error', 'OutgoingRunner: Failed to import delivery module %s: %s', modname, str(e))
9292
mailman_log('error', 'OutgoingRunner: Traceback: %s', traceback.format_exc())
9393
raise
9494

9595
try:
96-
self._func = getattr(sys.modules[self._modname], 'process')
96+
self._func = getattr(sys.modules[modname], 'process')
9797
mailman_log('debug', 'OutgoingRunner: Successfully got process function from module')
9898
except AttributeError as e:
99-
mailman_log('error', 'OutgoingRunner: Failed to get process function from module %s: %s', self._modname, str(e))
99+
mailman_log('error', 'OutgoingRunner: Failed to get process function from module %s: %s', modname, str(e))
100100
mailman_log('error', 'OutgoingRunner: Traceback: %s', traceback.format_exc())
101101
raise
102102

@@ -242,100 +242,77 @@ def _validate_message(self, msg, msgdata):
242242
return msg, False
243243

244244
def _dispose(self, mlist, msg, msgdata):
245-
"""Process an outgoing message."""
246-
msgid = msg.get('message-id', 'n/a')
247-
filebase = msgdata.get('_filebase', 'unknown')
248-
249-
# Log the full msgdata at the start of processing
250-
mailman_log('debug', 'OutgoingRunner._dispose: Full msgdata at start:\n%s', str(msgdata))
251-
252-
# Ensure we have a MailList object
253-
if isinstance(mlist, str):
254-
try:
255-
mlist = get_mail_list()(mlist, lock=0)
256-
should_unlock = True
257-
except Errors.MMUnknownListError:
258-
mailman_log('error', 'OutgoingRunner: Unknown list %s', mlist)
259-
self._shunt.enqueue(msg, msgdata)
260-
return True
261-
else:
262-
should_unlock = False
263-
245+
# See if we should retry delivery of this message again.
246+
deliver_after = msgdata.get('deliver_after', 0)
247+
if time.time() < deliver_after:
248+
return True
249+
# Make sure we have the most up-to-date state
250+
mlist.Load()
264251
try:
265-
mailman_log('debug', 'OutgoingRunner._dispose: Starting to process outgoing message %s (file: %s) for list %s',
266-
msgid, filebase, mlist.internal_name())
267-
268-
# Check retry delay and duplicate processing
269-
if not self._check_retry_delay(msgid, filebase):
270-
mailman_log('debug', 'OutgoingRunner._dispose: Message %s failed retry delay check, skipping', msgid)
271-
return False
272-
273-
# Make sure we have the most up-to-date state
274-
try:
275-
mlist.Load()
276-
mailman_log('debug', 'OutgoingRunner._dispose: Successfully loaded list %s', mlist.internal_name())
277-
except Errors.MMCorruptListDatabaseError as e:
278-
mailman_log('error', 'OutgoingRunner._dispose: Failed to load list %s: %s\nTraceback:\n%s',
279-
mlist.internal_name(), str(e), traceback.format_exc())
280-
self._unmark_message_processed(msgid)
281-
return False
282-
except Exception as e:
283-
mailman_log('error', 'OutgoingRunner._dispose: Unexpected error loading list %s: %s\nTraceback:\n%s',
284-
mlist.internal_name(), str(e), traceback.format_exc())
285-
self._unmark_message_processed(msgid)
286-
return False
287-
288-
# Validate message type first
289-
msg, success = self._validate_message(msg, msgdata)
290-
if not success:
291-
mailman_log('error', 'OutgoingRunner._dispose: Message validation failed for message %s', msgid)
292-
self._unmark_message_processed(msgid)
293-
return False
294-
295-
# Log the full msgdata after validation
296-
mailman_log('debug', 'OutgoingRunner._dispose: Full msgdata after validation:\n%s', str(msgdata))
297-
298-
# Validate message headers
299-
if not msg.get('message-id'):
300-
mailman_log('error', 'OutgoingRunner._dispose: Message missing Message-ID header')
301-
self._unmark_message_processed(msgid)
302-
return False
303-
304-
# Process the outgoing message
305-
try:
306-
mailman_log('debug', 'OutgoingRunner._dispose: Processing outgoing message %s', msgid)
307-
308-
# Get message type and recipient
309-
msgtype = msgdata.get('_msgtype', 'unknown')
310-
recipient = msgdata.get('recipient', 'unknown')
311-
312-
mailman_log('debug', 'OutgoingRunner._dispose: Message %s is type %s for recipient %s',
313-
msgid, msgtype, recipient)
314-
315-
# Process based on message type
316-
if msgtype == 'bounce':
317-
success = self._process_bounce(mlist, msg, msgdata)
318-
elif msgtype == 'admin':
319-
success = self._process_admin(mlist, msg, msgdata)
320-
else:
321-
success = self._process_regular(mlist, msg, msgdata)
322-
323-
if success:
324-
mailman_log('debug', 'OutgoingRunner._dispose: Successfully processed outgoing message %s', msgid)
325-
return True
326-
else:
327-
mailman_log('error', 'OutgoingRunner._dispose: Failed to process outgoing message %s', msgid)
328-
return False
329-
330-
except Exception as e:
331-
mailman_log('error', 'OutgoingRunner._dispose: Error processing outgoing message %s: %s\nTraceback:\n%s',
332-
msgid, str(e), traceback.format_exc())
333-
self._unmark_message_processed(msgid)
334-
return False
335-
336-
finally:
337-
if should_unlock:
338-
mlist.Unlock()
252+
pid = os.getpid()
253+
self._func(mlist, msg, msgdata)
254+
# Failsafe -- a child may have leaked through.
255+
if pid != os.getpid():
256+
mailman_log('error', 'child process leaked thru: %s', mm_cfg.DELIVERY_MODULE)
257+
os._exit(1)
258+
self.__logged = False
259+
except socket.error:
260+
# There was a problem connecting to the SMTP server. Log this
261+
# once, but crank up our sleep time so we don't fill the error
262+
# log.
263+
port = mm_cfg.SMTPPORT
264+
if port == 0:
265+
port = 'smtp'
266+
# Log this just once.
267+
if not self.__logged:
268+
mailman_log('error', 'Cannot connect to SMTP server %s on port %s',
269+
mm_cfg.SMTPHOST, port)
270+
self.__logged = True
271+
self._snooze(0)
272+
return True
273+
except Errors.SomeRecipientsFailed as e:
274+
# Handle local rejects of probe messages differently.
275+
if msgdata.get('probe_token') and e.permfailures:
276+
self._probe_bounce(mlist, msgdata['probe_token'])
277+
else:
278+
# Delivery failed at SMTP time for some or all of the
279+
# recipients. Permanent failures are registered as bounces,
280+
# but temporary failures are retried for later.
281+
#
282+
# BAW: msg is going to be the original message that failed
283+
# delivery, not a bounce message. This may be confusing if
284+
# this is what's sent to the user in the probe message. Maybe
285+
# we should craft a bounce-like message containing information
286+
# about the permanent SMTP failure?
287+
if e.permfailures:
288+
self._queue_bounces(mlist.internal_name(), e.permfailures,
289+
msg)
290+
# Move temporary failures to the qfiles/retry queue which will
291+
# occasionally move them back here for another shot at
292+
# delivery.
293+
if e.tempfailures:
294+
now = time.time()
295+
recips = e.tempfailures
296+
last_recip_count = msgdata.get('last_recip_count', 0)
297+
deliver_until = msgdata.get('deliver_until', now)
298+
if len(recips) == last_recip_count:
299+
# We didn't make any progress, so don't attempt
300+
# delivery any longer. BAW: is this the best
301+
# disposition?
302+
if now > deliver_until:
303+
return False
304+
else:
305+
# Keep trying to delivery this message for a while
306+
deliver_until = now + mm_cfg.DELIVERY_RETRY_PERIOD
307+
# Don't retry delivery too soon.
308+
deliver_after = now + mm_cfg.DELIVERY_RETRY_WAIT
309+
msgdata['deliver_after'] = deliver_after
310+
msgdata['last_recip_count'] = len(recips)
311+
msgdata['deliver_until'] = deliver_until
312+
msgdata['recips'] = recips
313+
self.__retryq.enqueue(msg, msgdata)
314+
# We've successfully completed handling of this message
315+
return False
339316

340317
def _process_bounce(self, mlist, msg, msgdata):
341318
"""Process a bounce message."""

0 commit comments

Comments
 (0)