-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Expand file tree
/
Copy pathcolumnValueStdDevToBeBetween.py
More file actions
235 lines (192 loc) · 8.38 KB
/
columnValueStdDevToBeBetween.py
File metadata and controls
235 lines (192 loc) · 8.38 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
# Copyright 2025 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Validator for column value stddev to be between test case
"""
import traceback
from abc import abstractmethod
from typing import List, Optional, Union
from sqlalchemy import Column
from metadata.data_quality.validations.base_test_handler import (
BaseTestValidator,
DimensionInfo,
DimensionResult,
TestEvaluation,
)
from metadata.data_quality.validations.checkers.between_bounds_checker import (
BetweenBoundsChecker,
)
from metadata.generated.schema.tests.basic import (
TestCaseResult,
TestCaseStatus,
TestResultValue,
)
from metadata.profiler.metrics.registry import Metrics
from metadata.utils.logger import test_suite_logger
from metadata.utils.sqa_like_column import SQALikeColumn
logger = test_suite_logger()
STDDEV = "stddev"
class BaseColumnValueStdDevToBeBetweenValidator(BaseTestValidator):
    """Validator for column value stddev to be between test case"""

    MIN_BOUND = "minValueForStdDevInCol"
    MAX_BOUND = "maxValueForStdDevInCol"

    def _run_validation(self) -> TestCaseResult:
        """Run the stddev-between-bounds check and build the overall result.

        Computes the column's standard deviation, evaluates it against the
        configured min/max bounds, and packages everything into a
        TestCaseResult. Metric-computation failures abort the test case.

        Returns:
            TestCaseResult: The test case result for the overall validation
        """
        test_params = self._get_test_parameters()
        try:
            col: Union[SQALikeColumn, Column] = self.get_column()
            # Single metric for this validator: the column standard deviation
            metric_values = {
                Metrics.stddev.name: self._run_results(Metrics.stddev, col),
            }
        except (ValueError, RuntimeError) as exc:
            msg = f"Error computing {self.test_case.fullyQualifiedName}: {exc}"  # type: ignore
            logger.debug(traceback.format_exc())
            logger.error(msg)
            # Abort with a null stddev value so the failure is visible downstream
            return self.get_test_case_result_object(
                self.execution_date,
                TestCaseStatus.Aborted,
                msg,
                [TestResultValue(name=STDDEV, value=None)],
            )
        evaluation = self._evaluate_test_condition(metric_values, test_params)
        status = self.get_test_case_status(evaluation["matched"])
        message = self._format_result_message(metric_values, test_params=test_params)
        return self.get_test_case_result_object(
            self.execution_date,
            status,
            message,
            self._get_test_result_values(metric_values),
            min_bound=test_params[self.MIN_BOUND],
            max_bound=test_params[self.MAX_BOUND],
        )

    def _get_validation_checker(self, test_params: dict) -> BetweenBoundsChecker:
        """Build the bounds checker configured from the test parameters.

        Args:
            test_params: Test parameters including min and max bounds

        Returns:
            BetweenBoundsChecker: Checker instance configured with bounds
        """
        lower = test_params[self.MIN_BOUND]
        upper = test_params[self.MAX_BOUND]
        return BetweenBoundsChecker(min_bound=lower, max_bound=upper)

    def _get_test_parameters(self) -> dict:
        """Resolve the configured min/max stddev bounds for this test.

        Returns:
            dict: Test parameters keyed by MIN_BOUND and MAX_BOUND
        """
        params = {}
        params[self.MIN_BOUND] = self.get_min_bound(self.MIN_BOUND)
        params[self.MAX_BOUND] = self.get_max_bound(self.MAX_BOUND)
        return params

    def _get_metrics_to_compute(self, test_params: Optional[dict] = None) -> dict:
        """List the metrics this validator needs computed.

        Args:
            test_params: Optional test parameters (unused for stddev validator)

        Returns:
            dict: Mapping of metric name to Metrics enum value
        """
        return {Metrics.stddev.name: Metrics.stddev}

    def _evaluate_test_condition(self, metric_values: dict, test_params: dict) -> TestEvaluation:
        """Check whether the observed stddev lies within the configured bounds.

        This is a group-level statistical check, so per-row pass/fail counts
        do not apply and are reported as None.

        Args:
            metric_values: Dictionary keyed by Metrics enum names,
                e.g., {"STDDEV": 15.5}
            test_params: Dictionary with 'minValueForStdDevInCol' and
                'maxValueForStdDevInCol'

        Returns:
            TestEvaluation with:
                - matched: bool - True when stddev is within bounds
                - passed_rows / failed_rows / total_rows: None (not applicable)
        """
        observed = metric_values[Metrics.stddev.name]
        lower = test_params[self.MIN_BOUND]
        upper = test_params[self.MAX_BOUND]
        within_bounds = lower <= observed <= upper
        result: TestEvaluation = {
            "matched": within_bounds,
            "passed_rows": None,
            "failed_rows": None,
            "total_rows": None,
        }
        return result

    def _format_result_message(
        self,
        metric_values: dict,
        dimension_info: Optional[DimensionInfo] = None,
        test_params: Optional[dict] = None,
    ) -> str:
        """Render the human-readable result message for this test.

        Args:
            metric_values: Dictionary keyed by Metrics enum names
            dimension_info: Optional DimensionInfo when reporting per-dimension
            test_params: Min/max bounds; required for this test

        Returns:
            str: Formatted result message

        Raises:
            ValueError: If test_params is not provided
        """
        if test_params is None:
            raise ValueError("test_params is required for columnValueStdDevToBeBetween._format_result_message")
        observed = metric_values[Metrics.stddev.name]
        lower = test_params[self.MIN_BOUND]
        upper = test_params[self.MAX_BOUND]
        base = f"Found stddev={observed} vs. the expected min={lower}, max={upper}"
        if not dimension_info:
            # Overall (non-dimensional) message carries a trailing period
            return base + "."
        prefix = (
            f"Dimension {dimension_info['dimension_name']}="
            f"{dimension_info['dimension_value']}: "
        )
        return prefix + base

    def _get_test_result_values(self, metric_values: dict) -> List[TestResultValue]:
        """Package the observed stddev as the test's result values.

        Args:
            metric_values: Dictionary keyed by Metrics enum names

        Returns:
            List[TestResultValue]: Single-entry list carrying the stddev
        """
        stddev_repr = str(metric_values[Metrics.stddev.name])
        return [TestResultValue(name=STDDEV, value=stddev_repr)]

    @abstractmethod
    def _run_results(self, metric: Metrics, column: Union[SQALikeColumn, Column]):
        """Compute *metric* over *column*; implemented per execution engine."""
        raise NotImplementedError

    @abstractmethod
    def _execute_dimensional_validation(
        self,
        column: Union[SQALikeColumn, Column],
        dimension_col: Union[SQALikeColumn, Column],
        metrics_to_compute: dict,
        test_params: dict,
        top_n: int,
    ) -> List[DimensionResult]:
        """Execute dimensional validation query for a single dimension column

        Args:
            column: The column being tested (e.g., revenue)
            dimension_col: The dimension column to group by (e.g., region)
            metrics_to_compute: Dict mapping metric names to Metrics enum values
            test_params: Test parameters including min and max bounds
            top_n: Number of top dimension values before grouping as "Others"

        Returns:
            List of DimensionResult objects for each dimension value
        """
        raise NotImplementedError