Skip to content

Commit 54b2d5f

Browse files
easel and claude committed
hx-bdb8fff2 Migrate UMFLoader to populate ExpectationSuite on read
ADR-005 Phase B/C: UMFLoader now populates umf.expectations from legacy validation_rules + quality_checks via expectation_migration on both split-format and JSON loads. The split-format saver writes expectations.yaml (cross-column expectations, pending, thresholds, alert_config) with column-specific expectations in column YAML files.

Loader changes:
- _load_json: populates expectations from legacy fields if absent
- _load_column_centric: reads expectations.yaml when present; merges column-level validations into the suite via Expectation.from_gx_dict
- Legacy validation_rules.yaml + quality_checks.yaml still supported

Saver changes:
- _save_split: writes expectations.yaml when umf.expectations is populated
- Column-specific expectations split into columns/{name}.yaml validations
- Falls back to legacy format when only old fields exist

Governing: docs/helix/02-design/adr/ADR-005-unified-expectation-model.md
Verification: 59 loader tests pass, 2341 unit tests pass, pyright clean

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent f1c3cef commit 54b2d5f

2 files changed

Lines changed: 402 additions & 86 deletions

File tree

src/tablespec/umf_loader.py

Lines changed: 158 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -306,6 +306,14 @@ def _load_json(self, file_path: Path) -> UMF:
306306
# Convert all string types to plain Python str for consistency and Spark compatibility
307307
data = self._convert_yaml_to_plain_strings(data)
308308

309+
# Populate expectations from legacy fields if not already present (ADR-005)
310+
if "expectations" not in data and (
311+
"validation_rules" in data or "quality_checks" in data
312+
):
313+
from tablespec.expectation_migration import ensure_expectation_suite_data
314+
315+
data["expectations"] = ensure_expectation_suite_data(data)
316+
309317
umf = UMF(**data)
310318
if hasattr(umf, "mtime"):
311319
umf.mtime = file_path.stat().st_mtime
@@ -402,6 +410,16 @@ def _load_column_centric(self, dir_path: Path) -> UMF:
402410
except FileNotFoundError:
403411
pass
404412

413+
# Load expectations.yaml if it exists (unified expectation suite, ADR-005)
414+
expectations_file = dir_path / "expectations.yaml"
415+
try:
416+
with expectations_file.open() as f:
417+
expectations_data = self.yaml.load(f) or {}
418+
if expectations_data:
419+
umf_data["expectations"] = expectations_data
420+
except FileNotFoundError:
421+
pass
422+
405423
# Load and merge columns from columns/ directory
406424
columns_dir = dir_path / "columns"
407425
if not columns_dir.is_dir():
@@ -437,12 +455,27 @@ def _load_column_centric(self, dir_path: Path) -> UMF:
437455
columns.append(col)
438456

439457
# Extract and merge column-specific validations
440-
if "validations" in column_data:
441-
if "validation_rules" not in umf_data:
442-
umf_data["validation_rules"] = {}
443-
if "expectations" not in umf_data["validation_rules"]:
444-
umf_data["validation_rules"]["expectations"] = []
445-
if isinstance(column_data["validations"], list):
458+
if "validations" in column_data and isinstance(
459+
column_data["validations"], list
460+
):
461+
# When expectations.yaml is loaded (new format), merge column
462+
# validations directly into the expectations suite via migration.
463+
if "expectations" in umf_data:
464+
from tablespec.models.umf import Expectation
465+
466+
if "expectations" not in umf_data["expectations"]:
467+
umf_data["expectations"]["expectations"] = []
468+
for exp_dict in column_data["validations"]:
469+
migrated = Expectation.from_gx_dict(exp_dict).model_dump(
470+
exclude_none=True
471+
)
472+
umf_data["expectations"]["expectations"].append(migrated)
473+
else:
474+
# Legacy path: merge into validation_rules for later migration
475+
if "validation_rules" not in umf_data:
476+
umf_data["validation_rules"] = {}
477+
if "expectations" not in umf_data["validation_rules"]:
478+
umf_data["validation_rules"]["expectations"] = []
446479
umf_data["validation_rules"]["expectations"].extend(
447480
column_data["validations"]
448481
)
@@ -606,92 +639,137 @@ def _save_split(self, umf: UMF, dir_path: Path) -> None:
606639

607640
# Note: Validation rules are NOT saved in table.yaml
608641
# They are split between:
609-
# - validation_rules.yaml (for cross-column rules)
610-
# - columns/{column_name}.yaml (for column-specific rules)
642+
# - expectations.yaml (cross-column expectations, ADR-005 unified format)
643+
# - columns/{column_name}.yaml (column-specific expectations)
644+
# Legacy: validation_rules.yaml + quality_checks.yaml (when no unified suite)
611645

