Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 102 additions & 0 deletions dcs_core/integrations/databases/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import datetime
from decimal import Decimal
from typing import Any, Dict, List, Optional, Tuple
from uuid import UUID

from sqlalchemy import create_engine, text
from sqlalchemy.engine import URL
Expand Down Expand Up @@ -309,3 +312,102 @@ def fetch_sample_values_from_database(
result = self.connection.execute(text(query))
rows = result.fetchall()
return rows

def build_table_metrics_query(
self,
table_name: str,
column_info: list[dict],
additional_queries: Optional[List[str]] = None,
) -> list[dict]:
query_parts = []
if not column_info:
return []

for col in column_info:
name = col["column_name"]
Comment thread
coderabbitai[bot] marked this conversation as resolved.
dtype = col["data_type"].lower()

query_parts.append(
f'COUNT(DISTINCT {self.quote_column(name)}) AS "{name}_distinct"'
)
query_parts.append(
f'COUNT(*) - COUNT(DISTINCT {self.quote_column(name)}) AS "{name}_duplicate"'
)
query_parts.append(
f'SUM(CASE WHEN {self.quote_column(name)} IS NULL THEN 1 ELSE 0 END) AS "{name}_is_null"'
)

Comment thread
coderabbitai[bot] marked this conversation as resolved.
if dtype in (
"int",
"integer",
"bigint",
"smallint",
"decimal",
"numeric",
"float",
"double",
):
query_parts.append(f'MIN({self.quote_column(name)}) AS "{name}_min"')
query_parts.append(f'MAX({self.quote_column(name)}) AS "{name}_max"')
query_parts.append(
f'AVG({self.quote_column(name)}) AS "{name}_average"'
)

elif dtype in ("varchar", "text", "char", "string", "character varying"):
query_parts.append(
f'MAX(CHAR_LENGTH({self.quote_column(name)})) AS "{name}_max_character_length"'
)

if additional_queries:
for queries in additional_queries:
query_parts.append(queries)

qualified_table = self.qualified_table_name(table_name)
query = f'SELECT\n {",\n ".join(query_parts)}\nFROM {qualified_table};'

result = self.connection.execute(text(query))
row = dict(list(result)[0]._mapping)

def _normalize_metrics(value):
"""
Safely normalizes DB metric values into JSON-serializable Python types.
Handles:
- Decimal → float
- datetime/date → ISO 8601 string
- UUID → string
- Nested dict/list recursion
- None passthrough
"""
if value is None:
return None

if isinstance(value, Decimal):
return float(value)
if isinstance(value, (int, float, bool)):
return value

if isinstance(value, (datetime.datetime, datetime.date)):
return value.isoformat()

if isinstance(value, UUID):
return str(value)

if isinstance(value, list):
return [_normalize_metrics(v) for v in value]
if isinstance(value, dict):
return {k: _normalize_metrics(v) for k, v in value.items()}

return str(value)

column_wise = []
for col in column_info:
name = col["column_name"]
col_metrics = {}

for key, value in row.items():
if key.startswith(f"{name}_"):
metric_name = key[len(name) + 1 :]
col_metrics[metric_name] = _normalize_metrics(value)

column_wise.append({"column_name": name, "metrics": col_metrics})
return column_wise
Loading