-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Fixes #27419: Trino cross-database lineage for case-insensitive table names #27495
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 26 commits
162bcb1
3a4b7cc
674b6d4
3edcaf7
8124088
1d5e140
ef9ea9c
edd702f
a1da213
35fe5e0
f40276e
9695ea0
490f046
a8c9f3a
8bcc662
ef17a63
5498457
09a8783
3bcfcd4
ce92229
7701067
9d54b93
490f884
55f984c
6dcf668
db9d8c2
d737420
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -11,8 +11,9 @@ | |
| """ | ||
| Trino lineage module | ||
| """ | ||
|
|
||
| import traceback | ||
| from typing import Iterable, Iterator, List | ||
| from typing import Dict, Iterable, Iterator, List, Optional | ||
|
|
||
| from sqlalchemy import text | ||
|
|
||
|
|
@@ -30,6 +31,7 @@ | |
| TRINO_QUERY_BATCH_SIZE, | ||
| TrinoQueryParserSource, | ||
| ) | ||
| from metadata.utils import fqn | ||
| from metadata.utils.logger import ingestion_logger | ||
|
|
||
| logger = ingestion_logger() | ||
|
|
@@ -112,9 +114,122 @@ def check_same_table(self, table1: Table, table2: Table) -> bool: | |
| """ | ||
| Method to check whether the table1 and table2 are same | ||
| """ | ||
| return table1.name.root == table2.name.root and { | ||
| column.name.root for column in table1.columns | ||
| } == {column.name.root for column in table2.columns} | ||
| if table1.name.root.lower() != table2.name.root.lower(): | ||
| return False | ||
|
|
||
| if not table1.columns and not table2.columns: | ||
| return True | ||
|
|
||
| if not table1.columns or not table2.columns: | ||
| return False | ||
| return {column.name.root.lower() for column in table1.columns} == { | ||
| column.name.root.lower() for column in table2.columns | ||
| } | ||
|
|
||
def _get_cross_database_schema_fqn(
    self,
    cross_database_fqn: str,
    trino_table: Table,
    cross_database_schema_mapping: Dict[str, Dict[str, str]],
) -> Optional[str]:
    """
    Resolve the FQN of the schema inside a cross database that corresponds
    to the schema of the given Trino table, matching the name case-insensitively.

    Args:
        cross_database_fqn: FQN of the cross database; the schema search is
            only issued when it splits into exactly two parts.
        trino_table: Trino table whose schema has to be located.
        cross_database_schema_mapping: cache, updated in place, mapping a
            cross database FQN to ``{lower-cased schema name: schema FQN}``.

    Returns:
        The schema FQN found in the cache or in the search results, the
        ``<cross_database_fqn>.<quoted schema name>`` fallback when nothing
        matched, or ``None`` when the Trino schema name cannot be determined.
    """
    trino_schema_name = None
    schema_reference = trino_table.databaseSchema
    if schema_reference and schema_reference.name:
        # The reference name may be a plain string or a wrapper exposing
        # `.root`; accept both rather than failing on the plain-string form.
        trino_schema_name = getattr(
            schema_reference.name, "root", schema_reference.name
        )

    if (
        not trino_schema_name
        and trino_table.fullyQualifiedName
        and trino_table.fullyQualifiedName.root
    ):
        # Fall back to the second-to-last part of the table FQN.
        trino_table_fqn_parts = fqn.split(trino_table.fullyQualifiedName.root)
        if len(trino_table_fqn_parts) >= 4:
            trino_schema_name = trino_table_fqn_parts[-2]

    if not trino_schema_name:
        return None

    schema_key = trino_schema_name.lower()
    known_schemas = cross_database_schema_mapping.setdefault(cross_database_fqn, {})

    cached_schema_fqn = known_schemas.get(schema_key)
    if cached_schema_fqn:
        return cached_schema_fqn

    cross_database_fqn_parts = fqn.split(cross_database_fqn)
    if len(cross_database_fqn_parts) == 2:
        cross_database_service_name, cross_database_name = cross_database_fqn_parts
        cross_database_schemas = fqn.search_database_schema_from_es(
            metadata=self.metadata,
            database_name=cross_database_name,
            schema_name=trino_schema_name,
            service_name=cross_database_service_name,
            fetch_multiple_entities=True,
            fields="fullyQualifiedName,name",
        )
        # Index every returned schema by lower-cased name so later lookups
        # for the same cross database are served from the cache.
        for cross_database_schema in cross_database_schemas or []:
            if cross_database_schema.name and cross_database_schema.fullyQualifiedName:
                known_schemas[
                    cross_database_schema.name.root.lower()
                ] = cross_database_schema.fullyQualifiedName.root

    resolved_schema_fqn = known_schemas.get(schema_key)
    if not resolved_schema_fqn:
        resolved_schema_fqn = (
            f"{cross_database_fqn}.{fqn.quote_name(trino_schema_name)}"
        )
        # Cache the fallback as well; otherwise every table of a schema the
        # search did not find would trigger the same search again.
        known_schemas[schema_key] = resolved_schema_fqn
    return resolved_schema_fqn
