From e0ed19bf658443c431eccf9c87ea09e710241fb5 Mon Sep 17 00:00:00 2001 From: Anshuman Tiwari Date: Tue, 14 Oct 2025 13:15:40 +0530 Subject: [PATCH 1/5] feat: add build_table_metrics_query method for enhanced column metrics retrieval --- dcs_core/integrations/databases/postgres.py | 89 ++++++++++++++++++++- 1 file changed, 88 insertions(+), 1 deletion(-) diff --git a/dcs_core/integrations/databases/postgres.py b/dcs_core/integrations/databases/postgres.py index f096af2..9f765dc 100644 --- a/dcs_core/integrations/databases/postgres.py +++ b/dcs_core/integrations/databases/postgres.py @@ -12,9 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the License. +import datetime +from decimal import Decimal from typing import Any, Dict, List, Optional, Tuple -from sqlalchemy import create_engine, text +from sqlalchemy import UUID, create_engine, text from sqlalchemy.engine import URL from dcs_core.core.common.errors import DataChecksDataSourcesConnectionError @@ -309,3 +311,88 @@ def fetch_sample_values_from_database( result = self.connection.execute(text(query)) rows = result.fetchall() return rows + + def build_table_metrics_query( + self, table_name: str, column_info: list[dict] + ) -> list[dict]: + query_parts = [] + + for col in column_info: + name = col["column_name"] + dtype = col["data_type"].lower() + + query_parts.append(f'COUNT(DISTINCT "{name}") AS "{name}_distinct"') + query_parts.append( + f'COUNT(*) - COUNT(DISTINCT "{name}") AS "{name}_duplicate"' + ) + query_parts.append( + f'SUM(CASE WHEN "{name}" IS NULL THEN 1 ELSE 0 END) AS "{name}_is_null"' + ) + + if dtype in ( + "int", + "integer", + "bigint", + "smallint", + "decimal", + "numeric", + "float", + "double", + ): + query_parts.append(f'MIN("{name}") AS "{name}_min"') + query_parts.append(f'MAX("{name}") AS "{name}_max"') + query_parts.append(f'AVG("{name}") AS "{name}_average"') + + elif dtype in ("varchar", "text", "char", "string", "character varying"): + query_parts.append( + f'MAX(CHAR_LENGTH("{name}")) AS "{name}_max_character_length"' + ) + + query = f"SELECT\n {',\n '.join(query_parts)}\nFROM {table_name};" + + result = self.connection.execute(text(query)) + row = dict(list(result)[0]._mapping) + + def _normalize_metrics(value): + """ + Safely normalizes DB metric values into JSON-serializable Python types. + Handles: + - Decimal → float + - datetime/date → ISO 8601 string + - UUID → string + - Nested dict/list recursion + - None passthrough + """ + if value is None: + return None + + if isinstance(value, Decimal): + return float(value) + if isinstance(value, (int, float, bool)): + return value + + if isinstance(value, (datetime, datetime.date)): + return value.isoformat() + + if isinstance(value, UUID): + return str(value) + + if isinstance(value, list): + return [_normalize_metrics(v) for v in value] + if isinstance(value, dict): + return {k: _normalize_metrics(v) for k, v in value.items()} + + return str(value) + + column_wise = [] + for col in column_info: + name = col["column_name"] + col_metrics = {} + + for key, value in row.items(): + if key.startswith(f"{name}_"): + metric_name = key[len(name) + 1 :] + col_metrics[metric_name] = _normalize_metrics(value) + + column_wise.append({"column_name": name, "metrics": col_metrics}) + return column_wise From 989ee55e467498daf8381a2026575865b5c353b1 Mon Sep 17 00:00:00 2001 From: Anshuman Tiwari Date: Tue, 14 Oct 2025 13:41:19 +0530 Subject: [PATCH 2/5] feat: enhance build_table_metrics_query to support additional queries and improve column quoting --- dcs_core/integrations/databases/postgres.py | 23 +++++++++++++-------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/dcs_core/integrations/databases/postgres.py b/dcs_core/integrations/databases/postgres.py index 9f765dc..8341b87 100644 --- a/dcs_core/integrations/databases/postgres.py +++ b/dcs_core/integrations/databases/postgres.py @@ -313,7 +313,7 @@ def fetch_sample_values_from_database( return rows def build_table_metrics_query( - self, table_name: str, column_info: list[dict] + self, table_name: str, column_info: list[dict], additional_queries:Optional[list[str]]=None ) -> list[dict]: query_parts = [] @@ -321,12 +321,12 @@ def build_table_metrics_query( name = col["column_name"] dtype = col["data_type"].lower() - query_parts.append(f'COUNT(DISTINCT "{name}") AS "{name}_distinct"') + query_parts.append(f'COUNT(DISTINCT {self.quote_column(name)}) AS "{name}_distinct"') query_parts.append( - f'COUNT(*) - COUNT(DISTINCT "{name}") AS "{name}_duplicate"' + f'COUNT(*) - COUNT(DISTINCT {self.quote_column(name)}) AS "{name}_duplicate"' ) query_parts.append( - f'SUM(CASE WHEN "{name}" IS NULL THEN 1 ELSE 0 END) AS "{name}_is_null"' + f'SUM(CASE WHEN {self.quote_column(name)} IS NULL THEN 1 ELSE 0 END) AS "{name}_is_null"' ) if dtype in ( @@ -339,16 +339,21 @@ def build_table_metrics_query( "float", "double", ): - query_parts.append(f'MIN("{name}") AS "{name}_min"') - query_parts.append(f'MAX("{name}") AS "{name}_max"') - query_parts.append(f'AVG("{name}") AS "{name}_average"') + query_parts.append(f'MIN({self.quote_column(name)}) AS "{name}_min"') + query_parts.append(f'MAX({self.quote_column(name)}) AS "{name}_max"') + query_parts.append(f'AVG({self.quote_column(name)}) AS "{name}_average"') elif dtype in ("varchar", "text", "char", "string", "character varying"): query_parts.append( - f'MAX(CHAR_LENGTH("{name}")) AS "{name}_max_character_length"' + f'MAX(CHAR_LENGTH({self.quote_column(name)})) AS "{name}_max_character_length"' ) + + if additional_queries: + for queries in additional_queries: + query_parts.append(queries) - query = f"SELECT\n {',\n '.join(query_parts)}\nFROM {table_name};" + qualified_table = self.qualified_table_name(table_name) + query = f'SELECT\n {",\n ".join(query_parts)}\nFROM {qualified_table};' result = self.connection.execute(text(query)) row = dict(list(result)[0]._mapping) From 1eab27947f0d5b9fc84bca3d2b02ef5dc821e005 Mon Sep 17 00:00:00 2001 From: Anshuman Tiwari Date: Tue, 14 Oct 2025 13:45:07 +0530 Subject: [PATCH 3/5] fix: correct import statement for UUID and update type hint for additional_queries in build_table_metrics_query --- dcs_core/integrations/databases/postgres.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/dcs_core/integrations/databases/postgres.py b/dcs_core/integrations/databases/postgres.py index 8341b87..81ebb42 100644 --- a/dcs_core/integrations/databases/postgres.py +++ b/dcs_core/integrations/databases/postgres.py @@ -15,8 +15,9 @@ import datetime from decimal import Decimal from typing import Any, Dict, List, Optional, Tuple +from uuid import UUID -from sqlalchemy import UUID, create_engine, text +from sqlalchemy import create_engine, text from sqlalchemy.engine import URL from dcs_core.core.common.errors import DataChecksDataSourcesConnectionError @@ -313,7 +314,7 @@ def fetch_sample_values_from_database( return rows def build_table_metrics_query( - self, table_name: str, column_info: list[dict], additional_queries:Optional[list[str]]=None + self, table_name: str, column_info: list[dict], additional_queries:Optional[List[str]]=None ) -> list[dict]: query_parts = [] From ff9a6d1007229e11227d15046a26cc1e43379aef Mon Sep 17 00:00:00 2001 From: Anshuman Tiwari Date: Tue, 14 Oct 2025 13:46:26 +0530 Subject: [PATCH 4/5] fix: file format --- dcs_core/integrations/databases/postgres.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/dcs_core/integrations/databases/postgres.py b/dcs_core/integrations/databases/postgres.py index 81ebb42..10ad830 100644 --- a/dcs_core/integrations/databases/postgres.py +++ b/dcs_core/integrations/databases/postgres.py @@ -314,7 +314,10 @@ def fetch_sample_values_from_database( return rows def build_table_metrics_query( - self, table_name: str, column_info: list[dict], additional_queries:Optional[List[str]]=None + self, + table_name: str, + column_info: list[dict], + additional_queries: Optional[List[str]] = None, ) -> list[dict]: query_parts = [] @@ -322,7 +325,9 @@ def build_table_metrics_query( name = col["column_name"] dtype = col["data_type"].lower() - query_parts.append(f'COUNT(DISTINCT {self.quote_column(name)}) AS "{name}_distinct"') + query_parts.append( + f'COUNT(DISTINCT {self.quote_column(name)}) AS "{name}_distinct"' + ) query_parts.append( f'COUNT(*) - COUNT(DISTINCT {self.quote_column(name)}) AS "{name}_duplicate"' ) @@ -342,13 +347,15 @@ def build_table_metrics_query( ): query_parts.append(f'MIN({self.quote_column(name)}) AS "{name}_min"') query_parts.append(f'MAX({self.quote_column(name)}) AS "{name}_max"') - query_parts.append(f'AVG({self.quote_column(name)}) AS "{name}_average"') + query_parts.append( + f'AVG({self.quote_column(name)}) AS "{name}_average"' + ) elif dtype in ("varchar", "text", "char", "string", "character varying"): query_parts.append( f'MAX(CHAR_LENGTH({self.quote_column(name)})) AS "{name}_max_character_length"' ) - + if additional_queries: for queries in additional_queries: query_parts.append(queries) From f6c0119508b130c87ece1ce1ce0284f6d3ecc5d4 Mon Sep 17 00:00:00 2001 From: Anshuman Tiwari Date: Tue, 14 Oct 2025 13:51:25 +0530 Subject: [PATCH 5/5] fix: handle empty column_info in build_table_metrics_query and correct datetime import --- dcs_core/integrations/databases/postgres.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/dcs_core/integrations/databases/postgres.py b/dcs_core/integrations/databases/postgres.py index 10ad830..ecd3525 100644 --- a/dcs_core/integrations/databases/postgres.py +++ b/dcs_core/integrations/databases/postgres.py @@ -320,6 +320,8 @@ def build_table_metrics_query( additional_queries: Optional[List[str]] = None, ) -> list[dict]: query_parts = [] + if not column_info: + return [] for col in column_info: name = col["column_name"] @@ -384,7 +386,7 @@ def _normalize_metrics(value): if isinstance(value, (int, float, bool)): return value - if isinstance(value, (datetime, datetime.date)): + if isinstance(value, (datetime.datetime, datetime.date)): return value.isoformat() if isinstance(value, UUID):