-
Notifications
You must be signed in to change notification settings - Fork 11
Expand file tree
/
Copy pathstatus.py
More file actions
167 lines (137 loc) · 5.07 KB
/
status.py
File metadata and controls
167 lines (137 loc) · 5.07 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
"""Structured status records for Stage 1 dataset-build execution."""
from __future__ import annotations
from collections.abc import Mapping
from dataclasses import dataclass, field, replace
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Any, Literal
if TYPE_CHECKING:
from modal_app.step_manifests.errors import PipelineErrorRecord
# The closed set of status strings a Stage 1 status event may carry; used as
# the type of ``Stage1StatusEvent.status``.
Stage1SubstepStatus = Literal["started", "completed", "skipped", "failed"]
def utc_timestamp(value: datetime | None = None) -> str:
    """Format a moment as a second-precision UTC string ending in ``Z``.

    Args:
        value: The datetime to render. When omitted, the current UTC time is
            used. The value is converted to UTC with ``astimezone`` (which
            interprets a naive datetime as system local time).

    Returns:
        The ISO 8601 text of the moment in UTC, with microseconds dropped and
        the ``+00:00`` offset written as ``Z``.
    """
    moment = datetime.now(timezone.utc) if value is None else value
    in_utc = moment.astimezone(timezone.utc)
    # Status records only carry whole seconds.
    whole_seconds = in_utc.replace(microsecond=0)
    iso_text = whole_seconds.isoformat()
    return iso_text.replace("+00:00", "Z")
@dataclass(frozen=True, kw_only=True)
class Stage1StatusEvent:
"""A timestamped status transition for a Stage 1 substep or command."""
substep_id: str
status: Stage1SubstepStatus
created_at: str
message: str | None = None
command_name: str | None = None
metadata: Mapping[str, Any] = field(default_factory=dict)
def to_dict(self) -> dict[str, Any]:
"""Return a JSON-compatible status event payload."""
return {
"substep_id": self.substep_id,
"status": self.status,
"created_at": self.created_at,
"message": self.message,
"command_name": self.command_name,
"metadata": dict(self.metadata),
}
@dataclass(frozen=True, kw_only=True)
class Stage1ErrorRecord:
    """Immutable description of a failed Stage 1 command or substep.

    Stage 1 keeps this type as an in-memory adapter surface during the
    refactor. Durable pipeline status should use :class:`PipelineErrorRecord`,
    which ``to_pipeline_error_record`` builds from an instance of this class.
    """

    # Identifier of the failing substep, if any.
    substep_id: str | None
    # Name of the failing command, if any.
    command_name: str | None
    # Name of the error's type (``from_exception`` uses the exception class name).
    error_type: str
    # Human-readable failure message.
    message: str
    # Process return code, when one is available.
    returncode: int | None = None
    # Defaults to ``utc_timestamp()`` evaluated when the record is created.
    created_at: str = field(default_factory=utc_timestamp)
    # Extra context; ``argv`` and ``output_tail`` entries are read when
    # rendering traceback text.
    metadata: Mapping[str, Any] = field(default_factory=dict)

    @classmethod
    def from_exception(
        cls,
        exc: BaseException,
        *,
        substep_id: str | None = None,
        command_name: str | None = None,
        returncode: int | None = None,
        metadata: Mapping[str, Any] | None = None,
    ) -> "Stage1ErrorRecord":
        """Create a record straight from an exception object, without parsing logs.

        Args:
            exc: The exception to describe. Its class name becomes
                ``error_type`` and ``str(exc)`` becomes ``message``.
            substep_id: Optional identifier of the failing substep.
            command_name: Optional name of the failing command.
            returncode: Optional process return code.
            metadata: Optional extra context; copied into a new dict.

        Returns:
            A new record whose ``created_at`` takes the field default.
        """
        exception_name = type(exc).__name__
        text = str(exc)
        copied_metadata = dict(metadata) if metadata else {}
        return cls(
            substep_id=substep_id,
            command_name=command_name,
            error_type=exception_name,
            message=text,
            returncode=returncode,
            metadata=copied_metadata,
        )

    def to_dict(self) -> dict[str, Any]:
        """Return the error as a plain dict payload.

        Keys appear in field-declaration order. ``metadata`` is shallow-copied
        into a new ``dict``. The payload is only as JSON-compatible as the
        metadata values the caller supplied.
        """
        scalar_names = (
            "substep_id",
            "command_name",
            "error_type",
            "message",
            "returncode",
            "created_at",
        )
        payload: dict[str, Any] = {
            name: getattr(self, name) for name in scalar_names
        }
        payload["metadata"] = dict(self.metadata)
        return payload

    def traceback_text(self) -> str:
        """Return traceback-like text built from the captured command context."""
        return _pipeline_traceback_text(self)

    def to_pipeline_error_record(
        self,
        *,
        run_id: str,
        branch: str | None = None,
        sha: str | None = None,
        version: str | None = None,
        modal_app_name: str | None = None,
        modal_environment: str | None = None,
        surface: str = "stage_1_dataset_build",
        env: Mapping[str, str] | None = None,
    ) -> "PipelineErrorRecord":
        """Convert this Stage 1 error into the durable pipeline error schema.

        Args:
            run_id: Identifier of the pipeline run, forwarded to the builder.
            branch: Value written to the result's ``branch`` field.
            sha: Value written to the result's ``sha`` field.
            version: Value written to the result's ``version`` field.
            modal_app_name: Value written to ``modal_app_name``.
            modal_environment: Value written to ``modal_environment``.
            surface: Surface label forwarded to the builder.
            env: Optional environment mapping forwarded to the builder.

        Returns:
            The record produced by ``build_pipeline_error_record`` with
            ``error_type`` and the provenance fields replaced.
        """
        # The project modules are imported at call time, inside the method.
        from modal_app.step_manifests.errors import build_pipeline_error_record
        from policyengine_us_data.stage_contracts.stages import (
            STAGE_1_BUILD_DATASETS,
        )

        # The original exception object is not stored on this record, so the
        # builder is given a RuntimeError that carries only the message.
        stand_in = RuntimeError(self.message)
        base_record = build_pipeline_error_record(
            stand_in,
            run_id=run_id,
            stage_id=STAGE_1_BUILD_DATASETS,
            substage_id=self.substep_id,
            surface=surface,
            traceback_text=self.traceback_text(),
            occurred_at=self.created_at,
            env=env,
        )

        # Restore the real error type name and apply the caller's provenance.
        # NOTE(review): these fields are overwritten even when the argument is
        # None; confirm the builder never derives them from ``env``.
        overrides: dict[str, Any] = {
            "error_type": self.error_type,
            "branch": branch,
            "sha": sha,
            "version": version,
            "modal_app_name": modal_app_name,
            "modal_environment": modal_environment,
        }
        return replace(base_record, **overrides)
