11# -*- coding: utf-8 -*-
22import unittest
33
4- import pytest
4+ from py4j . protocol import Py4JError
55from pyspark .sql import Row
66
7- from pydeequ .analyzers import *
8- from pydeequ .checks import *
9- from pydeequ .repository import *
10- from pydeequ .verification import *
7+ from pydeequ .analyzers import AnalyzerContext , AnalysisRunner , ApproxCountDistinct
8+ from pydeequ .checks import Check , CheckLevel
9+ from pydeequ .repository import FileSystemMetricsRepository , InMemoryMetricsRepository , ResultKey
10+ from pydeequ .verification import VerificationResult , VerificationSuite
1111from tests .conftest import setup_pyspark
1212
1313
@@ -18,7 +18,9 @@ def setUpClass(cls):
1818 cls .AnalysisRunner = AnalysisRunner (cls .spark )
1919 cls .VerificationSuite = VerificationSuite (cls .spark )
2020 cls .sc = cls .spark .sparkContext
21- cls .df = cls .sc .parallelize ([Row (a = "foo" , b = 1 , c = 5 ), Row (a = "bar" , b = 2 , c = 6 ), Row (a = "baz" , b = 3 , c = None )]).toDF ()
21+ cls .df = cls .sc .parallelize (
22+ [Row (a = "foo" , b = 1 , c = 5 ), Row (a = "bar" , b = 2 , c = 6 ), Row (a = "baz" , b = 3 , c = None )]
23+ ).toDF ()
2224
2325 @classmethod
2426 def tearDownClass (cls ):
@@ -121,12 +123,16 @@ def test_verifications_FSmetrep(self):
121123 )
122124
123125 # TEST: Check JSON for tags
124- result_metrep_json = repository .load ().before (ResultKey .current_milli_time ()).getSuccessMetricsAsJson ()
126+ result_metrep_json = (
127+ repository .load ().before (ResultKey .current_milli_time ()).getSuccessMetricsAsJson ()
128+ )
125129
126130 print (result_metrep_json [0 ]["tag" ], key_tags ["tag" ])
127131 self .assertEqual (result_metrep_json [0 ]["tag" ], key_tags ["tag" ])
128132
129- result_metrep = repository .load ().before (ResultKey .current_milli_time ()).getSuccessMetricsAsDataFrame ()
133+ result_metrep = (
134+ repository .load ().before (ResultKey .current_milli_time ()).getSuccessMetricsAsDataFrame ()
135+ )
130136
131137 df = VerificationResult .checkResultsAsDataFrame (self .spark , result )
132138 print (df .collect ())
@@ -146,7 +152,9 @@ def test_verifications_FSmetrep_noTags_noFile(self):
146152 )
147153
148154 # TEST: Check DF parity
149- result_metrep = repository .load ().before (ResultKey .current_milli_time ()).getSuccessMetricsAsDataFrame ()
155+ result_metrep = (
156+ repository .load ().before (ResultKey .current_milli_time ()).getSuccessMetricsAsDataFrame ()
157+ )
150158
151159 df = VerificationResult .checkResultsAsDataFrame (self .spark , result )
152160 print (df .collect ())
@@ -243,12 +251,16 @@ def test_verifications_IMmetrep(self):
243251 )
244252
245253 # TEST: Check JSON for tags
246- result_metrep_json = repository .load ().before (ResultKey .current_milli_time ()).getSuccessMetricsAsJson ()
254+ result_metrep_json = (
255+ repository .load ().before (ResultKey .current_milli_time ()).getSuccessMetricsAsJson ()
256+ )
247257
248258 print (result_metrep_json [0 ]["tag" ], key_tags ["tag" ])
249259 self .assertEqual (result_metrep_json [0 ]["tag" ], key_tags ["tag" ])
250260
251- result_metrep = repository .load ().before (ResultKey .current_milli_time ()).getSuccessMetricsAsDataFrame ()
261+ result_metrep = (
262+ repository .load ().before (ResultKey .current_milli_time ()).getSuccessMetricsAsDataFrame ()
263+ )
252264
253265 df = VerificationResult .checkResultsAsDataFrame (self .spark , result )
254266 print (df .collect ())
@@ -267,37 +279,39 @@ def test_verifications_IMmetrep_noTags_noFile(self):
267279 )
268280
269281 # TEST: Check DF parity
270- result_metrep = repository .load ().before (ResultKey .current_milli_time ()).getSuccessMetricsAsDataFrame ()
282+ result_metrep = (
283+ repository .load ().before (ResultKey .current_milli_time ()).getSuccessMetricsAsDataFrame ()
284+ )
271285
272286 df = VerificationResult .checkResultsAsDataFrame (self .spark , result )
273287 print (df .collect ())
274288 print (result_metrep .collect ())
275289
276- @pytest .mark .xfail (reason = "@unittest.expectedFailure" )
277290 def test_fail_no_useRepository (self ):
278- """This test should fail because it doesn't call useRepository() before saveOrAppendResult()"""
291+ """This run fails because it doesn't call useRepository() before saveOrAppendResult(). """
279292 metrics_file = FileSystemMetricsRepository .helper_metrics_file (self .spark , "metrics.json" )
280293 print (f"metrics filepath: { metrics_file } " )
281294 key_tags = {"tag" : "FS metrep analyzers -- FAIL" }
282295 resultKey = ResultKey (self .spark , ResultKey .current_milli_time (), key_tags )
283296
284297 # MISSING useRepository()
285- result = (
286- self .AnalysisRunner .onData (self .df )
287- .addAnalyzer (ApproxCountDistinct ("b" ))
288- .saveOrAppendResult (resultKey )
289- .run ()
290- )
298+ with self .assertRaises (Py4JError ):
299+ _ = (
300+ self .AnalysisRunner .onData (self .df )
301+ .addAnalyzer (ApproxCountDistinct ("b" ))
302+ .saveOrAppendResult (resultKey )
303+ .run ()
304+ )
305+
291306
292- @pytest .mark .xfail (reason = "@unittest.expectedFailure" )
293307 def test_fail_no_load (self ):
294- """This test should fail because we do not load() for the repository reading"""
308+ """This run fails because we do not load() for the repository reading. """
295309 metrics_file = FileSystemMetricsRepository .helper_metrics_file (self .spark , "metrics.json" )
296310 print (f"metrics filepath: { metrics_file } " )
297311 repository = FileSystemMetricsRepository (self .spark , metrics_file )
298312 key_tags = {"tag" : "FS metrep analyzers" }
299313 resultKey = ResultKey (self .spark , ResultKey .current_milli_time (), key_tags )
300- result = (
314+ _ = (
301315 self .AnalysisRunner .onData (self .df )
302316 .addAnalyzer (ApproxCountDistinct ("b" ))
303317 .useRepository (repository )
@@ -306,8 +320,10 @@ def test_fail_no_load(self):
306320 )
307321
308322 # MISSING: repository.load()
309- result_metrep_json = (
310- repository .before (ResultKey .current_milli_time ())
311- .forAnalyzers ([ApproxCountDistinct ("b" )])
312- .getSuccessMetricsAsJson ()
313- )
323+ with self .assertRaises (AttributeError ):
324+ _ = (
325+ repository .before (ResultKey .current_milli_time ())
326+ .forAnalyzers ([ApproxCountDistinct ("b" )])
327+ .getSuccessMetricsAsJson ()
328+ )
329+
0 commit comments