17 commits
f73e26f  Fix #15627: Add profiler metric support for complex data types (json,… (Esvanth, Apr 3, 2026)
235093a  Refactor: Add is_complex_type helper and secure LenFn dialect evaluation (Esvanth, Apr 3, 2026)
4fcced2  Fix: Guard distinct and unique counts against complex types and fix i… (Esvanth, Apr 3, 2026)
6029140  Style: fix python checkstyle formatting (Esvanth, Apr 3, 2026)
62a1e9e  Fix: Guard string-pattern metrics against complex types (Esvanth, Apr 3, 2026)
86773f9  Style: disable pylint import-outside-toplevel in accumulator (Esvanth, Apr 3, 2026)
ef1a109  Style: remove numpy from length metrics to satisfy checkstyle (Esvanth, Apr 3, 2026)
a4677e4  Style: collapse import to single line to satisfy isort (Esvanth, Apr 3, 2026)
55174f0  fix: address PR review feedback for complex type profiler support (Esvanth, Apr 4, 2026)
4839a9e  fix: move numpy vectorize import to top-level to satisfy isort (Esvanth, Apr 4, 2026)
fb2686b  style: wrap long registry import to multi-line for isort compliance (Esvanth, Apr 4, 2026)
f147b5d  style: add blank line after module docstring in registry.py for black… (Esvanth, Apr 4, 2026)
171f6bc  Merge branch 'main' into feature/issue-15627-complex-type-profiler (Esvanth, Apr 4, 2026)
798d391  fix: address PR feedback for complex type profiler support and fix em… (Esvanth, Apr 4, 2026)
58abad5  fix(quicksight): resolve column aliases to correct upstream column in… (Esvanth, Apr 4, 2026)
86485bb  fix(quicksight): finalize clean lineage fix (Esvanth, Apr 4, 2026)
c7db1bc  fix(quicksight): address PR review findings on lineage matching and e… (Esvanth, Apr 4, 2026)
2 changes: 1 addition & 1 deletion ingestion/setup.py
@@ -387,7 +387,7 @@

dev = {
"black==22.3.0",
"uvloop==0.21.0",
"uvloop==0.21.0; platform_system != 'Windows'",
"datamodel-code-generator==0.25.6",
"boto3-stubs",
"mypy-boto3-glue",
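
The uvloop pin above gains a PEP 508 environment marker so the dev extra skips it on Windows, where uvloop is not available. A minimal sketch (not part of the PR) of how such a marker is evaluated, using the packaging library:

    # Sketch only: shows how pip-style tooling interprets the marker added in setup.py.
    from packaging.requirements import Requirement

    req = Requirement("uvloop==0.21.0; platform_system != 'Windows'")
    # marker.evaluate() checks the marker against the running interpreter's
    # environment: False on Windows, True elsewhere, so the pin is skipped there.
    installable = req.marker.evaluate() if req.marker else True
    print(req.name, req.specifier, installable)
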
@@ -45,13 +45,18 @@
SourceUrl,
Uuid,
)
from metadata.generated.schema.type.entityLineage import EntitiesEdge, LineageDetails
from metadata.generated.schema.type.entityLineage import (
ColumnLineage,
EntitiesEdge,
LineageDetails,
)
from metadata.generated.schema.type.entityLineage import Source as LineageSource
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper, Dialect
from metadata.ingestion.lineage.parser import LineageParser
from metadata.ingestion.lineage.sql_lineage import get_column_fqn
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.dashboard.dashboard_service import (
LINEAGE_MAP,
@@ -350,11 +355,8 @@ def _yield_lineage_from_query(
)
for from_entity in from_entities or []:
if from_entity is not None and data_model_entity is not None:
columns = [
col.name.root for col in data_model_entity.columns
]
column_lineage = self._get_column_lineage(
from_entity, data_model_entity, columns
column_lineage = self._build_column_lineage_from_parser(
lineage_parser, from_entity, data_model_entity
)
lineage_details.columnsLineage = column_lineage
yield Either(
@@ -539,9 +541,11 @@ def yield_dashboard_lineage_details( # pylint: disable=too-many-locals
for datamodel in self.data_models or []:
try:
data_model_entity = self._get_datamodel(
datamodel_id=datamodel.dataset_id
if datamodel.dataset_id is not None
else datamodel.DataSource.DataSourceId
datamodel_id=(
datamodel.dataset_id
if datamodel.dataset_id is not None
else datamodel.DataSource.DataSourceId
)
)
if isinstance(
datamodel.DataSource.data_source_resp, DataSourceRespQuery
@@ -574,6 +578,62 @@ def yield_dashboard_lineage_details( # pylint: disable=too-many-locals
)
)

def _build_column_lineage_from_parser(
self,
lineage_parser: LineageParser,
from_entity: Table,
data_model_entity: DashboardDataModel,
) -> List[ColumnLineage]:
"""
Build column lineage using SQL-parsed source→target column mappings.

When the CustomSql query uses column aliases (e.g. SELECT a AS b), the
lineage parser resolves the original column name and alias. This allows
correct lineage when the data model columns carry alias names that do not
exist in the source table.

Falls back to name-based matching when no parser mappings are available.
"""
column_lineage = []
for col_pair in lineage_parser.column_lineage or []:
try:
src_col = col_pair[0]
tgt_col = col_pair[-1]

# If the parser provides parent information, ensure it matches the current from_entity
if (
hasattr(src_col, "parent")
and src_col.parent
and str(src_col.parent).split(".")[-1].lower()
!= from_entity.name.root.lower()
):
continue

source_fqn = get_column_fqn(
table_entity=from_entity, column=src_col.raw_name
)
target_fqn = self._get_data_model_column_fqn(
data_model_entity=data_model_entity,
column=tgt_col.raw_name,
)
if source_fqn and target_fqn:
column_lineage.append(
ColumnLineage(
fromColumns=[str(source_fqn)],
toColumn=str(target_fqn),
)
)
except Exception as exc:
logger.debug(
f"Failed to resolve column lineage for pair {col_pair}: {exc}"
)
if not column_lineage:
columns = [col.name.root for col in data_model_entity.columns]
column_lineage = (
self._get_column_lineage(from_entity, data_model_entity, columns) or []
)
return column_lineage

def _get_column_info(self, data_model: DescribeDataSourceResponse):
"""Get column info"""
datasource_columns = []
@@ -684,9 +744,9 @@ def yield_datamodel(
Each QuickSight dataset produces a separate DataModel entity,
identified by dataset_id rather than datasource_id.
"""
self.data_models: List[
DescribeDataSourceResponse
] = self._get_dashboard_datamodels(dashboard_details)
self.data_models: List[DescribeDataSourceResponse] = (
self._get_dashboard_datamodels(dashboard_details)
)
dataset_groups: dict[str, List[DescribeDataSourceResponse]] = defaultdict(list)
for data_model in self.data_models:
key = (
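
For context on the new _build_column_lineage_from_parser helper above, a standalone sketch of the alias-resolution idea it implements. SimpleCol is a hypothetical stand-in for the column objects the lineage parser exposes via column_lineage; judging from the diff, the real objects carry raw_name and an optional parent table:

    # Sketch only: mimics how SELECT-alias pairs flow into column lineage.
    from dataclasses import dataclass
    from typing import Optional

    @dataclass
    class SimpleCol:
        raw_name: str
        parent: Optional[str] = None

    # e.g. what a parser could yield for "SELECT revenue AS total_sales FROM fact_orders"
    column_pairs = [
        (SimpleCol("revenue", "db.schema.fact_orders"), SimpleCol("total_sales")),
    ]

    for src, tgt in column_pairs:
        # keep only pairs whose source table matches the current upstream entity,
        # comparing on the last component of the parent name as the diff does
        if src.parent and src.parent.split(".")[-1].lower() != "fact_orders":
            continue
        print(f"{src.raw_name} -> {tgt.raw_name}")  # revenue -> total_sales
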
@@ -12,6 +12,7 @@
"""
Table Column Count Metric definition
"""

# pylint: disable=duplicate-code

from typing import TYPE_CHECKING, Optional
@@ -12,6 +12,7 @@
"""
Table Column Count Metric definition
"""

# pylint: disable=duplicate-code

from typing import TYPE_CHECKING, Optional
1 change: 1 addition & 0 deletions ingestion/src/metadata/profiler/metrics/static/count.py
@@ -12,6 +12,7 @@
"""
Count Metric definition
"""

# pylint: disable=duplicate-code

import traceback
@@ -12,6 +12,7 @@
"""
CountInSet Metric definition
"""

# pylint: disable=duplicate-code
import traceback
from typing import TYPE_CHECKING, List, Optional
@@ -12,19 +12,21 @@
"""
Distinct Count Metric definition
"""

# pylint: disable=duplicate-code

import json
from typing import TYPE_CHECKING, Optional

from sqlalchemy import column, distinct, func
from sqlalchemy import Text, column, distinct, func

if TYPE_CHECKING:
from metadata.profiler.processor.runner import PandasRunner

from metadata.generated.schema.configuration.profilerConfiguration import MetricType
from metadata.profiler.metrics.core import StaticMetric, _label
from metadata.profiler.orm.functions.count import CountFn
from metadata.profiler.orm.registry import is_complex_type
from metadata.utils.logger import profiler_logger

logger = profiler_logger()
@@ -52,6 +54,8 @@ def fn(self):
"""
Distinct Count metric for Sqlalchemy connectors
"""
if is_complex_type(self.col.type):
return func.count(distinct(func.cast(column(self.col.name), Text)))
return func.count(distinct(CountFn(column(self.col.name, self.col.type))))

def df_fn(self, dfs: Optional["PandasRunner"] = None):
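
The new complex-type branch in DistinctCount.fn casts the column to text before counting distinct values, so JSON/array columns that cannot be compared directly still get a distinct count. A minimal sketch (not the PR's code, and using SQLAlchemy's standard cast construct rather than func.cast) of the kind of expression it builds:

    # Sketch only: what the complex-type distinct count compiles to conceptually.
    from sqlalchemy import Text, cast, column, distinct, func

    # stringify the complex column so DISTINCT has a comparable value
    expr = func.count(distinct(cast(column("payload"), Text)))
    print(expr)  # roughly: count(DISTINCT CAST(payload AS TEXT)); rendering varies by dialect
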
@@ -12,13 +12,15 @@
"""
ILIKE Count Metric definition
"""

# pylint: disable=duplicate-code

from sqlalchemy import case, column

from metadata.generated.schema.configuration.profilerConfiguration import MetricType
from metadata.profiler.metrics.core import StaticMetric, _label
from metadata.profiler.orm.functions.sum import SumFn
from metadata.profiler.orm.registry import is_complex_type


class ILikeCount(StaticMetric):
@@ -46,6 +48,8 @@ def metric_type(self):

@_label
def fn(self):
if is_complex_type(self.col.type):
return None
if not hasattr(self, "expression"):
raise AttributeError(
"ILike Count requires an expression to be set: add_props(expression=...)(Metrics.iLikeCount)"
@@ -12,13 +12,15 @@
"""
Like Count Metric definition
"""

# pylint: disable=duplicate-code

from sqlalchemy import case, column

from metadata.generated.schema.configuration.profilerConfiguration import MetricType
from metadata.profiler.metrics.core import StaticMetric, _label
from metadata.profiler.orm.functions.sum import SumFn
from metadata.profiler.orm.registry import is_complex_type


class LikeCount(StaticMetric):
@@ -46,6 +48,8 @@ def metric_type(self):

@_label
def fn(self):
if is_complex_type(self.col.type):
return None
if not hasattr(self, "expression"):
raise AttributeError(
"Like Count requires an expression to be set: add_props(expression=...)(Metrics.likeCount)"
1 change: 1 addition & 0 deletions ingestion/src/metadata/profiler/metrics/static/max.py
@@ -12,6 +12,7 @@
"""
Max Metric definition
"""

from functools import partial
from typing import TYPE_CHECKING, Callable, Optional

24 changes: 19 additions & 5 deletions ingestion/src/metadata/profiler/metrics/static/max_length.py
@@ -12,18 +12,24 @@
"""
MAX_LENGTH Metric definition
"""

# pylint: disable=duplicate-code


from typing import TYPE_CHECKING, Optional

from numpy import vectorize
from sqlalchemy import column, func

from metadata.generated.schema.configuration.profilerConfiguration import MetricType
from metadata.profiler.metrics.core import StaticMetric, _label
from metadata.profiler.metrics.pandas_metric_protocol import PandasComputation
from metadata.profiler.orm.functions.length import LenFn
from metadata.profiler.orm.registry import is_concatenable
from metadata.profiler.orm.registry import (
is_complex_type,
is_concatenable,
is_length_computable,
)
from metadata.utils.logger import profiler_logger

if TYPE_CHECKING:
@@ -59,7 +65,7 @@ def _is_concatenable(self):
@_label
def fn(self):
"""sqlalchemy function"""
if self._is_concatenable():
if is_length_computable(self.col.type):
return func.max(LenFn(column(self.col.name, self.col.type)))

logger.debug(
@@ -100,16 +106,24 @@ def update_accumulator(
current_max: Optional[int], df: "pd.DataFrame", column
) -> Optional[int]:
"""Computes one DataFrame chunk and updates the running maximum"""
# pylint: disable=import-outside-toplevel
import pandas as pd
from numpy import vectorize

series = df[column.name].dropna()
if series.empty:
return current_max

length_vectorize_func = vectorize(len)
chunk_max = None

if is_concatenable(column.type):
max_val = length_vectorize_func(df[column.name].dropna().astype(str)).max()
max_val = length_vectorize_func(series.astype(str)).max()
if not pd.isnull(max_val):
chunk_max = int(max_val)
elif is_complex_type(column.type):
max_val = length_vectorize_func(series.astype(str)).max()
if not pd.isnull(max_val):
chunk_max = max_val
chunk_max = int(max_val)

if chunk_max is None or pd.isnull(chunk_max):
return current_max
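
The reworked update_accumulator above processes the DataFrame in chunks and keeps a running maximum of value lengths, stringifying complex values (dicts, lists) before measuring them. A self-contained sketch of that accumulator pattern; chunk_max_length is a hypothetical name, not the PR's API:

    # Sketch only: running max of str() lengths across DataFrame chunks.
    from typing import Optional

    import pandas as pd
    from numpy import vectorize

    def chunk_max_length(current_max: Optional[int], df: pd.DataFrame, col: str) -> Optional[int]:
        series = df[col].dropna()
        if series.empty:
            return current_max
        # complex values (dict/list) are measured via their string representation
        lengths = vectorize(len)(series.astype(str))
        chunk_max = int(lengths.max())
        return chunk_max if current_max is None else max(current_max, chunk_max)

    running = None
    for chunk in (pd.DataFrame({"payload": [{"a": 1}, None]}),
                  pd.DataFrame({"payload": [[1, 2, 3], {"a": 1, "b": 2}]})):
        running = chunk_max_length(running, chunk, "payload")
    print(running)  # length of the longest str() representation seen so far
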
1 change: 1 addition & 0 deletions ingestion/src/metadata/profiler/metrics/static/mean.py
@@ -12,6 +12,7 @@
"""
AVG Metric definition
"""

from functools import partial
from typing import TYPE_CHECKING, Callable, NamedTuple, Optional

1 change: 1 addition & 0 deletions ingestion/src/metadata/profiler/metrics/static/min.py
@@ -12,6 +12,7 @@
"""
Min Metric definition
"""

from functools import partial
from typing import TYPE_CHECKING, Callable, Optional

24 changes: 19 additions & 5 deletions ingestion/src/metadata/profiler/metrics/static/min_length.py
@@ -12,18 +12,24 @@
"""
MIN_LENGTH Metric definition
"""

# pylint: disable=duplicate-code


from typing import TYPE_CHECKING, Optional

from numpy import vectorize
from sqlalchemy import column, func

from metadata.generated.schema.configuration.profilerConfiguration import MetricType
from metadata.profiler.metrics.core import StaticMetric, _label
from metadata.profiler.metrics.pandas_metric_protocol import PandasComputation
from metadata.profiler.orm.functions.length import LenFn
from metadata.profiler.orm.registry import is_concatenable
from metadata.profiler.orm.registry import (
is_complex_type,
is_concatenable,
is_length_computable,
)
from metadata.utils.logger import profiler_logger

if TYPE_CHECKING:
@@ -59,7 +65,7 @@ def _is_concatenable(self):
@_label
def fn(self):
"""sqlalchemy function"""
if self._is_concatenable():
if is_length_computable(self.col.type):
return func.min(LenFn(column(self.col.name, self.col.type)))

logger.debug(
@@ -100,16 +106,24 @@ def update_accumulator(
current_min: Optional[int], df: "pd.DataFrame", column
) -> Optional[int]:
"""Computes one DataFrame chunk and updates the running minimum"""
# pylint: disable=import-outside-toplevel
import pandas as pd
from numpy import vectorize

series = df[column.name].dropna()
if series.empty:
return current_min

length_vectorize_func = vectorize(len)
chunk_min = None

if is_concatenable(column.type):
min_val = length_vectorize_func(df[column.name].dropna().astype(str)).min()
min_val = length_vectorize_func(series.astype(str)).min()
if not pd.isnull(min_val):
chunk_min = int(min_val)
elif is_complex_type(column.type):
min_val = length_vectorize_func(series.astype(str)).min()
if not pd.isnull(min_val):
chunk_min = min_val
chunk_min = int(min_val)

if chunk_min is None or pd.isnull(chunk_min):
return current_min