-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathexceptions.py
More file actions
47 lines (28 loc) · 922 Bytes
/
exceptions.py
File metadata and controls
47 lines (28 loc) · 922 Bytes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
from typing import ClassVar
from taskiq import TaskiqError
class PipelineError(TaskiqError):
    """
    Root of the pipeline exception hierarchy.

    Catch this type to handle any pipeline-specific error
    defined in this module.
    """
class StepError(PipelineError):
    """
    Common base for errors reported by a single pipeline step.

    The message template references ``task_id``, ``error`` and the
    class-level ``_STEP_NAME``; concrete subclasses are expected to
    assign ``_STEP_NAME`` to the name of the step they represent.
    """

    # Two adjacent literals are concatenated by the parser into one string.
    __template__ = (
        "Task {task_id} returned an error. "
        "{_STEP_NAME} failed. Reason: {error}"
    )

    # Step name used in the message; declared here, set by subclasses.
    _STEP_NAME: ClassVar[str]

    # Identifier of the task that returned an error.
    task_id: str
    # Underlying failure reported as the reason; may be None.
    error: BaseException | None
class MappingError(StepError):
    """Step error reported when a mapping step fails."""

    # Substituted for ``{_STEP_NAME}`` in the inherited message template.
    _STEP_NAME = "mapping"
class FilterError(StepError):
    """Step error reported when a filtering step fails."""

    # Substituted for ``{_STEP_NAME}`` in the inherited message template.
    _STEP_NAME = "filtering"
class AbortPipeline(PipelineError):  # noqa: N818
    """
    Abort the current pipeline execution.

    This error can be raised from the ``act`` method of a step.
    It immediately aborts the current pipeline execution.
    """

    __template__ = "Pipeline was aborted. {reason}"

    # Explanation inserted into the message; a default is used when omitted.
    reason: str = "No reason provided."