Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
162bcb1
fix(ingestion): make Trino cross db lineage case insensitive
hassaansaleem28 Apr 17, 2026
3a4b7cc
Merge branch 'main' into fix/27419-lineage-ingestion
hassaansaleem28 Apr 17, 2026
674b6d4
[200~fix(trino-lineage): scope case-insensitive cross-db matching to…
hassaansaleem28 Apr 18, 2026
3edcaf7
Merge branch 'main' into fix/27419-lineage-ingestion
hassaansaleem28 Apr 18, 2026
8124088
fix(trino-lineage): resolve cross-db schema FQN case-insensitively
hassaansaleem28 Apr 18, 2026
1d5e140
Update ingestion/src/metadata/ingestion/source/database/trino/lineage.py
hassaansaleem28 Apr 18, 2026
ef9ea9c
Update ingestion/tests/unit/source/database/trino/test_lineage.py
hassaansaleem28 Apr 18, 2026
edd702f
refactor & formatiing issue resolved
hassaansaleem28 Apr 18, 2026
a1da213
Merge branch 'main' into fix/27419-lineage-ingestion
hassaansaleem28 Apr 18, 2026
35fe5e0
fix bot suggestions
hassaansaleem28 Apr 19, 2026
f40276e
Merge branch 'main' into fix/27419-lineage-ingestion
hassaansaleem28 Apr 19, 2026
9695ea0
Update ingestion/src/metadata/ingestion/source/database/trino/lineage.py
hassaansaleem28 Apr 19, 2026
490f046
fix bot suggestions
hassaansaleem28 Apr 19, 2026
a8c9f3a
Merge branch 'main' into fix/27419-lineage-ingestion
hassaansaleem28 Apr 21, 2026
8bcc662
fix formatting
hassaansaleem28 Apr 21, 2026
ef17a63
Merge branch 'main' into fix/27419-lineage-ingestion
hassaansaleem28 Apr 21, 2026
5498457
avoid list_all scan
hassaansaleem28 Apr 23, 2026
09a8783
Merge branch 'main' into fix/27419-lineage-ingestion
hassaansaleem28 Apr 23, 2026
3bcfcd4
Merge branch 'main' into fix/27419-lineage-ingestion
hassaansaleem28 Apr 23, 2026
ce92229
Update ingestion/src/metadata/ingestion/source/database/trino/lineage.py
hassaansaleem28 Apr 23, 2026
7701067
refactor and fix format
hassaansaleem28 Apr 23, 2026
9d54b93
Merge branch 'main' into fix/27419-lineage-ingestion
hassaansaleem28 Apr 23, 2026
490f884
resolve bot suggestions
hassaansaleem28 Apr 23, 2026
55f984c
Merge branch 'main' into fix/27419-lineage-ingestion
hassaansaleem28 Apr 23, 2026
6dcf668
Update ingestion/src/metadata/ingestion/source/database/trino/lineage.py
hassaansaleem28 Apr 23, 2026
db9d8c2
Merge branch 'main' into fix/27419-lineage-ingestion
hassaansaleem28 Apr 23, 2026
d737420
Update ingestion/src/metadata/ingestion/source/database/trino/lineage.py
hassaansaleem28 Apr 23, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
210 changes: 183 additions & 27 deletions ingestion/src/metadata/ingestion/source/database/trino/lineage.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@
"""
Trino lineage module
"""

import traceback
from typing import Iterable, Iterator, List
from typing import Dict, Iterable, Iterator, List, Optional

from sqlalchemy import text

Expand All @@ -30,6 +31,7 @@
TRINO_QUERY_BATCH_SIZE,
TrinoQueryParserSource,
)
from metadata.utils import fqn
from metadata.utils.logger import ingestion_logger

logger = ingestion_logger()
Expand Down Expand Up @@ -112,9 +114,122 @@
"""
Method to check whether the table1 and table2 are same
"""
return table1.name.root == table2.name.root and {
column.name.root for column in table1.columns
} == {column.name.root for column in table2.columns}
if table1.name.root.lower() != table2.name.root.lower():
return False

if not table1.columns and not table2.columns:
return True

