Skip to content

Commit 5bbe34a

Browse files
committed
Expose Stage 1 substep status
1 parent 981624d commit 5bbe34a

9 files changed

Lines changed: 609 additions & 18 deletions

File tree

changelog.d/1114.added

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Expose run-scoped Stage 1 substep status in pipeline status payloads.

modal_app/data_build.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
DatasetBuildOutputContractBuilder,
3131
PipelineArtifactStager,
3232
Stage1Coordinator,
33+
Stage1StatusRecorder,
3334
stage_1_artifact_specs,
3435
stage_1_script_outputs,
3536
stage_1_substep_id_for_script,
@@ -716,6 +717,11 @@ def build_datasets(
716717
)
717718
log_file.flush()
718719
coordinator = Stage1Coordinator()
720+
if run_id:
721+
coordinator.status_recorder = Stage1StatusRecorder(
722+
Path(PIPELINE_MOUNT) / "runs" / run_id,
723+
commit_callback=pipeline_volume.commit,
724+
)
719725
recorded_skips: set[tuple[str, str]] = set()
720726

721727
def record_skipped_script(script: str, reason: str) -> None:

modal_app/step_manifests/status.py

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,10 @@
1111
run_manifest_path,
1212
step_manifest_dir,
1313
)
14+
from policyengine_us_data.build_datasets import (
15+
empty_stage_1_status_snapshot,
16+
read_stage_1_status_snapshot,
17+
)
1418
from policyengine_us_data.utils.error_redaction import (
1519
DEFAULT_ERROR_MESSAGE_MAX_CHARS,
1620
bound_error_text,
@@ -24,7 +28,11 @@
2428
read_latest_pipeline_error,
2529
stage_ids_for_manifest,
2630
)
27-
from modal_app.step_manifests.specs import RUN_MANIFEST_STEP_IDS, step_title
31+
from modal_app.step_manifests.specs import (
32+
BUILD_DATASETS,
33+
RUN_MANIFEST_STEP_IDS,
34+
step_title,
35+
)
2836