def _pipeline_traceback_text(error: Stage1ErrorRecord) -> str:
    """Assemble traceback-like text from an error's captured command context.

    Reads the ``argv`` and ``output_tail`` entries of ``error.metadata``. Each
    entry is used only when it is a non-empty list or tuple; any other value
    (including a plain string) is ignored. Tuples are accepted as well as
    lists so that context stored in immutable form is not silently dropped.

    Args:
        error: The error record to describe; only ``metadata`` and
            ``message`` are read.

    Returns:
        The available sections separated by a blank line: a
        ``Command argv: ...`` line with the parts joined by spaces, then an
        ``Output tail:`` section. When neither section is available, the
        error's ``message`` is returned instead.
    """
    sections: list[str] = []

    argv = error.metadata.get("argv")
    if isinstance(argv, (list, tuple)) and argv:
        sections.append("Command argv: " + " ".join(map(str, argv)))

    output_tail = error.metadata.get("output_tail")
    if isinstance(output_tail, (list, tuple)) and output_tail:
        # Entries are concatenated without a separator, so each one is
        # expected to carry its own line ending.
        sections.append("Output tail:\n" + "".join(map(str, output_tail)))

    # With no captured context, the message is the only text available.
    return "\n\n".join(sections or [error.message])
# Names exported by ``from <module> import *``; the private helper
# ``_pipeline_traceback_text`` is not listed.
__all__ = [
    "Stage1ErrorRecord",
    "Stage1StatusEvent",
    "Stage1SubstepStatus",
    "utc_timestamp",
]