From f9c90ed29f52251d7f1ff83d0a09ddef1a20aec2 Mon Sep 17 00:00:00 2001 From: Anthony Volk Date: Tue, 19 May 2026 21:51:08 +0200 Subject: [PATCH] Add Stage 5 published artifact index --- changelog.d/1044.added | 1 + docs/engineering/pipeline-map.md | 16 + docs/engineering/stages/release_promotion.md | 15 + docs/generated/pipeline_api.json | 74 ++- docs/generated/pipeline_map.json | 54 +- modal_app/pipeline.py | 65 ++- .../release_promotion/__init__.py | 24 + .../release_promotion/contract.py | 68 ++- .../release_promotion/published_index.py | 539 ++++++++++++++++++ tests/unit/release_promotion/test_contract.py | 15 + .../release_promotion/test_published_index.py | 286 ++++++++++ tests/unit/test_pipeline_source_contracts.py | 7 +- 12 files changed, 1137 insertions(+), 27 deletions(-) create mode 100644 changelog.d/1044.added create mode 100644 policyengine_us_data/release_promotion/published_index.py create mode 100644 tests/unit/release_promotion/test_published_index.py diff --git a/changelog.d/1044.added b/changelog.d/1044.added new file mode 100644 index 000000000..f9c04d62a --- /dev/null +++ b/changelog.d/1044.added @@ -0,0 +1 @@ +Added the Stage 5 published artifact index JSONL artifact. diff --git a/docs/engineering/pipeline-map.md b/docs/engineering/pipeline-map.md index b1f630718..11be1a508 100644 --- a/docs/engineering/pipeline-map.md +++ b/docs/engineering/pipeline-map.md @@ -1364,6 +1364,22 @@ def normalize_worker_response(*, worker_index: int, result: object) -> Coordinat Normalize worker JSON into explicit fatal and nonfatal coordinator issues. +### `policyengine_us_data.release_promotion.published_index.build_published_artifact_index` + +```python +def build_published_artifact_index(*, candidate_bundle: ReleaseCandidateInputBundle, promotion_result: FullPromotionResult, release_manifest: Mapping[str, Any] | None = None, diagnostic_artifacts: Sequence[ArtifactRef] = ()) -> tuple[PublishedArtifactIndexRow, ...] +``` + +Build deterministic published artifact rows for a promoted release. + +### `policyengine_us_data.release_promotion.published_index.PublishedArtifactIndexRow` + +```python +class PublishedArtifactIndexRow +``` + +One row in the Stage 5 published artifact JSONL index. + ### `policyengine_us_data.release_promotion.artifacts.ReleaseArtifactSpec` ```python diff --git a/docs/engineering/stages/release_promotion.md b/docs/engineering/stages/release_promotion.md index 0276634bd..464e16e19 100644 --- a/docs/engineering/stages/release_promotion.md +++ b/docs/engineering/stages/release_promotion.md @@ -147,3 +147,18 @@ Runtime step manifests for `5_validate_and_promote_release` should include the contract as a JSON `contract` output. They may still record legacy validated input artifacts for compatibility, but the contract is the preferred semantic entry point for Stage 5 status and lineage. + +## Published Artifact Index + +Stage 5 also writes `published_artifact_index.jsonl` under the run-local +`diagnostics/` directory. Each JSONL row describes one promoted artifact or +release metadata artifact with its canonical `run_id`, candidate version, +release version, source-stage metadata, final Hugging Face URI, and GCS URI +when the artifact is mirrored to GCS. + +Build index rows from typed release candidate and promotion-result objects, not +from console logs. Release manifest entries may supply final checksum, size, +revision, and kind fields for promoted data artifacts; the index should leave +the release manifest schema unchanged. The release promotion contract must +reference the index as a `published_artifact_index` output so dashboards and AI +systems can discover the per-artifact rows from the Stage 5 contract. diff --git a/docs/generated/pipeline_api.json b/docs/generated/pipeline_api.json index d9905e140..5cd6f4458 100644 --- a/docs/generated/pipeline_api.json +++ b/docs/generated/pipeline_api.json @@ -3086,7 +3086,7 @@ "docstring": "Promote a completed pipeline run to production.\n\n1. Verify run status is \"completed\"\n2. Promote every staged artifact in one Hugging Face commit\n3. Upload/copy every artifact to GCS\n4. Finalize release_manifest.json, tag the release, and update\n version_manifest.json\n5. Update run status to \"promoted\"\n\nArgs:\n run_id: The run ID to promote.\n candidate_version: Candidate staging scope used for staged source files.\n release_version: Stable version used for final release metadata.\n\nReturns:\n Summary message.", "id": "promote_pipeline_run", "kind": "function", - "line": 2091, + "line": 2136, "metadata": { "api_refs": [ "modal_app.pipeline.promote_run" @@ -3117,6 +3117,74 @@ "signature": "def promote_run(run_id: str, candidate_version: str = '', release_version: str = '') -> str", "source_file": "modal_app/pipeline.py" }, + "published_artifact_index_builder": { + "docstring": "Build deterministic published artifact rows for a promoted release.", + "id": "published_artifact_index_builder", + "kind": "function", + "line": 247, + "metadata": { + "api_refs": [ + "policyengine_us_data.release_promotion.published_index.build_published_artifact_index" + ], + "artifacts_in": [ + "release candidate bundle", + "typed promotion result" + ], + "artifacts_out": [ + "published_artifact_index.jsonl" + ], + "description": "Build the Stage 5 published artifact JSONL index.", + "id": "published_artifact_index_builder", + "label": "Published Artifact Index Builder", + "node_type": "library", + "pathways": [ + "5_validate_and_promote_release" + ], + "source_file": "policyengine_us_data/release_promotion/published_index.py", + "stability": "moving", + "status": "transitional", + "validation_commands": [ + "uv run pytest tests/unit/release_promotion/test_published_index.py" + ] + }, + "object_path": "policyengine_us_data.release_promotion.published_index.build_published_artifact_index", + "signature": "def build_published_artifact_index(*, candidate_bundle: ReleaseCandidateInputBundle, promotion_result: FullPromotionResult, release_manifest: Mapping[str, Any] | None = None, diagnostic_artifacts: Sequence[ArtifactRef] = ()) -> tuple[PublishedArtifactIndexRow, ...]", + "source_file": "policyengine_us_data/release_promotion/published_index.py" + }, + "published_artifact_index_row": { + "docstring": "One row in the Stage 5 published artifact JSONL index.", + "id": "published_artifact_index_row", + "kind": "class", + "line": 95, + "metadata": { + "api_refs": [ + "policyengine_us_data.release_promotion.published_index.PublishedArtifactIndexRow" + ], + "artifacts_in": [ + "release candidate bundle", + "release manifest" + ], + "artifacts_out": [ + "published_artifact_index.jsonl" + ], + "description": "One published HF/GCS artifact row emitted by Stage 5.", + "id": "published_artifact_index_row", + "label": "PublishedArtifactIndexRow", + "node_type": "library", + "pathways": [ + "5_validate_and_promote_release" + ], + "source_file": "policyengine_us_data/release_promotion/published_index.py", + "stability": "moving", + "status": "transitional", + "validation_commands": [ + "uv run pytest tests/unit/release_promotion/test_published_index.py" + ] + }, + "object_path": "policyengine_us_data.release_promotion.published_index.PublishedArtifactIndexRow", + "signature": "class PublishedArtifactIndexRow", + "source_file": "policyengine_us_data/release_promotion/published_index.py" + }, "puf_qrf_pass": { "docstring": "Run QRF imputation for PUF variables.\n\nStratified-subsamples PUF records (top 0.5% by AGI kept,\nrest randomly sampled to ~20K total), trains QRF, and\npredicts on CPS data.\n\nArgs:\n data: CPS data dict.\n time_period: Tax year.\n puf_dataset: PUF dataset class or path.\n dataset_path: Path to CPS h5 for computing\n demographic predictors via Microsimulation.\n\nReturns:\n Tuple of (y_full_imputations, y_override_imputations)\n as dicts of {variable: np.ndarray}.", "id": "puf_qrf_pass", @@ -3541,7 +3609,7 @@ "docstring": "Run the full pipeline end-to-end.\n\nArgs:\n branch: Git branch to build from.\n gpu: GPU type for regional calibration.\n epochs: Training epochs for regional calibration.\n national_gpu: GPU type for national calibration.\n national_epochs: Training epochs for national.\n num_workers: Number of parallel H5 workers.\n n_clones: Number of clones for H5 building.\n skip_national: Skip national calibration/H5.\n resume_run_id: Resume a previously failed run.\n clear_checkpoints: Wipe ALL checkpoints before building\n (default False). Normally not needed \u2014 checkpoints are\n scoped by commit SHA, so stale ones from other commits\n are cleaned automatically. Use True only to force a\n full rebuild of the current commit.\n candidate_version: Candidate staging scope used for HF staging.\n release_version: Final stable release version. Usually empty until\n promotion.\n base_release_version: Stable release current when this candidate was\n built.\n release_bump: Intended SemVer bump for this candidate.\n sha_override: Exact source SHA deployed by GitHub Actions. When\n provided, this is recorded instead of reading the current\n branch tip.\n run_id: Cross-system run ID created by GitHub.\n run_context: Serialized run context from the launcher workflow.\n modal_app_name: Deployed Modal app name for this run.\n modal_environment: Modal environment used for this run.\n chunked_matrix: Build the calibration matrix in clone-household\n chunks instead of the non-chunked path. Opt-in; default off.\n chunk_size: Clone-household columns per chunk when\n ``chunked_matrix`` is True.\n parallel_matrix: Fan chunked matrix building across Modal\n workers via ``build_matrix_chunk_worker``. Only meaningful\n when ``chunked_matrix`` is True; ignored otherwise.\n num_matrix_workers: Number of Modal workers when\n ``parallel_matrix`` is True.\n\nReturns:\n The run ID for use with promote.", "id": "run_modal_pipeline", "kind": "function", - "line": 1113, + "line": 1158, "metadata": { "api_refs": [ "modal_app.pipeline.run_pipeline" @@ -4479,7 +4547,7 @@ "docstring": "Verify deployed-image imports and subprocess seams.", "id": "verify_runtime_seams", "kind": "function", - "line": 739, + "line": 784, "metadata": { "api_refs": [ "modal_app.pipeline.verify_runtime_seams" diff --git a/docs/generated/pipeline_map.json b/docs/generated/pipeline_map.json index ea6f4fb0e..e86bd8a2e 100644 --- a/docs/generated/pipeline_map.json +++ b/docs/generated/pipeline_map.json @@ -1407,6 +1407,56 @@ "uv run pytest tests/unit/build_outputs/test_worker_responses.py" ] }, + { + "api_refs": [ + "policyengine_us_data.release_promotion.published_index.build_published_artifact_index" + ], + "artifacts_in": [ + "release candidate bundle", + "typed promotion result" + ], + "artifacts_out": [ + "published_artifact_index.jsonl" + ], + "description": "Build the Stage 5 published artifact JSONL index.", + "id": "published_artifact_index_builder", + "label": "Published Artifact Index Builder", + "node_type": "library", + "pathways": [ + "5_validate_and_promote_release" + ], + "source_file": "policyengine_us_data/release_promotion/published_index.py", + "stability": "moving", + "status": "transitional", + "validation_commands": [ + "uv run pytest tests/unit/release_promotion/test_published_index.py" + ] + }, + { + "api_refs": [ + "policyengine_us_data.release_promotion.published_index.PublishedArtifactIndexRow" + ], + "artifacts_in": [ + "release candidate bundle", + "release manifest" + ], + "artifacts_out": [ + "published_artifact_index.jsonl" + ], + "description": "One published HF/GCS artifact row emitted by Stage 5.", + "id": "published_artifact_index_row", + "label": "PublishedArtifactIndexRow", + "node_type": "library", + "pathways": [ + "5_validate_and_promote_release" + ], + "source_file": "policyengine_us_data/release_promotion/published_index.py", + "stability": "moving", + "status": "transitional", + "validation_commands": [ + "uv run pytest tests/unit/release_promotion/test_published_index.py" + ] + }, { "api_refs": [ "policyengine_us_data.release_promotion.artifacts.ReleaseArtifactSpec" @@ -1996,9 +2046,9 @@ } ], "metadata": { - "api_node_count": 96, + "api_node_count": 98, "canonical_stage_count": 5, - "decorated_object_count": 156, + "decorated_object_count": 158, "mapped_decorated_node_count": 60, "stage_count": 17, "substage_count": 17 diff --git a/modal_app/pipeline.py b/modal_app/pipeline.py index 266892ac4..d498eea7a 100644 --- a/modal_app/pipeline.py +++ b/modal_app/pipeline.py @@ -671,19 +671,26 @@ def _write_release_promotion_contract_for_run( run_context: RunContext, rel_paths: list[str], promotion_result, -) -> ArtifactReference: - """Write Stage 5's run-local contract and return its manifest reference.""" +) -> tuple[ArtifactReference, ...]: + """Write Stage 5's run-local index/contract and return manifest references.""" from policyengine_us_data.release_promotion import ( build_legacy_release_candidate_bundle, + build_published_artifact_index, + published_artifact_index_artifact_ref, + published_artifact_index_path, + release_promotion_contract_repo_path, release_promotion_contract_path, + write_published_artifact_index, write_release_promotion_contract, ) + from policyengine_us_data.stage_contracts import ArtifactRef run_dir = _run_dir(run_context.run_id) + release_context = _release_promotion_context_from_run_context(run_context) contract_path = release_promotion_contract_path(run_dir) candidate_bundle = build_legacy_release_candidate_bundle( - context=_release_promotion_context_from_run_context(run_context), + context=release_context, rel_paths=rel_paths, artifact_metadata_by_path=_release_artifact_metadata_by_path( run_context.run_id, @@ -699,6 +706,40 @@ def _write_release_promotion_contract_for_run( run_context.run_id ), ) + contract_artifact = ArtifactRef( + logical_name="release_promotion_contract", + uri=( + f"hf://{release_context.hf_repo_name}/" + f"{release_promotion_contract_repo_path(release_context.run_id)}" + ), + media_type="application/json", + metadata={ + "artifact_family": "stage_contract", + "source_stage_id": "5_validate_and_promote_release", + "relative_path": release_promotion_contract_repo_path( + release_context.run_id + ), + }, + ) + published_index_path = published_artifact_index_path(run_dir) + published_index_rows = build_published_artifact_index( + candidate_bundle=candidate_bundle, + promotion_result=promotion_result, + diagnostic_artifacts=(contract_artifact,), + ) + write_published_artifact_index(published_index_rows, published_index_path) + published_index_manifest_ref = ArtifactReference.from_path( + published_index_path, + role="index", + base_dir=run_dir, + media_type="application/jsonl", + ) + published_index_artifact = published_artifact_index_artifact_ref( + release_context, + row_count=len(published_index_rows), + sha256=f"sha256:{published_index_manifest_ref.sha256}", + size_bytes=published_index_manifest_ref.size_bytes, + ) write_release_promotion_contract( contract_path=contract_path, candidate_bundle=candidate_bundle, @@ -706,16 +747,20 @@ def _write_release_promotion_contract_for_run( created_at=datetime.now(timezone.utc).isoformat(), code_sha=meta.sha, package_version=meta.version, + published_artifact_index=published_index_artifact, metadata={ "writer": "modal_app.pipeline.promote_run", "branch": meta.branch, }, ) - return ArtifactReference.from_path( - contract_path, - role="contract", - base_dir=run_dir, - media_type="application/json", + return ( + ArtifactReference.from_path( + contract_path, + role="contract", + base_dir=run_dir, + media_type="application/json", + ), + published_index_manifest_ref, ) @@ -2221,7 +2266,7 @@ def promote_run( ) print(f" {promotion_stdout}") promotion_result = _promotion_result_from_stdout(promotion_stdout) - release_promotion_contract_ref = _write_release_promotion_contract_for_run( + release_promotion_refs = _write_release_promotion_contract_for_run( meta=meta, run_context=promotion_context, rel_paths=rel_paths, @@ -2239,7 +2284,7 @@ def promote_run( ArtifactReference.from_dict(artifact) for artifact in promote_inputs["validated_step_outputs"] ], - release_promotion_contract_ref, + *release_promotion_refs, ], reuse_decision="computed", vol=pipeline_volume, diff --git a/policyengine_us_data/release_promotion/__init__.py b/policyengine_us_data/release_promotion/__init__.py index 20c0c9124..db8a73e05 100644 --- a/policyengine_us_data/release_promotion/__init__.py +++ b/policyengine_us_data/release_promotion/__init__.py @@ -33,6 +33,19 @@ release_promotion_contract_repo_path, write_release_promotion_contract, ) +from .published_index import ( + PUBLISHED_ARTIFACT_INDEX_FILENAME, + PUBLISHED_ARTIFACT_INDEX_MEDIA_TYPE, + PublishedArtifactIndexRow, + build_published_artifact_index, + published_artifact_index_artifact_ref, + published_artifact_index_from_jsonl, + published_artifact_index_path, + published_artifact_index_repo_path, + published_artifact_index_to_jsonl, + read_published_artifact_index, + write_published_artifact_index, +) from .results import ( CleanupPromotionResult, CompletionMarkerPromotionResult, @@ -60,11 +73,14 @@ "RELEASE_VALIDATION_SUBSTAGE_ID", "RELEASE_PROMOTION_CONTRACT_FILENAME", "RELEASE_PROMOTION_CONTRACT_TYPE", + "PUBLISHED_ARTIFACT_INDEX_FILENAME", + "PUBLISHED_ARTIFACT_INDEX_MEDIA_TYPE", "CleanupPromotionResult", "CompletionMarkerPromotionResult", "FullPromotionResult", "GcsPromotionResult", "HuggingFacePromotionResult", + "PublishedArtifactIndexRow", "ReleaseArtifactSpec", "ReleaseCandidateInputBundle", "ReleasePromotionContractBuilder", @@ -76,6 +92,7 @@ "VALIDATION_REPORT_POLICY_PRESENCE_ONLY", "VALIDATION_REPORT_POLICY_REQUIRE_PASSING", "build_legacy_release_candidate_bundle", + "build_published_artifact_index", "build_release_promotion_contract", "build_release_candidate_bundle_from_stage4_contract", "build_release_candidate_shape_report", @@ -86,9 +103,16 @@ "logical_name_for_release_path", "normalize_release_path", "parse_full_promotion_result_json", + "published_artifact_index_artifact_ref", + "published_artifact_index_from_jsonl", + "published_artifact_index_path", + "published_artifact_index_repo_path", + "published_artifact_index_to_jsonl", "release_promotion_contract_path", "release_promotion_contract_repo_path", + "read_published_artifact_index", "read_stage4_release_candidate_bundle", "strip_staging_prefix", + "write_published_artifact_index", "write_release_promotion_contract", ] diff --git a/policyengine_us_data/release_promotion/contract.py b/policyengine_us_data/release_promotion/contract.py index 1a7cdc112..6d50076dd 100644 --- a/policyengine_us_data/release_promotion/contract.py +++ b/policyengine_us_data/release_promotion/contract.py @@ -78,6 +78,7 @@ class ReleasePromotionContractBuilder: package_version: str | None = None validation: ValidationReport | None = None diagnostics: Sequence[DiagnosticRef] = () + published_artifact_index: ArtifactRef | None = None metadata: Mapping[str, Any] = field(default_factory=dict) def __post_init__(self) -> None: @@ -85,6 +86,10 @@ def __post_init__(self) -> None: raise ValueError("candidate_bundle must be ReleaseCandidateInputBundle") if not isinstance(self.promotion_result, FullPromotionResult): raise ValueError("promotion_result must be FullPromotionResult") + if self.published_artifact_index is not None and not isinstance( + self.published_artifact_index, ArtifactRef + ): + raise ValueError("published_artifact_index must be ArtifactRef") object.__setattr__( self, "diagnostics", @@ -100,10 +105,14 @@ def build(self) -> StageContract: context = self.candidate_bundle.context inputs = _contract_inputs(self.candidate_bundle) - outputs = _contract_outputs(self.promotion_result) + outputs = _contract_outputs( + self.promotion_result, + published_artifact_index=self.published_artifact_index, + ) parameters = _contract_parameters( self.candidate_bundle, self.promotion_result, + published_artifact_index=self.published_artifact_index, ) return StageContract( contract_type=RELEASE_PROMOTION_CONTRACT_TYPE, @@ -122,6 +131,11 @@ def build(self) -> StageContract: "context": context.to_dict(), "candidate_bundle": self.candidate_bundle.to_dict(), "promotion_result": self.promotion_result.to_dict(), + "published_artifact_index": ( + self.published_artifact_index.to_dict() + if self.published_artifact_index is not None + else None + ), "outputs": [output.to_dict() for output in outputs], } ), @@ -152,6 +166,7 @@ def build_release_promotion_contract( package_version: str | None = None, validation: ValidationReport | None = None, diagnostics: Sequence[DiagnosticRef] = (), + published_artifact_index: ArtifactRef | None = None, metadata: Mapping[str, Any] | None = None, ) -> StageContract: """Build the Stage 5 release promotion contract.""" @@ -164,6 +179,7 @@ def build_release_promotion_contract( package_version=package_version, validation=validation, diagnostics=diagnostics, + published_artifact_index=published_artifact_index, metadata=metadata or {}, ).build() @@ -178,6 +194,7 @@ def write_release_promotion_contract( package_version: str | None = None, validation: ValidationReport | None = None, diagnostics: Sequence[DiagnosticRef] = (), + published_artifact_index: ArtifactRef | None = None, metadata: Mapping[str, Any] | None = None, ) -> StageContract: """Build, write, and return the Stage 5 release promotion contract.""" @@ -190,6 +207,7 @@ def write_release_promotion_contract( package_version=package_version, validation=validation, diagnostics=diagnostics, + published_artifact_index=published_artifact_index, metadata=metadata, ) write_contract(contract, contract_path) @@ -281,9 +299,13 @@ def _diagnostic_media_type(path: str) -> str: return "application/json" -def _contract_outputs(result: FullPromotionResult) -> tuple[ArtifactRef, ...]: +def _contract_outputs( + result: FullPromotionResult, + *, + published_artifact_index: ArtifactRef | None = None, +) -> tuple[ArtifactRef, ...]: hf_base = f"hf://{result.hf.repo_name}" - return ( + outputs = ( ArtifactRef( logical_name="huggingface_release_artifacts", uri=f"{hf_base}/", @@ -370,6 +392,9 @@ def _contract_outputs(result: FullPromotionResult) -> tuple[ArtifactRef, ...]: metadata={"artifact_family": "release_completion_marker"}, ), ) + if published_artifact_index is not None: + outputs = (*outputs, published_artifact_index) + return outputs def _hf_artifact_uri(repo_name: str, repo_path: str) -> str: @@ -381,6 +406,8 @@ def _hf_artifact_uri(repo_name: str, repo_path: str) -> str: def _contract_parameters( candidate_bundle: ReleaseCandidateInputBundle, result: FullPromotionResult, + *, + published_artifact_index: ArtifactRef | None = None, ) -> dict[str, Any]: context = candidate_bundle.context return { @@ -400,6 +427,9 @@ def _contract_parameters( "source_output_contract_path": candidate_bundle.source_output_contract_path, "validation_report_paths": list(candidate_bundle.validation_report_paths), "diagnostics_manifest_path": candidate_bundle.diagnostics_manifest_path, + "published_artifact_index_path": _artifact_relative_path( + published_artifact_index + ), } @@ -411,6 +441,7 @@ def _contract_metadata( outputs: Sequence[ArtifactRef], extra: Mapping[str, Any], ) -> dict[str, Any]: + outputs_by_name = {output.logical_name: output for output in outputs} return { **dict(extra), "contract_file": RELEASE_PROMOTION_CONTRACT_FILENAME, @@ -420,10 +451,22 @@ def _contract_metadata( "cleanup": promotion_result.cleanup.to_dict(), "already_finalized": promotion_result.already_finalized, "promotion_result": promotion_result.to_dict(), + "published_artifact_index": ( + outputs_by_name["published_artifact_index"].to_dict() + if "published_artifact_index" in outputs_by_name + else None + ), "public_refs": {output.logical_name: output.uri for output in outputs}, } +def _artifact_relative_path(artifact: ArtifactRef | None) -> str | None: + if artifact is None: + return None + relative_path = artifact.metadata.get("relative_path") + return relative_path if isinstance(relative_path, str) and relative_path else None + + def _execution_record(result: FullPromotionResult) -> ExecutionRecord: return ExecutionRecord( status="completed", @@ -448,6 +491,16 @@ def _substage_records( promotion_result: FullPromotionResult, ) -> tuple[SubstageRecord, ...]: outputs_by_name = {artifact.logical_name: artifact for artifact in public_outputs} + finalization_outputs = [ + outputs_by_name["release_manifest"], + outputs_by_name["versioned_release_manifest"], + outputs_by_name["trace_tro"], + outputs_by_name["versioned_trace_tro"], + outputs_by_name["version_manifest"], + outputs_by_name["release_completion_marker"], + ] + if "published_artifact_index" in outputs_by_name: + finalization_outputs.append(outputs_by_name["published_artifact_index"]) return ( SubstageRecord( substage_id="5a_validate_outputs", @@ -479,14 +532,7 @@ def _substage_records( SubstageRecord( substage_id="5d_write_version_manifest", status="completed", - outputs=( - outputs_by_name["release_manifest"], - outputs_by_name["versioned_release_manifest"], - outputs_by_name["trace_tro"], - outputs_by_name["versioned_trace_tro"], - outputs_by_name["version_manifest"], - outputs_by_name["release_completion_marker"], - ), + outputs=tuple(finalization_outputs), reuse_mode="handoff", metadata={ "version_manifest_updated": promotion_result.version_manifest.updated, diff --git a/policyengine_us_data/release_promotion/published_index.py b/policyengine_us_data/release_promotion/published_index.py new file mode 100644 index 000000000..c8ed9b78c --- /dev/null +++ b/policyengine_us_data/release_promotion/published_index.py @@ -0,0 +1,539 @@ +"""Published artifact index rows for Stage 5 release promotion.""" + +from __future__ import annotations + +from collections.abc import Mapping, Sequence +from dataclasses import dataclass, field +from pathlib import Path, PurePosixPath +from typing import Any + +from policyengine_us_data.pipeline_metadata import pipeline_node +from policyengine_us_data.stage_contracts import ArtifactRef +from policyengine_us_data.stage_contracts._coercion import ( + freeze_mapping, + jsonable_value, + mapping_value, + optional_int_value, + optional_string, + optional_string_value, + require_non_empty, + required_string, + schema_version, + validate_optional_int, + validate_schema_version, +) +from policyengine_us_data.stage_contracts.constants import CONTRACT_SCHEMA_VERSION +from policyengine_us_data.stage_contracts.stages import ( + STAGE_5_VALIDATE_AND_PROMOTE_RELEASE, +) +from policyengine_us_data.utils.canonical_json import canonical_json_dumps + +from .candidate import ReleaseCandidateInputBundle +from .context import ReleasePromotionContext +from .results import FullPromotionResult + +PUBLISHED_ARTIFACT_INDEX_FILENAME = "published_artifact_index.jsonl" +PUBLISHED_ARTIFACT_INDEX_MEDIA_TYPE = "application/jsonl" + + +def published_artifact_index_repo_path(run_id: str) -> str: + """Return the run-scoped repository path for the published artifact index.""" + + return f"calibration/runs/{run_id}/diagnostics/{PUBLISHED_ARTIFACT_INDEX_FILENAME}" + + +def published_artifact_index_path(run_dir: str | Path) -> Path: + """Return the run-local diagnostics path for the published artifact index.""" + + return Path(run_dir) / "diagnostics" / PUBLISHED_ARTIFACT_INDEX_FILENAME + + +def published_artifact_index_artifact_ref( + context: ReleasePromotionContext, + *, + row_count: int | None = None, + sha256: str | None = None, + size_bytes: int | None = None, +) -> ArtifactRef: + """Return a stage-contract reference to the published artifact index.""" + + metadata: dict[str, Any] = { + "artifact_family": "published_artifact_index", + "source_stage_id": STAGE_5_VALIDATE_AND_PROMOTE_RELEASE, + "relative_path": published_artifact_index_repo_path(context.run_id), + } + if row_count is not None: + metadata["row_count"] = row_count + return ArtifactRef( + logical_name="published_artifact_index", + uri=( + f"hf://{context.hf_repo_name}/" + f"{published_artifact_index_repo_path(context.run_id)}" + ), + sha256=sha256, + size_bytes=size_bytes, + media_type=PUBLISHED_ARTIFACT_INDEX_MEDIA_TYPE, + metadata=metadata, + ) + + +@pipeline_node( + id="published_artifact_index_row", + label="PublishedArtifactIndexRow", + node_type="library", + description="One published HF/GCS artifact row emitted by Stage 5.", + status="transitional", + stability="moving", + pathways=["5_validate_and_promote_release"], + artifacts_in=["release candidate bundle", "release manifest"], + artifacts_out=["published_artifact_index.jsonl"], + validation_commands=[ + "uv run pytest tests/unit/release_promotion/test_published_index.py" + ], +) +@dataclass(frozen=True, kw_only=True) +class PublishedArtifactIndexRow: + """One row in the Stage 5 published artifact JSONL index.""" + + run_id: str + candidate_version: str + release_version: str + logical_name: str + relative_path: str + artifact_role: str + artifact_family: str + source_stage_id: str + hf_uri: str + gcs_uri: str | None = None + sha256: str | None = None + size_bytes: int | None = None + area_type: str | None = None + area_id: str | None = None + release_manifest_key: str | None = None + release_manifest_revision: str | None = None + metadata: Mapping[str, Any] = field(default_factory=dict) + schema_version: str = CONTRACT_SCHEMA_VERSION + + def __post_init__(self) -> None: + validate_schema_version(self.schema_version, self.__class__.__name__) + for field_name in ( + "run_id", + "candidate_version", + "release_version", + "logical_name", + "relative_path", + "artifact_role", + "artifact_family", + "source_stage_id", + "hf_uri", + ): + object.__setattr__( + self, + field_name, + require_non_empty(getattr(self, field_name), field_name), + ) + object.__setattr__( + self, + "gcs_uri", + optional_string_value(self.gcs_uri, "gcs_uri"), + ) + object.__setattr__( + self, + "sha256", + optional_string_value(self.sha256, "sha256"), + ) + validate_optional_int(self.size_bytes, "size_bytes") + if self.size_bytes is not None and self.size_bytes < 0: + raise ValueError("size_bytes must be non-negative") + object.__setattr__( + self, + "area_type", + optional_string_value(self.area_type, "area_type"), + ) + object.__setattr__( + self, + "area_id", + optional_string_value(self.area_id, "area_id"), + ) + object.__setattr__( + self, + "release_manifest_key", + optional_string_value( + self.release_manifest_key, + "release_manifest_key", + ), + ) + object.__setattr__( + self, + "release_manifest_revision", + optional_string_value( + self.release_manifest_revision, + "release_manifest_revision", + ), + ) + object.__setattr__( + self, + "metadata", + freeze_mapping(self.metadata, "metadata"), + ) + + def to_dict(self) -> dict[str, Any]: + """Serialize the row to JSON-compatible primitives.""" + + return { + "run_id": self.run_id, + "candidate_version": self.candidate_version, + "release_version": self.release_version, + "logical_name": self.logical_name, + "relative_path": self.relative_path, + "artifact_role": self.artifact_role, + "artifact_family": self.artifact_family, + "source_stage_id": self.source_stage_id, + "hf_uri": self.hf_uri, + "gcs_uri": self.gcs_uri, + "sha256": self.sha256, + "size_bytes": self.size_bytes, + "area_type": self.area_type, + "area_id": self.area_id, + "release_manifest_key": self.release_manifest_key, + "release_manifest_revision": self.release_manifest_revision, + "metadata": jsonable_value(self.metadata), + "schema_version": self.schema_version, + } + + @classmethod + def from_dict(cls, data: Mapping[str, Any]) -> "PublishedArtifactIndexRow": + """Restore an index row from serialized data.""" + + return cls( + run_id=required_string(data, "run_id"), + candidate_version=required_string(data, "candidate_version"), + release_version=required_string(data, "release_version"), + logical_name=required_string(data, "logical_name"), + relative_path=required_string(data, "relative_path"), + artifact_role=required_string(data, "artifact_role"), + artifact_family=required_string(data, "artifact_family"), + source_stage_id=required_string(data, "source_stage_id"), + hf_uri=required_string(data, "hf_uri"), + gcs_uri=optional_string(data, "gcs_uri"), + sha256=optional_string(data, "sha256"), + size_bytes=optional_int_value(data, "size_bytes"), + area_type=optional_string(data, "area_type"), + area_id=optional_string(data, "area_id"), + release_manifest_key=optional_string(data, "release_manifest_key"), + release_manifest_revision=optional_string( + data, + "release_manifest_revision", + ), + metadata=mapping_value(data, "metadata"), + schema_version=schema_version(data), + ) + + +@pipeline_node( + id="published_artifact_index_builder", + label="Published Artifact Index Builder", + node_type="library", + description="Build the Stage 5 published artifact JSONL index.", + status="transitional", + stability="moving", + pathways=["5_validate_and_promote_release"], + artifacts_in=["release candidate bundle", "typed promotion result"], + artifacts_out=["published_artifact_index.jsonl"], + validation_commands=[ + "uv run pytest tests/unit/release_promotion/test_published_index.py" + ], +) +def build_published_artifact_index( + *, + candidate_bundle: ReleaseCandidateInputBundle, + promotion_result: FullPromotionResult, + release_manifest: Mapping[str, Any] | None = None, + diagnostic_artifacts: Sequence[ArtifactRef] = (), +) -> tuple[PublishedArtifactIndexRow, ...]: + """Build deterministic published artifact rows for a promoted release.""" + + _validate_result_matches_candidate(promotion_result, candidate_bundle) + context = candidate_bundle.context + manifest_artifacts = _release_manifest_artifacts(release_manifest) + rows = [ + *( + _candidate_artifact_row( + context=context, + artifact=artifact, + manifest_artifacts=manifest_artifacts, + ) + for artifact in candidate_bundle.artifacts + ), + *_release_metadata_rows(context, promotion_result), + *( + _diagnostic_artifact_row(context=context, artifact=artifact) + for artifact in sorted( + diagnostic_artifacts, + key=lambda item: item.logical_name, + ) + ), + ] + return tuple(sorted(rows, key=lambda row: (row.artifact_role, row.relative_path))) + + +def published_artifact_index_to_jsonl( + rows: Sequence[PublishedArtifactIndexRow], +) -> str: + """Serialize published artifact rows to deterministic JSONL.""" + + return "".join( + canonical_json_dumps( + row.to_dict(), + compact=True, + trailing_newline=False, + ) + + "\n" + for row in rows + ) + + +def published_artifact_index_from_jsonl( + payload: str, +) -> tuple[PublishedArtifactIndexRow, ...]: + """Read published artifact rows from JSONL text.""" + + import json + + return tuple( + PublishedArtifactIndexRow.from_dict(json.loads(line)) + for line in payload.splitlines() + if line.strip() + ) + + +def write_published_artifact_index( + rows: Sequence[PublishedArtifactIndexRow], + path: str | Path, +) -> tuple[PublishedArtifactIndexRow, ...]: + """Write published artifact rows to an explicit JSONL path.""" + + frozen_rows = tuple(rows) + index_path = Path(path) + index_path.parent.mkdir(parents=True, exist_ok=True) + index_path.write_text( + published_artifact_index_to_jsonl(frozen_rows), + encoding="utf-8", + ) + return frozen_rows + + +def read_published_artifact_index( + path: str | Path, +) -> tuple[PublishedArtifactIndexRow, ...]: + """Read published artifact rows from a JSONL path.""" + + return published_artifact_index_from_jsonl(Path(path).read_text(encoding="utf-8")) + + +def _validate_result_matches_candidate( + result: FullPromotionResult, + candidate_bundle: ReleaseCandidateInputBundle, +) -> None: + context = candidate_bundle.context + if result.run_id != context.run_id: + raise ValueError("promotion_result.run_id must match context.run_id") + if result.candidate_version != context.candidate_version: + raise ValueError( + "promotion_result.candidate_version must match context.candidate_version" + ) + if result.release_version != context.release_version: + raise ValueError( + "promotion_result.release_version must match context.release_version" + ) + if result.artifact_count != len(candidate_bundle.artifacts): + raise ValueError( + "promotion_result.artifact_count must match candidate artifacts" + ) + + +def _release_manifest_artifacts( + release_manifest: Mapping[str, Any] | None, +) -> dict[str, Mapping[str, Any]]: + if not release_manifest: + return {} + artifacts = release_manifest.get("artifacts", {}) + if not isinstance(artifacts, Mapping): + return {} + return { + str(key): artifact + for key, artifact in artifacts.items() + if isinstance(artifact, Mapping) + } + + +def _candidate_artifact_row( + *, + context: ReleasePromotionContext, + artifact, + manifest_artifacts: Mapping[str, Mapping[str, Any]], +) -> PublishedArtifactIndexRow: + manifest_key = _release_manifest_key(artifact.relative_path) + manifest_entry = manifest_artifacts.get(manifest_key, {}) + relative_path = str(manifest_entry.get("path") or artifact.relative_path) + manifest_size_bytes = _optional_manifest_int(manifest_entry, "size_bytes") + return PublishedArtifactIndexRow( + run_id=context.run_id, + candidate_version=context.candidate_version, + release_version=context.release_version, + logical_name=artifact.logical_name, + relative_path=relative_path, + artifact_role="release_artifact", + artifact_family=artifact.artifact_family, + source_stage_id=artifact.source_stage_id, + hf_uri=f"hf://{context.hf_repo_name}/{relative_path}", + gcs_uri=f"gs://{context.gcs_bucket_name}/{relative_path}", + sha256=_optional_manifest_string(manifest_entry, "sha256") or artifact.sha256, + size_bytes=manifest_size_bytes + if manifest_size_bytes is not None + else artifact.size_bytes, + area_type=artifact.area_type, + area_id=artifact.area_id, + release_manifest_key=manifest_key, + release_manifest_revision=_optional_manifest_string( + manifest_entry, + "revision", + ), + metadata={ + "release_manifest_kind": _optional_manifest_string( + manifest_entry, + "kind", + ), + "candidate_metadata": jsonable_value(artifact.metadata), + }, + ) + + +def _release_metadata_rows( + context: ReleasePromotionContext, + result: FullPromotionResult, +) -> tuple[PublishedArtifactIndexRow, ...]: + artifacts = ( + ( + "release_manifest", + result.release_manifest.root_path, + "release_manifest", + "application/json", + result.release_manifest.manifest_sha256, + ), + ( + "versioned_release_manifest", + result.release_manifest.versioned_path, + "release_manifest", + "application/json", + result.release_manifest.manifest_sha256, + ), + ( + "trace_tro", + result.release_manifest.trace_tro_path, + "trace_tro", + "application/ld+json", + None, + ), + ( + "versioned_trace_tro", + result.release_manifest.versioned_trace_tro_path, + "trace_tro", + "application/ld+json", + None, + ), + ( + "version_manifest", + result.version_manifest.path, + "version_manifest", + "application/json", + None, + ), + ( + "release_completion_marker", + result.completion_marker.marker_path, + "release_completion_marker", + "application/json", + None, + ), + ) + return tuple( + PublishedArtifactIndexRow( + run_id=context.run_id, + candidate_version=context.candidate_version, + release_version=context.release_version, + logical_name=logical_name, + relative_path=relative_path, + artifact_role="release_metadata", + artifact_family=artifact_family, + source_stage_id=STAGE_5_VALIDATE_AND_PROMOTE_RELEASE, + hf_uri=f"hf://{context.hf_repo_name}/{relative_path}", + sha256=sha256, + metadata={"media_type": media_type}, + ) + for logical_name, relative_path, artifact_family, media_type, sha256 in artifacts + ) + + +def _diagnostic_artifact_row( + *, + context: ReleasePromotionContext, + artifact: ArtifactRef, +) -> PublishedArtifactIndexRow: + relative_path = _artifact_relative_path(artifact, context) + return PublishedArtifactIndexRow( + run_id=context.run_id, + candidate_version=context.candidate_version, + release_version=context.release_version, + logical_name=artifact.logical_name, + relative_path=relative_path, + artifact_role="diagnostic", + artifact_family=str(artifact.metadata.get("artifact_family") or "diagnostic"), + source_stage_id=str( + artifact.metadata.get("source_stage_id") + or STAGE_5_VALIDATE_AND_PROMOTE_RELEASE + ), + hf_uri=artifact.uri, + sha256=artifact.sha256, + size_bytes=artifact.size_bytes, + metadata={ + key: value + for key, value in jsonable_value(artifact.metadata).items() + if key not in {"artifact_family", "source_stage_id"} + }, + ) + + +def _artifact_relative_path( + artifact: ArtifactRef, + context: ReleasePromotionContext, +) -> str: + metadata_path = artifact.metadata.get("relative_path") + if isinstance(metadata_path, str) and metadata_path: + return metadata_path + hf_prefix = f"hf://{context.hf_repo_name}/" + if artifact.uri.startswith(hf_prefix): + return artifact.uri[len(hf_prefix) :] + return artifact.uri + + +def _release_manifest_key(relative_path: str) -> str: + return PurePosixPath(relative_path).with_suffix("").as_posix() + + +def _optional_manifest_string( + manifest_entry: Mapping[str, Any], + key: str, +) -> str | None: + value = manifest_entry.get(key) + return value if isinstance(value, str) and value else None + + +def _optional_manifest_int( + manifest_entry: Mapping[str, Any], + key: str, +) -> int | None: + value = manifest_entry.get(key) + if isinstance(value, bool): + return None + return value if isinstance(value, int) else None diff --git a/tests/unit/release_promotion/test_contract.py b/tests/unit/release_promotion/test_contract.py index 334f3ba3d..f9f7a28ac 100644 --- a/tests/unit/release_promotion/test_contract.py +++ b/tests/unit/release_promotion/test_contract.py @@ -9,6 +9,7 @@ ReleasePromotionContext, build_legacy_release_candidate_bundle, build_release_promotion_contract, + published_artifact_index_artifact_ref, release_promotion_contract_path, release_promotion_contract_repo_path, write_release_promotion_contract, @@ -140,6 +141,12 @@ def test_release_promotion_contract_records_candidate_and_public_refs() -> None: code_sha="abc123", package_version="1.73.0", validation=_validation_report(), + published_artifact_index=published_artifact_index_artifact_ref( + _context(), + row_count=9, + sha256="sha256:index", + size_bytes=123, + ), metadata={"writer": "test"}, ) @@ -161,6 +168,7 @@ def test_release_promotion_contract_records_candidate_and_public_refs() -> None: "versioned_trace_tro", "version_manifest", "release_completion_marker", + "published_artifact_index", } assert contract.execution.status == "completed" assert contract.execution.reuse_decision == "computed" @@ -169,12 +177,19 @@ def test_release_promotion_contract_records_candidate_and_public_refs() -> None: assert contract.parameters["source_output_contract_path"] == ( "calibration/runs/run-123/diagnostics/contracts/output_build_contract.json" ) + assert contract.parameters["published_artifact_index_path"] == ( + "calibration/runs/run-123/diagnostics/published_artifact_index.jsonl" + ) assert contract.metadata["contract_file"] == RELEASE_PROMOTION_CONTRACT_FILENAME assert contract.metadata["already_finalized"] is False assert contract.metadata["cleanup"]["cleaned_count"] == 3 + assert contract.metadata["published_artifact_index"]["metadata"]["row_count"] == 9 assert contract.metadata["public_refs"]["release_manifest"] == ( "hf://policyengine/policyengine-us-data/release_manifest.json" ) + assert contract.metadata["public_refs"]["published_artifact_index"].endswith( + "published_artifact_index.jsonl" + ) assert [substage.substage_id for substage in contract.substages] == [ "5a_validate_outputs", "5b_promote_huggingface", diff --git a/tests/unit/release_promotion/test_published_index.py b/tests/unit/release_promotion/test_published_index.py new file mode 100644 index 000000000..4bebc49de --- /dev/null +++ b/tests/unit/release_promotion/test_published_index.py @@ -0,0 +1,286 @@ +import json + +import pytest + +from policyengine_us_data.release_promotion import ( + FullPromotionResult, + ReleasePromotionContext, + build_legacy_release_candidate_bundle, + build_published_artifact_index, + published_artifact_index_artifact_ref, + published_artifact_index_from_jsonl, + published_artifact_index_path, + published_artifact_index_repo_path, + published_artifact_index_to_jsonl, + read_published_artifact_index, + write_published_artifact_index, +) +from policyengine_us_data.stage_contracts import ArtifactRef + + +def _context() -> ReleasePromotionContext: + return ReleasePromotionContext( + run_id="run-123", + candidate_version="1.73.0rc1", + release_version="1.73.0", + hf_repo_name="policyengine/policyengine-us-data", + gcs_bucket_name="policyengine-us-data", + base_release_version="1.72.0", + release_bump="minor", + ) + + +def _rel_paths() -> list[str]: + return [ + "policy_data.db", + "states/AL.h5", + "districts/NC-01.h5", + "cities/NYC.h5", + "national/US.h5", + ] + + +def _candidate_bundle(): + return build_legacy_release_candidate_bundle( + context=_context(), + rel_paths=_rel_paths(), + artifact_metadata_by_path={ + path: {"sha256": f"sha256:candidate-{index}", "size_bytes": index} + for index, path in enumerate(_rel_paths(), start=1) + }, + ) + + +def _legacy_promotion_payload(**overrides): + rel_paths = tuple(_rel_paths()) + payload = { + "run_id": "run-123", + "candidate_version": "1.73.0rc1", + "release_version": "1.73.0", + "rel_paths": rel_paths, + "artifact_count": len(rel_paths), + "hf_repo_name": "policyengine/policyengine-us-data", + "hf_repo_type": "model", + "hf_staging_prefix": "staging/1.73.0rc1-run-123", + "hf_promoted": len(rel_paths), + "hf_promoted_paths": rel_paths, + "hf_commit_id": None, + "hf_noop_paths": (), + "gcs_bucket_name": "policyengine-us-data", + "gcs_uploaded": len(rel_paths), + "gcs_object_paths": rel_paths, + "gcs_skipped_paths": (), + "gcs_failures": (), + "release_manifest_path": "release_manifest.json", + "versioned_release_manifest_path": "releases/1.73.0/release_manifest.json", + "trace_tro_path": "trace.tro.jsonld", + "versioned_trace_tro_path": "releases/1.73.0/trace.tro.jsonld", + "release_manifest_sha256": None, + "release_manifest_artifacts": len(rel_paths), + "version_manifest_path": "version_manifest.json", + "version_manifest_version": "1.73.0", + "version_manifest_current_version": "1.73.0", + "version_manifest_updated": True, + "release_completion_marker": "releases/1.73.0/release-complete.json", + "release_completion_tag": "1.73.0", + "release_completion_valid": True, + "staging_cleaned": len(rel_paths) + 1, + "staging_cleanup_attempted": True, + "staging_cleanup_status": "completed", + } + payload.update(overrides) + return payload + + +def _promotion_result() -> FullPromotionResult: + return FullPromotionResult.from_legacy_dict(_legacy_promotion_payload()) + + +def _release_manifest() -> dict: + return { + "artifacts": { + path.removesuffix(".h5").removesuffix(".db"): { + "kind": "database" if path.endswith(".db") else "microdata", + "path": path, + "repo_id": "policyengine/policyengine-us-data", + "revision": "1.73.0", + "sha256": f"manifest-{index}", + "size_bytes": 100 + index, + } + for index, path in enumerate(_rel_paths(), start=1) + } + } + + +def _diagnostic_artifact() -> ArtifactRef: + return ArtifactRef( + logical_name="release_promotion_contract", + uri=( + "hf://policyengine/policyengine-us-data/calibration/runs/" + "run-123/diagnostics/contracts/release_promotion_contract.json" + ), + media_type="application/json", + metadata={ + "artifact_family": "stage_contract", + "source_stage_id": "5_validate_and_promote_release", + "relative_path": ( + "calibration/runs/run-123/diagnostics/contracts/" + "release_promotion_contract.json" + ), + }, + ) + + +def _rows(): + return build_published_artifact_index( + candidate_bundle=_candidate_bundle(), + promotion_result=_promotion_result(), + release_manifest=_release_manifest(), + diagnostic_artifacts=(_diagnostic_artifact(),), + ) + + +def _by_path(rows): + return {row.relative_path: row for row in rows} + + +def test_published_artifact_index_records_release_artifact_destinations() -> None: + rows = _by_path(_rows()) + + state = rows["states/AL.h5"] + district = rows["districts/NC-01.h5"] + city = rows["cities/NYC.h5"] + national = rows["national/US.h5"] + base = rows["policy_data.db"] + + assert state.artifact_family == "state_h5" + assert state.area_type == "state" + assert state.area_id == "AL" + assert state.hf_uri == "hf://policyengine/policyengine-us-data/states/AL.h5" + assert state.gcs_uri == "gs://policyengine-us-data/states/AL.h5" + assert state.release_manifest_key == "states/AL" + assert state.release_manifest_revision == "1.73.0" + assert state.sha256 == "manifest-2" + assert district.artifact_family == "district_h5" + assert district.area_id == "NC-01" + assert city.artifact_family == "city_h5" + assert city.area_id == "NYC" + assert national.artifact_family == "national_h5" + assert national.area_type == "national" + assert base.artifact_family == "base_dataset" + assert base.source_stage_id == "1_build_datasets" + assert base.gcs_uri == "gs://policyengine-us-data/policy_data.db" + + +def test_published_artifact_index_records_manifest_and_diagnostic_rows() -> None: + rows = _by_path(_rows()) + + release_manifest = rows["release_manifest.json"] + version_manifest = rows["version_manifest.json"] + completion_marker = rows["releases/1.73.0/release-complete.json"] + diagnostic = rows[ + "calibration/runs/run-123/diagnostics/contracts/release_promotion_contract.json" + ] + + assert release_manifest.artifact_role == "release_metadata" + assert release_manifest.artifact_family == "release_manifest" + assert release_manifest.gcs_uri is None + assert version_manifest.artifact_family == "version_manifest" + assert completion_marker.artifact_family == "release_completion_marker" + assert diagnostic.artifact_role == "diagnostic" + assert diagnostic.artifact_family == "stage_contract" + assert diagnostic.hf_uri.endswith("release_promotion_contract.json") + + +def test_published_artifact_index_uses_typed_promotion_metadata_paths() -> None: + result = FullPromotionResult.from_legacy_dict( + _legacy_promotion_payload( + release_manifest_path="manifests/root-release.json", + versioned_release_manifest_path="release-ledger/1.73.0/root-release.json", + trace_tro_path="manifests/root-trace.jsonld", + versioned_trace_tro_path="release-ledger/1.73.0/root-trace.jsonld", + release_manifest_sha256="sha256:manifest", + version_manifest_path="registries/version-index.json", + release_completion_marker="release-ledger/1.73.0/complete.json", + ) + ) + + rows = build_published_artifact_index( + candidate_bundle=_candidate_bundle(), + promotion_result=result, + ) + metadata_paths = { + row.logical_name: row.relative_path + for row in rows + if row.artifact_role == "release_metadata" + } + + assert metadata_paths == { + "release_manifest": "manifests/root-release.json", + "versioned_release_manifest": "release-ledger/1.73.0/root-release.json", + "trace_tro": "manifests/root-trace.jsonld", + "versioned_trace_tro": "release-ledger/1.73.0/root-trace.jsonld", + "version_manifest": "registries/version-index.json", + "release_completion_marker": "release-ledger/1.73.0/complete.json", + } + release_manifest = next( + row for row in rows if row.logical_name == "release_manifest" + ) + assert release_manifest.sha256 == "sha256:manifest" + assert release_manifest.hf_uri == ( + "hf://policyengine/policyengine-us-data/manifests/root-release.json" + ) + + +def test_published_artifact_index_jsonl_round_trips_deterministically() -> None: + rows = _rows() + + payload = published_artifact_index_to_jsonl(rows) + restored = published_artifact_index_from_jsonl(payload) + + assert payload.endswith("\n") + assert len(payload.splitlines()) == len(rows) + assert restored == tuple(rows) + assert published_artifact_index_to_jsonl(restored) == payload + + +def test_write_published_artifact_index_writes_explicit_path(tmp_path) -> None: + path = published_artifact_index_path(tmp_path / "run-123") + rows = _rows() + + written = write_published_artifact_index(rows, path) + + assert written == tuple(rows) + assert read_published_artifact_index(path) == tuple(rows) + assert json.loads(path.read_text(encoding="utf-8").splitlines()[0]) + assert published_artifact_index_repo_path("run-123") == ( + "calibration/runs/run-123/diagnostics/published_artifact_index.jsonl" + ) + + +def test_published_artifact_index_artifact_ref_records_row_count() -> None: + artifact = published_artifact_index_artifact_ref( + _context(), + row_count=12, + sha256="sha256:index", + size_bytes=123, + ) + + assert artifact.logical_name == "published_artifact_index" + assert artifact.media_type == "application/jsonl" + assert artifact.metadata["row_count"] == 12 + assert artifact.metadata["relative_path"] == ( + "calibration/runs/run-123/diagnostics/published_artifact_index.jsonl" + ) + + +def test_published_artifact_index_rejects_mismatched_result_identity() -> None: + result = FullPromotionResult.from_legacy_dict( + _legacy_promotion_payload(run_id="other-run") + ) + + with pytest.raises(ValueError, match="run_id"): + build_published_artifact_index( + candidate_bundle=_candidate_bundle(), + promotion_result=result, + ) diff --git a/tests/unit/test_pipeline_source_contracts.py b/tests/unit/test_pipeline_source_contracts.py index 350a5043c..59de3a60d 100644 --- a/tests/unit/test_pipeline_source_contracts.py +++ b/tests/unit/test_pipeline_source_contracts.py @@ -31,7 +31,7 @@ def test_promote_run_uses_single_full_release_promotion() -> None: assert "promotion_context.to_dict()" in source assert "_promotion_result_from_stdout(promotion_stdout)" in source assert "_write_release_promotion_contract_for_run(" in source - assert "release_promotion_contract_ref" in source + assert "release_promotion_refs" in source assert "promote_publish.remote(" not in source assert "promote_national_publish.remote(" not in source assert "upload_datasets(" not in source @@ -150,8 +150,11 @@ def test_promote_run_writes_release_promotion_contract_output() -> None: assert "release_promotion_contract_path(run_dir)" in helper_source assert "build_legacy_release_candidate_bundle(" in helper_source + assert "build_published_artifact_index(" in helper_source + assert "write_published_artifact_index(" in helper_source assert "write_release_promotion_contract(" in helper_source assert 'role="contract"' in helper_source + assert 'role="index"' in helper_source assert 'media_type="application/json"' in helper_source assert "validation_report_paths=_run_validation_report_repo_paths_if_available" in ( helper_source @@ -160,10 +163,12 @@ def test_promote_run_writes_release_promotion_contract_output() -> None: "diagnostics_manifest_path=_run_diagnostics_manifest_repo_path_if_available" in helper_source ) + assert 'media_type="application/jsonl"' in helper_source assert ( "source_output_contract_path=_stage4_output_contract_repo_path_if_available" in (helper_source) ) + assert "published_artifact_index=published_index_artifact" in helper_source assert 'diagnostics" / "contracts" / "output_build_contract.json"' in stage4_source assert "calibration/runs/{run_id}/" in stage4_source