File tree Expand file tree Collapse file tree 1 file changed +48
-1
lines changed
Expand file tree Collapse file tree 1 file changed +48
-1
lines changed Original file line number Diff line number Diff line change @@ -167,9 +167,56 @@ You can add filters by calling `.filter` method of the pipeline.
167167### Group steps
168168
169169This step groups together multiple tasks and sends them after the previous steps.
170+ All tasks will be executed in parallel to each other and collected to a single tuple.
170171
171172To create a group you need to use ` Group ` class from ` taskiq_pipelienes ` like this:
172173
173- ```
174+ ``` python
175+ import asyncio
176+ from typing import Any
177+
178+ from taskiq.brokers.inmemory_broker import InMemoryBroker
179+
180+ from taskiq_pipelines import Group, Pipeline, PipelineMiddleware
181+
182+ broker = InMemoryBroker()
183+ broker.add_middlewares(PipelineMiddleware())
184+
185+
186+ @broker.task
187+ def add_one (value : int ) -> int :
188+ return value + 1
189+
190+
191+ @broker.task
192+ def mul_two (val : int ) -> int :
193+ return val * 2
194+
195+
196+ @broker.task
197+ def to_string (val : Any) -> str :
198+ return str (val)
199+
200+
201+ async def main ():
202+ pipe = (
203+ Pipeline(broker).group(
204+ Group()
205+ # Here we start task that adds 1 to 1
206+ .add(add_one, 1 )
207+ # Here's a task that multiplies 2 by 2
208+ .add(mul_two, 2 )
209+ # Here we map all results to string
210+ ).map(to_string)
211+ )
212+ task = await pipe.kiq()
213+ result = await task.wait_result()
214+ # Here it should output
215+ # Calculated value: ['2', '4']
216+ print (" Calculated value:" , result.return_value)
217+
218+
219+ if __name__ == " __main__" :
220+ asyncio.run(main())
174221
175222```
You can’t perform that action at this time.
0 commit comments