Skip to content

Commit 2e628b2

Browse files
authored
Add VerificationResult.rowLevelResultsAsDataFrame support (#262)
* Add VerificationResult.rowLevelResultsAsDataFrame support

  Wrap deequ's VerificationResult.rowLevelResultsAsDataFrame as a classmethod on pydeequ's VerificationResult. This returns the original DataFrame with additional Boolean columns indicating which rows passed or failed each Check.

  - Add rowLevelResultsAsDataFrame classmethod to VerificationResult
  - Add tests covering completeness, containedIn, ANDed constraints, aggregate-only checks, column preservation, and pandas output
  - Update README with usage example

  Closes #261

* Add orderBy to tests for deterministic row ordering

  Address review feedback: Spark DataFrames have no guaranteed row order, so add explicit orderBy() before collect() in all tests that assert row-level values.

* Add row count assertion to completeness test

  Verify that rowLevelResultsAsDataFrame preserves the same number of rows as the original DataFrame.

* test: add multi-Check test verifying separate Boolean columns per Check

  Addresses review feedback requesting a test for addCheck(check1).addCheck(check2) producing distinct Boolean columns in row-level results.

* Address review feedback: improve AND test and README clarity

  - README: add sentence explaining multi-constraint AND behavior
  - Test: use isContainedIn + isComplete so constraints disagree on different rows, properly validating AND logic
1 parent 20693b8 commit 2e628b2

3 files changed

Lines changed: 203 additions & 0 deletions

File tree

README.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,17 @@ checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
119119
checkResult_df.show()
120120
```
121121

122+
#### Row-Level Results
123+
124+
You can also get row-level results to see which individual rows passed or failed each check. This is useful for quarantining rows with data quality issues:
125+
126+
```python
127+
rowLevelResult_df = VerificationResult.rowLevelResultsAsDataFrame(spark, checkResult, df)
128+
rowLevelResult_df.show()
129+
```
130+
131+
Each check that contains at least one row-level-capable constraint (e.g., `isComplete`, `isContainedIn`, `hasPattern`, `isUnique`) produces a Boolean column (named after the check description) indicating pass/fail per row. When a single Check contains multiple constraints, they are ANDed together into one Boolean column — the row passes only if all constraints in that Check pass. Checks made up solely of aggregate constraints (e.g., `hasSize`) do not add a column.
132+
122133
### Repository
123134

124135
Save to a Metrics Repository by adding the `useRepository()` and `saveOrAppendResult()` calls to your Analysis Runner.

pydeequ/verification.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,34 @@ def checkResultsAsDataFrame(
143143
)
144144
return DataFrame(df, sql_ctx).toPandas() if pandas else DataFrame(df, sql_ctx)
145145

146+
@classmethod
def rowLevelResultsAsDataFrame(
    cls, spark_session: SparkSession, verificationResult, data: DataFrame, pandas: bool = False
):
    """
    Annotate the verified DataFrame with one pass/fail Boolean column per Check.

    The result contains the original columns of ``data`` plus one Boolean column for each
    qualifying Check, named after the Check's description. When a Check holds several
    constraints, they are ANDed together into that single column.

    A Check only contributes a column if it has row-level-capable constraints (e.g.,
    isComplete, hasPattern, isContainedIn, isUnique). Aggregate-only checks (e.g., hasSize)
    are skipped.

    :param SparkSession spark_session: SparkSession
    :param verificationResult: The results of the verification run
    :param DataFrame data: The original input DataFrame that was verified
    :param bool pandas: If True, return a Pandas DataFrame instead of PySpark
    :return: DataFrame with original columns plus Boolean columns per qualifying Check
    """
    # Delegate the actual computation to deequ's Scala implementation through the JVM gateway.
    jvm_verification_result = spark_session._jvm.com.amazon.deequ.VerificationResult
    java_session = spark_session._jsparkSession
    row_level_jdf = jvm_verification_result.rowLevelResultsAsDataFrame(
        java_session, verificationResult.verificationRun, data._jdf
    )

    # Wrap the returned Java DataFrame so it can be used from Python.
    sql_ctx = SQLContext(
        sparkContext=spark_session._sc,
        sparkSession=spark_session,
        jsqlContext=java_session.sqlContext(),
    )
    annotated_df = DataFrame(row_level_jdf, sql_ctx)

    if pandas:
        return annotated_df.toPandas()
    return annotated_df
173+
146174

147175
class VerificationRunBuilder:
148176
# TODO Remaining Methods

tests/test_verification.py

Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
# -*- coding: utf-8 -*-
2+
import unittest
3+
4+
import pandas as pd
5+
from pyspark.sql import Row
6+
from pyspark.sql.types import BooleanType
7+
8+
from pydeequ.checks import Check, CheckLevel
9+
from pydeequ.verification import VerificationResult, VerificationSuite
10+
from tests.conftest import setup_pyspark
11+
12+
13+
class TestRowLevelResults(unittest.TestCase):
    """Tests for VerificationResult.rowLevelResultsAsDataFrame.

    All tests share a three-row fixture DataFrame:

        a="foo", b=1, c=5
        a="bar", b=2, c=6
        a="baz", b=3, c=None

    Spark gives no ordering guarantee, so every test that asserts per-row values
    sorts explicitly before collecting.
    """

    @classmethod
    def setUpClass(cls):
        cls.spark = setup_pyspark().appName("test-row-level-results-local").getOrCreate()
        cls.sc = cls.spark.sparkContext
        cls.df = cls.sc.parallelize(
            [
                Row(a="foo", b=1, c=5),
                Row(a="bar", b=2, c=6),
                Row(a="baz", b=3, c=None),
            ]
        ).toDF()

    @classmethod
    def tearDownClass(cls):
        # Must shutdown callback for tests to stop
        # TODO Document this call to users or encapsulate in PyDeequSession
        cls.spark.sparkContext._gateway.shutdown_callback_server()
        cls.spark.stop()

    def test_row_level_results_with_completeness(self):
        """Test that isComplete produces a Boolean column with correct per-row values."""
        check = Check(self.spark, CheckLevel.Error, "completeness_check")
        check = check.isComplete("c")

        result = VerificationSuite(self.spark).onData(self.df).addCheck(check).run()
        row_level_df = VerificationResult.rowLevelResultsAsDataFrame(self.spark, result, self.df)

        # Should have same row count as original DataFrame
        self.assertEqual(row_level_df.count(), self.df.count())

        # Should have original columns (a, b, c) plus one Boolean column for the check
        self.assertIn("completeness_check", row_level_df.columns)
        self.assertIsInstance(row_level_df.schema["completeness_check"].dataType, BooleanType)

        # Order by b to ensure deterministic row ordering
        # b=1: c=5 (complete), b=2: c=6 (complete), b=3: c=None (incomplete)
        results = row_level_df.orderBy("b").select("completeness_check").collect()
        values = [row["completeness_check"] for row in results]
        self.assertEqual(values, [True, True, False])

    def test_row_level_results_with_contained_in(self):
        """Test that isContainedIn produces correct row-level results."""
        check = Check(self.spark, CheckLevel.Error, "contained_check")
        check = check.isContainedIn("a", ["foo", "bar"])

        result = VerificationSuite(self.spark).onData(self.df).addCheck(check).run()
        row_level_df = VerificationResult.rowLevelResultsAsDataFrame(self.spark, result, self.df)

        self.assertIn("contained_check", row_level_df.columns)

        # Order by a to ensure deterministic row ordering
        # a="bar" (contained), a="baz" (not contained), a="foo" (contained)
        results = row_level_df.orderBy("a").select("contained_check").collect()
        values = [row["contained_check"] for row in results]
        self.assertEqual(values, [True, False, True])

    def test_row_level_results_multiple_constraints_anded(self):
        """Test that multiple constraints in one Check are ANDed into a single column."""
        check = Check(self.spark, CheckLevel.Error, "multi_check")
        check = check.isContainedIn("a", ["foo", "baz"]).isComplete("c")

        result = VerificationSuite(self.spark).onData(self.df).addCheck(check).run()
        row_level_df = VerificationResult.rowLevelResultsAsDataFrame(self.spark, result, self.df)

        self.assertIn("multi_check", row_level_df.columns)

        # Order by b to ensure deterministic row ordering
        # b=1: a=foo (contained), c=5 (complete) -> True AND True = True
        # b=2: a=bar (NOT contained), c=6 (complete) -> False AND True = False
        # b=3: a=baz (contained), c=None (NOT complete) -> True AND False = False
        results = row_level_df.orderBy("b").select("multi_check").collect()
        values = [row["multi_check"] for row in results]
        self.assertEqual(values, [True, False, False])

    def test_row_level_results_aggregate_only_check(self):
        """Test that aggregate-only checks (hasSize) don't add columns."""
        check = Check(self.spark, CheckLevel.Warning, "size_check")
        check = check.hasSize(lambda x: x >= 3)

        result = VerificationSuite(self.spark).onData(self.df).addCheck(check).run()
        row_level_df = VerificationResult.rowLevelResultsAsDataFrame(self.spark, result, self.df)

        # hasSize is aggregate-only, so no new column should be added
        self.assertEqual(sorted(row_level_df.columns), sorted(self.df.columns))

    def test_row_level_results_preserves_original_columns(self):
        """Test that the original DataFrame columns and their values are preserved."""
        check = Check(self.spark, CheckLevel.Error, "preserve_check")
        check = check.isComplete("c")

        result = VerificationSuite(self.spark).onData(self.df).addCheck(check).run()
        row_level_df = VerificationResult.rowLevelResultsAsDataFrame(self.spark, result, self.df)

        original_columns = self.df.columns
        for column_name in original_columns:
            self.assertIn(column_name, row_level_df.columns)

        # Compare every original column, including "c" (the column under check and the
        # only one holding a null), ordered by b for a deterministic comparison.
        original_values = self.df.orderBy("b").select(*original_columns).collect()
        result_values = row_level_df.orderBy("b").select(*original_columns).collect()
        self.assertEqual(original_values, result_values)

    def test_row_level_results_multiple_checks(self):
        """Test that multiple separate Check objects produce multiple Boolean columns."""
        check1 = Check(self.spark, CheckLevel.Error, "completeness_check")
        check1 = check1.isComplete("c")

        check2 = Check(self.spark, CheckLevel.Error, "value_check")
        check2 = check2.isContainedIn("a", ["foo", "bar"])

        result = (
            VerificationSuite(self.spark)
            .onData(self.df)
            .addCheck(check1)
            .addCheck(check2)
            .run()
        )
        row_level_df = VerificationResult.rowLevelResultsAsDataFrame(self.spark, result, self.df)

        # Each Check should produce its own Boolean column
        self.assertIn("completeness_check", row_level_df.columns)
        self.assertIn("value_check", row_level_df.columns)
        self.assertEqual(row_level_df.count(), 3)

        # Verify values: c is null for row 3, and "baz" is not in ["foo", "bar"]
        results = row_level_df.orderBy("b").select("completeness_check", "value_check").collect()
        # Row 1 (a=foo, c=5): complete=True, contained=True
        self.assertTrue(results[0]["completeness_check"])
        self.assertTrue(results[0]["value_check"])
        # Row 2 (a=bar, c=6): complete=True, contained=True
        self.assertTrue(results[1]["completeness_check"])
        self.assertTrue(results[1]["value_check"])
        # Row 3 (a=baz, c=None): complete=False, contained=False
        self.assertFalse(results[2]["completeness_check"])
        self.assertFalse(results[2]["value_check"])

    def test_row_level_results_as_pandas(self):
        """Test the pandas=True option returns a Pandas DataFrame."""
        check = Check(self.spark, CheckLevel.Error, "pandas_check")
        check = check.isComplete("c")

        result = VerificationSuite(self.spark).onData(self.df).addCheck(check).run()
        row_level_df = VerificationResult.rowLevelResultsAsDataFrame(
            self.spark, result, self.df, pandas=True
        )

        self.assertIsInstance(row_level_df, pd.DataFrame)
        self.assertIn("pandas_check", row_level_df.columns)
163+
# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()

0 commit comments

Comments
 (0)