Skip to content

Commit fb063d5

Browse files
committed
add pipeline utils
1 parent 5c7732a commit fb063d5

1 file changed

Lines changed: 54 additions & 0 deletions

File tree

utils/pipeline.py

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
from dataclasses import dataclass, field
2+
from __future__ import annotations
3+
from enum import StrEnum
4+
from typing import Callable, Generic, TypeVar
5+
6+
T = TypeVar("T")
7+
U = TypeVar("U")
8+
9+
@dataclass(frozen=True)
10+
class Phase:
11+
name: StrEnum
12+
value: object
13+
14+
@dataclass
15+
class PipelineResult(Generic[T]):
16+
value: T | None
17+
trace: list[Phase] = field(default_factory=list)
18+
errors: list[Exception] = field(default_factory=list)
19+
20+
@property
21+
def ok(self) -> bool:
22+
return not self.errors
23+
24+
def start_pipeline(name: StrEnum, value: T) -> PipelineResult[T]:
25+
return PipelineResult(
26+
value=value,
27+
trace=[Phase(name, value)],
28+
)
29+
30+
def add_phase(
31+
pipeline: PipelineResult[T],
32+
name: StrEnum,
33+
fn: Callable[[T], U],
34+
) -> PipelineResult[U]:
35+
if not pipeline.ok:
36+
return PipelineResult(
37+
value=None,
38+
trace=pipeline.trace,
39+
errors=pipeline.errors,
40+
)
41+
42+
try:
43+
assert pipeline.value is not None
44+
new_value = fn(pipeline.value)
45+
return PipelineResult(
46+
value=new_value,
47+
trace=pipeline.trace + [Phase(name, new_value)],
48+
)
49+
except Exception as exc:
50+
return PipelineResult(
51+
value=None,
52+
trace=pipeline.trace,
53+
errors=pipeline.errors + [exc],
54+
)

0 commit comments

Comments
 (0)