def _build_column_lineage_from_parser(
    self,
    lineage_parser: LineageParser,
    from_entity: Table,
    data_model_entity: DashboardDataModel,
) -> list[ColumnLineage]:
    """
    Build column-level lineage using SQL parser alias mappings.

    When a QuickSight CustomSql query uses column aliases
    (e.g. ``SELECT id AS relation_id``), name-based matching fails
    because the alias name is matched against upstream columns instead
    of tracing back through the SQL expression. This method walks the
    column tuples exposed by ``lineage_parser.column_lineage`` (first
    element = source column, last element = target column) to resolve
    the true source column (``id``) behind the alias (``relation_id``).

    Pairs are filtered by ``src_col._parent`` to avoid multi-table column
    name collisions: a pair is kept only when one of the source column's
    parent tables has the same name (case-insensitive) as ``from_entity``.
    Parent names are compared on their last dot-separated segment, so a
    schema-qualified parent such as ``public.orders`` still matches the
    ``orders`` entity.

    Args:
        lineage_parser: Parser holding the column lineage of the query.
        from_entity: Upstream table currently being linked.
        data_model_entity: Data model whose columns are the lineage targets.

    Returns:
        One ``ColumnLineage`` per parser pair for which both the upstream
        column FQN and the data model column FQN could be resolved.
        When the parser produced no column lineage at all (e.g. parsing
        failed or the SQL was too complex), the result of the name-based
        :meth:`_get_column_lineage` fallback is returned instead. When the
        parser did produce lineage but none of it belongs to
        ``from_entity``, an empty list is returned.

    Issue #26670.
    """
    # Read the parser output once so the loop and the fallback decision
    # are guaranteed to look at the same data.
    parsed_pairs = lineage_parser.column_lineage or []

    # Only fall back to name-based matching when the parser found NO
    # column lineage globally. If it DID produce lineage but none matches
    # this from_entity (multi-table query), we return an empty list below
    # rather than manufacturing incorrect cross-table lineage.
    if not parsed_pairs:
        columns = [col.name.root for col in data_model_entity.columns]
        return self._get_column_lineage(from_entity, data_model_entity, columns)

    # Loop-invariant: the upstream table name every parent is compared to.
    entity_table = from_entity.name.root.lower()
    column_lineage: list[ColumnLineage] = []

    for col_pair in parsed_pairs:
        # Guard: parser may return single-element tuples in edge cases.
        if len(col_pair) < 2:
            continue

        src_col, tgt_col = col_pair[0], col_pair[-1]

        # _parent may be a single table, an iterable of tables, or an
        # empty/missing value when the parser could not infer the source
        # table. Empty iterables are treated the same as a missing parent.
        parent = getattr(src_col, "_parent", None)
        if not parent:
            parents = []
        elif hasattr(parent, "__iter__") and not isinstance(parent, str):
            parents = list(parent)
        else:
            parents = [parent]

        if parents:
            # Keep only the bare table name: the string form of a parent
            # may be schema-qualified ("schema.table"), so compare on the
            # last dot-separated segment.
            parent_names = {str(p).rsplit(".", 1)[-1].lower() for p in parents}
            if entity_table not in parent_names:
                continue
        else:
            # No parent info: log for visibility but let the pair through.
            # The FQN lookups below act as a secondary guard, since the
            # pair is dropped when either lookup returns nothing.
            logger.debug(
                "No parent table info for column %s; skipping parent-table filter",
                src_col.raw_name,
            )

        # raw_name may be fully-qualified (e.g. 'schema.table.col');
        # keep just the column name portion.
        src_col_name = src_col.raw_name.split(".")[-1]
        tgt_col_name = tgt_col.raw_name.split(".")[-1]

        try:
            from_col_fqn = get_column_fqn(table_entity=from_entity, column=src_col_name)
            to_col_fqn = self._get_data_model_column_fqn(
                data_model_entity=data_model_entity,
                column=tgt_col_name,
            )
            if from_col_fqn and to_col_fqn:
                column_lineage.append(
                    ColumnLineage(
                        fromColumns=[from_col_fqn],
                        toColumn=to_col_fqn,
                    )
                )
        except Exception as exc:  # pylint: disable=broad-except
            # Best effort: one unresolved pair must not abort the rest.
            logger.debug(
                "Failed to build column lineage for %s -> %s: %s",
                src_col_name,
                tgt_col_name,
                exc,
            )
            logger.debug(traceback.format_exc())

    return column_lineage
