diff --git a/ingestion/src/metadata/ingestion/source/database/trino/lineage.py b/ingestion/src/metadata/ingestion/source/database/trino/lineage.py index 6c502f7f9452..3ee66b92cc8d 100644 --- a/ingestion/src/metadata/ingestion/source/database/trino/lineage.py +++ b/ingestion/src/metadata/ingestion/source/database/trino/lineage.py @@ -11,8 +11,9 @@ """ Trino lineage module """ + import traceback -from typing import Iterable, Iterator, List +from typing import Dict, Iterable, Iterator, List, Optional from sqlalchemy import text @@ -30,6 +31,7 @@ TRINO_QUERY_BATCH_SIZE, TrinoQueryParserSource, ) +from metadata.utils import fqn from metadata.utils.logger import ingestion_logger logger = ingestion_logger() @@ -112,9 +114,122 @@ def check_same_table(self, table1: Table, table2: Table) -> bool: """ Method to check whether the table1 and table2 are same """ - return table1.name.root == table2.name.root and { - column.name.root for column in table1.columns - } == {column.name.root for column in table2.columns} + if table1.name.root.lower() != table2.name.root.lower(): + return False + + if not table1.columns and not table2.columns: + return True + + if not table1.columns or not table2.columns: + return False + return {column.name.root.lower() for column in table1.columns} == { + column.name.root.lower() for column in table2.columns + } + + def _get_cross_database_schema_fqn( + self, + cross_database_fqn: str, + trino_table: Table, + cross_database_schema_mapping: Dict[str, Dict[str, str]], + ) -> Optional[str]: + trino_schema_name = None + if trino_table.databaseSchema and trino_table.databaseSchema.name: + trino_schema_name = trino_table.databaseSchema.name.root + + if ( + not trino_schema_name + and trino_table.fullyQualifiedName + and trino_table.fullyQualifiedName.root + ): + trino_table_fqn_parts = fqn.split(trino_table.fullyQualifiedName.root) + if len(trino_table_fqn_parts) >= 4: + trino_schema_name = trino_table_fqn_parts[-2] + + if not trino_schema_name: + return None + + if cross_database_fqn not in cross_database_schema_mapping: + cross_database_schema_mapping[cross_database_fqn] = {} + + cross_database_schema_fqn = cross_database_schema_mapping[ + cross_database_fqn + ].get(trino_schema_name.lower()) + if cross_database_schema_fqn: + return cross_database_schema_fqn + + cross_database_fqn_parts = fqn.split(cross_database_fqn) + if len(cross_database_fqn_parts) == 2: + cross_database_service_name, cross_database_name = cross_database_fqn_parts + cross_database_schemas = fqn.search_database_schema_from_es( + metadata=self.metadata, + database_name=cross_database_name, + schema_name=trino_schema_name, + service_name=cross_database_service_name, + fetch_multiple_entities=True, + fields="fullyQualifiedName,name", + ) + if cross_database_schemas: + for cross_database_schema in cross_database_schemas: + if ( + cross_database_schema.name + and cross_database_schema.fullyQualifiedName + ): + cross_database_schema_mapping[cross_database_fqn][ + cross_database_schema.name.root.lower() + ] = cross_database_schema.fullyQualifiedName.root + + return ( + cross_database_schema_mapping[cross_database_fqn].get( + trino_schema_name.lower() + ) + or f"{cross_database_fqn}.{fqn.quote_name(trino_schema_name)}" + ) + + def _get_case_insensitive_cross_database_table( + self, + cross_database_schema_fqn: str, + trino_table: Table, + cross_database_table_schema_mapping: Dict[str, Dict[str, List[Table]]], + ) -> Optional[Table]: + if cross_database_schema_fqn not in cross_database_table_schema_mapping: + cross_database_table_schema_mapping[cross_database_schema_fqn] = {} + + table_key = trino_table.name.root.lower() + if ( + table_key + not in cross_database_table_schema_mapping[cross_database_schema_fqn] + ): + cross_database_table_schema_mapping[cross_database_schema_fqn][ + table_key + ] = [] + cross_database_schema_fqn_parts = fqn.split(cross_database_schema_fqn) + if len(cross_database_schema_fqn_parts) == 3: + ( + cross_database_service_name, + cross_database_name, + cross_database_schema_name, + ) = cross_database_schema_fqn_parts + cross_database_tables = fqn.search_table_from_es( + metadata=self.metadata, + database_name=cross_database_name, + schema_name=cross_database_schema_name, + service_name=cross_database_service_name, + table_name=table_key, + fetch_multiple_entities=True, + fields="fullyQualifiedName,name,columns,databaseSchema", + ) + if cross_database_tables: + cross_database_table_schema_mapping[cross_database_schema_fqn][ + table_key + ] = cross_database_tables + + for cross_database_table in cross_database_table_schema_mapping[ + cross_database_schema_fqn + ].get(table_key, []): + if self.check_same_table(trino_table, cross_database_table): + return cross_database_table + + return None def get_cross_database_lineage( self, from_table: Table, to_table: Table @@ -131,10 +246,64 @@ def get_cross_database_lineage( from_entity=from_table, to_entity=to_table, column_lineage=column_lineage ) + def _get_cross_database_lineage_for_table( + self, + trino_database_fqn: str, + trino_table: Table, + *, + all_cross_database_fqns: List[str], + cross_database_table_fqn_mapping: Dict[str, Optional[Table]], + cross_database_schema_fqn_mapping: Dict[str, Dict[str, str]], + cross_database_table_schema_mapping: Dict[str, Dict[str, List[Table]]], + ) -> Optional[Either[AddLineageRequest]]: + trino_table_fqn = trino_table.fullyQualifiedName.root + trino_database_prefix = f"{trino_database_fqn}." + if not trino_table_fqn.startswith(trino_database_prefix): + return None + + trino_table_suffix = trino_table_fqn[len(trino_database_fqn) :] + for cross_database_fqn in all_cross_database_fqns: + cross_database_table_fqn = f"{cross_database_fqn}{trino_table_suffix}" + if cross_database_table_fqn not in cross_database_table_fqn_mapping: + cross_database_table = self.metadata.get_by_name( + Table, fqn=cross_database_table_fqn + ) + if not cross_database_table: + cross_database_schema_fqn = self._get_cross_database_schema_fqn( + cross_database_fqn, + trino_table, + cross_database_schema_fqn_mapping, + ) + if cross_database_schema_fqn: + cross_database_table = ( + self._get_case_insensitive_cross_database_table( + cross_database_schema_fqn, + trino_table, + cross_database_table_schema_mapping, + ) + ) + cross_database_table_fqn_mapping[ + cross_database_table_fqn + ] = cross_database_table + + cross_database_table = cross_database_table_fqn_mapping[ + cross_database_table_fqn + ] + if cross_database_table and self.check_same_table( + trino_table, cross_database_table + ): + return self.get_cross_database_lineage( + cross_database_table, trino_table + ) + + return None + def yield_cross_database_lineage(self) -> Iterable[Either[AddLineageRequest]]: try: all_cross_database_fqns = self.get_cross_database_fqn_from_service_names() cross_database_table_fqn_mapping = {} + cross_database_schema_fqn_mapping: Dict[str, Dict[str, str]] = {} + cross_database_table_schema_mapping: Dict[str, Dict[str, List[Table]]] = {} # Get all databases for the specified Trino service trino_databases = self.metadata.list_all_entities( @@ -149,29 +318,16 @@ def yield_cross_database_lineage(self) -> Iterable[Either[AddLineageRequest]]: ) # NOTE: Currently, tables in system-defined schemas will also be checked for lineage. for trino_table in trino_tables: - trino_table_fqn = trino_table.fullyQualifiedName.root - for cross_database_fqn in all_cross_database_fqns: - # Construct the FQN for cross-database tables - cross_database_table_fqn = trino_table_fqn.replace( - trino_database_fqn, cross_database_fqn - ) - # Cache cross-database table against its FQN to avoid repeated API calls - cross_database_table = cross_database_table_fqn_mapping[ - cross_database_table_fqn - ] = cross_database_table_fqn_mapping.get( - cross_database_table_fqn, - self.metadata.get_by_name( - Table, fqn=cross_database_table_fqn - ), - ) - # Create cross database lineage request if both tables are same - if cross_database_table and self.check_same_table( - trino_table, cross_database_table - ): - yield self.get_cross_database_lineage( - cross_database_table, trino_table - ) - break + cross_database_lineage = self._get_cross_database_lineage_for_table( + trino_database_fqn=trino_database_fqn, + trino_table=trino_table, + all_cross_database_fqns=all_cross_database_fqns, + cross_database_table_fqn_mapping=cross_database_table_fqn_mapping, + cross_database_schema_fqn_mapping=cross_database_schema_fqn_mapping, + cross_database_table_schema_mapping=cross_database_table_schema_mapping, + ) + if cross_database_lineage: + yield cross_database_lineage except Exception as exc: yield Either( left=StackTraceError( diff --git a/ingestion/tests/unit/source/database/trino/test_lineage.py b/ingestion/tests/unit/source/database/trino/test_lineage.py new file mode 100644 index 000000000000..98a7aad9a2e2 --- /dev/null +++ b/ingestion/tests/unit/source/database/trino/test_lineage.py @@ -0,0 +1,161 @@ +# Copyright 2025 Collate +# Licensed under the Collate Community License, Version 1.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Regression tests for Trino cross-database lineage (Issue #27419).""" + +from unittest.mock import MagicMock, patch + +from metadata.generated.schema.entity.data.database import Database +from metadata.generated.schema.entity.data.table import Table +from metadata.ingestion.api.models import Either +from metadata.ingestion.source.database.trino.lineage import TrinoLineageSource + + +class TrinoLineageSourceTestDouble(TrinoLineageSource): + """Minimal Trino lineage source for unit testing.""" + + def __init__(self, metadata): + self.metadata = metadata + self.config = MagicMock() + self.config.serviceName = "repro_trino" + self.source_config = MagicMock() + self.source_config.crossDatabaseServiceNames = ["repro_postgres"] + + +def _mock_column(column_name): + column = MagicMock() + column.name.root = column_name + return column + + +def test_check_same_table_is_case_insensitive_for_names_and_columns(): + """Issue #27419: table and column comparisons should ignore case.""" + metadata = MagicMock() + lineage_source = TrinoLineageSourceTestDouble(metadata) + + source_table = MagicMock() + source_table.name.root = "CUSTOMER" + source_table.columns = [_mock_column("ID"), _mock_column("NAME")] + + target_table = MagicMock() + target_table.name.root = "customer" + target_table.columns = [_mock_column("id"), _mock_column("name")] + + assert lineage_source.check_same_table(source_table, target_table) + + +def test_yield_cross_database_lineage_finds_uppercase_source_table(): + """Issue #27419: resolve uppercase Postgres source table in cross-db lineage.""" + metadata = MagicMock() + + trino_database = MagicMock() + trino_database.fullyQualifiedName.root = "repro_trino.postgres" + + source_database = MagicMock() + source_database.fullyQualifiedName.root = "repro_postgres.source_db" + + source_schema = MagicMock() + source_schema.name.root = "SOURCE_SCHEMA" + source_schema.fullyQualifiedName.root = "repro_postgres.source_db.SOURCE_SCHEMA" + + trino_table = MagicMock() + trino_table.id.root = "11111111-1111-1111-1111-111111111111" + trino_table.fullyQualifiedName.root = "repro_trino.postgres.source_schema.customer" + trino_table.name.root = "customer" + trino_table.databaseSchema.name.root = "source_schema" + trino_table.databaseSchema.fullyQualifiedName.root = ( + "repro_trino.postgres.source_schema" + ) + trino_table.columns = [_mock_column("id"), _mock_column("name")] + + source_table = MagicMock() + source_table.id.root = "22222222-2222-2222-2222-222222222222" + source_table.fullyQualifiedName.root = ( + "repro_postgres.source_db.SOURCE_SCHEMA.CUSTOMER" + ) + source_table.name.root = "CUSTOMER" + source_table.databaseSchema.name.root = "SOURCE_SCHEMA" + source_table.databaseSchema.fullyQualifiedName.root = ( + "repro_postgres.source_db.SOURCE_SCHEMA" + ) + source_table.columns = [_mock_column("id"), _mock_column("name")] + + def list_all_entities_side_effect(entity, params=None, **_kwargs): + if entity is Database and params == {"service": "repro_trino"}: + return [trino_database] + if entity is Database and params == {"service": "repro_postgres"}: + return [source_database] + if entity is Table and params == {"database": "repro_trino.postgres"}: + return [trino_table] + return [] + + metadata.list_all_entities.side_effect = list_all_entities_side_effect + metadata.get_by_name.return_value = None + + lineage_source = TrinoLineageSourceTestDouble(metadata) + + with patch.object( + TrinoLineageSource, + "get_cross_database_lineage", + return_value=Either(right="cross-database-edge"), + ) as mock_get_cross_database_lineage, patch( + "metadata.ingestion.source.database.trino.lineage.fqn.search_database_schema_from_es", + return_value=[source_schema], + ) as mock_search_database_schema, patch( + "metadata.ingestion.source.database.trino.lineage.fqn.search_table_from_es", + return_value=[source_table], + ) as mock_search_table: + result = list(lineage_source.yield_cross_database_lineage()) + + assert len(result) == 1 + assert result[0].right == "cross-database-edge" + mock_get_cross_database_lineage.assert_called_once_with(source_table, trino_table) + mock_search_database_schema.assert_called_once_with( + metadata=metadata, + database_name="source_db", + schema_name="source_schema", + service_name="repro_postgres", + fetch_multiple_entities=True, + fields="fullyQualifiedName,name", + ) + mock_search_table.assert_called_once_with( + metadata=metadata, + database_name="source_db", + schema_name="SOURCE_SCHEMA", + service_name="repro_postgres", + table_name="customer", + fetch_multiple_entities=True, + fields="fullyQualifiedName,name,columns,databaseSchema", + ) + + +def test_get_cross_database_schema_fqn_parses_quoted_schema_from_fqn(): + """Issue #27419: parse quoted schema names with dots from table FQNs.""" + metadata = MagicMock() + + trino_table = MagicMock() + trino_table.databaseSchema = None + trino_table.fullyQualifiedName.root = ( + 'repro_trino.postgres."source.schema".customer' + ) + + lineage_source = TrinoLineageSourceTestDouble(metadata) + + with patch( + "metadata.ingestion.source.database.trino.lineage.fqn.search_database_schema_from_es", + return_value=None, + ): + result = lineage_source._get_cross_database_schema_fqn( + "repro_postgres.source_db", + trino_table, + {}, + ) + + assert result == 'repro_postgres.source_db."source.schema"'