Skip to content

Commit d16f182

Browse files
committed
fix: kick() ignores routing label when _task_queues is narrowed to 1
When with_queues() is used to narrow a broker to a single queue (e.g. in a worker factory function), kick() hits the `len == 1` shortcut and hardcodes the routing key to that queue's name, silently ignoring any explicit queue_name label on the message. This breaks the common pattern of using a multi-queue broker with per-worker factory functions, where tasks dispatched from inside a running task need to route to a different queue: broker = AioPikaBroker(..., task_queues=[queue_a, queue_b, queue_c]) @broker.task(queue_name="queue_b") async def mail_task(): ... @broker.task # routes to queue_a by default async def default_task(): await mail_task.kiq() # silently routed to queue_a, NOT queue_b def get_worker_a(): return broker.with_queues(queue_a) # narrows _task_queues to 1 Fix: check the routing label first; only fall back to the single-queue shortcut (backward-compatible) when no explicit label is present.
1 parent cf89301 commit d16f182

1 file changed

Lines changed: 28 additions & 13 deletions

File tree

taskiq_aio_pika/broker.py

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -336,7 +336,15 @@ def with_queues(self, *queues: Queue) -> Self:
336336
"""
337337
Replace existing queues with new ones.
338338
339-
:param queues: queues to add.
339+
Commonly used in worker entry-point factory functions to narrow which
340+
queue(s) a specific worker process listens to, while the full broker
341+
(with all queues) is still used for publishing via kick().
342+
343+
Note: this only affects which queues the worker *listens* to.
344+
Routing at kick() time is still determined by the ``queue_name`` label
345+
on the message (or the single-queue fallback for unlabelled tasks).
346+
347+
:param queues: queues to listen to.
340348
:return: self.
341349
"""
342350
self._task_queues = list(queues)
@@ -372,25 +380,32 @@ async def kick(self, message: BrokerMessage) -> None:
372380
)
373381
delay = parse_val(float, message.labels.get("delay"))
374382

375-
if len(self._task_queues) == 1:
376-
routing_key_name = (
377-
self._task_queues[0].routing_key or self._task_queues[0].name
378-
)
379-
else:
380-
routing_key_name = (
381-
parse_val(
382-
str,
383-
message.labels.get(self._label_for_routing),
384-
)
385-
or ""
386-
)
383+
# An explicit routing label always takes precedence, regardless of how
384+
# many queues are in _task_queues. This is important when with_queues()
385+
# has been used to narrow the listening scope to a single queue: without
386+
# this check the len==1 shortcut below would silently override the label
387+
# and route every outgoing message to the worker's own queue.
388+
routing_key_from_label = parse_val(
389+
str,
390+
message.labels.get(self._label_for_routing),
391+
)
392+
if routing_key_from_label:
393+
routing_key_name = routing_key_from_label
387394
if self._exchange.type == ExchangeType.DIRECT and routing_key_name not in {
388395
queue.routing_key or queue.name for queue in self._task_queues
389396
}:
390397
raise IncorrectRoutingKeyError(
391398
f"Routing key '{routing_key_name}' is not valid. "
392399
f"Check routing keys and queue names in broker queues.",
393400
)
401+
elif len(self._task_queues) == 1:
402+
# Backward-compatible shortcut: single-queue brokers don't need to
403+
# annotate every task with an explicit queue_name label.
404+
routing_key_name = (
405+
self._task_queues[0].routing_key or self._task_queues[0].name
406+
)
407+
else:
408+
routing_key_name = ""
394409

395410
if delay is None:
396411
exchange = await self.write_channel.get_exchange(

0 commit comments

Comments
 (0)