Skip to content

Commit 4f90e77

Browse files
committed
feat: do not create delay queue by default
1 parent 0ccd7cc commit 4f90e77

File tree

7 files changed

+150
-229
lines changed

7 files changed

+150
-229
lines changed

.pre-commit-config.yaml

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,6 @@
1-
# See https://pre-commit.com for more information
2-
# See https://pre-commit.com/hooks.html for more hooks
31
repos:
4-
- repo: https://github.com/pre-commit/pre-commit-hooks
5-
rev: v6.0.0
2+
- repo: builtin
63
hooks:
7-
- id: check-ast
84
- id: trailing-whitespace
95
- id: check-toml
106
- id: end-of-file-fixer
@@ -47,12 +43,13 @@ repos:
4743
- "taskiq_aio_pika"
4844
- "tests"
4945

50-
- id: mypy
51-
name: Validate types with MyPy
52-
entry: uv run mypy
46+
- id: ty
47+
name: Validate types with ty
48+
entry: uv run ty
5349
language: system
5450
pass_filenames: false
5551
types: [python]
5652
args:
53+
- "check"
5754
- ./taskiq_aio_pika
5855
- ./tests

README.md

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,18 @@ async def test() -> None:
2828

2929
### Default delays
3030

31-
To send delayed message, you have to specify delay label. You can do it with `task` decorator, or by using kicker.
31+
To send delayed messagem, you need to specify queue for delayed messages. You can do it by passing `delay_queue` parameter to the broker. For example:
32+
33+
```python
34+
from taskiq_aio_pika import AioPikaBroker, Queue, QueueType
35+
36+
broker = AioPikaBroker(
37+
...,
38+
delay_queue=Queue(name="taskiq.delay_queue"),
39+
)
40+
```
41+
42+
After that you have to specify delay label. You can do it with `task` decorator, or by using kicker.
3243

3344
In this type of delay we are using additional queue with `expiration` parameter. After declared time message will be deleted from `delay` queue and sent to the main queue. For example:
3445

pyproject.toml

