Skip to content

Commit 98f63c0

Browse files
authored
feat: Add distribution graph below 20 distinct count [#DCS-1244] (#352)
* feat: enhance distinct count handling for JSON columns and add distribution graph for low distinct counts
* chore: add environment and installation log files and enhance exception logging in postgres integration.
* feat: format metrics data structure for improved clarity in distribution graph handling
* refactor: remove commented-out code and clean up distribution graph logic
* refactor: remove outdated comments and simplify distinct expression handling for JSON columns
* refactor: remove unnecessary comment regarding JSON grouping in PostgresDataSource
1 parent 5f26f7b commit 98f63c0

1 file changed

Lines changed: 64 additions & 4 deletions

File tree

dcs_core/integrations/databases/postgres.py

Lines changed: 64 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -331,12 +331,16 @@ def build_table_metrics_query(
331331
for col in column_info:
332332
name = col["column_name"]
333333
dtype = col["data_type"].lower()
334+
quoted = self.quote_column(name)
334335

336+
if dtype in ("json", "jsonb"):
337+
distinct_expr = f"{quoted}::text"
338+
else:
339+
distinct_expr = f"{quoted}"
340+
341+
query_parts.append(f'COUNT(DISTINCT {distinct_expr}) AS "{name}_distinct"')
335342
query_parts.append(
336-
f'COUNT(DISTINCT {self.quote_column(name)}) AS "{name}_distinct"'
337-
)
338-
query_parts.append(
339-
f'COUNT(*) - COUNT(DISTINCT {self.quote_column(name)}) AS "{name}_duplicate"'
343+
f'COUNT(*) - COUNT(DISTINCT {distinct_expr}) AS "{name}_duplicate"'
340344
)
341345
query_parts.append(
342346
f'SUM(CASE WHEN {self.quote_column(name)} IS NULL THEN 1 ELSE 0 END) AS "{name}_is_null"'
@@ -415,6 +419,62 @@ def _normalize_metrics(value):
415419
col_metrics[metric_name] = _normalize_metrics(value)
416420

417421
column_wise.append({"column_name": name, "metrics": col_metrics})
422+
423+
for col_data in column_wise:
424+
metrics = col_data["metrics"]
425+
distinct_count = metrics.get("distinct")
426+
col_name = col_data["column_name"]
427+
dtype = next(
428+
c["data_type"].lower()
429+
for c in column_info
430+
if c["column_name"] == col_name
431+
)
432+
433+
if isinstance(distinct_count, (int, float)) and distinct_count < 20:
434+
quoted = self.quote_column(col_name)
435+
436+
if dtype in ("json", "jsonb"):
437+
group_expr = f"{quoted}::text"
438+
else:
439+
group_expr = quoted
440+
441+
dist_query = (
442+
f"SELECT {group_expr}, COUNT(*) "
443+
f"FROM {qualified_table} GROUP BY {group_expr} ORDER BY COUNT(*) DESC"
444+
)
445+
446+
try:
447+
dist_result = self.connection.execute(text(dist_query)).fetchall()
448+
449+
distribution = []
450+
for r in dist_result:
451+
val = _normalize_metrics(r[0])
452+
distribution.append(
453+
{
454+
"col_val": val,
455+
"count": r[1],
456+
}
457+
)
458+
459+
metrics["distribution_graph"] = distribution
460+
461+
except Exception as e:
462+
print(
463+
f"Failed to generate distribution graph for column {col_name}: {e}"
464+
)
465+
466+
for col_data in column_wise:
467+
metrics = col_data["metrics"]
468+
formatted_metrics_data = {
469+
"general_data": {
470+
key: value
471+
for key, value in metrics.items()
472+
if key != "distribution_graph"
473+
},
474+
"distribution_data": metrics.get("distribution_graph", []),
475+
}
476+
col_data["metrics"] = formatted_metrics_data
477+
418478
return column_wise
419479

420480
def get_table_foreign_key_info(self, table_name: str, schema: str | None = None):

0 commit comments

Comments
 (0)