Skip to content

Commit 8f97d38

Browse files
committed
Add queue class
1 parent 0b53745 commit 8f97d38

File tree

12 files changed

+88
-0
lines changed

12 files changed

+88
-0
lines changed

taskiq/formatters/json_formatter.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ def dumps(self, message: TaskiqMessage) -> BrokerMessage:
1616
return BrokerMessage(
1717
task_id=message.task_id,
1818
task_name=message.task_name,
19+
queue=message.queue,
1920
message=model_dump_json(message).encode(),
2021
labels=message.labels,
2122
)

taskiq/formatters/proxy_formatter.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ def dumps(self, message: TaskiqMessage) -> BrokerMessage:
2424
return BrokerMessage(
2525
task_id=message.task_id,
2626
task_name=message.task_name,
27+
queue=message.queue,
2728
message=self.broker.serializer.dumpb(model_dump(message)),
2829
labels=message.labels,
2930
)

taskiq/kicker.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
from taskiq.exceptions import SendTaskError
2424
from taskiq.labels import prepare_label
2525
from taskiq.message import TaskiqMessage
26+
from taskiq.queue import DEFAULT_QUEUE, Queue
2627
from taskiq.scheduler.created_schedule import CreatedSchedule
2728
from taskiq.scheduler.scheduled_task import CronSpec, ScheduledTask
2829
from taskiq.task import AsyncTaskiqTask
@@ -47,10 +48,12 @@ def __init__(
4748
task_name: str,
4849
broker: "AsyncBroker",
4950
labels: Dict[str, Any],
51+
queue: Union["Queue", str] = DEFAULT_QUEUE,
5052
return_type: Optional[Type[_ReturnType]] = None,
5153
) -> None:
5254
self.task_name = task_name
5355
self.broker = broker
56+
self.queue = Queue(queue)
5457
self.labels = labels
5558
self.custom_task_id: Optional[str] = None
5659
self.custom_schedule_id: Optional[str] = None
@@ -111,6 +114,19 @@ def with_broker(
111114
self.broker = broker
112115
return self
113116

117+
def with_queue(
118+
self,
119+
queue: Union["Queue", str],
120+
) -> "AsyncKicker[_FuncParams, _ReturnType]":
121+
"""
122+
Replace queue for the function.
123+
124+
:param queue: new queue instance or name.
125+
:return: Kicker with new queue.
126+
"""
127+
self.queue = Queue(queue)
128+
return self
129+
114130
@overload
115131
async def kiq(
116132
self: "AsyncKicker[_FuncParams, CoroutineType[Any, Any, _T]]",
@@ -296,6 +312,7 @@ def _prepare_message(
296312
return TaskiqMessage(
297313
task_id=task_id,
298314
task_name=self.task_name,
315+
queue=self.queue.name,
299316
labels=labels,
300317
labels_types=labels_types,
301318
args=formatted_args,

taskiq/message.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ class TaskiqMessage(BaseModel):
1616

1717
task_id: str
1818
task_name: str
19+
queue: str
1920
labels: Dict[str, Any]
2021
labels_types: Optional[Dict[str, int]] = None
2122
args: List[Any]
@@ -40,5 +41,6 @@ class BrokerMessage(BaseModel):
4041

4142
task_id: str
4243
task_name: str
44+
queue: str
4345
message: bytes
4446
labels: Dict[str, Any]

taskiq/queue.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
from __future__ import annotations
2+
3+
import dataclasses
4+
5+
DEFAULT_QUEUE = "taksiq"
6+
7+
8+
@dataclasses.dataclass(frozen=True, init=False, eq=True)
9+
class Queue:
10+
"""Represents an abstraction for dealing with queues in real brokers."""
11+
12+
name: str
13+
14+
def __init__(self, src: str | Queue) -> None:
15+
if isinstance(src, Queue):
16+
object.__setattr__(self, "name", src.name)
17+
elif isinstance(src, str):
18+
object.__setattr__(self, "name", src)
19+
else:
20+
raise TypeError(
21+
"Queue.__init__ expect str or Queue, "
22+
"{type(src).__name__!r} is recieved",
23+
)
24+
25+
def __repr__(self) -> str:
26+
return self.name

tests/cli/worker/test_parameters_parsing.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ def test_parse_params_no_signature() -> None:
2525
task_id="",
2626
task_name="",
2727
labels={},
28+
queue="taksiq",
2829
args=[1, 2],
2930
kwargs={"a": 1},
3031
)
@@ -49,6 +50,7 @@ def test_func(param: test_class) -> test_class: # type: ignore
4950
task_id="",
5051
task_name="",
5152
labels={},
53+
queue="taksiq",
5254
args=[{"field": "test_val"}],
5355
kwargs={},
5456
)
@@ -66,6 +68,7 @@ def test_func(param: test_class) -> test_class: # type: ignore
6668
task_id="",
6769
task_name="",
6870
labels={},
71+
queue="taksiq",
6972
args=[],
7073
kwargs={"param": {"field": "test_val"}},
7174
)
@@ -91,6 +94,7 @@ def test_func(param: test_class) -> test_class: # type: ignore
9194
task_id="",
9295
task_name="",
9396
labels={},
97+
queue="taksiq",
9498
args=[{"unknown": "unknown"}],
9599
kwargs={},
96100
)
@@ -107,6 +111,7 @@ def test_func(param: test_class) -> test_class: # type: ignore
107111
task_id="",
108112
task_name="",
109113
labels={},
114+
queue="taksiq",
110115
args=[],
111116
kwargs={"param": {"unknown": "unknown"}},
112117
)
@@ -130,6 +135,7 @@ def test_func(param: test_class) -> test_class: # type: ignore
130135
msg_with_args = TaskiqMessage(
131136
task_id="",
132137
task_name="",
138+
queue="taksiq",
133139
labels={},
134140
args=[None],
135141
kwargs={},
@@ -142,6 +148,7 @@ def test_func(param: test_class) -> test_class: # type: ignore
142148
msg_with_kwargs = TaskiqMessage(
143149
task_id="",
144150
task_name="",
151+
queue="taksiq",
145152
labels={},
146153
args=[],
147154
kwargs={"param": None},

tests/depends/test_progress_tracker.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ def get_message(
5151
task_id=task_id or task.broker.id_generator(),
5252
task_name=task.task_name,
5353
labels=labels,
54+
queue="taksiq",
5455
args=list(args),
5556
kwargs=kwargs,
5657
)

tests/formatters/test_json_formatter.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,15 @@ async def test_json_dumps() -> None:
1212
msg = TaskiqMessage(
1313
task_id="task-id",
1414
task_name="task.name",
15+
queue="taksiq",
1516
labels={"label1": 1, "label2": "text"},
1617
args=[1, "a"],
1718
kwargs={"p1": "v1"},
1819
)
1920
expected = BrokerMessage(
2021
task_id="task-id",
2122
task_name="task.name",
23+
queue="taksiq",
2224
message=(
2325
b'{"task_id":"task-id","task_name":"task.name",'
2426
b'"labels":{"label1":1,"label2":"text"},'
@@ -46,6 +48,7 @@ async def test_json_loads() -> None:
4648
task_id="task-id",
4749
task_name="task.name",
4850
labels={"label1": 1, "label2": "text"},
51+
queue="taksiq",
4952
args=[1, "a"],
5053
kwargs={"p1": "v1"},
5154
)

tests/formatters/test_proxy_formatter.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,15 @@ async def test_proxy_dumps() -> None:
1111
msg = TaskiqMessage(
1212
task_id="task-id",
1313
task_name="task.name",
14+
queue="taksiq",
1415
labels={"label1": 1, "label2": "text"},
1516
args=[1, "a"],
1617
kwargs={"p1": "v1"},
1718
)
1819
expected = BrokerMessage(
1920
task_id="task-id",
2021
task_name="task.name",
22+
queue="taskiq",
2123
message=(
2224
b'{"task_id": "task-id", "task_name": "task.name", '
2325
b'"labels": {"label1": 1, "label2": "text"}, '
@@ -41,6 +43,7 @@ async def test_proxy_loads() -> None:
4143
expected = TaskiqMessage(
4244
task_id="task-id",
4345
task_name="task.name",
46+
queue="taskiq",
4447
labels={"label1": 1, "label2": "text"},
4548
args=[1, "a"],
4649
kwargs={"p1": "v1"},

tests/middlewares/test_simple_retry.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ async def test_successful_retry(broker: AsyncMock) -> None:
2525
TaskiqMessage(
2626
task_id="test_id",
2727
task_name="meme",
28+
queue="taskiq",
2829
labels={
2930
"retry_on_error": "True",
3031
},
@@ -47,6 +48,7 @@ async def test_no_retry(broker: AsyncMock) -> None:
4748
TaskiqMessage(
4849
task_id="test_id",
4950
task_name="meme",
51+
queue="taskiq",
5052
labels={},
5153
args=[],
5254
kwargs={},
@@ -65,6 +67,7 @@ async def test_max_retries(broker: AsyncMock) -> None:
6567
TaskiqMessage(
6668
task_id="test_id",
6769
task_name="meme",
70+
queue="taskiq",
6871
labels={
6972
"retry_on_error": "True",
7073
"_retries": "2",

0 commit comments

Comments
 (0)