Skip to content

Commit 8093aa2

Browse files
committed
Optimized Runner to use os.stat() for directory change detection
1 parent f635e9e commit 8093aa2

1 file changed

Lines changed: 42 additions & 49 deletions

File tree

Mailman/Queue/Runner.py

Lines changed: 42 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
from io import StringIO
2525
from functools import wraps
2626
import threading
27+
import os
2728

2829
from Mailman import mm_cfg
2930
from Mailman import Utils
@@ -54,6 +55,7 @@ class Runner:
5455
_last_cleanup = time.time() # Last cleanup time
5556
_cleanup_interval = 3600 # Cleanup interval in seconds
5657
_current_backoff = INITIAL_BACKOFF # Current backoff time in seconds
58+
_last_mtime = 0 # Last directory modification time
5759

5860
def __init__(self, slice=None, numslices=1):
5961
syslog('debug', '%s: Starting initialization', self.__class__.__name__)
@@ -82,6 +84,9 @@ def __init__(self, slice=None, numslices=1):
8284
self._last_error_time = 0
8385
self._error_count = 0
8486

87+
self._current_backoff = self.INITIAL_BACKOFF
88+
self._last_mtime = 0
89+
8590
syslog('debug', '%s: Initialization complete', self.__class__.__name__)
8691
except Exception as e:
8792
syslog('error', '%s: Initialization failed: %s\nTraceback:\n%s',
@@ -207,39 +212,43 @@ def _handle_error(self, exc, msg=None, mlist=None, preserve=True):
207212
return True
208213

209214
def _oneloop(self):
210-
# First, list all the files in our queue directory.
211-
# Switchboard.files() is guaranteed to hand us the files in FIFO
212-
# order. Return an integer count of the number of files that were
213-
# available for this qrunner to process.
215+
"""Run one iteration of the runner's main loop.
216+
217+
Returns:
218+
int: Number of files processed, or 0 if no files found
219+
"""
220+
# Check if directory has been modified since last check
221+
try:
222+
st = os.stat(self.QDIR)
223+
current_mtime = st.st_mtime
224+
if current_mtime <= self._last_mtime:
225+
# Directory hasn't changed, use backoff
226+
self._snooze(self._current_backoff)
227+
# Double the backoff time, up to MAX_BACKOFF
228+
self._current_backoff = min(self._current_backoff * 2, self.MAX_BACKOFF)
229+
return 0
230+
# Directory has changed, reset backoff
231+
self._current_backoff = self.INITIAL_BACKOFF
232+
self._last_mtime = current_mtime
233+
except OSError as e:
234+
syslog('error', '%s: Error checking directory %s: %s',
235+
self.__class__.__name__, self.QDIR, str(e))
236+
return 0
237+
238+
# Process files in the directory
214239
files = self._switchboard.files()
215240
if not files:
216241
syslog('debug', '%s: No files to process', self.__class__.__name__)
217242
return 0
218-
243+
244+
# Process each file
219245
for filebase in files:
246+
if self._stop:
247+
break
220248
try:
221249
# Ask the switchboard for the message and metadata objects
222250
# associated with this filebase.
223251
msg, msgdata = self._switchboard.dequeue(filebase)
224-
except Exception as e:
225-
# This used to just catch email.Errors.MessageParseError,
226-
# but other problems can occur in message parsing, e.g.
227-
# ValueError, and exceptions can occur in unpickling too.
228-
# We don't want the runner to die, so we just log and skip
229-
# this entry, but maybe preserve it for analysis.
230-
self._log(e)
231-
if mm_cfg.QRUNNER_SAVE_BAD_MESSAGES:
232-
syslog('error',
233-
'Skipping and preserving unparseable message: %s',
234-
filebase)
235-
preserve = True
236-
else:
237-
syslog('error',
238-
'Ignoring unparseable message: %s', filebase)
239-
preserve = False
240-
self._switchboard.finish(filebase, preserve=preserve)
241-
continue
242-
try:
243252
self._onefile(msg, msgdata)
244253
self._switchboard.finish(filebase)
245254
except Exception as e:
@@ -411,32 +420,16 @@ def _doperiodic(self):
411420
"""
412421
pass
413422

414-
def _snooze(self, filecnt):
415-
"""Sleep for a while, but check for stop flag periodically.
416-
417-
Implements exponential backoff when no files are found to process.
423+
def _snooze(self, secs):
424+
"""Sleep for the specified number of seconds, but wake up if the
425+
stop flag is set.
426+
427+
Args:
428+
secs: Number of seconds to sleep.
418429
"""
419-
if filecnt > 0:
420-
# Reset backoff when files are found
421-
self._current_backoff = self.INITIAL_BACKOFF
422-
# Only log if we're sleeping for more than 5 seconds
423-
if self.SLEEPTIME > 5:
424-
syslog('debug', '%s: Sleeping for %d seconds after processing %d files in this iteration',
425-
self.__class__.__name__, self.SLEEPTIME, filecnt)
426-
sleep_time = self.SLEEPTIME
427-
else:
428-
# No files found, use exponential backoff
429-
sleep_time = min(self._current_backoff, self.MAX_BACKOFF)
430-
syslog('debug', '%s: No files to process, sleeping for %d seconds',
431-
self.__class__.__name__, sleep_time)
432-
# Double the backoff time for next iteration, up to MAX_BACKOFF
433-
self._current_backoff = min(self._current_backoff * 2, self.MAX_BACKOFF)
434-
435-
for _ in range(sleep_time):
436-
if self._stop:
437-
syslog('debug', '%s: Stop flag detected, waking up', self.__class__.__name__)
438-
return
439-
time.sleep(1)
430+
endtime = time.time() + secs
431+
while time.time() < endtime and not self._stop:
432+
time.sleep(0.1)
440433

441434
def _shortcircuit(self):
442435
"""Return a true value if the individual file processing loop should

0 commit comments

Comments
 (0)