|
|
||
def _get_case_insensitive_cross_database_table(
    self,
    cross_database_schema_fqn: str,
    trino_table: Table,
    cross_database_table_schema_mapping: Dict[str, Dict[str, List[Table]]],
) -> Optional[Table]:
    """
    Find the table of a cross database schema that is the same table as
    the given Trino table according to `check_same_table`.

    Args:
        cross_database_schema_fqn: FQN of the schema to look into; the
            search is only issued when it splits into exactly three parts.
        trino_table: Trino table to find a counterpart for.
        cross_database_table_schema_mapping: cache, updated in place,
            mapping a schema FQN to ``{lower-cased table name: candidates}``.

    Returns:
        The first candidate accepted by `check_same_table`, else ``None``.
    """
    candidates_by_name = cross_database_table_schema_mapping.setdefault(
        cross_database_schema_fqn, {}
    )
    lookup_key = trino_table.name.root.lower()

    if lookup_key not in candidates_by_name:
        # Record an empty result first so a lookup without matches is not repeated.
        candidates_by_name[lookup_key] = []
        schema_fqn_parts = fqn.split(cross_database_schema_fqn)
        if len(schema_fqn_parts) == 3:
            service_name, database_name, schema_name = schema_fqn_parts
            search_result = fqn.search_table_from_es(
                metadata=self.metadata,
                database_name=database_name,
                schema_name=schema_name,
                service_name=service_name,
                table_name=trino_table.name.root,
                fetch_multiple_entities=True,
                fields="fullyQualifiedName,name,columns,databaseSchema",
            )
            if search_result:
                candidates_by_name[lookup_key] = search_result

    return next(
        (
            candidate
            for candidate in candidates_by_name[lookup_key]
            if self.check_same_table(trino_table, candidate)
        ),
        None,
    )
