From 7c1c3e540dab8ec2400e761af66a2feff53aabb5 Mon Sep 17 00:00:00 2001 From: Anthony Volk Date: Wed, 20 May 2026 15:46:43 +0200 Subject: [PATCH] Add Stage 2 input artifact bundles --- changelog.d/1065.changed | 1 + docs/engineering/pipeline-map.md | 15 +- docs/generated/pipeline_api.json | 74 ++- docs/generated/pipeline_map.json | 169 +++++-- docs/pipeline_map.yaml | 31 ++ modal_app/pipeline.py | 26 +- modal_app/remote_calibration_runner.py | 20 +- .../calibration_package/__init__.py | 26 + .../calibration_package/specs.py | 443 +++++++++++++++++- tests/unit/calibration_package/test_specs.py | 207 ++++++++ tests/unit/test_pipeline_docs_extractor.py | 2 + tests/unit/test_pipeline_source_contracts.py | 5 +- 12 files changed, 940 insertions(+), 79 deletions(-) create mode 100644 changelog.d/1065.changed diff --git a/changelog.d/1065.changed b/changelog.d/1065.changed new file mode 100644 index 000000000..a8e81f607 --- /dev/null +++ b/changelog.d/1065.changed @@ -0,0 +1 @@ +Stage 2 calibration package construction now resolves its inputs and outputs through run-scoped artifact bundles. diff --git a/docs/engineering/pipeline-map.md b/docs/engineering/pipeline-map.md index b70d3156e..b1f630718 100644 --- a/docs/engineering/pipeline-map.md +++ b/docs/engineering/pipeline-map.md @@ -367,6 +367,7 @@ Build sparse calibration matrix (targets x households x clones) | Node | Type | Status | Stability | API refs | | --- | --- | --- | --- | --- | +| `in_stage1_contract_s2` dataset_build_output.json | `artifact` | `unknown` | `unknown` | | | `in_cps_s5` source_imputed_stratified_extended_cps.h5 | `artifact` | `unknown` | `unknown` | | | `in_db_s5` policy_data.db | `external` | `unknown` | `unknown` | | | `in_config_s5` target_config.yaml | `artifact` | `unknown` | `unknown` | | @@ -383,6 +384,10 @@ Build sparse calibration matrix (targets x households x clones) | `util_pool` ProcessPoolExecutor | `utility` | `unknown` | `unknown` | | | `util_takeup_s5` compute_block_takeup_for_entities() | `utility` | `unknown` | `unknown` | | | `util_scipy` scipy.sparse | `utility` | `unknown` | `unknown` | | +| `stage2_input_bundle` Stage 2 Input Bundle | `library` | `current` | `moving` | `policyengine_us_data.calibration_package.specs.stage2_input_bundle_from_artifacts_dir` | +| `stage2_build_context` Stage 2 Build Context | `library` | `current` | `moving` | `policyengine_us_data.calibration_package.specs.stage2_build_context_for_run` | +| `stage2_artifact_specs` Stage 2 Artifact Specs | `library` | `current` | `moving` | `policyengine_us_data.calibration_package.specs.calibration_package_artifact_paths` | +| `stage2_calibration_package_writer` Stage 2 Package Writer | `library` | `current` | `moving` | `policyengine_us_data.calibration.unified_calibration.save_calibration_package` | | `stage2_target_config_identity` Stage 2 Target Config Identity | `library` | `current` | `moving` | `policyengine_us_data.calibration_package.specs.resolve_target_config_identity` | | `stage2_target_config_load` Load Stage 2 Target Config | `library` | `current` | `moving` | `policyengine_us_data.calibration.unified_calibration.load_target_config` | | `stage2_target_config_apply` Apply Stage 2 Target Config | `library` | `current` | `moving` | `policyengine_us_data.calibration.unified_calibration.apply_target_config_to_targets` | @@ -390,14 +395,18 @@ Build sparse calibration matrix (targets x households x clones) | `clone_assembly` Clone Value Assembly | `library` | `current` | `moving` | `policyengine_us_data.calibration.unified_matrix_builder._assemble_clone_values_standalone` | | `build_matrix` Build Calibration Matrix | `library` | `current` | `moving` | `policyengine_us_data.calibration.unified_matrix_builder.UnifiedMatrixBuilder.build_matrix` | | `build_matrix_chunked` Build Calibration Matrix In Chunks | `library` | `current` | `experimental` | `policyengine_us_data.calibration.unified_matrix_builder.UnifiedMatrixBuilder.build_matrix_chunked` | -| `stage2_calibration_package_writer` Stage 2 Package Writer | `library` | `current` | `moving` | `policyengine_us_data.calibration.unified_calibration.save_calibration_package` | -| `stage2_artifact_specs` Stage 2 Artifact Specs | `library` | `current` | `moving` | `policyengine_us_data.calibration_package.specs.calibration_package_artifact_paths` | | `stage2_calibration_package_contract_writer` Stage 2 Contract Writer | `library` | `current` | `moving` | `policyengine_us_data.stage_contracts.calibration_package.write_calibration_package_contract` | | `stage2_calibration_package_contract_validator` Stage 2 Contract Validator | `validation` | `current` | `moving` | `policyengine_us_data.stage_contracts.calibration_package.validate_calibration_package_contract` | #### Edges -- `in_cps_s5` -> `target_resolve` `data_flow` +- `in_stage1_contract_s2` -> `stage2_input_bundle` `data_flow` (preferred input contract) +- `in_cps_s5` -> `stage2_input_bundle` `data_flow` (compatibility fallback) +- `in_db_s5` -> `stage2_input_bundle` `external_source` (compatibility fallback) +- `stage2_input_bundle` -> `stage2_build_context` `data_flow` (validated inputs) +- `stage2_artifact_specs` -> `stage2_build_context` `uses_utility` (output bundle paths) +- `stage2_build_context` -> `target_resolve` `data_flow` (dataset and database paths) +- `stage2_build_context` -> `stage2_calibration_package_writer` `uses_utility` (package output bundle) - `in_db_s5` -> `target_resolve` `external_source` (SQL targets) - `in_config_s5` -> `stage2_target_config_identity` `data_flow` (config file) - `stage2_target_config_identity` -> `stage2_target_config_load` `data_flow` (resolved path and checksum) diff --git a/docs/generated/pipeline_api.json b/docs/generated/pipeline_api.json index 66c912d4c..e3fcb1abc 100644 --- a/docs/generated/pipeline_api.json +++ b/docs/generated/pipeline_api.json @@ -3086,7 +3086,7 @@ "docstring": "Promote a completed pipeline run to production.\n\n1. Verify run status is \"completed\"\n2. Promote every staged artifact in one Hugging Face commit\n3. Upload/copy every artifact to GCS\n4. Finalize release_manifest.json, tag the release, and update\n version_manifest.json\n5. Update run status to \"promoted\"\n\nArgs:\n run_id: The run ID to promote.\n candidate_version: Candidate staging scope used for staged source files.\n release_version: Stable version used for final release metadata.\n\nReturns:\n Summary message.", "id": "promote_pipeline_run", "kind": "function", - "line": 2079, + "line": 2091, "metadata": { "api_refs": [ "modal_app.pipeline.promote_run" @@ -3541,7 +3541,7 @@ "docstring": "Run the full pipeline end-to-end.\n\nArgs:\n branch: Git branch to build from.\n gpu: GPU type for regional calibration.\n epochs: Training epochs for regional calibration.\n national_gpu: GPU type for national calibration.\n national_epochs: Training epochs for national.\n num_workers: Number of parallel H5 workers.\n n_clones: Number of clones for H5 building.\n skip_national: Skip national calibration/H5.\n resume_run_id: Resume a previously failed run.\n clear_checkpoints: Wipe ALL checkpoints before building\n (default False). Normally not needed \u2014 checkpoints are\n scoped by commit SHA, so stale ones from other commits\n are cleaned automatically. Use True only to force a\n full rebuild of the current commit.\n candidate_version: Candidate staging scope used for HF staging.\n release_version: Final stable release version. Usually empty until\n promotion.\n base_release_version: Stable release current when this candidate was\n built.\n release_bump: Intended SemVer bump for this candidate.\n sha_override: Exact source SHA deployed by GitHub Actions. When\n provided, this is recorded instead of reading the current\n branch tip.\n run_id: Cross-system run ID created by GitHub.\n run_context: Serialized run context from the launcher workflow.\n modal_app_name: Deployed Modal app name for this run.\n modal_environment: Modal environment used for this run.\n chunked_matrix: Build the calibration matrix in clone-household\n chunks instead of the non-chunked path. Opt-in; default off.\n chunk_size: Clone-household columns per chunk when\n ``chunked_matrix`` is True.\n parallel_matrix: Fan chunked matrix building across Modal\n workers via ``build_matrix_chunk_worker``. Only meaningful\n when ``chunked_matrix`` is True; ignored otherwise.\n num_matrix_workers: Number of Modal workers when\n ``parallel_matrix`` is True.\n\nReturns:\n The run ID for use with promote.", "id": "run_modal_pipeline", "kind": "function", - "line": 1112, + "line": 1113, "metadata": { "api_refs": [ "modal_app.pipeline.run_pipeline" @@ -3743,13 +3743,14 @@ "docstring": "Return canonical Stage 2 paths rooted in an artifacts directory.", "id": "stage2_artifact_specs", "kind": "function", - "line": 96, + "line": 488, "metadata": { "api_refs": [ "policyengine_us_data.calibration_package.specs.calibration_package_artifact_paths" ], - "artifacts_out": "[CALIBRATION_PACKAGE_FILENAME, CALIBRATION_PACKAGE_CONTRACT_FILENAME]", - "description": "Centralize calibration package, contract, metadata, and matrix-build artifact paths.", + "artifacts_in": "[SOURCE_DATASET_FILENAME, TARGET_DATABASE_FILENAME]", + "artifacts_out": "[CALIBRATION_PACKAGE_FILENAME, CALIBRATION_PACKAGE_METADATA_FILENAME, CALIBRATION_PACKAGE_CONTRACT_FILENAME]", + "description": "Centralize Stage 2 input, package, contract, metadata, report, and matrix-build artifact names.", "id": "stage2_artifact_specs", "label": "Stage 2 Artifact Specs", "node_type": "library", @@ -3764,7 +3765,36 @@ ] }, "object_path": "policyengine_us_data.calibration_package.specs.calibration_package_artifact_paths", - "signature": "def calibration_package_artifact_paths(artifacts_dir: str | Path) -> CalibrationPackageArtifactPaths", + "signature": "def calibration_package_artifact_paths(artifacts_dir: str | Path) -> CalibrationPackageOutputBundle", + "source_file": "policyengine_us_data/calibration_package/specs.py" + }, + "stage2_build_context": { + "docstring": "Return Stage 2 run context, preferring the Stage 1 handoff contract.", + "id": "stage2_build_context", + "kind": "function", + "line": 431, + "metadata": { + "api_refs": [ + "policyengine_us_data.calibration_package.specs.stage2_build_context_for_run" + ], + "artifacts_in": "[DATASET_BUILD_OUTPUT_CONTRACT_FILENAME, SOURCE_DATASET_FILENAME, TARGET_DATABASE_FILENAME]", + "artifacts_out": "[CALIBRATION_PACKAGE_FILENAME, CALIBRATION_PACKAGE_CONTRACT_FILENAME]", + "description": "Bind one run_id to canonical Stage 2 input and output bundles before remote package construction starts.", + "id": "stage2_build_context", + "label": "Stage 2 Build Context", + "node_type": "library", + "pathways": [ + "calibration_package" + ], + "source_file": "policyengine_us_data/calibration_package/specs.py", + "stability": "moving", + "status": "current", + "validation_commands": [ + "uv run pytest tests/unit/calibration_package/test_specs.py" + ] + }, + "object_path": "policyengine_us_data.calibration_package.specs.stage2_build_context_for_run", + "signature": "def stage2_build_context_for_run(pipeline_mount: str | Path, run_id: str | None = '', *, stage1_contract_path: str | Path | None = None) -> Stage2BuildContext", "source_file": "policyengine_us_data/calibration_package/specs.py" }, "stage2_calibration_package_contract_validator": { @@ -3856,6 +3886,34 @@ "signature": "def save_calibration_package(path: str, X_sparse, targets_df: 'pd.DataFrame', target_names: list, metadata: dict, initial_weights: np.ndarray = None, cd_geoid: np.ndarray = None, block_geoid: np.ndarray = None) -> None", "source_file": "policyengine_us_data/calibration/unified_calibration.py" }, + "stage2_input_bundle": { + "docstring": "Return a compatibility Stage 2 input bundle from canonical filenames.", + "id": "stage2_input_bundle", + "kind": "function", + "line": 331, + "metadata": { + "api_refs": [ + "policyengine_us_data.calibration_package.specs.stage2_input_bundle_from_artifacts_dir" + ], + "artifacts_in": "[DATASET_BUILD_OUTPUT_CONTRACT_FILENAME, SOURCE_DATASET_FILENAME, TARGET_DATABASE_FILENAME]", + "description": "Resolve the source-imputed dataset and policy target database from a Stage 1 contract or compatibility filename fallback.", + "id": "stage2_input_bundle", + "label": "Stage 2 Input Bundle", + "node_type": "library", + "pathways": [ + "calibration_package" + ], + "source_file": "policyengine_us_data/calibration_package/specs.py", + "stability": "moving", + "status": "current", + "validation_commands": [ + "uv run pytest tests/unit/calibration_package/test_specs.py" + ] + }, + "object_path": "policyengine_us_data.calibration_package.specs.stage2_input_bundle_from_artifacts_dir", + "signature": "def stage2_input_bundle_from_artifacts_dir(artifacts_dir: str | Path) -> Stage2InputBundle", + "source_file": "policyengine_us_data/calibration_package/specs.py" + }, "stage2_target_config_apply": { "docstring": "Filter target rows before matrix construction.", "id": "stage2_target_config_apply", @@ -3887,7 +3945,7 @@ "docstring": "Resolve the target config identity used by Stage 2 package construction.", "id": "stage2_target_config_identity", "kind": "function", - "line": 127, + "line": 520, "metadata": { "api_refs": [ "policyengine_us_data.calibration_package.specs.resolve_target_config_identity" @@ -4421,7 +4479,7 @@ "docstring": "Verify deployed-image imports and subprocess seams.", "id": "verify_runtime_seams", "kind": "function", - "line": 738, + "line": 739, "metadata": { "api_refs": [ "modal_app.pipeline.verify_runtime_seams" diff --git a/docs/generated/pipeline_map.json b/docs/generated/pipeline_map.json index 265bb9edc..ea6f4fb0e 100644 --- a/docs/generated/pipeline_map.json +++ b/docs/generated/pipeline_map.json @@ -1998,8 +1998,8 @@ "metadata": { "api_node_count": 96, "canonical_stage_count": 5, - "decorated_object_count": 154, - "mapped_decorated_node_count": 58, + "decorated_object_count": 156, + "mapped_decorated_node_count": 60, "stage_count": 17, "substage_count": 17 }, @@ -3837,9 +3837,46 @@ "edges": [ { "edge_type": "data_flow", + "label": "preferred input contract", + "source": "in_stage1_contract_s2", + "target": "stage2_input_bundle" + }, + { + "edge_type": "data_flow", + "label": "compatibility fallback", "source": "in_cps_s5", + "target": "stage2_input_bundle" + }, + { + "edge_type": "external_source", + "label": "compatibility fallback", + "source": "in_db_s5", + "target": "stage2_input_bundle" + }, + { + "edge_type": "data_flow", + "label": "validated inputs", + "source": "stage2_input_bundle", + "target": "stage2_build_context" + }, + { + "edge_type": "uses_utility", + "label": "output bundle paths", + "source": "stage2_artifact_specs", + "target": "stage2_build_context" + }, + { + "edge_type": "data_flow", + "label": "dataset and database paths", + "source": "stage2_build_context", "target": "target_resolve" }, + { + "edge_type": "uses_utility", + "label": "package output bundle", + "source": "stage2_build_context", + "target": "stage2_calibration_package_writer" + }, { "edge_type": "external_source", "label": "SQL targets", @@ -4014,6 +4051,8 @@ "id": "run_calibration_build", "label": "run_calibration()", "node_ids": [ + "stage2_input_bundle", + "stage2_build_context", "stage2_target_config_identity", "stage2_target_config_load", "target_resolve", @@ -4043,6 +4082,12 @@ "02_build_package" ], "nodes": [ + { + "description": "Stage 1 handoff contract preferred for Stage 2 input resolution", + "id": "in_stage1_contract_s2", + "label": "dataset_build_output.json", + "node_type": "artifact" + }, { "description": "From Stage 4 (~12K HH)", "id": "in_cps_s5", @@ -4139,6 +4184,86 @@ "label": "scipy.sparse", "node_type": "utility" }, + { + "api_refs": [ + "policyengine_us_data.calibration_package.specs.stage2_input_bundle_from_artifacts_dir" + ], + "artifacts_in": "[DATASET_BUILD_OUTPUT_CONTRACT_FILENAME, SOURCE_DATASET_FILENAME, TARGET_DATABASE_FILENAME]", + "description": "Resolve the source-imputed dataset and policy target database from a Stage 1 contract or compatibility filename fallback.", + "id": "stage2_input_bundle", + "label": "Stage 2 Input Bundle", + "node_type": "library", + "pathways": [ + "calibration_package" + ], + "source_file": "policyengine_us_data/calibration_package/specs.py", + "stability": "moving", + "status": "current", + "validation_commands": [ + "uv run pytest tests/unit/calibration_package/test_specs.py" + ] + }, + { + "api_refs": [ + "policyengine_us_data.calibration_package.specs.stage2_build_context_for_run" + ], + "artifacts_in": "[DATASET_BUILD_OUTPUT_CONTRACT_FILENAME, SOURCE_DATASET_FILENAME, TARGET_DATABASE_FILENAME]", + "artifacts_out": "[CALIBRATION_PACKAGE_FILENAME, CALIBRATION_PACKAGE_CONTRACT_FILENAME]", + "description": "Bind one run_id to canonical Stage 2 input and output bundles before remote package construction starts.", + "id": "stage2_build_context", + "label": "Stage 2 Build Context", + "node_type": "library", + "pathways": [ + "calibration_package" + ], + "source_file": "policyengine_us_data/calibration_package/specs.py", + "stability": "moving", + "status": "current", + "validation_commands": [ + "uv run pytest tests/unit/calibration_package/test_specs.py" + ] + }, + { + "api_refs": [ + "policyengine_us_data.calibration_package.specs.calibration_package_artifact_paths" + ], + "artifacts_in": "[SOURCE_DATASET_FILENAME, TARGET_DATABASE_FILENAME]", + "artifacts_out": "[CALIBRATION_PACKAGE_FILENAME, CALIBRATION_PACKAGE_METADATA_FILENAME, CALIBRATION_PACKAGE_CONTRACT_FILENAME]", + "description": "Centralize Stage 2 input, package, contract, metadata, report, and matrix-build artifact names.", + "id": "stage2_artifact_specs", + "label": "Stage 2 Artifact Specs", + "node_type": "library", + "pathways": [ + "calibration_package" + ], + "source_file": "policyengine_us_data/calibration_package/specs.py", + "stability": "moving", + "status": "current", + "validation_commands": [ + "uv run pytest tests/unit/calibration_package/test_specs.py" + ] + }, + { + "api_refs": [ + "policyengine_us_data.calibration.unified_calibration.save_calibration_package" + ], + "artifacts_out": [ + "calibration_package.pkl" + ], + "description": "Persist the Stage 2 sparse matrix, target rows, target names, geography arrays, and provenance metadata.", + "id": "stage2_calibration_package_writer", + "label": "Stage 2 Package Writer", + "node_type": "library", + "pathways": [ + "calibration_package" + ], + "source_file": "policyengine_us_data/calibration/unified_calibration.py", + "stability": "moving", + "status": "current", + "validation_commands": [ + "uv run pytest tests/unit/calibration/test_unified_calibration.py" + ] + }, { "api_refs": [ "policyengine_us_data.calibration_package.specs.resolve_target_config_identity" @@ -4280,46 +4405,6 @@ "uv run pytest tests/integration/test_chunked_matrix_builder.py" ] }, - { - "api_refs": [ - "policyengine_us_data.calibration.unified_calibration.save_calibration_package" - ], - "artifacts_out": [ - "calibration_package.pkl" - ], - "description": "Persist the Stage 2 sparse matrix, target rows, target names, geography arrays, and provenance metadata.", - "id": "stage2_calibration_package_writer", - "label": "Stage 2 Package Writer", - "node_type": "library", - "pathways": [ - "calibration_package" - ], - "source_file": "policyengine_us_data/calibration/unified_calibration.py", - "stability": "moving", - "status": "current", - "validation_commands": [ - "uv run pytest tests/unit/calibration/test_unified_calibration.py" - ] - }, - { - "api_refs": [ - "policyengine_us_data.calibration_package.specs.calibration_package_artifact_paths" - ], - "artifacts_out": "[CALIBRATION_PACKAGE_FILENAME, CALIBRATION_PACKAGE_CONTRACT_FILENAME]", - "description": "Centralize calibration package, contract, metadata, and matrix-build artifact paths.", - "id": "stage2_artifact_specs", - "label": "Stage 2 Artifact Specs", - "node_type": "library", - "pathways": [ - "calibration_package" - ], - "source_file": "policyengine_us_data/calibration_package/specs.py", - "stability": "moving", - "status": "current", - "validation_commands": [ - "uv run pytest tests/unit/calibration_package/test_specs.py" - ] - }, { "api_refs": [ "policyengine_us_data.stage_contracts.calibration_package.write_calibration_package_contract" diff --git a/docs/pipeline_map.yaml b/docs/pipeline_map.yaml index 91c8fa1c2..02d5581c5 100644 --- a/docs/pipeline_map.yaml +++ b/docs/pipeline_map.yaml @@ -805,6 +805,8 @@ stages: label: run_calibration() description: 'Build phase: resolve targets and constraints, assemble clone values, and package the sparse calibration matrix' node_ids: + - stage2_input_bundle + - stage2_build_context - stage2_target_config_identity - stage2_target_config_load - target_resolve @@ -825,6 +827,10 @@ stages: - out_contract - stage2_calibration_package_contract_validator extra_nodes: + - id: in_stage1_contract_s2 + label: dataset_build_output.json + node_type: artifact + description: Stage 1 handoff contract preferred for Stage 2 input resolution - id: in_cps_s5 label: source_imputed_stratified_extended_cps.h5 node_type: artifact @@ -890,9 +896,34 @@ stages: node_type: utility description: CSR/COO matrix construction edges: + - source: in_stage1_contract_s2 + target: stage2_input_bundle + edge_type: data_flow + label: preferred input contract - source: in_cps_s5 + target: stage2_input_bundle + edge_type: data_flow + label: compatibility fallback + - source: in_db_s5 + target: stage2_input_bundle + edge_type: external_source + label: compatibility fallback + - source: stage2_input_bundle + target: stage2_build_context + edge_type: data_flow + label: validated inputs + - source: stage2_artifact_specs + target: stage2_build_context + edge_type: uses_utility + label: output bundle paths + - source: stage2_build_context target: target_resolve edge_type: data_flow + label: dataset and database paths + - source: stage2_build_context + target: stage2_calibration_package_writer + edge_type: uses_utility + label: package output bundle - source: in_db_s5 target: target_resolve edge_type: external_source diff --git a/modal_app/pipeline.py b/modal_app/pipeline.py index be0a3fcf7..266892ac4 100644 --- a/modal_app/pipeline.py +++ b/modal_app/pipeline.py @@ -95,8 +95,9 @@ ) from policyengine_us_data.utils.run_context import RunContext, resolve_run_id # noqa: E402 from policyengine_us_data.calibration_package.specs import ( # noqa: E402 - calibration_package_artifact_paths, + Stage2InputBundleError, resolve_target_config_identity, + stage2_build_context_for_run, ) from policyengine_us_data.utils.error_redaction import ( # noqa: E402 redacted_bounded_error_text, @@ -1438,14 +1439,13 @@ def run_pipeline( print(f" Completed in {completed_build_manifest.duration_s}s") # ── Step 2: Build calibration package ── + package_context = stage2_build_context_for_run(PIPELINE_MOUNT, run_id) + package_input_validation = package_context.input_bundle.validation_report() package_inputs = _artifact_identities( - { - "dataset": _artifacts_dir(run_id) - / "source_imputed_stratified_extended_cps.h5", - "database": _artifacts_dir(run_id) / "policy_data.db", - } + package_context.input_bundle.manifest_inputs ) - package_artifacts = calibration_package_artifact_paths(_artifacts_dir(run_id)) + package_inputs["input_validation"] = package_input_validation.to_dict() + package_artifacts = package_context.output_bundle package_parameters = _calibration_package_parameters( workers=num_workers, n_clones=n_clones, @@ -1456,6 +1456,18 @@ def run_pipeline( parallel_matrix=parallel_matrix, num_matrix_workers=num_matrix_workers, ) + if package_input_validation.status != "pass": + active_step_manifest = _start_step_manifest( + meta, + BUILD_CALIBRATION_PACKAGE, + parameters=package_parameters, + input_identities=package_inputs, + vol=pipeline_volume, + ) + raise Stage2InputBundleError( + package_context.input_bundle, + package_input_validation, + ) package_reuse = _step_reusable( meta, BUILD_CALIBRATION_PACKAGE, diff --git a/modal_app/remote_calibration_runner.py b/modal_app/remote_calibration_runner.py index a4ca08049..969cf405d 100644 --- a/modal_app/remote_calibration_runner.py +++ b/modal_app/remote_calibration_runner.py @@ -14,6 +14,7 @@ from modal_app.images import gpu_image as image # noqa: E402 from policyengine_us_data.calibration_package.specs import ( # noqa: E402 calibration_package_artifact_paths, + stage2_build_context_for_run, ) from policyengine_us_data.fit_weights import ( # noqa: E402 FitResultBytes, @@ -413,18 +414,13 @@ def _build_package_impl( _ensure_geography_prerequisites() pipeline_vol.reload() - artifacts = f"{PIPELINE_MOUNT}/artifacts" - if run_id: - artifacts = f"{artifacts}/{run_id}" - db_path = f"{artifacts}/policy_data.db" - dataset_path = f"{artifacts}/source_imputed_stratified_extended_cps.h5" - for label, p in [("database", db_path), ("dataset", dataset_path)]: - if not os.path.exists(p): - raise RuntimeError( - f"Missing {label} on pipeline volume: {p}. Run data_build first." - ) - - package_artifacts = calibration_package_artifact_paths(artifacts) + build_context = stage2_build_context_for_run( + PIPELINE_MOUNT, run_id + ).require_inputs() + input_bundle = build_context.input_bundle + package_artifacts = build_context.output_bundle + db_path = str(input_bundle.target_database) + dataset_path = str(input_bundle.source_dataset) pkg_path = str(package_artifacts.package) cmd = [ *_python_cmd("-m", "policyengine_us_data.calibration.unified_calibration"), diff --git a/policyengine_us_data/calibration_package/__init__.py b/policyengine_us_data/calibration_package/__init__.py index 762b17dc9..26655041a 100644 --- a/policyengine_us_data/calibration_package/__init__.py +++ b/policyengine_us_data/calibration_package/__init__.py @@ -5,13 +5,26 @@ CALIBRATION_PACKAGE_FILENAME, CALIBRATION_PACKAGE_METADATA_FILENAME, CALIBRATION_PACKAGE_SUBSTAGE_ID, + CALIBRATION_REPORTS_DIRNAME, + DATASET_BUILD_OUTPUT_CONTRACT_FILENAME, DEFAULT_TARGET_CONFIG_PATH, MATRIX_BUILD_DIRNAME, + SOURCE_DATASET_FILENAME, TARGET_CONFIG_IDENTITY_MODES, + TARGET_DATABASE_FILENAME, CalibrationPackageArtifactPaths, + CalibrationPackageOutputBundle, + Stage2BuildContext, + Stage2InputBundle, + Stage2InputBundleError, + Stage2InputSource, TargetConfigIdentity, calibration_package_artifact_paths, resolve_target_config_identity, + stage2_build_context_for_run, + stage2_input_bundle_from_artifacts_dir, + stage2_input_bundle_from_stage1_contract, + stage2_input_bundle_from_stage1_contract_path, ) __all__ = [ @@ -19,11 +32,24 @@ "CALIBRATION_PACKAGE_FILENAME", "CALIBRATION_PACKAGE_METADATA_FILENAME", "CALIBRATION_PACKAGE_SUBSTAGE_ID", + "CALIBRATION_REPORTS_DIRNAME", + "DATASET_BUILD_OUTPUT_CONTRACT_FILENAME", "DEFAULT_TARGET_CONFIG_PATH", "MATRIX_BUILD_DIRNAME", + "SOURCE_DATASET_FILENAME", "TARGET_CONFIG_IDENTITY_MODES", + "TARGET_DATABASE_FILENAME", "CalibrationPackageArtifactPaths", + "CalibrationPackageOutputBundle", + "Stage2BuildContext", + "Stage2InputBundle", + "Stage2InputBundleError", + "Stage2InputSource", "TargetConfigIdentity", "calibration_package_artifact_paths", "resolve_target_config_identity", + "stage2_build_context_for_run", + "stage2_input_bundle_from_artifacts_dir", + "stage2_input_bundle_from_stage1_contract", + "stage2_input_bundle_from_stage1_contract_path", ] diff --git a/policyengine_us_data/calibration_package/specs.py b/policyengine_us_data/calibration_package/specs.py index 637cc64a6..4542b8443 100644 --- a/policyengine_us_data/calibration_package/specs.py +++ b/policyengine_us_data/calibration_package/specs.py @@ -4,23 +4,37 @@ from dataclasses import dataclass from pathlib import Path -from typing import Literal +from typing import TYPE_CHECKING, Any, Literal +from urllib.parse import unquote, urlparse from policyengine_us_data.pipeline_metadata import pipeline_node from policyengine_us_data.pipeline_schema import PipelineNode from policyengine_us_data.utils.manifest import compute_file_checksum +if TYPE_CHECKING: + from policyengine_us_data.stage_contracts import StageContract, ValidationReport + DEFAULT_TARGET_CONFIG_PATH = "policyengine_us_data/calibration/target_config.yaml" +SOURCE_DATASET_FILENAME = "source_imputed_stratified_extended_cps.h5" +TARGET_DATABASE_FILENAME = "policy_data.db" +DATASET_BUILD_OUTPUT_CONTRACT_FILENAME = "dataset_build_output.json" CALIBRATION_PACKAGE_FILENAME = "calibration_package.pkl" CALIBRATION_PACKAGE_METADATA_FILENAME = "calibration_package_meta.json" CALIBRATION_PACKAGE_CONTRACT_FILENAME = "calibration_package_contract.json" +CALIBRATION_REPORTS_DIRNAME = "calibration_reports" MATRIX_BUILD_DIRNAME = "matrix_build" CALIBRATION_PACKAGE_SUBSTAGE_ID = "2a_matrix_build_calibration_target_construction" TargetConfigMode = Literal["default", "explicit", "all_active_targets"] +Stage2InputSource = Literal["stage1_contract", "artifacts_dir_fallback"] TARGET_CONFIG_IDENTITY_MODES: frozenset[str] = frozenset( {"default", "explicit", "all_active_targets"} ) +_SOURCE_DATASET_LOGICAL_NAMES = ( + "source_imputed_stratified_extended_cps", + "source_imputed_stratified_extended_cps_2024", +) +_TARGET_DATABASE_LOGICAL_NAMES = ("policy_data_db",) @dataclass(frozen=True, kw_only=True) @@ -58,13 +72,214 @@ def to_parameters(self) -> dict[str, str | None]: @dataclass(frozen=True, kw_only=True) -class CalibrationPackageArtifactPaths: - """Canonical run-scoped Stage 2 artifact paths.""" +class Stage2InputBundle: + """Canonical Stage 2 input artifacts resolved for one run.""" + + artifacts_dir: Path + source_dataset: Path + target_database: Path + source: Stage2InputSource + stage1_contract_path: Path | None = None + stage1_contract_run_id: str | None = None + expected_source_dataset_sha256: str | None = None + expected_source_dataset_size_bytes: int | None = None + expected_target_database_sha256: str | None = None + expected_target_database_size_bytes: int | None = None + + @property + def manifest_inputs(self) -> dict[str, Path]: + """Return input paths recorded in Stage 2 step manifests.""" + + return { + "dataset": self.source_dataset, + "database": self.target_database, + } + + @property + def compatibility_only(self) -> bool: + """Return whether the bundle came from legacy filename discovery.""" + + return self.source == "artifacts_dir_fallback" + + @property + def expected_input_identities(self) -> dict[str, dict[str, str | int]]: + """Return contract-declared Stage 2 input checksums and sizes.""" + + identities: dict[str, dict[str, str | int]] = {} + for label, expected_sha256, expected_size_bytes in ( + ( + "dataset", + self.expected_source_dataset_sha256, + self.expected_source_dataset_size_bytes, + ), + ( + "database", + self.expected_target_database_sha256, + self.expected_target_database_size_bytes, + ), + ): + identity: dict[str, str | int] = {} + if expected_sha256 is not None: + identity["sha256"] = expected_sha256 + if expected_size_bytes is not None: + identity["size_bytes"] = expected_size_bytes + if identity: + identities[label] = identity + return identities + + def missing_required_artifacts(self) -> tuple[tuple[str, Path], ...]: + """Return missing required Stage 2 input labels and paths.""" + + missing: list[tuple[str, Path]] = [] + for label, path in self.manifest_inputs.items(): + if not path.exists(): + missing.append((label, path)) + return tuple(missing) + + def validation_report(self) -> "ValidationReport": + """Return a canonical validation report for Stage 2 input readiness.""" + + from policyengine_us_data.stage_contracts.validation import ( + ValidationFinding, + ValidationReport, + ) + + findings: list[ValidationFinding] = [] + missing = self.missing_required_artifacts() + if missing: + findings.extend( + ValidationFinding( + check_id=f"stage2_input_exists:{label}", + status="fail", + message=f"Missing Stage 2 {label} artifact: {path}", + metadata={ + "artifact_label": label, + "path": str(path), + "source": self.source, + }, + ) + for label, path in missing + ) + missing_labels = {label for label, _path in missing} + for label, path, expected_sha256, expected_size_bytes in ( + ( + "dataset", + self.source_dataset, + self.expected_source_dataset_sha256, + self.expected_source_dataset_size_bytes, + ), + ( + "database", + self.target_database, + self.expected_target_database_sha256, + self.expected_target_database_size_bytes, + ), + ): + if label in missing_labels: + continue + if expected_size_bytes is not None: + actual_size_bytes = path.stat().st_size + if actual_size_bytes != expected_size_bytes: + findings.append( + ValidationFinding( + check_id=f"stage2_input_identity:{label}:size_bytes", + status="fail", + message=( + f"Stage 2 {label} artifact size mismatch: " + f"expected {expected_size_bytes}, " + f"found {actual_size_bytes} at {path}" + ), + metric="size_bytes", + value=actual_size_bytes, + threshold=expected_size_bytes, + metadata={ + "artifact_label": label, + "path": str(path), + "source": self.source, + }, + ) + ) + if expected_sha256 is not None: + actual_sha256 = f"sha256:{compute_file_checksum(path)}" + if actual_sha256 != expected_sha256: + findings.append( + ValidationFinding( + check_id=f"stage2_input_identity:{label}:sha256", + status="fail", + message=( + f"Stage 2 {label} artifact checksum mismatch: " + f"expected {expected_sha256}, " + f"found {actual_sha256} at {path}" + ), + metric="sha256", + value=actual_sha256, + threshold=expected_sha256, + metadata={ + "artifact_label": label, + "path": str(path), + "source": self.source, + }, + ) + ) + if findings: + return ValidationReport( + status="fail", + findings=tuple(findings), + metadata=self._validation_metadata(), + ) + return ValidationReport( + status="pass", + findings=(), + metadata=self._validation_metadata(), + ) + + def require_existing(self) -> "Stage2InputBundle": + """Raise a structured error when required Stage 2 inputs are missing.""" + + report = self.validation_report() + if report.status != "pass": + raise Stage2InputBundleError(self, report) + return self + + def _validation_metadata(self) -> dict[str, Any]: + metadata: dict[str, Any] = { + "source": self.source, + "artifacts_dir": str(self.artifacts_dir), + "compatibility_only": self.compatibility_only, + } + expected_identities = self.expected_input_identities + if expected_identities: + metadata["expected_identities"] = expected_identities + if self.stage1_contract_path is not None: + metadata["stage1_contract_path"] = str(self.stage1_contract_path) + if self.stage1_contract_run_id is not None: + metadata["stage1_contract_run_id"] = self.stage1_contract_run_id + return metadata + + +class Stage2InputBundleError(RuntimeError): + """Input validation failure raised before Stage 2 package work starts.""" + + def __init__( + self, + bundle: Stage2InputBundle, + validation_report: "ValidationReport", + ) -> None: + details = "; ".join(finding.message for finding in validation_report.findings) + super().__init__(details or "Stage 2 input validation failed") + self.bundle = bundle + self.validation_report = validation_report + + +@dataclass(frozen=True, kw_only=True) +class CalibrationPackageOutputBundle: + """Canonical run-scoped Stage 2 output artifact paths.""" artifacts_dir: Path package: Path metadata: Path contract: Path + reports_dir: Path matrix_build_dir: Path @property @@ -74,18 +289,195 @@ def manifest_outputs(self) -> tuple[Path, Path]: return (self.package, self.contract) +CalibrationPackageArtifactPaths = CalibrationPackageOutputBundle + + +@dataclass(frozen=True, kw_only=True) +class Stage2BuildContext: + """Run-scoped Stage 2 input and output bundles.""" + + artifacts_dir: Path + input_bundle: Stage2InputBundle + output_bundle: CalibrationPackageOutputBundle + run_id: str | None = None + + def require_inputs(self) -> "Stage2BuildContext": + """Validate inputs and return this context when Stage 2 may start.""" + + self.input_bundle.require_existing() + return self + + +@pipeline_node( + PipelineNode( + id="stage2_input_bundle", + label="Stage 2 Input Bundle", + node_type="library", + description="Resolve the source-imputed dataset and policy target database from a Stage 1 contract or compatibility filename fallback.", + source_file="policyengine_us_data/calibration_package/specs.py", + status="current", + stability="moving", + pathways=["calibration_package"], + artifacts_in=[ + DATASET_BUILD_OUTPUT_CONTRACT_FILENAME, + SOURCE_DATASET_FILENAME, + TARGET_DATABASE_FILENAME, + ], + validation_commands=[ + "uv run pytest tests/unit/calibration_package/test_specs.py" + ], + ) +) +def stage2_input_bundle_from_artifacts_dir( + artifacts_dir: str | Path, +) -> Stage2InputBundle: + """Return a compatibility Stage 2 input bundle from canonical filenames.""" + + root = Path(artifacts_dir) + return Stage2InputBundle( + artifacts_dir=root, + source_dataset=root / SOURCE_DATASET_FILENAME, + target_database=root / TARGET_DATABASE_FILENAME, + source="artifacts_dir_fallback", + ) + + +def stage2_input_bundle_from_stage1_contract( + contract: "StageContract", + *, + artifacts_dir: str | Path | None = None, + contract_path: str | Path | None = None, +) -> Stage2InputBundle: + """Return a Stage 2 input bundle from a Stage 1 handoff contract.""" + + if getattr(contract, "stage_id", None) != "1_build_datasets": + raise ValueError("Stage 2 inputs require a Stage 1 dataset-build contract") + source_artifact = _contract_artifact( + contract, + logical_names=_SOURCE_DATASET_LOGICAL_NAMES, + label="source dataset", + ) + target_artifact = _contract_artifact( + contract, + logical_names=_TARGET_DATABASE_LOGICAL_NAMES, + label="target database", + ) + source_dataset = _artifact_uri_to_path(source_artifact.uri) + target_database = _artifact_uri_to_path(target_artifact.uri) + root = Path(artifacts_dir) if artifacts_dir is not None else source_dataset.parent + return Stage2InputBundle( + artifacts_dir=root, + source_dataset=source_dataset, + target_database=target_database, + source="stage1_contract", + stage1_contract_path=Path(contract_path) if contract_path is not None else None, + stage1_contract_run_id=getattr(contract, "run_id", None), + expected_source_dataset_sha256=getattr(source_artifact, "sha256", None), + expected_source_dataset_size_bytes=getattr( + source_artifact, + "size_bytes", + None, + ), + expected_target_database_sha256=getattr(target_artifact, "sha256", None), + expected_target_database_size_bytes=getattr( + target_artifact, + "size_bytes", + None, + ), + ) + + +def stage2_input_bundle_from_stage1_contract_path( + contract_path: str | Path, + *, + artifacts_dir: str | Path | None = None, +) -> Stage2InputBundle: + """Read a Stage 1 handoff contract and return the Stage 2 input bundle.""" + + from policyengine_us_data.stage_contracts.io import read_contract + + contract_file = Path(contract_path) + return stage2_input_bundle_from_stage1_contract( + read_contract(contract_file), + artifacts_dir=artifacts_dir, + contract_path=contract_file, + ) + + +@pipeline_node( + PipelineNode( + id="stage2_build_context", + label="Stage 2 Build Context", + node_type="library", + description="Bind one run_id to canonical Stage 2 input and output bundles before remote package construction starts.", + source_file="policyengine_us_data/calibration_package/specs.py", + status="current", + stability="moving", + pathways=["calibration_package"], + artifacts_in=[ + DATASET_BUILD_OUTPUT_CONTRACT_FILENAME, + SOURCE_DATASET_FILENAME, + TARGET_DATABASE_FILENAME, + ], + artifacts_out=[ + CALIBRATION_PACKAGE_FILENAME, + CALIBRATION_PACKAGE_CONTRACT_FILENAME, + ], + validation_commands=[ + "uv run pytest tests/unit/calibration_package/test_specs.py" + ], + ) +) +def stage2_build_context_for_run( + pipeline_mount: str | Path, + run_id: str | None = "", + *, + stage1_contract_path: str | Path | None = None, +) -> Stage2BuildContext: + """Return Stage 2 run context, preferring the Stage 1 handoff contract.""" + + artifacts_dir = Path(pipeline_mount) / "artifacts" + if run_id: + artifacts_dir = artifacts_dir / run_id + contract_path = ( + Path(stage1_contract_path) + if stage1_contract_path is not None + else artifacts_dir / DATASET_BUILD_OUTPUT_CONTRACT_FILENAME + ) + if stage1_contract_path is not None and not contract_path.exists(): + raise FileNotFoundError(f"Stage 1 contract not found: {contract_path}") + if contract_path.exists(): + input_bundle = stage2_input_bundle_from_stage1_contract_path( + contract_path, + artifacts_dir=artifacts_dir, + ) + else: + input_bundle = stage2_input_bundle_from_artifacts_dir(artifacts_dir) + return Stage2BuildContext( + artifacts_dir=artifacts_dir, + input_bundle=input_bundle, + output_bundle=calibration_package_artifact_paths(artifacts_dir), + run_id=run_id or None, + ) + + @pipeline_node( PipelineNode( id="stage2_artifact_specs", label="Stage 2 Artifact Specs", node_type="library", - description="Centralize calibration package, contract, metadata, and matrix-build artifact paths.", + description="Centralize Stage 2 input, package, contract, metadata, report, and matrix-build artifact names.", source_file="policyengine_us_data/calibration_package/specs.py", status="current", stability="moving", pathways=["calibration_package"], + artifacts_in=[ + SOURCE_DATASET_FILENAME, + TARGET_DATABASE_FILENAME, + ], artifacts_out=[ CALIBRATION_PACKAGE_FILENAME, + CALIBRATION_PACKAGE_METADATA_FILENAME, CALIBRATION_PACKAGE_CONTRACT_FILENAME, ], validation_commands=[ @@ -95,15 +487,16 @@ def manifest_outputs(self) -> tuple[Path, Path]: ) def calibration_package_artifact_paths( artifacts_dir: str | Path, -) -> CalibrationPackageArtifactPaths: +) -> CalibrationPackageOutputBundle: """Return canonical Stage 2 paths rooted in an artifacts directory.""" root = Path(artifacts_dir) - return CalibrationPackageArtifactPaths( + return CalibrationPackageOutputBundle( artifacts_dir=root, package=root / CALIBRATION_PACKAGE_FILENAME, metadata=root / CALIBRATION_PACKAGE_METADATA_FILENAME, contract=root / CALIBRATION_PACKAGE_CONTRACT_FILENAME, + reports_dir=root / CALIBRATION_REPORTS_DIRNAME, matrix_build_dir=root / MATRIX_BUILD_DIRNAME, ) @@ -181,17 +574,55 @@ def _logical_identity_path(path: Path, resolved_path: Path, repo_root: Path) -> return resolved_path.as_posix() if path.is_absolute() else path.as_posix() +def _contract_artifact( + contract: "StageContract", + *, + logical_names: tuple[str, ...], + label: str, +) -> Any: + for logical_name in logical_names: + for artifact in getattr(contract, "outputs", ()): + if getattr(artifact, "logical_name", None) == logical_name: + return artifact + raise ValueError( + f"Stage 1 contract is missing required Stage 2 {label}: " + + " or ".join(logical_names) + ) + + +def _artifact_uri_to_path(uri: str) -> Path: + parsed = urlparse(uri) + if parsed.scheme == "file": + return Path(unquote(parsed.path)) + if not parsed.scheme: + return Path(uri) + raise ValueError(f"Unsupported artifact URI scheme for Stage 2 input: {uri}") + + __all__ = [ "CALIBRATION_PACKAGE_CONTRACT_FILENAME", "CALIBRATION_PACKAGE_FILENAME", "CALIBRATION_PACKAGE_METADATA_FILENAME", "CALIBRATION_PACKAGE_SUBSTAGE_ID", + "CALIBRATION_REPORTS_DIRNAME", + "DATASET_BUILD_OUTPUT_CONTRACT_FILENAME", "DEFAULT_TARGET_CONFIG_PATH", "MATRIX_BUILD_DIRNAME", + "SOURCE_DATASET_FILENAME", "TARGET_CONFIG_IDENTITY_MODES", + "TARGET_DATABASE_FILENAME", "CalibrationPackageArtifactPaths", + "CalibrationPackageOutputBundle", + "Stage2BuildContext", + "Stage2InputBundle", + "Stage2InputBundleError", + "Stage2InputSource", "TargetConfigIdentity", "TargetConfigMode", "calibration_package_artifact_paths", "resolve_target_config_identity", + "stage2_build_context_for_run", + "stage2_input_bundle_from_artifacts_dir", + "stage2_input_bundle_from_stage1_contract", + "stage2_input_bundle_from_stage1_contract_path", ] diff --git a/tests/unit/calibration_package/test_specs.py b/tests/unit/calibration_package/test_specs.py index 152ad7ab1..45d7c700a 100644 --- a/tests/unit/calibration_package/test_specs.py +++ b/tests/unit/calibration_package/test_specs.py @@ -1,4 +1,5 @@ from pathlib import Path +from types import SimpleNamespace import pytest @@ -6,15 +7,27 @@ CALIBRATION_PACKAGE_CONTRACT_FILENAME, CALIBRATION_PACKAGE_FILENAME, CALIBRATION_PACKAGE_METADATA_FILENAME, + CALIBRATION_REPORTS_DIRNAME, + DATASET_BUILD_OUTPUT_CONTRACT_FILENAME, DEFAULT_TARGET_CONFIG_PATH, MATRIX_BUILD_DIRNAME, + SOURCE_DATASET_FILENAME, + TARGET_DATABASE_FILENAME, + Stage2InputBundleError, TargetConfigIdentity, calibration_package_artifact_paths, resolve_target_config_identity, + stage2_build_context_for_run, + stage2_input_bundle_from_artifacts_dir, + stage2_input_bundle_from_stage1_contract, ) from policyengine_us_data.stage_contracts.calibration_package import ( CalibrationPackageParameters, ) +from policyengine_us_data.stage_contracts.dataset_build import ( + build_dataset_build_output_contract, +) +from policyengine_us_data.stage_contracts.io import write_contract from policyengine_us_data.utils.manifest import compute_file_checksum @@ -113,7 +126,201 @@ def test_calibration_package_artifact_paths(): assert paths.contract == Path("/pipeline/artifacts/run-a") / ( CALIBRATION_PACKAGE_CONTRACT_FILENAME ) + assert paths.reports_dir == Path("/pipeline/artifacts/run-a") / ( + CALIBRATION_REPORTS_DIRNAME + ) assert paths.matrix_build_dir == Path("/pipeline/artifacts/run-a") / ( MATRIX_BUILD_DIRNAME ) assert paths.manifest_outputs == (paths.package, paths.contract) + + +def test_stage2_input_bundle_from_artifacts_dir(tmp_path): + (tmp_path / SOURCE_DATASET_FILENAME).write_bytes(b"h5") + (tmp_path / TARGET_DATABASE_FILENAME).write_bytes(b"db") + + bundle = stage2_input_bundle_from_artifacts_dir(tmp_path) + + assert bundle.source == "artifacts_dir_fallback" + assert bundle.compatibility_only is True + assert bundle.manifest_inputs == { + "dataset": tmp_path / SOURCE_DATASET_FILENAME, + "database": tmp_path / TARGET_DATABASE_FILENAME, + } + assert bundle.validation_report().status == "pass" + + +def test_stage2_input_bundle_from_fake_stage1_contract(tmp_path): + dataset = tmp_path / SOURCE_DATASET_FILENAME + database = tmp_path / TARGET_DATABASE_FILENAME + dataset.write_bytes(b"h5") + database.write_bytes(b"db") + contract = SimpleNamespace( + stage_id="1_build_datasets", + run_id="run-a", + outputs=( + SimpleNamespace( + logical_name="source_imputed_stratified_extended_cps", + uri=dataset.resolve().as_uri(), + sha256=_sha256_digest(dataset), + size_bytes=dataset.stat().st_size, + ), + SimpleNamespace( + logical_name="policy_data_db", + uri=database.resolve().as_uri(), + sha256=_sha256_digest(database), + size_bytes=database.stat().st_size, + ), + ), + ) + + bundle = stage2_input_bundle_from_stage1_contract( + contract, + artifacts_dir=tmp_path, + contract_path=tmp_path / DATASET_BUILD_OUTPUT_CONTRACT_FILENAME, + ) + + assert bundle.source == "stage1_contract" + assert bundle.compatibility_only is False + assert bundle.stage1_contract_run_id == "run-a" + assert bundle.stage1_contract_path == ( + tmp_path / DATASET_BUILD_OUTPUT_CONTRACT_FILENAME + ) + assert bundle.source_dataset == dataset + assert bundle.target_database == database + assert bundle.expected_input_identities == { + "dataset": { + "sha256": _sha256_digest(dataset), + "size_bytes": dataset.stat().st_size, + }, + "database": { + "sha256": _sha256_digest(database), + "size_bytes": database.stat().st_size, + }, + } + assert bundle.validation_report().status == "pass" + + +def test_stage2_input_bundle_validates_stage1_contract_identity(tmp_path): + dataset = tmp_path / SOURCE_DATASET_FILENAME + database = tmp_path / TARGET_DATABASE_FILENAME + dataset.write_bytes(b"h5") + database.write_bytes(b"db") + contract = SimpleNamespace( + stage_id="1_build_datasets", + run_id="run-a", + outputs=( + SimpleNamespace( + logical_name="source_imputed_stratified_extended_cps", + uri=dataset.resolve().as_uri(), + sha256="sha256:not-the-dataset", + size_bytes=dataset.stat().st_size, + ), + SimpleNamespace( + logical_name="policy_data_db", + uri=database.resolve().as_uri(), + sha256=_sha256_digest(database), + size_bytes=database.stat().st_size + 1, + ), + ), + ) + + bundle = stage2_input_bundle_from_stage1_contract(contract) + report = bundle.validation_report() + + assert report.status == "fail" + assert [finding.check_id for finding in report.findings] == [ + "stage2_input_identity:dataset:sha256", + "stage2_input_identity:database:size_bytes", + ] + assert report.metadata["expected_identities"] == { + "dataset": { + "sha256": "sha256:not-the-dataset", + "size_bytes": dataset.stat().st_size, + }, + "database": { + "sha256": _sha256_digest(database), + "size_bytes": database.stat().st_size + 1, + }, + } + with pytest.raises(Stage2InputBundleError, match="checksum mismatch"): + bundle.require_existing() + + +def test_stage2_input_bundle_missing_required_artifacts_are_actionable(tmp_path): + (tmp_path / SOURCE_DATASET_FILENAME).write_bytes(b"h5") + bundle = stage2_input_bundle_from_artifacts_dir(tmp_path) + + report = bundle.validation_report() + + assert report.status == "fail" + assert [finding.check_id for finding in report.findings] == [ + "stage2_input_exists:database" + ] + assert str(tmp_path / TARGET_DATABASE_FILENAME) in report.findings[0].message + with pytest.raises(Stage2InputBundleError, match="database"): + bundle.require_existing() + + +def test_stage2_build_context_prefers_stage1_contract(tmp_path): + artifacts_dir = tmp_path / "artifacts" / "run-a" + artifacts_dir.mkdir(parents=True) + for filename in ( + "acs_2022.h5", + "irs_puf_2015.h5", + "cps_2024.h5", + "puf_2024.h5", + "extended_cps_2024.h5", + "enhanced_cps_2024.h5", + "small_enhanced_cps_2024.h5", + "stratified_extended_cps_2024.h5", + "source_imputed_stratified_extended_cps_2024.h5", + SOURCE_DATASET_FILENAME, + TARGET_DATABASE_FILENAME, + "build_log.txt", + "data_build_checkpoint_stats.json", + ): + (artifacts_dir / filename).write_bytes(filename.encode("utf-8")) + contract_path = artifacts_dir / DATASET_BUILD_OUTPUT_CONTRACT_FILENAME + write_contract( + build_dataset_build_output_contract( + artifacts_dir=artifacts_dir, + run_id="run-a", + code_sha="abc123", + package_version="1.0.0", + checkpoint_stats={}, + started_at="2026-01-01T00:00:00+00:00", + completed_at="2026-01-01T00:00:01+00:00", + duration_s=1.0, + ), + contract_path, + ) + + context = stage2_build_context_for_run(tmp_path, "run-a") + + assert context.input_bundle.source == "stage1_contract" + assert ( + context.input_bundle.source_dataset == artifacts_dir / SOURCE_DATASET_FILENAME + ) + assert ( + context.input_bundle.target_database == artifacts_dir / TARGET_DATABASE_FILENAME + ) + assert context.input_bundle.expected_source_dataset_sha256 == _sha256_digest( + artifacts_dir / SOURCE_DATASET_FILENAME + ) + assert ( + context.input_bundle.expected_target_database_size_bytes + == (artifacts_dir / TARGET_DATABASE_FILENAME).stat().st_size + ) + assert context.output_bundle.package == artifacts_dir / CALIBRATION_PACKAGE_FILENAME + + +def test_stage2_build_context_rejects_explicit_missing_stage1_contract(tmp_path): + explicit_contract = tmp_path / "missing-dataset-build-output.json" + + with pytest.raises(FileNotFoundError, match="Stage 1 contract not found"): + stage2_build_context_for_run( + tmp_path, + "run-a", + stage1_contract_path=explicit_contract, + ) diff --git a/tests/unit/test_pipeline_docs_extractor.py b/tests/unit/test_pipeline_docs_extractor.py index 27f16b177..79b39fa0d 100644 --- a/tests/unit/test_pipeline_docs_extractor.py +++ b/tests/unit/test_pipeline_docs_extractor.py @@ -130,6 +130,8 @@ def test_pipeline_map_manifest_validates(): ) stage2_node_ids = {node["id"] for node in stage2["nodes"]} assert { + "stage2_input_bundle", + "stage2_build_context", "stage2_target_config_identity", "stage2_target_config_load", "build_matrix", diff --git a/tests/unit/test_pipeline_source_contracts.py b/tests/unit/test_pipeline_source_contracts.py index 1f964af08..350a5043c 100644 --- a/tests/unit/test_pipeline_source_contracts.py +++ b/tests/unit/test_pipeline_source_contracts.py @@ -79,7 +79,10 @@ def test_stage_2_manifest_records_package_and_contract_outputs() -> None: run_pipeline = _function_def(tree, "run_pipeline") source = ast.get_source_segment(source_text, run_pipeline) - assert "package_artifacts = calibration_package_artifact_paths(" in source + assert "package_context = stage2_build_context_for_run(" in source + assert "package_context.input_bundle.manifest_inputs" in source + assert 'package_inputs["input_validation"]' in source + assert "package_artifacts = package_context.output_bundle" in source assert "package_artifacts.manifest_outputs" in source