Lines changed: 13 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "taskiq-aio-pika"
3-
version = "0.0.0"
3+
version = "0.6.0"
44
description = "RabbitMQ broker for taskiq"
55
authors = [
66
{name = "Pavel Kirilin", email = "win10@list.ru"}
@@ -37,36 +37,30 @@ dependencies = [
3737

3838
[dependency-groups]
3939
dev = [
40-
"pre-commit>=4.4.0",
41-
# lint
40+
"prek>=0.3.0",
41+
{include-group = "test"},
42+
{include-group = "lint"},
43+
{include-group = "typecheck"},
44+
{include-group = "examples"},
45+
]
46+
lint = [
4247
"ruff>=0.14.5",
4348
"black>=25.11.0",
44-
# type check
45-
"mypy>=1.18.2",
46-
# tests
49+
]
50+
test = [
4751
"pytest>=9.0.1",
4852
"pytest-cov>=7.0.0",
4953
"coverage>=7.11.3",
5054
"pytest-xdist[psutil]>=3.8.0",
5155
"anyio>=4.11.0",
52-
{include-group = "examples"},
56+
]
57+
typecheck = [
58+
"ty>=0.0.13",
5359
]
5460
examples = [
5561
"taskiq-redis>=1.1.2",
5662
]
5763

58-
[tool.mypy]
59-
python_version = "3.10"
60-
strict = true
61-
ignore_missing_imports = true
62-
allow_subclassing_any = true
63-
allow_untyped_calls = true
64-
pretty = true
65-
show_error_codes = true
66-
implicit_reexport = true
67-
allow_untyped_decorators = true
68-
warn_return_any = false
69-
7064
[build-system]
7165
requires = ["uv_build>=0.9.9,<0.10.0"]
7266
build-backend = "uv_build"

taskiq_aio_pika/broker.py

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ def __init__(
102102
self._label_for_routing = label_for_routing
103103
self._label_for_priority = label_for_priority
104104

105-
self._delay_queue = delay_queue or Queue(name="taskiq.delay")
105+
self._delay_queue = delay_queue
106106

107107
self._delayed_message_exchange_plugin = delayed_message_exchange_plugin
108108
if self._delayed_message_exchange_plugin:
@@ -261,15 +261,15 @@ async def _declare_queues(
261261
self._task_queues.append(Queue())
262262

263263
queues = self._task_queues.copy()
264-
if not self._delayed_message_exchange_plugin:
264+
if not self._delayed_message_exchange_plugin and self._delay_queue:
265265
queues.append(self._delay_queue)
266266

267267
for queue in filter(lambda queue: queue.declare, queues):
268268
per_queue_arguments: FieldTable = queue_default_arguments.copy()
269269
if queue.max_priority is not None:
270270
per_queue_arguments["x-max-priority"] = queue.max_priority
271271
per_queue_arguments["x-queue-type"] = queue.type.value
272-
if queue.name == self._delay_queue.name:
272+
if self._delay_queue and queue.name == self._delay_queue.name:
273273
per_queue_arguments["x-dead-letter-exchange"] = self._exchange.name
274274
per_queue_arguments["x-dead-letter-routing-key"] = (
275275
self._delay_queue.routing_key
@@ -292,7 +292,7 @@ async def _declare_queues(
292292
"Bind queue to exchange with routing key '%s'",
293293
queue.routing_key or queue.name,
294294
)
295-
if queue.name != self._delay_queue.name:
295+
if not self._delay_queue or queue.name != self._delay_queue.name:
296296
await declared_queue.bind(
297297
exchange=self._exchange.name,
298298
routing_key=queue.routing_key or queue.name,
@@ -401,12 +401,17 @@ async def kick(self, message: BrokerMessage) -> None:
401401
self._delayed_message_exchange.name,
402402
)
403403
await exchange.publish(rmq_message, routing_key=routing_key_name)
404-
else:
404+
elif self._delay_queue:
405405
rmq_message.expiration = timedelta(seconds=delay)
406406
await self.write_channel.default_exchange.publish(
407407
rmq_message,
408408
routing_key=self._delay_queue.routing_key or self._delay_queue.name,
409409
)
410+
else:
411+
raise IncorrectRoutingKeyError(
412+
"Delay requested but no delay queue or delayed-message-exchange "
413+
"is configured in the broker.",
414+
)
410415

411416
async def listen(self) -> AsyncGenerator[AckableMessage, None]:
412417
"""
@@ -442,7 +447,7 @@ async def body(
442447
*[
443448
body(queue, consumer_args)
444449
for queue, consumer_args in queue_with_consumer_args_list
445-
if queue.name != self._delay_queue.name
450+
if not self._delay_queue or queue.name != self._delay_queue.name
446451
],
447452
)
448453

tests/test_routing.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,15 @@ async def cleanup_class_broker(self, amqp_url: str) -> AsyncGenerator[None, None
2323
yield
2424
if self.broker is not None:
2525
await self.broker.shutdown()
26+
queue_names = [queue.name for queue in self.broker._task_queues] + [
27+
self.broker._dead_letter_queue.name,
28+
]
29+
if self.broker._delay_queue is not None:
30+
queue_names.append(self.broker._delay_queue.name)
2631
await _cleanup_amqp_resources(
2732
amqp_url,
2833
[self.broker._exchange.name],
29-
[queue.name for queue in self.broker._task_queues]
30-
+ [
31-
self.broker._dead_letter_queue.name,
32-
self.broker._delay_queue.name,
33-
],
34+
queue_names,
3435
)
3536

3637
async def test_when_message_has_wrong_format__then_message_still_can_be_received(

tests/test_startup.py

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,15 @@ async def cleanup_class_broker(self, amqp_url: str) -> AsyncGenerator[None, None
2020
yield
2121
if self.broker is not None:
2222
await self.broker.shutdown()
23+
queue_names = [queue.name for queue in self.broker._task_queues] + [
24+
self.broker._dead_letter_queue.name,
25+
]
26+
if self.broker._delay_queue is not None:
27+
queue_names.append(self.broker._delay_queue.name)
2328
await _cleanup_amqp_resources(
2429
amqp_url,
2530
[self.broker._exchange.name],
26-
[queue.name for queue in self.broker._task_queues]
27-
+ [
28-
self.broker._dead_letter_queue.name,
29-
self.broker._delay_queue.name,
30-
],
31+
queue_names,
3132
)
3233

3334
async def test_when_declare_flag_passed_to_queue__broker_declare_queue_on_startup(
@@ -227,3 +228,26 @@ async def test_when_delayed_message_exchange_plugin_enabled_and_custom_exchange_
227228
match=f"Exchange '{delayed_message_exchange_name}' was not declared and does not exist.",
228229
):
229230
await self.broker.startup()
231+
232+
async def test_when_delay_queue_not_specified__broker_does_not_create_delay_queue(
233+
self,
234+
amqp_url: str,
235+
test_channel: Channel,
236+
exchange_name: str,
237+
) -> None:
238+
# given
239+
self.broker = AioPikaBroker(
240+
url=amqp_url,
241+
exchange=Exchange(
242+
name=exchange_name,
243+
declare=True,
244+
),
245+
)
246+
247+
# when
248+
await self.broker.startup()
249+
250+
# then
251+
assert self.broker._delay_queue is None
252+
with pytest.raises(aiormq.exceptions.ChannelNotFoundEntity):
253+
await test_channel.get_queue("taskiq.delay", ensure=True)

0 commit comments

Comments
 (0)