Skip to content

Commit bc51497

Browse files
committed
update
1 parent ec6a388 commit bc51497

2 files changed

Lines changed: 101 additions & 29 deletions

File tree

Mailman/Queue/NewsRunner.py

Lines changed: 71 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -128,29 +128,81 @@ def _dispose(self, mlist, msg, msgdata):
128128
mlist.internal_name(), str(e))
129129
return True
130130

131+
def _onefile(self, msg, msgdata):
132+
"""Process a single news message.
133+
134+
This method overrides the base class's _onefile to add news-specific
135+
validation and processing.
136+
137+
Args:
138+
msg: The message to process
139+
msgdata: Additional message metadata
140+
"""
141+
try:
142+
# Validate the message
143+
if not self._validate_message(msg, msgdata):
144+
syslog('error', 'NewsRunner._onefile: Message validation failed')
145+
self._shunt.enqueue(msg, msgdata)
146+
return
147+
148+
# Get the list name from the message data
149+
listname = msgdata.get('listname')
150+
if not listname:
151+
syslog('error', 'NewsRunner._onefile: No listname in message data')
152+
self._shunt.enqueue(msg, msgdata)
153+
return
154+
155+
# Open the list
156+
try:
157+
mlist = self._open_list(listname)
158+
except Exception as e:
159+
self.log_error('list_open_error', str(e), listname=listname)
160+
self._shunt.enqueue(msg, msgdata)
161+
return
162+
163+
# Process the message
164+
try:
165+
keepqueued = self._dispose(mlist, msg, msgdata)
166+
if keepqueued:
167+
self._switchboard.enqueue(msg, msgdata)
168+
except Exception as e:
169+
self._handle_error(e, msg=msg, mlist=mlist)
170+
171+
except Exception as e:
172+
syslog('error', 'NewsRunner._onefile: Unexpected error: %s', str(e))
173+
self._shunt.enqueue(msg, msgdata)
174+
131175
def _oneloop(self):
132-
# If NNTP is not enabled, sleep for a while before checking again
133-
if not self._nntp:
134-
# Check the stop flag every second during sleep
135-
for _ in range(60):
136-
if self._stop:
137-
return 0
138-
time.sleep(1)
139-
return 0
176+
"""Process one batch of messages from the news queue."""
177+
try:
178+
# Get the list of files to process
179+
files = self._switchboard.files()
180+
filecnt = len(files)
140181

141-
# Get one message from the queue
142-
msg = self._switchboard.dequeue()
143-
if msg is None:
182+
# Process each file
183+
for filebase in files:
184+
try:
185+
# Dequeue the file
186+
msg, msgdata = self._switchboard.dequeue(filebase)
187+
if msg is None:
188+
continue
189+
190+
# Process the message
191+
try:
192+
self._onefile(msg, msgdata)
193+
except Exception as e:
194+
syslog('error', 'NewsRunner._oneloop: Error processing message %s: %s', filebase, str(e))
195+
continue
196+
197+
except Exception as e:
198+
syslog('error', 'NewsRunner._oneloop: Error dequeuing file %s: %s', filebase, str(e))
199+
continue
200+
201+
except Exception as e:
202+
syslog('error', 'NewsRunner._oneloop: Error in main loop: %s', str(e))
144203
return 0
145204

146-
# Process the message
147-
try:
148-
self._dopost(msg)
149-
except Exception as e:
150-
mailman_log('error', 'NewsRunner error: %s', str(e))
151-
# Put the message back in the queue
152-
self._switchboard.enqueue(msg, msgdata={})
153-
return 1
205+
return filecnt
154206

155207
def _queue_news(self, listname, msg, msgdata):
156208
"""Queue a news message for processing."""

Mailman/Queue/Runner.py

Lines changed: 30 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
class Runner:
4141
QDIR = None
4242
SLEEPTIME = mm_cfg.QRUNNER_SLEEP_TIME
43+
MIN_RETRY_DELAY = 300 # 5 minutes minimum delay between retries
4344

4445
# Message tracking configuration - can be overridden by subclasses
4546
_track_messages = False # Whether to track processed messages
@@ -74,6 +75,10 @@ def __init__(self, slice=None, numslices=1):
7475
self._last_cleanup = time.time()
7576
self._cleanup_interval = 3600
7677

78+
# Initialize error tracking attributes
79+
self._last_error_time = 0
80+
self._error_count = 0
81+
7782
syslog('debug', 'Runner: Initialization complete')
7883
except Exception as e:
7984
syslog('error', 'Runner: Initialization failed: %s\nTraceback:\n%s',
@@ -112,17 +117,32 @@ def run(self):
112117
# subprocesses we've created and do any other necessary cleanups.
113118
self._cleanup()
114119

115-
def log_error(self, error_type, error, msg=None, mlist=None, **context):
116-
"""Structured error logging with context."""
117-
context.update({
120+
def log_error(self, error_type, error_msg, **kwargs):
121+
"""Log an error with the given type and message.
122+
123+
Args:
124+
error_type: A string identifying the type of error
125+
error_msg: The error message to log
126+
**kwargs: Additional context to include in the log message
127+
"""
128+
context = {
118129
'runner': self.__class__.__name__,
119-
'list': mlist.internal_name() if mlist else 'N/A',
120-
'msg_id': msg.get('message-id', 'N/A') if msg else 'N/A',
121130
'error_type': error_type,
122-
'error': str(error)
123-
})
124-
syslog('error', '%(runner)s: %(error_type)s - list: %(list)s, msg: %(msg_id)s, error: %(error)s',
125-
context)
131+
'error_msg': error_msg,
132+
}
133+
context.update(kwargs)
134+
135+
# Format the error message
136+
msg_parts = ['%s: %s' % (error_type, error_msg)]
137+
if 'msg' in context:
138+
msg_parts.append('Message-ID: %s' % context['msg'].get('message-id', 'unknown'))
139+
if 'listname' in context:
140+
msg_parts.append('List: %s' % context['listname'])
141+
if 'traceback' in context:
142+
msg_parts.append('Traceback:\n%s' % context['traceback'])
143+
144+
# Log the error
145+
syslog('error', ' '.join(msg_parts))
126146

127147
def log_warning(self, warning_type, msg=None, mlist=None, **context):
128148
"""Structured warning logging with context."""
@@ -196,7 +216,7 @@ def _oneloop(self):
196216

197217
# Process the message
198218
try:
199-
self._dopost(msg, msgdata)
219+
self._onefile(msg, msgdata)
200220
except Exception as e:
201221
syslog('error', 'Runner._oneloop: Error processing message %s: %s', filebase, str(e))
202222
continue

0 commit comments

Comments
 (0)