|
hassaansaleem28 marked this conversation as resolved.
|
||
|
|
||
| def get_cross_database_lineage( | ||
| self, from_table: Table, to_table: Table | ||
|
|
@@ -131,10 +246,64 @@ def get_cross_database_lineage( | |
| from_entity=from_table, to_entity=to_table, column_lineage=column_lineage | ||
| ) | ||
|
|
||
def _get_cross_database_lineage_for_table(
    self,
    trino_database_fqn: str,
    trino_table: Table,
    *,
    all_cross_database_fqns: List[str],
    cross_database_table_fqn_mapping: Dict[str, Optional[Table]],
    cross_database_schema_fqn_mapping: Dict[str, Dict[str, str]],
    cross_database_table_schema_mapping: Dict[str, Dict[str, List[Table]]],
) -> Optional[Either[AddLineageRequest]]:
    """
    Build the lineage request from the first cross database table that is
    the same table as the given Trino table.

    Args:
        trino_database_fqn: FQN of the Trino database the table belongs to.
        trino_table: Trino table to find a cross database counterpart for.
        all_cross_database_fqns: cross database FQNs, tried in order.
        cross_database_table_fqn_mapping: cache, updated in place, from the
            candidate table FQN to the resolved table (``None`` if not found).
        cross_database_schema_fqn_mapping: cache handed to
            `_get_cross_database_schema_fqn`.
        cross_database_table_schema_mapping: cache handed to
            `_get_case_insensitive_cross_database_table`.

    Returns:
        The result of `get_cross_database_lineage` for the first match, or
        ``None`` when the table has no FQN, is not under
        ``trino_database_fqn``, or no cross database holds a matching table.
    """
    # Like the schema helper, tolerate a table without an FQN instead of
    # raising on the `.root` access.
    fqn_holder = trino_table.fullyQualifiedName
    trino_table_fqn = fqn_holder.root if fqn_holder else None
    if not trino_table_fqn:
        return None

    # Require the trailing separator so a database whose name merely starts
    # with the same characters is not treated as this one.
    if not trino_table_fqn.startswith(f"{trino_database_fqn}."):
        return None

    # Keeps the leading "." so it can be appended to a cross database FQN.
    trino_table_suffix = trino_table_fqn[len(trino_database_fqn) :]

    for cross_database_fqn in all_cross_database_fqns:
        cross_database_table_fqn = f"{cross_database_fqn}{trino_table_suffix}"

        if cross_database_table_fqn not in cross_database_table_fqn_mapping:
            # Exact FQN lookup first; case-insensitive search only on a miss.
            cross_database_table = self.metadata.get_by_name(
                Table, fqn=cross_database_table_fqn
            )
            if not cross_database_table:
                cross_database_schema_fqn = self._get_cross_database_schema_fqn(
                    cross_database_fqn,
                    trino_table,
                    cross_database_schema_fqn_mapping,
                )
                if cross_database_schema_fqn:
                    cross_database_table = (
                        self._get_case_insensitive_cross_database_table(
                            cross_database_schema_fqn,
                            trino_table,
                            cross_database_table_schema_mapping,
                        )
                    )
            # Misses are stored too, so the lookups above run once per FQN.
            cross_database_table_fqn_mapping[
                cross_database_table_fqn
            ] = cross_database_table

        cross_database_table = cross_database_table_fqn_mapping[
            cross_database_table_fqn
        ]
        if cross_database_table and self.check_same_table(
            trino_table, cross_database_table
        ):
            return self.get_cross_database_lineage(cross_database_table, trino_table)

    return None
|
|
||
| def yield_cross_database_lineage(self) -> Iterable[Either[AddLineageRequest]]: | ||
| try: | ||
| all_cross_database_fqns = self.get_cross_database_fqn_from_service_names() | ||
| cross_database_table_fqn_mapping = {} | ||
| cross_database_schema_fqn_mapping: Dict[str, Dict[str, str]] = {} | ||
| cross_database_table_schema_mapping: Dict[str, Dict[str, List[Table]]] = {} | ||
|
|
||
| # Get all databases for the specified Trino service | ||
| trino_databases = self.metadata.list_all_entities( | ||
|
|
@@ -149,29 +318,16 @@ def yield_cross_database_lineage(self) -> Iterable[Either[AddLineageRequest]]: | |
| ) | ||
| # NOTE: Currently, tables in system-defined schemas will also be checked for lineage. | ||
| for trino_table in trino_tables: | ||
| trino_table_fqn = trino_table.fullyQualifiedName.root | ||
| for cross_database_fqn in all_cross_database_fqns: | ||
| # Construct the FQN for cross-database tables | ||
| cross_database_table_fqn = trino_table_fqn.replace( | ||
| trino_database_fqn, cross_database_fqn | ||
| ) | ||
| # Cache cross-database table against its FQN to avoid repeated API calls | ||
| cross_database_table = cross_database_table_fqn_mapping[ | ||
| cross_database_table_fqn | ||
| ] = cross_database_table_fqn_mapping.get( | ||
| cross_database_table_fqn, | ||
| self.metadata.get_by_name( | ||
| Table, fqn=cross_database_table_fqn | ||
| ), | ||
| ) | ||
| # Create cross database lineage request if both tables are same | ||
| if cross_database_table and self.check_same_table( | ||
| trino_table, cross_database_table | ||
| ): | ||
| yield self.get_cross_database_lineage( | ||
| cross_database_table, trino_table | ||
| ) | ||
| break | ||
| cross_database_lineage = self._get_cross_database_lineage_for_table( | ||
| trino_database_fqn=trino_database_fqn, | ||
| trino_table=trino_table, | ||
| all_cross_database_fqns=all_cross_database_fqns, | ||
| cross_database_table_fqn_mapping=cross_database_table_fqn_mapping, | ||
| cross_database_schema_fqn_mapping=cross_database_schema_fqn_mapping, | ||
| cross_database_table_schema_mapping=cross_database_table_schema_mapping, | ||
| ) | ||
| if cross_database_lineage: | ||
| yield cross_database_lineage | ||
| except Exception as exc: | ||
| yield Either( | ||
| left=StackTraceError( | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.