Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/1111.fixed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Preserve dataset-build failure details when Modal serializes Stage 1 command errors.
21 changes: 19 additions & 2 deletions modal_app/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@
step_reusable as _step_reusable,
write_run_meta,
)
from policyengine_us_data.build_datasets.commands import ( # noqa: E402
DatasetCommandError,
)
from policyengine_us_data.utils.run_context import RunContext, resolve_run_id # noqa: E402
from policyengine_us_data.utils.error_redaction import ( # noqa: E402
redacted_bounded_error_text,
Expand Down Expand Up @@ -217,6 +220,17 @@ def _record_pipeline_failure(
return None


def _traceback_text_for_pipeline_failure(
    exc: BaseException,
    fallback_traceback: str,
) -> str:
    """Choose the traceback text to record for a pipeline failure.

    When ``exc`` is a ``DatasetCommandError`` whose result carries an error
    record, that record's ``traceback_text()`` is returned. In every other
    case ``fallback_traceback`` is returned unchanged.
    """
    # Anything that is not a structured command failure keeps the caller's text.
    if not isinstance(exc, DatasetCommandError):
        return fallback_traceback

    stage_error = exc.result.error
    if stage_error is None:
        return fallback_traceback
    return stage_error.traceback_text()


def _pipeline_error_summary(
exc: BaseException,
*,
Expand Down Expand Up @@ -1793,7 +1807,7 @@ def run_pipeline(
return run_id

except Exception as e:
traceback_text = traceback.format_exc()
traceback_text = _traceback_text_for_pipeline_failure(e, traceback.format_exc())
traceback_ref = _record_pipeline_failure(
e,
run_id=run_id,
Expand Down Expand Up @@ -2029,7 +2043,10 @@ def promote_run(
)
write_run_meta(meta, pipeline_volume)
except Exception as exc:
traceback_text = traceback.format_exc()
traceback_text = _traceback_text_for_pipeline_failure(
exc,
traceback.format_exc(),
)
traceback_ref = _record_pipeline_failure(
exc,
run_id=run_id,
Expand Down
25 changes: 22 additions & 3 deletions modal_app/step_manifests/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,25 @@ def _error_payload(
return error_record.to_status_dict()


def _latest_error_payload(run_dir: Path) -> dict[str, Any] | None:
    """Build the status payload for the latest recorded pipeline error.

    Returns whatever ``_error_payload`` produces for the record read from
    ``run_dir``. If reading or converting that record raises, a synthetic
    payload describing the read failure is returned instead, so the caller
    still receives an error entry rather than an exception.
    """
    try:
        latest_record = read_latest_pipeline_error(run_dir)
        return _error_payload(latest_record)
    except Exception as exc:
        # Best-effort boundary: report the read failure itself, redacted and
        # bounded, instead of letting it propagate to the caller.
        failure_type = type(exc).__name__
        bounded = redacted_bounded_error_text(
            f"{failure_type}: {exc}",
            max_chars=DEFAULT_ERROR_MESSAGE_MAX_CHARS,
        )
        return {
            "source": "latest_error.json",
            "surface": "error_record_read",
            "stage_id": None,
            "substage_id": None,
            "error_type": failure_type,
            "message": bounded.text,
            "traceback_available": False,
        }


def _run_manifest_error_payload(error_text: str | None) -> dict[str, Any] | None:
if not error_text:
return None
Expand Down Expand Up @@ -368,9 +387,9 @@ def build_pipeline_status_payload(
missing_expected = [
step_id for step_id in expected_ids if step_id not in present_ids
]
error = _error_payload(
read_latest_pipeline_error(run_dir),
) or _run_manifest_error_payload(run_manifest.error)
error = _latest_error_payload(run_dir) or _run_manifest_error_payload(
run_manifest.error
)
status = run_manifest.status
return {
"schema_version": PIPELINE_STATUS_SCHEMA_VERSION,
Expand Down
5 changes: 5 additions & 0 deletions policyengine_us_data/build_datasets/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ def __init__(self, result: DatasetCommandResult):
self.result = result
super().__init__(f"Command failed ({result.returncode}): {result.command_name}")

def __reduce__(self):
    """Return pickle reconstruction info that keeps the structured result.

    The reconstructor is this instance's class, called with ``self.result``
    as its only positional argument, so the result is preserved when the
    exception is pickled (for example by Modal).
    """
    constructor_args = (self.result,)
    return (self.__class__, constructor_args)


@dataclass
class SubprocessLogCapture:
Expand Down
7 changes: 6 additions & 1 deletion policyengine_us_data/build_datasets/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,11 @@ def to_dict(self) -> dict[str, Any]:
"metadata": dict(self.metadata),
}

def traceback_text(self) -> str:
    """Render this error record as traceback-like text.

    Delegates to ``_pipeline_traceback_text``, passing this record.
    """
    rendered = _pipeline_traceback_text(self)
    return rendered

def to_pipeline_error_record(
self,
*,
Expand All @@ -126,7 +131,7 @@ def to_pipeline_error_record(
stage_id=STAGE_1_BUILD_DATASETS,
substage_id=self.substep_id,
surface=surface,
traceback_text=_pipeline_traceback_text(self),
traceback_text=self.traceback_text(),
occurred_at=self.created_at,
env=env,
)
Expand Down
17 changes: 17 additions & 0 deletions tests/unit/test_build_dataset_commands.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import pickle
import sys

import pytest
Expand Down Expand Up @@ -82,6 +83,22 @@ def test_command_runner_raises_structured_failure():
assert result.combined_output_tail == ("structured failure\n",)


def test_dataset_command_error_round_trips_through_pickle():
    """A raised ``DatasetCommandError`` keeps its message and result after pickling."""
    failing = DatasetCommand(
        name="failing command",
        argv=(sys.executable, "-c", "import sys; sys.exit(7)"),
    )
    runner = CommandRunner()

    with pytest.raises(DatasetCommandError) as raised:
        runner.run(failing)

    # Serialize and restore the exception exactly as a pickling transport would.
    serialized = pickle.dumps(raised.value)
    restored = pickle.loads(serialized)

    assert str(restored) == "Command failed (7): failing command"
    restored_result = restored.result
    assert restored_result.command_name == "failing command"
    assert restored_result.returncode == 7


def test_command_runner_can_return_structured_failure_without_raising():
command = DatasetCommand(
name="nonraising command",
Expand Down
41 changes: 41 additions & 0 deletions tests/unit/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,17 @@
_new_run_metadata,
_pipeline_error_summary,
_run_required_promotion_subprocess,
_traceback_text_for_pipeline_failure,
_try_reload_pipeline_volume_after_h5_builds,
)
from modal_app.step_manifests.state import RunMetadata # noqa: E402
from modal_app.step_manifests.store import ( # noqa: E402
read_run_meta,
write_run_meta,
)
from policyengine_us_data.build_datasets.commands import DatasetCommandError # noqa: E402
from policyengine_us_data.build_datasets.results import DatasetCommandResult # noqa: E402
from policyengine_us_data.build_datasets.status import Stage1ErrorRecord # noqa: E402
from policyengine_us_data.utils.run_context import RunContext # noqa: E402
from policyengine_us_data.utils.step_manifest import ArtifactReference # noqa: E402

Expand Down Expand Up @@ -128,6 +132,43 @@ def test_pipeline_error_summary_falls_back_to_bounded_traceback(monkeypatch):
assert "old traceback" not in summary


def test_pipeline_failure_traceback_prefers_stage_1_command_tail():
    """Captured Stage 1 command context replaces the fallback traceback text."""
    script_path = "policyengine_us_data/datasets/cps/extended_cps.py"
    module_name = "policyengine_us_data.datasets.cps.extended_cps"
    failure_line = "actual ecps failure\n"
    fallback = "fallback traceback"

    error_record = Stage1ErrorRecord(
        substep_id="1c_extended_cps_puf_clone",
        command_name=script_path,
        error_type="RuntimeError",
        message="Command failed",
        returncode=1,
        metadata={
            "argv": ["python", "-m", module_name],
            "output_tail": [failure_line],
        },
    )
    result = DatasetCommandResult(
        command_name=script_path,
        argv=("python", "-m", module_name),
        status="failed",
        returncode=1,
        started_at="2026-05-22T12:00:00Z",
        completed_at="2026-05-22T12:00:01Z",
        duration_s=1.0,
        combined_output_tail=(failure_line,),
        error=error_record,
    )

    rendered = _traceback_text_for_pipeline_failure(
        DatasetCommandError(result),
        fallback,
    )

    assert fallback not in rendered
    assert module_name in rendered
    assert "actual ecps failure" in rendered


def test_new_run_metadata_accepts_release_context_fields_once():
context = RunContext.from_mapping(
{
Expand Down
29 changes: 29 additions & 0 deletions tests/unit/test_pipeline_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,35 @@ def test_status_payload_truncates_oldest_traceback_text(tmp_path):
assert "oldest" not in payload["error"]["traceback"]


def test_status_payload_survives_unreadable_latest_error_record(tmp_path):
    """An unparseable ``latest_error.json`` yields a read-failure error payload."""
    runs_dir = tmp_path / "runs"
    run_dir = runs_dir / "run-1"

    manifest_path = run_manifest_path(run_dir)
    failed_manifest = RunManifest(
        run_id="run-1",
        branch="main",
        sha="abc123",
        version="1.0.0",
        status="failed",
        started_at="2026-05-12T12:00:00+00:00",
        known_step_ids=[BUILD_DATASETS.id],
        error="fallback manifest error",
    )
    write_run_manifest(manifest_path, failed_manifest)

    # Plant a latest-error record that cannot be parsed as JSON.
    errors_dir = run_dir / "errors"
    errors_dir.mkdir(parents=True)
    (errors_dir / "latest_error.json").write_text("{not-json")

    payload = build_pipeline_status_payload("run-1", runs_dir=runs_dir)
    reported_error = payload["error"]

    assert payload["status"] == "failed"
    assert reported_error["source"] == "latest_error.json"
    assert reported_error["surface"] == "error_record_read"
    assert reported_error["traceback_available"] is False
    assert "JSONDecodeError" in reported_error["message"]


def test_clear_latest_pipeline_error_is_best_effort(tmp_path):
run_dir = tmp_path / "runs" / "run-1"
latest_path = run_dir / "errors" / "latest_error.json"
Expand Down