@pytest.mark.order(10)
def test_build_column_lineage_from_parser_resolves_alias(self):
    """
    An aliased select (``SELECT src_col AS alias_col``) must produce lineage
    from src_col (source) to alias_col (data model column) instead of
    matching the two sides by name.
    Issue #26670.
    """
    source_column = MagicMock()
    # No parent: single-table query, so the parent-table filter is skipped.
    source_column.configure_mock(raw_name="id", _parent=None)
    alias_column = MagicMock()
    alias_column.configure_mock(raw_name="relation_id")

    parser = MagicMock()
    parser.column_lineage = [(source_column, alias_column)]

    upstream_table = MagicMock()
    upstream_table.name.root = "relation_table"
    data_model = MagicMock()

    expected_from = "postgres.public.relation_table.id"
    expected_to = "quicksight_service.dataset.relation_id"

    with patch(
        "metadata.ingestion.source.dashboard.quicksight.metadata.get_column_fqn",
        return_value=expected_from,
    ) as table_fqn_lookup:
        with patch.object(
            self.quicksight,
            "_get_data_model_column_fqn",
            return_value=expected_to,
        ) as model_fqn_lookup:
            lineage = self.quicksight._build_column_lineage_from_parser(parser, upstream_table, data_model)

    table_fqn_lookup.assert_called_once_with(table_entity=upstream_table, column="id")
    model_fqn_lookup.assert_called_once_with(data_model_entity=data_model, column="relation_id")
    assert len(lineage) == 1
    edge = lineage[0]
    assert edge.fromColumns[0].root == expected_from
    assert edge.toColumn.root == expected_to


@pytest.mark.order(11)
def test_build_column_lineage_from_parser_multi_table_filters_correctly(self):
    """
    For a join of tables that share a column name (e.g. t1.id and t2.id),
    only the pair whose source column belongs to from_entity may be
    emitted; the same-named column of the other table is dropped.
    Issue #26670.
    """

    class _FakeTable:
        """Minimal stand-in for a parser table: only its str() is used."""

        def __init__(self, label):
            self._label = label

        def __str__(self):
            return self._label

    # Pair coming from the upstream table under test.
    matching_source = MagicMock()
    matching_source.configure_mock(raw_name="id", _parent=_FakeTable("relation_table"))
    matching_target = MagicMock()
    matching_target.configure_mock(raw_name="relation_id")

    # Pair coming from a DIFFERENT table that also has an 'id' column.
    foreign_source = MagicMock()
    foreign_source.configure_mock(raw_name="id", _parent=_FakeTable("other_table"))
    foreign_target = MagicMock()
    foreign_target.configure_mock(raw_name="other_relation_id")

    parser = MagicMock()
    parser.column_lineage = [
        (matching_source, matching_target),
        (foreign_source, foreign_target),
    ]

    upstream_table = MagicMock()
    upstream_table.name.root = "relation_table"
    data_model = MagicMock()

    expected_from = "postgres.public.relation_table.id"
    expected_to = "quicksight_service.dataset.relation_id"

    with patch(
        "metadata.ingestion.source.dashboard.quicksight.metadata.get_column_fqn",
        return_value=expected_from,
    ):
        with patch.object(
            self.quicksight,
            "_get_data_model_column_fqn",
            return_value=expected_to,
        ):
            lineage = self.quicksight._build_column_lineage_from_parser(parser, upstream_table, data_model)

    # Exactly one edge: the other table's column must have been filtered out.
    assert len(lineage) == 1
    edge = lineage[0]
    assert edge.fromColumns[0].root == expected_from
    assert edge.toColumn.root == expected_to


