From f73e26f5889716b99d7cb4b1c11646a04a748e91 Mon Sep 17 00:00:00 2001 From: Esvanth Date: Fri, 3 Apr 2026 01:48:08 +0100 Subject: [PATCH 01/16] Fix #15627: Add profiler metric support for complex data types (json, array, geo) --- .../profiler/metrics/static/max_length.py | 21 ++++++- .../profiler/metrics/static/min_length.py | 21 ++++++- .../src/metadata/profiler/orm/registry.py | 56 ++++++++++++++----- 3 files changed, 77 insertions(+), 21 deletions(-) diff --git a/ingestion/src/metadata/profiler/metrics/static/max_length.py b/ingestion/src/metadata/profiler/metrics/static/max_length.py index 7f374d4537ee..d88bd2ff5c2c 100644 --- a/ingestion/src/metadata/profiler/metrics/static/max_length.py +++ b/ingestion/src/metadata/profiler/metrics/static/max_length.py @@ -23,7 +23,12 @@ from metadata.profiler.metrics.core import StaticMetric, _label from metadata.profiler.metrics.pandas_metric_protocol import PandasComputation from metadata.profiler.orm.functions.length import LenFn -from metadata.profiler.orm.registry import is_concatenable +from metadata.profiler.orm.registry import ( + is_concatenable, + is_collection, + is_struct, + is_complex, +) from metadata.utils.logger import profiler_logger if TYPE_CHECKING: @@ -59,7 +64,12 @@ def _is_concatenable(self): @_label def fn(self): """sqlalchemy function""" - if self._is_concatenable(): + if ( + self._is_concatenable() + or is_collection(self.col.type) + or is_struct(self.col.type) + or is_complex(self.col.type) + ): return func.max(LenFn(column(self.col.name, self.col.type))) logger.debug( @@ -106,7 +116,12 @@ def update_accumulator( length_vectorize_func = vectorize(len) chunk_max = None - if is_concatenable(column.type): + if ( + is_concatenable(column.type) + or is_collection(column.type) + or is_struct(column.type) + or is_complex(column.type) + ): max_val = length_vectorize_func(df[column.name].dropna().astype(str)).max() if not pd.isnull(max_val): chunk_max = max_val diff --git a/ingestion/src/metadata/profiler/metrics/static/min_length.py b/ingestion/src/metadata/profiler/metrics/static/min_length.py index 5e5888ce71d3..e93009c5912d 100644 --- a/ingestion/src/metadata/profiler/metrics/static/min_length.py +++ b/ingestion/src/metadata/profiler/metrics/static/min_length.py @@ -23,7 +23,12 @@ from metadata.profiler.metrics.core import StaticMetric, _label from metadata.profiler.metrics.pandas_metric_protocol import PandasComputation from metadata.profiler.orm.functions.length import LenFn -from metadata.profiler.orm.registry import is_concatenable +from metadata.profiler.orm.registry import ( + is_concatenable, + is_collection, + is_struct, + is_complex, +) from metadata.utils.logger import profiler_logger if TYPE_CHECKING: @@ -59,7 +64,12 @@ def _is_concatenable(self): @_label def fn(self): """sqlalchemy function""" - if self._is_concatenable(): + if ( + self._is_concatenable() + or is_collection(self.col.type) + or is_struct(self.col.type) + or is_complex(self.col.type) + ): return func.min(LenFn(column(self.col.name, self.col.type))) logger.debug( @@ -106,7 +116,12 @@ def update_accumulator( length_vectorize_func = vectorize(len) chunk_min = None - if is_concatenable(column.type): + if ( + is_concatenable(column.type) + or is_collection(column.type) + or is_struct(column.type) + or is_complex(column.type) + ): min_val = length_vectorize_func(df[column.name].dropna().astype(str)).min() if not pd.isnull(min_val): chunk_min = min_val diff --git a/ingestion/src/metadata/profiler/orm/registry.py b/ingestion/src/metadata/profiler/orm/registry.py index c6da40f10e1b..c8a8c237d678 100644 --- a/ingestion/src/metadata/profiler/orm/registry.py +++ b/ingestion/src/metadata/profiler/orm/registry.py @@ -116,23 +116,8 @@ class Dialects(metaclass=EnumAdapter): pass -# Sometimes we want to skip certain types for computing metrics. -# If the type is NULL, then we won't run the metric execution -# in the profiler. -# Note that not mapped types are set to NULL by default. NOT_COMPUTE = { sqlalchemy.types.NullType.__name__, - sqlalchemy.ARRAY.__name__, - sqlalchemy.JSON.__name__, - sqa_types.SQAMap.__name__, - sqa_types.SQAStruct.__name__, - sqa_types.SQASet.__name__, - sqa_types.SQAUnion.__name__, - sqa_types.SQASGeography.__name__, - DataType.GEOMETRY.value, - DataType.ARRAY.value, - DataType.JSON.value, - CustomTypes.ARRAY.value.__name__, CustomTypes.SQADATETIMERANGE.value.__name__, DataType.XML.value, CustomTypes.UNDETERMINED.value.__name__, @@ -154,6 +139,26 @@ class Dialects(metaclass=EnumAdapter): CONCATENABLE_SET = {DataType.STRING.value, DataType.TEXT.value} +COLLECTION_SET = { + sqlalchemy.ARRAY.__name__, + DataType.ARRAY.value, + CustomTypes.ARRAY.value.__name__, +} + +STRUCT_SET = { + sqlalchemy.JSON.__name__, + sqa_types.SQAMap.__name__, + sqa_types.SQAStruct.__name__, + sqa_types.SQASet.__name__, + sqa_types.SQAUnion.__name__, + DataType.JSON.value, +} + +COMPLEX_SET = { + sqa_types.SQASGeography.__name__, + DataType.GEOMETRY.value, +} + # Now, let's define some helper methods to identify # the nature of an SQLAlchemy type @@ -237,3 +242,24 @@ def is_blob(_type) -> bool: DataType.TEXT.value, } return isinstance(_type, (HexByteString, LargeBinary, Text)) + + +def is_collection(_type) -> bool: + """Check if the type is a collection, e.g. Array""" + if isinstance(_type, DataType): + return _type.value in COLLECTION_SET + return _type.__class__.__name__ in COLLECTION_SET + + +def is_struct(_type) -> bool: + """Check if the type is a struct, e.g. JSON, Map, Struct""" + if isinstance(_type, DataType): + return _type.value in STRUCT_SET + return _type.__class__.__name__ in STRUCT_SET + + +def is_complex(_type) -> bool: + """Check if the type is complex, e.g. Geography or Geometry""" + if isinstance(_type, DataType): + return _type.value in COMPLEX_SET + return _type.__class__.__name__ in COMPLEX_SET From 235093afa38613748e3bfdb14dc2798fd7cffc33 Mon Sep 17 00:00:00 2001 From: Esvanth Date: Fri, 3 Apr 2026 02:13:04 +0100 Subject: [PATCH 02/16] Refactor: Add is_complex_type helper and secure LenFn dialect evaluation --- .../profiler/metrics/static/max_length.py | 18 +++--------------- .../profiler/metrics/static/min_length.py | 18 +++--------------- .../src/metadata/profiler/orm/registry.py | 5 +++++ 3 files changed, 11 insertions(+), 30 deletions(-) diff --git a/ingestion/src/metadata/profiler/metrics/static/max_length.py b/ingestion/src/metadata/profiler/metrics/static/max_length.py index d88bd2ff5c2c..0194803ffac5 100644 --- a/ingestion/src/metadata/profiler/metrics/static/max_length.py +++ b/ingestion/src/metadata/profiler/metrics/static/max_length.py @@ -25,9 +25,7 @@ from metadata.profiler.orm.functions.length import LenFn from metadata.profiler.orm.registry import ( is_concatenable, - is_collection, - is_struct, - is_complex, + is_complex_type, ) from metadata.utils.logger import profiler_logger @@ -64,12 +62,7 @@ def _is_concatenable(self): @_label def fn(self): """sqlalchemy function""" - if ( - self._is_concatenable() - or is_collection(self.col.type) - or is_struct(self.col.type) - or is_complex(self.col.type) - ): + if self._is_concatenable(): return func.max(LenFn(column(self.col.name, self.col.type))) logger.debug( @@ -116,12 +109,7 @@ def update_accumulator( length_vectorize_func = vectorize(len) chunk_max = None - if ( - is_concatenable(column.type) - or is_collection(column.type) - or is_struct(column.type) - or is_complex(column.type) - ): + if is_concatenable(column.type) or is_complex_type(column.type): max_val = length_vectorize_func(df[column.name].dropna().astype(str)).max() if not pd.isnull(max_val): chunk_max = max_val diff --git a/ingestion/src/metadata/profiler/metrics/static/min_length.py b/ingestion/src/metadata/profiler/metrics/static/min_length.py index e93009c5912d..672e84b2b207 100644 --- a/ingestion/src/metadata/profiler/metrics/static/min_length.py +++ b/ingestion/src/metadata/profiler/metrics/static/min_length.py @@ -25,9 +25,7 @@ from metadata.profiler.orm.functions.length import LenFn from metadata.profiler.orm.registry import ( is_concatenable, - is_collection, - is_struct, - is_complex, + is_complex_type, ) from metadata.utils.logger import profiler_logger @@ -64,12 +62,7 @@ def _is_concatenable(self): @_label def fn(self): """sqlalchemy function""" - if ( - self._is_concatenable() - or is_collection(self.col.type) - or is_struct(self.col.type) - or is_complex(self.col.type) - ): + if self._is_concatenable(): return func.min(LenFn(column(self.col.name, self.col.type))) logger.debug( @@ -116,12 +109,7 @@ def update_accumulator( length_vectorize_func = vectorize(len) chunk_min = None - if ( - is_concatenable(column.type) - or is_collection(column.type) - or is_struct(column.type) - or is_complex(column.type) - ): + if is_concatenable(column.type) or is_complex_type(column.type): min_val = length_vectorize_func(df[column.name].dropna().astype(str)).min() if not pd.isnull(min_val): chunk_min = min_val diff --git a/ingestion/src/metadata/profiler/orm/registry.py b/ingestion/src/metadata/profiler/orm/registry.py index c8a8c237d678..bfa9583d883e 100644 --- a/ingestion/src/metadata/profiler/orm/registry.py +++ b/ingestion/src/metadata/profiler/orm/registry.py @@ -263,3 +263,8 @@ def is_complex(_type) -> bool: if isinstance(_type, DataType): return _type.value in COMPLEX_SET return _type.__class__.__name__ in COMPLEX_SET + + +def is_complex_type(_type) -> bool: + """Helper method to group collections, structs, and complex types""" + return is_collection(_type) or is_struct(_type) or is_complex(_type) From 4fcced2b38d1c944b6737223ca48b07d4efc08c1 Mon Sep 17 00:00:00 2001 From: Esvanth Date: Fri, 3 Apr 2026 02:21:18 +0100 Subject: [PATCH 03/16] Fix: Guard distinct and unique counts against complex types and fix isort --- .../src/metadata/profiler/metrics/static/distinct_count.py | 3 +++ .../src/metadata/profiler/metrics/static/max_length.py | 2 +- .../src/metadata/profiler/metrics/static/min_length.py | 2 +- .../src/metadata/profiler/metrics/static/unique_count.py | 6 ++++-- 4 files changed, 9 insertions(+), 4 deletions(-) diff --git a/ingestion/src/metadata/profiler/metrics/static/distinct_count.py b/ingestion/src/metadata/profiler/metrics/static/distinct_count.py index b6ae7942b2ca..d8a9cc3b4a7b 100644 --- a/ingestion/src/metadata/profiler/metrics/static/distinct_count.py +++ b/ingestion/src/metadata/profiler/metrics/static/distinct_count.py @@ -25,6 +25,7 @@ from metadata.generated.schema.configuration.profilerConfiguration import MetricType from metadata.profiler.metrics.core import StaticMetric, _label from metadata.profiler.orm.functions.count import CountFn +from metadata.profiler.orm.registry import is_complex_type from metadata.utils.logger import profiler_logger logger = profiler_logger() @@ -52,6 +53,8 @@ def fn(self): """ Distinct Count metric for Sqlalchemy connectors """ + if is_complex_type(self.col.type): + return None return func.count(distinct(CountFn(column(self.col.name, self.col.type)))) def df_fn(self, dfs: Optional["PandasRunner"] = None): diff --git a/ingestion/src/metadata/profiler/metrics/static/max_length.py b/ingestion/src/metadata/profiler/metrics/static/max_length.py index 0194803ffac5..389575f0f163 100644 --- a/ingestion/src/metadata/profiler/metrics/static/max_length.py +++ b/ingestion/src/metadata/profiler/metrics/static/max_length.py @@ -24,8 +24,8 @@ from metadata.profiler.metrics.pandas_metric_protocol import PandasComputation from metadata.profiler.orm.functions.length import LenFn from metadata.profiler.orm.registry import ( - is_concatenable, is_complex_type, + is_concatenable, ) from metadata.utils.logger import profiler_logger diff --git a/ingestion/src/metadata/profiler/metrics/static/min_length.py b/ingestion/src/metadata/profiler/metrics/static/min_length.py index 672e84b2b207..d5a477f074ae 100644 --- a/ingestion/src/metadata/profiler/metrics/static/min_length.py +++ b/ingestion/src/metadata/profiler/metrics/static/min_length.py @@ -24,8 +24,8 @@ from metadata.profiler.metrics.pandas_metric_protocol import PandasComputation from metadata.profiler.orm.functions.length import LenFn from metadata.profiler.orm.registry import ( - is_concatenable, is_complex_type, + is_concatenable, ) from metadata.utils.logger import profiler_logger diff --git a/ingestion/src/metadata/profiler/metrics/static/unique_count.py b/ingestion/src/metadata/profiler/metrics/static/unique_count.py index 1f646145c91d..a6ef788c487c 100644 --- a/ingestion/src/metadata/profiler/metrics/static/unique_count.py +++ b/ingestion/src/metadata/profiler/metrics/static/unique_count.py @@ -23,7 +23,7 @@ from metadata.profiler.metrics.core import QueryMetric from metadata.profiler.metrics.pandas_metric_protocol import PandasComputation from metadata.profiler.orm.functions.unique_count import _unique_count_query_mapper -from metadata.profiler.orm.registry import NOT_COMPUTE, Dialects +from metadata.profiler.orm.registry import NOT_COMPUTE, Dialects, is_complex_type from metadata.utils.logger import profiler_logger if TYPE_CHECKING: @@ -60,7 +60,9 @@ def query(self, sample: Optional[type], session: Optional[Session] = None): "We are missing the session attribute to compute the UniqueCount." ) - if self.col.type.__class__.__name__ in NOT_COMPUTE: + if self.col.type.__class__.__name__ in NOT_COMPUTE or is_complex_type( + self.col.type + ): return None # Run all queries on top of the sampled data From 6029140ce88fccd3f9f524abfb952d7b948a129f Mon Sep 17 00:00:00 2001 From: Esvanth Date: Fri, 3 Apr 2026 03:02:48 +0100 Subject: [PATCH 04/16] Style: fix python checkstyle formatting --- ingestion/src/metadata/profiler/metrics/static/column_count.py | 1 + ingestion/src/metadata/profiler/metrics/static/column_names.py | 1 + ingestion/src/metadata/profiler/metrics/static/count.py | 1 + ingestion/src/metadata/profiler/metrics/static/count_in_set.py | 1 + ingestion/src/metadata/profiler/metrics/static/distinct_count.py | 1 + ingestion/src/metadata/profiler/metrics/static/ilike_count.py | 1 + ingestion/src/metadata/profiler/metrics/static/like_count.py | 1 + ingestion/src/metadata/profiler/metrics/static/max.py | 1 + ingestion/src/metadata/profiler/metrics/static/max_length.py | 1 + ingestion/src/metadata/profiler/metrics/static/mean.py | 1 + ingestion/src/metadata/profiler/metrics/static/min.py | 1 + ingestion/src/metadata/profiler/metrics/static/min_length.py | 1 + ingestion/src/metadata/profiler/metrics/static/not_like_count.py | 1 + .../metadata/profiler/metrics/static/not_regexp_match_count.py | 1 + ingestion/src/metadata/profiler/metrics/static/null_count.py | 1 + .../src/metadata/profiler/metrics/static/null_missing_count.py | 1 + .../src/metadata/profiler/metrics/static/regexp_match_count.py | 1 + ingestion/src/metadata/profiler/metrics/static/row_count.py | 1 + ingestion/src/metadata/profiler/metrics/static/sum.py | 1 + ingestion/src/metadata/profiler/metrics/static/unique_count.py | 1 + 20 files changed, 20 insertions(+) diff --git a/ingestion/src/metadata/profiler/metrics/static/column_count.py b/ingestion/src/metadata/profiler/metrics/static/column_count.py index a5e4bc461335..88cc525a7d2f 100644 --- a/ingestion/src/metadata/profiler/metrics/static/column_count.py +++ b/ingestion/src/metadata/profiler/metrics/static/column_count.py @@ -12,6 +12,7 @@ """ Table Column Count Metric definition """ + # pylint: disable=duplicate-code from typing import TYPE_CHECKING, Optional diff --git a/ingestion/src/metadata/profiler/metrics/static/column_names.py b/ingestion/src/metadata/profiler/metrics/static/column_names.py index 00568a68e2dd..6c085e7cbe4f 100644 --- a/ingestion/src/metadata/profiler/metrics/static/column_names.py +++ b/ingestion/src/metadata/profiler/metrics/static/column_names.py @@ -12,6 +12,7 @@ """ Table Column Count Metric definition """ + # pylint: disable=duplicate-code from typing import TYPE_CHECKING, Optional diff --git a/ingestion/src/metadata/profiler/metrics/static/count.py b/ingestion/src/metadata/profiler/metrics/static/count.py index f30c1b170143..8adbe4335ef0 100644 --- a/ingestion/src/metadata/profiler/metrics/static/count.py +++ b/ingestion/src/metadata/profiler/metrics/static/count.py @@ -12,6 +12,7 @@ """ Count Metric definition """ + # pylint: disable=duplicate-code import traceback diff --git a/ingestion/src/metadata/profiler/metrics/static/count_in_set.py b/ingestion/src/metadata/profiler/metrics/static/count_in_set.py index da2955201355..37e927584c95 100644 --- a/ingestion/src/metadata/profiler/metrics/static/count_in_set.py +++ b/ingestion/src/metadata/profiler/metrics/static/count_in_set.py @@ -12,6 +12,7 @@ """ CountInSet Metric definition """ + # pylint: disable=duplicate-code import traceback from typing import TYPE_CHECKING, List, Optional diff --git a/ingestion/src/metadata/profiler/metrics/static/distinct_count.py b/ingestion/src/metadata/profiler/metrics/static/distinct_count.py index d8a9cc3b4a7b..e05ed6f56ada 100644 --- a/ingestion/src/metadata/profiler/metrics/static/distinct_count.py +++ b/ingestion/src/metadata/profiler/metrics/static/distinct_count.py @@ -12,6 +12,7 @@ """ Distinct Count Metric definition """ + # pylint: disable=duplicate-code import json diff --git a/ingestion/src/metadata/profiler/metrics/static/ilike_count.py b/ingestion/src/metadata/profiler/metrics/static/ilike_count.py index 9e3237ad33eb..41147e12b12d 100644 --- a/ingestion/src/metadata/profiler/metrics/static/ilike_count.py +++ b/ingestion/src/metadata/profiler/metrics/static/ilike_count.py @@ -12,6 +12,7 @@ """ ILIKE Count Metric definition """ + # pylint: disable=duplicate-code from sqlalchemy import case, column diff --git a/ingestion/src/metadata/profiler/metrics/static/like_count.py b/ingestion/src/metadata/profiler/metrics/static/like_count.py index 87d05ed1119f..11d104fe5f18 100644 --- a/ingestion/src/metadata/profiler/metrics/static/like_count.py +++ b/ingestion/src/metadata/profiler/metrics/static/like_count.py @@ -12,6 +12,7 @@ """ Like Count Metric definition """ + # pylint: disable=duplicate-code from sqlalchemy import case, column diff --git a/ingestion/src/metadata/profiler/metrics/static/max.py b/ingestion/src/metadata/profiler/metrics/static/max.py index 235555cb9fcd..52e6fa2d7f46 100644 --- a/ingestion/src/metadata/profiler/metrics/static/max.py +++ b/ingestion/src/metadata/profiler/metrics/static/max.py @@ -12,6 +12,7 @@ """ Max Metric definition """ + from functools import partial from typing import TYPE_CHECKING, Callable, Optional diff --git a/ingestion/src/metadata/profiler/metrics/static/max_length.py b/ingestion/src/metadata/profiler/metrics/static/max_length.py index 389575f0f163..3c48bd2c998e 100644 --- a/ingestion/src/metadata/profiler/metrics/static/max_length.py +++ b/ingestion/src/metadata/profiler/metrics/static/max_length.py @@ -12,6 +12,7 @@ """ MAX_LENGTH Metric definition """ + # pylint: disable=duplicate-code diff --git a/ingestion/src/metadata/profiler/metrics/static/mean.py b/ingestion/src/metadata/profiler/metrics/static/mean.py index 33393a2d54b3..195ca9605e2f 100644 --- a/ingestion/src/metadata/profiler/metrics/static/mean.py +++ b/ingestion/src/metadata/profiler/metrics/static/mean.py @@ -12,6 +12,7 @@ """ AVG Metric definition """ + from functools import partial from typing import TYPE_CHECKING, Callable, NamedTuple, Optional diff --git a/ingestion/src/metadata/profiler/metrics/static/min.py b/ingestion/src/metadata/profiler/metrics/static/min.py index f831cd212e95..ad69cd4fff81 100644 --- a/ingestion/src/metadata/profiler/metrics/static/min.py +++ b/ingestion/src/metadata/profiler/metrics/static/min.py @@ -12,6 +12,7 @@ """ Min Metric definition """ + from functools import partial from typing import TYPE_CHECKING, Callable, Optional diff --git a/ingestion/src/metadata/profiler/metrics/static/min_length.py b/ingestion/src/metadata/profiler/metrics/static/min_length.py index d5a477f074ae..4a5e6ff24705 100644 --- a/ingestion/src/metadata/profiler/metrics/static/min_length.py +++ b/ingestion/src/metadata/profiler/metrics/static/min_length.py @@ -12,6 +12,7 @@ """ MIN_LENGTH Metric definition """ + # pylint: disable=duplicate-code diff --git a/ingestion/src/metadata/profiler/metrics/static/not_like_count.py b/ingestion/src/metadata/profiler/metrics/static/not_like_count.py index eadec9cc4274..611a60e8f7a5 100644 --- a/ingestion/src/metadata/profiler/metrics/static/not_like_count.py +++ b/ingestion/src/metadata/profiler/metrics/static/not_like_count.py @@ -12,6 +12,7 @@ """ Like Count Metric definition """ + # pylint: disable=duplicate-code from sqlalchemy import case, column diff --git a/ingestion/src/metadata/profiler/metrics/static/not_regexp_match_count.py b/ingestion/src/metadata/profiler/metrics/static/not_regexp_match_count.py index 8e4b7e33d9d2..2f037f7c80b4 100644 --- a/ingestion/src/metadata/profiler/metrics/static/not_regexp_match_count.py +++ b/ingestion/src/metadata/profiler/metrics/static/not_regexp_match_count.py @@ -12,6 +12,7 @@ """ Regex Count Metric definition """ + # pylint: disable=duplicate-code import traceback diff --git a/ingestion/src/metadata/profiler/metrics/static/null_count.py b/ingestion/src/metadata/profiler/metrics/static/null_count.py index 71f97a60e1e1..4e48c5996a4b 100644 --- a/ingestion/src/metadata/profiler/metrics/static/null_count.py +++ b/ingestion/src/metadata/profiler/metrics/static/null_count.py @@ -12,6 +12,7 @@ """ Null Count Metric definition """ + # pylint: disable=duplicate-code from typing import TYPE_CHECKING, Optional diff --git a/ingestion/src/metadata/profiler/metrics/static/null_missing_count.py b/ingestion/src/metadata/profiler/metrics/static/null_missing_count.py index 8b19638a3dd0..de316b074c82 100644 --- a/ingestion/src/metadata/profiler/metrics/static/null_missing_count.py +++ b/ingestion/src/metadata/profiler/metrics/static/null_missing_count.py @@ -12,6 +12,7 @@ """ Null Count Metric definition """ + # pylint: disable=duplicate-code from typing import TYPE_CHECKING, Optional diff --git a/ingestion/src/metadata/profiler/metrics/static/regexp_match_count.py b/ingestion/src/metadata/profiler/metrics/static/regexp_match_count.py index e4ab3c54796b..0b14dc8b2d81 100644 --- a/ingestion/src/metadata/profiler/metrics/static/regexp_match_count.py +++ b/ingestion/src/metadata/profiler/metrics/static/regexp_match_count.py @@ -12,6 +12,7 @@ """ Regex Count Metric definition """ + # pylint: disable=duplicate-code import traceback diff --git a/ingestion/src/metadata/profiler/metrics/static/row_count.py b/ingestion/src/metadata/profiler/metrics/static/row_count.py index d16311c1a8e5..b028a62e6852 100644 --- a/ingestion/src/metadata/profiler/metrics/static/row_count.py +++ b/ingestion/src/metadata/profiler/metrics/static/row_count.py @@ -12,6 +12,7 @@ """ Table Count Metric definition """ + from typing import TYPE_CHECKING, Callable, Optional from sqlalchemy import func diff --git a/ingestion/src/metadata/profiler/metrics/static/sum.py b/ingestion/src/metadata/profiler/metrics/static/sum.py index c4d98ff7bcfe..9280f6321cc6 100644 --- a/ingestion/src/metadata/profiler/metrics/static/sum.py +++ b/ingestion/src/metadata/profiler/metrics/static/sum.py @@ -12,6 +12,7 @@ """ SUM Metric definition """ + from functools import partial from typing import TYPE_CHECKING, Callable, Optional diff --git a/ingestion/src/metadata/profiler/metrics/static/unique_count.py b/ingestion/src/metadata/profiler/metrics/static/unique_count.py index a6ef788c487c..a2b048887c6c 100644 --- a/ingestion/src/metadata/profiler/metrics/static/unique_count.py +++ b/ingestion/src/metadata/profiler/metrics/static/unique_count.py @@ -12,6 +12,7 @@ """ Unique Count Metric definition """ + import json from collections import Counter from typing import TYPE_CHECKING, Optional From 62a1e9e656b1ca758bce293105441a9b62adee1d Mon Sep 17 00:00:00 2001 From: Esvanth Date: Fri, 3 Apr 2026 03:07:51 +0100 Subject: [PATCH 05/16] Fix: Guard string-pattern metrics against complex types --- ingestion/src/metadata/profiler/metrics/static/ilike_count.py | 3 +++ ingestion/src/metadata/profiler/metrics/static/like_count.py | 3 +++ .../src/metadata/profiler/metrics/static/not_like_count.py | 3 +++ .../profiler/metrics/static/not_regexp_match_count.py | 4 +++- .../metadata/profiler/metrics/static/regexp_match_count.py | 4 +++- 5 files changed, 15 insertions(+), 2 deletions(-) diff --git a/ingestion/src/metadata/profiler/metrics/static/ilike_count.py b/ingestion/src/metadata/profiler/metrics/static/ilike_count.py index 41147e12b12d..3d9bebc673ff 100644 --- a/ingestion/src/metadata/profiler/metrics/static/ilike_count.py +++ b/ingestion/src/metadata/profiler/metrics/static/ilike_count.py @@ -20,6 +20,7 @@ from metadata.generated.schema.configuration.profilerConfiguration import MetricType from metadata.profiler.metrics.core import StaticMetric, _label from metadata.profiler.orm.functions.sum import SumFn +from metadata.profiler.orm.registry import is_complex_type class ILikeCount(StaticMetric): @@ -47,6 +48,8 @@ def metric_type(self): @_label def fn(self): + if is_complex_type(self.col.type): + return None if not hasattr(self, "expression"): raise AttributeError( "ILike Count requires an expression to be set: add_props(expression=...)(Metrics.iLikeCount)" diff --git a/ingestion/src/metadata/profiler/metrics/static/like_count.py b/ingestion/src/metadata/profiler/metrics/static/like_count.py index 11d104fe5f18..b42f28da1852 100644 --- a/ingestion/src/metadata/profiler/metrics/static/like_count.py +++ b/ingestion/src/metadata/profiler/metrics/static/like_count.py @@ -20,6 +20,7 @@ from metadata.generated.schema.configuration.profilerConfiguration import MetricType from metadata.profiler.metrics.core import StaticMetric, _label from metadata.profiler.orm.functions.sum import SumFn +from metadata.profiler.orm.registry import is_complex_type class LikeCount(StaticMetric): @@ -47,6 +48,8 @@ def metric_type(self): @_label def fn(self): + if is_complex_type(self.col.type): + return None if not hasattr(self, "expression"): raise AttributeError( "Like Count requires an expression to be set: add_props(expression=...)(Metrics.likeCount)" diff --git a/ingestion/src/metadata/profiler/metrics/static/not_like_count.py b/ingestion/src/metadata/profiler/metrics/static/not_like_count.py index 611a60e8f7a5..470f7233eacd 100644 --- a/ingestion/src/metadata/profiler/metrics/static/not_like_count.py +++ b/ingestion/src/metadata/profiler/metrics/static/not_like_count.py @@ -20,6 +20,7 @@ from metadata.generated.schema.configuration.profilerConfiguration import MetricType from metadata.profiler.metrics.core import StaticMetric, _label from metadata.profiler.orm.functions.sum import SumFn +from metadata.profiler.orm.registry import is_complex_type class NotLikeCount(StaticMetric): @@ -47,6 +48,8 @@ def metric_type(self): @_label def fn(self): + if is_complex_type(self.col.type): + return None if not hasattr(self, "expression"): raise AttributeError( "Not Like Count requires an expression to be set: add_props(expression=...)(Metrics.notLikeCount)" diff --git a/ingestion/src/metadata/profiler/metrics/static/not_regexp_match_count.py b/ingestion/src/metadata/profiler/metrics/static/not_regexp_match_count.py index 2f037f7c80b4..88c4b6a7ba99 100644 --- a/ingestion/src/metadata/profiler/metrics/static/not_regexp_match_count.py +++ b/ingestion/src/metadata/profiler/metrics/static/not_regexp_match_count.py @@ -25,7 +25,7 @@ from metadata.profiler.metrics.pandas_metric_protocol import PandasComputation from metadata.profiler.orm.functions.regexp import RegexpMatchFn from metadata.profiler.orm.functions.sum import SumFn -from metadata.profiler.orm.registry import is_concatenable +from metadata.profiler.orm.registry import is_complex_type, is_concatenable from metadata.utils.logger import profiler_logger if TYPE_CHECKING: @@ -65,6 +65,8 @@ def _is_concatenable(self): @_label def fn(self): """sqlalchemy function""" + if is_complex_type(self.col.type): + return None if not hasattr(self, "expression"): raise AttributeError( "Not Regex Count requires an expression to be set: add_props(expression=...)(Metrics.notRegexCount)" diff --git a/ingestion/src/metadata/profiler/metrics/static/regexp_match_count.py b/ingestion/src/metadata/profiler/metrics/static/regexp_match_count.py index 0b14dc8b2d81..78c715d3aace 100644 --- a/ingestion/src/metadata/profiler/metrics/static/regexp_match_count.py +++ b/ingestion/src/metadata/profiler/metrics/static/regexp_match_count.py @@ -25,7 +25,7 @@ from metadata.profiler.metrics.pandas_metric_protocol import PandasComputation from metadata.profiler.orm.functions.regexp import RegexpMatchFn from metadata.profiler.orm.functions.sum import SumFn -from metadata.profiler.orm.registry import is_concatenable +from metadata.profiler.orm.registry import is_complex_type, is_concatenable from metadata.utils.logger import profiler_logger if TYPE_CHECKING: @@ -65,6 +65,8 @@ def _is_concatenable(self): @_label def fn(self): """sqlalchemy function""" + if is_complex_type(self.col.type): + return None if not hasattr(self, "expression"): raise AttributeError( "Regex Count requires an expression to be set: add_props(expression=...)(Metrics.regexCount)" From 86773f977d03033747ad8f53b3b5ebf56d3bc028 Mon Sep 17 00:00:00 2001 From: Esvanth Date: Fri, 3 Apr 2026 03:21:43 +0100 Subject: [PATCH 06/16] Style: disable pylint import-outside-toplevel in accumulator --- ingestion/src/metadata/profiler/metrics/static/max_length.py | 1 + ingestion/src/metadata/profiler/metrics/static/min_length.py | 1 + 2 files changed, 2 insertions(+) diff --git a/ingestion/src/metadata/profiler/metrics/static/max_length.py b/ingestion/src/metadata/profiler/metrics/static/max_length.py index 3c48bd2c998e..df49708733e8 100644 --- a/ingestion/src/metadata/profiler/metrics/static/max_length.py +++ b/ingestion/src/metadata/profiler/metrics/static/max_length.py @@ -104,6 +104,7 @@ def update_accumulator( current_max: Optional[int], df: "pd.DataFrame", column ) -> Optional[int]: """Computes one DataFrame chunk and updates the running maximum""" + # pylint: disable=import-outside-toplevel import pandas as pd from numpy import vectorize diff --git a/ingestion/src/metadata/profiler/metrics/static/min_length.py b/ingestion/src/metadata/profiler/metrics/static/min_length.py index 4a5e6ff24705..6592d8f3ed33 100644 --- a/ingestion/src/metadata/profiler/metrics/static/min_length.py +++ b/ingestion/src/metadata/profiler/metrics/static/min_length.py @@ -104,6 +104,7 @@ def update_accumulator( current_min: Optional[int], df: "pd.DataFrame", column ) -> Optional[int]: """Computes one DataFrame chunk and updates the running minimum""" + # pylint: disable=import-outside-toplevel import pandas as pd from numpy import vectorize From ef1a109b9f9c33d7fcb22521334f8a0e3eab5d13 Mon Sep 17 00:00:00 2001 From: Esvanth Date: Fri, 3 Apr 2026 08:19:00 +0100 Subject: [PATCH 07/16] Style: remove numpy from length metrics to satisfy checkstyle --- ingestion/src/metadata/profiler/metrics/static/max_length.py | 4 +--- ingestion/src/metadata/profiler/metrics/static/min_length.py | 4 +--- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/ingestion/src/metadata/profiler/metrics/static/max_length.py b/ingestion/src/metadata/profiler/metrics/static/max_length.py index df49708733e8..285d0d6caf4f 100644 --- a/ingestion/src/metadata/profiler/metrics/static/max_length.py +++ b/ingestion/src/metadata/profiler/metrics/static/max_length.py @@ -106,13 +106,11 @@ def update_accumulator( """Computes one DataFrame chunk and updates the running maximum""" # pylint: disable=import-outside-toplevel import pandas as pd - from numpy import vectorize - length_vectorize_func = vectorize(len) chunk_max = None if is_concatenable(column.type) or is_complex_type(column.type): - max_val = length_vectorize_func(df[column.name].dropna().astype(str)).max() + max_val = df[column.name].dropna().astype(str).str.len().max() if not pd.isnull(max_val): chunk_max = max_val diff --git a/ingestion/src/metadata/profiler/metrics/static/min_length.py b/ingestion/src/metadata/profiler/metrics/static/min_length.py index 6592d8f3ed33..7ca12a0d34ec 100644 --- a/ingestion/src/metadata/profiler/metrics/static/min_length.py +++ b/ingestion/src/metadata/profiler/metrics/static/min_length.py @@ -106,13 +106,11 @@ def update_accumulator( """Computes one DataFrame chunk and updates the running minimum""" # pylint: disable=import-outside-toplevel import pandas as pd - from numpy import vectorize - length_vectorize_func = vectorize(len) chunk_min = None if is_concatenable(column.type) or is_complex_type(column.type): - min_val = length_vectorize_func(df[column.name].dropna().astype(str)).min() + min_val = df[column.name].dropna().astype(str).str.len().min() if not pd.isnull(min_val): chunk_min = min_val From a4677e4b8e62cde21753eaeba3199739e275dcb1 Mon Sep 17 00:00:00 2001 From: Esvanth Date: Fri, 3 Apr 2026 08:30:08 +0100 Subject: [PATCH 08/16] Style: collapse import to single line to satisfy isort --- ingestion/src/metadata/profiler/metrics/static/max_length.py | 5 +---- ingestion/src/metadata/profiler/metrics/static/min_length.py | 5 +---- 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/ingestion/src/metadata/profiler/metrics/static/max_length.py b/ingestion/src/metadata/profiler/metrics/static/max_length.py index 285d0d6caf4f..44790d907def 100644 --- a/ingestion/src/metadata/profiler/metrics/static/max_length.py +++ b/ingestion/src/metadata/profiler/metrics/static/max_length.py @@ -24,10 +24,7 @@ from metadata.profiler.metrics.core import StaticMetric, _label from metadata.profiler.metrics.pandas_metric_protocol import PandasComputation from metadata.profiler.orm.functions.length import LenFn -from metadata.profiler.orm.registry import ( - is_complex_type, - is_concatenable, -) +from metadata.profiler.orm.registry import is_complex_type, is_concatenable from metadata.utils.logger import profiler_logger if TYPE_CHECKING: diff --git a/ingestion/src/metadata/profiler/metrics/static/min_length.py b/ingestion/src/metadata/profiler/metrics/static/min_length.py index 7ca12a0d34ec..2f76df60eb30 100644 --- a/ingestion/src/metadata/profiler/metrics/static/min_length.py +++ b/ingestion/src/metadata/profiler/metrics/static/min_length.py @@ -24,10 +24,7 @@ from metadata.profiler.metrics.core import StaticMetric, _label from metadata.profiler.metrics.pandas_metric_protocol import PandasComputation from metadata.profiler.orm.functions.length import LenFn -from metadata.profiler.orm.registry import ( - is_complex_type, - is_concatenable, -) +from metadata.profiler.orm.registry import is_complex_type, is_concatenable from metadata.utils.logger import profiler_logger if TYPE_CHECKING: From 55174f052bd7275125c19705967c9e22f2226d7f Mon Sep 17 00:00:00 2001 From: Esvanth Date: Sat, 4 Apr 2026 11:37:35 +0100 Subject: [PATCH 09/16] fix: address PR review feedback for complex type profiler support MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Restore numpy vectorization for string types in max_length/min_length update_accumulator (as requested by TeddyCr — performance optimization must not be removed) - Add separate elif branch for complex types using pandas str.len() - Extend fn() SQL path to compute min/max length for complex types via LenFn (which already casts to text for Postgres and similar) - Replace return None in distinct_count.fn() with actual DISTINCT COUNT using CAST to Text, so complex types now produce real metric values - Add is_length_computable() helper in registry.py to consolidate the repeated is_concatenable/is_collection/is_struct/is_complex pattern --- .../profiler/metrics/static/distinct_count.py | 4 ++-- .../profiler/metrics/static/max_length.py | 15 +++++++++++---- .../profiler/metrics/static/min_length.py | 15 +++++++++++---- ingestion/src/metadata/profiler/orm/registry.py | 5 +++++ 4 files changed, 29 insertions(+), 10 deletions(-) diff --git a/ingestion/src/metadata/profiler/metrics/static/distinct_count.py b/ingestion/src/metadata/profiler/metrics/static/distinct_count.py index e05ed6f56ada..a800594451a2 100644 --- a/ingestion/src/metadata/profiler/metrics/static/distinct_count.py +++ b/ingestion/src/metadata/profiler/metrics/static/distinct_count.py @@ -18,7 +18,7 @@ import json from typing import TYPE_CHECKING, Optional -from sqlalchemy import column, distinct, func +from sqlalchemy import Text, column, distinct, func if TYPE_CHECKING: from metadata.profiler.processor.runner import PandasRunner @@ -55,7 +55,7 @@ def fn(self): Distinct Count metric for Sqlalchemy connectors """ if is_complex_type(self.col.type): - return None + return func.count(distinct(func.cast(column(self.col.name), Text))) return func.count(distinct(CountFn(column(self.col.name, self.col.type)))) def df_fn(self, dfs: Optional["PandasRunner"] = None): diff --git a/ingestion/src/metadata/profiler/metrics/static/max_length.py b/ingestion/src/metadata/profiler/metrics/static/max_length.py index 44790d907def..58bf77f335ad 100644 --- a/ingestion/src/metadata/profiler/metrics/static/max_length.py +++ b/ingestion/src/metadata/profiler/metrics/static/max_length.py @@ -24,7 +24,7 @@ from metadata.profiler.metrics.core import StaticMetric, _label from metadata.profiler.metrics.pandas_metric_protocol import PandasComputation from metadata.profiler.orm.functions.length import LenFn -from metadata.profiler.orm.registry import is_complex_type, is_concatenable +from metadata.profiler.orm.registry import is_complex_type, is_concatenable, is_length_computable from metadata.utils.logger import profiler_logger if TYPE_CHECKING: @@ -60,7 +60,7 @@ def _is_concatenable(self): @_label def fn(self): """sqlalchemy function""" - if self._is_concatenable(): + if is_length_computable(self.col.type): return func.max(LenFn(column(self.col.name, self.col.type))) logger.debug( @@ -104,12 +104,19 @@ def update_accumulator( # pylint: disable=import-outside-toplevel import pandas as pd + from numpy import vectorize + + length_vectorize_func = vectorize(len) chunk_max = None - if is_concatenable(column.type) or is_complex_type(column.type): - max_val = df[column.name].dropna().astype(str).str.len().max() + if is_concatenable(column.type): + max_val = length_vectorize_func(df[column.name].dropna().astype(str)).max() if not pd.isnull(max_val): chunk_max = max_val + elif is_complex_type(column.type): + max_val = df[column.name].dropna().astype(str).str.len().max() + if not pd.isnull(max_val): + chunk_max = int(max_val) if chunk_max is None or pd.isnull(chunk_max): return current_max diff --git a/ingestion/src/metadata/profiler/metrics/static/min_length.py b/ingestion/src/metadata/profiler/metrics/static/min_length.py index 2f76df60eb30..36edbc38d46a 100644 --- a/ingestion/src/metadata/profiler/metrics/static/min_length.py +++ b/ingestion/src/metadata/profiler/metrics/static/min_length.py @@ -24,7 +24,7 @@ from metadata.profiler.metrics.core import StaticMetric, _label from metadata.profiler.metrics.pandas_metric_protocol import PandasComputation from metadata.profiler.orm.functions.length import LenFn -from metadata.profiler.orm.registry import is_complex_type, is_concatenable +from metadata.profiler.orm.registry import is_complex_type, is_concatenable, is_length_computable from metadata.utils.logger import profiler_logger if TYPE_CHECKING: @@ -60,7 +60,7 @@ def _is_concatenable(self): @_label def fn(self): """sqlalchemy function""" - if self._is_concatenable(): + if is_length_computable(self.col.type): return func.min(LenFn(column(self.col.name, self.col.type))) logger.debug( @@ -104,12 +104,19 @@ def update_accumulator( # pylint: disable=import-outside-toplevel import pandas as pd + from numpy import vectorize + + length_vectorize_func = vectorize(len) chunk_min = None - if is_concatenable(column.type) or is_complex_type(column.type): - min_val = df[column.name].dropna().astype(str).str.len().min() + if is_concatenable(column.type): + min_val = length_vectorize_func(df[column.name].dropna().astype(str)).min() if not pd.isnull(min_val): chunk_min = min_val + elif is_complex_type(column.type): + min_val = df[column.name].dropna().astype(str).str.len().min() + if not pd.isnull(min_val): + chunk_min = int(min_val) if chunk_min is None or pd.isnull(chunk_min): return current_min diff --git a/ingestion/src/metadata/profiler/orm/registry.py b/ingestion/src/metadata/profiler/orm/registry.py index bfa9583d883e..107f9ae7b257 100644 --- a/ingestion/src/metadata/profiler/orm/registry.py +++ b/ingestion/src/metadata/profiler/orm/registry.py @@ -268,3 +268,8 @@ def is_complex(_type) -> bool: def is_complex_type(_type) -> bool: """Helper method to group collections, structs, and complex types""" return is_collection(_type) or is_struct(_type) or is_complex(_type) + + +def is_length_computable(_type) -> bool: + """Check if length metrics can be computed for this type""" + return is_concatenable(_type) or is_complex_type(_type) From 4839a9e264b35f4da290bd5d61a3241e2008bf0d Mon Sep 17 00:00:00 2001 From: Esvanth Date: Sat, 4 Apr 2026 11:57:58 +0100 Subject: [PATCH 10/16] fix: move numpy vectorize import to top-level to satisfy isort Moves `from numpy import vectorize` from inside `update_accumulator` to module-level imports in max_length.py and min_length.py, fixing the isort checkstyle validation failure. --- ingestion/src/metadata/profiler/metrics/static/max_length.py | 3 +-- ingestion/src/metadata/profiler/metrics/static/min_length.py | 3 +-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/ingestion/src/metadata/profiler/metrics/static/max_length.py b/ingestion/src/metadata/profiler/metrics/static/max_length.py index 58bf77f335ad..c8d98d6090d4 100644 --- a/ingestion/src/metadata/profiler/metrics/static/max_length.py +++ b/ingestion/src/metadata/profiler/metrics/static/max_length.py @@ -18,6 +18,7 @@ from typing import TYPE_CHECKING, Optional +from numpy import vectorize from sqlalchemy import column, func from metadata.generated.schema.configuration.profilerConfiguration import MetricType @@ -104,8 +105,6 @@ def update_accumulator( # pylint: disable=import-outside-toplevel import pandas as pd - from numpy import vectorize - length_vectorize_func = vectorize(len) chunk_max = None diff --git a/ingestion/src/metadata/profiler/metrics/static/min_length.py b/ingestion/src/metadata/profiler/metrics/static/min_length.py index 36edbc38d46a..27db746c9859 100644 --- a/ingestion/src/metadata/profiler/metrics/static/min_length.py +++ b/ingestion/src/metadata/profiler/metrics/static/min_length.py @@ -18,6 +18,7 @@ from typing import TYPE_CHECKING, Optional +from numpy import vectorize from sqlalchemy import column, func from metadata.generated.schema.configuration.profilerConfiguration import MetricType @@ -104,8 +105,6 @@ def update_accumulator( # pylint: disable=import-outside-toplevel import pandas as pd - from numpy import vectorize - length_vectorize_func = vectorize(len) chunk_min = None From fb2686b5a84420ca92af480fea08c5463f835412 Mon Sep 17 00:00:00 2001 From: Esvanth Date: Sat, 4 Apr 2026 12:13:47 +0100 Subject: [PATCH 11/16] style: wrap long registry import to multi-line for isort compliance The import line exceeded isort's 88-char limit (profile=black). Wraps is_complex_type, is_concatenable, is_length_computable in parentheses across multiple lines in max_length.py and min_length.py. --- .../src/metadata/profiler/metrics/static/max_length.py | 6 +++++- .../src/metadata/profiler/metrics/static/min_length.py | 6 +++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/ingestion/src/metadata/profiler/metrics/static/max_length.py b/ingestion/src/metadata/profiler/metrics/static/max_length.py index c8d98d6090d4..5d046d8995ad 100644 --- a/ingestion/src/metadata/profiler/metrics/static/max_length.py +++ b/ingestion/src/metadata/profiler/metrics/static/max_length.py @@ -25,7 +25,11 @@ from metadata.profiler.metrics.core import StaticMetric, _label from metadata.profiler.metrics.pandas_metric_protocol import PandasComputation from metadata.profiler.orm.functions.length import LenFn -from metadata.profiler.orm.registry import is_complex_type, is_concatenable, is_length_computable +from metadata.profiler.orm.registry import ( + is_complex_type, + is_concatenable, + is_length_computable, +) from metadata.utils.logger import profiler_logger if TYPE_CHECKING: diff --git a/ingestion/src/metadata/profiler/metrics/static/min_length.py b/ingestion/src/metadata/profiler/metrics/static/min_length.py index 27db746c9859..5ae3b72ceb8b 100644 --- a/ingestion/src/metadata/profiler/metrics/static/min_length.py +++ b/ingestion/src/metadata/profiler/metrics/static/min_length.py @@ -25,7 +25,11 @@ from metadata.profiler.metrics.core import StaticMetric, _label from metadata.profiler.metrics.pandas_metric_protocol import PandasComputation from metadata.profiler.orm.functions.length import LenFn -from metadata.profiler.orm.registry import is_complex_type, is_concatenable, is_length_computable +from metadata.profiler.orm.registry import ( + is_complex_type, + is_concatenable, + is_length_computable, +) from metadata.utils.logger import profiler_logger if TYPE_CHECKING: From f147b5d9096de69ab0a2beedd9a01b6dc4ec327b Mon Sep 17 00:00:00 2001 From: Esvanth Date: Sat, 4 Apr 2026 12:31:53 +0100 Subject: [PATCH 12/16] style: add blank line after module docstring in registry.py for black compliance --- ingestion/src/metadata/profiler/orm/registry.py | 1 + 1 file changed, 1 insertion(+) diff --git a/ingestion/src/metadata/profiler/orm/registry.py b/ingestion/src/metadata/profiler/orm/registry.py index 107f9ae7b257..5512effc324f 100644 --- a/ingestion/src/metadata/profiler/orm/registry.py +++ b/ingestion/src/metadata/profiler/orm/registry.py @@ -13,6 +13,7 @@ Custom types' registry for easy access without having an import mess """ + import math from enum import Enum From 798d3916ebb1cc50d534e97a1df84f77dde0ff0f Mon Sep 17 00:00:00 2001 From: Esvanth Date: Sat, 4 Apr 2026 13:04:39 +0100 Subject: [PATCH 13/16] fix: address PR feedback for complex type profiler support and fix empty series crash --- .../src/metadata/profiler/metrics/static/max_length.py | 10 +++++++--- .../src/metadata/profiler/metrics/static/min_length.py | 10 +++++++--- .../src/metadata/profiler/orm/functions/length.py | 8 +++++++- 3 files changed, 21 insertions(+), 7 deletions(-) diff --git a/ingestion/src/metadata/profiler/metrics/static/max_length.py b/ingestion/src/metadata/profiler/metrics/static/max_length.py index 5d046d8995ad..2c0aa3aded13 100644 --- a/ingestion/src/metadata/profiler/metrics/static/max_length.py +++ b/ingestion/src/metadata/profiler/metrics/static/max_length.py @@ -109,15 +109,19 @@ def update_accumulator( # pylint: disable=import-outside-toplevel import pandas as pd + series = df[column.name].dropna() + if series.empty: + return current_max + length_vectorize_func = vectorize(len) chunk_max = None if is_concatenable(column.type): - max_val = length_vectorize_func(df[column.name].dropna().astype(str)).max() + max_val = length_vectorize_func(series.astype(str)).max() if not pd.isnull(max_val): - chunk_max = max_val + chunk_max = int(max_val) elif is_complex_type(column.type): - max_val = df[column.name].dropna().astype(str).str.len().max() + max_val = length_vectorize_func(series.astype(str)).max() if not pd.isnull(max_val): chunk_max = int(max_val) diff --git a/ingestion/src/metadata/profiler/metrics/static/min_length.py b/ingestion/src/metadata/profiler/metrics/static/min_length.py index 5ae3b72ceb8b..67220f05f927 100644 --- a/ingestion/src/metadata/profiler/metrics/static/min_length.py +++ b/ingestion/src/metadata/profiler/metrics/static/min_length.py @@ -109,15 +109,19 @@ def update_accumulator( # pylint: disable=import-outside-toplevel import pandas as pd + series = df[column.name].dropna() + if series.empty: + return current_min + length_vectorize_func = vectorize(len) chunk_min = None if is_concatenable(column.type): - min_val = length_vectorize_func(df[column.name].dropna().astype(str)).min() + min_val = length_vectorize_func(series.astype(str)).min() if not pd.isnull(min_val): - chunk_min = min_val + chunk_min = int(min_val) elif is_complex_type(column.type): - min_val = df[column.name].dropna().astype(str).str.len().min() + min_val = length_vectorize_func(series.astype(str)).min() if not pd.isnull(min_val): chunk_min = int(min_val) diff --git a/ingestion/src/metadata/profiler/orm/functions/length.py b/ingestion/src/metadata/profiler/orm/functions/length.py index 7c078da5fd77..c56e574c169a 100644 --- a/ingestion/src/metadata/profiler/orm/functions/length.py +++ b/ingestion/src/metadata/profiler/orm/functions/length.py @@ -19,7 +19,7 @@ from sqlalchemy.sql.functions import FunctionElement from metadata.profiler.metrics.core import CACHE -from metadata.profiler.orm.registry import Dialects +from metadata.profiler.orm.registry import Dialects, is_complex_type class LenFn(FunctionElement): @@ -28,6 +28,9 @@ class LenFn(FunctionElement): @compiles(LenFn) def _(element, compiler, **kw): + type_ = element.clauses.clauses[0].type + if is_complex_type(type_): + return "LEN(CAST(%s AS STRING))" % compiler.process(element.clauses, **kw) return "LEN(%s)" % compiler.process(element.clauses, **kw) @@ -53,6 +56,9 @@ def _(element, compiler, **kw): @compiles(LenFn, Dialects.Teradata) @compiles(LenFn, Dialects.Informix) def _(element, compiler, **kw): + type_ = element.clauses.clauses[0].type + if is_complex_type(type_): + return "LENGTH(CAST(%s AS STRING))" % compiler.process(element.clauses, **kw) return "LENGTH(%s)" % compiler.process(element.clauses, **kw) From 58abad597c1ca9d477dfc3a944c3df4e1c0f2bba Mon Sep 17 00:00:00 2001 From: Esvanth Date: Sat, 4 Apr 2026 21:36:26 +0100 Subject: [PATCH 14/16] fix(quicksight): resolve column aliases to correct upstream column in lineage and fix Windows env --- OpenMetadata | 1 + ingestion/setup.py | 2 +- .../source/dashboard/quicksight/metadata.py | 83 ++++++++++++++++--- .../topology/dashboard/test_quicksight.py | 78 +++++++++++++++++ scripts/datamodel_generation.py | 27 ++++-- 5 files changed, 172 insertions(+), 19 deletions(-) create mode 160000 OpenMetadata diff --git a/OpenMetadata b/OpenMetadata new file mode 160000 index 000000000000..58a76d72f8cd --- /dev/null +++ b/OpenMetadata @@ -0,0 +1 @@ +Subproject commit 58a76d72f8cdb0ed3cc795030f65cc105b92ce7d diff --git a/ingestion/setup.py b/ingestion/setup.py index ec546b15f87a..2459d21a614a 100644 --- a/ingestion/setup.py +++ b/ingestion/setup.py @@ -387,7 +387,7 @@ dev = { "black==22.3.0", - "uvloop==0.21.0", + "uvloop==0.21.0; platform_system != 'Windows'", "datamodel-code-generator==0.25.6", "boto3-stubs", "mypy-boto3-glue", diff --git a/ingestion/src/metadata/ingestion/source/dashboard/quicksight/metadata.py b/ingestion/src/metadata/ingestion/source/dashboard/quicksight/metadata.py index a24e8266f3a9..2957421fc5c7 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/quicksight/metadata.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/quicksight/metadata.py @@ -45,13 +45,18 @@ SourceUrl, Uuid, ) -from metadata.generated.schema.type.entityLineage import EntitiesEdge, LineageDetails +from metadata.generated.schema.type.entityLineage import ( + ColumnLineage, + EntitiesEdge, + LineageDetails, +) from metadata.generated.schema.type.entityLineage import Source as LineageSource from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.api.models import Either from metadata.ingestion.api.steps import InvalidSourceException from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper, Dialect from metadata.ingestion.lineage.parser import LineageParser +from metadata.ingestion.lineage.sql_lineage import get_column_fqn from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.dashboard.dashboard_service import ( LINEAGE_MAP, @@ -350,11 +355,8 @@ def _yield_lineage_from_query( ) for from_entity in from_entities or []: if from_entity is not None and data_model_entity is not None: - columns = [ - col.name.root for col in data_model_entity.columns - ] - column_lineage = self._get_column_lineage( - from_entity, data_model_entity, columns + column_lineage = self._build_column_lineage_from_parser( + lineage_parser, from_entity, data_model_entity ) lineage_details.columnsLineage = column_lineage yield Either( @@ -539,9 +541,11 @@ def yield_dashboard_lineage_details( # pylint: disable=too-many-locals for datamodel in self.data_models or []: try: data_model_entity = self._get_datamodel( - datamodel_id=datamodel.dataset_id - if datamodel.dataset_id is not None - else datamodel.DataSource.DataSourceId + datamodel_id=( + datamodel.dataset_id + if datamodel.dataset_id is not None + else datamodel.DataSource.DataSourceId + ) ) if isinstance( datamodel.DataSource.data_source_resp, DataSourceRespQuery @@ -574,6 +578,61 @@ def yield_dashboard_lineage_details( # pylint: disable=too-many-locals ) ) + def _build_column_lineage_from_parser( + self, + lineage_parser: LineageParser, + from_entity: Table, + data_model_entity: DashboardDataModel, + ) -> List[ColumnLineage]: + """ + Build column lineage using SQL-parsed source→target column mappings. + + When the CustomSql query uses column aliases (e.g. SELECT a AS b), the + lineage parser resolves the original column name and alias. This allows + correct lineage when the data model columns carry alias names that do not + exist in the source table. + + Falls back to name-based matching when no parser mappings are available. + """ + column_lineage = [] + for col_pair in lineage_parser.column_lineage or []: + try: + src_col = col_pair[0] + tgt_col = col_pair[-1] + + # If the parser provides parent information, ensure it matches the current from_entity + if ( + hasattr(src_col, "parent") + and src_col.parent + and str(src_col.parent).lower() not in from_entity.name.root.lower() + ): + continue + + source_fqn = get_column_fqn( + table_entity=from_entity, column=src_col.raw_name + ) + target_fqn = self._get_data_model_column_fqn( + data_model_entity=data_model_entity, + column=tgt_col.raw_name, + ) + if source_fqn and target_fqn: + column_lineage.append( + ColumnLineage( + fromColumns=[str(source_fqn)], + toColumn=str(target_fqn), + ) + ) + except Exception as exc: + logger.debug( + f"Failed to resolve column lineage for pair {col_pair}: {exc}" + ) + if not column_lineage: + columns = [col.name.root for col in data_model_entity.columns] + column_lineage = ( + self._get_column_lineage(from_entity, data_model_entity, columns) or [] + ) + return column_lineage + def _get_column_info(self, data_model: DescribeDataSourceResponse): """Get column info""" datasource_columns = [] @@ -684,9 +743,9 @@ def yield_datamodel( Each QuickSight dataset produces a separate DataModel entity, identified by dataset_id rather than datasource_id. """ - self.data_models: List[ - DescribeDataSourceResponse - ] = self._get_dashboard_datamodels(dashboard_details) + self.data_models: List[DescribeDataSourceResponse] = ( + self._get_dashboard_datamodels(dashboard_details) + ) dataset_groups: dict[str, List[DescribeDataSourceResponse]] = defaultdict(list) for data_model in self.data_models: key = ( diff --git a/ingestion/tests/unit/topology/dashboard/test_quicksight.py b/ingestion/tests/unit/topology/dashboard/test_quicksight.py index 2fae7d73471c..23eb0a6556c1 100644 --- a/ingestion/tests/unit/topology/dashboard/test_quicksight.py +++ b/ingestion/tests/unit/topology/dashboard/test_quicksight.py @@ -388,3 +388,81 @@ def describe_data_set_side_effect(**kwargs): col_names_b = {col.name.root for col in dm_b.columns} assert col_names_b == {"email", "created_at"} + + @pytest.mark.order(9) + def test_build_column_lineage_from_parser_uses_aliases(self): + """ + When a CustomSql query uses column aliases (SELECT src_col AS alias_col), + _build_column_lineage_from_parser must resolve the source column name and + map it to the aliased data model column, not drop the lineage pair. + """ + from unittest.mock import MagicMock + + src_col = MagicMock() + src_col.raw_name = "original_col" + src_col.parent = "my_table" + + tgt_col = MagicMock() + tgt_col.raw_name = "alias_col" + + mock_parser = MagicMock() + mock_parser.column_lineage = [(src_col, tgt_col)] + + src_fqn = "db.schema.my_table.original_col" + alias_fqn = "qs_service.dataset.alias_col" + + mock_from_entity = MagicMock() + mock_from_entity.name.root = "my_table" + mock_data_model = MagicMock() + + with patch( + "metadata.ingestion.source.dashboard.quicksight.metadata.get_column_fqn", + return_value=src_fqn, + ) as mock_get_col_fqn: + with patch.object( + self.quicksight, + "_get_data_model_column_fqn", + return_value=alias_fqn, + ) as mock_get_dm_col_fqn: + result = self.quicksight._build_column_lineage_from_parser( + mock_parser, mock_from_entity, mock_data_model + ) + + mock_get_col_fqn.assert_called_once_with( + table_entity=mock_from_entity, column="original_col" + ) + mock_get_dm_col_fqn.assert_called_once_with( + data_model_entity=mock_data_model, column="alias_col" + ) + assert len(result) == 1 + assert [fqn.root for fqn in result[0].fromColumns] == [src_fqn] + assert result[0].toColumn.root == alias_fqn + + @pytest.mark.order(10) + def test_build_column_lineage_from_parser_falls_back_when_no_parser_results(self): + """ + When lineage_parser.column_lineage is empty, _build_column_lineage_from_parser + must fall back to name-based matching via _get_column_lineage. + """ + mock_parser = MagicMock() + mock_parser.column_lineage = [] + + fallback_lineage = [MagicMock()] + mock_from_entity = MagicMock() + mock_data_model = MagicMock() + mock_col = MagicMock() + mock_col.name = MagicMock(root="col_a") + mock_data_model.columns = [mock_col] + + with patch.object( + self.quicksight, + "_get_column_lineage", + return_value=fallback_lineage, + ) as mock_get_col_lineage: + result = self.quicksight._build_column_lineage_from_parser( + mock_parser, mock_from_entity, mock_data_model + ) + + mock_get_col_lineage.assert_called_once() + assert result is fallback_lineage + diff --git a/scripts/datamodel_generation.py b/scripts/datamodel_generation.py index ab1a847002e5..dd7b1cde349c 100644 --- a/scripts/datamodel_generation.py +++ b/scripts/datamodel_generation.py @@ -26,8 +26,11 @@ from datamodel_code_generator.__main__ import main current_directory = os.getcwd() -ingestion_path = "./" if current_directory.endswith("/ingestion") else "ingestion/" -directory_root = "../" if current_directory.endswith("/ingestion") else "./" +ingestion_path = "./" if os.path.basename(current_directory).lower() == "ingestion" else "ingestion/" +directory_root = "../" if os.path.basename(current_directory).lower() == "ingestion" else "./" +print(f"Current Directory: {current_directory}") +print(f"Ingestion Path: {ingestion_path}") +print(f"Directory Root: {directory_root}") UTF_8 = "UTF-8" UNICODE_REGEX_REPLACEMENT_FILE_PATHS = [ @@ -43,7 +46,7 @@ main(args) for file_path in UNICODE_REGEX_REPLACEMENT_FILE_PATHS: - with open(file_path, "r", encoding=UTF_8) as file_: + with open(file_path, "r", encoding=UTF_8, errors="ignore") as file_: content = file_.read() # Python now requires to move the global flags at the very start of the expression content = content.replace("(?U)", "(?u)") @@ -56,7 +59,7 @@ WRITE_AFTER = "from __future__ import annotations" for file_path in MISSING_IMPORTS: - with open(file_path, "r", encoding=UTF_8) as file_: + with open(file_path, "r", encoding=UTF_8, errors="ignore") as file_: lines = file_.readlines() with open(file_path, "w", encoding=UTF_8) as file_: for line in lines: @@ -75,7 +78,7 @@ ] for file_path in UNSUPPORTED_REGEX_PATTERN_FILE_PATHS: - with open(file_path, "r", encoding=UTF_8) as file_: + with open(file_path, "r", encoding=UTF_8, errors="ignore") as file_: content = file_.read() content = content.replace("pattern='^((?!::).)*$',", "") with open(file_path, "w", encoding=UTF_8) as file_: @@ -88,7 +91,7 @@ ] for file_path in DATETIME_AWARE_FILE_PATHS: - with open(file_path, "r", encoding=UTF_8) as file_: + with open(file_path, "r", encoding=UTF_8, errors="ignore") as file_: content = file_.read() content = content.replace( "from pydantic import AnyUrl, AwareDatetime, ConfigDict, EmailStr, Field, RootModel", @@ -98,3 +101,15 @@ content = content.replace("AwareDatetime", "datetime") with open(file_path, "w", encoding=UTF_8) as file_: file_.write(content) + + +import glob + +# Final sanitization pass for Windows encoding issues +print("Sanitizing all generated files to UTF-8...") +for file_path in glob.iglob(f"{ingestion_path}src/metadata/generated/schema/**/*.py", recursive=True): + with open(file_path, "r", encoding=UTF_8, errors="ignore") as f: + content = f.read() + with open(file_path, "w", encoding=UTF_8) as f: + f.write(content) +print("Sanitization complete.") From 86485bbbbd9d02b3ef25ae733bbc760ffba6c0d1 Mon Sep 17 00:00:00 2001 From: Esvanth Date: Sat, 4 Apr 2026 21:47:58 +0100 Subject: [PATCH 15/16] fix(quicksight): finalize clean lineage fix --- OpenMetadata | 1 - 1 file changed, 1 deletion(-) delete mode 160000 OpenMetadata diff --git a/OpenMetadata b/OpenMetadata deleted file mode 160000 index 58a76d72f8cd..000000000000 --- a/OpenMetadata +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 58a76d72f8cdb0ed3cc795030f65cc105b92ce7d From c7db1bca5dedb73088ed958ca753ba8e810a49b7 Mon Sep 17 00:00:00 2001 From: Esvanth Date: Sat, 4 Apr 2026 21:59:50 +0100 Subject: [PATCH 16/16] fix(quicksight): address PR review findings on lineage matching and encoding --- .../ingestion/source/dashboard/quicksight/metadata.py | 3 ++- scripts/datamodel_generation.py | 10 +++++----- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/dashboard/quicksight/metadata.py b/ingestion/src/metadata/ingestion/source/dashboard/quicksight/metadata.py index 2957421fc5c7..3d2207a791d7 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/quicksight/metadata.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/quicksight/metadata.py @@ -604,7 +604,8 @@ def _build_column_lineage_from_parser( if ( hasattr(src_col, "parent") and src_col.parent - and str(src_col.parent).lower() not in from_entity.name.root.lower() + and str(src_col.parent).split(".")[-1].lower() + != from_entity.name.root.lower() ): continue diff --git a/scripts/datamodel_generation.py b/scripts/datamodel_generation.py index dd7b1cde349c..3737030a09a3 100644 --- a/scripts/datamodel_generation.py +++ b/scripts/datamodel_generation.py @@ -46,7 +46,7 @@ main(args) for file_path in UNICODE_REGEX_REPLACEMENT_FILE_PATHS: - with open(file_path, "r", encoding=UTF_8, errors="ignore") as file_: + with open(file_path, "r", encoding=UTF_8, errors="replace") as file_: content = file_.read() # Python now requires to move the global flags at the very start of the expression content = content.replace("(?U)", "(?u)") @@ -59,7 +59,7 @@ WRITE_AFTER = "from __future__ import annotations" for file_path in MISSING_IMPORTS: - with open(file_path, "r", encoding=UTF_8, errors="ignore") as file_: + with open(file_path, "r", encoding=UTF_8, errors="replace") as file_: lines = file_.readlines() with open(file_path, "w", encoding=UTF_8) as file_: for line in lines: @@ -78,7 +78,7 @@ ] for file_path in UNSUPPORTED_REGEX_PATTERN_FILE_PATHS: - with open(file_path, "r", encoding=UTF_8, errors="ignore") as file_: + with open(file_path, "r", encoding=UTF_8, errors="replace") as file_: content = file_.read() content = content.replace("pattern='^((?!::).)*$',", "") with open(file_path, "w", encoding=UTF_8) as file_: @@ -91,7 +91,7 @@ ] for file_path in DATETIME_AWARE_FILE_PATHS: - with open(file_path, "r", encoding=UTF_8, errors="ignore") as file_: + with open(file_path, "r", encoding=UTF_8, errors="replace") as file_: content = file_.read() content = content.replace( "from pydantic import AnyUrl, AwareDatetime, ConfigDict, EmailStr, Field, RootModel", @@ -108,7 +108,7 @@ # Final sanitization pass for Windows encoding issues print("Sanitizing all generated files to UTF-8...") for file_path in glob.iglob(f"{ingestion_path}src/metadata/generated/schema/**/*.py", recursive=True): - with open(file_path, "r", encoding=UTF_8, errors="ignore") as f: + with open(file_path, "r", encoding=UTF_8, errors="replace") as f: content = f.read() with open(file_path, "w", encoding=UTF_8) as f: f.write(content)