Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/1114.added
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Expose run-scoped Stage 1 substep status in pipeline status payloads.
6 changes: 6 additions & 0 deletions modal_app/data_build.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
DatasetBuildOutputContractBuilder,
PipelineArtifactStager,
Stage1Coordinator,
Stage1StatusRecorder,
stage_1_artifact_specs,
stage_1_script_outputs,
stage_1_substep_id_for_script,
Expand Down Expand Up @@ -716,6 +717,11 @@ def build_datasets(
)
log_file.flush()
coordinator = Stage1Coordinator()
if run_id:
coordinator.status_recorder = Stage1StatusRecorder(
Path(PIPELINE_MOUNT) / "runs" / run_id,
commit_callback=pipeline_volume.commit,
)
recorded_skips: set[tuple[str, str]] = set()

def record_skipped_script(script: str, reason: str) -> None:
Expand Down
36 changes: 35 additions & 1 deletion modal_app/step_manifests/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@
run_manifest_path,
step_manifest_dir,
)
from policyengine_us_data.build_datasets import (
empty_stage_1_status_snapshot,
read_stage_1_status_snapshot,
)
from policyengine_us_data.utils.error_redaction import (
DEFAULT_ERROR_MESSAGE_MAX_CHARS,
bound_error_text,
Expand All @@ -24,7 +28,11 @@
read_latest_pipeline_error,
stage_ids_for_manifest,
)
from modal_app.step_manifests.specs import RUN_MANIFEST_STEP_IDS, step_title
from modal_app.step_manifests.specs import (
BUILD_DATASETS,
RUN_MANIFEST_STEP_IDS,
step_title,
)