2937
PIPELINE_STATUS_SCHEMA_VERSION = "1"
3038
DEFAULT_RUNS_LIMIT = 25
@@ -112,6 +120,7 @@ def _message(
112120
status: str,
113121
stage_manifests: list[dict[str, Any]],
114122
error: dict[str, Any] | None,
123+
stage_1_status: dict[str, Any] | None = None,
115124
) -> str:
116125
if error:
117126
location = (
@@ -125,6 +134,19 @@ def _message(
125134
return "Pipeline run not found."
126135
if stage_manifests:
127136
latest = stage_manifests[-1]
137+
current_stage_1 = (stage_1_status or {}).get("current") or {}
138+
if (
139+
latest["step_id"] == BUILD_DATASETS.id
140+
and latest["status"] == "running"
141+
and current_stage_1
142+
):
143+
substep_id = current_stage_1.get("substep_id")
144+
title = current_stage_1.get("title") or substep_id
145+
substep_status = current_stage_1.get("status", "unknown")
146+
return (
147+
f"Pipeline {status}; current Stage 1 substep "
148+
f"{substep_id} ({title}) is {substep_status}."
149+
)
128150
return (
129151
f"Pipeline {status}; latest manifest "
130152
f"{latest['substage_id'] or latest['stage_id']} is {latest['status']}."
@@ -215,6 +237,11 @@ def _latest_manifest_payload(
215237
}
216238

217239

240+
def _stage_1_status_payload(run_dir: Path) -> dict[str, Any]:
241+
snapshot = read_stage_1_status_snapshot(run_dir)
242+
return _sanitize_error_value(snapshot.to_dict())
243+
244+
218245
def _run_index_item(
219246
run_id: str,
220247
*,
@@ -241,6 +268,7 @@ def _run_index_item(
241268
"hf_staging_prefix": run_manifest.get("hf_staging_prefix"),
242269
"github_run_url": (run_manifest.get("run_context") or {}).get("github_run_url"),
243270
"latest_manifest": _latest_manifest_payload(stage_manifests),
271+
"stage_1_current": (payload.get("stage_1_status") or {}).get("current"),
244272
"progress": {
245273
"expected_manifests": len(expected),
246274
"present_manifests": len(stage_manifests),
@@ -271,6 +299,7 @@ def _unreadable_run_index_item(run_id: str, exc: BaseException) -> dict[str, Any
271299
"hf_staging_prefix": None,
272300
"github_run_url": None,
273301
"latest_manifest": None,
302+
"stage_1_current": None,
274303
"progress": {
275304
"expected_manifests": 0,
276305
"present_manifests": 0,
@@ -357,6 +386,7 @@ def build_pipeline_status_payload(
357386
"stage_manifests": [],
358387
"missing_expected_manifest_ids": [],
359388
"error": None,
389+
"stage_1_status": empty_stage_1_status_snapshot().to_dict(),
360390
}
361391

362392
run_dir = _run_dir(run_id, runs_dir)
@@ -371,6 +401,7 @@ def build_pipeline_status_payload(
371401
"stage_manifests": [],
372402
"missing_expected_manifest_ids": list(RUN_MANIFEST_STEP_IDS),
373403
"error": None,
404+
"stage_1_status": empty_stage_1_status_snapshot().to_dict(),
374405
}
375406

376407
run_manifest = read_run_manifest(manifest_path)
@@ -391,6 +422,7 @@ def build_pipeline_status_payload(
391422
run_manifest.error
392423
)
393424
status = run_manifest.status
425+
stage_1_status = _stage_1_status_payload(run_dir)
394426
return {
395427
"schema_version": PIPELINE_STATUS_SCHEMA_VERSION,
396428
"run_id": run_id,
@@ -399,11 +431,13 @@ def build_pipeline_status_payload(
399431
status=status,
400432
stage_manifests=stage_manifests,
401433
error=error,
434+
stage_1_status=stage_1_status,
402435
),
403436
"run_manifest": _run_manifest_payload(run_manifest),
404437
"stage_manifests": stage_manifests,
405438
"missing_expected_manifest_ids": missing_expected,
406439
"error": error,
440+
"stage_1_status": stage_1_status,
407441
"updated_at": run_manifest.updated_at,
408442
"modal_app_name": run_manifest.modal_app_name,
409443
"modal_environment": run_manifest.modal_environment,

policyengine_us_data/build_datasets/__init__.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
from .coordinator import (
2121
CommandBackedSubstepRunner,
2222
Stage1Coordinator,
23+
Stage1StatusSink,
2324
Stage1SubstepRunner,
2425
stage_1_substep_id_for_script,
2526
stage_1_substep_title,
@@ -40,6 +41,12 @@
4041
from .results import DatasetCommandResult, DatasetSubstepResult
4142
from .staging import PipelineArtifactStager
4243
from .status import Stage1ErrorRecord, Stage1StatusEvent
44+
from .status_store import (
45+
Stage1StatusRecorder,
46+
Stage1StatusSnapshot,
47+
empty_stage_1_status_snapshot,
48+
read_stage_1_status_snapshot,
49+
)
4350

4451
__all__ = [
4552
"ARTIFACT_SCHEMA_VERSION",
@@ -61,10 +68,15 @@
6168
"SourceDatasetSchemaSummaryWriter",
6269
"Stage1Coordinator",
6370
"Stage1ErrorRecord",
71+
"Stage1StatusRecorder",
6472
"Stage1StatusEvent",
73+
"Stage1StatusSink",
74+
"Stage1StatusSnapshot",
6575
"Stage1SubstepRunner",
6676
"SubprocessLogCapture",
6777
"TargetDatabaseSchemaSummaryWriter",
78+
"empty_stage_1_status_snapshot",
79+
"read_stage_1_status_snapshot",
6880
"stage_1_artifact_specs",
6981
"stage_1_contract_artifact_specs",
7082
"stage_1_diagnostic_artifact_specs",

policyengine_us_data/build_datasets/coordinator.py

Lines changed: 35 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,16 @@ def run(self) -> Any:
2626
"""Run the substep action."""
2727

2828

29+
class Stage1StatusSink(Protocol):
30+
"""Persistence sink for Stage 1 status transitions and results."""
31+
32+
def record_event(self, event: Stage1StatusEvent) -> None:
33+
"""Persist one status event."""
34+
35+
def record_result(self, result: DatasetSubstepResult) -> None:
36+
"""Persist one substep result."""
37+
38+
2939
@dataclass(frozen=True, kw_only=True)
3040
class CommandBackedSubstepRunner:
3141
"""Run a Stage 1 substep backed by existing side-effecting commands."""
@@ -60,6 +70,7 @@ class _SubstepAggregate:
6070
class Stage1Coordinator:
6171
"""Collect Stage 1 substep status events, errors, and results."""
6272

73+
status_recorder: Stage1StatusSink | None = None
6374
results: list[DatasetSubstepResult] = field(default_factory=list)
6475
status_events: list[Stage1StatusEvent] = field(default_factory=list)
6576
error_records: list[Stage1ErrorRecord] = field(default_factory=list)
@@ -244,22 +255,24 @@ def _record_aggregate_start(
244255
*,
245256
metadata: Mapping[str, Any] | None,
246257
) -> None:
258+
event: Stage1StatusEvent | None = None
247259
with self._lock:
248260
state = self._aggregate_state(runner)
249261
if state.started_dt is None:
250262
state.started_dt = started_dt
251-
self.status_events.append(
252-
Stage1StatusEvent(
253-
substep_id=runner.substep_id,
254-
status="started",
255-
created_at=utc_timestamp(started_dt),
256-
message=f"Started {runner.title}",
257-
metadata=dict(metadata or {}),
258-
)
263+
event = Stage1StatusEvent(
264+
substep_id=runner.substep_id,
265+
status="started",
266+
created_at=utc_timestamp(started_dt),
267+
message=f"Started {runner.title}",
268+
metadata=dict(metadata or {}),
259269
)
270+
self.status_events.append(event)
260271
elif started_dt < state.started_dt:
261272
state.started_dt = started_dt
262273
state.metadata.update(dict(metadata or {}))
274+
if event is not None and self.status_recorder is not None:
275+
self.status_recorder.record_event(event)
263276

264277
def _record_aggregate_skip(
265278
self,
@@ -418,23 +431,27 @@ def _result(
418431
)
419432

420433
def _record(self, result: DatasetSubstepResult) -> None:
434+
event = Stage1StatusEvent(
435+
substep_id=result.substep_id,
436+
status=result.status,
437+
created_at=result.completed_at,
438+
message=f"{result.title}: {result.status}",
439+
metadata=dict(result.metadata),
440+
)
421441
with self._lock:
422442
self.results.append(result)
423-
self.status_events.append(
424-
Stage1StatusEvent(
425-
substep_id=result.substep_id,
426-
status=result.status,
427-
created_at=result.completed_at,
428-
message=f"{result.title}: {result.status}",
429-
metadata=dict(result.metadata),
430-
)
431-
)
443+
self.status_events.append(event)
432444
if result.error is not None:
433445
self.error_records.append(result.error)
446+
if self.status_recorder is not None:
447+
self.status_recorder.record_result(result)
448+
self.status_recorder.record_event(event)
434449

435450
def _record_event(self, event: Stage1StatusEvent) -> None:
436451
with self._lock:
437452
self.status_events.append(event)
453+
if self.status_recorder is not None:
454+
self.status_recorder.record_event(event)
438455

439456

440457
def stage_1_substep_id_for_script(script_path: str) -> str:
@@ -519,6 +536,7 @@ def _error_record_from_exception(
519536
__all__ = [
520537
"CommandBackedSubstepRunner",
521538
"Stage1Coordinator",
539+
"Stage1StatusSink",
522540
"Stage1SubstepRunner",
523541
"stage_1_substep_id_for_script",
524542
"stage_1_substep_title",

0 commit comments

Comments
 (0)