Skip to content

Commit 0fa82de

Browse files
committed
hx-4db4f06e migrate validator off gx_wrapper
Refs: docs/helix/01-frame/features/FEAT-024-validator-consolidation.md
1 parent 04cb8ae commit 0fa82de

4 files changed

Lines changed: 235 additions & 87 deletions

File tree

src/tablespec/validation/gx_executor.py

Lines changed: 154 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
11
"""Suite-level GX execution with staged validation support.
22
3-
Executes entire expectation suites in a single batch pass using the GX Pandas
3+
Executes entire expectation suites in a single batch pass via the GX Spark
44
engine, replacing the per-expectation validator pattern in gx_wrapper.py.
5-
Supports staged execution where raw (string) and ingested (typed) expectations
6-
route to different DataFrames.
5+
6+
Requires a Spark or Sail session — use ``get_session()`` from
7+
``tablespec.session`` to obtain one.
8+
9+
Supports staged execution where raw (string) and ingested (typed)
10+
expectations route to different DataFrames.
711
"""
812

913
from __future__ import annotations
@@ -62,16 +66,23 @@ class StagedExecutionResult:
6266

6367

6468
class GXSuiteExecutor:
65-
"""Execute GX expectation suites efficiently.
69+
"""Execute GX expectation suites against Spark DataFrames.
70+
71+
Requires a Spark or Sail session. All validation runs through the GX
72+
Spark execution engine.
6673
6774
Supports two execution modes:
6875
- execute_suite(): Run all expectations against a single DataFrame
6976
- execute_staged(): Classify and route expectations to raw/ingested DataFrames
70-
71-
Uses GX Pandas execution engine for lightweight validation.
7277
"""
7378

74-
def __init__(self) -> None:
79+
def __init__(self, spark: Any | None = None) -> None:
80+
"""Initialise the executor.
81+
82+
Args:
83+
spark: A ``SparkSession`` (from Spark or Sail).
84+
"""
85+
self._spark = spark
7586
self._context: Any | None = None
7687

7788
def _get_context(self) -> Any:
@@ -81,95 +92,55 @@ def _get_context(self) -> Any:
8192
self._context = gx.get_context() # type: ignore[attr-defined]
8293
return self._context
8394

95+
# ------------------------------------------------------------------
96+
# Public API
97+
# ------------------------------------------------------------------
98+
8499
def execute_suite(
85100
self,
86-
df: Any, # pandas DataFrame
101+
df: Any,
87102
expectations: list[dict[str, Any]],
88103
) -> SuiteExecutionResult:
89-
"""Execute all expectations against a single pandas DataFrame in one batch.
104+
"""Execute all expectations against a Spark DataFrame in one batch.
90105
91106
Args:
92-
df: A pandas DataFrame to validate.
107+
df: A PySpark DataFrame (from Spark or Sail session).
93108
expectations: List of expectation dicts with 'type', 'kwargs', and
94109
optional 'meta' keys.
95110
96111
Returns:
97112
SuiteExecutionResult with per-expectation results and summary counts.
98113
"""
99-
from great_expectations.core import ExpectationSuite as GXSuite
100-
from great_expectations.core import ValidationDefinition
101-
from great_expectations.expectations.expectation_configuration import (
102-
ExpectationConfiguration,
103-
)
104-
105114
if not expectations:
106115
return SuiteExecutionResult.from_results([])
107116

108-
context = self._get_context()
109-
run_id = uuid.uuid4().hex[:8]
117+
return self._execute_spark(df, expectations)
110118