@pytest.mark.order(12)
def test_build_column_lineage_no_fallback_when_parser_has_global_lineage(self):
    """
    Regression test for the multi-table fallback bug (Issue #26670).

    If the parser returned column lineage but every pair belongs to a
    different upstream table than from_entity, the method must return an
    empty list and must NOT call the name-based _get_column_lineage
    fallback, which would manufacture incorrect cross-table lineage.
    """

    class _FakeTable:
        """Minimal stand-in for a parser table: only its str() is used."""

        def __str__(self):
            return "users_table"

    # The only parsed pair belongs to 'users_table', not to our entity.
    unrelated_source = MagicMock()
    unrelated_source.configure_mock(raw_name="user_id", _parent=_FakeTable())
    unrelated_target = MagicMock()
    unrelated_target.configure_mock(raw_name="uid")

    parser = MagicMock()
    parser.column_lineage = [(unrelated_source, unrelated_target)]

    # from_entity is 'orders_table', so no parsed pair can match it.
    upstream_table = MagicMock()
    upstream_table.name.root = "orders_table"
    data_model = MagicMock()

    with patch.object(self.quicksight, "_get_column_lineage") as name_based_fallback:
        lineage = self.quicksight._build_column_lineage_from_parser(parser, upstream_table, data_model)

    # The name-based fallback must stay untouched ...
    name_based_fallback.assert_not_called()
    # ... and nothing may be manufactured in its place.
    assert lineage == []


@pytest.mark.order(13)
def test_build_column_lineage_from_parser_iterable_parent(self):
    """
    When src_col._parent is an iterable of parent tables, the pair must be
    accepted as soon as any of those parents matches from_entity.
    Issue #26670.
    """
    # Parent table whose string form is the upstream table name.
    parent_table = MagicMock()
    parent_table.__str__.return_value = "relation_table"

    source_column = MagicMock()
    # _parent is a list here, simulating iterable parser output.
    source_column.configure_mock(raw_name="id", _parent=[parent_table])
    alias_column = MagicMock()
    alias_column.configure_mock(raw_name="relation_id")

    parser = MagicMock()
    parser.column_lineage = [(source_column, alias_column)]

    upstream_table = MagicMock()
    upstream_table.name.root = "relation_table"
    data_model = MagicMock()

    expected_from = "postgres.public.relation_table.id"
    expected_to = "quicksight_service.dataset.relation_id"

    with patch(
        "metadata.ingestion.source.dashboard.quicksight.metadata.get_column_fqn",
        return_value=expected_from,
    ):
        with patch.object(
            self.quicksight,
            "_get_data_model_column_fqn",
            return_value=expected_to,
        ):
            lineage = self.quicksight._build_column_lineage_from_parser(
                parser,
                upstream_table,
                data_model,
            )

    assert len(lineage) == 1
    edge = lineage[0]
    assert edge.fromColumns[0].root == expected_from
    assert edge.toColumn.root == expected_to


@pytest.mark.order(14)
def test_build_column_lineage_from_parser_falls_back_when_empty(self):
    """
    An empty lineage_parser.column_lineage (parser failed or no aliases)
    must route to the name-based _get_column_lineage fallback, called with
    the data model's column names.
    Issue #26670.
    """
    parser = MagicMock()
    parser.column_lineage = []

    upstream_table = MagicMock()
    upstream_table.name.root = "relation_table"

    # Set name.root by attribute access: passing name= to the MagicMock
    # constructor would name the mock itself instead of adding an attribute.
    model_column = MagicMock()
    model_column.name.root = "col_a"

    data_model = MagicMock()
    data_model.columns = [model_column]

    sentinel_lineage = [MagicMock()]

    with patch.object(
        self.quicksight,
        "_get_column_lineage",
        return_value=sentinel_lineage,
    ) as name_based_fallback:
        lineage = self.quicksight._build_column_lineage_from_parser(parser, upstream_table, data_model)

    # The fallback receives the column names taken from the data model.
    name_based_fallback.assert_called_once_with(upstream_table, data_model, ["col_a"])
    assert lineage is sentinel_lineage