Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,18 @@
SourceUrl,
Uuid,
)
from metadata.generated.schema.type.entityLineage import EntitiesEdge, LineageDetails
from metadata.generated.schema.type.entityLineage import (
ColumnLineage,
EntitiesEdge,
LineageDetails,
)
from metadata.generated.schema.type.entityLineage import Source as LineageSource
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper, Dialect
from metadata.ingestion.lineage.parser import LineageParser
from metadata.ingestion.lineage.sql_lineage import get_column_fqn
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.dashboard.dashboard_service import (
LINEAGE_MAP,
Expand Down Expand Up @@ -233,6 +238,99 @@ def _describe_data_sets(self, dataset_id, dashboard_details: DashboardDetail) ->
logger.info(f"Cannot parse lineage from the dashboard: {dashboard_details.Name} to dataset due to: {err}")
return dataset_id, []

def _build_column_lineage_from_parser(
self,
lineage_parser: LineageParser,
from_entity: Table,
data_model_entity: DashboardDataModel,
) -> list[ColumnLineage]:
"""
Build column-level lineage using SQL parser alias mappings.

When a QuickSight CustomSql query uses column aliases
(e.g. ``SELECT id AS relation_id``), name-based matching fails
because the alias name is matched against upstream columns instead
of tracing back through the SQL expression.

This method uses :class:`LineageParser` column mappings to resolve
the true source column (``id``) from the alias (``relation_id``),
and filters by ``src_col._parent`` to avoid multi-table column
name collisions.

Falls back to :meth:`_get_column_lineage` when the parser returns
no column lineage (e.g. SQL too complex, parsing failed, or no
aliases present).

Issue #26670.
"""
column_lineage: list[ColumnLineage] = []

for col_pair in lineage_parser.column_lineage or []:
# Guard: parser may return single-element tuples in edge cases
if len(col_pair) < 2:
continue

src_col = col_pair[0]
tgt_col = col_pair[-1]

parent = getattr(src_col, "_parent", None)
# _parent may be a single Table, an iterable of Tables, or an
# empty iterable when the parser could not infer the source table.
# Treat empty iterables the same as missing parent info.
if parent:
parents = list(parent) if hasattr(parent, "__iter__") and not isinstance(parent, str) else [parent]
else:
parents = []

if parents:
entity_table = from_entity.name.root.lower()
# Accept the pair only when at least one parent table
# matches the current upstream entity being processed.
if not any(str(p).replace("<default>.", "").split(".")[-1].lower() == entity_table for p in parents):
continue
else:
# _parent is missing or empty — parser could not determine source
# table. Log for visibility but allow the pair through;
# get_column_fqn provides a secondary guard (column must
# exist in entity).
logger.debug(
"No parent table info for column %s; skipping parent-table filter",
src_col.raw_name,
)

# raw_name may be fully-qualified (e.g. 'schema.table.col')
# Extract just the column name portion.
src_col_name = src_col.raw_name.split(".")[-1]
tgt_col_name = tgt_col.raw_name.split(".")[-1]

try:
from_col_fqn = get_column_fqn(table_entity=from_entity, column=src_col_name)
to_col_fqn = self._get_data_model_column_fqn(
data_model_entity=data_model_entity,
column=tgt_col_name,
)
if from_col_fqn and to_col_fqn:
column_lineage.append(
ColumnLineage(
fromColumns=[from_col_fqn],
toColumn=to_col_fqn,
)
)
except Exception as exc: # pylint: disable=broad-except
logger.debug(f"Failed to build column lineage for {src_col_name} -> {tgt_col_name}: {exc}")
logger.debug(traceback.format_exc())

# Only fall back to name-based matching when the parser found
# NO column lineage globally (parse failure, too complex, no aliases).
# If the parser DID produce lineage but none matched this specific
# from_entity (multi-table query), return an empty list rather than
# manufacturing incorrect cross-table lineage.
if not column_lineage and not lineage_parser.column_lineage:
columns = [col.name.root for col in data_model_entity.columns]
return self._get_column_lineage(from_entity, data_model_entity, columns)

return column_lineage

def _yield_lineage_from_query(
self,
data_model_entity,
Expand Down Expand Up @@ -308,8 +406,9 @@ def _yield_lineage_from_query(
)
for from_entity in from_entities or []:
if from_entity is not None and data_model_entity is not None:
columns = [col.name.root for col in data_model_entity.columns]
column_lineage = self._get_column_lineage(from_entity, data_model_entity, columns)
column_lineage = self._build_column_lineage_from_parser(
lineage_parser, from_entity, data_model_entity
)
lineage_details.columnsLineage = column_lineage
yield Either(
right=AddLineageRequest(
Expand Down
226 changes: 226 additions & 0 deletions ingestion/tests/unit/topology/dashboard/test_quicksight.py
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,7 @@ def describe_data_set_side_effect(**kwargs):
col_names_b = {col.name.root for col in dm_b.columns}
assert col_names_b == {"email", "created_at"}

@pytest.mark.order(9)
def test_chart_source_state_populated(self):
"""Verify register_record_chart populates chart_source_state after yield_dashboard_chart."""
dashboard_details = DashboardDetail(**{**MOCK_DASHBOARD_DETAILS, "Version": mock_data["Version"]})
Expand All @@ -372,3 +373,228 @@ def test_chart_source_state_populated(self):
assert len(self.quicksight.chart_source_state) == len(mock_data["Version"]["Sheets"])
for fqn in self.quicksight.chart_source_state:
assert "quicksight_source_test" in fqn

@pytest.mark.order(10)
def test_build_column_lineage_from_parser_resolves_alias(self):
"""
When CustomSql uses SELECT src_col AS alias_col,
_build_column_lineage_from_parser must map src_col (source)
to alias_col (data model column) — not match by name.
Issue #26670.
"""
src_col = MagicMock()
src_col.raw_name = "id"
src_col._parent = None # No parent — single-table query

tgt_col = MagicMock()
tgt_col.raw_name = "relation_id"

mock_parser = MagicMock()
mock_parser.column_lineage = [(src_col, tgt_col)]

src_fqn = "postgres.public.relation_table.id"
alias_fqn = "quicksight_service.dataset.relation_id"

mock_from_entity = MagicMock()
mock_from_entity.name.root = "relation_table"
mock_data_model = MagicMock()

with (
patch(
"metadata.ingestion.source.dashboard.quicksight.metadata.get_column_fqn",
return_value=src_fqn,
) as mock_get_col_fqn,
patch.object(
self.quicksight,
"_get_data_model_column_fqn",
return_value=alias_fqn,
) as mock_get_dm_col_fqn,
):
result = self.quicksight._build_column_lineage_from_parser(mock_parser, mock_from_entity, mock_data_model)

mock_get_col_fqn.assert_called_once_with(table_entity=mock_from_entity, column="id")
mock_get_dm_col_fqn.assert_called_once_with(data_model_entity=mock_data_model, column="relation_id")
assert len(result) == 1
assert result[0].fromColumns[0].root == src_fqn
assert result[0].toColumn.root == alias_fqn

@pytest.mark.order(11)
def test_build_column_lineage_from_parser_multi_table_filters_correctly(self):
"""
When CustomSql joins multiple tables with shared column names
(e.g. t1.id and t2.id), _build_column_lineage_from_parser must
only emit lineage for columns belonging to from_entity — not
columns from other tables with the same name.
Issue #26670.
"""
# Column from the correct upstream table
src_col_correct = MagicMock()
src_col_correct.raw_name = "id"
src_col_correct._parent = type("_FakeTable", (), {"__str__": lambda self: "relation_table"})()

tgt_col_correct = MagicMock()
tgt_col_correct.raw_name = "relation_id"

# Column from a DIFFERENT table with same name 'id'
src_col_wrong = MagicMock()
src_col_wrong.raw_name = "id"
src_col_wrong._parent = type("_FakeTable", (), {"__str__": lambda self: "other_table"})()

Comment thread
mohitjeswani01 marked this conversation as resolved.
tgt_col_wrong = MagicMock()
tgt_col_wrong.raw_name = "other_relation_id"

mock_parser = MagicMock()
mock_parser.column_lineage = [
(src_col_correct, tgt_col_correct),
(src_col_wrong, tgt_col_wrong),
]

src_fqn = "postgres.public.relation_table.id"
alias_fqn = "quicksight_service.dataset.relation_id"

mock_from_entity = MagicMock()
mock_from_entity.name.root = "relation_table"
mock_data_model = MagicMock()

with (
patch(
"metadata.ingestion.source.dashboard.quicksight.metadata.get_column_fqn",
return_value=src_fqn,
),
patch.object(
self.quicksight,
"_get_data_model_column_fqn",
return_value=alias_fqn,
),
):
result = self.quicksight._build_column_lineage_from_parser(mock_parser, mock_from_entity, mock_data_model)

# Only 1 result — the wrong table's column must be filtered out
assert len(result) == 1
assert result[0].fromColumns[0].root == src_fqn
assert result[0].toColumn.root == alias_fqn

@pytest.mark.order(12)
def test_build_column_lineage_no_fallback_when_parser_has_global_lineage(self):
"""
Regression test for the multi-table fallback bug (Issue #26670).

When lineage_parser.column_lineage is non-empty (parser succeeded)
but none of the pairs match from_entity (because they belong to a
different upstream table in a multi-table JOIN), the method must
return an empty list and must NOT call _get_column_lineage (the
name-based fallback). Calling the fallback here would manufacture
incorrect cross-table column lineage.
"""
# Parser found lineage for a DIFFERENT table, not our from_entity
other_src_col = MagicMock()
other_src_col.raw_name = "user_id"
other_src_col._parent = type("_FakeTable", (), {"__str__": lambda self: "users_table"})()

other_tgt_col = MagicMock()
other_tgt_col.raw_name = "uid"

mock_parser = MagicMock()
# Parser globally found lineage — but only for 'users_table'
mock_parser.column_lineage = [(other_src_col, other_tgt_col)]

mock_from_entity = MagicMock()
# Our from_entity is 'orders_table' — no parser pairs match it
Comment thread
mohitjeswani01 marked this conversation as resolved.
mock_from_entity.name.root = "orders_table"
mock_data_model = MagicMock()

with patch.object(
self.quicksight,
"_get_column_lineage",
) as mock_fallback:
result = self.quicksight._build_column_lineage_from_parser(mock_parser, mock_from_entity, mock_data_model)

# Must NOT have called the name-based fallback
mock_fallback.assert_not_called()
# Must return an empty list — no manufactured lineage
assert result == []

@pytest.mark.order(13)
def test_build_column_lineage_from_parser_iterable_parent(self):
"""
When src_col._parent is an iterable of parent tables (as some
parser outputs produce), _build_column_lineage_from_parser must
correctly match against from_entity when any parent matches.
Issue #26670.
"""
# Parent is an iterable (list) of Table objects
parent_table_mock = MagicMock()
parent_table_mock.__str__ = MagicMock(return_value="relation_table")

src_col = MagicMock()
src_col.raw_name = "id"
# _parent is a list — simulates iterable parser output
src_col._parent = [parent_table_mock]

tgt_col = MagicMock()
tgt_col.raw_name = "relation_id"

mock_parser = MagicMock()
mock_parser.column_lineage = [(src_col, tgt_col)]

src_fqn = "postgres.public.relation_table.id"
alias_fqn = "quicksight_service.dataset.relation_id"

mock_from_entity = MagicMock()
mock_from_entity.name.root = "relation_table"
mock_data_model = MagicMock()

with (
patch(
"metadata.ingestion.source.dashboard.quicksight.metadata.get_column_fqn",
return_value=src_fqn,
),
patch.object(
self.quicksight,
"_get_data_model_column_fqn",
return_value=alias_fqn,
),
):
result = self.quicksight._build_column_lineage_from_parser(
mock_parser,
mock_from_entity,
mock_data_model,
)

assert len(result) == 1
assert result[0].fromColumns[0].root == src_fqn
assert result[0].toColumn.root == alias_fqn

@pytest.mark.order(14)
def test_build_column_lineage_from_parser_falls_back_when_empty(self):
"""
When lineage_parser.column_lineage is empty (parser failed or
no aliases), _build_column_lineage_from_parser must fall back
to name-based matching via _get_column_lineage.
Issue #26670.
"""
mock_parser = MagicMock()
mock_parser.column_lineage = []

fallback_lineage = [MagicMock()]
mock_from_entity = MagicMock()
mock_from_entity.name.root = "relation_table"

# Build mock columns properly — avoid MagicMock name kwarg trap
mock_col = MagicMock()
mock_col.name = MagicMock()
mock_col.name.root = "col_a"

mock_data_model = MagicMock()
mock_data_model.columns = [mock_col]

with patch.object(
self.quicksight,
"_get_column_lineage",
return_value=fallback_lineage,
) as mock_get_col_lineage:
result = self.quicksight._build_column_lineage_from_parser(mock_parser, mock_from_entity, mock_data_model)

# Verify fallback was called with correct column names
mock_get_col_lineage.assert_called_once_with(mock_from_entity, mock_data_model, ["col_a"])
assert result is fallback_lineage
Loading