PIPELINE_STATUS_SCHEMA_VERSION = "1"
DEFAULT_RUNS_LIMIT = 25
Expand Down Expand Up @@ -112,6 +120,7 @@ def _message(
status: str,
stage_manifests: list[dict[str, Any]],
error: dict[str, Any] | None,
stage_1_status: dict[str, Any] | None = None,
) -> str:
if error:
location = (
Expand All @@ -125,6 +134,19 @@ def _message(
return "Pipeline run not found."
if stage_manifests:
latest = stage_manifests[-1]
current_stage_1 = (stage_1_status or {}).get("current") or {}
if (
latest["step_id"] == BUILD_DATASETS.id
and latest["status"] == "running"
and current_stage_1
):
substep_id = current_stage_1.get("substep_id")
title = current_stage_1.get("title") or substep_id
substep_status = current_stage_1.get("status", "unknown")
return (
f"Pipeline {status}; current Stage 1 substep "
f"{substep_id} ({title}) is {substep_status}."
)
return (
f"Pipeline {status}; latest manifest "
f"{latest['substage_id'] or latest['stage_id']} is {latest['status']}."
Expand Down Expand Up @@ -215,6 +237,11 @@ def _latest_manifest_payload(
}


def _stage_1_status_payload(run_dir: Path) -> dict[str, Any]:
snapshot = read_stage_1_status_snapshot(run_dir)
return _sanitize_error_value(snapshot.to_dict())


def _run_index_item(
run_id: str,
*,
Expand All @@ -241,6 +268,7 @@ def _run_index_item(
"hf_staging_prefix": run_manifest.get("hf_staging_prefix"),
"github_run_url": (run_manifest.get("run_context") or {}).get("github_run_url"),
"latest_manifest": _latest_manifest_payload(stage_manifests),
"stage_1_current": (payload.get("stage_1_status") or {}).get("current"),
"progress": {
"expected_manifests": len(expected),
"present_manifests": len(stage_manifests),
Expand Down Expand Up @@ -271,6 +299,7 @@ def _unreadable_run_index_item(run_id: str, exc: BaseException) -> dict[str, Any
"hf_staging_prefix": None,
"github_run_url": None,
"latest_manifest": None,
"stage_1_current": None,
"progress": {
"expected_manifests": 0,
"present_manifests": 0,
Expand Down Expand Up @@ -357,6 +386,7 @@ def build_pipeline_status_payload(
"stage_manifests": [],
"missing_expected_manifest_ids": [],
"error": None,
"stage_1_status": empty_stage_1_status_snapshot().to_dict(),
}

run_dir = _run_dir(run_id, runs_dir)
Expand All @@ -371,6 +401,7 @@ def build_pipeline_status_payload(
"stage_manifests": [],
"missing_expected_manifest_ids": list(RUN_MANIFEST_STEP_IDS),
"error": None,
"stage_1_status": empty_stage_1_status_snapshot().to_dict(),
}

run_manifest = read_run_manifest(manifest_path)
Expand All @@ -391,6 +422,7 @@ def build_pipeline_status_payload(
run_manifest.error
)
status = run_manifest.status
stage_1_status = _stage_1_status_payload(run_dir)
return {
"schema_version": PIPELINE_STATUS_SCHEMA_VERSION,
"run_id": run_id,
Expand All @@ -399,11 +431,13 @@ def build_pipeline_status_payload(
status=status,
stage_manifests=stage_manifests,
error=error,
stage_1_status=stage_1_status,
),
"run_manifest": _run_manifest_payload(run_manifest),
"stage_manifests": stage_manifests,
"missing_expected_manifest_ids": missing_expected,
"error": error,
"stage_1_status": stage_1_status,
"updated_at": run_manifest.updated_at,
"modal_app_name": run_manifest.modal_app_name,
"modal_environment": run_manifest.modal_environment,
Expand Down
16 changes: 16 additions & 0 deletions policyengine_us_data/build_datasets/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from .coordinator import (
CommandBackedSubstepRunner,
Stage1Coordinator,
Stage1StatusSink,
Stage1SubstepRunner,
stage_1_substep_id_for_script,
stage_1_substep_title,
Expand All @@ -40,6 +41,14 @@
from .results import DatasetCommandResult, DatasetSubstepResult
from .staging import PipelineArtifactStager
from .status import Stage1ErrorRecord, Stage1StatusEvent
from .status_store import (
Stage1StatusRecorder,
Stage1StatusReadError,
Stage1StatusSnapshot,
Stage1StoredStatusEvent,
empty_stage_1_status_snapshot,
read_stage_1_status_snapshot,
)

__all__ = [
"ARTIFACT_SCHEMA_VERSION",
Expand All @@ -61,10 +70,17 @@
"SourceDatasetSchemaSummaryWriter",
"Stage1Coordinator",
"Stage1ErrorRecord",
"Stage1StatusRecorder",
"Stage1StatusReadError",
"Stage1StatusEvent",
"Stage1StatusSink",
"Stage1StatusSnapshot",
"Stage1StoredStatusEvent",
"Stage1SubstepRunner",
"SubprocessLogCapture",
"TargetDatabaseSchemaSummaryWriter",
"empty_stage_1_status_snapshot",
"read_stage_1_status_snapshot",
"stage_1_artifact_specs",
"stage_1_contract_artifact_specs",
"stage_1_diagnostic_artifact_specs",
Expand Down
52 changes: 35 additions & 17 deletions policyengine_us_data/build_datasets/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,16 @@ def run(self) -> Any:
"""Run the substep action."""


class Stage1StatusSink(Protocol):
"""Persistence sink for Stage 1 status transitions and results."""

def record_event(self, event: Stage1StatusEvent) -> None:
"""Persist one status event."""

def record_result(self, result: DatasetSubstepResult) -> None:
"""Persist one substep result."""


@dataclass(frozen=True, kw_only=True)
class CommandBackedSubstepRunner:
"""Run a Stage 1 substep backed by existing side-effecting commands."""
Expand Down Expand Up @@ -60,6 +70,7 @@ class _SubstepAggregate:
class Stage1Coordinator:
"""Collect Stage 1 substep status events, errors, and results."""

status_recorder: Stage1StatusSink | None = None
results: list[DatasetSubstepResult] = field(default_factory=list)
status_events: list[Stage1StatusEvent] = field(default_factory=list)
error_records: list[Stage1ErrorRecord] = field(default_factory=list)
Expand Down Expand Up @@ -244,22 +255,24 @@ def _record_aggregate_start(
*,
metadata: Mapping[str, Any] | None,
) -> None:
event: Stage1StatusEvent | None = None
with self._lock:
state = self._aggregate_state(runner)
if state.started_dt is None:
state.started_dt = started_dt
self.status_events.append(
Stage1StatusEvent(
substep_id=runner.substep_id,
status="started",
created_at=utc_timestamp(started_dt),
message=f"Started {runner.title}",
metadata=dict(metadata or {}),
)
event = Stage1StatusEvent(
substep_id=runner.substep_id,
status="started",
created_at=utc_timestamp(started_dt),
message=f"Started {runner.title}",
metadata=dict(metadata or {}),
)
self.status_events.append(event)
elif started_dt < state.started_dt:
state.started_dt = started_dt
state.metadata.update(dict(metadata or {}))
if event is not None and self.status_recorder is not None:
self.status_recorder.record_event(event)

def _record_aggregate_skip(
self,
Expand Down Expand Up @@ -418,23 +431,27 @@ def _result(
)

def _record(self, result: DatasetSubstepResult) -> None:
event = Stage1StatusEvent(
substep_id=result.substep_id,
status=result.status,
created_at=result.completed_at,
message=f"{result.title}: {result.status}",
metadata=dict(result.metadata),
)
with self._lock:
self.results.append(result)
self.status_events.append(
Stage1StatusEvent(
substep_id=result.substep_id,
status=result.status,
created_at=result.completed_at,
message=f"{result.title}: {result.status}",
metadata=dict(result.metadata),
)
)
self.status_events.append(event)
if result.error is not None:
self.error_records.append(result.error)
if self.status_recorder is not None:
self.status_recorder.record_result(result)
self.status_recorder.record_event(event)

def _record_event(self, event: Stage1StatusEvent) -> None:
with self._lock:
self.status_events.append(event)
if self.status_recorder is not None:
self.status_recorder.record_event(event)


def stage_1_substep_id_for_script(script_path: str) -> str:
Expand Down Expand Up @@ -519,6 +536,7 @@ def _error_record_from_exception(
__all__ = [
"CommandBackedSubstepRunner",
"Stage1Coordinator",
"Stage1StatusSink",
"Stage1SubstepRunner",
"stage_1_substep_id_for_script",
"stage_1_substep_title",
Expand Down
Loading