if not table1.columns or not table2.columns:
return False
return {column.name.root.lower() for column in table1.columns} == {
column.name.root.lower() for column in table2.columns
}

def _get_cross_database_schema_fqn(

Check failure on line 129 in ingestion/src/metadata/ingestion/source/database/trino/lineage.py

View check run for this annotation

SonarQubeCloud / [open-metadata-ingestion] SonarCloud Code Analysis

Refactor this function to reduce its Cognitive Complexity from 21 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=open-metadata-ingestion&issues=AZ25eUv73UNAwJQZ7Krx&open=AZ25eUv73UNAwJQZ7Krx&pullRequest=27495
self,
cross_database_fqn: str,
trino_table: Table,
cross_database_schema_mapping: Dict[str, Dict[str, str]],
) -> Optional[str]:
trino_schema_name = None
if trino_table.databaseSchema and trino_table.databaseSchema.name:
trino_schema_name = trino_table.databaseSchema.name.root

if (
not trino_schema_name
and trino_table.fullyQualifiedName
and trino_table.fullyQualifiedName.root
):
trino_table_fqn_parts = fqn.split(trino_table.fullyQualifiedName.root)
if len(trino_table_fqn_parts) >= 4:
trino_schema_name = trino_table_fqn_parts[-2]

Comment thread
hassaansaleem28 marked this conversation as resolved.
if not trino_schema_name:
return None

if cross_database_fqn not in cross_database_schema_mapping:
cross_database_schema_mapping[cross_database_fqn] = {}

cross_database_schema_fqn = cross_database_schema_mapping[
cross_database_fqn
].get(trino_schema_name.lower())
if cross_database_schema_fqn:
return cross_database_schema_fqn

cross_database_fqn_parts = fqn.split(cross_database_fqn)
if len(cross_database_fqn_parts) == 2:
cross_database_service_name, cross_database_name = cross_database_fqn_parts
cross_database_schemas = fqn.search_database_schema_from_es(
metadata=self.metadata,
database_name=cross_database_name,
Comment thread
gitar-bot[bot] marked this conversation as resolved.
schema_name=trino_schema_name,
service_name=cross_database_service_name,
fetch_multiple_entities=True,
fields="fullyQualifiedName,name",
)
if cross_database_schemas:
for cross_database_schema in cross_database_schemas:
if (
cross_database_schema.name
and cross_database_schema.fullyQualifiedName
):
cross_database_schema_mapping[cross_database_fqn][
cross_database_schema.name.root.lower()
] = cross_database_schema.fullyQualifiedName.root

return (
cross_database_schema_mapping[cross_database_fqn].get(
trino_schema_name.lower()
)
or f"{cross_database_fqn}.{fqn.quote_name(trino_schema_name)}"
)

def _get_case_insensitive_cross_database_table(
self,
cross_database_schema_fqn: str,
trino_table: Table,
cross_database_table_schema_mapping: Dict[str, Dict[str, List[Table]]],
) -> Optional[Table]:
if cross_database_schema_fqn not in cross_database_table_schema_mapping:
cross_database_table_schema_mapping[cross_database_schema_fqn] = {}

table_key = trino_table.name.root.lower()
if (
table_key
not in cross_database_table_schema_mapping[cross_database_schema_fqn]
):
cross_database_table_schema_mapping[cross_database_schema_fqn][
table_key
] = []
cross_database_schema_fqn_parts = fqn.split(cross_database_schema_fqn)
if len(cross_database_schema_fqn_parts) == 3:
(
cross_database_service_name,
cross_database_name,
cross_database_schema_name,
) = cross_database_schema_fqn_parts
cross_database_tables = fqn.search_table_from_es(
metadata=self.metadata,
database_name=cross_database_name,
schema_name=cross_database_schema_name,
service_name=cross_database_service_name,
table_name=table_key,
fetch_multiple_entities=True,
fields="fullyQualifiedName,name,columns,databaseSchema",
)
if cross_database_tables:
cross_database_table_schema_mapping[cross_database_schema_fqn][
table_key
] = cross_database_tables

