-
-
Notifications
You must be signed in to change notification settings - Fork 116
Expand file tree
/
Copy pathshared_broker.py
More file actions
86 lines (64 loc) · 2.41 KB
/
shared_broker.py
File metadata and controls
86 lines (64 loc) · 2.41 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
import sys
from typing import Any, AsyncGenerator, Optional, TypeVar
from taskiq.abc.broker import AsyncBroker
from taskiq.decor import AsyncTaskiqDecoratedTask
from taskiq.exceptions import SharedBrokerListenError, SharedBrokerSendTaskError
from taskiq.kicker import AsyncKicker
from taskiq.message import BrokerMessage
if sys.version_info >= (3, 10):
from typing import ParamSpec
else:
from typing_extensions import ParamSpec
# Generic placeholders used below to parametrize AsyncTaskiqDecoratedTask and
# AsyncKicker: one type variable and one parameter specification.
_ReturnType = TypeVar("_ReturnType")
_Params = ParamSpec("_Params")
class SharedDecoratedTask(AsyncTaskiqDecoratedTask[_Params, _ReturnType]):
    """Decorated task type used by the shared broker."""

    def kicker(self) -> AsyncKicker[_Params, _ReturnType]:
        """
        Build a kicker bound to the broker that should receive the task.

        When the task's broker exposes a truthy ``_default_broker``
        attribute, the kicker is bound to that broker; otherwise it is
        bound to the task's own broker.

        :return: new kicker.
        """
        target_broker = getattr(self.broker, "_default_broker", None)
        if not target_broker:
            # Attribute missing or unset: fall back to the task's own broker.
            target_broker = self.broker

        return AsyncKicker(
            task_name=self.task_name,
            broker=target_broker,
            labels=self.labels,
            return_type=self.return_type,
        )
class AsyncSharedBroker(AsyncBroker):
    """Broker on which shared tasks are declared; it never sends or listens."""

    def __init__(self) -> None:
        """Initialise the base broker with ``None`` and no default broker."""
        super().__init__(None)
        # Broker that kickers of shared tasks should target; unset until
        # ``default_broker`` is called.
        self._default_broker: Optional[AsyncBroker] = None
        # Tasks declared on this broker are wrapped in SharedDecoratedTask.
        self.decorator_class = SharedDecoratedTask

    def default_broker(self, new_broker: AsyncBroker) -> None:
        """
        Set the broker used to kick shared tasks.

        :param new_broker: new async broker to kick tasks with.
        """
        self._default_broker = new_broker

    async def kick(self, message: BrokerMessage) -> None:
        """
        Always fail: the shared broker cannot kick tasks.

        :param message: message to send (unused).
        :raises SharedBrokerSendTaskError: if called.
        """
        raise SharedBrokerSendTaskError()

    async def listen(self) -> AsyncGenerator[bytes, None]:  # type: ignore
        """
        Always fail: the shared broker cannot listen to tasks.

        The body contains no ``yield``, so this is a coroutine function;
        the exception is raised when the returned coroutine is awaited.

        :raises SharedBrokerListenError: if called.
        """
        raise SharedBrokerListenError()

    def _register_task(
        self,
        task_name: str,
        task: AsyncTaskiqDecoratedTask[Any, Any],
    ) -> None:
        """
        Record the task in ``global_task_registry`` under its name.

        :param task_name: key to store the task under.
        :param task: decorated task to store.
        """
        self.global_task_registry[task_name] = task
# Module-level shared broker instance, created at import time.
async_shared_broker = AsyncSharedBroker()
# Shortcut to the instance's ``task`` attribute (presumably the task
# decorator inherited from AsyncBroker -- defined outside this file).
shared_task = async_shared_broker.task