Skip to content

Commit 5a941fd

Browse files
committed
Fixed a bug, added docs.
1 parent b89b8bc commit 5a941fd

File tree

3 files changed

+67
-3
lines changed

3 files changed

+67
-3
lines changed

README.md

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,3 +227,66 @@ if __name__ == "__main__":
227227
asyncio.run(main())
228228

229229
```
230+
231+
Aletrnatively, you can use `GroupWithArgs` class to pass arguments of previous step
232+
as an argument to called functions. Here's an example:
233+
234+
```python
235+
import asyncio
236+
from typing import Any
237+
238+
from taskiq.brokers.inmemory_broker import InMemoryBroker
239+
240+
from taskiq_pipelines import GroupWithArgs, Pipeline, PipelineMiddleware
241+
242+
broker = InMemoryBroker()
243+
broker.add_middlewares(PipelineMiddleware())
244+
245+
246+
@broker.task
247+
def add_one(value: int) -> int:
248+
return value + 1
249+
250+
251+
@broker.task
252+
def mul_two(val: int) -> int:
253+
return val * 2
254+
255+
256+
@broker.task
257+
def to_string(val: Any) -> str:
258+
return str(val)
259+
260+
261+
async def main():
262+
pipe = (
263+
# The pipelines starts with `add_one` task.
264+
Pipeline(broker, add_one)
265+
# All values are passed to the group
266+
.group(
267+
GroupWithArgs(
268+
# Aborts pipeline
269+
# if any of tasks fails
270+
skip_errors=False,
271+
# How often to check for completion.
272+
check_interval=0.1,
273+
)
274+
# Here we start task that adds 1 to result
275+
# of the previous task.
276+
# The result is passed as keyword argument "value"
277+
.add(add_one, param_name="value")
278+
# Here's a task that multiplies 2
279+
.add(mul_two)
280+
# Here we map all results to string
281+
).map(to_string)
282+
)
283+
task = await pipe.kiq(1)
284+
result = await task.wait_result()
285+
# Here it should output
286+
# Calculated value: ['3', '4']
287+
print("Calculated value:", result.return_value)
288+
289+
290+
if __name__ == "__main__":
291+
asyncio.run(main())
292+
```

taskiq_pipelines/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,12 @@
33
from taskiq_pipelines.exceptions import AbortPipeline, PipelineError
44
from taskiq_pipelines.middleware import PipelineMiddleware
55
from taskiq_pipelines.pipeliner import Pipeline
6-
from taskiq_pipelines.task_group import Group
6+
from taskiq_pipelines.task_group import Group, GroupWithArgs
77

88
__all__ = [
99
"AbortPipeline",
1010
"Group",
11+
"GroupWithArgs",
1112
"Pipeline",
1213
"PipelineError",
1314
"PipelineMiddleware",

taskiq_pipelines/steps/group.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
async def wait_group_tasks(
2121
task_ids: List[str],
2222
check_interval: float,
23-
skip_errors: bool = True,
23+
skip_errors: bool = False,
2424
context: Context = TaskiqDepends(),
2525
) -> tuple[Any, ...]:
2626
"""Waits for subtasks to complete."""
@@ -130,7 +130,7 @@ async def act(
130130
)
131131
.kiq(
132132
task_ids=ids,
133-
skip_errors=True,
133+
skip_errors=self.skip_errors,
134134
check_interval=self.check_interval,
135135
)
136136
)

0 commit comments

Comments
 (0)