Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/1044.added
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added the Stage 5 published artifact index (`published_artifact_index.jsonl`).
16 changes: 16 additions & 0 deletions docs/engineering/pipeline-map.md
Original file line number Diff line number Diff line change
Expand Up @@ -1364,6 +1364,22 @@ def normalize_worker_response(*, worker_index: int, result: object) -> Coordinat

Normalize worker JSON into explicit fatal and nonfatal coordinator issues.

### `policyengine_us_data.release_promotion.published_index.build_published_artifact_index`

```python
def build_published_artifact_index(*, candidate_bundle: ReleaseCandidateInputBundle, promotion_result: FullPromotionResult, release_manifest: Mapping[str, Any] | None = None, diagnostic_artifacts: Sequence[ArtifactRef] = ()) -> tuple[PublishedArtifactIndexRow, ...]
```

Build deterministic published artifact rows for a promoted release.

### `policyengine_us_data.release_promotion.published_index.PublishedArtifactIndexRow`

```python
class PublishedArtifactIndexRow
```

One row in the Stage 5 published artifact JSONL index.

### `policyengine_us_data.release_promotion.artifacts.ReleaseArtifactSpec`

```python
Expand Down
15 changes: 15 additions & 0 deletions docs/engineering/stages/release_promotion.md
Original file line number Diff line number Diff line change
Expand Up @@ -147,3 +147,18 @@ Runtime step manifests for `5_validate_and_promote_release` should include the
contract as a JSON `contract` output. They may still record legacy validated
input artifacts for compatibility, but the contract is the preferred semantic
entry point for Stage 5 status and lineage.

## Published Artifact Index

Stage 5 also writes `published_artifact_index.jsonl` under the run-local
`diagnostics/` directory. Each JSONL row describes one promoted artifact or
release metadata artifact: its canonical `run_id`, candidate version, release
version, source-stage metadata, final Hugging Face URI, and, when the artifact
is mirrored to GCS, its GCS URI.

Build index rows from typed release-candidate and promotion-result objects, not
from console logs. Release manifest entries may supply final checksum, size,
revision, and kind fields for promoted data artifacts; building the index
should not change the release manifest schema. The release promotion contract
must reference the index as a `published_artifact_index` output so that
dashboards and AI systems can discover the per-artifact rows from the Stage 5
contract.
74 changes: 71 additions & 3 deletions docs/generated/pipeline_api.json
Original file line number Diff line number Diff line change
Expand Up @@ -3086,7 +3086,7 @@
"docstring": "Promote a completed pipeline run to production.\n\n1. Verify run status is \"completed\"\n2. Promote every staged artifact in one Hugging Face commit\n3. Upload/copy every artifact to GCS\n4. Finalize release_manifest.json, tag the release, and update\n version_manifest.json\n5. Update run status to \"promoted\"\n\nArgs:\n run_id: The run ID to promote.\n candidate_version: Candidate staging scope used for staged source files.\n release_version: Stable version used for final release metadata.\n\nReturns:\n Summary message.",
"id": "promote_pipeline_run",
"kind": "function",
"line": 2091,
"line": 2136,
"metadata": {
"api_refs": [
"modal_app.pipeline.promote_run"
Expand Down Expand Up @@ -3117,6 +3117,74 @@
"signature": "def promote_run(run_id: str, candidate_version: str = '', release_version: str = '') -> str",
"source_file": "modal_app/pipeline.py"
},
"published_artifact_index_builder": {
"docstring": "Build deterministic published artifact rows for a promoted release.",
"id": "published_artifact_index_builder",
"kind": "function",
"line": 247,
"metadata": {
"api_refs": [
"policyengine_us_data.release_promotion.published_index.build_published_artifact_index"
],
"artifacts_in": [
"release candidate bundle",
"typed promotion result"
],
"artifacts_out": [
"published_artifact_index.jsonl"
],
"description": "Build the Stage 5 published artifact JSONL index.",
"id": "published_artifact_index_builder",
"label": "Published Artifact Index Builder",
"node_type": "library",
"pathways": [
"5_validate_and_promote_release"
],
"source_file": "policyengine_us_data/release_promotion/published_index.py",
"stability": "moving",
"status": "transitional",
"validation_commands": [
"uv run pytest tests/unit/release_promotion/test_published_index.py"
]
},
"object_path": "policyengine_us_data.release_promotion.published_index.build_published_artifact_index",
"signature": "def build_published_artifact_index(*, candidate_bundle: ReleaseCandidateInputBundle, promotion_result: FullPromotionResult, release_manifest: Mapping[str, Any] | None = None, diagnostic_artifacts: Sequence[ArtifactRef] = ()) -> tuple[PublishedArtifactIndexRow, ...]",
"source_file": "policyengine_us_data/release_promotion/published_index.py"
},
"published_artifact_index_row": {
"docstring": "One row in the Stage 5 published artifact JSONL index.",
"id": "published_artifact_index_row",
"kind": "class",
"line": 95,
"metadata": {
"api_refs": [
"policyengine_us_data.release_promotion.published_index.PublishedArtifactIndexRow"
],
"artifacts_in": [
"release candidate bundle",
"release manifest"
],
"artifacts_out": [
"published_artifact_index.jsonl"
],
"description": "One published HF/GCS artifact row emitted by Stage 5.",
"id": "published_artifact_index_row",
"label": "PublishedArtifactIndexRow",
"node_type": "library",
"pathways": [
"5_validate_and_promote_release"
],
"source_file": "policyengine_us_data/release_promotion/published_index.py",
"stability": "moving",
"status": "transitional",
"validation_commands": [
"uv run pytest tests/unit/release_promotion/test_published_index.py"
]
},
"object_path": "policyengine_us_data.release_promotion.published_index.PublishedArtifactIndexRow",
"signature": "class PublishedArtifactIndexRow",
"source_file": "policyengine_us_data/release_promotion/published_index.py"
},
"puf_qrf_pass": {
"docstring": "Run QRF imputation for PUF variables.\n\nStratified-subsamples PUF records (top 0.5% by AGI kept,\nrest randomly sampled to ~20K total), trains QRF, and\npredicts on CPS data.\n\nArgs:\n data: CPS data dict.\n time_period: Tax year.\n puf_dataset: PUF dataset class or path.\n dataset_path: Path to CPS h5 for computing\n demographic predictors via Microsimulation.\n\nReturns:\n Tuple of (y_full_imputations, y_override_imputations)\n as dicts of {variable: np.ndarray}.",
"id": "puf_qrf_pass",
Expand Down Expand Up @@ -3541,7 +3609,7 @@
"docstring": "Run the full pipeline end-to-end.\n\nArgs:\n branch: Git branch to build from.\n gpu: GPU type for regional calibration.\n epochs: Training epochs for regional calibration.\n national_gpu: GPU type for national calibration.\n national_epochs: Training epochs for national.\n num_workers: Number of parallel H5 workers.\n n_clones: Number of clones for H5 building.\n skip_national: Skip national calibration/H5.\n resume_run_id: Resume a previously failed run.\n clear_checkpoints: Wipe ALL checkpoints before building\n (default False). Normally not needed \u2014 checkpoints are\n scoped by commit SHA, so stale ones from other commits\n are cleaned automatically. Use True only to force a\n full rebuild of the current commit.\n candidate_version: Candidate staging scope used for HF staging.\n release_version: Final stable release version. Usually empty until\n promotion.\n base_release_version: Stable release current when this candidate was\n built.\n release_bump: Intended SemVer bump for this candidate.\n sha_override: Exact source SHA deployed by GitHub Actions. When\n provided, this is recorded instead of reading the current\n branch tip.\n run_id: Cross-system run ID created by GitHub.\n run_context: Serialized run context from the launcher workflow.\n modal_app_name: Deployed Modal app name for this run.\n modal_environment: Modal environment used for this run.\n chunked_matrix: Build the calibration matrix in clone-household\n chunks instead of the non-chunked path. Opt-in; default off.\n chunk_size: Clone-household columns per chunk when\n ``chunked_matrix`` is True.\n parallel_matrix: Fan chunked matrix building across Modal\n workers via ``build_matrix_chunk_worker``. Only meaningful\n when ``chunked_matrix`` is True; ignored otherwise.\n num_matrix_workers: Number of Modal workers when\n ``parallel_matrix`` is True.\n\nReturns:\n The run ID for use with promote.",
"id": "run_modal_pipeline",
"kind": "function",
"line": 1113,
"line": 1158,
"metadata": {
"api_refs": [
"modal_app.pipeline.run_pipeline"
Expand Down Expand Up @@ -4479,7 +4547,7 @@
"docstring": "Verify deployed-image imports and subprocess seams.",
"id": "verify_runtime_seams",
"kind": "function",
"line": 739,
"line": 784,
"metadata": {
"api_refs": [
"modal_app.pipeline.verify_runtime_seams"
Expand Down
54 changes: 52 additions & 2 deletions docs/generated/pipeline_map.json
Original file line number Diff line number Diff line change
Expand Up @@ -1407,6 +1407,56 @@
"uv run pytest tests/unit/build_outputs/test_worker_responses.py"
]
},
{
"api_refs": [
"policyengine_us_data.release_promotion.published_index.build_published_artifact_index"
],
"artifacts_in": [
"release candidate bundle",
"typed promotion result"
],
"artifacts_out": [
"published_artifact_index.jsonl"
],
"description": "Build the Stage 5 published artifact JSONL index.",
"id": "published_artifact_index_builder",
"label": "Published Artifact Index Builder",
"node_type": "library",
"pathways": [
"5_validate_and_promote_release"
],
"source_file": "policyengine_us_data/release_promotion/published_index.py",
"stability": "moving",
"status": "transitional",
"validation_commands": [
"uv run pytest tests/unit/release_promotion/test_published_index.py"
]
},
{
"api_refs": [
"policyengine_us_data.release_promotion.published_index.PublishedArtifactIndexRow"
],
"artifacts_in": [
"release candidate bundle",
"release manifest"
],
"artifacts_out": [
"published_artifact_index.jsonl"
],
"description": "One published HF/GCS artifact row emitted by Stage 5.",
"id": "published_artifact_index_row",
"label": "PublishedArtifactIndexRow",
"node_type": "library",
"pathways": [
"5_validate_and_promote_release"
],
"source_file": "policyengine_us_data/release_promotion/published_index.py",
"stability": "moving",
"status": "transitional",
"validation_commands": [
"uv run pytest tests/unit/release_promotion/test_published_index.py"
]
},
{
"api_refs": [
"policyengine_us_data.release_promotion.artifacts.ReleaseArtifactSpec"
Expand Down Expand Up @@ -1996,9 +2046,9 @@
}
],
"metadata": {
"api_node_count": 96,
"api_node_count": 98,
"canonical_stage_count": 5,
"decorated_object_count": 156,
"decorated_object_count": 158,
"mapped_decorated_node_count": 60,
"stage_count": 17,
"substage_count": 17
Expand Down
65 changes: 55 additions & 10 deletions modal_app/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -671,19 +671,26 @@ def _write_release_promotion_contract_for_run(
run_context: RunContext,
rel_paths: list[str],
promotion_result,
) -> ArtifactReference:
"""Write Stage 5's run-local contract and return its manifest reference."""
) -> tuple[ArtifactReference, ...]:
"""Write Stage 5's run-local index/contract and return manifest references."""

from policyengine_us_data.release_promotion import (
build_legacy_release_candidate_bundle,
build_published_artifact_index,
published_artifact_index_artifact_ref,
published_artifact_index_path,
release_promotion_contract_repo_path,
release_promotion_contract_path,
write_published_artifact_index,
write_release_promotion_contract,
)
from policyengine_us_data.stage_contracts import ArtifactRef

run_dir = _run_dir(run_context.run_id)
release_context = _release_promotion_context_from_run_context(run_context)
contract_path = release_promotion_contract_path(run_dir)
candidate_bundle = build_legacy_release_candidate_bundle(
context=_release_promotion_context_from_run_context(run_context),
context=release_context,
rel_paths=rel_paths,
artifact_metadata_by_path=_release_artifact_metadata_by_path(
run_context.run_id,
Expand All @@ -699,23 +706,61 @@ def _write_release_promotion_contract_for_run(
run_context.run_id
),
)
contract_artifact = ArtifactRef(
logical_name="release_promotion_contract",
uri=(
f"hf://{release_context.hf_repo_name}/"
f"{release_promotion_contract_repo_path(release_context.run_id)}"
),
media_type="application/json",
metadata={
"artifact_family": "stage_contract",
"source_stage_id": "5_validate_and_promote_release",
"relative_path": release_promotion_contract_repo_path(
release_context.run_id
),
},
)
published_index_path = published_artifact_index_path(run_dir)
published_index_rows = build_published_artifact_index(
candidate_bundle=candidate_bundle,
promotion_result=promotion_result,
diagnostic_artifacts=(contract_artifact,),
)
write_published_artifact_index(published_index_rows, published_index_path)
published_index_manifest_ref = ArtifactReference.from_path(
published_index_path,
role="index",
base_dir=run_dir,
media_type="application/jsonl",
)
published_index_artifact = published_artifact_index_artifact_ref(
release_context,
row_count=len(published_index_rows),
sha256=f"sha256:{published_index_manifest_ref.sha256}",
size_bytes=published_index_manifest_ref.size_bytes,
)
write_release_promotion_contract(
contract_path=contract_path,
candidate_bundle=candidate_bundle,
promotion_result=promotion_result,
created_at=datetime.now(timezone.utc).isoformat(),
code_sha=meta.sha,
package_version=meta.version,
published_artifact_index=published_index_artifact,
metadata={
"writer": "modal_app.pipeline.promote_run",
"branch": meta.branch,
},
)
return ArtifactReference.from_path(
contract_path,
role="contract",
base_dir=run_dir,
media_type="application/json",
return (
ArtifactReference.from_path(
contract_path,
role="contract",
base_dir=run_dir,
media_type="application/json",
),
published_index_manifest_ref,
)


Expand Down Expand Up @@ -2221,7 +2266,7 @@ def promote_run(
)
print(f" {promotion_stdout}")
promotion_result = _promotion_result_from_stdout(promotion_stdout)
release_promotion_contract_ref = _write_release_promotion_contract_for_run(
release_promotion_refs = _write_release_promotion_contract_for_run(
meta=meta,
run_context=promotion_context,
rel_paths=rel_paths,
Expand All @@ -2239,7 +2284,7 @@ def promote_run(
ArtifactReference.from_dict(artifact)
for artifact in promote_inputs["validated_step_outputs"]
],
release_promotion_contract_ref,
*release_promotion_refs,
],
reuse_decision="computed",
vol=pipeline_volume,
Expand Down
24 changes: 24 additions & 0 deletions policyengine_us_data/release_promotion/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,19 @@
release_promotion_contract_repo_path,
write_release_promotion_contract,
)
from .published_index import (
PUBLISHED_ARTIFACT_INDEX_FILENAME,
PUBLISHED_ARTIFACT_INDEX_MEDIA_TYPE,
PublishedArtifactIndexRow,
build_published_artifact_index,
published_artifact_index_artifact_ref,
published_artifact_index_from_jsonl,
published_artifact_index_path,
published_artifact_index_repo_path,
published_artifact_index_to_jsonl,
read_published_artifact_index,
write_published_artifact_index,
)
from .results import (
CleanupPromotionResult,
CompletionMarkerPromotionResult,
Expand Down Expand Up @@ -60,11 +73,14 @@
"RELEASE_VALIDATION_SUBSTAGE_ID",
"RELEASE_PROMOTION_CONTRACT_FILENAME",
"RELEASE_PROMOTION_CONTRACT_TYPE",
"PUBLISHED_ARTIFACT_INDEX_FILENAME",
"PUBLISHED_ARTIFACT_INDEX_MEDIA_TYPE",
"CleanupPromotionResult",
"CompletionMarkerPromotionResult",
"FullPromotionResult",
"GcsPromotionResult",
"HuggingFacePromotionResult",
"PublishedArtifactIndexRow",
"ReleaseArtifactSpec",
"ReleaseCandidateInputBundle",
"ReleasePromotionContractBuilder",
Expand All @@ -76,6 +92,7 @@
"VALIDATION_REPORT_POLICY_PRESENCE_ONLY",
"VALIDATION_REPORT_POLICY_REQUIRE_PASSING",
"build_legacy_release_candidate_bundle",
"build_published_artifact_index",
"build_release_promotion_contract",
"build_release_candidate_bundle_from_stage4_contract",
"build_release_candidate_shape_report",
Expand All @@ -86,9 +103,16 @@
"logical_name_for_release_path",
"normalize_release_path",
"parse_full_promotion_result_json",
"published_artifact_index_artifact_ref",
"published_artifact_index_from_jsonl",
"published_artifact_index_path",
"published_artifact_index_repo_path",
"published_artifact_index_to_jsonl",
"release_promotion_contract_path",
"release_promotion_contract_repo_path",
"read_published_artifact_index",
"read_stage4_release_candidate_bundle",
"strip_staging_prefix",
"write_published_artifact_index",
"write_release_promotion_contract",
]
Loading