Skip to content

Commit 107fe28

Browse files
committed
round robin
1 parent 80fdeed commit 107fe28

3 files changed

Lines changed: 53 additions & 13 deletions

File tree

Mailman/Defaults.py.in

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -933,6 +933,11 @@ DEFAULT_RESPOND_TO_POST_REQUESTS = Yes
933933
# BAW: Eventually we may support weighted hash spaces.
934934
# BAW: Although not enforced, the # of slices must be a power of 2
935935

936+
# Distribution method for queue runners: 'hash' (default) or 'round_robin'
937+
# Hash-based distribution ensures same message always goes to same runner
938+
# Round-robin distribution provides more even load distribution
939+
QUEUE_DISTRIBUTION_METHOD = 'hash'
940+
936941
QRUNNERS = [
937942
('ArchRunner', 1), # messages for the archiver
938943
('BounceRunner', 2), # for processing the qfile/bounces directory

Mailman/Queue/Runner.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,8 @@ def __init__(self, slice=None, numslices=1):
4343
self._kids = {}
4444
# Create our own switchboard. Don't use the switchboard cache because
4545
# we want to provide slice and numslice arguments.
46-
self._switchboard = Switchboard(self.QDIR, slice, numslices, True)
46+
distribution = getattr(mm_cfg, 'QUEUE_DISTRIBUTION_METHOD', 'hash')
47+
self._switchboard = Switchboard(self.QDIR, slice, numslices, True, distribution)
4748
# Create the shunt switchboard
4849
self._shunt = Switchboard(mm_cfg.SHUNTQUEUE_DIR)
4950
self._stop = False

Mailman/Queue/Switchboard.py

Lines changed: 46 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,9 @@
6565

6666

6767
class Switchboard:
68-
def __init__(self, whichq, slice=None, numslices=1, recover=False):
68+
def __init__(self, whichq, slice=None, numslices=1, recover=False, distribution='hash'):
6969
self.__whichq = whichq
70+
self.__distribution = distribution
7071
# Create the directory if it doesn't yet exist.
7172
# FIXME
7273
omask = os.umask(0) # rwxrws---
@@ -82,8 +83,13 @@ def __init__(self, whichq, slice=None, numslices=1, recover=False):
8283
self.__upper = None
8384
# BAW: test performance and end-cases of this algorithm
8485
if numslices != 1:
85-
self.__lower = (((shamax+1) * slice) / numslices)
86-
self.__upper = ((((shamax+1) * (slice+1)) / numslices)) - 1
86+
if distribution == 'hash':
87+
self.__lower = (((shamax+1) * slice) / numslices)
88+
self.__upper = ((((shamax+1) * (slice+1)) / numslices)) - 1
89+
elif distribution == 'round_robin':
90+
self.__slice = slice
91+
self.__numslices = numslices
92+
# Add more distribution methods here as needed
8793
if recover:
8894
self.recover_backup_files()
8995

@@ -105,7 +111,23 @@ def enqueue(self, _msg, _metadata={}, **_kws):
105111
else:
106112
protocol = 0
107113
msgsave = pickle.dumps(str(_msg), protocol, fix_imports=True)
108-
hashfood = msgsave + listname.encode() + repr(now).encode()
114+
115+
# Choose distribution method
116+
if self.__distribution == 'round_robin':
117+
# Use a simple counter for round-robin distribution
118+
import threading
119+
if not hasattr(self, '_counter'):
120+
self._counter = 0
121+
self._counter_lock = threading.Lock()
122+
123+
with self._counter_lock:
124+
self._counter = (self._counter + 1) % self.__numslices
125+
current_slice = self._counter
126+
hashfood = msgsave + listname.encode() + repr(now).encode() + str(current_slice).encode()
127+
else:
128+
# Default hash-based distribution
129+
hashfood = msgsave + listname.encode() + repr(now).encode()
130+
109131
# Encode the current time into the file name for FIFO sorting in
110132
# files(). The file name consists of two parts separated by a `+':
111133
# the received time for this message (i.e. when it first showed up on
@@ -192,14 +214,26 @@ def files(self, extension='.pck'):
192214
if ext != extension:
193215
continue
194216
when, digest = filebase.split('+')
195-
# Throw out any files which don't match our bitrange. BAW: test
196-
# performance and end-cases of this algorithm. MAS: both
197-
# comparisons need to be <= to get complete range.
198-
if lower is None or (lower <= int(digest, 16) <= upper):
199-
key = float(when)
200-
while key in times:
201-
key += DELTA
202-
times[key] = filebase
217+
218+
# Choose distribution method for file filtering
219+
if self.__distribution == 'round_robin':
220+
# For round-robin, use modulo of digest to determine slice
221+
slice_num = int(digest, 16) % self.__numslices
222+
if slice_num == self.__slice:
223+
key = float(when)
224+
while key in times:
225+
key += DELTA
226+
times[key] = filebase
227+
else:
228+
# Default hash-based distribution
229+
# Throw out any files which don't match our bitrange. BAW: test
230+
# performance and end-cases of this algorithm. MAS: both
231+
# comparisons need to be <= to get complete range.
232+
if lower is None or (lower <= int(digest, 16) <= upper):
233+
key = float(when)
234+
while key in times:
235+
key += DELTA
236+
times[key] = filebase
203237
# FIFO sort
204238
keys = list(times.keys())
205239
keys.sort()

0 commit comments

Comments
 (0)