for cross_database_table in cross_database_table_schema_mapping[
cross_database_schema_fqn
].get(table_key, []):
if self.check_same_table(trino_table, cross_database_table):
return cross_database_table

return None

def get_cross_database_lineage(
self, from_table: Table, to_table: Table
Expand All @@ -131,10 +246,64 @@
from_entity=from_table, to_entity=to_table, column_lineage=column_lineage
)

def _get_cross_database_lineage_for_table(
self,
trino_database_fqn: str,
trino_table: Table,
*,
all_cross_database_fqns: List[str],
cross_database_table_fqn_mapping: Dict[str, Optional[Table]],
cross_database_schema_fqn_mapping: Dict[str, Dict[str, str]],
cross_database_table_schema_mapping: Dict[str, Dict[str, List[Table]]],
) -> Optional[Either[AddLineageRequest]]:
trino_table_fqn = trino_table.fullyQualifiedName.root
trino_database_prefix = f"{trino_database_fqn}."
if not trino_table_fqn.startswith(trino_database_prefix):
return None

trino_table_suffix = trino_table_fqn[len(trino_database_fqn) :]
for cross_database_fqn in all_cross_database_fqns:
cross_database_table_fqn = f"{cross_database_fqn}{trino_table_suffix}"
if cross_database_table_fqn not in cross_database_table_fqn_mapping:
cross_database_table = self.metadata.get_by_name(
Table, fqn=cross_database_table_fqn
)
if not cross_database_table:
cross_database_schema_fqn = self._get_cross_database_schema_fqn(
cross_database_fqn,
trino_table,
cross_database_schema_fqn_mapping,
)
if cross_database_schema_fqn:
cross_database_table = (
self._get_case_insensitive_cross_database_table(
cross_database_schema_fqn,
trino_table,
cross_database_table_schema_mapping,
)
)
cross_database_table_fqn_mapping[
cross_database_table_fqn
] = cross_database_table

cross_database_table = cross_database_table_fqn_mapping[
cross_database_table_fqn
]
if cross_database_table and self.check_same_table(
trino_table, cross_database_table
):
return self.get_cross_database_lineage(
cross_database_table, trino_table
)

return None

def yield_cross_database_lineage(self) -> Iterable[Either[AddLineageRequest]]:
try:
all_cross_database_fqns = self.get_cross_database_fqn_from_service_names()
cross_database_table_fqn_mapping = {}
cross_database_schema_fqn_mapping: Dict[str, Dict[str, str]] = {}
cross_database_table_schema_mapping: Dict[str, Dict[str, List[Table]]] = {}

# Get all databases for the specified Trino service
trino_databases = self.metadata.list_all_entities(
Expand All @@ -149,29 +318,16 @@
)
# NOTE: Currently, tables in system-defined schemas will also be checked for lineage.
for trino_table in trino_tables:
trino_table_fqn = trino_table.fullyQualifiedName.root
for cross_database_fqn in all_cross_database_fqns:
# Construct the FQN for cross-database tables
cross_database_table_fqn = trino_table_fqn.replace(
trino_database_fqn, cross_database_fqn
)
# Cache cross-database table against its FQN to avoid repeated API calls
cross_database_table = cross_database_table_fqn_mapping[
cross_database_table_fqn
] = cross_database_table_fqn_mapping.get(
cross_database_table_fqn,
self.metadata.get_by_name(
Table, fqn=cross_database_table_fqn
),
)
# Create cross database lineage request if both tables are same
if cross_database_table and self.check_same_table(
trino_table, cross_database_table
):
yield self.get_cross_database_lineage(
cross_database_table, trino_table
)
break
cross_database_lineage = self._get_cross_database_lineage_for_table(
trino_database_fqn=trino_database_fqn,
trino_table=trino_table,
all_cross_database_fqns=all_cross_database_fqns,
cross_database_table_fqn_mapping=cross_database_table_fqn_mapping,
cross_database_schema_fqn_mapping=cross_database_schema_fqn_mapping,
cross_database_table_schema_mapping=cross_database_table_schema_mapping,
)
if cross_database_lineage:
yield cross_database_lineage
except Exception as exc:
yield Either(
left=StackTraceError(
Expand Down
Loading
Loading