111-
# Build suite with all expectations
112-
suite = GXSuite(name=f"suite_{run_id}")
113-
for exp in expectations:
114-
exp_type = exp.get("type", exp.get("expectation_type", ""))
115-
kwargs = exp.get("kwargs", {})
116-
meta = exp.get("meta", {})
117-
suite.add_expectation_configuration(
118-
ExpectationConfiguration(type=exp_type, kwargs=kwargs, meta=meta)
119-
)
120-
suite = context.suites.add(suite)
121-
122-
# Set up pandas datasource, asset, and batch definition
123-
ds = context.data_sources.pandas_default
124-
asset_name = f"asset_{run_id}"
125-
batch_name = f"batch_{run_id}"
126-
asset = ds.add_dataframe_asset(asset_name)
127-
batch_def = asset.add_batch_definition_whole_dataframe(batch_name)
128-
129-
# Create validation definition and run
130-
vd_name = f"vd_{run_id}"
131-
vd = context.validation_definitions.add(
132-
ValidationDefinition(name=vd_name, suite=suite, data=batch_def)
119+
def validate_expectation(
120+
self,
121+
exp_type: str,
122+
kwargs: dict[str, Any],
123+
meta: dict[str, Any] | None = None,
124+
) -> tuple[bool, str | None]:
125+
"""Validate a single expectation configuration without executing it."""
126+
from great_expectations.core import ExpectationSuite as GXSuite
127+
from great_expectations.expectations.expectation_configuration import (
128+
ExpectationConfiguration,
133129
)
134130

135131
try:
136-
validation_result = vd.run(batch_parameters={"dataframe": df})
137-
138-
# Parse results
139-
results: list[ExpectationResult] = []
140-
for res in validation_result.results:
141-
result_dict = res.to_json_dict() if hasattr(res, "to_json_dict") else {}
142-
result_obj = result_dict.get("result", {})
143-
exp_config = result_dict.get("expectation_config", {})
144-
145-
results.append(
146-
ExpectationResult(
147-
expectation_type=exp_config.get("type", ""),
148-
success=result_dict.get("success", False),
149-
column=exp_config.get("kwargs", {}).get("column"),
150-
observed_value=result_obj.get("observed_value"),
151-
unexpected_count=result_obj.get("unexpected_count", 0),
152-
unexpected_values=result_obj.get("partial_unexpected_list", []),
153-
details=result_obj,
154-
)
155-
)
156-
157-
return SuiteExecutionResult.from_results(results)
158-
finally:
159-
# Cleanup ephemeral resources
160-
try:
161-
context.suites.delete(suite.name)
162-
except Exception:
163-
pass
164-
try:
165-
ds.delete_asset(asset_name)
166-
except Exception:
167-
pass
132+
suite = GXSuite(name="validation_test")
133+
suite.add_expectation_configuration(
134+
ExpectationConfiguration(type=exp_type, kwargs=kwargs, meta=meta or {})
135+
)
136+
return (True, None)
137+
except Exception as exc:
138+
return (False, str(exc))
168139

169140
def execute_staged(
170141
self,
171-
raw_df: Any, # pandas DataFrame with all string columns (raw stage)
172-
ingested_df: Any, # pandas DataFrame with typed columns (ingested stage)
142+
raw_df: Any,
143+
ingested_df: Any,
173144
expectations: list[dict[str, Any]],
174145
) -> StagedExecutionResult:
175146
"""Classify expectations by stage and execute against appropriate DataFrame.
@@ -179,8 +150,8 @@ def execute_staged(
179150
Redundant/unknown expectations are skipped.
180151
181152
Args:
182-
raw_df: DataFrame with string columns representing raw/bronze data.
183-
ingested_df: DataFrame with typed columns representing ingested data.
153+
raw_df: Spark DataFrame with string columns representing raw/bronze data.
154+
ingested_df: Spark DataFrame with typed columns representing ingested data.
184155
expectations: List of expectation dicts to classify and execute.
185156
186157
Returns:
@@ -228,3 +199,110 @@ def execute_staged(
228199
ingested=ingested_result,
229200
skipped=skipped,
230201
)
202+
203+
# ------------------------------------------------------------------
204+
# Internal: Spark execution (works with Spark & Sail)
205+
# ------------------------------------------------------------------
206+
207+
def _execute_spark(
208+
self,
209+
df: Any,
210+
expectations: list[dict[str, Any]],
211+
) -> SuiteExecutionResult:
212+
"""Execute expectations against a Spark DataFrame via the GX Spark engine."""
213+
from great_expectations.core import ExpectationSuite as GXSuite
214+
from great_expectations.core import ValidationDefinition
215+
from great_expectations.expectations.expectation_configuration import (
216+
ExpectationConfiguration,
217+
)
218+
219+
context = self._get_context()
220+
run_id = uuid.uuid4().hex[:8]
221+
222+
# Build suite
223+
suite = GXSuite(name=f"suite_{run_id}")
224+
for exp in expectations:
225+
exp_type = exp.get("type", exp.get("expectation_type", ""))
226+
kwargs = exp.get("kwargs", {})
227+
meta = exp.get("meta", {})
228+
suite.add_expectation_configuration(
229+
ExpectationConfiguration(type=exp_type, kwargs=kwargs, meta=meta)
230+
)
231+
suite = context.suites.add(suite)
232+
233+
# Set up Spark datasource and asset
234+
ds_name = f"spark_ds_{run_id}"
235+
asset_name = f"spark_asset_{run_id}"
236+
batch_name = f"spark_batch_{run_id}"
237+
238+
ds = None
239+
vd_name = f"vd_{run_id}"
240+
try:
241+
ds = context.data_sources.add_spark(name=ds_name)
242+
asset = ds.add_dataframe_asset(name=asset_name)
243+
batch_def = asset.add_batch_definition_whole_dataframe(batch_name)
244+
245+
vd = context.validation_definitions.add(
246+
ValidationDefinition(name=vd_name, suite=suite, data=batch_def)
247+
)
248+
249+
validation_result = vd.run(batch_parameters={"dataframe": df})
250+
return self._parse_validation_result(validation_result)
251+
finally:
252+
self._cleanup(context, suite, ds, ds_name, asset_name, vd_name)
253+
254+
# ------------------------------------------------------------------
255+
# Shared helpers
256+
# ------------------------------------------------------------------
257+
258+
@staticmethod
259+
def _parse_validation_result(validation_result: Any) -> SuiteExecutionResult:
260+
"""Convert a GX ValidationResult into our SuiteExecutionResult."""
261+
results: list[ExpectationResult] = []
262+
for res in validation_result.results:
263+
result_dict = res.to_json_dict() if hasattr(res, "to_json_dict") else {}
264+
result_obj = result_dict.get("result", {})
265+
exp_config = result_dict.get("expectation_config", {})
266+
267+
results.append(
268+
ExpectationResult(
269+
expectation_type=exp_config.get("type", ""),
270+
success=result_dict.get("success", False),
271+
column=exp_config.get("kwargs", {}).get("column"),
272+
observed_value=result_obj.get("observed_value"),
273+
unexpected_count=result_obj.get("unexpected_count", 0),
274+
unexpected_values=result_obj.get("partial_unexpected_list", []),
275+
details=result_obj,
276+
)
277+
)
278+
279+
return SuiteExecutionResult.from_results(results)
280+
281+
@staticmethod
282+
def _cleanup(
283+
context: Any,
284+
suite: Any,
285+
ds: Any | None,
286+
ds_name: str,
287+
asset_name: str,
288+
vd_name: str | None = None,
289+
) -> None:
290+
"""Clean up ephemeral GX resources."""
291+
if vd_name is not None:
292+
try:
293+
context.validation_definitions.delete(vd_name)
294+
except Exception:
295+
pass
296+
try:
297+
context.suites.delete(suite.name)
298+
except Exception:
299+
pass
300+
if ds is not None:
301+
try:
302+
ds.delete_asset(asset_name)
303+
except Exception:
304+
pass
305+
try:
306+
context.data_sources.delete(ds_name)
307+
except Exception:
308+
pass

src/tablespec/validator.py

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,10 @@
2929
from tablespec.type_mappings import VALID_PYSPARK_TYPES
3030
from tablespec.umf_loader import UMFFormat, UMFLoader
3131

32-
# gx_wrapper may not be ported yet
3332
try:
34-
from tablespec.gx_wrapper import get_gx_wrapper
33+
from tablespec.validation.gx_executor import GXSuiteExecutor
3534
except ImportError:
36-
get_gx_wrapper = None # type: ignore[assignment]
35+
GXSuiteExecutor = None # type: ignore[assignment]
3736

3837
# Cache the JSON schema at module level since it never changes at runtime
3938
_UMF_JSON_SCHEMA: dict[str, Any] = UMF.model_json_schema()
@@ -185,13 +184,13 @@ def validate_table(
185184

186185
# 5. Validate expectations if present
187186
if umf.validation_rules and umf.validation_rules.expectations:
188-
# Get GX wrapper for validation - catches param errors like column_list < 2
189-
gx_wrapper = None
190-
if get_gx_wrapper is not None:
187+
# Validate expectation configuration through the consolidated GX executor.
188+
gx_executor = None
189+
if GXSuiteExecutor is not None:
191190
try:
192-
gx_wrapper = get_gx_wrapper()
191+
gx_executor = GXSuiteExecutor()
193192
except ImportError:
194-
gx_wrapper = None
193+
gx_executor = None
195194

196195
column_names = {col.name for col in umf.columns}
197196

@@ -209,8 +208,8 @@ def validate_table(
209208
meta = exp.get("meta", {})
210209

211210
# Validate expectation with GX library using actual kwargs
212-
if gx_wrapper is not None:
213-
is_valid, gx_error = gx_wrapper.validate_expectation(exp_type, kwargs, meta)
211+
if gx_executor is not None:
212+
is_valid, gx_error = gx_executor.validate_expectation(exp_type, kwargs, meta)
214213
if not is_valid and gx_error:
215214
errors.append(f"Expectation {i} ({exp_type}): {gx_error}")
216215

0 commit comments

Comments
 (0)