diff --git a/ingestion/setup.py b/ingestion/setup.py index ec546b15f87a..2459d21a614a 100644 --- a/ingestion/setup.py +++ b/ingestion/setup.py @@ -387,7 +387,7 @@ dev = { "black==22.3.0", - "uvloop==0.21.0", + "uvloop==0.21.0; platform_system != 'Windows'", "datamodel-code-generator==0.25.6", "boto3-stubs", "mypy-boto3-glue", diff --git a/ingestion/src/metadata/ingestion/source/dashboard/quicksight/metadata.py b/ingestion/src/metadata/ingestion/source/dashboard/quicksight/metadata.py index a24e8266f3a9..3d2207a791d7 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/quicksight/metadata.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/quicksight/metadata.py @@ -45,13 +45,18 @@ SourceUrl, Uuid, ) -from metadata.generated.schema.type.entityLineage import EntitiesEdge, LineageDetails +from metadata.generated.schema.type.entityLineage import ( + ColumnLineage, + EntitiesEdge, + LineageDetails, +) from metadata.generated.schema.type.entityLineage import Source as LineageSource from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.api.models import Either from metadata.ingestion.api.steps import InvalidSourceException from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper, Dialect from metadata.ingestion.lineage.parser import LineageParser +from metadata.ingestion.lineage.sql_lineage import get_column_fqn from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.dashboard.dashboard_service import ( LINEAGE_MAP, @@ -350,11 +355,8 @@ def _yield_lineage_from_query( ) for from_entity in from_entities or []: if from_entity is not None and data_model_entity is not None: - columns = [ - col.name.root for col in data_model_entity.columns - ] - column_lineage = self._get_column_lineage( - from_entity, data_model_entity, columns + column_lineage = self._build_column_lineage_from_parser( + lineage_parser, from_entity, data_model_entity ) lineage_details.columnsLineage = column_lineage yield Either( @@ -539,9 +541,11 @@ def yield_dashboard_lineage_details( # pylint: disable=too-many-locals for datamodel in self.data_models or []: try: data_model_entity = self._get_datamodel( - datamodel_id=datamodel.dataset_id - if datamodel.dataset_id is not None - else datamodel.DataSource.DataSourceId + datamodel_id=( + datamodel.dataset_id + if datamodel.dataset_id is not None + else datamodel.DataSource.DataSourceId + ) ) if isinstance( datamodel.DataSource.data_source_resp, DataSourceRespQuery @@ -574,6 +578,62 @@ def yield_dashboard_lineage_details( # pylint: disable=too-many-locals ) ) + def _build_column_lineage_from_parser( + self, + lineage_parser: LineageParser, + from_entity: Table, + data_model_entity: DashboardDataModel, + ) -> List[ColumnLineage]: + """ + Build column lineage using SQL-parsed source→target column mappings. + + When the CustomSql query uses column aliases (e.g. SELECT a AS b), the + lineage parser resolves the original column name and alias. This allows + correct lineage when the data model columns carry alias names that do not + exist in the source table. + + Falls back to name-based matching when no parser mappings are available. 
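+
+        Example (illustrative names): for a query such as
+        ``SELECT customer_id AS cid FROM raw.sales``, the parser yields the
+        pair (customer_id -> cid); the source side resolves against the
+        ``raw.sales`` table entity and the target side against the data model
+        column named ``cid``.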
+ """ + column_lineage = [] + for col_pair in lineage_parser.column_lineage or []: + try: + src_col = col_pair[0] + tgt_col = col_pair[-1] + + # If the parser provides parent information, ensure it matches the current from_entity + if ( + hasattr(src_col, "parent") + and src_col.parent + and str(src_col.parent).split(".")[-1].lower() + != from_entity.name.root.lower() + ): + continue + + source_fqn = get_column_fqn( + table_entity=from_entity, column=src_col.raw_name + ) + target_fqn = self._get_data_model_column_fqn( + data_model_entity=data_model_entity, + column=tgt_col.raw_name, + ) + if source_fqn and target_fqn: + column_lineage.append( + ColumnLineage( + fromColumns=[str(source_fqn)], + toColumn=str(target_fqn), + ) + ) + except Exception as exc: + logger.debug( + f"Failed to resolve column lineage for pair {col_pair}: {exc}" + ) + if not column_lineage: + columns = [col.name.root for col in data_model_entity.columns] + column_lineage = ( + self._get_column_lineage(from_entity, data_model_entity, columns) or [] + ) + return column_lineage + def _get_column_info(self, data_model: DescribeDataSourceResponse): """Get column info""" datasource_columns = [] @@ -684,9 +744,9 @@ def yield_datamodel( Each QuickSight dataset produces a separate DataModel entity, identified by dataset_id rather than datasource_id. """ - self.data_models: List[ - DescribeDataSourceResponse - ] = self._get_dashboard_datamodels(dashboard_details) + self.data_models: List[DescribeDataSourceResponse] = ( + self._get_dashboard_datamodels(dashboard_details) + ) dataset_groups: dict[str, List[DescribeDataSourceResponse]] = defaultdict(list) for data_model in self.data_models: key = ( diff --git a/ingestion/src/metadata/profiler/metrics/static/column_count.py b/ingestion/src/metadata/profiler/metrics/static/column_count.py index a5e4bc461335..88cc525a7d2f 100644 --- a/ingestion/src/metadata/profiler/metrics/static/column_count.py +++ b/ingestion/src/metadata/profiler/metrics/static/column_count.py @@ -12,6 +12,7 @@ """ Table Column Count Metric definition """ + # pylint: disable=duplicate-code from typing import TYPE_CHECKING, Optional diff --git a/ingestion/src/metadata/profiler/metrics/static/column_names.py b/ingestion/src/metadata/profiler/metrics/static/column_names.py index 00568a68e2dd..6c085e7cbe4f 100644 --- a/ingestion/src/metadata/profiler/metrics/static/column_names.py +++ b/ingestion/src/metadata/profiler/metrics/static/column_names.py @@ -12,6 +12,7 @@ """ Table Column Count Metric definition """ + # pylint: disable=duplicate-code from typing import TYPE_CHECKING, Optional diff --git a/ingestion/src/metadata/profiler/metrics/static/count.py b/ingestion/src/metadata/profiler/metrics/static/count.py index f30c1b170143..8adbe4335ef0 100644 --- a/ingestion/src/metadata/profiler/metrics/static/count.py +++ b/ingestion/src/metadata/profiler/metrics/static/count.py @@ -12,6 +12,7 @@ """ Count Metric definition """ + # pylint: disable=duplicate-code import traceback diff --git a/ingestion/src/metadata/profiler/metrics/static/count_in_set.py b/ingestion/src/metadata/profiler/metrics/static/count_in_set.py index da2955201355..37e927584c95 100644 --- a/ingestion/src/metadata/profiler/metrics/static/count_in_set.py +++ b/ingestion/src/metadata/profiler/metrics/static/count_in_set.py @@ -12,6 +12,7 @@ """ CountInSet Metric definition """ + # pylint: disable=duplicate-code import traceback from typing import TYPE_CHECKING, List, Optional diff --git a/ingestion/src/metadata/profiler/metrics/static/distinct_count.py 
b/ingestion/src/metadata/profiler/metrics/static/distinct_count.py index b6ae7942b2ca..a800594451a2 100644 --- a/ingestion/src/metadata/profiler/metrics/static/distinct_count.py +++ b/ingestion/src/metadata/profiler/metrics/static/distinct_count.py @@ -12,12 +12,13 @@ """ Distinct Count Metric definition """ + # pylint: disable=duplicate-code import json from typing import TYPE_CHECKING, Optional -from sqlalchemy import column, distinct, func +from sqlalchemy import Text, column, distinct, func if TYPE_CHECKING: from metadata.profiler.processor.runner import PandasRunner @@ -25,6 +26,7 @@ from metadata.generated.schema.configuration.profilerConfiguration import MetricType from metadata.profiler.metrics.core import StaticMetric, _label from metadata.profiler.orm.functions.count import CountFn +from metadata.profiler.orm.registry import is_complex_type from metadata.utils.logger import profiler_logger logger = profiler_logger() @@ -52,6 +54,8 @@ def fn(self): """ Distinct Count metric for Sqlalchemy connectors """ + if is_complex_type(self.col.type): + return func.count(distinct(func.cast(column(self.col.name), Text))) return func.count(distinct(CountFn(column(self.col.name, self.col.type)))) def df_fn(self, dfs: Optional["PandasRunner"] = None): diff --git a/ingestion/src/metadata/profiler/metrics/static/ilike_count.py b/ingestion/src/metadata/profiler/metrics/static/ilike_count.py index 9e3237ad33eb..3d9bebc673ff 100644 --- a/ingestion/src/metadata/profiler/metrics/static/ilike_count.py +++ b/ingestion/src/metadata/profiler/metrics/static/ilike_count.py @@ -12,6 +12,7 @@ """ ILIKE Count Metric definition """ + # pylint: disable=duplicate-code from sqlalchemy import case, column @@ -19,6 +20,7 @@ from metadata.generated.schema.configuration.profilerConfiguration import MetricType from metadata.profiler.metrics.core import StaticMetric, _label from metadata.profiler.orm.functions.sum import SumFn +from metadata.profiler.orm.registry import is_complex_type class ILikeCount(StaticMetric): @@ -46,6 +48,8 @@ def metric_type(self): @_label def fn(self): + if is_complex_type(self.col.type): + return None if not hasattr(self, "expression"): raise AttributeError( "ILike Count requires an expression to be set: add_props(expression=...)(Metrics.iLikeCount)" diff --git a/ingestion/src/metadata/profiler/metrics/static/like_count.py b/ingestion/src/metadata/profiler/metrics/static/like_count.py index 87d05ed1119f..b42f28da1852 100644 --- a/ingestion/src/metadata/profiler/metrics/static/like_count.py +++ b/ingestion/src/metadata/profiler/metrics/static/like_count.py @@ -12,6 +12,7 @@ """ Like Count Metric definition """ + # pylint: disable=duplicate-code from sqlalchemy import case, column @@ -19,6 +20,7 @@ from metadata.generated.schema.configuration.profilerConfiguration import MetricType from metadata.profiler.metrics.core import StaticMetric, _label from metadata.profiler.orm.functions.sum import SumFn +from metadata.profiler.orm.registry import is_complex_type class LikeCount(StaticMetric): @@ -46,6 +48,8 @@ def metric_type(self): @_label def fn(self): + if is_complex_type(self.col.type): + return None if not hasattr(self, "expression"): raise AttributeError( "Like Count requires an expression to be set: add_props(expression=...)(Metrics.likeCount)" diff --git a/ingestion/src/metadata/profiler/metrics/static/max.py b/ingestion/src/metadata/profiler/metrics/static/max.py index 235555cb9fcd..52e6fa2d7f46 100644 --- a/ingestion/src/metadata/profiler/metrics/static/max.py +++ 
b/ingestion/src/metadata/profiler/metrics/static/max.py @@ -12,6 +12,7 @@ """ Max Metric definition """ + from functools import partial from typing import TYPE_CHECKING, Callable, Optional diff --git a/ingestion/src/metadata/profiler/metrics/static/max_length.py b/ingestion/src/metadata/profiler/metrics/static/max_length.py index 7f374d4537ee..2c0aa3aded13 100644 --- a/ingestion/src/metadata/profiler/metrics/static/max_length.py +++ b/ingestion/src/metadata/profiler/metrics/static/max_length.py @@ -12,18 +12,24 @@ """ MAX_LENGTH Metric definition """ + # pylint: disable=duplicate-code from typing import TYPE_CHECKING, Optional +from numpy import vectorize from sqlalchemy import column, func from metadata.generated.schema.configuration.profilerConfiguration import MetricType from metadata.profiler.metrics.core import StaticMetric, _label from metadata.profiler.metrics.pandas_metric_protocol import PandasComputation from metadata.profiler.orm.functions.length import LenFn -from metadata.profiler.orm.registry import is_concatenable +from metadata.profiler.orm.registry import ( + is_complex_type, + is_concatenable, + is_length_computable, +) from metadata.utils.logger import profiler_logger if TYPE_CHECKING: @@ -59,7 +65,7 @@ def _is_concatenable(self): @_label def fn(self): """sqlalchemy function""" - if self._is_concatenable(): + if is_length_computable(self.col.type): return func.max(LenFn(column(self.col.name, self.col.type))) logger.debug( @@ -100,16 +106,24 @@ def update_accumulator( current_max: Optional[int], df: "pd.DataFrame", column ) -> Optional[int]: """Computes one DataFrame chunk and updates the running maximum""" + # pylint: disable=import-outside-toplevel import pandas as pd - from numpy import vectorize + + series = df[column.name].dropna() + if series.empty: + return current_max length_vectorize_func = vectorize(len) chunk_max = None if is_concatenable(column.type): - max_val = length_vectorize_func(df[column.name].dropna().astype(str)).max() + max_val = length_vectorize_func(series.astype(str)).max() + if not pd.isnull(max_val): + chunk_max = int(max_val) + elif is_complex_type(column.type): + max_val = length_vectorize_func(series.astype(str)).max() if not pd.isnull(max_val): - chunk_max = max_val + chunk_max = int(max_val) if chunk_max is None or pd.isnull(chunk_max): return current_max diff --git a/ingestion/src/metadata/profiler/metrics/static/mean.py b/ingestion/src/metadata/profiler/metrics/static/mean.py index 33393a2d54b3..195ca9605e2f 100644 --- a/ingestion/src/metadata/profiler/metrics/static/mean.py +++ b/ingestion/src/metadata/profiler/metrics/static/mean.py @@ -12,6 +12,7 @@ """ AVG Metric definition """ + from functools import partial from typing import TYPE_CHECKING, Callable, NamedTuple, Optional diff --git a/ingestion/src/metadata/profiler/metrics/static/min.py b/ingestion/src/metadata/profiler/metrics/static/min.py index f831cd212e95..ad69cd4fff81 100644 --- a/ingestion/src/metadata/profiler/metrics/static/min.py +++ b/ingestion/src/metadata/profiler/metrics/static/min.py @@ -12,6 +12,7 @@ """ Min Metric definition """ + from functools import partial from typing import TYPE_CHECKING, Callable, Optional diff --git a/ingestion/src/metadata/profiler/metrics/static/min_length.py b/ingestion/src/metadata/profiler/metrics/static/min_length.py index 5e5888ce71d3..67220f05f927 100644 --- a/ingestion/src/metadata/profiler/metrics/static/min_length.py +++ b/ingestion/src/metadata/profiler/metrics/static/min_length.py @@ -12,18 +12,24 @@ """ MIN_LENGTH Metric 
definition """ + # pylint: disable=duplicate-code from typing import TYPE_CHECKING, Optional +from numpy import vectorize from sqlalchemy import column, func from metadata.generated.schema.configuration.profilerConfiguration import MetricType from metadata.profiler.metrics.core import StaticMetric, _label from metadata.profiler.metrics.pandas_metric_protocol import PandasComputation from metadata.profiler.orm.functions.length import LenFn -from metadata.profiler.orm.registry import is_concatenable +from metadata.profiler.orm.registry import ( + is_complex_type, + is_concatenable, + is_length_computable, +) from metadata.utils.logger import profiler_logger if TYPE_CHECKING: @@ -59,7 +65,7 @@ def _is_concatenable(self): @_label def fn(self): """sqlalchemy function""" - if self._is_concatenable(): + if is_length_computable(self.col.type): return func.min(LenFn(column(self.col.name, self.col.type))) logger.debug( @@ -100,16 +106,24 @@ def update_accumulator( current_min: Optional[int], df: "pd.DataFrame", column ) -> Optional[int]: """Computes one DataFrame chunk and updates the running minimum""" + # pylint: disable=import-outside-toplevel import pandas as pd - from numpy import vectorize + + series = df[column.name].dropna() + if series.empty: + return current_min length_vectorize_func = vectorize(len) chunk_min = None if is_concatenable(column.type): - min_val = length_vectorize_func(df[column.name].dropna().astype(str)).min() + min_val = length_vectorize_func(series.astype(str)).min() + if not pd.isnull(min_val): + chunk_min = int(min_val) + elif is_complex_type(column.type): + min_val = length_vectorize_func(series.astype(str)).min() if not pd.isnull(min_val): - chunk_min = min_val + chunk_min = int(min_val) if chunk_min is None or pd.isnull(chunk_min): return current_min diff --git a/ingestion/src/metadata/profiler/metrics/static/not_like_count.py b/ingestion/src/metadata/profiler/metrics/static/not_like_count.py index eadec9cc4274..470f7233eacd 100644 --- a/ingestion/src/metadata/profiler/metrics/static/not_like_count.py +++ b/ingestion/src/metadata/profiler/metrics/static/not_like_count.py @@ -12,6 +12,7 @@ """ Like Count Metric definition """ + # pylint: disable=duplicate-code from sqlalchemy import case, column @@ -19,6 +20,7 @@ from metadata.generated.schema.configuration.profilerConfiguration import MetricType from metadata.profiler.metrics.core import StaticMetric, _label from metadata.profiler.orm.functions.sum import SumFn +from metadata.profiler.orm.registry import is_complex_type class NotLikeCount(StaticMetric): @@ -46,6 +48,8 @@ def metric_type(self): @_label def fn(self): + if is_complex_type(self.col.type): + return None if not hasattr(self, "expression"): raise AttributeError( "Not Like Count requires an expression to be set: add_props(expression=...)(Metrics.notLikeCount)" diff --git a/ingestion/src/metadata/profiler/metrics/static/not_regexp_match_count.py b/ingestion/src/metadata/profiler/metrics/static/not_regexp_match_count.py index 8e4b7e33d9d2..88c4b6a7ba99 100644 --- a/ingestion/src/metadata/profiler/metrics/static/not_regexp_match_count.py +++ b/ingestion/src/metadata/profiler/metrics/static/not_regexp_match_count.py @@ -12,6 +12,7 @@ """ Regex Count Metric definition """ + # pylint: disable=duplicate-code import traceback @@ -24,7 +25,7 @@ from metadata.profiler.metrics.pandas_metric_protocol import PandasComputation from metadata.profiler.orm.functions.regexp import RegexpMatchFn from metadata.profiler.orm.functions.sum import SumFn -from 
metadata.profiler.orm.registry import is_concatenable +from metadata.profiler.orm.registry import is_complex_type, is_concatenable from metadata.utils.logger import profiler_logger if TYPE_CHECKING: @@ -64,6 +65,8 @@ def _is_concatenable(self): @_label def fn(self): """sqlalchemy function""" + if is_complex_type(self.col.type): + return None if not hasattr(self, "expression"): raise AttributeError( "Not Regex Count requires an expression to be set: add_props(expression=...)(Metrics.notRegexCount)" diff --git a/ingestion/src/metadata/profiler/metrics/static/null_count.py b/ingestion/src/metadata/profiler/metrics/static/null_count.py index 71f97a60e1e1..4e48c5996a4b 100644 --- a/ingestion/src/metadata/profiler/metrics/static/null_count.py +++ b/ingestion/src/metadata/profiler/metrics/static/null_count.py @@ -12,6 +12,7 @@ """ Null Count Metric definition """ + # pylint: disable=duplicate-code from typing import TYPE_CHECKING, Optional diff --git a/ingestion/src/metadata/profiler/metrics/static/null_missing_count.py b/ingestion/src/metadata/profiler/metrics/static/null_missing_count.py index 8b19638a3dd0..de316b074c82 100644 --- a/ingestion/src/metadata/profiler/metrics/static/null_missing_count.py +++ b/ingestion/src/metadata/profiler/metrics/static/null_missing_count.py @@ -12,6 +12,7 @@ """ Null Count Metric definition """ + # pylint: disable=duplicate-code from typing import TYPE_CHECKING, Optional diff --git a/ingestion/src/metadata/profiler/metrics/static/regexp_match_count.py b/ingestion/src/metadata/profiler/metrics/static/regexp_match_count.py index e4ab3c54796b..78c715d3aace 100644 --- a/ingestion/src/metadata/profiler/metrics/static/regexp_match_count.py +++ b/ingestion/src/metadata/profiler/metrics/static/regexp_match_count.py @@ -12,6 +12,7 @@ """ Regex Count Metric definition """ + # pylint: disable=duplicate-code import traceback @@ -24,7 +25,7 @@ from metadata.profiler.metrics.pandas_metric_protocol import PandasComputation from metadata.profiler.orm.functions.regexp import RegexpMatchFn from metadata.profiler.orm.functions.sum import SumFn -from metadata.profiler.orm.registry import is_concatenable +from metadata.profiler.orm.registry import is_complex_type, is_concatenable from metadata.utils.logger import profiler_logger if TYPE_CHECKING: @@ -64,6 +65,8 @@ def _is_concatenable(self): @_label def fn(self): """sqlalchemy function""" + if is_complex_type(self.col.type): + return None if not hasattr(self, "expression"): raise AttributeError( "Regex Count requires an expression to be set: add_props(expression=...)(Metrics.regexCount)" diff --git a/ingestion/src/metadata/profiler/metrics/static/row_count.py b/ingestion/src/metadata/profiler/metrics/static/row_count.py index d16311c1a8e5..b028a62e6852 100644 --- a/ingestion/src/metadata/profiler/metrics/static/row_count.py +++ b/ingestion/src/metadata/profiler/metrics/static/row_count.py @@ -12,6 +12,7 @@ """ Table Count Metric definition """ + from typing import TYPE_CHECKING, Callable, Optional from sqlalchemy import func diff --git a/ingestion/src/metadata/profiler/metrics/static/sum.py b/ingestion/src/metadata/profiler/metrics/static/sum.py index c4d98ff7bcfe..9280f6321cc6 100644 --- a/ingestion/src/metadata/profiler/metrics/static/sum.py +++ b/ingestion/src/metadata/profiler/metrics/static/sum.py @@ -12,6 +12,7 @@ """ SUM Metric definition """ + from functools import partial from typing import TYPE_CHECKING, Callable, Optional diff --git a/ingestion/src/metadata/profiler/metrics/static/unique_count.py 
b/ingestion/src/metadata/profiler/metrics/static/unique_count.py index 1f646145c91d..a2b048887c6c 100644 --- a/ingestion/src/metadata/profiler/metrics/static/unique_count.py +++ b/ingestion/src/metadata/profiler/metrics/static/unique_count.py @@ -12,6 +12,7 @@ """ Unique Count Metric definition """ + import json from collections import Counter from typing import TYPE_CHECKING, Optional @@ -23,7 +24,7 @@ from metadata.profiler.metrics.core import QueryMetric from metadata.profiler.metrics.pandas_metric_protocol import PandasComputation from metadata.profiler.orm.functions.unique_count import _unique_count_query_mapper -from metadata.profiler.orm.registry import NOT_COMPUTE, Dialects +from metadata.profiler.orm.registry import NOT_COMPUTE, Dialects, is_complex_type from metadata.utils.logger import profiler_logger if TYPE_CHECKING: @@ -60,7 +61,9 @@ def query(self, sample: Optional[type], session: Optional[Session] = None): "We are missing the session attribute to compute the UniqueCount." ) - if self.col.type.__class__.__name__ in NOT_COMPUTE: + if self.col.type.__class__.__name__ in NOT_COMPUTE or is_complex_type( + self.col.type + ): return None # Run all queries on top of the sampled data diff --git a/ingestion/src/metadata/profiler/orm/functions/length.py b/ingestion/src/metadata/profiler/orm/functions/length.py index 7c078da5fd77..c56e574c169a 100644 --- a/ingestion/src/metadata/profiler/orm/functions/length.py +++ b/ingestion/src/metadata/profiler/orm/functions/length.py @@ -19,7 +19,7 @@ from sqlalchemy.sql.functions import FunctionElement from metadata.profiler.metrics.core import CACHE -from metadata.profiler.orm.registry import Dialects +from metadata.profiler.orm.registry import Dialects, is_complex_type class LenFn(FunctionElement): @@ -28,6 +28,9 @@ class LenFn(FunctionElement): @compiles(LenFn) def _(element, compiler, **kw): + type_ = element.clauses.clauses[0].type + if is_complex_type(type_): + return "LEN(CAST(%s AS STRING))" % compiler.process(element.clauses, **kw) return "LEN(%s)" % compiler.process(element.clauses, **kw) @@ -53,6 +56,9 @@ def _(element, compiler, **kw): @compiles(LenFn, Dialects.Teradata) @compiles(LenFn, Dialects.Informix) def _(element, compiler, **kw): + type_ = element.clauses.clauses[0].type + if is_complex_type(type_): + return "LENGTH(CAST(%s AS STRING))" % compiler.process(element.clauses, **kw) return "LENGTH(%s)" % compiler.process(element.clauses, **kw) diff --git a/ingestion/src/metadata/profiler/orm/registry.py b/ingestion/src/metadata/profiler/orm/registry.py index c6da40f10e1b..5512effc324f 100644 --- a/ingestion/src/metadata/profiler/orm/registry.py +++ b/ingestion/src/metadata/profiler/orm/registry.py @@ -13,6 +13,7 @@ Custom types' registry for easy access without having an import mess """ + import math from enum import Enum @@ -116,23 +117,8 @@ class Dialects(metaclass=EnumAdapter): pass -# Sometimes we want to skip certain types for computing metrics. -# If the type is NULL, then we won't run the metric execution -# in the profiler. -# Note that not mapped types are set to NULL by default. 
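+# Types we never compute metrics for: the profiler skips any column whose
+# type resolves to one of these. Unmapped types default to NullType and are
+# therefore skipped as well. Complex types (arrays, structs, JSON, geospatial)
+# are no longer listed here; they are grouped in COLLECTION_SET, STRUCT_SET and
+# COMPLEX_SET below and gated per metric via is_complex_type.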
NOT_COMPUTE = { sqlalchemy.types.NullType.__name__, - sqlalchemy.ARRAY.__name__, - sqlalchemy.JSON.__name__, - sqa_types.SQAMap.__name__, - sqa_types.SQAStruct.__name__, - sqa_types.SQASet.__name__, - sqa_types.SQAUnion.__name__, - sqa_types.SQASGeography.__name__, - DataType.GEOMETRY.value, - DataType.ARRAY.value, - DataType.JSON.value, - CustomTypes.ARRAY.value.__name__, CustomTypes.SQADATETIMERANGE.value.__name__, DataType.XML.value, CustomTypes.UNDETERMINED.value.__name__, @@ -154,6 +140,26 @@ class Dialects(metaclass=EnumAdapter): CONCATENABLE_SET = {DataType.STRING.value, DataType.TEXT.value} +COLLECTION_SET = { + sqlalchemy.ARRAY.__name__, + DataType.ARRAY.value, + CustomTypes.ARRAY.value.__name__, +} + +STRUCT_SET = { + sqlalchemy.JSON.__name__, + sqa_types.SQAMap.__name__, + sqa_types.SQAStruct.__name__, + sqa_types.SQASet.__name__, + sqa_types.SQAUnion.__name__, + DataType.JSON.value, +} + +COMPLEX_SET = { + sqa_types.SQASGeography.__name__, + DataType.GEOMETRY.value, +} + # Now, let's define some helper methods to identify # the nature of an SQLAlchemy type @@ -237,3 +243,34 @@ def is_blob(_type) -> bool: DataType.TEXT.value, } return isinstance(_type, (HexByteString, LargeBinary, Text)) + + +def is_collection(_type) -> bool: + """Check if the type is a collection, e.g. Array""" + if isinstance(_type, DataType): + return _type.value in COLLECTION_SET + return _type.__class__.__name__ in COLLECTION_SET + + +def is_struct(_type) -> bool: + """Check if the type is a struct, e.g. JSON, Map, Struct""" + if isinstance(_type, DataType): + return _type.value in STRUCT_SET + return _type.__class__.__name__ in STRUCT_SET + + +def is_complex(_type) -> bool: + """Check if the type is complex, e.g. Geography or Geometry""" + if isinstance(_type, DataType): + return _type.value in COMPLEX_SET + return _type.__class__.__name__ in COMPLEX_SET + + +def is_complex_type(_type) -> bool: + """Helper method to group collections, structs, and complex types""" + return is_collection(_type) or is_struct(_type) or is_complex(_type) + + +def is_length_computable(_type) -> bool: + """Check if length metrics can be computed for this type""" + return is_concatenable(_type) or is_complex_type(_type) diff --git a/ingestion/tests/unit/topology/dashboard/test_quicksight.py b/ingestion/tests/unit/topology/dashboard/test_quicksight.py index 2fae7d73471c..23eb0a6556c1 100644 --- a/ingestion/tests/unit/topology/dashboard/test_quicksight.py +++ b/ingestion/tests/unit/topology/dashboard/test_quicksight.py @@ -388,3 +388,81 @@ def describe_data_set_side_effect(**kwargs): col_names_b = {col.name.root for col in dm_b.columns} assert col_names_b == {"email", "created_at"} + + @pytest.mark.order(9) + def test_build_column_lineage_from_parser_uses_aliases(self): + """ + When a CustomSql query uses column aliases (SELECT src_col AS alias_col), + _build_column_lineage_from_parser must resolve the source column name and + map it to the aliased data model column, not drop the lineage pair. 
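+
+        Illustrative query: ``SELECT original_col AS alias_col FROM my_table``
+        should yield a single ColumnLineage entry from my_table.original_col
+        to the data model's alias_col column.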
+ """ + from unittest.mock import MagicMock + + src_col = MagicMock() + src_col.raw_name = "original_col" + src_col.parent = "my_table" + + tgt_col = MagicMock() + tgt_col.raw_name = "alias_col" + + mock_parser = MagicMock() + mock_parser.column_lineage = [(src_col, tgt_col)] + + src_fqn = "db.schema.my_table.original_col" + alias_fqn = "qs_service.dataset.alias_col" + + mock_from_entity = MagicMock() + mock_from_entity.name.root = "my_table" + mock_data_model = MagicMock() + + with patch( + "metadata.ingestion.source.dashboard.quicksight.metadata.get_column_fqn", + return_value=src_fqn, + ) as mock_get_col_fqn: + with patch.object( + self.quicksight, + "_get_data_model_column_fqn", + return_value=alias_fqn, + ) as mock_get_dm_col_fqn: + result = self.quicksight._build_column_lineage_from_parser( + mock_parser, mock_from_entity, mock_data_model + ) + + mock_get_col_fqn.assert_called_once_with( + table_entity=mock_from_entity, column="original_col" + ) + mock_get_dm_col_fqn.assert_called_once_with( + data_model_entity=mock_data_model, column="alias_col" + ) + assert len(result) == 1 + assert [fqn.root for fqn in result[0].fromColumns] == [src_fqn] + assert result[0].toColumn.root == alias_fqn + + @pytest.mark.order(10) + def test_build_column_lineage_from_parser_falls_back_when_no_parser_results(self): + """ + When lineage_parser.column_lineage is empty, _build_column_lineage_from_parser + must fall back to name-based matching via _get_column_lineage. + """ + mock_parser = MagicMock() + mock_parser.column_lineage = [] + + fallback_lineage = [MagicMock()] + mock_from_entity = MagicMock() + mock_data_model = MagicMock() + mock_col = MagicMock() + mock_col.name = MagicMock(root="col_a") + mock_data_model.columns = [mock_col] + + with patch.object( + self.quicksight, + "_get_column_lineage", + return_value=fallback_lineage, + ) as mock_get_col_lineage: + result = self.quicksight._build_column_lineage_from_parser( + mock_parser, mock_from_entity, mock_data_model + ) + + mock_get_col_lineage.assert_called_once() + assert result is fallback_lineage + diff --git a/scripts/datamodel_generation.py b/scripts/datamodel_generation.py index ab1a847002e5..3737030a09a3 100644 --- a/scripts/datamodel_generation.py +++ b/scripts/datamodel_generation.py @@ -26,8 +26,11 @@ from datamodel_code_generator.__main__ import main current_directory = os.getcwd() -ingestion_path = "./" if current_directory.endswith("/ingestion") else "ingestion/" -directory_root = "../" if current_directory.endswith("/ingestion") else "./" +ingestion_path = "./" if os.path.basename(current_directory).lower() == "ingestion" else "ingestion/" +directory_root = "../" if os.path.basename(current_directory).lower() == "ingestion" else "./" +print(f"Current Directory: {current_directory}") +print(f"Ingestion Path: {ingestion_path}") +print(f"Directory Root: {directory_root}") UTF_8 = "UTF-8" UNICODE_REGEX_REPLACEMENT_FILE_PATHS = [ @@ -43,7 +46,7 @@ main(args) for file_path in UNICODE_REGEX_REPLACEMENT_FILE_PATHS: - with open(file_path, "r", encoding=UTF_8) as file_: + with open(file_path, "r", encoding=UTF_8, errors="replace") as file_: content = file_.read() # Python now requires to move the global flags at the very start of the expression content = content.replace("(?U)", "(?u)") @@ -56,7 +59,7 @@ WRITE_AFTER = "from __future__ import annotations" for file_path in MISSING_IMPORTS: - with open(file_path, "r", encoding=UTF_8) as file_: + with open(file_path, "r", encoding=UTF_8, errors="replace") as file_: lines = file_.readlines() with 
open(file_path, "w", encoding=UTF_8) as file_:
         for line in lines:
@@ -75,7 +78,7 @@
 ]
 
 for file_path in UNSUPPORTED_REGEX_PATTERN_FILE_PATHS:
-    with open(file_path, "r", encoding=UTF_8) as file_:
+    with open(file_path, "r", encoding=UTF_8, errors="replace") as file_:
         content = file_.read()
         content = content.replace("pattern='^((?!::).)*$',", "")
     with open(file_path, "w", encoding=UTF_8) as file_:
@@ -88,7 +91,7 @@
 ]
 
 for file_path in DATETIME_AWARE_FILE_PATHS:
-    with open(file_path, "r", encoding=UTF_8) as file_:
+    with open(file_path, "r", encoding=UTF_8, errors="replace") as file_:
         content = file_.read()
         content = content.replace(
             "from pydantic import AnyUrl, AwareDatetime, ConfigDict, EmailStr, Field, RootModel",
@@ -98,3 +101,18 @@
         content = content.replace("AwareDatetime", "datetime")
     with open(file_path, "w", encoding=UTF_8) as file_:
         file_.write(content)
+
+
+import glob
+
+# Final sanitization pass: re-read every generated schema file tolerantly and
+# rewrite it as clean UTF-8 so Windows-encoded artifacts do not reach the package.
+print("Sanitizing all generated files to UTF-8...")
+for file_path in glob.iglob(
+    f"{ingestion_path}src/metadata/generated/schema/**/*.py", recursive=True
+):
+    with open(file_path, "r", encoding=UTF_8, errors="replace") as f:
+        content = f.read()
+    with open(file_path, "w", encoding=UTF_8) as f:
+        f.write(content)
+print("Sanitization complete.")