612646
self._write_yaml(dir_path / "table.yaml", table_data)
613647

614-
# 3. Save validation_rules.yaml (formerly cross_column_validations.yaml)
615-
expectations_list: list[dict[str, Any]] = []
616-
if umf.validation_rules:
648+
# Build column-validation index for saving column-specific expectations
649+
# alongside column definitions in columns/{name}.yaml.
650+
col_validations_map: dict[str, list[dict[str, Any]]] = {}
617651

618-
def is_cross_column_validation(exp: dict) -> bool:
619-
"""Check if expectation is cross-column (not tied to a single column).
652+
if umf.expectations and umf.expectations.expectations:
653+
# --- ADR-005 unified format: write expectations.yaml ---
654+
all_exps: list[dict[str, Any]] = [
655+
exp.model_dump(exclude_none=True) for exp in umf.expectations.expectations
656+
]
620657

621-
A validation is cross-column if it doesn't have a 'column' kwarg,
622-
or if 'column' is None, empty string, or '-'.
623-
"""
658+
def _is_cross_column(exp: dict) -> bool:
624659
column = exp.get("kwargs", {}).get("column")
625660
return column is None or column in {"", "-"}
626661

627-
if hasattr(umf.validation_rules, "expectations") and umf.validation_rules.expectations:
628-
expectations_list = umf.validation_rules.expectations
629-
elif isinstance(umf.validation_rules, dict):
630-
expectations_list = umf.validation_rules.get("expectations", [])
631-
632-
cross_validations = {
633-
"expectations": [
634-
exp
635-
for exp in (expectations_list or [])
636-
if isinstance(exp, dict) and is_cross_column_validation(exp)
662+
cross_exps = [e for e in all_exps if _is_cross_column(e)]
663+
for e in all_exps:
664+
if not _is_cross_column(e):
665+
col = e["kwargs"]["column"]
666+
col_validations_map.setdefault(col, []).append(e)
667+
668+
expectations_data: dict[str, Any] = {}
669+
if cross_exps:
670+
expectations_data["expectations"] = cross_exps
671+
if umf.expectations.pending:
672+
expectations_data["pending"] = [
673+
exp.model_dump(exclude_none=True) for exp in umf.expectations.pending
637674
]
638-
}
639-
if cross_validations["expectations"]:
640-
self._write_yaml(dir_path / "validation_rules.yaml", cross_validations)
641-
642-
# Save pending_expectations if present
643-
pending_expectations = []
644-
if (
645-
hasattr(umf.validation_rules, "pending_expectations")
646-
and umf.validation_rules.pending_expectations
647-
):
648-
pending_expectations = umf.validation_rules.pending_expectations
649-
elif isinstance(umf.validation_rules, dict):
650-
pending_expectations = umf.validation_rules.get("pending_expectations", [])
651-
652-
if pending_expectations:
653-
pending_validations = {"pending_expectations": pending_expectations}
654-
self._write_yaml(dir_path / "pending_validations.yaml", pending_validations)
655-
656-
# 3b. Save quality_checks.yaml (post-ingestion quality checks)
657-
quality_checks = getattr(umf, "quality_checks", None)
658-
if quality_checks:
659-
quality_checks_data: dict[str, Any] = {}
660-
if hasattr(quality_checks, "checks") and quality_checks.checks:
661-
quality_checks_data["checks"] = [
662-
check.model_dump(exclude_none=True) if hasattr(check, "model_dump") else check
663-
for check in quality_checks.checks
664-
]
665-
elif isinstance(quality_checks, dict) and quality_checks.get("checks"):
666-
quality_checks_data["checks"] = quality_checks["checks"]
667-
668-
# Persist thresholds/alert_config if present
669-
thresholds = (
670-
getattr(quality_checks, "thresholds", None)
671-
if hasattr(quality_checks, "thresholds")
672-
else None
675+
if umf.expectations.thresholds:
676+
expectations_data["thresholds"] = umf.expectations.thresholds
677+
if umf.expectations.alert_config:
678+
expectations_data["alert_config"] = umf.expectations.alert_config
679+
680+
# Always write expectations.yaml when the unified suite exists,
681+
# even if it would hold no cross-column data — it serves as a format marker for the loader.
682+
self._write_yaml(
683+
dir_path / "expectations.yaml",
684+
expectations_data or {"expectations": []},
673685
)
674-
if thresholds:
675-
quality_checks_data["thresholds"] = (
676-
thresholds.model_dump(exclude_none=True)
677-
if hasattr(thresholds, "model_dump")
678-
else thresholds
686+
else:
687+
# --- Legacy format: validation_rules.yaml + quality_checks.yaml ---
688+
expectations_list: list[dict[str, Any]] = []
689+
if umf.validation_rules:
690+
691+
def is_cross_column_validation(exp: dict) -> bool:
692+
column = exp.get("kwargs", {}).get("column")
693+
return column is None or column in {"", "-"}
694+
695+
if (
696+
hasattr(umf.validation_rules, "expectations")
697+
and umf.validation_rules.expectations
698+
):
699+
expectations_list = umf.validation_rules.expectations
700+
elif isinstance(umf.validation_rules, dict):
701+
expectations_list = umf.validation_rules.get("expectations", [])
702+
703+
cross_validations = {
704+
"expectations": [
705+
exp
706+
for exp in (expectations_list or [])
707+
if isinstance(exp, dict) and is_cross_column_validation(exp)
708+
]
709+
}
710+
if cross_validations["expectations"]:
711+
self._write_yaml(dir_path / "validation_rules.yaml", cross_validations)
712+
713+
# Build column validation map from legacy expectations
714+
for exp in expectations_list or []:
715+
if isinstance(exp, dict) and not is_cross_column_validation(exp):
716+
col = exp.get("kwargs", {}).get("column")
717+
if col:
718+
col_validations_map.setdefault(col, []).append(exp)
719+
720+
# Save pending_expectations if present
721+
pending_expectations = []
722+
if (
723+
hasattr(umf.validation_rules, "pending_expectations")
724+
and umf.validation_rules.pending_expectations
725+
):
726+
pending_expectations = umf.validation_rules.pending_expectations
727+
elif isinstance(umf.validation_rules, dict):
728+
pending_expectations = umf.validation_rules.get("pending_expectations", [])
729+
730+
if pending_expectations:
731+
pending_validations = {"pending_expectations": pending_expectations}
732+
self._write_yaml(dir_path / "pending_validations.yaml", pending_validations)
733+
734+
quality_checks = getattr(umf, "quality_checks", None)
735+
if quality_checks:
736+
quality_checks_data: dict[str, Any] = {}
737+
if hasattr(quality_checks, "checks") and quality_checks.checks:
738+
quality_checks_data["checks"] = [
739+
check.model_dump(exclude_none=True)
740+
if hasattr(check, "model_dump")
741+
else check
742+
for check in quality_checks.checks
743+
]
744+
elif isinstance(quality_checks, dict) and quality_checks.get("checks"):
745+
quality_checks_data["checks"] = quality_checks["checks"]
746+
747+
thresholds = (
748+
getattr(quality_checks, "thresholds", None)
749+
if hasattr(quality_checks, "thresholds")
750+
else None
679751
)
752+
if thresholds:
753+
quality_checks_data["thresholds"] = (
754+
thresholds.model_dump(exclude_none=True)
755+
if hasattr(thresholds, "model_dump")
756+
else thresholds
757+
)
680758

681-
alert_config = (
682-
getattr(quality_checks, "alert_config", None)
683-
if hasattr(quality_checks, "alert_config")
684-
else None
685-
)
686-
if alert_config:
687-
quality_checks_data["alert_config"] = (
688-
alert_config.model_dump(exclude_none=True)
689-
if hasattr(alert_config, "model_dump")
690-
else alert_config
759+
alert_config = (
760+
getattr(quality_checks, "alert_config", None)
761+
if hasattr(quality_checks, "alert_config")
762+
else None
691763
)
764+
if alert_config:
765+
quality_checks_data["alert_config"] = (
766+
alert_config.model_dump(exclude_none=True)
767+
if hasattr(alert_config, "model_dump")
768+
else alert_config
769+
)
692770

693-
if quality_checks_data.get("checks"):
694-
self._write_yaml(dir_path / "quality_checks.yaml", quality_checks_data)
771+
if quality_checks_data.get("checks"):
772+
self._write_yaml(dir_path / "quality_checks.yaml", quality_checks_data)
695773

696774
# 4. Save columns/ directory
697775
if umf.columns:
@@ -745,14 +823,8 @@ def is_cross_column_validation(exp: dict) -> bool:
745823
col_data["derivation"] = mappings[col_name]
746824

747825
# Add column-specific validations
748-
if umf.validation_rules:
749-
col_validations = [
750-
exp
751-
for exp in (expectations_list or [])
752-
if isinstance(exp, dict) and exp.get("kwargs", {}).get("column") == col_name
753-
]
754-
if col_validations:
755-
col_data["validations"] = col_validations
826+
if col_name in col_validations_map:
827+
col_data["validations"] = col_validations_map[col_name]
756828

757829
self._write_yaml(columns_dir / f"{col_name}.yaml", col_data)
758830

0 commit comments

Comments
 (0)