-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Expand file tree
/
Copy pathtableRowInsertedCountToBeBetween.py
More file actions
104 lines (86 loc) · 3.59 KB
/
tableRowInsertedCountToBeBetween.py
File metadata and controls
104 lines (86 loc) · 3.59 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# Copyright 2025 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Validator for table row inserted count to be between test case
"""
import traceback
from abc import abstractmethod
from typing import cast
from metadata.data_quality.validations.base_test_handler import BaseTestValidator
from metadata.generated.schema.tests.basic import (
TestCaseResult,
TestCaseStatus,
TestResultValue,
)
from metadata.utils.logger import test_suite_logger
# Module-level logger obtained from the test-suite logger factory.
logger = test_suite_logger()
# Name attached to the TestResultValue entries reported by the validator in this module.
ROW_COUNT = "rowCount"
class BaseTableRowInsertedCountToBeBetweenValidator(BaseTestValidator):
    """Base validator for the "table row inserted count to be between" test case.

    Concrete subclasses provide the column lookup (``_get_column_name``) and
    the inserted-row computation (``_run_results``); this class reads the
    test case parameters and turns the computed count into a test result.
    """

    def _run_validation(self) -> TestCaseResult:
        """Compute the inserted row count and check it against the bounds.

        The column name plus the ``rangeType`` and ``rangeInterval``
        parameters are passed to ``_run_results``. If any of the three is
        missing, or if the computation raises, an ``Aborted`` result carrying
        the error message is returned. Otherwise the count is compared,
        inclusively, with the ``min`` and ``max`` parameters, which default
        to negative and positive infinity respectively.

        Returns:
            TestCaseResult: The result object built for this test case.
        """
        column_name = self._get_column_name()
        parameter_values = self.test_case.parameterValues  # type: ignore
        range_type = self.get_test_case_param_value(
            parameter_values, "rangeType", str
        )
        range_interval = self.get_test_case_param_value(
            parameter_values, "rangeInterval", int
        )

        try:
            # Raised inside the ``try`` so a missing input is reported the
            # same way as a failed computation: as an aborted test.
            if column_name is None or range_type is None or range_interval is None:
                raise ValueError(
                    "No value found for columnName, rangeType or rangeInterval"
                )
            res = self._run_results(
                cast(str, column_name),
                cast(str, range_type),
                cast(int, range_interval),
            )
        except Exception as exc:
            msg = f"Error computing {self.test_case.name}: {exc}"  # type: ignore
            logger.debug(traceback.format_exc())
            logger.error(msg)
            return self.get_test_case_result_object(
                self.execution_date,
                TestCaseStatus.Aborted,
                msg,
                [TestResultValue(name=ROW_COUNT, value=None)],
            )

        # Bounds are optional: an absent bound falls back to an infinite one.
        min_bound = self.get_test_case_param_value(
            parameter_values, "min", int, float("-inf")
        )
        max_bound = self.get_test_case_param_value(
            parameter_values, "max", int, float("inf")
        )

        within_bounds = min_bound <= res <= max_bound
        status = self.get_test_case_status(within_bounds)
        return self.get_test_case_result_object(
            self.execution_date,
            status,
            f"Found insertedRows={res} vs. the expected min={min_bound}, max={max_bound}.",
            [TestResultValue(name=ROW_COUNT, value=str(res))],
        )

    @abstractmethod
    def _get_column_name(self):
        """Return the column name handed to ``_run_results``.

        Must be overridden by subclasses; a ``None`` return makes
        ``_run_validation`` abort the test.
        """
        raise NotImplementedError

    @abstractmethod
    def _run_results(self, column_name: str, range_type: str, range_interval: int):
        """Return the inserted row count for the given column and range.

        Must be overridden by subclasses. The returned value is compared
        against the ``min``/``max`` bounds by ``_run_validation``.

        Args:
            column_name: value returned by ``_get_column_name``.
            range_type: value of the ``rangeType`` test case parameter.
            range_interval: value of the ``rangeInterval`` test case parameter.
        """
        raise NotImplementedError