Commit cf89301

Merge pull request #48 from taskiq-python/multique-support

2 parents: ed4a0f9 + d7c5d82

22 files changed: +1437 −753 lines

`.github/workflows/test.yml` (2 additions, 2 deletions)

```diff
@@ -17,7 +17,7 @@ jobs:
   lint:
     strategy:
       matrix:
-        cmd: ["black", "ruff", "mypy"]
+        cmd: ["ruff", "ty"]
     runs-on: ubuntu-latest
     steps:
       - uses: actions/checkout@v5
@@ -33,7 +33,7 @@ jobs:
       - name: Install deps
        run: uv sync --all-extras
       - name: Run lint check
-       run: uv run pre-commit run -a ${{ matrix.cmd }}
+       run: uv run prek run -a ${{ matrix.cmd }}
   pytest:
     services:
       rabbit:
```

`.pre-commit-config.yaml` (5 additions, 8 deletions)

```diff
@@ -1,10 +1,6 @@
-# See https://pre-commit.com for more information
-# See https://pre-commit.com/hooks.html for more hooks
 repos:
-  - repo: https://github.com/pre-commit/pre-commit-hooks
-    rev: v6.0.0
+  - repo: builtin
     hooks:
-      - id: check-ast
       - id: trailing-whitespace
       - id: check-toml
       - id: end-of-file-fixer
@@ -47,12 +43,13 @@ repos:
         - "taskiq_aio_pika"
        - "tests"

-      - id: mypy
-        name: Validate types with MyPy
-        entry: uv run mypy
+      - id: ty
+        name: Validate types with ty
+        entry: uv run ty
        language: system
        pass_filenames: false
        types: [python]
        args:
+          - "check"
          - ./taskiq_aio_pika
          - ./tests
```

`Makefile` (new file, 14 additions)

```diff
@@ -0,0 +1,14 @@
+.DEFAULT:
+	@echo "No such command (or you pass two or many targets to ). List of possible commands: make help"
+
+.DEFAULT_GOAL := help
+
+##@ Local development
+
+.PHONY: help
+help: ## Show this help
+	@awk 'BEGIN {FS = ":.*##"; printf "\nUsage:\n  make \033[36m<target> <arg=value>\033[0m\n"} /^[a-zA-Z_-]+:.*?##/ { printf "  \033[36m%-15s\033[0m %s\n", $$1, $$2 } /^##@/ { printf "\n\033[1m %s\033[0m\n\n", substr($$0, 5) } ' $(MAKEFILE_LIST)
+
+.PHONY: clear_rabbit
+clear_rabbit: ## Clear RabbitMQ data volume and restart container
+	@docker stop taskiq_aio_pika_rabbitmq && docker rm taskiq_aio_pika_rabbitmq && docker volume rm taskiq-aio-pika_rabbitmq_data && docker compose up -d
```
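The `help` target above relies on an awk one-liner that turns `## description` comments and `##@ section` markers into a usage screen. As a rough illustration of what that parsing does (not part of the commit; `render_help` and the sample input below are hypothetical), here is an equivalent sketch in Python:

```python
import re

# Matches "target: ... ## description" lines, like the awk /^[a-zA-Z_-]+:.*?##/ rule.
HELP_RE = re.compile(r"^([a-zA-Z_-]+):.*?## (.*)$")
# Matches "##@ Section name" headers, like the awk /^##@/ rule.
SECTION_RE = re.compile(r"^##@ (.*)$")


def render_help(makefile_text: str) -> list[str]:
    """Collect section headers and documented targets from Makefile text."""
    lines: list[str] = []
    for raw in makefile_text.splitlines():
        if m := SECTION_RE.match(raw):
            lines.append(m.group(1))
        elif m := HELP_RE.match(raw):
            # Left-pad the target name, mirroring awk's %-15s format.
            lines.append(f"{m.group(1):<15} {m.group(2)}")
    return lines


sample = """\
##@ Local development
.PHONY: help
help: ## Show this help
clear_rabbit: ## Clear RabbitMQ data volume and restart container
"""
print(render_help(sample))
```

Note that `.PHONY: help` is skipped because `.` is not in the target-name character class, which is exactly why the awk version ignores special targets too.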

`README.md` (45 additions, 33 deletions)

````diff
@@ -9,13 +9,14 @@ This library provides you with aio-pika broker for taskiq.
 Features:
 - Supports delayed messages using dead-letter queues or RabbitMQ delayed message exchange plugin.
 - Supports message priorities.
+- Supports multiple queues and custom routing.

 Usage example:

 ```python
 from taskiq_aio_pika import AioPikaBroker

-broker = AioPikaBroker()
+broker = AioPikaBroker(...)

 @broker.task
 async def test() -> None:
@@ -27,12 +28,23 @@

 ### Default delays

-To send delayed message, you have to specify delay label. You can do it with `task` decorator, or by using kicker.
+To send delayed message, you need to specify queue for delayed messages. You can do it by passing `delay_queue` parameter to the broker. For example:
+
+```python
+from taskiq_aio_pika import AioPikaBroker, Queue, QueueType
+
+broker = AioPikaBroker(
+    ...,
+    delay_queue=Queue(name="taskiq.delay_queue"),
+)
+```
+
+After that you have to specify delay label. You can do it with `task` decorator, or by using kicker.

 In this type of delay we are using additional queue with `expiration` parameter. After declared time message will be deleted from `delay` queue and sent to the main queue. For example:

 ```python
-broker = AioPikaBroker()
+broker = AioPikaBroker(...)

 @broker.task(delay=3)
 async def delayed_task() -> int:
@@ -86,13 +98,12 @@
 ## Priorities

 You can define priorities for messages using `priority` label. Messages with higher priorities are delivered faster.
-But to use priorities you need to define `max_priority` of the main queue, by passing `max_priority` parameter in broker's init. This parameter sets maximum priority for the queue and declares it as the priority queue.

 Before doing so please read the [documentation](https://www.rabbitmq.com/priority.html#behaviour) about what
 downsides you get by using prioritized queues.

 ```python
-broker = AioPikaBroker(max_priority=10)
+broker = AioPikaBroker(...)

 # We can define default priority for tasks.
 @broker.task(priority=2)
@@ -111,42 +122,43 @@
     await prio_task.kicker().with_labels(priority=None).kiq()
 ```

-## Configuration
-
-AioPikaBroker parameters:
+## Custom Queue and Exchange arguments

-* `url` - url to rabbitmq. If None, "amqp://guest:guest@localhost:5672" is used.
-* `result_backend` - custom result backend.
-* `task_id_generator` - custom task_id genertaor.
-* `exchange_name` - name of exchange that used to send messages.
-* `exchange_type` - type of the exchange. Used only if `declare_exchange` is True.
-* `queue_name` - queue that used to get incoming messages.
-* `routing_key` - that used to bind that queue to the exchange.
-* `declare_exchange` - whether you want to declare new exchange if it doesn't exist.
-* `max_priority` - maximum priority for messages.
-* `delay_queue_name` - custom delay queue name. This queue is used to deliver messages with delays.
-* `dead_letter_queue_name` - custom dead letter queue name.
-  This queue is used to receive negatively acknowledged messages from the main queue.
-* `qos` - number of messages that worker can prefetch.
-* `declare_queues` - whether you want to declare queues even on client side. May be useful for message persistence.
-* `declare_queues_kwargs` - see [Custom Queue Arguments](#custom-queue-arguments) for more details.
-
-## Custom Queue Arguments
-
-You can pass custom arguments to the underlying RabbitMQ queue declaration by using the `declare_queues_kwargs` parameter of `AioPikaBroker`. If you want to set specific queue arguments (such as RabbitMQ extensions or custom behaviors), provide them in the `arguments` dictionary inside `declare_queues_kwargs`.
+You can pass custom arguments to the underlying RabbitMQ queues and exchange declaration by using the `Queue`/`Exchange` classes from `taskiq_aio_pika`. If you used `faststream` before you are probably familiar with this concept.

 These arguments will be merged with the default arguments used by the broker
 (such as dead-lettering and priority settings). If there are any conflicts, the values you provide will take precedence over the broker's defaults. Example:

 ```python
+from taskiq_aio_pika import AioPikaBroker, Queue, QueueType, Exchange
+from aio_pika.abc import ExchangeType
+
 broker = AioPikaBroker(
-    declare_queues_kwargs={
-        "arguments": {
-            "x-message-ttl": 60000,  # Set message TTL to 60 seconds
-            "x-queue-type": "quorum",  # Use quorum queue type
-        }
-    }
+    exchange=Exchange(
+        name="custom_exchange",
+        type=ExchangeType.TOPIC,
+        declare=True,
+        durable=True,
+        auto_delete=False,
+    ),
+    task_queues=[
+        Queue(
+            name="custom_queue",
+            type=QueueType.CLASSIC,
+            declare=True,
+            durable=True,
+            max_priority=10,
+            routing_key="custom_queue",
+        )
+    ]
 )
 ```

 This will ensure that the queue is created with your custom arguments, in addition to the broker's defaults.
+
+
+## Multiqueue support
+
+You can define multiple queues for your tasks. Each queue can have its own routing key and other settings. And your workers can listen to multiple queues (or specific queue) as well.
+
+You can check [multiqueue usage example](./examples/topic_with_two_queues.py) in examples folder for more details.
````
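The Priorities section of the README above states that messages with higher priorities are delivered first, with FIFO order otherwise. The delivery rule itself can be illustrated with a library-free sketch using a plain heap (illustrative only; this is not how taskiq-aio-pika or RabbitMQ is implemented, and `PriorityBuffer` is a hypothetical name):

```python
import heapq
import itertools


class PriorityBuffer:
    """Toy model of priority-queue delivery: highest priority first, FIFO on ties."""

    def __init__(self) -> None:
        self._heap: list[tuple[int, int, str]] = []
        self._counter = itertools.count()

    def publish(self, message: str, priority: int = 0) -> None:
        # heapq is a min-heap, so negate priority; the counter preserves FIFO order
        # among messages with equal priority.
        heapq.heappush(self._heap, (-priority, next(self._counter), message))

    def consume(self) -> str:
        return heapq.heappop(self._heap)[2]


buf = PriorityBuffer()
buf.publish("low", priority=0)
buf.publish("high", priority=2)
buf.publish("mid", priority=1)
print([buf.consume() for _ in range(3)])  # → ['high', 'mid', 'low']
```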

`docker-compose.yaml` (11 additions, 1 deletion)

```diff
@@ -2,6 +2,7 @@ services:
   rabbitmq:
     container_name: taskiq_aio_pika_rabbitmq
     image: heidiks/rabbitmq-delayed-message-exchange:latest
+    # image: rabbitmq:3.13.7-management # rabbit with management UI for debugging
     environment:
       RABBITMQ_DEFAULT_USER: "guest"
       RABBITMQ_DEFAULT_PASS: "guest"
@@ -14,4 +15,13 @@ services:
     ports:
       - "5672:5672"
       - "15672:15672"
-      - "61613:61613"
+    volumes:
+      - rabbitmq_data:/var/lib/rabbitmq
+  redis:
+    container_name: taskiq_aio_pika_redis
+    image: redis:latest
+    ports:
+      - "6379:6379"
+
+volumes:
+  rabbitmq_data:
```

`examples/basic.py` (new file, 40 additions)

```python
"""
Basic example of using Taskiq with AioPika broker.

How to run:
1. Run worker: taskiq worker examples.basic:broker -w 1
2. Run broker: uv run examples/basic.py
"""

import asyncio

from taskiq_redis import RedisAsyncResultBackend

from taskiq_aio_pika import AioPikaBroker

broker = AioPikaBroker(
    "amqp://guest:guest@localhost:5672/",
).with_result_backend(RedisAsyncResultBackend("redis://localhost:6379/0"))


@broker.task
async def add_one(value: int) -> int:
    return value + 1


async def main() -> None:
    await broker.startup()
    # Send the task to the broker.
    task = await add_one.kiq(1)
    # Wait for the result.
    result = await task.wait_result(timeout=2)
    print(f"Task execution took: {result.execution_time} seconds.")
    if not result.is_err:
        print(f"Returned value: {result.return_value}")
    else:
        print("Error found while executing task.")
    await broker.shutdown()


if __name__ == "__main__":
    asyncio.run(main())
```

`examples/delayed_task.py` (new file, 41 additions)

```python
"""
Example of delayed task execution using Taskiq with AioPika broker.

How to run:
1. Run worker: taskiq worker examples.delayed_task:broker -w 1
2. Run broker: uv run examples/delayed_task.py
"""

import asyncio

from taskiq_redis import RedisAsyncResultBackend

from taskiq_aio_pika import AioPikaBroker

broker = AioPikaBroker(
    "amqp://guest:guest@localhost:5672/",
).with_result_backend(RedisAsyncResultBackend("redis://localhost:6379/0"))


@broker.task
async def add_one(value: int) -> int:
    return value + 1


async def main() -> None:
    await broker.startup()
    # Send the task to the broker.
    task = await add_one.kicker().with_labels(delay=2).kiq(1)
    print("Task sent with 2 seconds delay.")
    # Wait for the result.
    result = await task.wait_result(timeout=3)
    print(f"Task execution took: {result.execution_time} seconds.")
    if not result.is_err:
        print(f"Returned value: {result.return_value}")
    else:
        print("Error found while executing task.")
    await broker.shutdown()


if __name__ == "__main__":
    asyncio.run(main())
```
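The README changes in this commit describe the dead-letter delay mechanism behind `delay=2`: the message sits in a delay queue with an `expiration`, and on expiry it is moved to the main queue. A toy model of that flow with stdlib asyncio (illustrative only; `delay_then_deliver` and `demo` are hypothetical names, not library API):

```python
import asyncio


async def delay_then_deliver(main_queue: asyncio.Queue, message: str, delay: float) -> None:
    # The message "expires" in the delay queue after `delay` seconds...
    await asyncio.sleep(delay)
    # ...and is then dead-lettered into the main queue for consumption.
    await main_queue.put(message)


async def demo() -> list[str]:
    main_queue: asyncio.Queue[str] = asyncio.Queue()
    await main_queue.put("immediate")
    mover = asyncio.create_task(delay_then_deliver(main_queue, "delayed", 0.05))
    first = await main_queue.get()   # arrives right away
    second = await main_queue.get()  # arrives only after the delay elapses
    await mover
    return [first, second]


print(asyncio.run(demo()))  # → ['immediate', 'delayed']
```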

`examples/topic_with_two_queues.py` (new file, 96 additions)

```python
"""
Example with two queues for different workers and one topic exchange.

It can be useful when you want to have two worker

How to run:
1. Run worker for queue_1: taskiq worker examples.topic_with_two_queues:get_broker_for_queue_1 -w 1
2. Run worker for queue_2: taskiq worker examples.topic_with_two_queues:get_broker_for_queue_2 -w 1
3. Run broker to send a task: uv run examples/topic_with_two_queues.py --queue 1
4. Optionally run broker to send a task to other queue: uv run examples/topic_with_two_queues.py --queue 2
"""

import argparse
import asyncio
import uuid

from aio_pika.abc import ExchangeType
from taskiq_redis import RedisAsyncResultBackend

from taskiq_aio_pika import AioPikaBroker, Exchange, Queue, QueueType

broker = AioPikaBroker(
    "amqp://guest:guest@localhost:5672/",
    exchange=Exchange(
        name="topic_exchange",
        type=ExchangeType.TOPIC,
    ),
    delay_queue=Queue(
        name="taskiq.delay",
        routing_key="queue1",
    ),  # send delayed messages to queue1
).with_result_backend(RedisAsyncResultBackend("redis://localhost:6379/0"))


@broker.task
async def add_one(value: int) -> int:
    return value + 1


queue_1 = Queue(
    name="queue1",
    type=QueueType.CLASSIC,
    durable=False,
)
queue_2 = Queue(
    name="queue2",
    type=QueueType.CLASSIC,
    durable=False,
)


def get_broker_for_queue_1() -> AioPikaBroker:
    print("This broker will listen to queue1")
    return broker.with_queue(queue_1)


def get_broker_for_queue_2() -> AioPikaBroker:
    print("This broker will listen to queue2")
    return broker.with_queue(queue_2)


async def main() -> None:
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--queue",
        choices=["1", "2"],
        required=True,
        help="Queue to send the task to.",
    )
    args = parser.parse_args()

    queue_name = queue_1.name if args.queue == "1" else queue_2.name

    broker.with_queues(
        queue_1,
        queue_2,
    )  # declare both queues to know about them during publishing
    await broker.startup()

    task = (
        await add_one.kicker()
        .with_labels(queue_name=queue_name)  # or it can be routing_key from queue_1
        .with_task_id(uuid.uuid4().hex)
        .kiq(2)
    )
    result = await task.wait_result(timeout=2)
    print(f"Task execution took: {result.execution_time} seconds.")
    if not result.is_err:
        print(f"Returned value: {result.return_value}")
    else:
        print("Error found while executing task.")
    await broker.shutdown()


if __name__ == "__main__":
    asyncio.run(main())
```
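The example above declares the exchange as `ExchangeType.TOPIC`, where RabbitMQ routes by matching a message's routing key against each queue's binding key: keys are dot-separated words, `*` matches exactly one word, and `#` matches zero or more words. A small self-contained matcher illustrating those semantics (not part of the commit; `topic_matches` is a hypothetical helper, not taskiq-aio-pika API):

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """Return True if a topic binding pattern matches a routing key."""

    def match(p: list[str], k: list[str]) -> bool:
        if not p:
            return not k  # pattern exhausted: match only if the key is too
        if p[0] == "#":
            # '#' may swallow zero words (skip it) or one more word (consume from key).
            return match(p[1:], k) or (bool(k) and match(p, k[1:]))
        if not k:
            return False
        # '*' matches any single word; otherwise words must be equal.
        return (p[0] == "*" or p[0] == k[0]) and match(p[1:], k[1:])

    return match(pattern.split("."), routing_key.split("."))


print(topic_matches("queue1", "queue1"))         # True: exact match
print(topic_matches("tasks.*", "tasks.emails"))  # True: '*' is one word
print(topic_matches("tasks.#", "tasks"))         # True: '#' matches zero words
print(topic_matches("tasks.*", "tasks.a.b"))     # False: '*' cannot span two words
```

With a topic exchange, binding `queue1` with routing key `queue1` behaves like direct routing, which is why the example can also target a queue via its `routing_key`.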
