Skip to content

Commit e8646a2

Browse files
committed
Preserve dataset build failure details
1 parent f422d93 commit e8646a2

8 files changed

Lines changed: 140 additions & 6 deletions

File tree

changelog.d/1111.fixed.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Preserve dataset-build failure details when Modal serializes Stage 1 command errors.

modal_app/pipeline.py

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,9 @@
9090
step_reusable as _step_reusable,
9191
write_run_meta,
9292
)
93+
from policyengine_us_data.build_datasets.commands import ( # noqa: E402
94+
DatasetCommandError,
95+
)
9396
from policyengine_us_data.utils.run_context import RunContext, resolve_run_id # noqa: E402
9497
from policyengine_us_data.utils.error_redaction import ( # noqa: E402
9598
redacted_bounded_error_text,
@@ -217,6 +220,17 @@ def _record_pipeline_failure(
217220
return None
218221

219222

223+
def _traceback_text_for_pipeline_failure(
224+
exc: BaseException,
225+
fallback_traceback: str,
226+
) -> str:
227+
"""Prefer captured Stage 1 command output when available."""
228+
229+
if isinstance(exc, DatasetCommandError) and exc.result.error is not None:
230+
return exc.result.error.traceback_text()
231+
return fallback_traceback
232+
233+
220234
def _pipeline_error_summary(
221235
exc: BaseException,
222236
*,
@@ -1793,7 +1807,7 @@ def run_pipeline(
17931807
return run_id
17941808

17951809
except Exception as e:
1796-
traceback_text = traceback.format_exc()
1810+
traceback_text = _traceback_text_for_pipeline_failure(e, traceback.format_exc())
17971811
traceback_ref = _record_pipeline_failure(
17981812
e,
17991813
run_id=run_id,
@@ -2029,7 +2043,10 @@ def promote_run(
20292043
)
20302044
write_run_meta(meta, pipeline_volume)
20312045
except Exception as exc:
2032-
traceback_text = traceback.format_exc()
2046+
traceback_text = _traceback_text_for_pipeline_failure(
2047+
exc,
2048+
traceback.format_exc(),
2049+
)
20332050
traceback_ref = _record_pipeline_failure(
20342051
exc,
20352052
run_id=run_id,

modal_app/step_manifests/status.py

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,25 @@ def _error_payload(
5252
return error_record.to_status_dict()
5353

5454

55+
def _latest_error_payload(run_dir: Path) -> dict[str, Any] | None:
56+
try:
57+
return _error_payload(read_latest_pipeline_error(run_dir))
58+
except Exception as exc:
59+
message = redacted_bounded_error_text(
60+
f"{type(exc).__name__}: {exc}",
61+
max_chars=DEFAULT_ERROR_MESSAGE_MAX_CHARS,
62+
).text
63+
return {
64+
"source": "latest_error.json",
65+
"surface": "error_record_read",
66+
"stage_id": None,
67+
"substage_id": None,
68+
"error_type": type(exc).__name__,
69+
"message": message,
70+
"traceback_available": False,
71+
}
72+
73+
5574
def _run_manifest_error_payload(error_text: str | None) -> dict[str, Any] | None:
5675
if not error_text:
5776
return None
@@ -368,9 +387,9 @@ def build_pipeline_status_payload(
368387
missing_expected = [
369388
step_id for step_id in expected_ids if step_id not in present_ids
370389
]
371-
error = _error_payload(
372-
read_latest_pipeline_error(run_dir),
373-
) or _run_manifest_error_payload(run_manifest.error)
390+
error = _latest_error_payload(run_dir) or _run_manifest_error_payload(
391+
run_manifest.error
392+
)
374393
status = run_manifest.status
375394
return {
376395
"schema_version": PIPELINE_STATUS_SCHEMA_VERSION,

policyengine_us_data/build_datasets/commands.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,11 @@ def __init__(self, result: DatasetCommandResult):
6767
self.result = result
6868
super().__init__(f"Command failed ({result.returncode}): {result.command_name}")
6969

70+
def __reduce__(self):
71+
"""Preserve the structured result when Modal pickles this exception."""
72+
73+
return (self.__class__, (self.result,))
74+
7075

7176
@dataclass
7277
class SubprocessLogCapture:

policyengine_us_data/build_datasets/status.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,11 @@ def to_dict(self) -> dict[str, Any]:
101101
"metadata": dict(self.metadata),
102102
}
103103

104+
def traceback_text(self) -> str:
105+
"""Return traceback-like text from captured command context."""
106+
107+
return _pipeline_traceback_text(self)
108+
104109
def to_pipeline_error_record(
105110
self,
106111
*,
@@ -126,7 +131,7 @@ def to_pipeline_error_record(
126131
stage_id=STAGE_1_BUILD_DATASETS,
127132
substage_id=self.substep_id,
128133
surface=surface,
129-
traceback_text=_pipeline_traceback_text(self),
134+
traceback_text=self.traceback_text(),
130135
occurred_at=self.created_at,
131136
env=env,
132137
)

tests/unit/test_build_dataset_commands.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import pickle
12
import sys
23

34
import pytest
@@ -82,6 +83,22 @@ def test_command_runner_raises_structured_failure():
8283
assert result.combined_output_tail == ("structured failure\n",)
8384

8485

86+
def test_dataset_command_error_round_trips_through_pickle():
87+
command = DatasetCommand(
88+
name="failing command",
89+
argv=(sys.executable, "-c", "import sys; sys.exit(7)"),
90+
)
91+
92+
with pytest.raises(DatasetCommandError) as exc_info:
93+
CommandRunner().run(command)
94+
95+
restored = pickle.loads(pickle.dumps(exc_info.value))
96+
97+
assert str(restored) == "Command failed (7): failing command"
98+
assert restored.result.command_name == "failing command"
99+
assert restored.result.returncode == 7
100+
101+
85102
def test_command_runner_can_return_structured_failure_without_raising():
86103
command = DatasetCommand(
87104
name="nonraising command",

tests/unit/test_pipeline.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,17 @@
1616
_new_run_metadata,
1717
_pipeline_error_summary,
1818
_run_required_promotion_subprocess,
19+
_traceback_text_for_pipeline_failure,
1920
_try_reload_pipeline_volume_after_h5_builds,
2021
)
2122
from modal_app.step_manifests.state import RunMetadata # noqa: E402
2223
from modal_app.step_manifests.store import ( # noqa: E402
2324
read_run_meta,
2425
write_run_meta,
2526
)
27+
from policyengine_us_data.build_datasets.commands import DatasetCommandError # noqa: E402
28+
from policyengine_us_data.build_datasets.results import DatasetCommandResult # noqa: E402
29+
from policyengine_us_data.build_datasets.status import Stage1ErrorRecord # noqa: E402
2630
from policyengine_us_data.utils.run_context import RunContext # noqa: E402
2731
from policyengine_us_data.utils.step_manifest import ArtifactReference # noqa: E402
2832

@@ -128,6 +132,43 @@ def test_pipeline_error_summary_falls_back_to_bounded_traceback(monkeypatch):
128132
assert "old traceback" not in summary
129133

130134

135+
def test_pipeline_failure_traceback_prefers_stage_1_command_tail():
136+
result = DatasetCommandResult(
137+
command_name="policyengine_us_data/datasets/cps/extended_cps.py",
138+
argv=("python", "-m", "policyengine_us_data.datasets.cps.extended_cps"),
139+
status="failed",
140+
returncode=1,
141+
started_at="2026-05-22T12:00:00Z",
142+
completed_at="2026-05-22T12:00:01Z",
143+
duration_s=1.0,
144+
combined_output_tail=("actual ecps failure\n",),
145+
error=Stage1ErrorRecord(
146+
substep_id="1c_extended_cps_puf_clone",
147+
command_name="policyengine_us_data/datasets/cps/extended_cps.py",
148+
error_type="RuntimeError",
149+
message="Command failed",
150+
returncode=1,
151+
metadata={
152+
"argv": [
153+
"python",
154+
"-m",
155+
"policyengine_us_data.datasets.cps.extended_cps",
156+
],
157+
"output_tail": ["actual ecps failure\n"],
158+
},
159+
),
160+
)
161+
162+
traceback_text = _traceback_text_for_pipeline_failure(
163+
DatasetCommandError(result),
164+
"fallback traceback",
165+
)
166+
167+
assert "fallback traceback" not in traceback_text
168+
assert "policyengine_us_data.datasets.cps.extended_cps" in traceback_text
169+
assert "actual ecps failure" in traceback_text
170+
171+
131172
def test_new_run_metadata_accepts_release_context_fields_once():
132173
context = RunContext.from_mapping(
133174
{

tests/unit/test_pipeline_status.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -361,6 +361,35 @@ def test_status_payload_truncates_oldest_traceback_text(tmp_path):
361361
assert "oldest" not in payload["error"]["traceback"]
362362

363363

364+
def test_status_payload_survives_unreadable_latest_error_record(tmp_path):
365+
runs_dir = tmp_path / "runs"
366+
run_dir = runs_dir / "run-1"
367+
write_run_manifest(
368+
run_manifest_path(run_dir),
369+
RunManifest(
370+
run_id="run-1",
371+
branch="main",
372+
sha="abc123",
373+
version="1.0.0",
374+
status="failed",
375+
started_at="2026-05-12T12:00:00+00:00",
376+
known_step_ids=[BUILD_DATASETS.id],
377+
error="fallback manifest error",
378+
),
379+
)
380+
latest_path = run_dir / "errors" / "latest_error.json"
381+
latest_path.parent.mkdir(parents=True)
382+
latest_path.write_text("{not-json")
383+
384+
payload = build_pipeline_status_payload("run-1", runs_dir=runs_dir)
385+
386+
assert payload["status"] == "failed"
387+
assert payload["error"]["source"] == "latest_error.json"
388+
assert payload["error"]["surface"] == "error_record_read"
389+
assert payload["error"]["traceback_available"] is False
390+
assert "JSONDecodeError" in payload["error"]["message"]
391+
392+
364393
def test_clear_latest_pipeline_error_is_best_effort(tmp_path):
365394
run_dir = tmp_path / "runs" / "run-1"
366395
latest_path = run_dir / "errors" / "latest_error.json"

0 commit comments

Comments
 (0)