From 162bcb1a7a7c58bb9ffc3c5f55bfa50fac654c51 Mon Sep 17 00:00:00 2001 From: hassaansaleem28 Date: Fri, 17 Apr 2026 22:53:10 +0500 Subject: [PATCH 01/16] fix(ingestion): make Trino cross db lineage case insensitive Signed-off-by: hassaansaleem28 --- .../source/database/trino/lineage.py | 46 +++++++--- .../source/database/trino/test_lineage.py | 85 +++++++++++++++++++ 2 files changed, 120 insertions(+), 11 deletions(-) create mode 100644 ingestion/tests/unit/source/database/trino/test_lineage.py diff --git a/ingestion/src/metadata/ingestion/source/database/trino/lineage.py b/ingestion/src/metadata/ingestion/source/database/trino/lineage.py index 6c502f7f9452..0aa728ec2805 100644 --- a/ingestion/src/metadata/ingestion/source/database/trino/lineage.py +++ b/ingestion/src/metadata/ingestion/source/database/trino/lineage.py @@ -12,7 +12,7 @@ Trino lineage module """ import traceback -from typing import Iterable, Iterator, List +from typing import Iterable, Iterator, List, Optional from sqlalchemy import text @@ -112,9 +112,25 @@ def check_same_table(self, table1: Table, table2: Table) -> bool: """ Method to check whether the table1 and table2 are same """ - return table1.name.root == table2.name.root and { - column.name.root for column in table1.columns - } == {column.name.root for column in table2.columns} + if table1.name.root.lower() != table2.name.root.lower(): + return False + + if not table1.columns or not table2.columns: + return True + + return { + column.name.root.lower() for column in table1.columns + } == {column.name.root.lower() for column in table2.columns} + + def _get_case_insensitive_cross_database_table( + self, cross_database_fqn: str, trino_table: Table + ) -> Optional[Table]: + for cross_database_table in self.metadata.list_all_entities( + entity=Table, params={"database": cross_database_fqn} + ): + if self.check_same_table(trino_table, cross_database_table): + return cross_database_table + return None def get_cross_database_lineage( self, from_table: Table, to_table: Table @@ -155,15 +171,23 @@ def yield_cross_database_lineage(self) -> Iterable[Either[AddLineageRequest]]: cross_database_table_fqn = trino_table_fqn.replace( trino_database_fqn, cross_database_fqn ) - # Cache cross-database table against its FQN to avoid repeated API calls + if cross_database_table_fqn not in cross_database_table_fqn_mapping: + cross_database_table = self.metadata.get_by_name( + Table, fqn=cross_database_table_fqn + ) + if not cross_database_table: + cross_database_table = ( + self._get_case_insensitive_cross_database_table( + cross_database_fqn, trino_table + ) + ) + cross_database_table_fqn_mapping[ + cross_database_table_fqn + ] = cross_database_table + cross_database_table = cross_database_table_fqn_mapping[ cross_database_table_fqn - ] = cross_database_table_fqn_mapping.get( - cross_database_table_fqn, - self.metadata.get_by_name( - Table, fqn=cross_database_table_fqn - ), - ) + ] # Create cross database lineage request if both tables are same if cross_database_table and self.check_same_table( trino_table, cross_database_table diff --git a/ingestion/tests/unit/source/database/trino/test_lineage.py b/ingestion/tests/unit/source/database/trino/test_lineage.py new file mode 100644 index 000000000000..6d5a0093ce5d --- /dev/null +++ b/ingestion/tests/unit/source/database/trino/test_lineage.py @@ -0,0 +1,85 @@ +# Copyright 2025 Collate +# Licensed under the Collate Community License, Version 1.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Regression tests for Trino cross-database lineage.""" + +from unittest.mock import MagicMock, patch + +from metadata.generated.schema.entity.data.database import Database +from metadata.generated.schema.entity.data.table import Table +from metadata.ingestion.api.models import Either +from metadata.ingestion.source.database.trino.lineage import TrinoLineageSource + + +class TrinoLineageSourceTestDouble(TrinoLineageSource): + """Minimal Trino lineage source for unit testing.""" + + def __init__(self, metadata): + self.metadata = metadata + self.config = MagicMock() + self.config.serviceName = "repro_trino" + self.source_config = MagicMock() + self.source_config.crossDatabaseServiceNames = ["repro_postgres"] + + +def _mock_column(column_name): + column = MagicMock() + column.name.root = column_name + return column + + +def test_yield_cross_database_lineage_finds_uppercase_source_table(): + """Trino cross-db lineage should resolve the uppercase Postgres source table.""" + metadata = MagicMock() + + trino_database = MagicMock() + trino_database.fullyQualifiedName.root = "repro_trino.postgres" + + source_database = MagicMock() + source_database.fullyQualifiedName.root = "repro_postgres.source_db" + + trino_table = MagicMock() + trino_table.id.root = "11111111-1111-1111-1111-111111111111" + trino_table.fullyQualifiedName.root = "repro_trino.postgres.source_schema.customer" + trino_table.name.root = "customer" + trino_table.columns = [] + + source_table = MagicMock() + source_table.id.root = "22222222-2222-2222-2222-222222222222" + source_table.fullyQualifiedName.root = "repro_postgres.source_db.source_schema.CUSTOMER" + source_table.name.root = "CUSTOMER" + source_table.columns = [_mock_column("id"), _mock_column("name")] + + def list_all_entities_side_effect(entity, params=None, **_kwargs): + if entity is Database and params == {"service": "repro_trino"}: + return [trino_database] + if entity is Database and params == {"service": "repro_postgres"}: + return [source_database] + if entity is Table and params == {"database": "repro_trino.postgres"}: + return [trino_table] + if entity is Table and params == {"database": "repro_postgres.source_db"}: + return [source_table] + return [] + + metadata.list_all_entities.side_effect = list_all_entities_side_effect + metadata.get_by_name.return_value = None + + lineage_source = TrinoLineageSourceTestDouble(metadata) + + with patch.object( + TrinoLineageSource, + "get_cross_database_lineage", + return_value=Either(right="cross-database-edge"), + ) as mock_get_cross_database_lineage: + result = list(lineage_source.yield_cross_database_lineage()) + + assert len(result) == 1 + assert result[0].right == "cross-database-edge" + mock_get_cross_database_lineage.assert_called_once_with(source_table, trino_table) \ No newline at end of file From 674b6d4fdfc21caece1dc25a8717d9ea17c012d8 Mon Sep 17 00:00:00 2001 From: hassaansaleem28 Date: Sat, 18 Apr 2026 06:14:22 +0500 Subject: [PATCH 02/16] =?UTF-8?q?=1B[200~fix(trino-lineage):=20scope=20cas?= =?UTF-8?q?e-insensitive=20cross-db=20matching=20to=20schema?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: hassaansaleem28 --- .../source/database/trino/lineage.py | 48 ++++++++++++++++--- .../source/database/trino/test_lineage.py | 48 ++++++++++++++++++- 2 files changed, 87 insertions(+), 9 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/database/trino/lineage.py b/ingestion/src/metadata/ingestion/source/database/trino/lineage.py index 0aa728ec2805..8af21d12da31 100644 --- a/ingestion/src/metadata/ingestion/source/database/trino/lineage.py +++ b/ingestion/src/metadata/ingestion/source/database/trino/lineage.py @@ -12,7 +12,7 @@ Trino lineage module """ import traceback -from typing import Iterable, Iterator, List, Optional +from typing import Dict, Iterable, Iterator, List, Optional from sqlalchemy import text @@ -122,14 +122,40 @@ def check_same_table(self, table1: Table, table2: Table) -> bool: column.name.root.lower() for column in table1.columns } == {column.name.root.lower() for column in table2.columns} - def _get_case_insensitive_cross_database_table( + def _get_cross_database_schema_fqn( self, cross_database_fqn: str, trino_table: Table + ) -> Optional[str]: + if trino_table.databaseSchema and trino_table.databaseSchema.name: + return f"{cross_database_fqn}.{trino_table.databaseSchema.name.root}" + + if trino_table.fullyQualifiedName and trino_table.fullyQualifiedName.root: + trino_table_fqn_parts = trino_table.fullyQualifiedName.root.split(".") + if len(trino_table_fqn_parts) >= 4: + return f"{cross_database_fqn}.{trino_table_fqn_parts[-2]}" + + return None + + def _get_case_insensitive_cross_database_table( + self, + cross_database_schema_fqn: str, + trino_table: Table, + cross_database_table_schema_mapping: Dict[str, Dict[str, List[Table]]], ) -> Optional[Table]: - for cross_database_table in self.metadata.list_all_entities( - entity=Table, params={"database": cross_database_fqn} - ): + if cross_database_schema_fqn not in cross_database_table_schema_mapping: + cross_database_table_schema_mapping[cross_database_schema_fqn] = {} + for cross_database_table in self.metadata.list_all_entities( + entity=Table, params={"databaseSchema": cross_database_schema_fqn} + ): + cross_database_table_schema_mapping[cross_database_schema_fqn].setdefault( + cross_database_table.name.root.lower(), [] + ).append(cross_database_table) + + for cross_database_table in cross_database_table_schema_mapping[ + cross_database_schema_fqn + ].get(trino_table.name.root.lower(), []): if self.check_same_table(trino_table, cross_database_table): return cross_database_table + return None def get_cross_database_lineage( @@ -151,6 +177,9 @@ def yield_cross_database_lineage(self) -> Iterable[Either[AddLineageRequest]]: try: all_cross_database_fqns = self.get_cross_database_fqn_from_service_names() cross_database_table_fqn_mapping = {} + cross_database_table_schema_mapping: Dict[ + str, Dict[str, List[Table]] + ] = {} # Get all databases for the specified Trino service trino_databases = self.metadata.list_all_entities( @@ -171,14 +200,19 @@ def yield_cross_database_lineage(self) -> Iterable[Either[AddLineageRequest]]: cross_database_table_fqn = trino_table_fqn.replace( trino_database_fqn, cross_database_fqn ) + cross_database_schema_fqn = self._get_cross_database_schema_fqn( + cross_database_fqn, trino_table + ) if cross_database_table_fqn not in cross_database_table_fqn_mapping: cross_database_table = self.metadata.get_by_name( Table, fqn=cross_database_table_fqn ) - if not cross_database_table: + if not cross_database_table and cross_database_schema_fqn: cross_database_table = ( self._get_case_insensitive_cross_database_table( - cross_database_fqn, trino_table + cross_database_schema_fqn, + trino_table, + cross_database_table_schema_mapping, ) ) cross_database_table_fqn_mapping[ diff --git a/ingestion/tests/unit/source/database/trino/test_lineage.py b/ingestion/tests/unit/source/database/trino/test_lineage.py index 6d5a0093ce5d..56bfe5e52fd6 100644 --- a/ingestion/tests/unit/source/database/trino/test_lineage.py +++ b/ingestion/tests/unit/source/database/trino/test_lineage.py @@ -35,6 +35,22 @@ def _mock_column(column_name): return column +def test_check_same_table_is_case_insensitive_for_names_and_columns(): + """Table and column comparisons should ignore case.""" + metadata = MagicMock() + lineage_source = TrinoLineageSourceTestDouble(metadata) + + source_table = MagicMock() + source_table.name.root = "CUSTOMER" + source_table.columns = [_mock_column("ID"), _mock_column("NAME")] + + target_table = MagicMock() + target_table.name.root = "customer" + target_table.columns = [_mock_column("id"), _mock_column("name")] + + assert lineage_source.check_same_table(source_table, target_table) + + def test_yield_cross_database_lineage_finds_uppercase_source_table(): """Trino cross-db lineage should resolve the uppercase Postgres source table.""" metadata = MagicMock() @@ -49,12 +65,30 @@ def test_yield_cross_database_lineage_finds_uppercase_source_table(): trino_table.id.root = "11111111-1111-1111-1111-111111111111" trino_table.fullyQualifiedName.root = "repro_trino.postgres.source_schema.customer" trino_table.name.root = "customer" + trino_table.databaseSchema.name.root = "source_schema" + trino_table.databaseSchema.fullyQualifiedName.root = ( + "repro_trino.postgres.source_schema" + ) trino_table.columns = [] + wrong_table = MagicMock() + wrong_table.id.root = "33333333-3333-3333-3333-333333333333" + wrong_table.fullyQualifiedName.root = "repro_postgres.source_db.other_schema.customer" + wrong_table.name.root = "customer" + wrong_table.databaseSchema.name.root = "other_schema" + wrong_table.databaseSchema.fullyQualifiedName.root = ( + "repro_postgres.source_db.other_schema" + ) + wrong_table.columns = [_mock_column("legacy")] + source_table = MagicMock() source_table.id.root = "22222222-2222-2222-2222-222222222222" source_table.fullyQualifiedName.root = "repro_postgres.source_db.source_schema.CUSTOMER" source_table.name.root = "CUSTOMER" + source_table.databaseSchema.name.root = "source_schema" + source_table.databaseSchema.fullyQualifiedName.root = ( + "repro_postgres.source_db.source_schema" + ) source_table.columns = [_mock_column("id"), _mock_column("name")] def list_all_entities_side_effect(entity, params=None, **_kwargs): @@ -64,8 +98,12 @@ def list_all_entities_side_effect(entity, params=None, **_kwargs): return [source_database] if entity is Table and params == {"database": "repro_trino.postgres"}: return [trino_table] - if entity is Table and params == {"database": "repro_postgres.source_db"}: + if entity is Table and params == { + "databaseSchema": "repro_postgres.source_db.source_schema" + }: return [source_table] + if entity is Table and params == {"database": "repro_postgres.source_db"}: + return [wrong_table] return [] metadata.list_all_entities.side_effect = list_all_entities_side_effect @@ -82,4 +120,10 @@ def list_all_entities_side_effect(entity, params=None, **_kwargs): assert len(result) == 1 assert result[0].right == "cross-database-edge" - mock_get_cross_database_lineage.assert_called_once_with(source_table, trino_table) \ No newline at end of file + mock_get_cross_database_lineage.assert_called_once_with(source_table, trino_table) + assert any( + call.kwargs.get("params") == { + "databaseSchema": "repro_postgres.source_db.source_schema" + } + for call in metadata.list_all_entities.call_args_list + ) \ No newline at end of file From 81240886a1ffff2706bd74c7ffd8525737f8dcd8 Mon Sep 17 00:00:00 2001 From: hassaansaleem28 Date: Sat, 18 Apr 2026 06:32:41 +0500 Subject: [PATCH 03/16] fix(trino-lineage): resolve cross-db schema FQN case-insensitively Signed-off-by: hassaansaleem28 --- .../source/database/trino/lineage.py | 39 ++++++++++++++++--- .../source/database/trino/test_lineage.py | 22 ++++++++--- 2 files changed, 50 insertions(+), 11 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/database/trino/lineage.py b/ingestion/src/metadata/ingestion/source/database/trino/lineage.py index 8af21d12da31..9f8db7b92fc4 100644 --- a/ingestion/src/metadata/ingestion/source/database/trino/lineage.py +++ b/ingestion/src/metadata/ingestion/source/database/trino/lineage.py @@ -18,6 +18,7 @@ from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.entity.data.database import Database +from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema from metadata.generated.schema.entity.data.table import Table from metadata.generated.schema.entity.services.ingestionPipelines.status import ( StackTraceError, @@ -123,17 +124,40 @@ def check_same_table(self, table1: Table, table2: Table) -> bool: } == {column.name.root.lower() for column in table2.columns} def _get_cross_database_schema_fqn( - self, cross_database_fqn: str, trino_table: Table + self, + cross_database_fqn: str, + trino_table: Table, + cross_database_schema_mapping: Dict[str, Dict[str, str]], ) -> Optional[str]: + trino_schema_name = None if trino_table.databaseSchema and trino_table.databaseSchema.name: - return f"{cross_database_fqn}.{trino_table.databaseSchema.name.root}" + trino_schema_name = trino_table.databaseSchema.name.root - if trino_table.fullyQualifiedName and trino_table.fullyQualifiedName.root: + if not trino_schema_name and trino_table.fullyQualifiedName and trino_table.fullyQualifiedName.root: trino_table_fqn_parts = trino_table.fullyQualifiedName.root.split(".") if len(trino_table_fqn_parts) >= 4: - return f"{cross_database_fqn}.{trino_table_fqn_parts[-2]}" + trino_schema_name = trino_table_fqn_parts[-2] - return None + if not trino_schema_name: + return None + + if cross_database_fqn not in cross_database_schema_mapping: + cross_database_schema_mapping[cross_database_fqn] = {} + for cross_database_schema in self.metadata.list_all_entities( + entity=DatabaseSchema, params={"database": cross_database_fqn} + ): + if cross_database_schema.name and cross_database_schema.fullyQualifiedName: + cross_database_schema_mapping[cross_database_fqn][ + cross_database_schema.name.root.lower() + ] = cross_database_schema.fullyQualifiedName.root + + cross_database_schema_fqn = cross_database_schema_mapping[cross_database_fqn].get( + trino_schema_name.lower() + ) + if cross_database_schema_fqn: + return cross_database_schema_fqn + + return f"{cross_database_fqn}.{trino_schema_name}" def _get_case_insensitive_cross_database_table( self, @@ -177,6 +201,7 @@ def yield_cross_database_lineage(self) -> Iterable[Either[AddLineageRequest]]: try: all_cross_database_fqns = self.get_cross_database_fqn_from_service_names() cross_database_table_fqn_mapping = {} + cross_database_schema_fqn_mapping: Dict[str, Dict[str, str]] = {} cross_database_table_schema_mapping: Dict[ str, Dict[str, List[Table]] ] = {} @@ -201,7 +226,9 @@ def yield_cross_database_lineage(self) -> Iterable[Either[AddLineageRequest]]: trino_database_fqn, cross_database_fqn ) cross_database_schema_fqn = self._get_cross_database_schema_fqn( - cross_database_fqn, trino_table + cross_database_fqn, + trino_table, + cross_database_schema_fqn_mapping, ) if cross_database_table_fqn not in cross_database_table_fqn_mapping: cross_database_table = self.metadata.get_by_name( diff --git a/ingestion/tests/unit/source/database/trino/test_lineage.py b/ingestion/tests/unit/source/database/trino/test_lineage.py index 56bfe5e52fd6..987fdc0d6b70 100644 --- a/ingestion/tests/unit/source/database/trino/test_lineage.py +++ b/ingestion/tests/unit/source/database/trino/test_lineage.py @@ -13,6 +13,7 @@ from unittest.mock import MagicMock, patch from metadata.generated.schema.entity.data.database import Database +from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema from metadata.generated.schema.entity.data.table import Table from metadata.ingestion.api.models import Either from metadata.ingestion.source.database.trino.lineage import TrinoLineageSource @@ -61,6 +62,10 @@ def test_yield_cross_database_lineage_finds_uppercase_source_table(): source_database = MagicMock() source_database.fullyQualifiedName.root = "repro_postgres.source_db" + source_schema = MagicMock() + source_schema.name.root = "SOURCE_SCHEMA" + source_schema.fullyQualifiedName.root = "repro_postgres.source_db.SOURCE_SCHEMA" + trino_table = MagicMock() trino_table.id.root = "11111111-1111-1111-1111-111111111111" trino_table.fullyQualifiedName.root = "repro_trino.postgres.source_schema.customer" @@ -83,11 +88,11 @@ def test_yield_cross_database_lineage_finds_uppercase_source_table(): source_table = MagicMock() source_table.id.root = "22222222-2222-2222-2222-222222222222" - source_table.fullyQualifiedName.root = "repro_postgres.source_db.source_schema.CUSTOMER" + source_table.fullyQualifiedName.root = "repro_postgres.source_db.SOURCE_SCHEMA.CUSTOMER" source_table.name.root = "CUSTOMER" - source_table.databaseSchema.name.root = "source_schema" + source_table.databaseSchema.name.root = "SOURCE_SCHEMA" source_table.databaseSchema.fullyQualifiedName.root = ( - "repro_postgres.source_db.source_schema" + "repro_postgres.source_db.SOURCE_SCHEMA" ) source_table.columns = [_mock_column("id"), _mock_column("name")] @@ -96,10 +101,12 @@ def list_all_entities_side_effect(entity, params=None, **_kwargs): return [trino_database] if entity is Database and params == {"service": "repro_postgres"}: return [source_database] + if entity is DatabaseSchema and params == {"database": "repro_postgres.source_db"}: + return [source_schema] if entity is Table and params == {"database": "repro_trino.postgres"}: return [trino_table] if entity is Table and params == { - "databaseSchema": "repro_postgres.source_db.source_schema" + "databaseSchema": "repro_postgres.source_db.SOURCE_SCHEMA" }: return [source_table] if entity is Table and params == {"database": "repro_postgres.source_db"}: @@ -123,7 +130,12 @@ def list_all_entities_side_effect(entity, params=None, **_kwargs): mock_get_cross_database_lineage.assert_called_once_with(source_table, trino_table) assert any( call.kwargs.get("params") == { - "databaseSchema": "repro_postgres.source_db.source_schema" + "databaseSchema": "repro_postgres.source_db.SOURCE_SCHEMA" } for call in metadata.list_all_entities.call_args_list + ) + assert any( + call.kwargs.get("params") == {"database": "repro_postgres.source_db"} + and call.kwargs.get("entity") is DatabaseSchema + for call in metadata.list_all_entities.call_args_list ) \ No newline at end of file From 1d5e140824e5a61df9086ea2423e320c41ad1826 Mon Sep 17 00:00:00 2001 From: Muhammad Hassaan Saleem Date: Sat, 18 Apr 2026 06:33:29 +0500 Subject: [PATCH 04/16] Update ingestion/src/metadata/ingestion/source/database/trino/lineage.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../src/metadata/ingestion/source/database/trino/lineage.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/ingestion/src/metadata/ingestion/source/database/trino/lineage.py b/ingestion/src/metadata/ingestion/source/database/trino/lineage.py index 9f8db7b92fc4..2d4f40b37c2b 100644 --- a/ingestion/src/metadata/ingestion/source/database/trino/lineage.py +++ b/ingestion/src/metadata/ingestion/source/database/trino/lineage.py @@ -116,9 +116,11 @@ def check_same_table(self, table1: Table, table2: Table) -> bool: if table1.name.root.lower() != table2.name.root.lower(): return False - if not table1.columns or not table2.columns: + if not table1.columns and not table2.columns: return True + if not table1.columns or not table2.columns: + return False return { column.name.root.lower() for column in table1.columns } == {column.name.root.lower() for column in table2.columns} From ef9ea9c9352b402b14c969f9c7c441f595bcb91b Mon Sep 17 00:00:00 2001 From: Muhammad Hassaan Saleem Date: Sat, 18 Apr 2026 06:38:28 +0500 Subject: [PATCH 05/16] Update ingestion/tests/unit/source/database/trino/test_lineage.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- ingestion/tests/unit/source/database/trino/test_lineage.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ingestion/tests/unit/source/database/trino/test_lineage.py b/ingestion/tests/unit/source/database/trino/test_lineage.py index 987fdc0d6b70..3800e6f14ec0 100644 --- a/ingestion/tests/unit/source/database/trino/test_lineage.py +++ b/ingestion/tests/unit/source/database/trino/test_lineage.py @@ -74,7 +74,7 @@ def test_yield_cross_database_lineage_finds_uppercase_source_table(): trino_table.databaseSchema.fullyQualifiedName.root = ( "repro_trino.postgres.source_schema" ) - trino_table.columns = [] + trino_table.columns = [_mock_column("id"), _mock_column("name")] wrong_table = MagicMock() wrong_table.id.root = "33333333-3333-3333-3333-333333333333" From edd702f30d962231a5dcc9c892e06034af27fbf9 Mon Sep 17 00:00:00 2001 From: hassaansaleem28 Date: Sat, 18 Apr 2026 07:15:29 +0500 Subject: [PATCH 06/16] refactor & formatiing issue resolved Signed-off-by: hassaansaleem28 --- .../source/database/trino/lineage.py | 132 +++++++++++------- .../source/database/trino/test_lineage.py | 19 ++- 2 files changed, 92 insertions(+), 59 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/database/trino/lineage.py b/ingestion/src/metadata/ingestion/source/database/trino/lineage.py index 2d4f40b37c2b..10702da8e52f 100644 --- a/ingestion/src/metadata/ingestion/source/database/trino/lineage.py +++ b/ingestion/src/metadata/ingestion/source/database/trino/lineage.py @@ -11,6 +11,7 @@ """ Trino lineage module """ + import traceback from typing import Dict, Iterable, Iterator, List, Optional @@ -121,9 +122,9 @@ def check_same_table(self, table1: Table, table2: Table) -> bool: if not table1.columns or not table2.columns: return False - return { - column.name.root.lower() for column in table1.columns - } == {column.name.root.lower() for column in table2.columns} + return {column.name.root.lower() for column in table1.columns} == { + column.name.root.lower() for column in table2.columns + } def _get_cross_database_schema_fqn( self, @@ -135,7 +136,11 @@ def _get_cross_database_schema_fqn( if trino_table.databaseSchema and trino_table.databaseSchema.name: trino_schema_name = trino_table.databaseSchema.name.root - if not trino_schema_name and trino_table.fullyQualifiedName and trino_table.fullyQualifiedName.root: + if ( + not trino_schema_name + and trino_table.fullyQualifiedName + and trino_table.fullyQualifiedName.root + ): trino_table_fqn_parts = trino_table.fullyQualifiedName.root.split(".") if len(trino_table_fqn_parts) >= 4: trino_schema_name = trino_table_fqn_parts[-2] @@ -148,14 +153,17 @@ def _get_cross_database_schema_fqn( for cross_database_schema in self.metadata.list_all_entities( entity=DatabaseSchema, params={"database": cross_database_fqn} ): - if cross_database_schema.name and cross_database_schema.fullyQualifiedName: + if ( + cross_database_schema.name + and cross_database_schema.fullyQualifiedName + ): cross_database_schema_mapping[cross_database_fqn][ cross_database_schema.name.root.lower() ] = cross_database_schema.fullyQualifiedName.root - cross_database_schema_fqn = cross_database_schema_mapping[cross_database_fqn].get( - trino_schema_name.lower() - ) + cross_database_schema_fqn = cross_database_schema_mapping[ + cross_database_fqn + ].get(trino_schema_name.lower()) if cross_database_schema_fqn: return cross_database_schema_fqn @@ -172,9 +180,11 @@ def _get_case_insensitive_cross_database_table( for cross_database_table in self.metadata.list_all_entities( entity=Table, params={"databaseSchema": cross_database_schema_fqn} ): - cross_database_table_schema_mapping[cross_database_schema_fqn].setdefault( - cross_database_table.name.root.lower(), [] - ).append(cross_database_table) + cross_database_table_schema_mapping[ + cross_database_schema_fqn + ].setdefault(cross_database_table.name.root.lower(), []).append( + cross_database_table + ) for cross_database_table in cross_database_table_schema_mapping[ cross_database_schema_fqn @@ -199,14 +209,60 @@ def get_cross_database_lineage( from_entity=from_table, to_entity=to_table, column_lineage=column_lineage ) + def _get_cross_database_lineage_for_table( + self, + trino_database_fqn: str, + trino_table: Table, + *, + all_cross_database_fqns: List[str], + cross_database_table_fqn_mapping: Dict[str, Optional[Table]], + cross_database_schema_fqn_mapping: Dict[str, Dict[str, str]], + cross_database_table_schema_mapping: Dict[str, Dict[str, List[Table]]], + ) -> Optional[Either[AddLineageRequest]]: + trino_table_fqn = trino_table.fullyQualifiedName.root + for cross_database_fqn in all_cross_database_fqns: + cross_database_table_fqn = trino_table_fqn.replace( + trino_database_fqn, cross_database_fqn + ) + cross_database_schema_fqn = self._get_cross_database_schema_fqn( + cross_database_fqn, + trino_table, + cross_database_schema_fqn_mapping, + ) + if cross_database_table_fqn not in cross_database_table_fqn_mapping: + cross_database_table = self.metadata.get_by_name( + Table, fqn=cross_database_table_fqn + ) + if not cross_database_table and cross_database_schema_fqn: + cross_database_table = ( + self._get_case_insensitive_cross_database_table( + cross_database_schema_fqn, + trino_table, + cross_database_table_schema_mapping, + ) + ) + cross_database_table_fqn_mapping[cross_database_table_fqn] = ( + cross_database_table + ) + + cross_database_table = cross_database_table_fqn_mapping[ + cross_database_table_fqn + ] + if cross_database_table and self.check_same_table( + trino_table, cross_database_table + ): + return self.get_cross_database_lineage( + cross_database_table, trino_table + ) + + return None + def yield_cross_database_lineage(self) -> Iterable[Either[AddLineageRequest]]: try: all_cross_database_fqns = self.get_cross_database_fqn_from_service_names() cross_database_table_fqn_mapping = {} cross_database_schema_fqn_mapping: Dict[str, Dict[str, str]] = {} - cross_database_table_schema_mapping: Dict[ - str, Dict[str, List[Table]] - ] = {} + cross_database_table_schema_mapping: Dict[str, Dict[str, List[Table]]] = {} # Get all databases for the specified Trino service trino_databases = self.metadata.list_all_entities( @@ -221,44 +277,16 @@ def yield_cross_database_lineage(self) -> Iterable[Either[AddLineageRequest]]: ) # NOTE: Currently, tables in system-defined schemas will also be checked for lineage. for trino_table in trino_tables: - trino_table_fqn = trino_table.fullyQualifiedName.root - for cross_database_fqn in all_cross_database_fqns: - # Construct the FQN for cross-database tables - cross_database_table_fqn = trino_table_fqn.replace( - trino_database_fqn, cross_database_fqn - ) - cross_database_schema_fqn = self._get_cross_database_schema_fqn( - cross_database_fqn, - trino_table, - cross_database_schema_fqn_mapping, - ) - if cross_database_table_fqn not in cross_database_table_fqn_mapping: - cross_database_table = self.metadata.get_by_name( - Table, fqn=cross_database_table_fqn - ) - if not cross_database_table and cross_database_schema_fqn: - cross_database_table = ( - self._get_case_insensitive_cross_database_table( - cross_database_schema_fqn, - trino_table, - cross_database_table_schema_mapping, - ) - ) - cross_database_table_fqn_mapping[ - cross_database_table_fqn - ] = cross_database_table - - cross_database_table = cross_database_table_fqn_mapping[ - cross_database_table_fqn - ] - # Create cross database lineage request if both tables are same - if cross_database_table and self.check_same_table( - trino_table, cross_database_table - ): - yield self.get_cross_database_lineage( - cross_database_table, trino_table - ) - break + cross_database_lineage = self._get_cross_database_lineage_for_table( + trino_database_fqn=trino_database_fqn, + trino_table=trino_table, + all_cross_database_fqns=all_cross_database_fqns, + cross_database_table_fqn_mapping=cross_database_table_fqn_mapping, + cross_database_schema_fqn_mapping=cross_database_schema_fqn_mapping, + cross_database_table_schema_mapping=cross_database_table_schema_mapping, + ) + if cross_database_lineage: + yield cross_database_lineage except Exception as exc: yield Either( left=StackTraceError( diff --git a/ingestion/tests/unit/source/database/trino/test_lineage.py b/ingestion/tests/unit/source/database/trino/test_lineage.py index 3800e6f14ec0..15d5269834ad 100644 --- a/ingestion/tests/unit/source/database/trino/test_lineage.py +++ b/ingestion/tests/unit/source/database/trino/test_lineage.py @@ -78,7 +78,9 @@ def test_yield_cross_database_lineage_finds_uppercase_source_table(): wrong_table = MagicMock() wrong_table.id.root = "33333333-3333-3333-3333-333333333333" - wrong_table.fullyQualifiedName.root = "repro_postgres.source_db.other_schema.customer" + wrong_table.fullyQualifiedName.root = ( + "repro_postgres.source_db.other_schema.customer" + ) wrong_table.name.root = "customer" wrong_table.databaseSchema.name.root = "other_schema" wrong_table.databaseSchema.fullyQualifiedName.root = ( @@ -88,7 +90,9 @@ def test_yield_cross_database_lineage_finds_uppercase_source_table(): source_table = MagicMock() source_table.id.root = "22222222-2222-2222-2222-222222222222" - source_table.fullyQualifiedName.root = "repro_postgres.source_db.SOURCE_SCHEMA.CUSTOMER" + source_table.fullyQualifiedName.root = ( + "repro_postgres.source_db.SOURCE_SCHEMA.CUSTOMER" + ) source_table.name.root = "CUSTOMER" source_table.databaseSchema.name.root = "SOURCE_SCHEMA" source_table.databaseSchema.fullyQualifiedName.root = ( @@ -101,7 +105,9 @@ def list_all_entities_side_effect(entity, params=None, **_kwargs): return [trino_database] if entity is Database and params == {"service": "repro_postgres"}: return [source_database] - if entity is DatabaseSchema and params == {"database": "repro_postgres.source_db"}: + if entity is DatabaseSchema and params == { + "database": "repro_postgres.source_db" + }: return [source_schema] if entity is Table and params == {"database": "repro_trino.postgres"}: return [trino_table] @@ -129,13 +135,12 @@ def list_all_entities_side_effect(entity, params=None, **_kwargs): assert result[0].right == "cross-database-edge" mock_get_cross_database_lineage.assert_called_once_with(source_table, trino_table) assert any( - call.kwargs.get("params") == { - "databaseSchema": "repro_postgres.source_db.SOURCE_SCHEMA" - } + call.kwargs.get("params") + == {"databaseSchema": "repro_postgres.source_db.SOURCE_SCHEMA"} for call in metadata.list_all_entities.call_args_list ) assert any( call.kwargs.get("params") == {"database": "repro_postgres.source_db"} and call.kwargs.get("entity") is DatabaseSchema for call in metadata.list_all_entities.call_args_list - ) \ No newline at end of file + ) From 35fe5e0eb66dbe4289fafb529c1eecbb9a7ef2b8 Mon Sep 17 00:00:00 2001 From: hassaansaleem28 Date: Sun, 19 Apr 2026 06:02:44 +0500 Subject: [PATCH 07/16] fix bot suggestions Signed-off-by: hassaansaleem28 --- .../source/database/trino/lineage.py | 14 +++++++----- .../source/database/trino/test_lineage.py | 22 +++++++++++++++++++ 2 files changed, 31 insertions(+), 5 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/database/trino/lineage.py b/ingestion/src/metadata/ingestion/source/database/trino/lineage.py index 10702da8e52f..a5c6b71f5762 100644 --- a/ingestion/src/metadata/ingestion/source/database/trino/lineage.py +++ b/ingestion/src/metadata/ingestion/source/database/trino/lineage.py @@ -32,6 +32,7 @@ TRINO_QUERY_BATCH_SIZE, TrinoQueryParserSource, ) +from metadata.utils import fqn from metadata.utils.logger import ingestion_logger logger = ingestion_logger() @@ -141,7 +142,7 @@ def _get_cross_database_schema_fqn( and trino_table.fullyQualifiedName and trino_table.fullyQualifiedName.root ): - trino_table_fqn_parts = trino_table.fullyQualifiedName.root.split(".") + trino_table_fqn_parts = fqn.split(trino_table.fullyQualifiedName.root) if len(trino_table_fqn_parts) >= 4: trino_schema_name = trino_table_fqn_parts[-2] @@ -167,7 +168,7 @@ def _get_cross_database_schema_fqn( if cross_database_schema_fqn: return cross_database_schema_fqn - return f"{cross_database_fqn}.{trino_schema_name}" + return f"{cross_database_fqn}.{fqn.quote_name(trino_schema_name)}" def _get_case_insensitive_cross_database_table( self, @@ -220,10 +221,13 @@ def _get_cross_database_lineage_for_table( cross_database_table_schema_mapping: Dict[str, Dict[str, List[Table]]], ) -> Optional[Either[AddLineageRequest]]: trino_table_fqn = trino_table.fullyQualifiedName.root + trino_database_prefix = f"{trino_database_fqn}." + if not trino_table_fqn.startswith(trino_database_prefix): + return None + + trino_table_suffix = trino_table_fqn[len(trino_database_fqn) :] for cross_database_fqn in all_cross_database_fqns: - cross_database_table_fqn = trino_table_fqn.replace( - trino_database_fqn, cross_database_fqn - ) + cross_database_table_fqn = f"{cross_database_fqn}{trino_table_suffix}" cross_database_schema_fqn = self._get_cross_database_schema_fqn( cross_database_fqn, trino_table, diff --git a/ingestion/tests/unit/source/database/trino/test_lineage.py b/ingestion/tests/unit/source/database/trino/test_lineage.py index 15d5269834ad..9994aecc71fc 100644 --- a/ingestion/tests/unit/source/database/trino/test_lineage.py +++ b/ingestion/tests/unit/source/database/trino/test_lineage.py @@ -144,3 +144,25 @@ def list_all_entities_side_effect(entity, params=None, **_kwargs): and call.kwargs.get("entity") is DatabaseSchema for call in metadata.list_all_entities.call_args_list ) + + +def test_get_cross_database_schema_fqn_parses_quoted_schema_from_fqn(): + """Quoted schema names with dots should be parsed from table FQN safely.""" + metadata = MagicMock() + metadata.list_all_entities.return_value = [] + + trino_table = MagicMock() + trino_table.databaseSchema = None + trino_table.fullyQualifiedName.root = ( + 'repro_trino.postgres."source.schema".customer' + ) + + lineage_source = TrinoLineageSourceTestDouble(metadata) + + result = lineage_source._get_cross_database_schema_fqn( + "repro_postgres.source_db", + trino_table, + {}, + ) + + assert result == 'repro_postgres.source_db."source.schema"' From 9695ea0a7031c8583d195235503d4fa8ac6c8ad5 Mon Sep 17 00:00:00 2001 From: Muhammad Hassaan Saleem Date: Sun, 19 Apr 2026 06:12:23 +0500 Subject: [PATCH 08/16] Update ingestion/src/metadata/ingestion/source/database/trino/lineage.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../source/database/trino/lineage.py | 25 ++++++++++--------- 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/database/trino/lineage.py b/ingestion/src/metadata/ingestion/source/database/trino/lineage.py index a5c6b71f5762..a456e3007586 100644 --- a/ingestion/src/metadata/ingestion/source/database/trino/lineage.py +++ b/ingestion/src/metadata/ingestion/source/database/trino/lineage.py @@ -228,23 +228,24 @@ def _get_cross_database_lineage_for_table( trino_table_suffix = trino_table_fqn[len(trino_database_fqn) :] for cross_database_fqn in all_cross_database_fqns: cross_database_table_fqn = f"{cross_database_fqn}{trino_table_suffix}" - cross_database_schema_fqn = self._get_cross_database_schema_fqn( - cross_database_fqn, - trino_table, - cross_database_schema_fqn_mapping, - ) if cross_database_table_fqn not in cross_database_table_fqn_mapping: cross_database_table = self.metadata.get_by_name( Table, fqn=cross_database_table_fqn ) - if not cross_database_table and cross_database_schema_fqn: - cross_database_table = ( - self._get_case_insensitive_cross_database_table( - cross_database_schema_fqn, - trino_table, - cross_database_table_schema_mapping, - ) + if not cross_database_table: + cross_database_schema_fqn = self._get_cross_database_schema_fqn( + cross_database_fqn, + trino_table, + cross_database_schema_fqn_mapping, ) + if cross_database_schema_fqn: + cross_database_table = ( + self._get_case_insensitive_cross_database_table( + cross_database_schema_fqn, + trino_table, + cross_database_table_schema_mapping, + ) + ) cross_database_table_fqn_mapping[cross_database_table_fqn] = ( cross_database_table ) From 490f04684ff5b4e805ba81e52bfc35375201d100 Mon Sep 17 00:00:00 2001 From: hassaansaleem28 Date: Sun, 19 Apr 2026 06:14:59 +0500 Subject: [PATCH 09/16] fix bot suggestions Signed-off-by: hassaansaleem28 --- .../tests/unit/source/database/trino/test_lineage.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/ingestion/tests/unit/source/database/trino/test_lineage.py b/ingestion/tests/unit/source/database/trino/test_lineage.py index 9994aecc71fc..d31f529c505b 100644 --- a/ingestion/tests/unit/source/database/trino/test_lineage.py +++ b/ingestion/tests/unit/source/database/trino/test_lineage.py @@ -8,7 +8,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -"""Regression tests for Trino cross-database lineage.""" +"""Regression tests for Trino cross-database lineage (Issue #27419).""" from unittest.mock import MagicMock, patch @@ -37,7 +37,7 @@ def _mock_column(column_name): def test_check_same_table_is_case_insensitive_for_names_and_columns(): - """Table and column comparisons should ignore case.""" + """Issue #27419: table and column comparisons should ignore case.""" metadata = MagicMock() lineage_source = TrinoLineageSourceTestDouble(metadata) @@ -53,7 +53,7 @@ def test_check_same_table_is_case_insensitive_for_names_and_columns(): def test_yield_cross_database_lineage_finds_uppercase_source_table(): - """Trino cross-db lineage should resolve the uppercase Postgres source table.""" + """Issue #27419: resolve uppercase Postgres source table in cross-db lineage.""" metadata = MagicMock() trino_database = MagicMock() @@ -147,7 +147,7 @@ def list_all_entities_side_effect(entity, params=None, **_kwargs): def test_get_cross_database_schema_fqn_parses_quoted_schema_from_fqn(): - """Quoted schema names with dots should be parsed from table FQN safely.""" + """Issue #27419: parse quoted schema names with dots from table FQNs.""" metadata = MagicMock() metadata.list_all_entities.return_value = [] From 8bcc6624840abce4e1a470edf8109571aac12b4e Mon Sep 17 00:00:00 2001 From: hassaansaleem28 Date: Tue, 21 Apr 2026 15:10:20 +0500 Subject: [PATCH 10/16] fix formatting Signed-off-by: hassaansaleem28 --- .../src/metadata/ingestion/source/database/trino/lineage.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/database/trino/lineage.py b/ingestion/src/metadata/ingestion/source/database/trino/lineage.py index a456e3007586..64bc336da707 100644 --- a/ingestion/src/metadata/ingestion/source/database/trino/lineage.py +++ b/ingestion/src/metadata/ingestion/source/database/trino/lineage.py @@ -246,9 +246,9 @@ def _get_cross_database_lineage_for_table( cross_database_table_schema_mapping, ) ) - cross_database_table_fqn_mapping[cross_database_table_fqn] = ( - cross_database_table - ) + cross_database_table_fqn_mapping[ + cross_database_table_fqn + ] = cross_database_table cross_database_table = cross_database_table_fqn_mapping[ cross_database_table_fqn From 54984576b8b59d6a87c85425b337cf006f10560f Mon Sep 17 00:00:00 2001 From: hassaansaleem28 Date: Thu, 23 Apr 2026 12:22:08 +0500 Subject: [PATCH 11/16] avoid list_all scan Signed-off-by: hassaansaleem28 --- .../source/database/trino/lineage.py | 81 ++++++++++++++----- .../source/database/trino/test_lineage.py | 69 +++++++--------- 2 files changed, 92 insertions(+), 58 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/database/trino/lineage.py b/ingestion/src/metadata/ingestion/source/database/trino/lineage.py index 64bc336da707..3184f10bb224 100644 --- a/ingestion/src/metadata/ingestion/source/database/trino/lineage.py +++ b/ingestion/src/metadata/ingestion/source/database/trino/lineage.py @@ -19,7 +19,6 @@ from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.entity.data.database import Database -from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema from metadata.generated.schema.entity.data.table import Table from metadata.generated.schema.entity.services.ingestionPipelines.status import ( StackTraceError, @@ -151,16 +150,6 @@ def _get_cross_database_schema_fqn( if cross_database_fqn not in cross_database_schema_mapping: cross_database_schema_mapping[cross_database_fqn] = {} - for cross_database_schema in self.metadata.list_all_entities( - entity=DatabaseSchema, params={"database": cross_database_fqn} - ): - if ( - cross_database_schema.name - and cross_database_schema.fullyQualifiedName - ): - cross_database_schema_mapping[cross_database_fqn][ - cross_database_schema.name.root.lower() - ] = cross_database_schema.fullyQualifiedName.root cross_database_schema_fqn = cross_database_schema_mapping[ cross_database_fqn @@ -168,7 +157,33 @@ def _get_cross_database_schema_fqn( if cross_database_schema_fqn: return cross_database_schema_fqn - return f"{cross_database_fqn}.{fqn.quote_name(trino_schema_name)}" + cross_database_fqn_parts = fqn.split(cross_database_fqn) + if len(cross_database_fqn_parts) == 2: + cross_database_service_name, cross_database_name = cross_database_fqn_parts + cross_database_schemas = fqn.search_database_schema_from_es( + metadata=self.metadata, + database_name=cross_database_name, + schema_name=trino_schema_name, + service_name=cross_database_service_name, + fetch_multiple_entities=True, + fields="fullyQualifiedName,name", + ) + if cross_database_schemas: + for cross_database_schema in cross_database_schemas: + if ( + cross_database_schema.name + and cross_database_schema.fullyQualifiedName + ): + cross_database_schema_mapping[cross_database_fqn][ + cross_database_schema.name.root.lower() + ] = cross_database_schema.fullyQualifiedName.root + + return ( + cross_database_schema_mapping[cross_database_fqn].get( + trino_schema_name.lower() + ) + or f"{cross_database_fqn}.{fqn.quote_name(trino_schema_name)}" + ) def _get_case_insensitive_cross_database_table( self, @@ -178,18 +193,44 @@ def _get_case_insensitive_cross_database_table( ) -> Optional[Table]: if cross_database_schema_fqn not in cross_database_table_schema_mapping: cross_database_table_schema_mapping[cross_database_schema_fqn] = {} - for cross_database_table in self.metadata.list_all_entities( - entity=Table, params={"databaseSchema": cross_database_schema_fqn} - ): - cross_database_table_schema_mapping[ - cross_database_schema_fqn - ].setdefault(cross_database_table.name.root.lower(), []).append( - cross_database_table + + table_key = trino_table.name.root.lower() + if ( + table_key + not in cross_database_table_schema_mapping[cross_database_schema_fqn] + ): + cross_database_table_schema_mapping[cross_database_schema_fqn][ + table_key + ] = [] + cross_database_schema_fqn_parts = fqn.split(cross_database_schema_fqn) + if len(cross_database_schema_fqn_parts) == 3: + ( + cross_database_service_name, + cross_database_name, + cross_database_schema_name, + ) = cross_database_schema_fqn_parts + cross_database_tables = fqn.search_table_from_es( + metadata=self.metadata, + database_name=cross_database_name, + schema_name=cross_database_schema_name, + service_name=cross_database_service_name, + table_name=trino_table.name.root, + fetch_multiple_entities=True, + fields="fullyQualifiedName,name,columns,databaseSchema", ) + if cross_database_tables: + if isinstance(cross_database_tables, list): + cross_database_table_schema_mapping[cross_database_schema_fqn][ + table_key + ] = cross_database_tables + else: + cross_database_table_schema_mapping[cross_database_schema_fqn][ + table_key + ] = [cross_database_tables] for cross_database_table in cross_database_table_schema_mapping[ cross_database_schema_fqn - ].get(trino_table.name.root.lower(), []): + ].get(table_key, []): if self.check_same_table(trino_table, cross_database_table): return cross_database_table diff --git a/ingestion/tests/unit/source/database/trino/test_lineage.py b/ingestion/tests/unit/source/database/trino/test_lineage.py index d31f529c505b..98a7aad9a2e2 100644 --- a/ingestion/tests/unit/source/database/trino/test_lineage.py +++ b/ingestion/tests/unit/source/database/trino/test_lineage.py @@ -13,7 +13,6 @@ from unittest.mock import MagicMock, patch from metadata.generated.schema.entity.data.database import Database -from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema from metadata.generated.schema.entity.data.table import Table from metadata.ingestion.api.models import Either from metadata.ingestion.source.database.trino.lineage import TrinoLineageSource @@ -76,18 +75,6 @@ def test_yield_cross_database_lineage_finds_uppercase_source_table(): ) trino_table.columns = [_mock_column("id"), _mock_column("name")] - wrong_table = MagicMock() - wrong_table.id.root = "33333333-3333-3333-3333-333333333333" - wrong_table.fullyQualifiedName.root = ( - "repro_postgres.source_db.other_schema.customer" - ) - wrong_table.name.root = "customer" - wrong_table.databaseSchema.name.root = "other_schema" - wrong_table.databaseSchema.fullyQualifiedName.root = ( - "repro_postgres.source_db.other_schema" - ) - wrong_table.columns = [_mock_column("legacy")] - source_table = MagicMock() source_table.id.root = "22222222-2222-2222-2222-222222222222" source_table.fullyQualifiedName.root = ( @@ -105,18 +92,8 @@ def list_all_entities_side_effect(entity, params=None, **_kwargs): return [trino_database] if entity is Database and params == {"service": "repro_postgres"}: return [source_database] - if entity is DatabaseSchema and params == { - "database": "repro_postgres.source_db" - }: - return [source_schema] if entity is Table and params == {"database": "repro_trino.postgres"}: return [trino_table] - if entity is Table and params == { - "databaseSchema": "repro_postgres.source_db.SOURCE_SCHEMA" - }: - return [source_table] - if entity is Table and params == {"database": "repro_postgres.source_db"}: - return [wrong_table] return [] metadata.list_all_entities.side_effect = list_all_entities_side_effect @@ -128,28 +105,40 @@ def list_all_entities_side_effect(entity, params=None, **_kwargs): TrinoLineageSource, "get_cross_database_lineage", return_value=Either(right="cross-database-edge"), - ) as mock_get_cross_database_lineage: + ) as mock_get_cross_database_lineage, patch( + "metadata.ingestion.source.database.trino.lineage.fqn.search_database_schema_from_es", + return_value=[source_schema], + ) as mock_search_database_schema, patch( + "metadata.ingestion.source.database.trino.lineage.fqn.search_table_from_es", + return_value=[source_table], + ) as mock_search_table: result = list(lineage_source.yield_cross_database_lineage()) assert len(result) == 1 assert result[0].right == "cross-database-edge" mock_get_cross_database_lineage.assert_called_once_with(source_table, trino_table) - assert any( - call.kwargs.get("params") - == {"databaseSchema": "repro_postgres.source_db.SOURCE_SCHEMA"} - for call in metadata.list_all_entities.call_args_list + mock_search_database_schema.assert_called_once_with( + metadata=metadata, + database_name="source_db", + schema_name="source_schema", + service_name="repro_postgres", + fetch_multiple_entities=True, + fields="fullyQualifiedName,name", ) - assert any( - call.kwargs.get("params") == {"database": "repro_postgres.source_db"} - and call.kwargs.get("entity") is DatabaseSchema - for call in metadata.list_all_entities.call_args_list + mock_search_table.assert_called_once_with( + metadata=metadata, + database_name="source_db", + schema_name="SOURCE_SCHEMA", + service_name="repro_postgres", + table_name="customer", + fetch_multiple_entities=True, + fields="fullyQualifiedName,name,columns,databaseSchema", ) def test_get_cross_database_schema_fqn_parses_quoted_schema_from_fqn(): """Issue #27419: parse quoted schema names with dots from table FQNs.""" metadata = MagicMock() - metadata.list_all_entities.return_value = [] trino_table = MagicMock() trino_table.databaseSchema = None @@ -159,10 +148,14 @@ def test_get_cross_database_schema_fqn_parses_quoted_schema_from_fqn(): lineage_source = TrinoLineageSourceTestDouble(metadata) - result = lineage_source._get_cross_database_schema_fqn( - "repro_postgres.source_db", - trino_table, - {}, - ) + with patch( + "metadata.ingestion.source.database.trino.lineage.fqn.search_database_schema_from_es", + return_value=None, + ): + result = lineage_source._get_cross_database_schema_fqn( + "repro_postgres.source_db", + trino_table, + {}, + ) assert result == 'repro_postgres.source_db."source.schema"' From ce92229d8fc681696f09ab6f28078a41dafe3f16 Mon Sep 17 00:00:00 2001 From: Muhammad Hassaan Saleem Date: Thu, 23 Apr 2026 12:32:25 +0500 Subject: [PATCH 12/16] Update ingestion/src/metadata/ingestion/source/database/trino/lineage.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../src/metadata/ingestion/source/database/trino/lineage.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ingestion/src/metadata/ingestion/source/database/trino/lineage.py b/ingestion/src/metadata/ingestion/source/database/trino/lineage.py index 3184f10bb224..f2b3aa3d90e4 100644 --- a/ingestion/src/metadata/ingestion/source/database/trino/lineage.py +++ b/ingestion/src/metadata/ingestion/source/database/trino/lineage.py @@ -266,7 +266,7 @@ def _get_cross_database_lineage_for_table( if not trino_table_fqn.startswith(trino_database_prefix): return None - trino_table_suffix = trino_table_fqn[len(trino_database_fqn) :] + trino_table_suffix = trino_table_fqn[len(trino_database_fqn):] for cross_database_fqn in all_cross_database_fqns: cross_database_table_fqn = f"{cross_database_fqn}{trino_table_suffix}" if cross_database_table_fqn not in cross_database_table_fqn_mapping: From 7701067731254b817d8add926d8b3e9e648ef1e9 Mon Sep 17 00:00:00 2001 From: hassaansaleem28 Date: Thu, 23 Apr 2026 15:22:46 +0500 Subject: [PATCH 13/16] refactor and fix format Signed-off-by: hassaansaleem28 --- .../source/database/trino/lineage.py | 96 ++++++++++++------- .../source/database/trino/test_lineage.py | 32 ++++--- 2 files changed, 80 insertions(+), 48 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/database/trino/lineage.py b/ingestion/src/metadata/ingestion/source/database/trino/lineage.py index f2b3aa3d90e4..f2dd1cd23e74 100644 --- a/ingestion/src/metadata/ingestion/source/database/trino/lineage.py +++ b/ingestion/src/metadata/ingestion/source/database/trino/lineage.py @@ -105,16 +105,23 @@ def get_cross_database_fqn_from_service_names(self) -> List[str]: return [ database.fullyQualifiedName.root for service in database_service_names - for database in self.metadata.list_all_entities( - entity=Database, params={"service": service} + for database in fqn.search_database_from_es( + metadata=self.metadata, + service_name=service, + database_name="*", + fetch_multiple_entities=True, ) + or [] ] def check_same_table(self, table1: Table, table2: Table) -> bool: """ Method to check whether the table1 and table2 are same """ - if table1.name.root.lower() != table2.name.root.lower(): + if ( + fqn.unquote_name(table1.name.root).lower() + != fqn.unquote_name(table2.name.root).lower() + ): return False if not table1.columns and not table2.columns: @@ -122,9 +129,9 @@ def check_same_table(self, table1: Table, table2: Table) -> bool: if not table1.columns or not table2.columns: return False - return {column.name.root.lower() for column in table1.columns} == { - column.name.root.lower() for column in table2.columns - } + return { + fqn.unquote_name(column.name.root).lower() for column in table1.columns + } == {fqn.unquote_name(column.name.root).lower() for column in table2.columns} def _get_cross_database_schema_fqn( self, @@ -148,14 +155,17 @@ def _get_cross_database_schema_fqn( if not trino_schema_name: return None + unquoted_trino_schema = fqn.unquote_name(trino_schema_name) + schema_key = unquoted_trino_schema.lower() + if cross_database_fqn not in cross_database_schema_mapping: cross_database_schema_mapping[cross_database_fqn] = {} - cross_database_schema_fqn = cross_database_schema_mapping[ - cross_database_fqn - ].get(trino_schema_name.lower()) - if cross_database_schema_fqn: - return cross_database_schema_fqn + if schema_key in cross_database_schema_mapping[cross_database_fqn]: + return cross_database_schema_mapping[cross_database_fqn][schema_key] + + fallback_fqn = f"{cross_database_fqn}.{fqn.quote_name(unquoted_trino_schema)}" + cross_database_schema_mapping[cross_database_fqn][schema_key] = fallback_fqn cross_database_fqn_parts = fqn.split(cross_database_fqn) if len(cross_database_fqn_parts) == 2: @@ -163,7 +173,7 @@ def _get_cross_database_schema_fqn( cross_database_schemas = fqn.search_database_schema_from_es( metadata=self.metadata, database_name=cross_database_name, - schema_name=trino_schema_name, + schema_name=unquoted_trino_schema, service_name=cross_database_service_name, fetch_multiple_entities=True, fields="fullyQualifiedName,name", @@ -174,16 +184,14 @@ def _get_cross_database_schema_fqn( cross_database_schema.name and cross_database_schema.fullyQualifiedName ): + schema_name_key = fqn.unquote_name( + cross_database_schema.name.root + ).lower() cross_database_schema_mapping[cross_database_fqn][ - cross_database_schema.name.root.lower() + schema_name_key ] = cross_database_schema.fullyQualifiedName.root - return ( - cross_database_schema_mapping[cross_database_fqn].get( - trino_schema_name.lower() - ) - or f"{cross_database_fqn}.{fqn.quote_name(trino_schema_name)}" - ) + return cross_database_schema_mapping[cross_database_fqn][schema_key] def _get_case_insensitive_cross_database_table( self, @@ -194,7 +202,8 @@ def _get_case_insensitive_cross_database_table( if cross_database_schema_fqn not in cross_database_table_schema_mapping: cross_database_table_schema_mapping[cross_database_schema_fqn] = {} - table_key = trino_table.name.root.lower() + unquoted_table_name = fqn.unquote_name(trino_table.name.root) + table_key = unquoted_table_name.lower() if ( table_key not in cross_database_table_schema_mapping[cross_database_schema_fqn] @@ -214,19 +223,25 @@ def _get_case_insensitive_cross_database_table( database_name=cross_database_name, schema_name=cross_database_schema_name, service_name=cross_database_service_name, - table_name=trino_table.name.root, + table_name=unquoted_table_name, fetch_multiple_entities=True, fields="fullyQualifiedName,name,columns,databaseSchema", ) if cross_database_tables: - if isinstance(cross_database_tables, list): - cross_database_table_schema_mapping[cross_database_schema_fqn][ - table_key - ] = cross_database_tables - else: - cross_database_table_schema_mapping[cross_database_schema_fqn][ - table_key - ] = [cross_database_tables] + if not isinstance(cross_database_tables, list): + cross_database_tables = [cross_database_tables] + for cross_database_table in cross_database_tables: + if cross_database_table.name and cross_database_table.name.root: + cross_database_table_schema_mapping[ + cross_database_schema_fqn + ].setdefault( + fqn.unquote_name( + cross_database_table.name.root + ).lower(), + [], + ).append( + cross_database_table + ) for cross_database_table in cross_database_table_schema_mapping[ cross_database_schema_fqn @@ -311,15 +326,30 @@ def yield_cross_database_lineage(self) -> Iterable[Either[AddLineageRequest]]: cross_database_table_schema_mapping: Dict[str, Dict[str, List[Table]]] = {} # Get all databases for the specified Trino service - trino_databases = self.metadata.list_all_entities( - entity=Database, params={"service": self.config.serviceName} + trino_databases = ( + fqn.search_database_from_es( + metadata=self.metadata, + service_name=self.config.serviceName, + database_name="*", + fetch_multiple_entities=True, + ) + or [] ) for trino_database in trino_databases: trino_database_fqn = trino_database.fullyQualifiedName.root # Get all tables for the specified Trino database schema - trino_tables = self.metadata.list_all_entities( - entity=Table, params={"database": trino_database_fqn} + trino_tables = ( + fqn.search_table_from_es( + metadata=self.metadata, + service_name=self.config.serviceName, + database_name=trino_database.name.root, + schema_name="*", + table_name="*", + fetch_multiple_entities=True, + fields="fullyQualifiedName,name,columns,databaseSchema", + ) + or [] ) # NOTE: Currently, tables in system-defined schemas will also be checked for lineage. for trino_table in trino_tables: diff --git a/ingestion/tests/unit/source/database/trino/test_lineage.py b/ingestion/tests/unit/source/database/trino/test_lineage.py index 98a7aad9a2e2..7fcfecd956b6 100644 --- a/ingestion/tests/unit/source/database/trino/test_lineage.py +++ b/ingestion/tests/unit/source/database/trino/test_lineage.py @@ -87,16 +87,23 @@ def test_yield_cross_database_lineage_finds_uppercase_source_table(): ) source_table.columns = [_mock_column("id"), _mock_column("name")] - def list_all_entities_side_effect(entity, params=None, **_kwargs): - if entity is Database and params == {"service": "repro_trino"}: + def search_database_from_es_side_effect(**kwargs): + service_name = kwargs.get("service_name") + if service_name == "repro_trino": return [trino_database] - if entity is Database and params == {"service": "repro_postgres"}: + if service_name == "repro_postgres": return [source_database] - if entity is Table and params == {"database": "repro_trino.postgres"}: + return [] + + def search_table_from_es_side_effect(**kwargs): + service_name = kwargs.get("service_name") + table_name = kwargs.get("table_name") + if service_name == "repro_trino" and table_name == "*": return [trino_table] + if service_name == "repro_postgres" and table_name == "customer": + return [source_table] return [] - metadata.list_all_entities.side_effect = list_all_entities_side_effect metadata.get_by_name.return_value = None lineage_source = TrinoLineageSourceTestDouble(metadata) @@ -109,8 +116,11 @@ def list_all_entities_side_effect(entity, params=None, **_kwargs): "metadata.ingestion.source.database.trino.lineage.fqn.search_database_schema_from_es", return_value=[source_schema], ) as mock_search_database_schema, patch( + "metadata.ingestion.source.database.trino.lineage.fqn.search_database_from_es", + side_effect=search_database_from_es_side_effect, + ) as mock_search_database, patch( "metadata.ingestion.source.database.trino.lineage.fqn.search_table_from_es", - return_value=[source_table], + side_effect=search_table_from_es_side_effect, ) as mock_search_table: result = list(lineage_source.yield_cross_database_lineage()) @@ -125,15 +135,7 @@ def list_all_entities_side_effect(entity, params=None, **_kwargs): fetch_multiple_entities=True, fields="fullyQualifiedName,name", ) - mock_search_table.assert_called_once_with( - metadata=metadata, - database_name="source_db", - schema_name="SOURCE_SCHEMA", - service_name="repro_postgres", - table_name="customer", - fetch_multiple_entities=True, - fields="fullyQualifiedName,name,columns,databaseSchema", - ) + # mock_search_table is called multiple times, so we don't assert_called_once_with def test_get_cross_database_schema_fqn_parses_quoted_schema_from_fqn(): From 490f8841fc5b3257dfaedbefa23bdb85ff38528a Mon Sep 17 00:00:00 2001 From: hassaansaleem28 Date: Thu, 23 Apr 2026 18:22:55 +0500 Subject: [PATCH 14/16] resolve bot suggestions Signed-off-by: hassaansaleem28 --- .../source/database/trino/lineage.py | 98 +++++++------------ .../source/database/trino/test_lineage.py | 32 +++--- 2 files changed, 49 insertions(+), 81 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/database/trino/lineage.py b/ingestion/src/metadata/ingestion/source/database/trino/lineage.py index f2dd1cd23e74..3184f10bb224 100644 --- a/ingestion/src/metadata/ingestion/source/database/trino/lineage.py +++ b/ingestion/src/metadata/ingestion/source/database/trino/lineage.py @@ -105,23 +105,16 @@ def get_cross_database_fqn_from_service_names(self) -> List[str]: return [ database.fullyQualifiedName.root for service in database_service_names - for database in fqn.search_database_from_es( - metadata=self.metadata, - service_name=service, - database_name="*", - fetch_multiple_entities=True, + for database in self.metadata.list_all_entities( + entity=Database, params={"service": service} ) - or [] ] def check_same_table(self, table1: Table, table2: Table) -> bool: """ Method to check whether the table1 and table2 are same """ - if ( - fqn.unquote_name(table1.name.root).lower() - != fqn.unquote_name(table2.name.root).lower() - ): + if table1.name.root.lower() != table2.name.root.lower(): return False if not table1.columns and not table2.columns: @@ -129,9 +122,9 @@ def check_same_table(self, table1: Table, table2: Table) -> bool: if not table1.columns or not table2.columns: return False - return { - fqn.unquote_name(column.name.root).lower() for column in table1.columns - } == {fqn.unquote_name(column.name.root).lower() for column in table2.columns} + return {column.name.root.lower() for column in table1.columns} == { + column.name.root.lower() for column in table2.columns + } def _get_cross_database_schema_fqn( self, @@ -155,17 +148,14 @@ def _get_cross_database_schema_fqn( if not trino_schema_name: return None - unquoted_trino_schema = fqn.unquote_name(trino_schema_name) - schema_key = unquoted_trino_schema.lower() - if cross_database_fqn not in cross_database_schema_mapping: cross_database_schema_mapping[cross_database_fqn] = {} - if schema_key in cross_database_schema_mapping[cross_database_fqn]: - return cross_database_schema_mapping[cross_database_fqn][schema_key] - - fallback_fqn = f"{cross_database_fqn}.{fqn.quote_name(unquoted_trino_schema)}" - cross_database_schema_mapping[cross_database_fqn][schema_key] = fallback_fqn + cross_database_schema_fqn = cross_database_schema_mapping[ + cross_database_fqn + ].get(trino_schema_name.lower()) + if cross_database_schema_fqn: + return cross_database_schema_fqn cross_database_fqn_parts = fqn.split(cross_database_fqn) if len(cross_database_fqn_parts) == 2: @@ -173,7 +163,7 @@ def _get_cross_database_schema_fqn( cross_database_schemas = fqn.search_database_schema_from_es( metadata=self.metadata, database_name=cross_database_name, - schema_name=unquoted_trino_schema, + schema_name=trino_schema_name, service_name=cross_database_service_name, fetch_multiple_entities=True, fields="fullyQualifiedName,name", @@ -184,14 +174,16 @@ def _get_cross_database_schema_fqn( cross_database_schema.name and cross_database_schema.fullyQualifiedName ): - schema_name_key = fqn.unquote_name( - cross_database_schema.name.root - ).lower() cross_database_schema_mapping[cross_database_fqn][ - schema_name_key + cross_database_schema.name.root.lower() ] = cross_database_schema.fullyQualifiedName.root - return cross_database_schema_mapping[cross_database_fqn][schema_key] + return ( + cross_database_schema_mapping[cross_database_fqn].get( + trino_schema_name.lower() + ) + or f"{cross_database_fqn}.{fqn.quote_name(trino_schema_name)}" + ) def _get_case_insensitive_cross_database_table( self, @@ -202,8 +194,7 @@ def _get_case_insensitive_cross_database_table( if cross_database_schema_fqn not in cross_database_table_schema_mapping: cross_database_table_schema_mapping[cross_database_schema_fqn] = {} - unquoted_table_name = fqn.unquote_name(trino_table.name.root) - table_key = unquoted_table_name.lower() + table_key = trino_table.name.root.lower() if ( table_key not in cross_database_table_schema_mapping[cross_database_schema_fqn] @@ -223,25 +214,19 @@ def _get_case_insensitive_cross_database_table( database_name=cross_database_name, schema_name=cross_database_schema_name, service_name=cross_database_service_name, - table_name=unquoted_table_name, + table_name=trino_table.name.root, fetch_multiple_entities=True, fields="fullyQualifiedName,name,columns,databaseSchema", ) if cross_database_tables: - if not isinstance(cross_database_tables, list): - cross_database_tables = [cross_database_tables] - for cross_database_table in cross_database_tables: - if cross_database_table.name and cross_database_table.name.root: - cross_database_table_schema_mapping[ - cross_database_schema_fqn - ].setdefault( - fqn.unquote_name( - cross_database_table.name.root - ).lower(), - [], - ).append( - cross_database_table - ) + if isinstance(cross_database_tables, list): + cross_database_table_schema_mapping[cross_database_schema_fqn][ + table_key + ] = cross_database_tables + else: + cross_database_table_schema_mapping[cross_database_schema_fqn][ + table_key + ] = [cross_database_tables] for cross_database_table in cross_database_table_schema_mapping[ cross_database_schema_fqn @@ -281,7 +266,7 @@ def _get_cross_database_lineage_for_table( if not trino_table_fqn.startswith(trino_database_prefix): return None - trino_table_suffix = trino_table_fqn[len(trino_database_fqn):] + trino_table_suffix = trino_table_fqn[len(trino_database_fqn) :] for cross_database_fqn in all_cross_database_fqns: cross_database_table_fqn = f"{cross_database_fqn}{trino_table_suffix}" if cross_database_table_fqn not in cross_database_table_fqn_mapping: @@ -326,30 +311,15 @@ def yield_cross_database_lineage(self) -> Iterable[Either[AddLineageRequest]]: cross_database_table_schema_mapping: Dict[str, Dict[str, List[Table]]] = {} # Get all databases for the specified Trino service - trino_databases = ( - fqn.search_database_from_es( - metadata=self.metadata, - service_name=self.config.serviceName, - database_name="*", - fetch_multiple_entities=True, - ) - or [] + trino_databases = self.metadata.list_all_entities( + entity=Database, params={"service": self.config.serviceName} ) for trino_database in trino_databases: trino_database_fqn = trino_database.fullyQualifiedName.root # Get all tables for the specified Trino database schema - trino_tables = ( - fqn.search_table_from_es( - metadata=self.metadata, - service_name=self.config.serviceName, - database_name=trino_database.name.root, - schema_name="*", - table_name="*", - fetch_multiple_entities=True, - fields="fullyQualifiedName,name,columns,databaseSchema", - ) - or [] + trino_tables = self.metadata.list_all_entities( + entity=Table, params={"database": trino_database_fqn} ) # NOTE: Currently, tables in system-defined schemas will also be checked for lineage. for trino_table in trino_tables: diff --git a/ingestion/tests/unit/source/database/trino/test_lineage.py b/ingestion/tests/unit/source/database/trino/test_lineage.py index 7fcfecd956b6..98a7aad9a2e2 100644 --- a/ingestion/tests/unit/source/database/trino/test_lineage.py +++ b/ingestion/tests/unit/source/database/trino/test_lineage.py @@ -87,23 +87,16 @@ def test_yield_cross_database_lineage_finds_uppercase_source_table(): ) source_table.columns = [_mock_column("id"), _mock_column("name")] - def search_database_from_es_side_effect(**kwargs): - service_name = kwargs.get("service_name") - if service_name == "repro_trino": + def list_all_entities_side_effect(entity, params=None, **_kwargs): + if entity is Database and params == {"service": "repro_trino"}: return [trino_database] - if service_name == "repro_postgres": + if entity is Database and params == {"service": "repro_postgres"}: return [source_database] - return [] - - def search_table_from_es_side_effect(**kwargs): - service_name = kwargs.get("service_name") - table_name = kwargs.get("table_name") - if service_name == "repro_trino" and table_name == "*": + if entity is Table and params == {"database": "repro_trino.postgres"}: return [trino_table] - if service_name == "repro_postgres" and table_name == "customer": - return [source_table] return [] + metadata.list_all_entities.side_effect = list_all_entities_side_effect metadata.get_by_name.return_value = None lineage_source = TrinoLineageSourceTestDouble(metadata) @@ -116,11 +109,8 @@ def search_table_from_es_side_effect(**kwargs): "metadata.ingestion.source.database.trino.lineage.fqn.search_database_schema_from_es", return_value=[source_schema], ) as mock_search_database_schema, patch( - "metadata.ingestion.source.database.trino.lineage.fqn.search_database_from_es", - side_effect=search_database_from_es_side_effect, - ) as mock_search_database, patch( "metadata.ingestion.source.database.trino.lineage.fqn.search_table_from_es", - side_effect=search_table_from_es_side_effect, + return_value=[source_table], ) as mock_search_table: result = list(lineage_source.yield_cross_database_lineage()) @@ -135,7 +125,15 @@ def search_table_from_es_side_effect(**kwargs): fetch_multiple_entities=True, fields="fullyQualifiedName,name", ) - # mock_search_table is called multiple times, so we don't assert_called_once_with + mock_search_table.assert_called_once_with( + metadata=metadata, + database_name="source_db", + schema_name="SOURCE_SCHEMA", + service_name="repro_postgres", + table_name="customer", + fetch_multiple_entities=True, + fields="fullyQualifiedName,name,columns,databaseSchema", + ) def test_get_cross_database_schema_fqn_parses_quoted_schema_from_fqn(): From 6dcf6685f879b310f36c9d1ae6b22138e27eae3f Mon Sep 17 00:00:00 2001 From: Muhammad Hassaan Saleem Date: Thu, 23 Apr 2026 18:31:08 +0500 Subject: [PATCH 15/16] Update ingestion/src/metadata/ingestion/source/database/trino/lineage.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../ingestion/source/database/trino/lineage.py | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/database/trino/lineage.py b/ingestion/src/metadata/ingestion/source/database/trino/lineage.py index 3184f10bb224..06e7baf8efc3 100644 --- a/ingestion/src/metadata/ingestion/source/database/trino/lineage.py +++ b/ingestion/src/metadata/ingestion/source/database/trino/lineage.py @@ -219,14 +219,9 @@ def _get_case_insensitive_cross_database_table( fields="fullyQualifiedName,name,columns,databaseSchema", ) if cross_database_tables: - if isinstance(cross_database_tables, list): - cross_database_table_schema_mapping[cross_database_schema_fqn][ - table_key - ] = cross_database_tables - else: - cross_database_table_schema_mapping[cross_database_schema_fqn][ - table_key - ] = [cross_database_tables] + cross_database_table_schema_mapping[cross_database_schema_fqn][ + table_key + ] = cross_database_tables for cross_database_table in cross_database_table_schema_mapping[ cross_database_schema_fqn From d737420c841ca9fee4a5d5939927b72c6024509c Mon Sep 17 00:00:00 2001 From: Muhammad Hassaan Saleem Date: Thu, 23 Apr 2026 18:42:14 +0500 Subject: [PATCH 16/16] Update ingestion/src/metadata/ingestion/source/database/trino/lineage.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../src/metadata/ingestion/source/database/trino/lineage.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ingestion/src/metadata/ingestion/source/database/trino/lineage.py b/ingestion/src/metadata/ingestion/source/database/trino/lineage.py index 06e7baf8efc3..3ee66b92cc8d 100644 --- a/ingestion/src/metadata/ingestion/source/database/trino/lineage.py +++ b/ingestion/src/metadata/ingestion/source/database/trino/lineage.py @@ -214,7 +214,7 @@ def _get_case_insensitive_cross_database_table( database_name=cross_database_name, schema_name=cross_database_schema_name, service_name=cross_database_service_name, - table_name=trino_table.name.root, + table_name=table_key, fetch_multiple_entities=True, fields="fullyQualifiedName,name,columns,databaseSchema", )