From e2a5340f35728719e0bf8eb04bc94dd026b57893 Mon Sep 17 00:00:00 2001 From: Anthony Volk Date: Fri, 22 May 2026 15:45:30 +0200 Subject: [PATCH] Preserve dataset build failure details --- changelog.d/1111.fixed.md | 1 + modal_app/pipeline.py | 21 +++++++++- modal_app/step_manifests/status.py | 25 +++++++++-- .../build_datasets/commands.py | 5 +++ policyengine_us_data/build_datasets/status.py | 7 +++- tests/unit/test_build_dataset_commands.py | 17 ++++++++ tests/unit/test_pipeline.py | 41 +++++++++++++++++++ tests/unit/test_pipeline_status.py | 29 +++++++++++++ 8 files changed, 140 insertions(+), 6 deletions(-) create mode 100644 changelog.d/1111.fixed.md diff --git a/changelog.d/1111.fixed.md b/changelog.d/1111.fixed.md new file mode 100644 index 000000000..2b17b0ae5 --- /dev/null +++ b/changelog.d/1111.fixed.md @@ -0,0 +1 @@ +Preserve dataset-build failure details when Modal serializes Stage 1 command errors. diff --git a/modal_app/pipeline.py b/modal_app/pipeline.py index acd7bb26b..3b8b35a4f 100644 --- a/modal_app/pipeline.py +++ b/modal_app/pipeline.py @@ -90,6 +90,9 @@ step_reusable as _step_reusable, write_run_meta, ) +from policyengine_us_data.build_datasets.commands import ( # noqa: E402 + DatasetCommandError, +) from policyengine_us_data.utils.run_context import RunContext, resolve_run_id # noqa: E402 from policyengine_us_data.utils.error_redaction import ( # noqa: E402 redacted_bounded_error_text, @@ -217,6 +220,17 @@ def _record_pipeline_failure( return None +def _traceback_text_for_pipeline_failure( + exc: BaseException, + fallback_traceback: str, +) -> str: + """Prefer captured Stage 1 command output when available.""" + + if isinstance(exc, DatasetCommandError) and exc.result.error is not None: + return exc.result.error.traceback_text() + return fallback_traceback + + def _pipeline_error_summary( exc: BaseException, *, @@ -1793,7 +1807,7 @@ def run_pipeline( return run_id except Exception as e: - traceback_text = traceback.format_exc() + traceback_text = _traceback_text_for_pipeline_failure(e, traceback.format_exc()) traceback_ref = _record_pipeline_failure( e, run_id=run_id, @@ -2029,7 +2043,10 @@ def promote_run( ) write_run_meta(meta, pipeline_volume) except Exception as exc: - traceback_text = traceback.format_exc() + traceback_text = _traceback_text_for_pipeline_failure( + exc, + traceback.format_exc(), + ) traceback_ref = _record_pipeline_failure( exc, run_id=run_id, diff --git a/modal_app/step_manifests/status.py b/modal_app/step_manifests/status.py index a9d66324a..4379fd00c 100644 --- a/modal_app/step_manifests/status.py +++ b/modal_app/step_manifests/status.py @@ -52,6 +52,25 @@ def _error_payload( return error_record.to_status_dict() +def _latest_error_payload(run_dir: Path) -> dict[str, Any] | None: + try: + return _error_payload(read_latest_pipeline_error(run_dir)) + except Exception as exc: + message = redacted_bounded_error_text( + f"{type(exc).__name__}: {exc}", + max_chars=DEFAULT_ERROR_MESSAGE_MAX_CHARS, + ).text + return { + "source": "latest_error.json", + "surface": "error_record_read", + "stage_id": None, + "substage_id": None, + "error_type": type(exc).__name__, + "message": message, + "traceback_available": False, + } + + def _run_manifest_error_payload(error_text: str | None) -> dict[str, Any] | None: if not error_text: return None @@ -368,9 +387,9 @@ def build_pipeline_status_payload( missing_expected = [ step_id for step_id in expected_ids if step_id not in present_ids ] - error = _error_payload( - read_latest_pipeline_error(run_dir), - ) or _run_manifest_error_payload(run_manifest.error) + error = _latest_error_payload(run_dir) or _run_manifest_error_payload( + run_manifest.error + ) status = run_manifest.status return { "schema_version": PIPELINE_STATUS_SCHEMA_VERSION, diff --git a/policyengine_us_data/build_datasets/commands.py b/policyengine_us_data/build_datasets/commands.py index bdbaeca85..b3184138e 100644 --- a/policyengine_us_data/build_datasets/commands.py +++ b/policyengine_us_data/build_datasets/commands.py @@ -67,6 +67,11 @@ def __init__(self, result: DatasetCommandResult): self.result = result super().__init__(f"Command failed ({result.returncode}): {result.command_name}") + def __reduce__(self): + """Preserve the structured result when Modal pickles this exception.""" + + return (self.__class__, (self.result,)) + @dataclass class SubprocessLogCapture: diff --git a/policyengine_us_data/build_datasets/status.py b/policyengine_us_data/build_datasets/status.py index 61c9758bd..b7776dde1 100644 --- a/policyengine_us_data/build_datasets/status.py +++ b/policyengine_us_data/build_datasets/status.py @@ -101,6 +101,11 @@ def to_dict(self) -> dict[str, Any]: "metadata": dict(self.metadata), } + def traceback_text(self) -> str: + """Return traceback-like text from captured command context.""" + + return _pipeline_traceback_text(self) + def to_pipeline_error_record( self, *, @@ -126,7 +131,7 @@ def to_pipeline_error_record( stage_id=STAGE_1_BUILD_DATASETS, substage_id=self.substep_id, surface=surface, - traceback_text=_pipeline_traceback_text(self), + traceback_text=self.traceback_text(), occurred_at=self.created_at, env=env, ) diff --git a/tests/unit/test_build_dataset_commands.py b/tests/unit/test_build_dataset_commands.py index c12461b5e..f27dc2a97 100644 --- a/tests/unit/test_build_dataset_commands.py +++ b/tests/unit/test_build_dataset_commands.py @@ -1,3 +1,4 @@ +import pickle import sys import pytest @@ -82,6 +83,22 @@ def test_command_runner_raises_structured_failure(): assert result.combined_output_tail == ("structured failure\n",) +def test_dataset_command_error_round_trips_through_pickle(): + command = DatasetCommand( + name="failing command", + argv=(sys.executable, "-c", "import sys; sys.exit(7)"), + ) + + with pytest.raises(DatasetCommandError) as exc_info: + CommandRunner().run(command) + + restored = pickle.loads(pickle.dumps(exc_info.value)) + + assert str(restored) == "Command failed (7): failing command" + assert restored.result.command_name == "failing command" + assert restored.result.returncode == 7 + + def test_command_runner_can_return_structured_failure_without_raising(): command = DatasetCommand( name="nonraising command", diff --git a/tests/unit/test_pipeline.py b/tests/unit/test_pipeline.py index 69f67bb82..95235e3ba 100644 --- a/tests/unit/test_pipeline.py +++ b/tests/unit/test_pipeline.py @@ -16,6 +16,7 @@ _new_run_metadata, _pipeline_error_summary, _run_required_promotion_subprocess, + _traceback_text_for_pipeline_failure, _try_reload_pipeline_volume_after_h5_builds, ) from modal_app.step_manifests.state import RunMetadata # noqa: E402 @@ -23,6 +24,9 @@ read_run_meta, write_run_meta, ) +from policyengine_us_data.build_datasets.commands import DatasetCommandError # noqa: E402 +from policyengine_us_data.build_datasets.results import DatasetCommandResult # noqa: E402 +from policyengine_us_data.build_datasets.status import Stage1ErrorRecord # noqa: E402 from policyengine_us_data.utils.run_context import RunContext # noqa: E402 from policyengine_us_data.utils.step_manifest import ArtifactReference # noqa: E402 @@ -128,6 +132,43 @@ def test_pipeline_error_summary_falls_back_to_bounded_traceback(monkeypatch): assert "old traceback" not in summary +def test_pipeline_failure_traceback_prefers_stage_1_command_tail(): + result = DatasetCommandResult( + command_name="policyengine_us_data/datasets/cps/extended_cps.py", + argv=("python", "-m", "policyengine_us_data.datasets.cps.extended_cps"), + status="failed", + returncode=1, + started_at="2026-05-22T12:00:00Z", + completed_at="2026-05-22T12:00:01Z", + duration_s=1.0, + combined_output_tail=("actual ecps failure\n",), + error=Stage1ErrorRecord( + substep_id="1c_extended_cps_puf_clone", + command_name="policyengine_us_data/datasets/cps/extended_cps.py", + error_type="RuntimeError", + message="Command failed", + returncode=1, + metadata={ + "argv": [ + "python", + "-m", + "policyengine_us_data.datasets.cps.extended_cps", + ], + "output_tail": ["actual ecps failure\n"], + }, + ), + ) + + traceback_text = _traceback_text_for_pipeline_failure( + DatasetCommandError(result), + "fallback traceback", + ) + + assert "fallback traceback" not in traceback_text + assert "policyengine_us_data.datasets.cps.extended_cps" in traceback_text + assert "actual ecps failure" in traceback_text + + def test_new_run_metadata_accepts_release_context_fields_once(): context = RunContext.from_mapping( { diff --git a/tests/unit/test_pipeline_status.py b/tests/unit/test_pipeline_status.py index 6c872b22a..e57c1d1ef 100644 --- a/tests/unit/test_pipeline_status.py +++ b/tests/unit/test_pipeline_status.py @@ -361,6 +361,35 @@ def test_status_payload_truncates_oldest_traceback_text(tmp_path): assert "oldest" not in payload["error"]["traceback"] +def test_status_payload_survives_unreadable_latest_error_record(tmp_path): + runs_dir = tmp_path / "runs" + run_dir = runs_dir / "run-1" + write_run_manifest( + run_manifest_path(run_dir), + RunManifest( + run_id="run-1", + branch="main", + sha="abc123", + version="1.0.0", + status="failed", + started_at="2026-05-12T12:00:00+00:00", + known_step_ids=[BUILD_DATASETS.id], + error="fallback manifest error", + ), + ) + latest_path = run_dir / "errors" / "latest_error.json" + latest_path.parent.mkdir(parents=True) + latest_path.write_text("{not-json") + + payload = build_pipeline_status_payload("run-1", runs_dir=runs_dir) + + assert payload["status"] == "failed" + assert payload["error"]["source"] == "latest_error.json" + assert payload["error"]["surface"] == "error_record_read" + assert payload["error"]["traceback_available"] is False + assert "JSONDecodeError" in payload["error"]["message"] + + def test_clear_latest_pipeline_error_is_best_effort(tmp_path): run_dir = tmp_path / "runs" / "run-1" latest_path = run_dir / "errors" / "latest_error.json"