def build_table_metrics_query(
    self,
    table_name: str,
    column_info: list[dict],
    additional_queries: Optional[List[str]] = None,
) -> list[dict]:
    """Run one aggregate query over a table and return per-column metrics.

    Despite its name, this method both builds and executes the query.

    For every column a distinct count, a duplicate count
    (``COUNT(*) - COUNT(DISTINCT col)``) and a NULL count are selected.
    Numeric columns additionally get min/max/average, and text columns get
    the maximum character length.

    Args:
        table_name: Table to profile; passed to ``self.qualified_table_name``.
        column_info: One dict per column, each providing ``"column_name"``
            and ``"data_type"`` (compared case-insensitively).
        additional_queries: Extra raw select-list expressions appended
            verbatim to the statement. A result key of the form
            ``<column>_<metric>`` is attached to that column.

    Returns:
        A list, in ``column_info`` order, of
        ``{"column_name": ..., "metrics": {...}}`` dicts whose metric values
        are normalized to JSON-serializable types. An empty list is returned
        when ``column_info`` is empty.
    """
    if not column_info:
        return []

    # "real" / "double precision" / "character" are the native PostgreSQL
    # spellings of the float and fixed-length character types.
    numeric_types = {
        "int",
        "integer",
        "bigint",
        "smallint",
        "decimal",
        "numeric",
        "float",
        "double",
        "real",
        "double precision",
    }
    text_types = {
        "varchar",
        "text",
        "char",
        "string",
        "character varying",
        "character",
    }

    select_items: List[str] = []
    # alias -> (column name, metric name) for every expression generated
    # here, so result keys are resolved exactly instead of by prefix.
    alias_owner: Dict[str, Tuple[str, str]] = {}

    def _add_metric(column: str, metric: str, expression: str) -> None:
        alias = f"{column}_{metric}"
        alias_owner[alias] = (column, metric)
        # A double quote inside a quoted identifier is written as two quotes.
        escaped_alias = alias.replace('"', '""')
        select_items.append(f'{expression} AS "{escaped_alias}"')

    for col in column_info:
        name = col["column_name"]
        dtype = col["data_type"].lower()
        quoted = self.quote_column(name)

        _add_metric(name, "distinct", f"COUNT(DISTINCT {quoted})")
        _add_metric(name, "duplicate", f"COUNT(*) - COUNT(DISTINCT {quoted})")
        _add_metric(
            name,
            "is_null",
            f"SUM(CASE WHEN {quoted} IS NULL THEN 1 ELSE 0 END)",
        )

        if dtype in numeric_types:
            _add_metric(name, "min", f"MIN({quoted})")
            _add_metric(name, "max", f"MAX({quoted})")
            _add_metric(name, "average", f"AVG({quoted})")
        elif dtype in text_types:
            _add_metric(
                name, "max_character_length", f"MAX(CHAR_LENGTH({quoted}))"
            )

    if additional_queries:
        select_items.extend(additional_queries)

    qualified_table = self.qualified_table_name(table_name)
    # Joined outside the f-string: a backslash inside an f-string
    # replacement field is a SyntaxError before Python 3.12.
    separator = ",\n "
    select_list = separator.join(select_items)
    query = f"SELECT\n {select_list}\nFROM {qualified_table};"

    result = self.connection.execute(text(query))
    first_row = result.fetchone()
    row = dict(first_row._mapping) if first_row is not None else {}

    def _normalize_metrics(value):
        """Convert a DB value into a JSON-serializable Python value.

        Decimal becomes float, datetime/date become ISO 8601 strings, UUID
        becomes a string, lists and dicts are converted recursively, None
        and plain numbers pass through, and anything else is stringified.
        """
        if value is None:
            return None
        if isinstance(value, Decimal):
            return float(value)
        if isinstance(value, (int, float, bool)):
            return value
        if isinstance(value, (datetime.datetime, datetime.date)):
            return value.isoformat()
        if isinstance(value, UUID):
            return str(value)
        if isinstance(value, list):
            return [_normalize_metrics(item) for item in value]
        if isinstance(value, dict):
            return {k: _normalize_metrics(v) for k, v in value.items()}
        return str(value)

    column_names = [col["column_name"] for col in column_info]
    metrics_by_column: Dict[str, dict] = {name: {} for name in column_names}
    # Longest names first so that, for keys coming from additional_queries,
    # "user_id_x" is attributed to "user_id" rather than to "user".
    names_longest_first = sorted(metrics_by_column, key=len, reverse=True)

    for key, value in row.items():
        owner = alias_owner.get(key)
        if owner is None:
            for candidate in names_longest_first:
                prefix = f"{candidate}_"
                if key.startswith(prefix):
                    owner = (candidate, key[len(prefix):])
                    break
        if owner is None:
            # Key does not belong to any requested column.
            continue
        column, metric = owner
        metrics_by_column[column][metric] = _normalize_metrics(value)

    return [
        {"column_name": name, "metrics": dict(metrics_by_column[name])}
        for name in column_names
    ]