11from dataclasses import asdict , is_dataclass
22from logging import getLogger
3- from typing import (
3+ from typing import ( # noqa: WPS235
44 TYPE_CHECKING ,
55 Any ,
66 Coroutine ,
77 Dict ,
88 Generic ,
9+ Optional ,
910 TypeVar ,
1011 Union ,
1112 overload ,
1415from pydantic import BaseModel
1516from typing_extensions import ParamSpec
1617
18+ from taskiq .abc .middleware import TaskiqMiddleware
1719from taskiq .exceptions import SendTaskError
1820from taskiq .message import TaskiqMessage
1921from taskiq .task import AsyncTaskiqTask , SyncTaskiqTask
@@ -41,6 +43,7 @@ def __init__(
4143 self .task_name = task_name
4244 self .broker = broker
4345 self .labels = labels
46+ self .custom_task_id : Optional [str ] = None
4447
4548 def with_labels (
4649 self ,
@@ -55,6 +58,19 @@ def with_labels(
5558 self .labels .update (labels )
5659 return self
5760
61+ def with_task_id (self , task_id : str ) -> "AsyncKicker[_FuncParams, _ReturnType]" :
62+ """
63+ Set task_id for current execution.
64+
65+ Please use this method with caution,
66+ because it may brake the logic of getting results.
67+
68+ :param task_id: custom task id.
69+ :return: kicker with custom task id.
70+ """
71+ self .custom_task_id = task_id
72+ return self
73+
5874 def with_broker (
5975 self ,
6076 broker : "AsyncBroker" ,
@@ -87,7 +103,7 @@ async def kiq( # noqa: D102
87103 ) -> AsyncTaskiqTask [_ReturnType ]:
88104 ...
89105
90- async def kiq (
106+ async def kiq ( # noqa: C901
91107 self ,
92108 * args : _FuncParams .args ,
93109 ** kwargs : _FuncParams .kwargs ,
@@ -110,15 +126,16 @@ async def kiq(
110126 )
111127 message = self ._prepare_message (* args , ** kwargs )
112128 for middleware in self .broker .middlewares :
113- message = await maybe_awaitable ( middleware .pre_send ( message ))
114-
129+ if middleware . __class__ . pre_send != TaskiqMiddleware .pre_send :
130+ message = await maybe_awaitable ( middleware . pre_send ( message ))
115131 try :
116132 await self .broker .kick (self .broker .formatter .dumps (message ))
117133 except Exception as exc :
118134 raise SendTaskError () from exc
119135
120136 for middleware in self .broker .middlewares :
121- await maybe_awaitable (middleware .post_send (message ))
137+ if middleware .__class__ .post_send != TaskiqMiddleware .post_send :
138+ await maybe_awaitable (middleware .post_send (message ))
122139
123140 return AsyncTaskiqTask (
124141 task_id = message .task_id ,
@@ -198,8 +215,12 @@ def _prepare_message( # noqa: WPS210
198215 for label , label_val in self .labels .items ():
199216 labels [label ] = str (label_val )
200217
218+ task_id = self .custom_task_id
219+ if task_id is None :
220+ task_id = self .broker .id_generator ()
221+
201222 return TaskiqMessage (
202- task_id = self . broker . id_generator () ,
223+ task_id = task_id ,
203224 task_name = self .task_name ,
204225 labels = labels ,
205226 args = formatted_args ,
0 commit comments