Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,18 @@
SourceUrl,
Uuid,
)
from metadata.generated.schema.type.entityLineage import EntitiesEdge, LineageDetails
from metadata.generated.schema.type.entityLineage import (
ColumnLineage,
EntitiesEdge,
LineageDetails,
)
from metadata.generated.schema.type.entityLineage import Source as LineageSource
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper, Dialect
from metadata.ingestion.lineage.parser import LineageParser
from metadata.ingestion.lineage.sql_lineage import get_column_fqn
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.dashboard.dashboard_service import (
LINEAGE_MAP,
Expand Down Expand Up @@ -350,11 +355,8 @@ def _yield_lineage_from_query(
)
for from_entity in from_entities or []:
if from_entity is not None and data_model_entity is not None:
columns = [
col.name.root for col in data_model_entity.columns
]
column_lineage = self._get_column_lineage(
from_entity, data_model_entity, columns
column_lineage = self._build_column_lineage_from_parser(
lineage_parser, from_entity, data_model_entity
)
lineage_details.columnsLineage = column_lineage
yield Either(
Expand Down Expand Up @@ -539,9 +541,11 @@ def yield_dashboard_lineage_details( # pylint: disable=too-many-locals
for datamodel in self.data_models or []:
try:
data_model_entity = self._get_datamodel(
datamodel_id=datamodel.dataset_id
if datamodel.dataset_id is not None
else datamodel.DataSource.DataSourceId
datamodel_id=(
datamodel.dataset_id
if datamodel.dataset_id is not None
else datamodel.DataSource.DataSourceId
)
)
if isinstance(
datamodel.DataSource.data_source_resp, DataSourceRespQuery
Expand Down Expand Up @@ -574,6 +578,49 @@ def yield_dashboard_lineage_details( # pylint: disable=too-many-locals
)
)

def _build_column_lineage_from_parser(
self,
lineage_parser: LineageParser,
from_entity: Table,
data_model_entity: DashboardDataModel,
) -> List[ColumnLineage]:
"""
Build column lineage using SQL-parsed source→target column mappings.

When the CustomSql query uses column aliases (e.g. SELECT a AS b), the
lineage parser resolves the original column name and alias. This allows
correct lineage when the data model columns carry alias names that do not
exist in the source table.

Falls back to name-based matching when the parser returns no column mappings,
or when none of the returned mappings resolve to a column on both entities.
"""
column_lineage = []
for col_pair in lineage_parser.column_lineage or []:
try:
src_col = col_pair[0]
tgt_col = col_pair[-1]
from_col = get_column_fqn(
table_entity=from_entity, column=src_col.raw_name
)
to_col = self._get_data_model_column_fqn(
data_model_entity=data_model_entity,
column=tgt_col.raw_name,
)
Comment on lines +598 to +608
Copy link

Copilot AI Apr 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_build_column_lineage_from_parser matches parsed columns to from_entity using only src_col.raw_name. In multi-table CustomSql queries, different source tables often share column names (e.g., id), so this can incorrectly attach a lineage pair to the wrong from_entity (because get_column_fqn will happily resolve id on any table that has it). Consider filtering lineage_parser.column_lineage by the source column’s parent table (e.g., src_col._parent) or by passing the current source table context into this method and using get_column_fqn(..., table=..., schema=..., database=...) so only pairs belonging to the current upstream table are emitted.

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@SaaiAravindhRaja this is a valid comment

if from_col and to_col:
column_lineage.append(
ColumnLineage(fromColumns=[from_col], toColumn=to_col)
)
except Exception as exc:
logger.debug(
f"Failed to resolve column lineage for pair {col_pair}: {exc}"
)
if not column_lineage:
columns = [col.name.root for col in data_model_entity.columns]
column_lineage = (
self._get_column_lineage(from_entity, data_model_entity, columns) or []
)
return column_lineage

def _get_column_info(self, data_model: DescribeDataSourceResponse):
"""Get column info"""
datasource_columns = []
Expand Down Expand Up @@ -684,9 +731,9 @@ def yield_datamodel(
Each QuickSight dataset produces a separate DataModel entity,
identified by dataset_id rather than datasource_id.
"""
self.data_models: List[
DescribeDataSourceResponse
] = self._get_dashboard_datamodels(dashboard_details)
self.data_models: List[DescribeDataSourceResponse] = (
self._get_dashboard_datamodels(dashboard_details)
)
dataset_groups: dict[str, List[DescribeDataSourceResponse]] = defaultdict(list)
for data_model in self.data_models:
key = (
Expand Down
75 changes: 75 additions & 0 deletions ingestion/tests/unit/topology/dashboard/test_quicksight.py
Original file line number Diff line number Diff line change
Expand Up @@ -388,3 +388,78 @@ def describe_data_set_side_effect(**kwargs):

col_names_b = {col.name.root for col in dm_b.columns}
assert col_names_b == {"email", "created_at"}

@pytest.mark.order(9)
def test_build_column_lineage_from_parser_uses_aliases(self):
"""
When a CustomSql query uses column aliases (SELECT src_col AS alias_col),
_build_column_lineage_from_parser must resolve the source column name and
map it to the aliased data model column, not drop the lineage pair.
"""
from unittest.mock import MagicMock

src_col = MagicMock()
src_col.raw_name = "original_col"

tgt_col = MagicMock()
tgt_col.raw_name = "alias_col"

mock_parser = MagicMock()
mock_parser.column_lineage = [(src_col, tgt_col)]

src_fqn = "db.schema.my_table.original_col"
Comment on lines +401 to +410
Copy link

Copilot AI Apr 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test sets src_col.raw_name to a bare column name (original_col), but in real LineageParser.column_lineage output raw_name is frequently fully-qualified (e.g. testdb.public.users.id). As written, the test can pass even if _build_column_lineage_from_parser fails to handle fully-qualified raw_name values (and would then fall back to name-based matching).

To prevent regressions, consider updating the fixture to use a fully-qualified raw_name (and/or a quoted identifier) and assert that the implementation still resolves the correct source column FQN.

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@SaaiAravindhRaja valid comment as well

alias_fqn = "qs_service.dataset.alias_col"

mock_from_entity = MagicMock()
mock_data_model = MagicMock()

with patch(
"metadata.ingestion.source.dashboard.quicksight.metadata.get_column_fqn",
return_value=src_fqn,
) as mock_get_col_fqn:
with patch.object(
self.quicksight,
"_get_data_model_column_fqn",
return_value=alias_fqn,
) as mock_get_dm_col_fqn:
result = self.quicksight._build_column_lineage_from_parser(
mock_parser, mock_from_entity, mock_data_model
)

mock_get_col_fqn.assert_called_once_with(
table_entity=mock_from_entity, column="original_col"
)
mock_get_dm_col_fqn.assert_called_once_with(
data_model_entity=mock_data_model, column="alias_col"
)
assert len(result) == 1
assert result[0].fromColumns == [src_fqn]
assert result[0].toColumn == alias_fqn

@pytest.mark.order(10)
def test_build_column_lineage_from_parser_falls_back_when_no_parser_results(self):
    """
    When lineage_parser.column_lineage is empty, _build_column_lineage_from_parser
    must fall back to name-based matching via _get_column_lineage.

    The fallback is expected to receive the source entity, the data model
    entity and the list of data model column names, and its result must be
    returned unchanged.
    """
    # Local import, mirroring the alias test, so this test does not depend on
    # MagicMock being imported at module level.
    from unittest.mock import MagicMock

    mock_parser = MagicMock()
    mock_parser.column_lineage = []

    fallback_lineage = [MagicMock()]
    mock_from_entity = MagicMock()
    mock_data_model = MagicMock()
    mock_col = MagicMock()
    # `name` is assigned after construction because MagicMock(name=...) sets
    # the mock's own display name rather than a `.name` attribute.
    mock_col.name = MagicMock(root="col_a")
    mock_data_model.columns = [mock_col]

    with patch.object(
        self.quicksight,
        "_get_column_lineage",
        return_value=fallback_lineage,
    ) as mock_get_col_lineage:
        result = self.quicksight._build_column_lineage_from_parser(
            mock_parser, mock_from_entity, mock_data_model
        )

    # Pin the arguments, not just the call count: the fallback must be driven
    # by the data model's own column names.
    mock_get_col_lineage.assert_called_once_with(
        mock_from_entity, mock_data_model, ["col_a"]
    )
    assert result is fallback_lineage
Comment on lines +392 to +465
Copy link

Copilot AI Apr 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The added unit tests validate alias mapping and the empty-parser fallback, but they don’t cover the most failure-prone scenario for this change: a CustomSql query that references multiple upstream tables where the same source column name exists in more than one table (e.g., t1.id and t2.id). Adding a test that asserts _build_column_lineage_from_parser only produces column lineage for the correct upstream table would prevent regressions of incorrect column-level lineage resolution.

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@SaaiAravindhRaja valid comment as well

Loading