Skip to content

Commit 538b219

Browse files
authored
Merge branch 'main' into revert-2225-revert-2098-bugfix/type-error-version-checking
2 parents 658007b + 46186df commit 538b219

3 files changed

Lines changed: 60 additions & 5 deletions

File tree

docs/reference/kombu.pidbox.rst

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,20 @@
7070
.. automethod:: get_reply_queue
7171
.. automethod:: get_queue
7272

73+
Mailbox Options
74+
~~~~~~~~~~~~~~~
75+
76+
.. versionadded:: 5.6.0
77+
78+
The `Mailbox` supports several configuration options that affect
79+
the behavior of its exchanges and queues.
80+
81+
- ``durable``: If True, declares durable exchanges that survive broker restarts.
82+
- ``exclusive``: If True, declares exclusive exchanges (usable by only one connection).
83+
84+
These provide finer control over broker-side behavior and are useful
85+
in production environments where queue durability matters.\
86+
7387
Node
7488
----
7589

@@ -91,4 +105,3 @@
91105
.. automethod:: handle
92106
.. automethod:: handle_message
93107
.. automethod:: reply
94-

kombu/pidbox.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,7 @@ def __init__(self, namespace,
181181
type='direct', connection=None, clock=None,
182182
accept=None, serializer=None, producer_pool=None,
183183
queue_ttl=None, queue_expires=None,
184+
queue_durable=False, queue_exclusive=False,
184185
reply_queue_ttl=None, reply_queue_expires=10.0):
185186
self.namespace = namespace
186187
self.connection = connection
@@ -193,9 +194,16 @@ def __init__(self, namespace,
193194
self.serializer = self.serializer if serializer is None else serializer
194195
self.queue_ttl = queue_ttl
195196
self.queue_expires = queue_expires
197+
self.queue_durable = queue_durable
198+
self.queue_exclusive = queue_exclusive
196199
self.reply_queue_ttl = reply_queue_ttl
197200
self.reply_queue_expires = reply_queue_expires
198201
self._producer_pool = producer_pool
202+
if queue_exclusive and queue_durable:
203+
raise ValueError(
204+
"queue_exclusive and queue_durable cannot both be True "
205+
"(exclusive queues are automatically deleted and cannot be durable).",
206+
)
199207

200208
def __call__(self, connection):
201209
bound = copy(self)
@@ -236,8 +244,9 @@ def get_reply_queue(self):
236244
f'{oid}.{self.reply_exchange.name}',
237245
exchange=self.reply_exchange,
238246
routing_key=oid,
239-
durable=False,
240-
auto_delete=True,
247+
durable=self.queue_durable,
248+
exclusive=self.queue_exclusive,
249+
auto_delete=not self.queue_durable,
241250
expires=self.reply_queue_expires,
242251
message_ttl=self.reply_queue_ttl,
243252
)
@@ -250,8 +259,9 @@ def get_queue(self, hostname):
250259
return Queue(
251260
f'{hostname}.{self.namespace}.pidbox',
252261
exchange=self.exchange,
253-
durable=False,
254-
auto_delete=True,
262+
durable=self.queue_durable,
263+
exclusive=self.queue_exclusive,
264+
auto_delete=not self.queue_durable,
255265
expires=self.queue_expires,
256266
message_ttl=self.queue_ttl,
257267
)

t/unit/test_pidbox.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -342,6 +342,38 @@ def get_next(self, consumer):
342342
if m:
343343
return m.payload
344344

345+
def test_mailbox_queue_exclusive(self):
346+
mbox = pidbox.Mailbox(
347+
'flagbox_ex',
348+
queue_exclusive=True,
349+
queue_durable=False,
350+
)(self.connection)
351+
352+
for q in (mbox.get_queue('worker1'), mbox.get_reply_queue()):
353+
assert q.exclusive is True
354+
assert q.durable is False
355+
assert q.auto_delete is True
356+
357+
def test_mailbox_queue_durable(self):
358+
mbox = pidbox.Mailbox(
359+
'flagbox_dur',
360+
queue_exclusive=False,
361+
queue_durable=True,
362+
)(self.connection)
363+
364+
for q in (mbox.get_queue('worker1'), mbox.get_reply_queue()):
365+
assert q.durable is True
366+
assert q.exclusive is False
367+
assert q.auto_delete is False
368+
369+
def test_mailbox_invalid_flag_combo(self):
370+
with pytest.raises(ValueError):
371+
pidbox.Mailbox(
372+
'flagbox_bad',
373+
queue_exclusive=True,
374+
queue_durable=True,
375+
)(self.connection)
376+
345377

346378
GLOBAL_PIDBOX = pidbox.Mailbox('global_unittest_mailbox')
347379

0 commit comments

Comments
 (0)